ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.73
Committed: Tue Mar 19 17:04:36 2013 UTC (11 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.72: +90 -99 lines
Log Message:
no special wording for unchecked exceptions

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on
37 * user-provided Functions, Consumers, or Runnables. The appropriate
38 * form to use depends on whether actions require arguments and/or
39 * produce results. Completion of a dependent action will trigger the
40 * completion of another CompletableFuture. Actions may also be
41 * triggered after either or both the current and another
42 * CompletableFuture complete. Multiple CompletableFutures may also
43 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44 * {@link #allOf(CompletableFuture...)}.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, actions supplied for dependent completions of another
48 * CompletableFuture may do so, depending on whether they are provided
49 * via one of the <em>async</em> methods (that is, methods with names
50 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51 * methods provide a way to commence asynchronous processing of an
52 * action using either a given {@link Executor} or by default the
53 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54 * debugging, and tracking, all generated asynchronous tasks are
55 * instances of the marker interface {@link AsynchronousCompletionTask}.
56 *
57 * <p>Actions supplied for dependent completions of <em>non-async</em>
58 * methods may be performed by the thread that completes the current
59 * CompletableFuture, or by any other caller of these methods. There
60 * are no guarantees about the order of processing completions unless
61 * constrained by these methods.
62 *
63 * <p>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional completion.
66 * Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}.
68 *
69 * <p>Upon exceptional completion (including cancellation), or when a
70 * completion entails an additional computation which terminates
71 * abruptly with an (unchecked) exception or error, then all of their
72 * dependent completions (and their dependents in turn) generally act
73 * as {@code completeExceptionally} with a {@link CompletionException}
74 * holding that exception as its cause. However, the {@link
75 * #exceptionally exceptionally} and {@link #handle handle}
76 * completions <em>are</em> able to handle exceptional completions of
77 * the CompletableFutures they depend on.
78 *
79 * <p>In case of exceptional completion with a CompletionException,
80 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81 * {@link ExecutionException} with the same cause as held in the
82 * corresponding CompletionException. However, in these cases,
83 * methods {@link #join()} and {@link #getNow} throw the
84 * CompletionException, which simplifies usage.
85 *
86 * @author Doug Lea
87 * @since 1.8
88 */
89 public class CompletableFuture<T> implements Future<T> {
90
91 /*
92 * Overview:
93 *
94 * 1. Non-nullness of field result (set via CAS) indicates done.
95 * An AltResult is used to box null as a result, as well as to
96 * hold exceptions. Using a single field makes completion fast
97 * and simple to detect and trigger, at the expense of a lot of
98 * encoding and decoding that infiltrates many methods. One minor
99 * simplification relies on the (static) NIL (to box null results)
100 * being the only AltResult with a null exception field, so we
101 * don't usually need explicit comparisons with NIL. The CF
102 * exception propagation mechanics surrounding decoding rely on
103 * unchecked casts of decoded results really being unchecked,
104 * where user type errors are caught at point of use, as is
105 * currently the case in Java. These are highlighted by using
106 * SuppressWarnings-annotated temporaries.
107 *
108 * 2. Waiters are held in a Treiber stack similar to the one used
109 * in FutureTask, Phaser, and SynchronousQueue. See their
110 * internal documentation for algorithmic details.
111 *
112 * 3. Completions are also kept in a list/stack, and pulled off
113 * and run when completion is triggered. (We could even use the
114 * same stack as for waiters, but would give up the potential
115 * parallelism obtained because woken waiters help release/run
116 * others -- see method postComplete). Because post-processing
117 * may race with direct calls, class Completion opportunistically
118 * extends AtomicInteger so callers can claim the action via
119 * compareAndSet(0, 1). The Completion.run methods are all
120 * written a boringly similar uniform way (that sometimes includes
121 * unnecessary-looking checks, kept to maintain uniformity).
122 * There are enough dimensions upon which they differ that
123 * attempts to factor commonalities while maintaining efficiency
124 * require more lines of code than they would save.
125 *
126 * 4. The exported then/and/or methods do support a bit of
127 * factoring (see doThenApply etc). They must cope with the
128 * intrinsic races surrounding addition of a dependent action
129 * versus performing the action directly because the task is
130 * already complete. For example, a CF may not be complete upon
131 * entry, so a dependent completion is added, but by the time it
132 * is added, the target CF is complete, so must be directly
133 * executed. This is all done while avoiding unnecessary object
134 * construction in safe-bypass cases.
135 */
136
137 // preliminaries
138
139 static final class AltResult {
140 final Throwable ex; // null only for NIL
141 AltResult(Throwable ex) { this.ex = ex; }
142 }
143
144 static final AltResult NIL = new AltResult(null);
145
146 // Fields
147
148 volatile Object result; // Either the result or boxed AltResult
149 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
150 volatile CompletionNode completions; // list (Treiber stack) of completions
151
152 // Basic utilities for triggering and processing completions
153
154 /**
155 * Removes and signals all waiting threads and runs all completions.
156 */
157 final void postComplete() {
158 WaitNode q; Thread t;
159 while ((q = waiters) != null) {
160 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
161 (t = q.thread) != null) {
162 q.thread = null;
163 LockSupport.unpark(t);
164 }
165 }
166
167 CompletionNode h; Completion c;
168 while ((h = completions) != null) {
169 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
170 (c = h.completion) != null)
171 c.run();
172 }
173 }
174
175 /**
176 * Triggers completion with the encoding of the given arguments:
177 * if the exception is non-null, encodes it as a wrapped
178 * CompletionException unless it is one already. Otherwise uses
179 * the given result, boxed as NIL if null.
180 */
181 final void internalComplete(Object v, Throwable ex) {
182 if (result == null)
183 UNSAFE.compareAndSwapObject
184 (this, RESULT, null,
185 (ex == null) ? (v == null) ? NIL : v :
186 new AltResult((ex instanceof CompletionException) ? ex :
187 new CompletionException(ex)));
188 postComplete(); // help out even if not triggered
189 }
190
191 /**
192 * If triggered, helps release and/or process completions.
193 */
194 final void helpPostComplete() {
195 if (result != null)
196 postComplete();
197 }
198
199 /* ------------- waiting for completions -------------- */
200
201 /** Number of processors, for spin control */
202 static final int NCPU = Runtime.getRuntime().availableProcessors();
203
204 /**
205 * Heuristic spin value for waitingGet() before blocking on
206 * multiprocessors
207 */
208 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
209
210 /**
211 * Linked nodes to record waiting threads in a Treiber stack. See
212 * other classes such as Phaser and SynchronousQueue for more
213 * detailed explanation. This class implements ManagedBlocker to
214 * avoid starvation when blocking actions pile up in
215 * ForkJoinPools.
216 */
217 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
218 long nanos; // wait time if timed
219 final long deadline; // non-zero if timed
220 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
221 volatile Thread thread;
222 volatile WaitNode next;
223 WaitNode(boolean interruptible, long nanos, long deadline) {
224 this.thread = Thread.currentThread();
225 this.interruptControl = interruptible ? 1 : 0;
226 this.nanos = nanos;
227 this.deadline = deadline;
228 }
229 public boolean isReleasable() {
230 if (thread == null)
231 return true;
232 if (Thread.interrupted()) {
233 int i = interruptControl;
234 interruptControl = -1;
235 if (i > 0)
236 return true;
237 }
238 if (deadline != 0L &&
239 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
240 thread = null;
241 return true;
242 }
243 return false;
244 }
245 public boolean block() {
246 if (isReleasable())
247 return true;
248 else if (deadline == 0L)
249 LockSupport.park(this);
250 else if (nanos > 0L)
251 LockSupport.parkNanos(this, nanos);
252 return isReleasable();
253 }
254 }
255
256 /**
257 * Returns raw result after waiting, or null if interruptible and
258 * interrupted.
259 */
260 private Object waitingGet(boolean interruptible) {
261 WaitNode q = null;
262 boolean queued = false;
263 int spins = SPINS;
264 for (Object r;;) {
265 if ((r = result) != null) {
266 if (q != null) { // suppress unpark
267 q.thread = null;
268 if (q.interruptControl < 0) {
269 if (interruptible) {
270 removeWaiter(q);
271 return null;
272 }
273 Thread.currentThread().interrupt();
274 }
275 }
276 postComplete(); // help release others
277 return r;
278 }
279 else if (spins > 0) {
280 int rnd = ThreadLocalRandom.nextSecondarySeed();
281 if (rnd == 0)
282 rnd = ThreadLocalRandom.current().nextInt();
283 if (rnd >= 0)
284 --spins;
285 }
286 else if (q == null)
287 q = new WaitNode(interruptible, 0L, 0L);
288 else if (!queued)
289 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
290 q.next = waiters, q);
291 else if (interruptible && q.interruptControl < 0) {
292 removeWaiter(q);
293 return null;
294 }
295 else if (q.thread != null && result == null) {
296 try {
297 ForkJoinPool.managedBlock(q);
298 } catch (InterruptedException ex) {
299 q.interruptControl = -1;
300 }
301 }
302 }
303 }
304
305 /**
306 * Awaits completion or aborts on interrupt or timeout.
307 *
308 * @param nanos time to wait
309 * @return raw result
310 */
311 private Object timedAwaitDone(long nanos)
312 throws InterruptedException, TimeoutException {
313 WaitNode q = null;
314 boolean queued = false;
315 for (Object r;;) {
316 if ((r = result) != null) {
317 if (q != null) {
318 q.thread = null;
319 if (q.interruptControl < 0) {
320 removeWaiter(q);
321 throw new InterruptedException();
322 }
323 }
324 postComplete();
325 return r;
326 }
327 else if (q == null) {
328 if (nanos <= 0L)
329 throw new TimeoutException();
330 long d = System.nanoTime() + nanos;
331 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
332 }
333 else if (!queued)
334 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
335 q.next = waiters, q);
336 else if (q.interruptControl < 0) {
337 removeWaiter(q);
338 throw new InterruptedException();
339 }
340 else if (q.nanos <= 0L) {
341 if (result == null) {
342 removeWaiter(q);
343 throw new TimeoutException();
344 }
345 }
346 else if (q.thread != null && result == null) {
347 try {
348 ForkJoinPool.managedBlock(q);
349 } catch (InterruptedException ex) {
350 q.interruptControl = -1;
351 }
352 }
353 }
354 }
355
356 /**
357 * Tries to unlink a timed-out or interrupted wait node to avoid
358 * accumulating garbage. Internal nodes are simply unspliced
359 * without CAS since it is harmless if they are traversed anyway
360 * by releasers. To avoid effects of unsplicing from already
361 * removed nodes, the list is retraversed in case of an apparent
362 * race. This is slow when there are a lot of nodes, but we don't
363 * expect lists to be long enough to outweigh higher-overhead
364 * schemes.
365 */
366 private void removeWaiter(WaitNode node) {
367 if (node != null) {
368 node.thread = null;
369 retry:
370 for (;;) { // restart on removeWaiter race
371 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
372 s = q.next;
373 if (q.thread != null)
374 pred = q;
375 else if (pred != null) {
376 pred.next = s;
377 if (pred.thread == null) // check for race
378 continue retry;
379 }
380 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
381 continue retry;
382 }
383 break;
384 }
385 }
386 }
387
388 /* ------------- Async tasks -------------- */
389
390 /**
391 * A marker interface identifying asynchronous tasks produced by
392 * {@code async} methods. This may be useful for monitoring,
393 * debugging, and tracking asynchronous activities.
394 *
395 * @since 1.8
396 */
397 public static interface AsynchronousCompletionTask {
398 }
399
400 /** Base class can act as either FJ or plain Runnable */
401 abstract static class Async extends ForkJoinTask<Void>
402 implements Runnable, AsynchronousCompletionTask {
403 public final Void getRawResult() { return null; }
404 public final void setRawResult(Void v) { }
405 public final void run() { exec(); }
406 }
407
408 static final class AsyncRun extends Async {
409 final Runnable fn;
410 final CompletableFuture<Void> dst;
411 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
412 this.fn = fn; this.dst = dst;
413 }
414 public final boolean exec() {
415 CompletableFuture<Void> d; Throwable ex;
416 if ((d = this.dst) != null && d.result == null) {
417 try {
418 fn.run();
419 ex = null;
420 } catch (Throwable rex) {
421 ex = rex;
422 }
423 d.internalComplete(null, ex);
424 }
425 return true;
426 }
427 private static final long serialVersionUID = 5232453952276885070L;
428 }
429
430 static final class AsyncSupply<U> extends Async {
431 final Supplier<U> fn;
432 final CompletableFuture<U> dst;
433 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
434 this.fn = fn; this.dst = dst;
435 }
436 public final boolean exec() {
437 CompletableFuture<U> d; U u; Throwable ex;
438 if ((d = this.dst) != null && d.result == null) {
439 try {
440 u = fn.get();
441 ex = null;
442 } catch (Throwable rex) {
443 ex = rex;
444 u = null;
445 }
446 d.internalComplete(u, ex);
447 }
448 return true;
449 }
450 private static final long serialVersionUID = 5232453952276885070L;
451 }
452
453 static final class AsyncApply<T,U> extends Async {
454 final T arg;
455 final Function<? super T,? extends U> fn;
456 final CompletableFuture<U> dst;
457 AsyncApply(T arg, Function<? super T,? extends U> fn,
458 CompletableFuture<U> dst) {
459 this.arg = arg; this.fn = fn; this.dst = dst;
460 }
461 public final boolean exec() {
462 CompletableFuture<U> d; U u; Throwable ex;
463 if ((d = this.dst) != null && d.result == null) {
464 try {
465 u = fn.apply(arg);
466 ex = null;
467 } catch (Throwable rex) {
468 ex = rex;
469 u = null;
470 }
471 d.internalComplete(u, ex);
472 }
473 return true;
474 }
475 private static final long serialVersionUID = 5232453952276885070L;
476 }
477
478 static final class AsyncBiApply<T,U,V> extends Async {
479 final T arg1;
480 final U arg2;
481 final BiFunction<? super T,? super U,? extends V> fn;
482 final CompletableFuture<V> dst;
483 AsyncBiApply(T arg1, U arg2,
484 BiFunction<? super T,? super U,? extends V> fn,
485 CompletableFuture<V> dst) {
486 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
487 }
488 public final boolean exec() {
489 CompletableFuture<V> d; V v; Throwable ex;
490 if ((d = this.dst) != null && d.result == null) {
491 try {
492 v = fn.apply(arg1, arg2);
493 ex = null;
494 } catch (Throwable rex) {
495 ex = rex;
496 v = null;
497 }
498 d.internalComplete(v, ex);
499 }
500 return true;
501 }
502 private static final long serialVersionUID = 5232453952276885070L;
503 }
504
505 static final class AsyncAccept<T> extends Async {
506 final T arg;
507 final Consumer<? super T> fn;
508 final CompletableFuture<Void> dst;
509 AsyncAccept(T arg, Consumer<? super T> fn,
510 CompletableFuture<Void> dst) {
511 this.arg = arg; this.fn = fn; this.dst = dst;
512 }
513 public final boolean exec() {
514 CompletableFuture<Void> d; Throwable ex;
515 if ((d = this.dst) != null && d.result == null) {
516 try {
517 fn.accept(arg);
518 ex = null;
519 } catch (Throwable rex) {
520 ex = rex;
521 }
522 d.internalComplete(null, ex);
523 }
524 return true;
525 }
526 private static final long serialVersionUID = 5232453952276885070L;
527 }
528
529 static final class AsyncBiAccept<T,U> extends Async {
530 final T arg1;
531 final U arg2;
532 final BiConsumer<? super T,? super U> fn;
533 final CompletableFuture<Void> dst;
534 AsyncBiAccept(T arg1, U arg2,
535 BiConsumer<? super T,? super U> fn,
536 CompletableFuture<Void> dst) {
537 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
538 }
539 public final boolean exec() {
540 CompletableFuture<Void> d; Throwable ex;
541 if ((d = this.dst) != null && d.result == null) {
542 try {
543 fn.accept(arg1, arg2);
544 ex = null;
545 } catch (Throwable rex) {
546 ex = rex;
547 }
548 d.internalComplete(null, ex);
549 }
550 return true;
551 }
552 private static final long serialVersionUID = 5232453952276885070L;
553 }
554
555 static final class AsyncCompose<T,U> extends Async {
556 final T arg;
557 final Function<? super T, CompletableFuture<U>> fn;
558 final CompletableFuture<U> dst;
559 AsyncCompose(T arg,
560 Function<? super T, CompletableFuture<U>> fn,
561 CompletableFuture<U> dst) {
562 this.arg = arg; this.fn = fn; this.dst = dst;
563 }
564 public final boolean exec() {
565 CompletableFuture<U> d, fr; U u; Throwable ex;
566 if ((d = this.dst) != null && d.result == null) {
567 try {
568 fr = fn.apply(arg);
569 ex = (fr == null) ? new NullPointerException() : null;
570 } catch (Throwable rex) {
571 ex = rex;
572 fr = null;
573 }
574 if (ex != null)
575 u = null;
576 else {
577 Object r = fr.result;
578 if (r == null)
579 r = fr.waitingGet(false);
580 if (r instanceof AltResult) {
581 ex = ((AltResult)r).ex;
582 u = null;
583 }
584 else {
585 @SuppressWarnings("unchecked") U ur = (U) r;
586 u = ur;
587 }
588 }
589 d.internalComplete(u, ex);
590 }
591 return true;
592 }
593 private static final long serialVersionUID = 5232453952276885070L;
594 }
595
596 /* ------------- Completions -------------- */
597
598 /**
599 * Simple linked list nodes to record completions, used in
600 * basically the same way as WaitNodes. (We separate nodes from
601 * the Completions themselves mainly because for the And and Or
602 * methods, the same Completion object resides in two lists.)
603 */
604 static final class CompletionNode {
605 final Completion completion;
606 volatile CompletionNode next;
607 CompletionNode(Completion completion) { this.completion = completion; }
608 }
609
610 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
611 abstract static class Completion extends AtomicInteger implements Runnable {
612 }
613
614 static final class ApplyCompletion<T,U> extends Completion {
615 final CompletableFuture<? extends T> src;
616 final Function<? super T,? extends U> fn;
617 final CompletableFuture<U> dst;
618 final Executor executor;
619 ApplyCompletion(CompletableFuture<? extends T> src,
620 Function<? super T,? extends U> fn,
621 CompletableFuture<U> dst, Executor executor) {
622 this.src = src; this.fn = fn; this.dst = dst;
623 this.executor = executor;
624 }
625 public final void run() {
626 final CompletableFuture<? extends T> a;
627 final Function<? super T,? extends U> fn;
628 final CompletableFuture<U> dst;
629 Object r; T t; Throwable ex;
630 if ((dst = this.dst) != null &&
631 (fn = this.fn) != null &&
632 (a = this.src) != null &&
633 (r = a.result) != null &&
634 compareAndSet(0, 1)) {
635 if (r instanceof AltResult) {
636 ex = ((AltResult)r).ex;
637 t = null;
638 }
639 else {
640 ex = null;
641 @SuppressWarnings("unchecked") T tr = (T) r;
642 t = tr;
643 }
644 Executor e = executor;
645 U u = null;
646 if (ex == null) {
647 try {
648 if (e != null)
649 e.execute(new AsyncApply<T,U>(t, fn, dst));
650 else
651 u = fn.apply(t);
652 } catch (Throwable rex) {
653 ex = rex;
654 }
655 }
656 if (e == null || ex != null)
657 dst.internalComplete(u, ex);
658 }
659 }
660 private static final long serialVersionUID = 5232453952276885070L;
661 }
662
663 static final class AcceptCompletion<T> extends Completion {
664 final CompletableFuture<? extends T> src;
665 final Consumer<? super T> fn;
666 final CompletableFuture<Void> dst;
667 final Executor executor;
668 AcceptCompletion(CompletableFuture<? extends T> src,
669 Consumer<? super T> fn,
670 CompletableFuture<Void> dst, Executor executor) {
671 this.src = src; this.fn = fn; this.dst = dst;
672 this.executor = executor;
673 }
674 public final void run() {
675 final CompletableFuture<? extends T> a;
676 final Consumer<? super T> fn;
677 final CompletableFuture<Void> dst;
678 Object r; T t; Throwable ex;
679 if ((dst = this.dst) != null &&
680 (fn = this.fn) != null &&
681 (a = this.src) != null &&
682 (r = a.result) != null &&
683 compareAndSet(0, 1)) {
684 if (r instanceof AltResult) {
685 ex = ((AltResult)r).ex;
686 t = null;
687 }
688 else {
689 ex = null;
690 @SuppressWarnings("unchecked") T tr = (T) r;
691 t = tr;
692 }
693 Executor e = executor;
694 if (ex == null) {
695 try {
696 if (e != null)
697 e.execute(new AsyncAccept<T>(t, fn, dst));
698 else
699 fn.accept(t);
700 } catch (Throwable rex) {
701 ex = rex;
702 }
703 }
704 if (e == null || ex != null)
705 dst.internalComplete(null, ex);
706 }
707 }
708 private static final long serialVersionUID = 5232453952276885070L;
709 }
710
711 static final class RunCompletion<T> extends Completion {
712 final CompletableFuture<? extends T> src;
713 final Runnable fn;
714 final CompletableFuture<Void> dst;
715 final Executor executor;
716 RunCompletion(CompletableFuture<? extends T> src,
717 Runnable fn,
718 CompletableFuture<Void> dst,
719 Executor executor) {
720 this.src = src; this.fn = fn; this.dst = dst;
721 this.executor = executor;
722 }
723 public final void run() {
724 final CompletableFuture<? extends T> a;
725 final Runnable fn;
726 final CompletableFuture<Void> dst;
727 Object r; Throwable ex;
728 if ((dst = this.dst) != null &&
729 (fn = this.fn) != null &&
730 (a = this.src) != null &&
731 (r = a.result) != null &&
732 compareAndSet(0, 1)) {
733 if (r instanceof AltResult)
734 ex = ((AltResult)r).ex;
735 else
736 ex = null;
737 Executor e = executor;
738 if (ex == null) {
739 try {
740 if (e != null)
741 e.execute(new AsyncRun(fn, dst));
742 else
743 fn.run();
744 } catch (Throwable rex) {
745 ex = rex;
746 }
747 }
748 if (e == null || ex != null)
749 dst.internalComplete(null, ex);
750 }
751 }
752 private static final long serialVersionUID = 5232453952276885070L;
753 }
754
755 static final class BiApplyCompletion<T,U,V> extends Completion {
756 final CompletableFuture<? extends T> src;
757 final CompletableFuture<? extends U> snd;
758 final BiFunction<? super T,? super U,? extends V> fn;
759 final CompletableFuture<V> dst;
760 final Executor executor;
761 BiApplyCompletion(CompletableFuture<? extends T> src,
762 CompletableFuture<? extends U> snd,
763 BiFunction<? super T,? super U,? extends V> fn,
764 CompletableFuture<V> dst, Executor executor) {
765 this.src = src; this.snd = snd;
766 this.fn = fn; this.dst = dst;
767 this.executor = executor;
768 }
769 public final void run() {
770 final CompletableFuture<? extends T> a;
771 final CompletableFuture<? extends U> b;
772 final BiFunction<? super T,? super U,? extends V> fn;
773 final CompletableFuture<V> dst;
774 Object r, s; T t; U u; Throwable ex;
775 if ((dst = this.dst) != null &&
776 (fn = this.fn) != null &&
777 (a = this.src) != null &&
778 (r = a.result) != null &&
779 (b = this.snd) != null &&
780 (s = b.result) != null &&
781 compareAndSet(0, 1)) {
782 if (r instanceof AltResult) {
783 ex = ((AltResult)r).ex;
784 t = null;
785 }
786 else {
787 ex = null;
788 @SuppressWarnings("unchecked") T tr = (T) r;
789 t = tr;
790 }
791 if (ex != null)
792 u = null;
793 else if (s instanceof AltResult) {
794 ex = ((AltResult)s).ex;
795 u = null;
796 }
797 else {
798 @SuppressWarnings("unchecked") U us = (U) s;
799 u = us;
800 }
801 Executor e = executor;
802 V v = null;
803 if (ex == null) {
804 try {
805 if (e != null)
806 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
807 else
808 v = fn.apply(t, u);
809 } catch (Throwable rex) {
810 ex = rex;
811 }
812 }
813 if (e == null || ex != null)
814 dst.internalComplete(v, ex);
815 }
816 }
817 private static final long serialVersionUID = 5232453952276885070L;
818 }
819
820 static final class BiAcceptCompletion<T,U> extends Completion {
821 final CompletableFuture<? extends T> src;
822 final CompletableFuture<? extends U> snd;
823 final BiConsumer<? super T,? super U> fn;
824 final CompletableFuture<Void> dst;
825 final Executor executor;
826 BiAcceptCompletion(CompletableFuture<? extends T> src,
827 CompletableFuture<? extends U> snd,
828 BiConsumer<? super T,? super U> fn,
829 CompletableFuture<Void> dst, Executor executor) {
830 this.src = src; this.snd = snd;
831 this.fn = fn; this.dst = dst;
832 this.executor = executor;
833 }
834 public final void run() {
835 final CompletableFuture<? extends T> a;
836 final CompletableFuture<? extends U> b;
837 final BiConsumer<? super T,? super U> fn;
838 final CompletableFuture<Void> dst;
839 Object r, s; T t; U u; Throwable ex;
840 if ((dst = this.dst) != null &&
841 (fn = this.fn) != null &&
842 (a = this.src) != null &&
843 (r = a.result) != null &&
844 (b = this.snd) != null &&
845 (s = b.result) != null &&
846 compareAndSet(0, 1)) {
847 if (r instanceof AltResult) {
848 ex = ((AltResult)r).ex;
849 t = null;
850 }
851 else {
852 ex = null;
853 @SuppressWarnings("unchecked") T tr = (T) r;
854 t = tr;
855 }
856 if (ex != null)
857 u = null;
858 else if (s instanceof AltResult) {
859 ex = ((AltResult)s).ex;
860 u = null;
861 }
862 else {
863 @SuppressWarnings("unchecked") U us = (U) s;
864 u = us;
865 }
866 Executor e = executor;
867 if (ex == null) {
868 try {
869 if (e != null)
870 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
871 else
872 fn.accept(t, u);
873 } catch (Throwable rex) {
874 ex = rex;
875 }
876 }
877 if (e == null || ex != null)
878 dst.internalComplete(null, ex);
879 }
880 }
881 private static final long serialVersionUID = 5232453952276885070L;
882 }
883
884 static final class BiRunCompletion<T> extends Completion {
885 final CompletableFuture<? extends T> src;
886 final CompletableFuture<?> snd;
887 final Runnable fn;
888 final CompletableFuture<Void> dst;
889 final Executor executor;
890 BiRunCompletion(CompletableFuture<? extends T> src,
891 CompletableFuture<?> snd,
892 Runnable fn,
893 CompletableFuture<Void> dst, Executor executor) {
894 this.src = src; this.snd = snd;
895 this.fn = fn; this.dst = dst;
896 this.executor = executor;
897 }
898 public final void run() {
899 final CompletableFuture<? extends T> a;
900 final CompletableFuture<?> b;
901 final Runnable fn;
902 final CompletableFuture<Void> dst;
903 Object r, s; Throwable ex;
904 if ((dst = this.dst) != null &&
905 (fn = this.fn) != null &&
906 (a = this.src) != null &&
907 (r = a.result) != null &&
908 (b = this.snd) != null &&
909 (s = b.result) != null &&
910 compareAndSet(0, 1)) {
911 if (r instanceof AltResult)
912 ex = ((AltResult)r).ex;
913 else
914 ex = null;
915 if (ex == null && (s instanceof AltResult))
916 ex = ((AltResult)s).ex;
917 Executor e = executor;
918 if (ex == null) {
919 try {
920 if (e != null)
921 e.execute(new AsyncRun(fn, dst));
922 else
923 fn.run();
924 } catch (Throwable rex) {
925 ex = rex;
926 }
927 }
928 if (e == null || ex != null)
929 dst.internalComplete(null, ex);
930 }
931 }
932 private static final long serialVersionUID = 5232453952276885070L;
933 }
934
935 static final class AndCompletion extends Completion {
936 final CompletableFuture<?> src;
937 final CompletableFuture<?> snd;
938 final CompletableFuture<Void> dst;
939 AndCompletion(CompletableFuture<?> src,
940 CompletableFuture<?> snd,
941 CompletableFuture<Void> dst) {
942 this.src = src; this.snd = snd; this.dst = dst;
943 }
944 public final void run() {
945 final CompletableFuture<?> a;
946 final CompletableFuture<?> b;
947 final CompletableFuture<Void> dst;
948 Object r, s; Throwable ex;
949 if ((dst = this.dst) != null &&
950 (a = this.src) != null &&
951 (r = a.result) != null &&
952 (b = this.snd) != null &&
953 (s = b.result) != null &&
954 compareAndSet(0, 1)) {
955 if (r instanceof AltResult)
956 ex = ((AltResult)r).ex;
957 else
958 ex = null;
959 if (ex == null && (s instanceof AltResult))
960 ex = ((AltResult)s).ex;
961 dst.internalComplete(null, ex);
962 }
963 }
964 private static final long serialVersionUID = 5232453952276885070L;
965 }
966
967 static final class OrApplyCompletion<T,U> extends Completion {
968 final CompletableFuture<? extends T> src;
969 final CompletableFuture<? extends T> snd;
970 final Function<? super T,? extends U> fn;
971 final CompletableFuture<U> dst;
972 final Executor executor;
973 OrApplyCompletion(CompletableFuture<? extends T> src,
974 CompletableFuture<? extends T> snd,
975 Function<? super T,? extends U> fn,
976 CompletableFuture<U> dst, Executor executor) {
977 this.src = src; this.snd = snd;
978 this.fn = fn; this.dst = dst;
979 this.executor = executor;
980 }
981 public final void run() {
982 final CompletableFuture<? extends T> a;
983 final CompletableFuture<? extends T> b;
984 final Function<? super T,? extends U> fn;
985 final CompletableFuture<U> dst;
986 Object r; T t; Throwable ex;
987 if ((dst = this.dst) != null &&
988 (fn = this.fn) != null &&
989 (((a = this.src) != null && (r = a.result) != null) ||
990 ((b = this.snd) != null && (r = b.result) != null)) &&
991 compareAndSet(0, 1)) {
992 if (r instanceof AltResult) {
993 ex = ((AltResult)r).ex;
994 t = null;
995 }
996 else {
997 ex = null;
998 @SuppressWarnings("unchecked") T tr = (T) r;
999 t = tr;
1000 }
1001 Executor e = executor;
1002 U u = null;
1003 if (ex == null) {
1004 try {
1005 if (e != null)
1006 e.execute(new AsyncApply<T,U>(t, fn, dst));
1007 else
1008 u = fn.apply(t);
1009 } catch (Throwable rex) {
1010 ex = rex;
1011 }
1012 }
1013 if (e == null || ex != null)
1014 dst.internalComplete(u, ex);
1015 }
1016 }
1017 private static final long serialVersionUID = 5232453952276885070L;
1018 }
1019
1020 static final class OrAcceptCompletion<T> extends Completion {
1021 final CompletableFuture<? extends T> src;
1022 final CompletableFuture<? extends T> snd;
1023 final Consumer<? super T> fn;
1024 final CompletableFuture<Void> dst;
1025 final Executor executor;
1026 OrAcceptCompletion(CompletableFuture<? extends T> src,
1027 CompletableFuture<? extends T> snd,
1028 Consumer<? super T> fn,
1029 CompletableFuture<Void> dst, Executor executor) {
1030 this.src = src; this.snd = snd;
1031 this.fn = fn; this.dst = dst;
1032 this.executor = executor;
1033 }
1034 public final void run() {
1035 final CompletableFuture<? extends T> a;
1036 final CompletableFuture<? extends T> b;
1037 final Consumer<? super T> fn;
1038 final CompletableFuture<Void> dst;
1039 Object r; T t; Throwable ex;
1040 if ((dst = this.dst) != null &&
1041 (fn = this.fn) != null &&
1042 (((a = this.src) != null && (r = a.result) != null) ||
1043 ((b = this.snd) != null && (r = b.result) != null)) &&
1044 compareAndSet(0, 1)) {
1045 if (r instanceof AltResult) {
1046 ex = ((AltResult)r).ex;
1047 t = null;
1048 }
1049 else {
1050 ex = null;
1051 @SuppressWarnings("unchecked") T tr = (T) r;
1052 t = tr;
1053 }
1054 Executor e = executor;
1055 if (ex == null) {
1056 try {
1057 if (e != null)
1058 e.execute(new AsyncAccept<T>(t, fn, dst));
1059 else
1060 fn.accept(t);
1061 } catch (Throwable rex) {
1062 ex = rex;
1063 }
1064 }
1065 if (e == null || ex != null)
1066 dst.internalComplete(null, ex);
1067 }
1068 }
1069 private static final long serialVersionUID = 5232453952276885070L;
1070 }
1071
1072 static final class OrRunCompletion<T> extends Completion {
1073 final CompletableFuture<? extends T> src;
1074 final CompletableFuture<?> snd;
1075 final Runnable fn;
1076 final CompletableFuture<Void> dst;
1077 final Executor executor;
1078 OrRunCompletion(CompletableFuture<? extends T> src,
1079 CompletableFuture<?> snd,
1080 Runnable fn,
1081 CompletableFuture<Void> dst, Executor executor) {
1082 this.src = src; this.snd = snd;
1083 this.fn = fn; this.dst = dst;
1084 this.executor = executor;
1085 }
1086 public final void run() {
1087 final CompletableFuture<? extends T> a;
1088 final CompletableFuture<?> b;
1089 final Runnable fn;
1090 final CompletableFuture<Void> dst;
1091 Object r; Throwable ex;
1092 if ((dst = this.dst) != null &&
1093 (fn = this.fn) != null &&
1094 (((a = this.src) != null && (r = a.result) != null) ||
1095 ((b = this.snd) != null && (r = b.result) != null)) &&
1096 compareAndSet(0, 1)) {
1097 if (r instanceof AltResult)
1098 ex = ((AltResult)r).ex;
1099 else
1100 ex = null;
1101 Executor e = executor;
1102 if (ex == null) {
1103 try {
1104 if (e != null)
1105 e.execute(new AsyncRun(fn, dst));
1106 else
1107 fn.run();
1108 } catch (Throwable rex) {
1109 ex = rex;
1110 }
1111 }
1112 if (e == null || ex != null)
1113 dst.internalComplete(null, ex);
1114 }
1115 }
1116 private static final long serialVersionUID = 5232453952276885070L;
1117 }
1118
1119 static final class OrCompletion extends Completion {
1120 final CompletableFuture<?> src;
1121 final CompletableFuture<?> snd;
1122 final CompletableFuture<?> dst;
1123 OrCompletion(CompletableFuture<?> src,
1124 CompletableFuture<?> snd,
1125 CompletableFuture<?> dst) {
1126 this.src = src; this.snd = snd; this.dst = dst;
1127 }
1128 public final void run() {
1129 final CompletableFuture<?> a;
1130 final CompletableFuture<?> b;
1131 final CompletableFuture<?> dst;
1132 Object r, t; Throwable ex;
1133 if ((dst = this.dst) != null &&
1134 (((a = this.src) != null && (r = a.result) != null) ||
1135 ((b = this.snd) != null && (r = b.result) != null)) &&
1136 compareAndSet(0, 1)) {
1137 if (r instanceof AltResult) {
1138 ex = ((AltResult)r).ex;
1139 t = null;
1140 }
1141 else {
1142 ex = null;
1143 t = r;
1144 }
1145 dst.internalComplete(t, ex);
1146 }
1147 }
1148 private static final long serialVersionUID = 5232453952276885070L;
1149 }
1150
1151 static final class ExceptionCompletion<T> extends Completion {
1152 final CompletableFuture<? extends T> src;
1153 final Function<? super Throwable, ? extends T> fn;
1154 final CompletableFuture<T> dst;
1155 ExceptionCompletion(CompletableFuture<? extends T> src,
1156 Function<? super Throwable, ? extends T> fn,
1157 CompletableFuture<T> dst) {
1158 this.src = src; this.fn = fn; this.dst = dst;
1159 }
1160 public final void run() {
1161 final CompletableFuture<? extends T> a;
1162 final Function<? super Throwable, ? extends T> fn;
1163 final CompletableFuture<T> dst;
1164 Object r; T t = null; Throwable ex, dx = null;
1165 if ((dst = this.dst) != null &&
1166 (fn = this.fn) != null &&
1167 (a = this.src) != null &&
1168 (r = a.result) != null &&
1169 compareAndSet(0, 1)) {
1170 if ((r instanceof AltResult) &&
1171 (ex = ((AltResult)r).ex) != null) {
1172 try {
1173 t = fn.apply(ex);
1174 } catch (Throwable rex) {
1175 dx = rex;
1176 }
1177 }
1178 else {
1179 @SuppressWarnings("unchecked") T tr = (T) r;
1180 t = tr;
1181 }
1182 dst.internalComplete(t, dx);
1183 }
1184 }
1185 private static final long serialVersionUID = 5232453952276885070L;
1186 }
1187
1188 static final class ThenCopy extends Completion {
1189 final CompletableFuture<?> src;
1190 final CompletableFuture<?> dst;
1191 ThenCopy(CompletableFuture<?> src,
1192 CompletableFuture<?> dst) {
1193 this.src = src; this.dst = dst;
1194 }
1195 public final void run() {
1196 final CompletableFuture<?> a;
1197 final CompletableFuture<?> dst;
1198 Object r; Object t; Throwable ex;
1199 if ((dst = this.dst) != null &&
1200 (a = this.src) != null &&
1201 (r = a.result) != null &&
1202 compareAndSet(0, 1)) {
1203 if (r instanceof AltResult) {
1204 ex = ((AltResult)r).ex;
1205 t = null;
1206 }
1207 else {
1208 ex = null;
1209 t = r;
1210 }
1211 dst.internalComplete(t, ex);
1212 }
1213 }
1214 private static final long serialVersionUID = 5232453952276885070L;
1215 }
1216
1217 static final class HandleCompletion<T,U> extends Completion {
1218 final CompletableFuture<? extends T> src;
1219 final BiFunction<? super T, Throwable, ? extends U> fn;
1220 final CompletableFuture<U> dst;
1221 HandleCompletion(CompletableFuture<? extends T> src,
1222 BiFunction<? super T, Throwable, ? extends U> fn,
1223 CompletableFuture<U> dst) {
1224 this.src = src; this.fn = fn; this.dst = dst;
1225 }
1226 public final void run() {
1227 final CompletableFuture<? extends T> a;
1228 final BiFunction<? super T, Throwable, ? extends U> fn;
1229 final CompletableFuture<U> dst;
1230 Object r; T t; Throwable ex;
1231 if ((dst = this.dst) != null &&
1232 (fn = this.fn) != null &&
1233 (a = this.src) != null &&
1234 (r = a.result) != null &&
1235 compareAndSet(0, 1)) {
1236 if (r instanceof AltResult) {
1237 ex = ((AltResult)r).ex;
1238 t = null;
1239 }
1240 else {
1241 ex = null;
1242 @SuppressWarnings("unchecked") T tr = (T) r;
1243 t = tr;
1244 }
1245 U u = null; Throwable dx = null;
1246 try {
1247 u = fn.apply(t, ex);
1248 } catch (Throwable rex) {
1249 dx = rex;
1250 }
1251 dst.internalComplete(u, dx);
1252 }
1253 }
1254 private static final long serialVersionUID = 5232453952276885070L;
1255 }
1256
1257 static final class ComposeCompletion<T,U> extends Completion {
1258 final CompletableFuture<? extends T> src;
1259 final Function<? super T, CompletableFuture<U>> fn;
1260 final CompletableFuture<U> dst;
1261 final Executor executor;
1262 ComposeCompletion(CompletableFuture<? extends T> src,
1263 Function<? super T, CompletableFuture<U>> fn,
1264 CompletableFuture<U> dst, Executor executor) {
1265 this.src = src; this.fn = fn; this.dst = dst;
1266 this.executor = executor;
1267 }
1268 public final void run() {
1269 final CompletableFuture<? extends T> a;
1270 final Function<? super T, CompletableFuture<U>> fn;
1271 final CompletableFuture<U> dst;
1272 Object r; T t; Throwable ex; Executor e;
1273 if ((dst = this.dst) != null &&
1274 (fn = this.fn) != null &&
1275 (a = this.src) != null &&
1276 (r = a.result) != null &&
1277 compareAndSet(0, 1)) {
1278 if (r instanceof AltResult) {
1279 ex = ((AltResult)r).ex;
1280 t = null;
1281 }
1282 else {
1283 ex = null;
1284 @SuppressWarnings("unchecked") T tr = (T) r;
1285 t = tr;
1286 }
1287 CompletableFuture<U> c = null;
1288 U u = null;
1289 boolean complete = false;
1290 if (ex == null) {
1291 if ((e = executor) != null)
1292 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1293 else {
1294 try {
1295 if ((c = fn.apply(t)) == null)
1296 ex = new NullPointerException();
1297 } catch (Throwable rex) {
1298 ex = rex;
1299 }
1300 }
1301 }
1302 if (c != null) {
1303 ThenCopy d = null;
1304 Object s;
1305 if ((s = c.result) == null) {
1306 CompletionNode p = new CompletionNode
1307 (d = new ThenCopy(c, dst));
1308 while ((s = c.result) == null) {
1309 if (UNSAFE.compareAndSwapObject
1310 (c, COMPLETIONS, p.next = c.completions, p))
1311 break;
1312 }
1313 }
1314 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1315 complete = true;
1316 if (s instanceof AltResult) {
1317 ex = ((AltResult)s).ex; // no rewrap
1318 u = null;
1319 }
1320 else {
1321 @SuppressWarnings("unchecked") U us = (U) s;
1322 u = us;
1323 }
1324 }
1325 }
1326 if (complete || ex != null)
1327 dst.internalComplete(u, ex);
1328 if (c != null)
1329 c.helpPostComplete();
1330 }
1331 }
1332 private static final long serialVersionUID = 5232453952276885070L;
1333 }
1334
1335 // public methods
1336
1337 /**
1338 * Creates a new incomplete CompletableFuture.
1339 */
1340 public CompletableFuture() {
1341 }
1342
1343 /**
1344 * Returns a new CompletableFuture that is asynchronously completed
1345 * by a task running in the {@link ForkJoinPool#commonPool()} with
1346 * the value obtained by calling the given Supplier.
1347 *
1348 * @param supplier a function returning the value to be used
1349 * to complete the returned CompletableFuture
1350 * @return the new CompletableFuture
1351 */
1352 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1353 if (supplier == null) throw new NullPointerException();
1354 CompletableFuture<U> f = new CompletableFuture<U>();
1355 ForkJoinPool.commonPool().
1356 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1357 return f;
1358 }
1359
1360 /**
1361 * Returns a new CompletableFuture that is asynchronously completed
1362 * by a task running in the given executor with the value obtained
1363 * by calling the given Supplier.
1364 *
1365 * @param supplier a function returning the value to be used
1366 * to complete the returned CompletableFuture
1367 * @param executor the executor to use for asynchronous execution
1368 * @return the new CompletableFuture
1369 */
1370 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1371 Executor executor) {
1372 if (executor == null || supplier == null)
1373 throw new NullPointerException();
1374 CompletableFuture<U> f = new CompletableFuture<U>();
1375 executor.execute(new AsyncSupply<U>(supplier, f));
1376 return f;
1377 }
1378
1379 /**
1380 * Returns a new CompletableFuture that is asynchronously completed
1381 * by a task running in the {@link ForkJoinPool#commonPool()} after
1382 * it runs the given action.
1383 *
1384 * @param runnable the action to run before completing the
1385 * returned CompletableFuture
1386 * @return the new CompletableFuture
1387 */
1388 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1389 if (runnable == null) throw new NullPointerException();
1390 CompletableFuture<Void> f = new CompletableFuture<Void>();
1391 ForkJoinPool.commonPool().
1392 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1393 return f;
1394 }
1395
1396 /**
1397 * Returns a new CompletableFuture that is asynchronously completed
1398 * by a task running in the given executor after it runs the given
1399 * action.
1400 *
1401 * @param runnable the action to run before completing the
1402 * returned CompletableFuture
1403 * @param executor the executor to use for asynchronous execution
1404 * @return the new CompletableFuture
1405 */
1406 public static CompletableFuture<Void> runAsync(Runnable runnable,
1407 Executor executor) {
1408 if (executor == null || runnable == null)
1409 throw new NullPointerException();
1410 CompletableFuture<Void> f = new CompletableFuture<Void>();
1411 executor.execute(new AsyncRun(runnable, f));
1412 return f;
1413 }
1414
1415 /**
1416 * Returns {@code true} if completed in any fashion: normally,
1417 * exceptionally, or via cancellation.
1418 *
1419 * @return {@code true} if completed
1420 */
1421 public boolean isDone() {
1422 return result != null;
1423 }
1424
1425 /**
1426 * Waits if necessary for this future to complete, and then
1427 * returns its result.
1428 *
1429 * @return the result value
1430 * @throws CancellationException if this future was cancelled
1431 * @throws ExecutionException if this future completed exceptionally
1432 * @throws InterruptedException if the current thread was interrupted
1433 * while waiting
1434 */
1435 public T get() throws InterruptedException, ExecutionException {
1436 Object r; Throwable ex, cause;
1437 if ((r = result) == null && (r = waitingGet(true)) == null)
1438 throw new InterruptedException();
1439 if (!(r instanceof AltResult)) {
1440 @SuppressWarnings("unchecked") T tr = (T) r;
1441 return tr;
1442 }
1443 if ((ex = ((AltResult)r).ex) == null)
1444 return null;
1445 if (ex instanceof CancellationException)
1446 throw (CancellationException)ex;
1447 if ((ex instanceof CompletionException) &&
1448 (cause = ex.getCause()) != null)
1449 ex = cause;
1450 throw new ExecutionException(ex);
1451 }
1452
1453 /**
1454 * Waits if necessary for at most the given time for this future
1455 * to complete, and then returns its result, if available.
1456 *
1457 * @param timeout the maximum time to wait
1458 * @param unit the time unit of the timeout argument
1459 * @return the result value
1460 * @throws CancellationException if this future was cancelled
1461 * @throws ExecutionException if this future completed exceptionally
1462 * @throws InterruptedException if the current thread was interrupted
1463 * while waiting
1464 * @throws TimeoutException if the wait timed out
1465 */
1466 public T get(long timeout, TimeUnit unit)
1467 throws InterruptedException, ExecutionException, TimeoutException {
1468 Object r; Throwable ex, cause;
1469 long nanos = unit.toNanos(timeout);
1470 if (Thread.interrupted())
1471 throw new InterruptedException();
1472 if ((r = result) == null)
1473 r = timedAwaitDone(nanos);
1474 if (!(r instanceof AltResult)) {
1475 @SuppressWarnings("unchecked") T tr = (T) r;
1476 return tr;
1477 }
1478 if ((ex = ((AltResult)r).ex) == null)
1479 return null;
1480 if (ex instanceof CancellationException)
1481 throw (CancellationException)ex;
1482 if ((ex instanceof CompletionException) &&
1483 (cause = ex.getCause()) != null)
1484 ex = cause;
1485 throw new ExecutionException(ex);
1486 }
1487
1488 /**
1489 * Returns the result value when complete, or throws an
1490 * (unchecked) exception if completed exceptionally. To better
1491 * conform with the use of common functional forms, if a
1492 * computation involved in the completion of this
1493 * CompletableFuture threw an exception, this method throws an
1494 * (unchecked) {@link CompletionException} with the underlying
1495 * exception as its cause.
1496 *
1497 * @return the result value
1498 * @throws CancellationException if the computation was cancelled
1499 * @throws CompletionException if this future completed
1500 * exceptionally or a completion computation threw an exception
1501 */
1502 public T join() {
1503 Object r; Throwable ex;
1504 if ((r = result) == null)
1505 r = waitingGet(false);
1506 if (!(r instanceof AltResult)) {
1507 @SuppressWarnings("unchecked") T tr = (T) r;
1508 return tr;
1509 }
1510 if ((ex = ((AltResult)r).ex) == null)
1511 return null;
1512 if (ex instanceof CancellationException)
1513 throw (CancellationException)ex;
1514 if (ex instanceof CompletionException)
1515 throw (CompletionException)ex;
1516 throw new CompletionException(ex);
1517 }
1518
1519 /**
1520 * Returns the result value (or throws any encountered exception)
1521 * if completed, else returns the given valueIfAbsent.
1522 *
1523 * @param valueIfAbsent the value to return if not completed
1524 * @return the result value, if completed, else the given valueIfAbsent
1525 * @throws CancellationException if the computation was cancelled
1526 * @throws CompletionException if this future completed
1527 * exceptionally or a completion computation threw an exception
1528 */
1529 public T getNow(T valueIfAbsent) {
1530 Object r; Throwable ex;
1531 if ((r = result) == null)
1532 return valueIfAbsent;
1533 if (!(r instanceof AltResult)) {
1534 @SuppressWarnings("unchecked") T tr = (T) r;
1535 return tr;
1536 }
1537 if ((ex = ((AltResult)r).ex) == null)
1538 return null;
1539 if (ex instanceof CancellationException)
1540 throw (CancellationException)ex;
1541 if (ex instanceof CompletionException)
1542 throw (CompletionException)ex;
1543 throw new CompletionException(ex);
1544 }
1545
1546 /**
1547 * If not already completed, sets the value returned by {@link
1548 * #get()} and related methods to the given value.
1549 *
1550 * @param value the result value
1551 * @return {@code true} if this invocation caused this CompletableFuture
1552 * to transition to a completed state, else {@code false}
1553 */
1554 public boolean complete(T value) {
1555 boolean triggered = result == null &&
1556 UNSAFE.compareAndSwapObject(this, RESULT, null,
1557 value == null ? NIL : value);
1558 postComplete();
1559 return triggered;
1560 }
1561
1562 /**
1563 * If not already completed, causes invocations of {@link #get()}
1564 * and related methods to throw the given exception.
1565 *
1566 * @param ex the exception
1567 * @return {@code true} if this invocation caused this CompletableFuture
1568 * to transition to a completed state, else {@code false}
1569 */
1570 public boolean completeExceptionally(Throwable ex) {
1571 if (ex == null) throw new NullPointerException();
1572 boolean triggered = result == null &&
1573 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1574 postComplete();
1575 return triggered;
1576 }
1577
1578 /**
1579 * Returns a new CompletableFuture that is completed
1580 * when this CompletableFuture completes, with the result of the
1581 * given function of this CompletableFuture's result.
1582 *
1583 * <p>If this CompletableFuture completes exceptionally, or the
1584 * supplied function throws an exception, then the returned
1585 * CompletableFuture completes exceptionally with a
1586 * CompletionException holding the exception as its cause.
1587 *
1588 * @param fn the function to use to compute the value of
1589 * the returned CompletableFuture
1590 * @return the new CompletableFuture
1591 */
1592 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1593 return doThenApply(fn, null);
1594 }
1595
1596 /**
1597 * Returns a new CompletableFuture that is asynchronously completed
1598 * when this CompletableFuture completes, with the result of the
1599 * given function of this CompletableFuture's result from a
1600 * task running in the {@link ForkJoinPool#commonPool()}.
1601 *
1602 * <p>If this CompletableFuture completes exceptionally, or the
1603 * supplied function throws an exception, then the returned
1604 * CompletableFuture completes exceptionally with a
1605 * CompletionException holding the exception as its cause.
1606 *
1607 * @param fn the function to use to compute the value of
1608 * the returned CompletableFuture
1609 * @return the new CompletableFuture
1610 */
1611 public <U> CompletableFuture<U> thenApplyAsync
1612 (Function<? super T,? extends U> fn) {
1613 return doThenApply(fn, ForkJoinPool.commonPool());
1614 }
1615
1616 /**
1617 * Returns a new CompletableFuture that is asynchronously completed
1618 * when this CompletableFuture completes, with the result of the
1619 * given function of this CompletableFuture's result from a
1620 * task running in the given executor.
1621 *
1622 * <p>If this CompletableFuture completes exceptionally, or the
1623 * supplied function throws an exception, then the returned
1624 * CompletableFuture completes exceptionally with a
1625 * CompletionException holding the exception as its cause.
1626 *
1627 * @param fn the function to use to compute the value of
1628 * the returned CompletableFuture
1629 * @param executor the executor to use for asynchronous execution
1630 * @return the new CompletableFuture
1631 */
1632 public <U> CompletableFuture<U> thenApplyAsync
1633 (Function<? super T,? extends U> fn,
1634 Executor executor) {
1635 if (executor == null) throw new NullPointerException();
1636 return doThenApply(fn, executor);
1637 }
1638
1639 private <U> CompletableFuture<U> doThenApply
1640 (Function<? super T,? extends U> fn,
1641 Executor e) {
1642 if (fn == null) throw new NullPointerException();
1643 CompletableFuture<U> dst = new CompletableFuture<U>();
1644 ApplyCompletion<T,U> d = null;
1645 Object r;
1646 if ((r = result) == null) {
1647 CompletionNode p = new CompletionNode
1648 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1649 while ((r = result) == null) {
1650 if (UNSAFE.compareAndSwapObject
1651 (this, COMPLETIONS, p.next = completions, p))
1652 break;
1653 }
1654 }
1655 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1656 T t; Throwable ex;
1657 if (r instanceof AltResult) {
1658 ex = ((AltResult)r).ex;
1659 t = null;
1660 }
1661 else {
1662 ex = null;
1663 @SuppressWarnings("unchecked") T tr = (T) r;
1664 t = tr;
1665 }
1666 U u = null;
1667 if (ex == null) {
1668 try {
1669 if (e != null)
1670 e.execute(new AsyncApply<T,U>(t, fn, dst));
1671 else
1672 u = fn.apply(t);
1673 } catch (Throwable rex) {
1674 ex = rex;
1675 }
1676 }
1677 if (e == null || ex != null)
1678 dst.internalComplete(u, ex);
1679 }
1680 helpPostComplete();
1681 return dst;
1682 }
1683
1684 /**
1685 * Returns a new CompletableFuture that is completed
1686 * when this CompletableFuture completes, after performing the given
1687 * action with this CompletableFuture's result.
1688 *
1689 * <p>If this CompletableFuture completes exceptionally, or the
1690 * supplied action throws an exception, then the returned
1691 * CompletableFuture completes exceptionally with a
1692 * CompletionException holding the exception as its cause.
1693 *
1694 * @param block the action to perform before completing the
1695 * returned CompletableFuture
1696 * @return the new CompletableFuture
1697 */
1698 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1699 return doThenAccept(block, null);
1700 }
1701
1702 /**
1703 * Returns a new CompletableFuture that is asynchronously completed
1704 * when this CompletableFuture completes, after performing the given
1705 * action with this CompletableFuture's result from a task running
1706 * in the {@link ForkJoinPool#commonPool()}.
1707 *
1708 * <p>If this CompletableFuture completes exceptionally, or the
1709 * supplied action throws an exception, then the returned
1710 * CompletableFuture completes exceptionally with a
1711 * CompletionException holding the exception as its cause.
1712 *
1713 * @param block the action to perform before completing the
1714 * returned CompletableFuture
1715 * @return the new CompletableFuture
1716 */
1717 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1718 return doThenAccept(block, ForkJoinPool.commonPool());
1719 }
1720
1721 /**
1722 * Returns a new CompletableFuture that is asynchronously completed
1723 * when this CompletableFuture completes, after performing the given
1724 * action with this CompletableFuture's result from a task running
1725 * in the given executor.
1726 *
1727 * <p>If this CompletableFuture completes exceptionally, or the
1728 * supplied action throws an exception, then the returned
1729 * CompletableFuture completes exceptionally with a
1730 * CompletionException holding the exception as its cause.
1731 *
1732 * @param block the action to perform before completing the
1733 * returned CompletableFuture
1734 * @param executor the executor to use for asynchronous execution
1735 * @return the new CompletableFuture
1736 */
1737 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1738 Executor executor) {
1739 if (executor == null) throw new NullPointerException();
1740 return doThenAccept(block, executor);
1741 }
1742
1743 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1744 Executor e) {
1745 if (fn == null) throw new NullPointerException();
1746 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1747 AcceptCompletion<T> d = null;
1748 Object r;
1749 if ((r = result) == null) {
1750 CompletionNode p = new CompletionNode
1751 (d = new AcceptCompletion<T>(this, fn, dst, e));
1752 while ((r = result) == null) {
1753 if (UNSAFE.compareAndSwapObject
1754 (this, COMPLETIONS, p.next = completions, p))
1755 break;
1756 }
1757 }
1758 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1759 T t; Throwable ex;
1760 if (r instanceof AltResult) {
1761 ex = ((AltResult)r).ex;
1762 t = null;
1763 }
1764 else {
1765 ex = null;
1766 @SuppressWarnings("unchecked") T tr = (T) r;
1767 t = tr;
1768 }
1769 if (ex == null) {
1770 try {
1771 if (e != null)
1772 e.execute(new AsyncAccept<T>(t, fn, dst));
1773 else
1774 fn.accept(t);
1775 } catch (Throwable rex) {
1776 ex = rex;
1777 }
1778 }
1779 if (e == null || ex != null)
1780 dst.internalComplete(null, ex);
1781 }
1782 helpPostComplete();
1783 return dst;
1784 }
1785
1786 /**
1787 * Returns a new CompletableFuture that is completed
1788 * when this CompletableFuture completes, after performing the given
1789 * action.
1790 *
1791 * <p>If this CompletableFuture completes exceptionally, or the
1792 * supplied action throws an exception, then the returned
1793 * CompletableFuture completes exceptionally with a
1794 * CompletionException holding the exception as its cause.
1795 *
1796 * @param action the action to perform before completing the
1797 * returned CompletableFuture
1798 * @return the new CompletableFuture
1799 */
1800 public CompletableFuture<Void> thenRun(Runnable action) {
1801 return doThenRun(action, null);
1802 }
1803
1804 /**
1805 * Returns a new CompletableFuture that is asynchronously completed
1806 * when this CompletableFuture completes, after performing the given
1807 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1808 *
1809 * <p>If this CompletableFuture completes exceptionally, or the
1810 * supplied action throws an exception, then the returned
1811 * CompletableFuture completes exceptionally with a
1812 * CompletionException holding the exception as its cause.
1813 *
1814 * @param action the action to perform before completing the
1815 * returned CompletableFuture
1816 * @return the new CompletableFuture
1817 */
1818 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1819 return doThenRun(action, ForkJoinPool.commonPool());
1820 }
1821
1822 /**
1823 * Returns a new CompletableFuture that is asynchronously completed
1824 * when this CompletableFuture completes, after performing the given
1825 * action from a task running in the given executor.
1826 *
1827 * <p>If this CompletableFuture completes exceptionally, or the
1828 * supplied action throws an exception, then the returned
1829 * CompletableFuture completes exceptionally with a
1830 * CompletionException holding the exception as its cause.
1831 *
1832 * @param action the action to perform before completing the
1833 * returned CompletableFuture
1834 * @param executor the executor to use for asynchronous execution
1835 * @return the new CompletableFuture
1836 */
1837 public CompletableFuture<Void> thenRunAsync(Runnable action,
1838 Executor executor) {
1839 if (executor == null) throw new NullPointerException();
1840 return doThenRun(action, executor);
1841 }
1842
1843 private CompletableFuture<Void> doThenRun(Runnable action,
1844 Executor e) {
1845 if (action == null) throw new NullPointerException();
1846 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1847 RunCompletion<T> d = null;
1848 Object r;
1849 if ((r = result) == null) {
1850 CompletionNode p = new CompletionNode
1851 (d = new RunCompletion<T>(this, action, dst, e));
1852 while ((r = result) == null) {
1853 if (UNSAFE.compareAndSwapObject
1854 (this, COMPLETIONS, p.next = completions, p))
1855 break;
1856 }
1857 }
1858 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1859 Throwable ex;
1860 if (r instanceof AltResult)
1861 ex = ((AltResult)r).ex;
1862 else
1863 ex = null;
1864 if (ex == null) {
1865 try {
1866 if (e != null)
1867 e.execute(new AsyncRun(action, dst));
1868 else
1869 action.run();
1870 } catch (Throwable rex) {
1871 ex = rex;
1872 }
1873 }
1874 if (e == null || ex != null)
1875 dst.internalComplete(null, ex);
1876 }
1877 helpPostComplete();
1878 return dst;
1879 }
1880
1881 /**
1882 * Returns a new CompletableFuture that is completed
1883 * when both this and the other given CompletableFuture complete,
1884 * with the result of the given function of the results of the two
1885 * CompletableFutures.
1886 *
1887 * <p>If this and/or the other CompletableFuture complete
1888 * exceptionally, or the supplied function throws an exception,
1889 * then the returned CompletableFuture completes exceptionally
1890 * with a CompletionException holding the exception as its cause.
1891 *
1892 * @param other the other CompletableFuture
1893 * @param fn the function to use to compute the value of
1894 * the returned CompletableFuture
1895 * @return the new CompletableFuture
1896 */
1897 public <U,V> CompletableFuture<V> thenCombine
1898 (CompletableFuture<? extends U> other,
1899 BiFunction<? super T,? super U,? extends V> fn) {
1900 return doThenBiApply(other, fn, null);
1901 }
1902
1903 /**
1904 * Returns a new CompletableFuture that is asynchronously completed
1905 * when both this and the other given CompletableFuture complete,
1906 * with the result of the given function of the results of the two
1907 * CompletableFutures from a task running in the
1908 * {@link ForkJoinPool#commonPool()}.
1909 *
1910 * <p>If this and/or the other CompletableFuture complete
1911 * exceptionally, or the supplied function throws an exception,
1912 * then the returned CompletableFuture completes exceptionally
1913 * with a CompletionException holding the exception as its cause.
1914 *
1915 * @param other the other CompletableFuture
1916 * @param fn the function to use to compute the value of
1917 * the returned CompletableFuture
1918 * @return the new CompletableFuture
1919 */
1920 public <U,V> CompletableFuture<V> thenCombineAsync
1921 (CompletableFuture<? extends U> other,
1922 BiFunction<? super T,? super U,? extends V> fn) {
1923 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1924 }
1925
1926 /**
1927 * Returns a new CompletableFuture that is asynchronously completed
1928 * when both this and the other given CompletableFuture complete,
1929 * with the result of the given function of the results of the two
1930 * CompletableFutures from a task running in the given executor.
1931 *
1932 * <p>If this and/or the other CompletableFuture complete
1933 * exceptionally, or the supplied function throws an exception,
1934 * then the returned CompletableFuture completes exceptionally
1935 * with a CompletionException holding the exception as its cause.
1936 *
1937 * @param other the other CompletableFuture
1938 * @param fn the function to use to compute the value of
1939 * the returned CompletableFuture
1940 * @param executor the executor to use for asynchronous execution
1941 * @return the new CompletableFuture
1942 */
1943 public <U,V> CompletableFuture<V> thenCombineAsync
1944 (CompletableFuture<? extends U> other,
1945 BiFunction<? super T,? super U,? extends V> fn,
1946 Executor executor) {
1947 if (executor == null) throw new NullPointerException();
1948 return doThenBiApply(other, fn, executor);
1949 }
1950
1951 private <U,V> CompletableFuture<V> doThenBiApply
1952 (CompletableFuture<? extends U> other,
1953 BiFunction<? super T,? super U,? extends V> fn,
1954 Executor e) {
1955 if (other == null || fn == null) throw new NullPointerException();
1956 CompletableFuture<V> dst = new CompletableFuture<V>();
1957 BiApplyCompletion<T,U,V> d = null;
1958 Object r, s = null;
1959 if ((r = result) == null || (s = other.result) == null) {
1960 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1961 CompletionNode q = null, p = new CompletionNode(d);
1962 while ((r == null && (r = result) == null) ||
1963 (s == null && (s = other.result) == null)) {
1964 if (q != null) {
1965 if (s != null ||
1966 UNSAFE.compareAndSwapObject
1967 (other, COMPLETIONS, q.next = other.completions, q))
1968 break;
1969 }
1970 else if (r != null ||
1971 UNSAFE.compareAndSwapObject
1972 (this, COMPLETIONS, p.next = completions, p)) {
1973 if (s != null)
1974 break;
1975 q = new CompletionNode(d);
1976 }
1977 }
1978 }
1979 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1980 T t; U u; Throwable ex;
1981 if (r instanceof AltResult) {
1982 ex = ((AltResult)r).ex;
1983 t = null;
1984 }
1985 else {
1986 ex = null;
1987 @SuppressWarnings("unchecked") T tr = (T) r;
1988 t = tr;
1989 }
1990 if (ex != null)
1991 u = null;
1992 else if (s instanceof AltResult) {
1993 ex = ((AltResult)s).ex;
1994 u = null;
1995 }
1996 else {
1997 @SuppressWarnings("unchecked") U us = (U) s;
1998 u = us;
1999 }
2000 V v = null;
2001 if (ex == null) {
2002 try {
2003 if (e != null)
2004 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
2005 else
2006 v = fn.apply(t, u);
2007 } catch (Throwable rex) {
2008 ex = rex;
2009 }
2010 }
2011 if (e == null || ex != null)
2012 dst.internalComplete(v, ex);
2013 }
2014 helpPostComplete();
2015 other.helpPostComplete();
2016 return dst;
2017 }
2018
2019 /**
2020 * Returns a new CompletableFuture that is completed
2021 * when both this and the other given CompletableFuture complete,
2022 * after performing the given action with the results of the two
2023 * CompletableFutures.
2024 *
2025 * <p>If this and/or the other CompletableFuture complete
2026 * exceptionally, or the supplied action throws an exception,
2027 * then the returned CompletableFuture completes exceptionally
2028 * with a CompletionException holding the exception as its cause.
2029 *
2030 * @param other the other CompletableFuture
2031 * @param block the action to perform before completing the
2032 * returned CompletableFuture
2033 * @return the new CompletableFuture
2034 */
2035 public <U> CompletableFuture<Void> thenAcceptBoth
2036 (CompletableFuture<? extends U> other,
2037 BiConsumer<? super T, ? super U> block) {
2038 return doThenBiAccept(other, block, null);
2039 }
2040
2041 /**
2042 * Returns a new CompletableFuture that is asynchronously completed
2043 * when both this and the other given CompletableFuture complete,
2044 * after performing the given action with the results of the two
2045 * CompletableFutures from a task running in the {@link
2046 * ForkJoinPool#commonPool()}.
2047 *
2048 * <p>If this and/or the other CompletableFuture complete
2049 * exceptionally, or the supplied action throws an exception,
2050 * then the returned CompletableFuture completes exceptionally
2051 * with a CompletionException holding the exception as its cause.
2052 *
2053 * @param other the other CompletableFuture
2054 * @param block the action to perform before completing the
2055 * returned CompletableFuture
2056 * @return the new CompletableFuture
2057 */
2058 public <U> CompletableFuture<Void> thenAcceptBothAsync
2059 (CompletableFuture<? extends U> other,
2060 BiConsumer<? super T, ? super U> block) {
2061 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2062 }
2063
2064 /**
2065 * Returns a new CompletableFuture that is asynchronously completed
2066 * when both this and the other given CompletableFuture complete,
2067 * after performing the given action with the results of the two
2068 * CompletableFutures from a task running in the given executor.
2069 *
2070 * <p>If this and/or the other CompletableFuture complete
2071 * exceptionally, or the supplied action throws an exception,
2072 * then the returned CompletableFuture completes exceptionally
2073 * with a CompletionException holding the exception as its cause.
2074 *
2075 * @param other the other CompletableFuture
2076 * @param block the action to perform before completing the
2077 * returned CompletableFuture
2078 * @param executor the executor to use for asynchronous execution
2079 * @return the new CompletableFuture
2080 */
2081 public <U> CompletableFuture<Void> thenAcceptBothAsync
2082 (CompletableFuture<? extends U> other,
2083 BiConsumer<? super T, ? super U> block,
2084 Executor executor) {
2085 if (executor == null) throw new NullPointerException();
2086 return doThenBiAccept(other, block, executor);
2087 }
2088
2089 private <U> CompletableFuture<Void> doThenBiAccept
2090 (CompletableFuture<? extends U> other,
2091 BiConsumer<? super T,? super U> fn,
2092 Executor e) {
2093 if (other == null || fn == null) throw new NullPointerException();
2094 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2095 BiAcceptCompletion<T,U> d = null;
2096 Object r, s = null;
2097 if ((r = result) == null || (s = other.result) == null) {
2098 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2099 CompletionNode q = null, p = new CompletionNode(d);
2100 while ((r == null && (r = result) == null) ||
2101 (s == null && (s = other.result) == null)) {
2102 if (q != null) {
2103 if (s != null ||
2104 UNSAFE.compareAndSwapObject
2105 (other, COMPLETIONS, q.next = other.completions, q))
2106 break;
2107 }
2108 else if (r != null ||
2109 UNSAFE.compareAndSwapObject
2110 (this, COMPLETIONS, p.next = completions, p)) {
2111 if (s != null)
2112 break;
2113 q = new CompletionNode(d);
2114 }
2115 }
2116 }
2117 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2118 T t; U u; Throwable ex;
2119 if (r instanceof AltResult) {
2120 ex = ((AltResult)r).ex;
2121 t = null;
2122 }
2123 else {
2124 ex = null;
2125 @SuppressWarnings("unchecked") T tr = (T) r;
2126 t = tr;
2127 }
2128 if (ex != null)
2129 u = null;
2130 else if (s instanceof AltResult) {
2131 ex = ((AltResult)s).ex;
2132 u = null;
2133 }
2134 else {
2135 @SuppressWarnings("unchecked") U us = (U) s;
2136 u = us;
2137 }
2138 if (ex == null) {
2139 try {
2140 if (e != null)
2141 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2142 else
2143 fn.accept(t, u);
2144 } catch (Throwable rex) {
2145 ex = rex;
2146 }
2147 }
2148 if (e == null || ex != null)
2149 dst.internalComplete(null, ex);
2150 }
2151 helpPostComplete();
2152 other.helpPostComplete();
2153 return dst;
2154 }
2155
2156 /**
2157 * Returns a new CompletableFuture that is completed
2158 * when both this and the other given CompletableFuture complete,
2159 * after performing the given action.
2160 *
2161 * <p>If this and/or the other CompletableFuture complete
2162 * exceptionally, or the supplied action throws an exception,
2163 * then the returned CompletableFuture completes exceptionally
2164 * with a CompletionException holding the exception as its cause.
2165 *
2166 * @param other the other CompletableFuture
2167 * @param action the action to perform before completing the
2168 * returned CompletableFuture
2169 * @return the new CompletableFuture
2170 */
2171 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2172 Runnable action) {
2173 return doThenBiRun(other, action, null);
2174 }
2175
2176 /**
2177 * Returns a new CompletableFuture that is asynchronously completed
2178 * when both this and the other given CompletableFuture complete,
2179 * after performing the given action from a task running in the
2180 * {@link ForkJoinPool#commonPool()}.
2181 *
2182 * <p>If this and/or the other CompletableFuture complete
2183 * exceptionally, or the supplied action throws an exception,
2184 * then the returned CompletableFuture completes exceptionally
2185 * with a CompletionException holding the exception as its cause.
2186 *
2187 * @param other the other CompletableFuture
2188 * @param action the action to perform before completing the
2189 * returned CompletableFuture
2190 * @return the new CompletableFuture
2191 */
2192 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2193 Runnable action) {
2194 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2195 }
2196
2197 /**
2198 * Returns a new CompletableFuture that is asynchronously completed
2199 * when both this and the other given CompletableFuture complete,
2200 * after performing the given action from a task running in the
2201 * given executor.
2202 *
2203 * <p>If this and/or the other CompletableFuture complete
2204 * exceptionally, or the supplied action throws an exception,
2205 * then the returned CompletableFuture completes exceptionally
2206 * with a CompletionException holding the exception as its cause.
2207 *
2208 * @param other the other CompletableFuture
2209 * @param action the action to perform before completing the
2210 * returned CompletableFuture
2211 * @param executor the executor to use for asynchronous execution
2212 * @return the new CompletableFuture
2213 */
2214 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2215 Runnable action,
2216 Executor executor) {
2217 if (executor == null) throw new NullPointerException();
2218 return doThenBiRun(other, action, executor);
2219 }
2220
2221 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2222 Runnable action,
2223 Executor e) {
2224 if (other == null || action == null) throw new NullPointerException();
2225 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2226 BiRunCompletion<T> d = null;
2227 Object r, s = null;
2228 if ((r = result) == null || (s = other.result) == null) {
2229 d = new BiRunCompletion<T>(this, other, action, dst, e);
2230 CompletionNode q = null, p = new CompletionNode(d);
2231 while ((r == null && (r = result) == null) ||
2232 (s == null && (s = other.result) == null)) {
2233 if (q != null) {
2234 if (s != null ||
2235 UNSAFE.compareAndSwapObject
2236 (other, COMPLETIONS, q.next = other.completions, q))
2237 break;
2238 }
2239 else if (r != null ||
2240 UNSAFE.compareAndSwapObject
2241 (this, COMPLETIONS, p.next = completions, p)) {
2242 if (s != null)
2243 break;
2244 q = new CompletionNode(d);
2245 }
2246 }
2247 }
2248 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2249 Throwable ex;
2250 if (r instanceof AltResult)
2251 ex = ((AltResult)r).ex;
2252 else
2253 ex = null;
2254 if (ex == null && (s instanceof AltResult))
2255 ex = ((AltResult)s).ex;
2256 if (ex == null) {
2257 try {
2258 if (e != null)
2259 e.execute(new AsyncRun(action, dst));
2260 else
2261 action.run();
2262 } catch (Throwable rex) {
2263 ex = rex;
2264 }
2265 }
2266 if (e == null || ex != null)
2267 dst.internalComplete(null, ex);
2268 }
2269 helpPostComplete();
2270 other.helpPostComplete();
2271 return dst;
2272 }
2273
2274 /**
2275 * Returns a new CompletableFuture that is completed
2276 * when either this or the other given CompletableFuture completes,
2277 * with the result of the given function of either this or the other
2278 * CompletableFuture's result.
2279 *
2280 * <p>If this and/or the other CompletableFuture complete
2281 * exceptionally, or the supplied function throws an exception,
2282 * then the returned CompletableFuture may also do so, with a
2283 * CompletionException holding one of these exceptions as its
2284 * cause. No guarantees are made about which result or exception
2285 * is used in the returned CompletableFuture.
2286 *
2287 * @param other the other CompletableFuture
2288 * @param fn the function to use to compute the value of
2289 * the returned CompletableFuture
2290 * @return the new CompletableFuture
2291 */
2292 public <U> CompletableFuture<U> applyToEither
2293 (CompletableFuture<? extends T> other,
2294 Function<? super T, U> fn) {
2295 return doOrApply(other, fn, null);
2296 }
2297
2298 /**
2299 * Returns a new CompletableFuture that is asynchronously completed
2300 * when either this or the other given CompletableFuture completes,
2301 * with the result of the given function of either this or the other
2302 * CompletableFuture's result from a task running in the
2303 * {@link ForkJoinPool#commonPool()}.
2304 *
2305 * <p>If this and/or the other CompletableFuture complete
2306 * exceptionally, or the supplied function throws an exception,
2307 * then the returned CompletableFuture may also do so, with a
2308 * CompletionException holding one of these exceptions as its
2309 * cause. No guarantees are made about which result or exception
2310 * is used in the returned CompletableFuture.
2311 *
2312 * @param other the other CompletableFuture
2313 * @param fn the function to use to compute the value of
2314 * the returned CompletableFuture
2315 * @return the new CompletableFuture
2316 */
2317 public <U> CompletableFuture<U> applyToEitherAsync
2318 (CompletableFuture<? extends T> other,
2319 Function<? super T, U> fn) {
2320 return doOrApply(other, fn, ForkJoinPool.commonPool());
2321 }
2322
2323 /**
2324 * Returns a new CompletableFuture that is asynchronously completed
2325 * when either this or the other given CompletableFuture completes,
2326 * with the result of the given function of either this or the other
2327 * CompletableFuture's result from a task running in the
2328 * given executor.
2329 *
2330 * <p>If this and/or the other CompletableFuture complete
2331 * exceptionally, or the supplied function throws an exception,
2332 * then the returned CompletableFuture may also do so, with a
2333 * CompletionException holding one of these exceptions as its
2334 * cause. No guarantees are made about which result or exception
2335 * is used in the returned CompletableFuture.
2336 *
2337 * @param other the other CompletableFuture
2338 * @param fn the function to use to compute the value of
2339 * the returned CompletableFuture
2340 * @param executor the executor to use for asynchronous execution
2341 * @return the new CompletableFuture
2342 */
2343 public <U> CompletableFuture<U> applyToEitherAsync
2344 (CompletableFuture<? extends T> other,
2345 Function<? super T, U> fn,
2346 Executor executor) {
2347 if (executor == null) throw new NullPointerException();
2348 return doOrApply(other, fn, executor);
2349 }
2350
2351 private <U> CompletableFuture<U> doOrApply
2352 (CompletableFuture<? extends T> other,
2353 Function<? super T, U> fn,
2354 Executor e) {
2355 if (other == null || fn == null) throw new NullPointerException();
2356 CompletableFuture<U> dst = new CompletableFuture<U>();
2357 OrApplyCompletion<T,U> d = null;
2358 Object r;
2359 if ((r = result) == null && (r = other.result) == null) {
2360 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2361 CompletionNode q = null, p = new CompletionNode(d);
2362 while ((r = result) == null && (r = other.result) == null) {
2363 if (q != null) {
2364 if (UNSAFE.compareAndSwapObject
2365 (other, COMPLETIONS, q.next = other.completions, q))
2366 break;
2367 }
2368 else if (UNSAFE.compareAndSwapObject
2369 (this, COMPLETIONS, p.next = completions, p))
2370 q = new CompletionNode(d);
2371 }
2372 }
2373 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2374 T t; Throwable ex;
2375 if (r instanceof AltResult) {
2376 ex = ((AltResult)r).ex;
2377 t = null;
2378 }
2379 else {
2380 ex = null;
2381 @SuppressWarnings("unchecked") T tr = (T) r;
2382 t = tr;
2383 }
2384 U u = null;
2385 if (ex == null) {
2386 try {
2387 if (e != null)
2388 e.execute(new AsyncApply<T,U>(t, fn, dst));
2389 else
2390 u = fn.apply(t);
2391 } catch (Throwable rex) {
2392 ex = rex;
2393 }
2394 }
2395 if (e == null || ex != null)
2396 dst.internalComplete(u, ex);
2397 }
2398 helpPostComplete();
2399 other.helpPostComplete();
2400 return dst;
2401 }
2402
2403 /**
2404 * Returns a new CompletableFuture that is completed
2405 * when either this or the other given CompletableFuture completes,
2406 * after performing the given action with the result of either this
2407 * or the other CompletableFuture's result.
2408 *
2409 * <p>If this and/or the other CompletableFuture complete
2410 * exceptionally, or the supplied action throws an exception,
2411 * then the returned CompletableFuture may also do so, with a
2412 * CompletionException holding one of these exceptions as its
2413 * cause. No guarantees are made about which result or exception
2414 * is used in the returned CompletableFuture.
2415 *
2416 * @param other the other CompletableFuture
2417 * @param block the action to perform before completing the
2418 * returned CompletableFuture
2419 * @return the new CompletableFuture
2420 */
2421 public CompletableFuture<Void> acceptEither
2422 (CompletableFuture<? extends T> other,
2423 Consumer<? super T> block) {
2424 return doOrAccept(other, block, null);
2425 }
2426
2427 /**
2428 * Returns a new CompletableFuture that is asynchronously completed
2429 * when either this or the other given CompletableFuture completes,
2430 * after performing the given action with the result of either this
2431 * or the other CompletableFuture's result from a task running in
2432 * the {@link ForkJoinPool#commonPool()}.
2433 *
2434 * <p>If this and/or the other CompletableFuture complete
2435 * exceptionally, or the supplied action throws an exception,
2436 * then the returned CompletableFuture may also do so, with a
2437 * CompletionException holding one of these exceptions as its
2438 * cause. No guarantees are made about which result or exception
2439 * is used in the returned CompletableFuture.
2440 *
2441 * @param other the other CompletableFuture
2442 * @param block the action to perform before completing the
2443 * returned CompletableFuture
2444 * @return the new CompletableFuture
2445 */
2446 public CompletableFuture<Void> acceptEitherAsync
2447 (CompletableFuture<? extends T> other,
2448 Consumer<? super T> block) {
2449 return doOrAccept(other, block, ForkJoinPool.commonPool());
2450 }
2451
2452 /**
2453 * Returns a new CompletableFuture that is asynchronously completed
2454 * when either this or the other given CompletableFuture completes,
2455 * after performing the given action with the result of either this
2456 * or the other CompletableFuture's result from a task running in
2457 * the given executor.
2458 *
2459 * <p>If this and/or the other CompletableFuture complete
2460 * exceptionally, or the supplied action throws an exception,
2461 * then the returned CompletableFuture may also do so, with a
2462 * CompletionException holding one of these exceptions as its
2463 * cause. No guarantees are made about which result or exception
2464 * is used in the returned CompletableFuture.
2465 *
2466 * @param other the other CompletableFuture
2467 * @param block the action to perform before completing the
2468 * returned CompletableFuture
2469 * @param executor the executor to use for asynchronous execution
2470 * @return the new CompletableFuture
2471 */
2472 public CompletableFuture<Void> acceptEitherAsync
2473 (CompletableFuture<? extends T> other,
2474 Consumer<? super T> block,
2475 Executor executor) {
2476 if (executor == null) throw new NullPointerException();
2477 return doOrAccept(other, block, executor);
2478 }
2479
2480 private CompletableFuture<Void> doOrAccept
2481 (CompletableFuture<? extends T> other,
2482 Consumer<? super T> fn,
2483 Executor e) {
2484 if (other == null || fn == null) throw new NullPointerException();
2485 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2486 OrAcceptCompletion<T> d = null;
2487 Object r;
2488 if ((r = result) == null && (r = other.result) == null) {
2489 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2490 CompletionNode q = null, p = new CompletionNode(d);
2491 while ((r = result) == null && (r = other.result) == null) {
2492 if (q != null) {
2493 if (UNSAFE.compareAndSwapObject
2494 (other, COMPLETIONS, q.next = other.completions, q))
2495 break;
2496 }
2497 else if (UNSAFE.compareAndSwapObject
2498 (this, COMPLETIONS, p.next = completions, p))
2499 q = new CompletionNode(d);
2500 }
2501 }
2502 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2503 T t; Throwable ex;
2504 if (r instanceof AltResult) {
2505 ex = ((AltResult)r).ex;
2506 t = null;
2507 }
2508 else {
2509 ex = null;
2510 @SuppressWarnings("unchecked") T tr = (T) r;
2511 t = tr;
2512 }
2513 if (ex == null) {
2514 try {
2515 if (e != null)
2516 e.execute(new AsyncAccept<T>(t, fn, dst));
2517 else
2518 fn.accept(t);
2519 } catch (Throwable rex) {
2520 ex = rex;
2521 }
2522 }
2523 if (e == null || ex != null)
2524 dst.internalComplete(null, ex);
2525 }
2526 helpPostComplete();
2527 other.helpPostComplete();
2528 return dst;
2529 }
2530
2531 /**
2532 * Returns a new CompletableFuture that is completed
2533 * when either this or the other given CompletableFuture completes,
2534 * after performing the given action.
2535 *
2536 * <p>If this and/or the other CompletableFuture complete
2537 * exceptionally, or the supplied action throws an exception,
2538 * then the returned CompletableFuture may also do so, with a
2539 * CompletionException holding one of these exceptions as its
2540 * cause. No guarantees are made about which result or exception
2541 * is used in the returned CompletableFuture.
2542 *
2543 * @param other the other CompletableFuture
2544 * @param action the action to perform before completing the
2545 * returned CompletableFuture
2546 * @return the new CompletableFuture
2547 */
2548 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2549 Runnable action) {
2550 return doOrRun(other, action, null);
2551 }
2552
2553 /**
2554 * Returns a new CompletableFuture that is asynchronously completed
2555 * when either this or the other given CompletableFuture completes,
2556 * after performing the given action from a task running in the
2557 * {@link ForkJoinPool#commonPool()}.
2558 *
2559 * <p>If this and/or the other CompletableFuture complete
2560 * exceptionally, or the supplied action throws an exception,
2561 * then the returned CompletableFuture may also do so, with a
2562 * CompletionException holding one of these exceptions as its
2563 * cause. No guarantees are made about which result or exception
2564 * is used in the returned CompletableFuture.
2565 *
2566 * @param other the other CompletableFuture
2567 * @param action the action to perform before completing the
2568 * returned CompletableFuture
2569 * @return the new CompletableFuture
2570 */
2571 public CompletableFuture<Void> runAfterEitherAsync
2572 (CompletableFuture<?> other,
2573 Runnable action) {
2574 return doOrRun(other, action, ForkJoinPool.commonPool());
2575 }
2576
2577 /**
2578 * Returns a new CompletableFuture that is asynchronously completed
2579 * when either this or the other given CompletableFuture completes,
2580 * after performing the given action from a task running in the
2581 * given executor.
2582 *
2583 * <p>If this and/or the other CompletableFuture complete
2584 * exceptionally, or the supplied action throws an exception,
2585 * then the returned CompletableFuture may also do so, with a
2586 * CompletionException holding one of these exceptions as its
2587 * cause. No guarantees are made about which result or exception
2588 * is used in the returned CompletableFuture.
2589 *
2590 * @param other the other CompletableFuture
2591 * @param action the action to perform before completing the
2592 * returned CompletableFuture
2593 * @param executor the executor to use for asynchronous execution
2594 * @return the new CompletableFuture
2595 */
2596 public CompletableFuture<Void> runAfterEitherAsync
2597 (CompletableFuture<?> other,
2598 Runnable action,
2599 Executor executor) {
2600 if (executor == null) throw new NullPointerException();
2601 return doOrRun(other, action, executor);
2602 }
2603
2604 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2605 Runnable action,
2606 Executor e) {
2607 if (other == null || action == null) throw new NullPointerException();
2608 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2609 OrRunCompletion<T> d = null;
2610 Object r;
2611 if ((r = result) == null && (r = other.result) == null) {
2612 d = new OrRunCompletion<T>(this, other, action, dst, e);
2613 CompletionNode q = null, p = new CompletionNode(d);
2614 while ((r = result) == null && (r = other.result) == null) {
2615 if (q != null) {
2616 if (UNSAFE.compareAndSwapObject
2617 (other, COMPLETIONS, q.next = other.completions, q))
2618 break;
2619 }
2620 else if (UNSAFE.compareAndSwapObject
2621 (this, COMPLETIONS, p.next = completions, p))
2622 q = new CompletionNode(d);
2623 }
2624 }
2625 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2626 Throwable ex;
2627 if (r instanceof AltResult)
2628 ex = ((AltResult)r).ex;
2629 else
2630 ex = null;
2631 if (ex == null) {
2632 try {
2633 if (e != null)
2634 e.execute(new AsyncRun(action, dst));
2635 else
2636 action.run();
2637 } catch (Throwable rex) {
2638 ex = rex;
2639 }
2640 }
2641 if (e == null || ex != null)
2642 dst.internalComplete(null, ex);
2643 }
2644 helpPostComplete();
2645 other.helpPostComplete();
2646 return dst;
2647 }
2648
2649 /**
2650 * Returns a CompletableFuture that upon completion, has the same
2651 * value as produced by the given function of the result of this
2652 * CompletableFuture.
2653 *
2654 * <p>If this CompletableFuture completes exceptionally, then the
2655 * returned CompletableFuture also does so, with a
2656 * CompletionException holding this exception as its cause.
2657 * Similarly, if the computed CompletableFuture completes
2658 * exceptionally, then so does the returned CompletableFuture.
2659 *
2660 * @param fn the function returning a new CompletableFuture
2661 * @return the CompletableFuture
2662 */
2663 public <U> CompletableFuture<U> thenCompose
2664 (Function<? super T, CompletableFuture<U>> fn) {
2665 return doCompose(fn, null);
2666 }
2667
2668 /**
2669 * Returns a CompletableFuture that upon completion, has the same
2670 * value as that produced asynchronously using the {@link
2671 * ForkJoinPool#commonPool()} by the given function of the result
2672 * of this CompletableFuture.
2673 *
2674 * <p>If this CompletableFuture completes exceptionally, then the
2675 * returned CompletableFuture also does so, with a
2676 * CompletionException holding this exception as its cause.
2677 * Similarly, if the computed CompletableFuture completes
2678 * exceptionally, then so does the returned CompletableFuture.
2679 *
2680 * @param fn the function returning a new CompletableFuture
2681 * @return the CompletableFuture
2682 */
2683 public <U> CompletableFuture<U> thenComposeAsync
2684 (Function<? super T, CompletableFuture<U>> fn) {
2685 return doCompose(fn, ForkJoinPool.commonPool());
2686 }
2687
2688 /**
2689 * Returns a CompletableFuture that upon completion, has the same
2690 * value as that produced asynchronously using the given executor
2691 * by the given function of this CompletableFuture.
2692 *
2693 * <p>If this CompletableFuture completes exceptionally, then the
2694 * returned CompletableFuture also does so, with a
2695 * CompletionException holding this exception as its cause.
2696 * Similarly, if the computed CompletableFuture completes
2697 * exceptionally, then so does the returned CompletableFuture.
2698 *
2699 * @param fn the function returning a new CompletableFuture
2700 * @param executor the executor to use for asynchronous execution
2701 * @return the CompletableFuture
2702 */
2703 public <U> CompletableFuture<U> thenComposeAsync
2704 (Function<? super T, CompletableFuture<U>> fn,
2705 Executor executor) {
2706 if (executor == null) throw new NullPointerException();
2707 return doCompose(fn, executor);
2708 }
2709
2710 private <U> CompletableFuture<U> doCompose
2711 (Function<? super T, CompletableFuture<U>> fn,
2712 Executor e) {
2713 if (fn == null) throw new NullPointerException();
2714 CompletableFuture<U> dst = null;
2715 ComposeCompletion<T,U> d = null;
2716 Object r;
2717 if ((r = result) == null) {
2718 dst = new CompletableFuture<U>();
2719 CompletionNode p = new CompletionNode
2720 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2721 while ((r = result) == null) {
2722 if (UNSAFE.compareAndSwapObject
2723 (this, COMPLETIONS, p.next = completions, p))
2724 break;
2725 }
2726 }
2727 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2728 T t; Throwable ex;
2729 if (r instanceof AltResult) {
2730 ex = ((AltResult)r).ex;
2731 t = null;
2732 }
2733 else {
2734 ex = null;
2735 @SuppressWarnings("unchecked") T tr = (T) r;
2736 t = tr;
2737 }
2738 if (ex == null) {
2739 if (e != null) {
2740 if (dst == null)
2741 dst = new CompletableFuture<U>();
2742 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2743 }
2744 else {
2745 try {
2746 if ((dst = fn.apply(t)) == null)
2747 ex = new NullPointerException();
2748 } catch (Throwable rex) {
2749 ex = rex;
2750 }
2751 if (dst == null)
2752 dst = new CompletableFuture<U>();
2753 }
2754 }
2755 if (e == null && ex != null)
2756 dst.internalComplete(null, ex);
2757 }
2758 helpPostComplete();
2759 dst.helpPostComplete();
2760 return dst;
2761 }
2762
2763 /**
2764 * Returns a new CompletableFuture that is completed when this
2765 * CompletableFuture completes, with the result of the given
2766 * function of the exception triggering this CompletableFuture's
2767 * completion when it completes exceptionally; otherwise, if this
2768 * CompletableFuture completes normally, then the returned
2769 * CompletableFuture also completes normally with the same value.
2770 *
2771 * @param fn the function to use to compute the value of the
2772 * returned CompletableFuture if this CompletableFuture completed
2773 * exceptionally
2774 * @return the new CompletableFuture
2775 */
2776 public CompletableFuture<T> exceptionally
2777 (Function<Throwable, ? extends T> fn) {
2778 if (fn == null) throw new NullPointerException();
2779 CompletableFuture<T> dst = new CompletableFuture<T>();
2780 ExceptionCompletion<T> d = null;
2781 Object r;
2782 if ((r = result) == null) {
2783 CompletionNode p =
2784 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2785 while ((r = result) == null) {
2786 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2787 p.next = completions, p))
2788 break;
2789 }
2790 }
2791 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2792 T t = null; Throwable ex, dx = null;
2793 if (r instanceof AltResult) {
2794 if ((ex = ((AltResult)r).ex) != null) {
2795 try {
2796 t = fn.apply(ex);
2797 } catch (Throwable rex) {
2798 dx = rex;
2799 }
2800 }
2801 }
2802 else {
2803 @SuppressWarnings("unchecked") T tr = (T) r;
2804 t = tr;
2805 }
2806 dst.internalComplete(t, dx);
2807 }
2808 helpPostComplete();
2809 return dst;
2810 }
2811
2812 /**
2813 * Returns a new CompletableFuture that is completed when this
2814 * CompletableFuture completes, with the result of the given
2815 * function of the result and exception of this CompletableFuture's
2816 * completion. The given function is invoked with the result (or
2817 * {@code null} if none) and the exception (or {@code null} if none)
2818 * of this CompletableFuture when complete.
2819 *
2820 * @param fn the function to use to compute the value of the
2821 * returned CompletableFuture
2822 * @return the new CompletableFuture
2823 */
2824 public <U> CompletableFuture<U> handle
2825 (BiFunction<? super T, Throwable, ? extends U> fn) {
2826 if (fn == null) throw new NullPointerException();
2827 CompletableFuture<U> dst = new CompletableFuture<U>();
2828 HandleCompletion<T,U> d = null;
2829 Object r;
2830 if ((r = result) == null) {
2831 CompletionNode p =
2832 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2833 while ((r = result) == null) {
2834 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2835 p.next = completions, p))
2836 break;
2837 }
2838 }
2839 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2840 T t; Throwable ex;
2841 if (r instanceof AltResult) {
2842 ex = ((AltResult)r).ex;
2843 t = null;
2844 }
2845 else {
2846 ex = null;
2847 @SuppressWarnings("unchecked") T tr = (T) r;
2848 t = tr;
2849 }
2850 U u; Throwable dx;
2851 try {
2852 u = fn.apply(t, ex);
2853 dx = null;
2854 } catch (Throwable rex) {
2855 dx = rex;
2856 u = null;
2857 }
2858 dst.internalComplete(u, dx);
2859 }
2860 helpPostComplete();
2861 return dst;
2862 }
2863
2864
2865 /* ------------- Arbitrary-arity constructions -------------- */
2866
2867 /*
2868 * The basic plan of attack is to recursively form binary
2869 * completion trees of elements. This can be overkill for small
2870 * sets, but scales nicely. The And/All vs Or/Any forms use the
2871 * same idea, but details differ.
2872 */
2873
2874 /**
2875 * Returns a new CompletableFuture that is completed when all of
2876 * the given CompletableFutures complete. If any of the given
2877 * CompletableFutures complete exceptionally, then the returned
2878 * CompletableFuture also does so, with a CompletionException
2879 * holding this exception as its cause. Otherwise, the results,
2880 * if any, of the given CompletableFutures are not reflected in
2881 * the returned CompletableFuture, but may be obtained by
2882 * inspecting them individually. If no CompletableFutures are
2883 * provided, returns a CompletableFuture completed with the value
2884 * {@code null}.
2885 *
2886 * <p>Among the applications of this method is to await completion
2887 * of a set of independent CompletableFutures before continuing a
2888 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2889 * c3).join();}.
2890 *
2891 * @param cfs the CompletableFutures
2892 * @return a new CompletableFuture that is completed when all of the
2893 * given CompletableFutures complete
2894 * @throws NullPointerException if the array or any of its elements are
2895 * {@code null}
2896 */
2897 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2898 int len = cfs.length; // Directly handle empty and singleton cases
2899 if (len > 1)
2900 return allTree(cfs, 0, len - 1);
2901 else {
2902 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2903 CompletableFuture<?> f;
2904 if (len == 0)
2905 dst.result = NIL;
2906 else if ((f = cfs[0]) == null)
2907 throw new NullPointerException();
2908 else {
2909 ThenCopy d = null;
2910 CompletionNode p = null;
2911 Object r;
2912 while ((r = f.result) == null) {
2913 if (d == null)
2914 d = new ThenCopy(f, dst);
2915 else if (p == null)
2916 p = new CompletionNode(d);
2917 else if (UNSAFE.compareAndSwapObject
2918 (f, COMPLETIONS, p.next = f.completions, p))
2919 break;
2920 }
2921 if (r != null && (d == null || d.compareAndSet(0, 1)))
2922 dst.internalComplete(null, (r instanceof AltResult) ?
2923 ((AltResult)r).ex : null);
2924 f.helpPostComplete();
2925 }
2926 return dst;
2927 }
2928 }
2929
2930 /**
2931 * Recursively constructs an And'ed tree of CompletableFutures.
2932 * Called only when array known to have at least two elements.
2933 */
2934 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2935 int lo, int hi) {
2936 CompletableFuture<?> fst, snd;
2937 int mid = (lo + hi) >>> 1;
2938 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2939 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2940 throw new NullPointerException();
2941 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2942 AndCompletion d = null;
2943 CompletionNode p = null, q = null;
2944 Object r = null, s = null;
2945 while ((r = fst.result) == null || (s = snd.result) == null) {
2946 if (d == null)
2947 d = new AndCompletion(fst, snd, dst);
2948 else if (p == null)
2949 p = new CompletionNode(d);
2950 else if (q == null) {
2951 if (UNSAFE.compareAndSwapObject
2952 (fst, COMPLETIONS, p.next = fst.completions, p))
2953 q = new CompletionNode(d);
2954 }
2955 else if (UNSAFE.compareAndSwapObject
2956 (snd, COMPLETIONS, q.next = snd.completions, q))
2957 break;
2958 }
2959 if ((r != null || (r = fst.result) != null) &&
2960 (s != null || (s = snd.result) != null) &&
2961 (d == null || d.compareAndSet(0, 1))) {
2962 Throwable ex;
2963 if (r instanceof AltResult)
2964 ex = ((AltResult)r).ex;
2965 else
2966 ex = null;
2967 if (ex == null && (s instanceof AltResult))
2968 ex = ((AltResult)s).ex;
2969 dst.internalComplete(null, ex);
2970 }
2971 fst.helpPostComplete();
2972 snd.helpPostComplete();
2973 return dst;
2974 }
2975
2976 /**
2977 * Returns a new CompletableFuture that is completed when any of the
2978 * given CompletableFutures complete, with the same result if it
2979 * completed normally. Otherwise, if it completed exceptionally,
2980 * the returned CompletableFuture also does so, with a
2981 * CompletionException holding this exception as its cause. If no
2982 * CompletableFutures are provided, returns an incomplete
2983 * CompletableFuture.
2984 *
2985 * @param cfs the CompletableFutures
2986 * @return a new CompletableFuture that is completed when any of the
2987 * given CompletableFutures complete
2988 * @throws NullPointerException if the array or any of its elements are
2989 * {@code null}
2990 */
2991 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2992 int len = cfs.length; // Same idea as allOf
2993 if (len > 1)
2994 return anyTree(cfs, 0, len - 1);
2995 else {
2996 CompletableFuture<?> dst = new CompletableFuture<Object>();
2997 CompletableFuture<?> f;
2998 if (len == 0)
2999 ; // skip
3000 else if ((f = cfs[0]) == null)
3001 throw new NullPointerException();
3002 else {
3003 ThenCopy d = null;
3004 CompletionNode p = null;
3005 Object r;
3006 while ((r = f.result) == null) {
3007 if (d == null)
3008 d = new ThenCopy(f, dst);
3009 else if (p == null)
3010 p = new CompletionNode(d);
3011 else if (UNSAFE.compareAndSwapObject
3012 (f, COMPLETIONS, p.next = f.completions, p))
3013 break;
3014 }
3015 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3016 Throwable ex; Object t;
3017 if (r instanceof AltResult) {
3018 ex = ((AltResult)r).ex;
3019 t = null;
3020 }
3021 else {
3022 ex = null;
3023 t = r;
3024 }
3025 dst.internalComplete(t, ex);
3026 }
3027 f.helpPostComplete();
3028 }
3029 return dst;
3030 }
3031 }
3032
3033 /**
3034 * Recursively constructs an Or'ed tree of CompletableFutures.
3035 */
3036 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
3037 int lo, int hi) {
3038 CompletableFuture<?> fst, snd;
3039 int mid = (lo + hi) >>> 1;
3040 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3041 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3042 throw new NullPointerException();
3043 CompletableFuture<?> dst = new CompletableFuture<Object>();
3044 OrCompletion d = null;
3045 CompletionNode p = null, q = null;
3046 Object r;
3047 while ((r = fst.result) == null && (r = snd.result) == null) {
3048 if (d == null)
3049 d = new OrCompletion(fst, snd, dst);
3050 else if (p == null)
3051 p = new CompletionNode(d);
3052 else if (q == null) {
3053 if (UNSAFE.compareAndSwapObject
3054 (fst, COMPLETIONS, p.next = fst.completions, p))
3055 q = new CompletionNode(d);
3056 }
3057 else if (UNSAFE.compareAndSwapObject
3058 (snd, COMPLETIONS, q.next = snd.completions, q))
3059 break;
3060 }
3061 if ((r != null || (r = fst.result) != null ||
3062 (r = snd.result) != null) &&
3063 (d == null || d.compareAndSet(0, 1))) {
3064 Throwable ex; Object t;
3065 if (r instanceof AltResult) {
3066 ex = ((AltResult)r).ex;
3067 t = null;
3068 }
3069 else {
3070 ex = null;
3071 t = r;
3072 }
3073 dst.internalComplete(t, ex);
3074 }
3075 fst.helpPostComplete();
3076 snd.helpPostComplete();
3077 return dst;
3078 }
3079
3080
3081 /* ------------- Control and status methods -------------- */
3082
3083 /**
3084 * If not already completed, completes this CompletableFuture with
3085 * a {@link CancellationException}. Dependent CompletableFutures
3086 * that have not already completed will also complete
3087 * exceptionally, with a {@link CompletionException} caused by
3088 * this {@code CancellationException}.
3089 *
3090 * @param mayInterruptIfRunning this value has no effect in this
3091 * implementation because interrupts are not used to control
3092 * processing.
3093 *
3094 * @return {@code true} if this task is now cancelled
3095 */
3096 public boolean cancel(boolean mayInterruptIfRunning) {
3097 boolean cancelled = (result == null) &&
3098 UNSAFE.compareAndSwapObject
3099 (this, RESULT, null, new AltResult(new CancellationException()));
3100 postComplete();
3101 return cancelled || isCancelled();
3102 }
3103
3104 /**
3105 * Returns {@code true} if this CompletableFuture was cancelled
3106 * before it completed normally.
3107 *
3108 * @return {@code true} if this CompletableFuture was cancelled
3109 * before it completed normally
3110 */
3111 public boolean isCancelled() {
3112 Object r;
3113 return ((r = result) instanceof AltResult) &&
3114 (((AltResult)r).ex instanceof CancellationException);
3115 }
3116
3117 /**
3118 * Forcibly sets or resets the value subsequently returned by
3119 * method {@link #get()} and related methods, whether or not
3120 * already completed. This method is designed for use only in
3121 * error recovery actions, and even in such situations may result
3122 * in ongoing dependent completions using established versus
3123 * overwritten outcomes.
3124 *
3125 * @param value the completion value
3126 */
3127 public void obtrudeValue(T value) {
3128 result = (value == null) ? NIL : value;
3129 postComplete();
3130 }
3131
3132 /**
3133 * Forcibly causes subsequent invocations of method {@link #get()}
3134 * and related methods to throw the given exception, whether or
3135 * not already completed. This method is designed for use only in
3136 * recovery actions, and even in such situations may result in
3137 * ongoing dependent completions using established versus
3138 * overwritten outcomes.
3139 *
3140 * @param ex the exception
3141 */
3142 public void obtrudeException(Throwable ex) {
3143 if (ex == null) throw new NullPointerException();
3144 result = new AltResult(ex);
3145 postComplete();
3146 }
3147
3148 /**
3149 * Returns the estimated number of CompletableFutures whose
3150 * completions are awaiting completion of this CompletableFuture.
3151 * This method is designed for use in monitoring system state, not
3152 * for synchronization control.
3153 *
3154 * @return the number of dependent CompletableFutures
3155 */
3156 public int getNumberOfDependents() {
3157 int count = 0;
3158 for (CompletionNode p = completions; p != null; p = p.next)
3159 ++count;
3160 return count;
3161 }
3162
3163 /**
3164 * Returns a string identifying this CompletableFuture, as well as
3165 * its completion state. The state, in brackets, contains the
3166 * String {@code "Completed Normally"} or the String {@code
3167 * "Completed Exceptionally"}, or the String {@code "Not
3168 * completed"} followed by the number of CompletableFutures
3169 * dependent upon its completion, if any.
3170 *
3171 * @return a string identifying this CompletableFuture, as well as its state
3172 */
3173 public String toString() {
3174 Object r = result;
3175 int count;
3176 return super.toString() +
3177 ((r == null) ?
3178 (((count = getNumberOfDependents()) == 0) ?
3179 "[Not completed]" :
3180 "[Not completed, " + count + " dependents]") :
3181 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3182 "[Completed exceptionally]" :
3183 "[Completed normally]"));
3184 }
3185
3186 // Unsafe mechanics
3187 private static final sun.misc.Unsafe UNSAFE;
3188 private static final long RESULT;
3189 private static final long WAITERS;
3190 private static final long COMPLETIONS;
3191 static {
3192 try {
3193 UNSAFE = sun.misc.Unsafe.getUnsafe();
3194 Class<?> k = CompletableFuture.class;
3195 RESULT = UNSAFE.objectFieldOffset
3196 (k.getDeclaredField("result"));
3197 WAITERS = UNSAFE.objectFieldOffset
3198 (k.getDeclaredField("waiters"));
3199 COMPLETIONS = UNSAFE.objectFieldOffset
3200 (k.getDeclaredField("completions"));
3201 } catch (Exception e) {
3202 throw new Error(e);
3203 }
3204 }
3205 }