ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.67
Committed: Mon Mar 18 18:07:39 2013 UTC (11 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.66: +2 -0 lines
Log Message:
Wait if necessary in asyncCompose

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on
37 * user-provided Functions, Consumers, or Runnables. The appropriate
38 * form to use depends on whether actions require arguments and/or
39 * produce results. Completion of a dependent action will trigger the
40 * completion of another CompletableFuture. Actions may also be
41 * triggered after either or both the current and another
42 * CompletableFuture complete. Multiple CompletableFutures may also
43 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44 * {@link #allOf(CompletableFuture...)}.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, actions supplied for dependent completions of another
48 * CompletableFuture may do so, depending on whether they are provided
49 * via one of the <em>async</em> methods (that is, methods with names
50 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51 * methods provide a way to commence asynchronous processing of an
52 * action using either a given {@link Executor} or by default the
53 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54 * debugging, and tracking, all generated asynchronous tasks are
55 * instances of the marker interface {@link AsynchronousCompletionTask}.
56 *
57 * <p>Actions supplied for dependent completions of <em>non-async</em>
58 * methods may be performed by the thread that completes the current
59 * CompletableFuture, or by any other caller of these methods. There
60 * are no guarantees about the order of processing completions unless
61 * constrained by these methods.
62 *
63 * <p>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional completion.
66 * Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}.
68 *
69 * <p>Upon exceptional completion (including cancellation), or when a
70 * completion entails an additional computation which terminates
71 * abruptly with an (unchecked) exception or error, then all of their
72 * dependent completions (and their dependents in turn) generally act
73 * as {@code completeExceptionally} with a {@link CompletionException}
74 * holding that exception as its cause. However, the {@link
75 * #exceptionally exceptionally} and {@link #handle handle}
76 * completions <em>are</em> able to handle exceptional completions of
77 * the CompletableFutures they depend on.
78 *
79 * <p>In case of exceptional completion with a CompletionException,
80 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81 * {@link ExecutionException} with the same cause as held in the
82 * corresponding CompletionException. However, in these cases,
83 * methods {@link #join()} and {@link #getNow} throw the
84 * CompletionException, which simplifies usage.
85 *
86 * @author Doug Lea
87 * @since 1.8
88 */
89 public class CompletableFuture<T> implements Future<T> {
90
91 /*
92 * Overview:
93 *
94 * 1. Non-nullness of field result (set via CAS) indicates done.
95 * An AltResult is used to box null as a result, as well as to
96 * hold exceptions. Using a single field makes completion fast
97 * and simple to detect and trigger, at the expense of a lot of
98 * encoding and decoding that infiltrates many methods. One minor
99 * simplification relies on the (static) NIL (to box null results)
100 * being the only AltResult with a null exception field, so we
101 * don't usually need explicit comparisons with NIL. The CF
102 * exception propagation mechanics surrounding decoding rely on
103 * unchecked casts of decoded results really being unchecked,
104 * where user type errors are caught at point of use, as is
105 * currently the case in Java. These are highlighted by using
106 * SuppressWarnings-annotated temporaries.
107 *
108 * 2. Waiters are held in a Treiber stack similar to the one used
109 * in FutureTask, Phaser, and SynchronousQueue. See their
110 * internal documentation for algorithmic details.
111 *
112 * 3. Completions are also kept in a list/stack, and pulled off
113 * and run when completion is triggered. (We could even use the
114 * same stack as for waiters, but would give up the potential
115 * parallelism obtained because woken waiters help release/run
116 * others -- see method postComplete). Because post-processing
117 * may race with direct calls, class Completion opportunistically
118 * extends AtomicInteger so callers can claim the action via
119 * compareAndSet(0, 1). The Completion.run methods are all
120 * written a boringly similar uniform way (that sometimes includes
121 * unnecessary-looking checks, kept to maintain uniformity).
122 * There are enough dimensions upon which they differ that
123 * attempts to factor commonalities while maintaining efficiency
124 * require more lines of code than they would save.
125 *
126 * 4. The exported then/and/or methods do support a bit of
127 * factoring (see doThenApply etc). They must cope with the
128 * intrinsic races surrounding addition of a dependent action
129 * versus performing the action directly because the task is
130 * already complete. For example, a CF may not be complete upon
131 * entry, so a dependent completion is added, but by the time it
132 * is added, the target CF is complete, so must be directly
133 * executed. This is all done while avoiding unnecessary object
134 * construction in safe-bypass cases.
135 */
136
137 // preliminaries
138
139 static final class AltResult {
140 final Throwable ex; // null only for NIL
141 AltResult(Throwable ex) { this.ex = ex; }
142 }
143
144 static final AltResult NIL = new AltResult(null);
145
146 // Fields
147
148 volatile Object result; // Either the result or boxed AltResult
149 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
150 volatile CompletionNode completions; // list (Treiber stack) of completions
151
152 // Basic utilities for triggering and processing completions
153
154 /**
155 * Removes and signals all waiting threads and runs all completions.
156 */
157 final void postComplete() {
158 WaitNode q; Thread t;
159 while ((q = waiters) != null) {
160 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
161 (t = q.thread) != null) {
162 q.thread = null;
163 LockSupport.unpark(t);
164 }
165 }
166
167 CompletionNode h; Completion c;
168 while ((h = completions) != null) {
169 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
170 (c = h.completion) != null)
171 c.run();
172 }
173 }
174
175 /**
176 * Triggers completion with the encoding of the given arguments:
177 * if the exception is non-null, encodes it as a wrapped
178 * CompletionException unless it is one already. Otherwise uses
179 * the given result, boxed as NIL if null.
180 */
181 final void internalComplete(Object v, Throwable ex) {
182 if (result == null)
183 UNSAFE.compareAndSwapObject
184 (this, RESULT, null,
185 (ex == null) ? (v == null) ? NIL : v :
186 new AltResult((ex instanceof CompletionException) ? ex :
187 new CompletionException(ex)));
188 postComplete(); // help out even if not triggered
189 }
190
191 /**
192 * If triggered, helps release and/or process completions.
193 */
194 final void helpPostComplete() {
195 if (result != null)
196 postComplete();
197 }
198
199 /* ------------- waiting for completions -------------- */
200
201 /** Number of processors, for spin control */
202 static final int NCPU = Runtime.getRuntime().availableProcessors();
203
204 /**
205 * Heuristic spin value for waitingGet() before blocking on
206 * multiprocessors
207 */
208 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
209
210 /**
211 * Linked nodes to record waiting threads in a Treiber stack. See
212 * other classes such as Phaser and SynchronousQueue for more
213 * detailed explanation. This class implements ManagedBlocker to
214 * avoid starvation when blocking actions pile up in
215 * ForkJoinPools.
216 */
217 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
218 long nanos; // wait time if timed
219 final long deadline; // non-zero if timed
220 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
221 volatile Thread thread;
222 volatile WaitNode next;
223 WaitNode(boolean interruptible, long nanos, long deadline) {
224 this.thread = Thread.currentThread();
225 this.interruptControl = interruptible ? 1 : 0;
226 this.nanos = nanos;
227 this.deadline = deadline;
228 }
229 public boolean isReleasable() {
230 if (thread == null)
231 return true;
232 if (Thread.interrupted()) {
233 int i = interruptControl;
234 interruptControl = -1;
235 if (i > 0)
236 return true;
237 }
238 if (deadline != 0L &&
239 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
240 thread = null;
241 return true;
242 }
243 return false;
244 }
245 public boolean block() {
246 if (isReleasable())
247 return true;
248 else if (deadline == 0L)
249 LockSupport.park(this);
250 else if (nanos > 0L)
251 LockSupport.parkNanos(this, nanos);
252 return isReleasable();
253 }
254 }
255
256 /**
257 * Returns raw result after waiting, or null if interruptible and
258 * interrupted.
259 */
260 private Object waitingGet(boolean interruptible) {
261 WaitNode q = null;
262 boolean queued = false;
263 int spins = SPINS;
264 for (Object r;;) {
265 if ((r = result) != null) {
266 if (q != null) { // suppress unpark
267 q.thread = null;
268 if (q.interruptControl < 0) {
269 if (interruptible) {
270 removeWaiter(q);
271 return null;
272 }
273 Thread.currentThread().interrupt();
274 }
275 }
276 postComplete(); // help release others
277 return r;
278 }
279 else if (spins > 0) {
280 int rnd = ThreadLocalRandom.nextSecondarySeed();
281 if (rnd == 0)
282 rnd = ThreadLocalRandom.current().nextInt();
283 if (rnd >= 0)
284 --spins;
285 }
286 else if (q == null)
287 q = new WaitNode(interruptible, 0L, 0L);
288 else if (!queued)
289 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
290 q.next = waiters, q);
291 else if (interruptible && q.interruptControl < 0) {
292 removeWaiter(q);
293 return null;
294 }
295 else if (q.thread != null && result == null) {
296 try {
297 ForkJoinPool.managedBlock(q);
298 } catch (InterruptedException ex) {
299 q.interruptControl = -1;
300 }
301 }
302 }
303 }
304
305 /**
306 * Awaits completion or aborts on interrupt or timeout.
307 *
308 * @param nanos time to wait
309 * @return raw result
310 */
311 private Object timedAwaitDone(long nanos)
312 throws InterruptedException, TimeoutException {
313 WaitNode q = null;
314 boolean queued = false;
315 for (Object r;;) {
316 if ((r = result) != null) {
317 if (q != null) {
318 q.thread = null;
319 if (q.interruptControl < 0) {
320 removeWaiter(q);
321 throw new InterruptedException();
322 }
323 }
324 postComplete();
325 return r;
326 }
327 else if (q == null) {
328 if (nanos <= 0L)
329 throw new TimeoutException();
330 long d = System.nanoTime() + nanos;
331 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
332 }
333 else if (!queued)
334 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
335 q.next = waiters, q);
336 else if (q.interruptControl < 0) {
337 removeWaiter(q);
338 throw new InterruptedException();
339 }
340 else if (q.nanos <= 0L) {
341 if (result == null) {
342 removeWaiter(q);
343 throw new TimeoutException();
344 }
345 }
346 else if (q.thread != null && result == null) {
347 try {
348 ForkJoinPool.managedBlock(q);
349 } catch (InterruptedException ex) {
350 q.interruptControl = -1;
351 }
352 }
353 }
354 }
355
356 /**
357 * Tries to unlink a timed-out or interrupted wait node to avoid
358 * accumulating garbage. Internal nodes are simply unspliced
359 * without CAS since it is harmless if they are traversed anyway
360 * by releasers. To avoid effects of unsplicing from already
361 * removed nodes, the list is retraversed in case of an apparent
362 * race. This is slow when there are a lot of nodes, but we don't
363 * expect lists to be long enough to outweigh higher-overhead
364 * schemes.
365 */
366 private void removeWaiter(WaitNode node) {
367 if (node != null) {
368 node.thread = null;
369 retry:
370 for (;;) { // restart on removeWaiter race
371 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
372 s = q.next;
373 if (q.thread != null)
374 pred = q;
375 else if (pred != null) {
376 pred.next = s;
377 if (pred.thread == null) // check for race
378 continue retry;
379 }
380 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
381 continue retry;
382 }
383 break;
384 }
385 }
386 }
387
388 /* ------------- Async tasks -------------- */
389
390 /**
391 * A marker interface identifying asynchronous tasks produced by
392 * {@code async} methods. This may be useful for monitoring,
393 * debugging, and tracking asynchronous activities.
394 *
395 * @since 1.8
396 */
397 public static interface AsynchronousCompletionTask {
398 }
399
400 /** Base class can act as either FJ or plain Runnable */
401 abstract static class Async extends ForkJoinTask<Void>
402 implements Runnable, AsynchronousCompletionTask {
403 public final Void getRawResult() { return null; }
404 public final void setRawResult(Void v) { }
405 public final void run() { exec(); }
406 }
407
408 static final class AsyncRun extends Async {
409 final Runnable fn;
410 final CompletableFuture<Void> dst;
411 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
412 this.fn = fn; this.dst = dst;
413 }
414 public final boolean exec() {
415 CompletableFuture<Void> d; Throwable ex;
416 if ((d = this.dst) != null && d.result == null) {
417 try {
418 fn.run();
419 ex = null;
420 } catch (Throwable rex) {
421 ex = rex;
422 }
423 d.internalComplete(null, ex);
424 }
425 return true;
426 }
427 private static final long serialVersionUID = 5232453952276885070L;
428 }
429
430 static final class AsyncSupply<U> extends Async {
431 final Supplier<U> fn;
432 final CompletableFuture<U> dst;
433 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
434 this.fn = fn; this.dst = dst;
435 }
436 public final boolean exec() {
437 CompletableFuture<U> d; U u; Throwable ex;
438 if ((d = this.dst) != null && d.result == null) {
439 try {
440 u = fn.get();
441 ex = null;
442 } catch (Throwable rex) {
443 ex = rex;
444 u = null;
445 }
446 d.internalComplete(u, ex);
447 }
448 return true;
449 }
450 private static final long serialVersionUID = 5232453952276885070L;
451 }
452
453 static final class AsyncApply<T,U> extends Async {
454 final T arg;
455 final Function<? super T,? extends U> fn;
456 final CompletableFuture<U> dst;
457 AsyncApply(T arg, Function<? super T,? extends U> fn,
458 CompletableFuture<U> dst) {
459 this.arg = arg; this.fn = fn; this.dst = dst;
460 }
461 public final boolean exec() {
462 CompletableFuture<U> d; U u; Throwable ex;
463 if ((d = this.dst) != null && d.result == null) {
464 try {
465 u = fn.apply(arg);
466 ex = null;
467 } catch (Throwable rex) {
468 ex = rex;
469 u = null;
470 }
471 d.internalComplete(u, ex);
472 }
473 return true;
474 }
475 private static final long serialVersionUID = 5232453952276885070L;
476 }
477
478 static final class AsyncBiApply<T,U,V> extends Async {
479 final T arg1;
480 final U arg2;
481 final BiFunction<? super T,? super U,? extends V> fn;
482 final CompletableFuture<V> dst;
483 AsyncBiApply(T arg1, U arg2,
484 BiFunction<? super T,? super U,? extends V> fn,
485 CompletableFuture<V> dst) {
486 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
487 }
488 public final boolean exec() {
489 CompletableFuture<V> d; V v; Throwable ex;
490 if ((d = this.dst) != null && d.result == null) {
491 try {
492 v = fn.apply(arg1, arg2);
493 ex = null;
494 } catch (Throwable rex) {
495 ex = rex;
496 v = null;
497 }
498 d.internalComplete(v, ex);
499 }
500 return true;
501 }
502 private static final long serialVersionUID = 5232453952276885070L;
503 }
504
505 static final class AsyncAccept<T> extends Async {
506 final T arg;
507 final Consumer<? super T> fn;
508 final CompletableFuture<Void> dst;
509 AsyncAccept(T arg, Consumer<? super T> fn,
510 CompletableFuture<Void> dst) {
511 this.arg = arg; this.fn = fn; this.dst = dst;
512 }
513 public final boolean exec() {
514 CompletableFuture<Void> d; Throwable ex;
515 if ((d = this.dst) != null && d.result == null) {
516 try {
517 fn.accept(arg);
518 ex = null;
519 } catch (Throwable rex) {
520 ex = rex;
521 }
522 d.internalComplete(null, ex);
523 }
524 return true;
525 }
526 private static final long serialVersionUID = 5232453952276885070L;
527 }
528
529 static final class AsyncBiAccept<T,U> extends Async {
530 final T arg1;
531 final U arg2;
532 final BiConsumer<? super T,? super U> fn;
533 final CompletableFuture<Void> dst;
534 AsyncBiAccept(T arg1, U arg2,
535 BiConsumer<? super T,? super U> fn,
536 CompletableFuture<Void> dst) {
537 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
538 }
539 public final boolean exec() {
540 CompletableFuture<Void> d; Throwable ex;
541 if ((d = this.dst) != null && d.result == null) {
542 try {
543 fn.accept(arg1, arg2);
544 ex = null;
545 } catch (Throwable rex) {
546 ex = rex;
547 }
548 d.internalComplete(null, ex);
549 }
550 return true;
551 }
552 private static final long serialVersionUID = 5232453952276885070L;
553 }
554
555 static final class AsyncCompose<T,U> extends Async {
556 final T arg;
557 final Function<? super T, CompletableFuture<U>> fn;
558 final CompletableFuture<U> dst;
559 AsyncCompose(T arg,
560 Function<? super T, CompletableFuture<U>> fn,
561 CompletableFuture<U> dst) {
562 this.arg = arg; this.fn = fn; this.dst = dst;
563 }
564 public final boolean exec() {
565 CompletableFuture<U> d, fr; U u; Throwable ex;
566 if ((d = this.dst) != null && d.result == null) {
567 try {
568 fr = fn.apply(arg);
569 ex = (fr == null) ? new NullPointerException() : null;
570 } catch (Throwable rex) {
571 ex = rex;
572 fr = null;
573 }
574 if (ex != null)
575 u = null;
576 else {
577 Object r = fr.result;
578 if (r == null)
579 r = fr.waitingGet(false);
580 if (r instanceof AltResult) {
581 ex = ((AltResult)r).ex;
582 u = null;
583 }
584 else {
585 @SuppressWarnings("unchecked") U ur = (U) r;
586 u = ur;
587 }
588 }
589 d.internalComplete(u, ex);
590 }
591 return true;
592 }
593 private static final long serialVersionUID = 5232453952276885070L;
594 }
595
596 /* ------------- Completions -------------- */
597
598 /**
599 * Simple linked list nodes to record completions, used in
600 * basically the same way as WaitNodes. (We separate nodes from
601 * the Completions themselves mainly because for the And and Or
602 * methods, the same Completion object resides in two lists.)
603 */
604 static final class CompletionNode {
605 final Completion completion;
606 volatile CompletionNode next;
607 CompletionNode(Completion completion) { this.completion = completion; }
608 }
609
610 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
611 abstract static class Completion extends AtomicInteger implements Runnable {
612 }
613
614 static final class ApplyCompletion<T,U> extends Completion {
615 final CompletableFuture<? extends T> src;
616 final Function<? super T,? extends U> fn;
617 final CompletableFuture<U> dst;
618 final Executor executor;
619 ApplyCompletion(CompletableFuture<? extends T> src,
620 Function<? super T,? extends U> fn,
621 CompletableFuture<U> dst, Executor executor) {
622 this.src = src; this.fn = fn; this.dst = dst;
623 this.executor = executor;
624 }
625 public final void run() {
626 final CompletableFuture<? extends T> a;
627 final Function<? super T,? extends U> fn;
628 final CompletableFuture<U> dst;
629 Object r; T t; Throwable ex;
630 if ((dst = this.dst) != null &&
631 (fn = this.fn) != null &&
632 (a = this.src) != null &&
633 (r = a.result) != null &&
634 compareAndSet(0, 1)) {
635 if (r instanceof AltResult) {
636 ex = ((AltResult)r).ex;
637 t = null;
638 }
639 else {
640 ex = null;
641 @SuppressWarnings("unchecked") T tr = (T) r;
642 t = tr;
643 }
644 Executor e = executor;
645 U u = null;
646 if (ex == null) {
647 try {
648 if (e != null)
649 e.execute(new AsyncApply<T,U>(t, fn, dst));
650 else
651 u = fn.apply(t);
652 } catch (Throwable rex) {
653 ex = rex;
654 }
655 }
656 if (e == null || ex != null)
657 dst.internalComplete(u, ex);
658 }
659 }
660 private static final long serialVersionUID = 5232453952276885070L;
661 }
662
663 static final class AcceptCompletion<T> extends Completion {
664 final CompletableFuture<? extends T> src;
665 final Consumer<? super T> fn;
666 final CompletableFuture<Void> dst;
667 final Executor executor;
668 AcceptCompletion(CompletableFuture<? extends T> src,
669 Consumer<? super T> fn,
670 CompletableFuture<Void> dst, Executor executor) {
671 this.src = src; this.fn = fn; this.dst = dst;
672 this.executor = executor;
673 }
674 public final void run() {
675 final CompletableFuture<? extends T> a;
676 final Consumer<? super T> fn;
677 final CompletableFuture<Void> dst;
678 Object r; T t; Throwable ex;
679 if ((dst = this.dst) != null &&
680 (fn = this.fn) != null &&
681 (a = this.src) != null &&
682 (r = a.result) != null &&
683 compareAndSet(0, 1)) {
684 if (r instanceof AltResult) {
685 ex = ((AltResult)r).ex;
686 t = null;
687 }
688 else {
689 ex = null;
690 @SuppressWarnings("unchecked") T tr = (T) r;
691 t = tr;
692 }
693 Executor e = executor;
694 if (ex == null) {
695 try {
696 if (e != null)
697 e.execute(new AsyncAccept<T>(t, fn, dst));
698 else
699 fn.accept(t);
700 } catch (Throwable rex) {
701 ex = rex;
702 }
703 }
704 if (e == null || ex != null)
705 dst.internalComplete(null, ex);
706 }
707 }
708 private static final long serialVersionUID = 5232453952276885070L;
709 }
710
711 static final class RunCompletion<T> extends Completion {
712 final CompletableFuture<? extends T> src;
713 final Runnable fn;
714 final CompletableFuture<Void> dst;
715 final Executor executor;
716 RunCompletion(CompletableFuture<? extends T> src,
717 Runnable fn,
718 CompletableFuture<Void> dst,
719 Executor executor) {
720 this.src = src; this.fn = fn; this.dst = dst;
721 this.executor = executor;
722 }
723 public final void run() {
724 final CompletableFuture<? extends T> a;
725 final Runnable fn;
726 final CompletableFuture<Void> dst;
727 Object r; Throwable ex;
728 if ((dst = this.dst) != null &&
729 (fn = this.fn) != null &&
730 (a = this.src) != null &&
731 (r = a.result) != null &&
732 compareAndSet(0, 1)) {
733 if (r instanceof AltResult)
734 ex = ((AltResult)r).ex;
735 else
736 ex = null;
737 Executor e = executor;
738 if (ex == null) {
739 try {
740 if (e != null)
741 e.execute(new AsyncRun(fn, dst));
742 else
743 fn.run();
744 } catch (Throwable rex) {
745 ex = rex;
746 }
747 }
748 if (e == null || ex != null)
749 dst.internalComplete(null, ex);
750 }
751 }
752 private static final long serialVersionUID = 5232453952276885070L;
753 }
754
755 static final class BiApplyCompletion<T,U,V> extends Completion {
756 final CompletableFuture<? extends T> src;
757 final CompletableFuture<? extends U> snd;
758 final BiFunction<? super T,? super U,? extends V> fn;
759 final CompletableFuture<V> dst;
760 final Executor executor;
761 BiApplyCompletion(CompletableFuture<? extends T> src,
762 CompletableFuture<? extends U> snd,
763 BiFunction<? super T,? super U,? extends V> fn,
764 CompletableFuture<V> dst, Executor executor) {
765 this.src = src; this.snd = snd;
766 this.fn = fn; this.dst = dst;
767 this.executor = executor;
768 }
769 public final void run() {
770 final CompletableFuture<? extends T> a;
771 final CompletableFuture<? extends U> b;
772 final BiFunction<? super T,? super U,? extends V> fn;
773 final CompletableFuture<V> dst;
774 Object r, s; T t; U u; Throwable ex;
775 if ((dst = this.dst) != null &&
776 (fn = this.fn) != null &&
777 (a = this.src) != null &&
778 (r = a.result) != null &&
779 (b = this.snd) != null &&
780 (s = b.result) != null &&
781 compareAndSet(0, 1)) {
782 if (r instanceof AltResult) {
783 ex = ((AltResult)r).ex;
784 t = null;
785 }
786 else {
787 ex = null;
788 @SuppressWarnings("unchecked") T tr = (T) r;
789 t = tr;
790 }
791 if (ex != null)
792 u = null;
793 else if (s instanceof AltResult) {
794 ex = ((AltResult)s).ex;
795 u = null;
796 }
797 else {
798 @SuppressWarnings("unchecked") U us = (U) s;
799 u = us;
800 }
801 Executor e = executor;
802 V v = null;
803 if (ex == null) {
804 try {
805 if (e != null)
806 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
807 else
808 v = fn.apply(t, u);
809 } catch (Throwable rex) {
810 ex = rex;
811 }
812 }
813 if (e == null || ex != null)
814 dst.internalComplete(v, ex);
815 }
816 }
817 private static final long serialVersionUID = 5232453952276885070L;
818 }
819
820 static final class BiAcceptCompletion<T,U> extends Completion {
821 final CompletableFuture<? extends T> src;
822 final CompletableFuture<? extends U> snd;
823 final BiConsumer<? super T,? super U> fn;
824 final CompletableFuture<Void> dst;
825 final Executor executor;
826 BiAcceptCompletion(CompletableFuture<? extends T> src,
827 CompletableFuture<? extends U> snd,
828 BiConsumer<? super T,? super U> fn,
829 CompletableFuture<Void> dst, Executor executor) {
830 this.src = src; this.snd = snd;
831 this.fn = fn; this.dst = dst;
832 this.executor = executor;
833 }
834 public final void run() {
835 final CompletableFuture<? extends T> a;
836 final CompletableFuture<? extends U> b;
837 final BiConsumer<? super T,? super U> fn;
838 final CompletableFuture<Void> dst;
839 Object r, s; T t; U u; Throwable ex;
840 if ((dst = this.dst) != null &&
841 (fn = this.fn) != null &&
842 (a = this.src) != null &&
843 (r = a.result) != null &&
844 (b = this.snd) != null &&
845 (s = b.result) != null &&
846 compareAndSet(0, 1)) {
847 if (r instanceof AltResult) {
848 ex = ((AltResult)r).ex;
849 t = null;
850 }
851 else {
852 ex = null;
853 @SuppressWarnings("unchecked") T tr = (T) r;
854 t = tr;
855 }
856 if (ex != null)
857 u = null;
858 else if (s instanceof AltResult) {
859 ex = ((AltResult)s).ex;
860 u = null;
861 }
862 else {
863 @SuppressWarnings("unchecked") U us = (U) s;
864 u = us;
865 }
866 Executor e = executor;
867 if (ex == null) {
868 try {
869 if (e != null)
870 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
871 else
872 fn.accept(t, u);
873 } catch (Throwable rex) {
874 ex = rex;
875 }
876 }
877 if (e == null || ex != null)
878 dst.internalComplete(null, ex);
879 }
880 }
881 private static final long serialVersionUID = 5232453952276885070L;
882 }
883
884 static final class BiRunCompletion<T> extends Completion {
885 final CompletableFuture<? extends T> src;
886 final CompletableFuture<?> snd;
887 final Runnable fn;
888 final CompletableFuture<Void> dst;
889 final Executor executor;
890 BiRunCompletion(CompletableFuture<? extends T> src,
891 CompletableFuture<?> snd,
892 Runnable fn,
893 CompletableFuture<Void> dst, Executor executor) {
894 this.src = src; this.snd = snd;
895 this.fn = fn; this.dst = dst;
896 this.executor = executor;
897 }
898 public final void run() {
899 final CompletableFuture<? extends T> a;
900 final CompletableFuture<?> b;
901 final Runnable fn;
902 final CompletableFuture<Void> dst;
903 Object r, s; Throwable ex;
904 if ((dst = this.dst) != null &&
905 (fn = this.fn) != null &&
906 (a = this.src) != null &&
907 (r = a.result) != null &&
908 (b = this.snd) != null &&
909 (s = b.result) != null &&
910 compareAndSet(0, 1)) {
911 if (r instanceof AltResult)
912 ex = ((AltResult)r).ex;
913 else
914 ex = null;
915 if (ex == null && (s instanceof AltResult))
916 ex = ((AltResult)s).ex;
917 Executor e = executor;
918 if (ex == null) {
919 try {
920 if (e != null)
921 e.execute(new AsyncRun(fn, dst));
922 else
923 fn.run();
924 } catch (Throwable rex) {
925 ex = rex;
926 }
927 }
928 if (e == null || ex != null)
929 dst.internalComplete(null, ex);
930 }
931 }
932 private static final long serialVersionUID = 5232453952276885070L;
933 }
934
935 static final class AndCompletion extends Completion {
936 final CompletableFuture<?> src;
937 final CompletableFuture<?> snd;
938 final CompletableFuture<Void> dst;
939 AndCompletion(CompletableFuture<?> src,
940 CompletableFuture<?> snd,
941 CompletableFuture<Void> dst) {
942 this.src = src; this.snd = snd; this.dst = dst;
943 }
944 public final void run() {
945 final CompletableFuture<?> a;
946 final CompletableFuture<?> b;
947 final CompletableFuture<Void> dst;
948 Object r, s; Throwable ex;
949 if ((dst = this.dst) != null &&
950 (a = this.src) != null &&
951 (r = a.result) != null &&
952 (b = this.snd) != null &&
953 (s = b.result) != null &&
954 compareAndSet(0, 1)) {
955 if (r instanceof AltResult)
956 ex = ((AltResult)r).ex;
957 else
958 ex = null;
959 if (ex == null && (s instanceof AltResult))
960 ex = ((AltResult)s).ex;
961 dst.internalComplete(null, ex);
962 }
963 }
964 private static final long serialVersionUID = 5232453952276885070L;
965 }
966
967 static final class OrApplyCompletion<T,U> extends Completion {
968 final CompletableFuture<? extends T> src;
969 final CompletableFuture<? extends T> snd;
970 final Function<? super T,? extends U> fn;
971 final CompletableFuture<U> dst;
972 final Executor executor;
973 OrApplyCompletion(CompletableFuture<? extends T> src,
974 CompletableFuture<? extends T> snd,
975 Function<? super T,? extends U> fn,
976 CompletableFuture<U> dst, Executor executor) {
977 this.src = src; this.snd = snd;
978 this.fn = fn; this.dst = dst;
979 this.executor = executor;
980 }
981 public final void run() {
982 final CompletableFuture<? extends T> a;
983 final CompletableFuture<? extends T> b;
984 final Function<? super T,? extends U> fn;
985 final CompletableFuture<U> dst;
986 Object r; T t; Throwable ex;
987 if ((dst = this.dst) != null &&
988 (fn = this.fn) != null &&
989 (((a = this.src) != null && (r = a.result) != null) ||
990 ((b = this.snd) != null && (r = b.result) != null)) &&
991 compareAndSet(0, 1)) {
992 if (r instanceof AltResult) {
993 ex = ((AltResult)r).ex;
994 t = null;
995 }
996 else {
997 ex = null;
998 @SuppressWarnings("unchecked") T tr = (T) r;
999 t = tr;
1000 }
1001 Executor e = executor;
1002 U u = null;
1003 if (ex == null) {
1004 try {
1005 if (e != null)
1006 e.execute(new AsyncApply<T,U>(t, fn, dst));
1007 else
1008 u = fn.apply(t);
1009 } catch (Throwable rex) {
1010 ex = rex;
1011 }
1012 }
1013 if (e == null || ex != null)
1014 dst.internalComplete(u, ex);
1015 }
1016 }
1017 private static final long serialVersionUID = 5232453952276885070L;
1018 }
1019
1020 static final class OrAcceptCompletion<T> extends Completion {
1021 final CompletableFuture<? extends T> src;
1022 final CompletableFuture<? extends T> snd;
1023 final Consumer<? super T> fn;
1024 final CompletableFuture<Void> dst;
1025 final Executor executor;
1026 OrAcceptCompletion(CompletableFuture<? extends T> src,
1027 CompletableFuture<? extends T> snd,
1028 Consumer<? super T> fn,
1029 CompletableFuture<Void> dst, Executor executor) {
1030 this.src = src; this.snd = snd;
1031 this.fn = fn; this.dst = dst;
1032 this.executor = executor;
1033 }
1034 public final void run() {
1035 final CompletableFuture<? extends T> a;
1036 final CompletableFuture<? extends T> b;
1037 final Consumer<? super T> fn;
1038 final CompletableFuture<Void> dst;
1039 Object r; T t; Throwable ex;
1040 if ((dst = this.dst) != null &&
1041 (fn = this.fn) != null &&
1042 (((a = this.src) != null && (r = a.result) != null) ||
1043 ((b = this.snd) != null && (r = b.result) != null)) &&
1044 compareAndSet(0, 1)) {
1045 if (r instanceof AltResult) {
1046 ex = ((AltResult)r).ex;
1047 t = null;
1048 }
1049 else {
1050 ex = null;
1051 @SuppressWarnings("unchecked") T tr = (T) r;
1052 t = tr;
1053 }
1054 Executor e = executor;
1055 if (ex == null) {
1056 try {
1057 if (e != null)
1058 e.execute(new AsyncAccept<T>(t, fn, dst));
1059 else
1060 fn.accept(t);
1061 } catch (Throwable rex) {
1062 ex = rex;
1063 }
1064 }
1065 if (e == null || ex != null)
1066 dst.internalComplete(null, ex);
1067 }
1068 }
1069 private static final long serialVersionUID = 5232453952276885070L;
1070 }
1071
1072 static final class OrRunCompletion<T> extends Completion {
1073 final CompletableFuture<? extends T> src;
1074 final CompletableFuture<?> snd;
1075 final Runnable fn;
1076 final CompletableFuture<Void> dst;
1077 final Executor executor;
1078 OrRunCompletion(CompletableFuture<? extends T> src,
1079 CompletableFuture<?> snd,
1080 Runnable fn,
1081 CompletableFuture<Void> dst, Executor executor) {
1082 this.src = src; this.snd = snd;
1083 this.fn = fn; this.dst = dst;
1084 this.executor = executor;
1085 }
1086 public final void run() {
1087 final CompletableFuture<? extends T> a;
1088 final CompletableFuture<?> b;
1089 final Runnable fn;
1090 final CompletableFuture<Void> dst;
1091 Object r; Throwable ex;
1092 if ((dst = this.dst) != null &&
1093 (fn = this.fn) != null &&
1094 (((a = this.src) != null && (r = a.result) != null) ||
1095 ((b = this.snd) != null && (r = b.result) != null)) &&
1096 compareAndSet(0, 1)) {
1097 if (r instanceof AltResult)
1098 ex = ((AltResult)r).ex;
1099 else
1100 ex = null;
1101 Executor e = executor;
1102 if (ex == null) {
1103 try {
1104 if (e != null)
1105 e.execute(new AsyncRun(fn, dst));
1106 else
1107 fn.run();
1108 } catch (Throwable rex) {
1109 ex = rex;
1110 }
1111 }
1112 if (e == null || ex != null)
1113 dst.internalComplete(null, ex);
1114 }
1115 }
1116 private static final long serialVersionUID = 5232453952276885070L;
1117 }
1118
1119 static final class OrCompletion extends Completion {
1120 final CompletableFuture<?> src;
1121 final CompletableFuture<?> snd;
1122 final CompletableFuture<?> dst;
1123 OrCompletion(CompletableFuture<?> src,
1124 CompletableFuture<?> snd,
1125 CompletableFuture<?> dst) {
1126 this.src = src; this.snd = snd; this.dst = dst;
1127 }
1128 public final void run() {
1129 final CompletableFuture<?> a;
1130 final CompletableFuture<?> b;
1131 final CompletableFuture<?> dst;
1132 Object r, t; Throwable ex;
1133 if ((dst = this.dst) != null &&
1134 (((a = this.src) != null && (r = a.result) != null) ||
1135 ((b = this.snd) != null && (r = b.result) != null)) &&
1136 compareAndSet(0, 1)) {
1137 if (r instanceof AltResult) {
1138 ex = ((AltResult)r).ex;
1139 t = null;
1140 }
1141 else {
1142 ex = null;
1143 t = r;
1144 }
1145 dst.internalComplete(t, ex);
1146 }
1147 }
1148 private static final long serialVersionUID = 5232453952276885070L;
1149 }
1150
1151 static final class ExceptionCompletion<T> extends Completion {
1152 final CompletableFuture<? extends T> src;
1153 final Function<? super Throwable, ? extends T> fn;
1154 final CompletableFuture<T> dst;
1155 ExceptionCompletion(CompletableFuture<? extends T> src,
1156 Function<? super Throwable, ? extends T> fn,
1157 CompletableFuture<T> dst) {
1158 this.src = src; this.fn = fn; this.dst = dst;
1159 }
1160 public final void run() {
1161 final CompletableFuture<? extends T> a;
1162 final Function<? super Throwable, ? extends T> fn;
1163 final CompletableFuture<T> dst;
1164 Object r; T t = null; Throwable ex, dx = null;
1165 if ((dst = this.dst) != null &&
1166 (fn = this.fn) != null &&
1167 (a = this.src) != null &&
1168 (r = a.result) != null &&
1169 compareAndSet(0, 1)) {
1170 if ((r instanceof AltResult) &&
1171 (ex = ((AltResult)r).ex) != null) {
1172 try {
1173 t = fn.apply(ex);
1174 } catch (Throwable rex) {
1175 dx = rex;
1176 }
1177 }
1178 else {
1179 @SuppressWarnings("unchecked") T tr = (T) r;
1180 t = tr;
1181 }
1182 dst.internalComplete(t, dx);
1183 }
1184 }
1185 private static final long serialVersionUID = 5232453952276885070L;
1186 }
1187
1188 static final class ThenCopy extends Completion {
1189 final CompletableFuture<?> src;
1190 final CompletableFuture<?> dst;
1191 ThenCopy(CompletableFuture<?> src,
1192 CompletableFuture<?> dst) {
1193 this.src = src; this.dst = dst;
1194 }
1195 public final void run() {
1196 final CompletableFuture<?> a;
1197 final CompletableFuture<?> dst;
1198 Object r; Object t; Throwable ex;
1199 if ((dst = this.dst) != null &&
1200 (a = this.src) != null &&
1201 (r = a.result) != null &&
1202 compareAndSet(0, 1)) {
1203 if (r instanceof AltResult) {
1204 ex = ((AltResult)r).ex;
1205 t = null;
1206 }
1207 else {
1208 ex = null;
1209 t = r;
1210 }
1211 dst.internalComplete(t, ex);
1212 }
1213 }
1214 private static final long serialVersionUID = 5232453952276885070L;
1215 }
1216
1217 static final class HandleCompletion<T,U> extends Completion {
1218 final CompletableFuture<? extends T> src;
1219 final BiFunction<? super T, Throwable, ? extends U> fn;
1220 final CompletableFuture<U> dst;
1221 HandleCompletion(CompletableFuture<? extends T> src,
1222 BiFunction<? super T, Throwable, ? extends U> fn,
1223 CompletableFuture<U> dst) {
1224 this.src = src; this.fn = fn; this.dst = dst;
1225 }
1226 public final void run() {
1227 final CompletableFuture<? extends T> a;
1228 final BiFunction<? super T, Throwable, ? extends U> fn;
1229 final CompletableFuture<U> dst;
1230 Object r; T t; Throwable ex;
1231 if ((dst = this.dst) != null &&
1232 (fn = this.fn) != null &&
1233 (a = this.src) != null &&
1234 (r = a.result) != null &&
1235 compareAndSet(0, 1)) {
1236 if (r instanceof AltResult) {
1237 ex = ((AltResult)r).ex;
1238 t = null;
1239 }
1240 else {
1241 ex = null;
1242 @SuppressWarnings("unchecked") T tr = (T) r;
1243 t = tr;
1244 }
1245 U u = null; Throwable dx = null;
1246 try {
1247 u = fn.apply(t, ex);
1248 } catch (Throwable rex) {
1249 dx = rex;
1250 }
1251 dst.internalComplete(u, dx);
1252 }
1253 }
1254 private static final long serialVersionUID = 5232453952276885070L;
1255 }
1256
1257 static final class ComposeCompletion<T,U> extends Completion {
1258 final CompletableFuture<? extends T> src;
1259 final Function<? super T, CompletableFuture<U>> fn;
1260 final CompletableFuture<U> dst;
1261 final Executor executor;
1262 ComposeCompletion(CompletableFuture<? extends T> src,
1263 Function<? super T, CompletableFuture<U>> fn,
1264 CompletableFuture<U> dst, Executor executor) {
1265 this.src = src; this.fn = fn; this.dst = dst;
1266 this.executor = executor;
1267 }
1268 public final void run() {
1269 final CompletableFuture<? extends T> a;
1270 final Function<? super T, CompletableFuture<U>> fn;
1271 final CompletableFuture<U> dst;
1272 Object r; T t; Throwable ex; Executor e;
1273 if ((dst = this.dst) != null &&
1274 (fn = this.fn) != null &&
1275 (a = this.src) != null &&
1276 (r = a.result) != null &&
1277 compareAndSet(0, 1)) {
1278 if (r instanceof AltResult) {
1279 ex = ((AltResult)r).ex;
1280 t = null;
1281 }
1282 else {
1283 ex = null;
1284 @SuppressWarnings("unchecked") T tr = (T) r;
1285 t = tr;
1286 }
1287 CompletableFuture<U> c = null;
1288 U u = null;
1289 boolean complete = false;
1290 if (ex == null) {
1291 if ((e = executor) != null)
1292 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1293 else {
1294 try {
1295 if ((c = fn.apply(t)) == null)
1296 ex = new NullPointerException();
1297 } catch (Throwable rex) {
1298 ex = rex;
1299 }
1300 }
1301 }
1302 if (c != null) {
1303 ThenCopy d = null;
1304 Object s;
1305 if ((s = c.result) == null) {
1306 CompletionNode p = new CompletionNode
1307 (d = new ThenCopy(c, dst));
1308 while ((s = c.result) == null) {
1309 if (UNSAFE.compareAndSwapObject
1310 (c, COMPLETIONS, p.next = c.completions, p))
1311 break;
1312 }
1313 }
1314 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1315 complete = true;
1316 if (s instanceof AltResult) {
1317 ex = ((AltResult)s).ex; // no rewrap
1318 u = null;
1319 }
1320 else {
1321 @SuppressWarnings("unchecked") U us = (U) s;
1322 u = us;
1323 }
1324 }
1325 }
1326 if (complete || ex != null)
1327 dst.internalComplete(u, ex);
1328 if (c != null)
1329 c.helpPostComplete();
1330 }
1331 }
1332 private static final long serialVersionUID = 5232453952276885070L;
1333 }
1334
1335 // public methods
1336
1337 /**
1338 * Creates a new incomplete CompletableFuture.
1339 */
1340 public CompletableFuture() {
1341 }
1342
1343 /**
1344 * Returns a new CompletableFuture that is asynchronously completed
1345 * by a task running in the {@link ForkJoinPool#commonPool()} with
1346 * the value obtained by calling the given Supplier.
1347 *
1348 * @param supplier a function returning the value to be used
1349 * to complete the returned CompletableFuture
1350 * @return the new CompletableFuture
1351 */
1352 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1353 if (supplier == null) throw new NullPointerException();
1354 CompletableFuture<U> f = new CompletableFuture<U>();
1355 ForkJoinPool.commonPool().
1356 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1357 return f;
1358 }
1359
1360 /**
1361 * Returns a new CompletableFuture that is asynchronously completed
1362 * by a task running in the given executor with the value obtained
1363 * by calling the given Supplier.
1364 *
1365 * @param supplier a function returning the value to be used
1366 * to complete the returned CompletableFuture
1367 * @param executor the executor to use for asynchronous execution
1368 * @return the new CompletableFuture
1369 */
1370 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1371 Executor executor) {
1372 if (executor == null || supplier == null)
1373 throw new NullPointerException();
1374 CompletableFuture<U> f = new CompletableFuture<U>();
1375 executor.execute(new AsyncSupply<U>(supplier, f));
1376 return f;
1377 }
1378
1379 /**
1380 * Returns a new CompletableFuture that is asynchronously completed
1381 * by a task running in the {@link ForkJoinPool#commonPool()} after
1382 * it runs the given action.
1383 *
1384 * @param runnable the action to run before completing the
1385 * returned CompletableFuture
1386 * @return the new CompletableFuture
1387 */
1388 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1389 if (runnable == null) throw new NullPointerException();
1390 CompletableFuture<Void> f = new CompletableFuture<Void>();
1391 ForkJoinPool.commonPool().
1392 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1393 return f;
1394 }
1395
1396 /**
1397 * Returns a new CompletableFuture that is asynchronously completed
1398 * by a task running in the given executor after it runs the given
1399 * action.
1400 *
1401 * @param runnable the action to run before completing the
1402 * returned CompletableFuture
1403 * @param executor the executor to use for asynchronous execution
1404 * @return the new CompletableFuture
1405 */
1406 public static CompletableFuture<Void> runAsync(Runnable runnable,
1407 Executor executor) {
1408 if (executor == null || runnable == null)
1409 throw new NullPointerException();
1410 CompletableFuture<Void> f = new CompletableFuture<Void>();
1411 executor.execute(new AsyncRun(runnable, f));
1412 return f;
1413 }
1414
1415 /**
1416 * Returns {@code true} if completed in any fashion: normally,
1417 * exceptionally, or via cancellation.
1418 *
1419 * @return {@code true} if completed
1420 */
1421 public boolean isDone() {
1422 return result != null;
1423 }
1424
1425 /**
1426 * Waits if necessary for this future to complete, and then
1427 * returns its result.
1428 *
1429 * @return the result value
1430 * @throws CancellationException if this future was cancelled
1431 * @throws ExecutionException if this future completed exceptionally
1432 * @throws InterruptedException if the current thread was interrupted
1433 * while waiting
1434 */
1435 public T get() throws InterruptedException, ExecutionException {
1436 Object r; Throwable ex, cause;
1437 if ((r = result) == null && (r = waitingGet(true)) == null)
1438 throw new InterruptedException();
1439 if (!(r instanceof AltResult)) {
1440 @SuppressWarnings("unchecked") T tr = (T) r;
1441 return tr;
1442 }
1443 if ((ex = ((AltResult)r).ex) == null)
1444 return null;
1445 if (ex instanceof CancellationException)
1446 throw (CancellationException)ex;
1447 if ((ex instanceof CompletionException) &&
1448 (cause = ex.getCause()) != null)
1449 ex = cause;
1450 throw new ExecutionException(ex);
1451 }
1452
1453 /**
1454 * Waits if necessary for at most the given time for this future
1455 * to complete, and then returns its result, if available.
1456 *
1457 * @param timeout the maximum time to wait
1458 * @param unit the time unit of the timeout argument
1459 * @return the result value
1460 * @throws CancellationException if this future was cancelled
1461 * @throws ExecutionException if this future completed exceptionally
1462 * @throws InterruptedException if the current thread was interrupted
1463 * while waiting
1464 * @throws TimeoutException if the wait timed out
1465 */
1466 public T get(long timeout, TimeUnit unit)
1467 throws InterruptedException, ExecutionException, TimeoutException {
1468 Object r; Throwable ex, cause;
1469 long nanos = unit.toNanos(timeout);
1470 if (Thread.interrupted())
1471 throw new InterruptedException();
1472 if ((r = result) == null)
1473 r = timedAwaitDone(nanos);
1474 if (!(r instanceof AltResult)) {
1475 @SuppressWarnings("unchecked") T tr = (T) r;
1476 return tr;
1477 }
1478 if ((ex = ((AltResult)r).ex) == null)
1479 return null;
1480 if (ex instanceof CancellationException)
1481 throw (CancellationException)ex;
1482 if ((ex instanceof CompletionException) &&
1483 (cause = ex.getCause()) != null)
1484 ex = cause;
1485 throw new ExecutionException(ex);
1486 }
1487
1488 /**
1489 * Returns the result value when complete, or throws an
1490 * (unchecked) exception if completed exceptionally. To better
1491 * conform with the use of common functional forms, if a
1492 * computation involved in the completion of this
1493 * CompletableFuture threw an exception, this method throws an
1494 * (unchecked) {@link CompletionException} with the underlying
1495 * exception as its cause.
1496 *
1497 * @return the result value
1498 * @throws CancellationException if the computation was cancelled
1499 * @throws CompletionException if this future completed
1500 * exceptionally or a completion computation threw an exception
1501 */
1502 public T join() {
1503 Object r; Throwable ex;
1504 if ((r = result) == null)
1505 r = waitingGet(false);
1506 if (!(r instanceof AltResult)) {
1507 @SuppressWarnings("unchecked") T tr = (T) r;
1508 return tr;
1509 }
1510 if ((ex = ((AltResult)r).ex) == null)
1511 return null;
1512 if (ex instanceof CancellationException)
1513 throw (CancellationException)ex;
1514 if (ex instanceof CompletionException)
1515 throw (CompletionException)ex;
1516 throw new CompletionException(ex);
1517 }
1518
1519 /**
1520 * Returns the result value (or throws any encountered exception)
1521 * if completed, else returns the given valueIfAbsent.
1522 *
1523 * @param valueIfAbsent the value to return if not completed
1524 * @return the result value, if completed, else the given valueIfAbsent
1525 * @throws CancellationException if the computation was cancelled
1526 * @throws CompletionException if this future completed
1527 * exceptionally or a completion computation threw an exception
1528 */
1529 public T getNow(T valueIfAbsent) {
1530 Object r; Throwable ex;
1531 if ((r = result) == null)
1532 return valueIfAbsent;
1533 if (!(r instanceof AltResult)) {
1534 @SuppressWarnings("unchecked") T tr = (T) r;
1535 return tr;
1536 }
1537 if ((ex = ((AltResult)r).ex) == null)
1538 return null;
1539 if (ex instanceof CancellationException)
1540 throw (CancellationException)ex;
1541 if (ex instanceof CompletionException)
1542 throw (CompletionException)ex;
1543 throw new CompletionException(ex);
1544 }
1545
1546 /**
1547 * If not already completed, sets the value returned by {@link
1548 * #get()} and related methods to the given value.
1549 *
1550 * @param value the result value
1551 * @return {@code true} if this invocation caused this CompletableFuture
1552 * to transition to a completed state, else {@code false}
1553 */
1554 public boolean complete(T value) {
1555 boolean triggered = result == null &&
1556 UNSAFE.compareAndSwapObject(this, RESULT, null,
1557 value == null ? NIL : value);
1558 postComplete();
1559 return triggered;
1560 }
1561
1562 /**
1563 * If not already completed, causes invocations of {@link #get()}
1564 * and related methods to throw the given exception.
1565 *
1566 * @param ex the exception
1567 * @return {@code true} if this invocation caused this CompletableFuture
1568 * to transition to a completed state, else {@code false}
1569 */
1570 public boolean completeExceptionally(Throwable ex) {
1571 if (ex == null) throw new NullPointerException();
1572 boolean triggered = result == null &&
1573 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1574 postComplete();
1575 return triggered;
1576 }
1577
1578 /**
1579 * Returns a new CompletableFuture that is completed
1580 * when this CompletableFuture completes, with the result of the
1581 * given function of this CompletableFuture's result.
1582 *
1583 * <p>If this CompletableFuture completes exceptionally,
1584 * then the returned CompletableFuture also does so, with a
1585 * CompletionException holding this exception as its cause.
1586 *
1587 * @param fn the function to use to compute the value of
1588 * the returned CompletableFuture
1589 * @return the new CompletableFuture
1590 */
1591 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1592 return doThenApply(fn, null);
1593 }
1594
1595 /**
1596 * Returns a new CompletableFuture that is asynchronously completed
1597 * when this CompletableFuture completes, with the result of the
1598 * given function of this CompletableFuture's result, called from a
1599 * task running in the {@link ForkJoinPool#commonPool()}.
1600 *
1601 * <p>If this CompletableFuture completes exceptionally,
1602 * then the returned CompletableFuture also does so, with a
1603 * CompletionException holding this exception as its cause.
1604 *
1605 * @param fn the function to use to compute the value of
1606 * the returned CompletableFuture
1607 * @return the new CompletableFuture
1608 */
1609 public <U> CompletableFuture<U> thenApplyAsync
1610 (Function<? super T,? extends U> fn) {
1611 return doThenApply(fn, ForkJoinPool.commonPool());
1612 }
1613
1614 /**
1615 * Returns a new CompletableFuture that is asynchronously completed
1616 * when this CompletableFuture completes, with the result of the
1617 * given function of this CompletableFuture's result, called from a
1618 * task running in the given executor.
1619 *
1620 * <p>If this CompletableFuture completes exceptionally,
1621 * then the returned CompletableFuture also does so, with a
1622 * CompletionException holding this exception as its cause.
1623 *
1624 * @param fn the function to use to compute the value of
1625 * the returned CompletableFuture
1626 * @param executor the executor to use for asynchronous execution
1627 * @return the new CompletableFuture
1628 */
1629 public <U> CompletableFuture<U> thenApplyAsync
1630 (Function<? super T,? extends U> fn,
1631 Executor executor) {
1632 if (executor == null) throw new NullPointerException();
1633 return doThenApply(fn, executor);
1634 }
1635
1636 private <U> CompletableFuture<U> doThenApply
1637 (Function<? super T,? extends U> fn,
1638 Executor e) {
1639 if (fn == null) throw new NullPointerException();
1640 CompletableFuture<U> dst = new CompletableFuture<U>();
1641 ApplyCompletion<T,U> d = null;
1642 Object r;
1643 if ((r = result) == null) {
1644 CompletionNode p = new CompletionNode
1645 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1646 while ((r = result) == null) {
1647 if (UNSAFE.compareAndSwapObject
1648 (this, COMPLETIONS, p.next = completions, p))
1649 break;
1650 }
1651 }
1652 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1653 T t; Throwable ex;
1654 if (r instanceof AltResult) {
1655 ex = ((AltResult)r).ex;
1656 t = null;
1657 }
1658 else {
1659 ex = null;
1660 @SuppressWarnings("unchecked") T tr = (T) r;
1661 t = tr;
1662 }
1663 U u = null;
1664 if (ex == null) {
1665 try {
1666 if (e != null)
1667 e.execute(new AsyncApply<T,U>(t, fn, dst));
1668 else
1669 u = fn.apply(t);
1670 } catch (Throwable rex) {
1671 ex = rex;
1672 }
1673 }
1674 if (e == null || ex != null)
1675 dst.internalComplete(u, ex);
1676 }
1677 helpPostComplete();
1678 return dst;
1679 }
1680
1681 /**
1682 * Returns a new CompletableFuture that is completed
1683 * when this CompletableFuture completes, after performing the given
1684 * action with this CompletableFuture's result.
1685 *
1686 * <p>If this CompletableFuture completes exceptionally,
1687 * then the returned CompletableFuture also does so, with a
1688 * CompletionException holding this exception as its cause.
1689 *
1690 * @param block the action to perform before completing the
1691 * returned CompletableFuture
1692 * @return the new CompletableFuture
1693 */
1694 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1695 return doThenAccept(block, null);
1696 }
1697
1698 /**
1699 * Returns a new CompletableFuture that is asynchronously completed
1700 * when this CompletableFuture completes, after performing the given
1701 * action with this CompletableFuture's result from a task running
1702 * in the {@link ForkJoinPool#commonPool()}.
1703 *
1704 * <p>If this CompletableFuture completes exceptionally,
1705 * then the returned CompletableFuture also does so, with a
1706 * CompletionException holding this exception as its cause.
1707 *
1708 * @param block the action to perform before completing the
1709 * returned CompletableFuture
1710 * @return the new CompletableFuture
1711 */
1712 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1713 return doThenAccept(block, ForkJoinPool.commonPool());
1714 }
1715
1716 /**
1717 * Returns a new CompletableFuture that is asynchronously completed
1718 * when this CompletableFuture completes, after performing the given
1719 * action with this CompletableFuture's result from a task running
1720 * in the given executor.
1721 *
1722 * <p>If this CompletableFuture completes exceptionally,
1723 * then the returned CompletableFuture also does so, with a
1724 * CompletionException holding this exception as its cause.
1725 *
1726 * @param block the action to perform before completing the
1727 * returned CompletableFuture
1728 * @param executor the executor to use for asynchronous execution
1729 * @return the new CompletableFuture
1730 */
1731 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1732 Executor executor) {
1733 if (executor == null) throw new NullPointerException();
1734 return doThenAccept(block, executor);
1735 }
1736
1737 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1738 Executor e) {
1739 if (fn == null) throw new NullPointerException();
1740 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1741 AcceptCompletion<T> d = null;
1742 Object r;
1743 if ((r = result) == null) {
1744 CompletionNode p = new CompletionNode
1745 (d = new AcceptCompletion<T>(this, fn, dst, e));
1746 while ((r = result) == null) {
1747 if (UNSAFE.compareAndSwapObject
1748 (this, COMPLETIONS, p.next = completions, p))
1749 break;
1750 }
1751 }
1752 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1753 T t; Throwable ex;
1754 if (r instanceof AltResult) {
1755 ex = ((AltResult)r).ex;
1756 t = null;
1757 }
1758 else {
1759 ex = null;
1760 @SuppressWarnings("unchecked") T tr = (T) r;
1761 t = tr;
1762 }
1763 if (ex == null) {
1764 try {
1765 if (e != null)
1766 e.execute(new AsyncAccept<T>(t, fn, dst));
1767 else
1768 fn.accept(t);
1769 } catch (Throwable rex) {
1770 ex = rex;
1771 }
1772 }
1773 if (e == null || ex != null)
1774 dst.internalComplete(null, ex);
1775 }
1776 helpPostComplete();
1777 return dst;
1778 }
1779
1780 /**
1781 * Returns a new CompletableFuture that is completed
1782 * when this CompletableFuture completes, after performing the given
1783 * action.
1784 *
1785 * <p>If this CompletableFuture completes exceptionally,
1786 * then the returned CompletableFuture also does so, with a
1787 * CompletionException holding this exception as its cause.
1788 *
1789 * @param action the action to perform before completing the
1790 * returned CompletableFuture
1791 * @return the new CompletableFuture
1792 */
1793 public CompletableFuture<Void> thenRun(Runnable action) {
1794 return doThenRun(action, null);
1795 }
1796
1797 /**
1798 * Returns a new CompletableFuture that is asynchronously completed
1799 * when this CompletableFuture completes, after performing the given
1800 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1801 *
1802 * <p>If this CompletableFuture completes exceptionally,
1803 * then the returned CompletableFuture also does so, with a
1804 * CompletionException holding this exception as its cause.
1805 *
1806 * @param action the action to perform before completing the
1807 * returned CompletableFuture
1808 * @return the new CompletableFuture
1809 */
1810 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1811 return doThenRun(action, ForkJoinPool.commonPool());
1812 }
1813
1814 /**
1815 * Returns a new CompletableFuture that is asynchronously completed
1816 * when this CompletableFuture completes, after performing the given
1817 * action from a task running in the given executor.
1818 *
1819 * <p>If this CompletableFuture completes exceptionally,
1820 * then the returned CompletableFuture also does so, with a
1821 * CompletionException holding this exception as its cause.
1822 *
1823 * @param action the action to perform before completing the
1824 * returned CompletableFuture
1825 * @param executor the executor to use for asynchronous execution
1826 * @return the new CompletableFuture
1827 */
1828 public CompletableFuture<Void> thenRunAsync(Runnable action,
1829 Executor executor) {
1830 if (executor == null) throw new NullPointerException();
1831 return doThenRun(action, executor);
1832 }
1833
1834 private CompletableFuture<Void> doThenRun(Runnable action,
1835 Executor e) {
1836 if (action == null) throw new NullPointerException();
1837 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1838 RunCompletion<T> d = null;
1839 Object r;
1840 if ((r = result) == null) {
1841 CompletionNode p = new CompletionNode
1842 (d = new RunCompletion<T>(this, action, dst, e));
1843 while ((r = result) == null) {
1844 if (UNSAFE.compareAndSwapObject
1845 (this, COMPLETIONS, p.next = completions, p))
1846 break;
1847 }
1848 }
1849 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1850 Throwable ex;
1851 if (r instanceof AltResult)
1852 ex = ((AltResult)r).ex;
1853 else
1854 ex = null;
1855 if (ex == null) {
1856 try {
1857 if (e != null)
1858 e.execute(new AsyncRun(action, dst));
1859 else
1860 action.run();
1861 } catch (Throwable rex) {
1862 ex = rex;
1863 }
1864 }
1865 if (e == null || ex != null)
1866 dst.internalComplete(null, ex);
1867 }
1868 helpPostComplete();
1869 return dst;
1870 }
1871
1872 /**
1873 * Returns a new CompletableFuture that is completed
1874 * when both this and the other given CompletableFuture complete,
1875 * with the result of the given function of the results of the two
1876 * CompletableFutures.
1877 *
1878 * <p>If this and/or the other CompletableFuture complete exceptionally,
1879 * then the returned CompletableFuture also does so, with a
1880 * CompletionException holding the exception as its cause.
1881 *
1882 * @param other the other CompletableFuture
1883 * @param fn the function to use to compute the value of
1884 * the returned CompletableFuture
1885 * @return the new CompletableFuture
1886 */
1887 public <U,V> CompletableFuture<V> thenCombine
1888 (CompletableFuture<? extends U> other,
1889 BiFunction<? super T,? super U,? extends V> fn) {
1890 return doThenBiApply(other, fn, null);
1891 }
1892
1893 /**
1894 * Returns a new CompletableFuture that is asynchronously completed
1895 * when both this and the other given CompletableFuture complete,
1896 * with the result of the given function of the results of the two
1897 * CompletableFutures, called from a task running in the
1898 * {@link ForkJoinPool#commonPool()}.
1899 *
1900 * <p>If this and/or the other CompletableFuture complete exceptionally,
1901 * then the returned CompletableFuture also does so, with a
1902 * CompletionException holding the exception as its cause.
1903 *
1904 * @param other the other CompletableFuture
1905 * @param fn the function to use to compute the value of
1906 * the returned CompletableFuture
1907 * @return the new CompletableFuture
1908 */
1909 public <U,V> CompletableFuture<V> thenCombineAsync
1910 (CompletableFuture<? extends U> other,
1911 BiFunction<? super T,? super U,? extends V> fn) {
1912 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1913 }
1914
1915 /**
1916 * Returns a new CompletableFuture that is asynchronously completed
1917 * when both this and the other given CompletableFuture complete,
1918 * with the result of the given function of the results of the two
1919 * CompletableFutures, called from a task running in the
1920 * given executor.
1921 *
1922 * <p>If this and/or the other CompletableFuture complete exceptionally,
1923 * then the returned CompletableFuture also does so, with a
1924 * CompletionException holding the exception as its cause.
1925 *
1926 * @param other the other CompletableFuture
1927 * @param fn the function to use to compute the value of
1928 * the returned CompletableFuture
1929 * @param executor the executor to use for asynchronous execution
1930 * @return the new CompletableFuture
1931 */
1932 public <U,V> CompletableFuture<V> thenCombineAsync
1933 (CompletableFuture<? extends U> other,
1934 BiFunction<? super T,? super U,? extends V> fn,
1935 Executor executor) {
1936 if (executor == null) throw new NullPointerException();
1937 return doThenBiApply(other, fn, executor);
1938 }
1939
1940 private <U,V> CompletableFuture<V> doThenBiApply
1941 (CompletableFuture<? extends U> other,
1942 BiFunction<? super T,? super U,? extends V> fn,
1943 Executor e) {
1944 if (other == null || fn == null) throw new NullPointerException();
1945 CompletableFuture<V> dst = new CompletableFuture<V>();
1946 BiApplyCompletion<T,U,V> d = null;
1947 Object r, s = null;
1948 if ((r = result) == null || (s = other.result) == null) {
1949 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1950 CompletionNode q = null, p = new CompletionNode(d);
1951 while ((r == null && (r = result) == null) ||
1952 (s == null && (s = other.result) == null)) {
1953 if (q != null) {
1954 if (s != null ||
1955 UNSAFE.compareAndSwapObject
1956 (other, COMPLETIONS, q.next = other.completions, q))
1957 break;
1958 }
1959 else if (r != null ||
1960 UNSAFE.compareAndSwapObject
1961 (this, COMPLETIONS, p.next = completions, p)) {
1962 if (s != null)
1963 break;
1964 q = new CompletionNode(d);
1965 }
1966 }
1967 }
1968 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1969 T t; U u; Throwable ex;
1970 if (r instanceof AltResult) {
1971 ex = ((AltResult)r).ex;
1972 t = null;
1973 }
1974 else {
1975 ex = null;
1976 @SuppressWarnings("unchecked") T tr = (T) r;
1977 t = tr;
1978 }
1979 if (ex != null)
1980 u = null;
1981 else if (s instanceof AltResult) {
1982 ex = ((AltResult)s).ex;
1983 u = null;
1984 }
1985 else {
1986 @SuppressWarnings("unchecked") U us = (U) s;
1987 u = us;
1988 }
1989 V v = null;
1990 if (ex == null) {
1991 try {
1992 if (e != null)
1993 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1994 else
1995 v = fn.apply(t, u);
1996 } catch (Throwable rex) {
1997 ex = rex;
1998 }
1999 }
2000 if (e == null || ex != null)
2001 dst.internalComplete(v, ex);
2002 }
2003 helpPostComplete();
2004 other.helpPostComplete();
2005 return dst;
2006 }
2007
2008 /**
2009 * Returns a new CompletableFuture that is completed
2010 * when both this and the other given CompletableFuture complete,
2011 * after performing the given action with the results of the two
2012 * CompletableFutures.
2013 *
2014 * <p>If this and/or the other CompletableFuture complete exceptionally,
2015 * then the returned CompletableFuture also does so, with a
2016 * CompletionException holding one of these exceptions as its cause.
2017 *
2018 * @param other the other CompletableFuture
2019 * @param block the action to perform before completing the
2020 * returned CompletableFuture
2021 * @return the new CompletableFuture
2022 */
2023 public <U> CompletableFuture<Void> thenAcceptBoth
2024 (CompletableFuture<? extends U> other,
2025 BiConsumer<? super T, ? super U> block) {
2026 return doThenBiAccept(other, block, null);
2027 }
2028
2029 /**
2030 * Returns a new CompletableFuture that is asynchronously completed
2031 * when both this and the other given CompletableFuture complete,
2032 * after performing the given action with the results of the two
2033 * CompletableFutures from a task running in the {@link
2034 * ForkJoinPool#commonPool()}.
2035 *
2036 * <p>If this and/or the other CompletableFuture complete exceptionally,
2037 * then the returned CompletableFuture also does so, with a
2038 * CompletionException holding one of these exceptions as its cause.
2039 *
2040 * @param other the other CompletableFuture
2041 * @param block the action to perform before completing the
2042 * returned CompletableFuture
2043 * @return the new CompletableFuture
2044 */
2045 public <U> CompletableFuture<Void> thenAcceptBothAsync
2046 (CompletableFuture<? extends U> other,
2047 BiConsumer<? super T, ? super U> block) {
2048 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2049 }
2050
2051 /**
2052 * Returns a new CompletableFuture that is asynchronously completed
2053 * when both this and the other given CompletableFuture complete,
2054 * after performing the given action with the results of the two
2055 * CompletableFutures from a task running in the given executor.
2056 *
2057 * <p>If this and/or the other CompletableFuture complete exceptionally,
2058 * then the returned CompletableFuture also does so, with a
2059 * CompletionException holding one of these exceptions as its cause.
2060 *
2061 * @param other the other CompletableFuture
2062 * @param block the action to perform before completing the
2063 * returned CompletableFuture
2064 * @param executor the executor to use for asynchronous execution
2065 * @return the new CompletableFuture
2066 */
2067 public <U> CompletableFuture<Void> thenAcceptBothAsync
2068 (CompletableFuture<? extends U> other,
2069 BiConsumer<? super T, ? super U> block,
2070 Executor executor) {
2071 if (executor == null) throw new NullPointerException();
2072 return doThenBiAccept(other, block, executor);
2073 }
2074
2075 private <U> CompletableFuture<Void> doThenBiAccept
2076 (CompletableFuture<? extends U> other,
2077 BiConsumer<? super T,? super U> fn,
2078 Executor e) {
2079 if (other == null || fn == null) throw new NullPointerException();
2080 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2081 BiAcceptCompletion<T,U> d = null;
2082 Object r, s = null;
2083 if ((r = result) == null || (s = other.result) == null) {
2084 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2085 CompletionNode q = null, p = new CompletionNode(d);
2086 while ((r == null && (r = result) == null) ||
2087 (s == null && (s = other.result) == null)) {
2088 if (q != null) {
2089 if (s != null ||
2090 UNSAFE.compareAndSwapObject
2091 (other, COMPLETIONS, q.next = other.completions, q))
2092 break;
2093 }
2094 else if (r != null ||
2095 UNSAFE.compareAndSwapObject
2096 (this, COMPLETIONS, p.next = completions, p)) {
2097 if (s != null)
2098 break;
2099 q = new CompletionNode(d);
2100 }
2101 }
2102 }
2103 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2104 T t; U u; Throwable ex;
2105 if (r instanceof AltResult) {
2106 ex = ((AltResult)r).ex;
2107 t = null;
2108 }
2109 else {
2110 ex = null;
2111 @SuppressWarnings("unchecked") T tr = (T) r;
2112 t = tr;
2113 }
2114 if (ex != null)
2115 u = null;
2116 else if (s instanceof AltResult) {
2117 ex = ((AltResult)s).ex;
2118 u = null;
2119 }
2120 else {
2121 @SuppressWarnings("unchecked") U us = (U) s;
2122 u = us;
2123 }
2124 if (ex == null) {
2125 try {
2126 if (e != null)
2127 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2128 else
2129 fn.accept(t, u);
2130 } catch (Throwable rex) {
2131 ex = rex;
2132 }
2133 }
2134 if (e == null || ex != null)
2135 dst.internalComplete(null, ex);
2136 }
2137 helpPostComplete();
2138 other.helpPostComplete();
2139 return dst;
2140 }
2141
2142 /**
2143 * Returns a new CompletableFuture that is completed
2144 * when both this and the other given CompletableFuture complete,
2145 * after performing the given action.
2146 *
2147 * <p>If this and/or the other CompletableFuture complete exceptionally,
2148 * then the returned CompletableFuture also does so, with a
2149 * CompletionException holding one of these exceptions as its cause.
2150 *
2151 * @param other the other CompletableFuture
2152 * @param action the action to perform before completing the
2153 * returned CompletableFuture
2154 * @return the new CompletableFuture
2155 */
2156 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2157 Runnable action) {
2158 return doThenBiRun(other, action, null);
2159 }
2160
2161 /**
2162 * Returns a new CompletableFuture that is asynchronously completed
2163 * when both this and the other given CompletableFuture complete,
2164 * after performing the given action from a task running in the
2165 * {@link ForkJoinPool#commonPool()}.
2166 *
2167 * <p>If this and/or the other CompletableFuture complete exceptionally,
2168 * then the returned CompletableFuture also does so, with a
2169 * CompletionException holding one of these exceptions as its cause.
2170 *
2171 * @param other the other CompletableFuture
2172 * @param action the action to perform before completing the
2173 * returned CompletableFuture
2174 * @return the new CompletableFuture
2175 */
2176 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2177 Runnable action) {
2178 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2179 }
2180
2181 /**
2182 * Returns a new CompletableFuture that is asynchronously completed
2183 * when both this and the other given CompletableFuture complete,
2184 * after performing the given action from a task running in the
2185 * given executor.
2186 *
2187 * <p>If this and/or the other CompletableFuture complete exceptionally,
2188 * then the returned CompletableFuture also does so, with a
2189 * CompletionException holding one of these exceptions as its cause.
2190 *
2191 * @param other the other CompletableFuture
2192 * @param action the action to perform before completing the
2193 * returned CompletableFuture
2194 * @param executor the executor to use for asynchronous execution
2195 * @return the new CompletableFuture
2196 */
2197 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2198 Runnable action,
2199 Executor executor) {
2200 if (executor == null) throw new NullPointerException();
2201 return doThenBiRun(other, action, executor);
2202 }
2203
2204 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2205 Runnable action,
2206 Executor e) {
2207 if (other == null || action == null) throw new NullPointerException();
2208 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2209 BiRunCompletion<T> d = null;
2210 Object r, s = null;
2211 if ((r = result) == null || (s = other.result) == null) {
2212 d = new BiRunCompletion<T>(this, other, action, dst, e);
2213 CompletionNode q = null, p = new CompletionNode(d);
2214 while ((r == null && (r = result) == null) ||
2215 (s == null && (s = other.result) == null)) {
2216 if (q != null) {
2217 if (s != null ||
2218 UNSAFE.compareAndSwapObject
2219 (other, COMPLETIONS, q.next = other.completions, q))
2220 break;
2221 }
2222 else if (r != null ||
2223 UNSAFE.compareAndSwapObject
2224 (this, COMPLETIONS, p.next = completions, p)) {
2225 if (s != null)
2226 break;
2227 q = new CompletionNode(d);
2228 }
2229 }
2230 }
2231 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2232 Throwable ex;
2233 if (r instanceof AltResult)
2234 ex = ((AltResult)r).ex;
2235 else
2236 ex = null;
2237 if (ex == null && (s instanceof AltResult))
2238 ex = ((AltResult)s).ex;
2239 if (ex == null) {
2240 try {
2241 if (e != null)
2242 e.execute(new AsyncRun(action, dst));
2243 else
2244 action.run();
2245 } catch (Throwable rex) {
2246 ex = rex;
2247 }
2248 }
2249 if (e == null || ex != null)
2250 dst.internalComplete(null, ex);
2251 }
2252 helpPostComplete();
2253 other.helpPostComplete();
2254 return dst;
2255 }
2256
2257 /**
2258 * Returns a new CompletableFuture that is completed
2259 * when either this or the other given CompletableFuture completes,
2260 * with the result of the given function of either this or the other
2261 * CompletableFuture's result.
2262 *
2263 * <p>If this and/or the other CompletableFuture complete exceptionally,
2264 * then the returned CompletableFuture may also do so, with a
2265 * CompletionException holding one of these exceptions as its cause.
2266 * No guarantees are made about which result or exception is used in
2267 * the returned CompletableFuture.
2268 *
2269 * @param other the other CompletableFuture
2270 * @param fn the function to use to compute the value of
2271 * the returned CompletableFuture
2272 * @return the new CompletableFuture
2273 */
2274 public <U> CompletableFuture<U> applyToEither
2275 (CompletableFuture<? extends T> other,
2276 Function<? super T, U> fn) {
2277 return doOrApply(other, fn, null);
2278 }
2279
2280 /**
2281 * Returns a new CompletableFuture that is asynchronously completed
2282 * when either this or the other given CompletableFuture completes,
2283 * with the result of the given function of either this or the other
2284 * CompletableFuture's result, called from a task running in the
2285 * {@link ForkJoinPool#commonPool()}.
2286 *
2287 * <p>If this and/or the other CompletableFuture complete exceptionally,
2288 * then the returned CompletableFuture may also do so, with a
2289 * CompletionException holding one of these exceptions as its cause.
2290 * No guarantees are made about which result or exception is used in
2291 * the returned CompletableFuture.
2292 *
2293 * @param other the other CompletableFuture
2294 * @param fn the function to use to compute the value of
2295 * the returned CompletableFuture
2296 * @return the new CompletableFuture
2297 */
2298 public <U> CompletableFuture<U> applyToEitherAsync
2299 (CompletableFuture<? extends T> other,
2300 Function<? super T, U> fn) {
2301 return doOrApply(other, fn, ForkJoinPool.commonPool());
2302 }
2303
2304 /**
2305 * Returns a new CompletableFuture that is asynchronously completed
2306 * when either this or the other given CompletableFuture completes,
2307 * with the result of the given function of either this or the other
2308 * CompletableFuture's result, called from a task running in the
2309 * given executor.
2310 *
2311 * <p>If this and/or the other CompletableFuture complete exceptionally,
2312 * then the returned CompletableFuture may also do so, with a
2313 * CompletionException holding one of these exceptions as its cause.
2314 * No guarantees are made about which result or exception is used in
2315 * the returned CompletableFuture.
2316 *
2317 * @param other the other CompletableFuture
2318 * @param fn the function to use to compute the value of
2319 * the returned CompletableFuture
2320 * @param executor the executor to use for asynchronous execution
2321 * @return the new CompletableFuture
2322 */
2323 public <U> CompletableFuture<U> applyToEitherAsync
2324 (CompletableFuture<? extends T> other,
2325 Function<? super T, U> fn,
2326 Executor executor) {
2327 if (executor == null) throw new NullPointerException();
2328 return doOrApply(other, fn, executor);
2329 }
2330
2331 private <U> CompletableFuture<U> doOrApply
2332 (CompletableFuture<? extends T> other,
2333 Function<? super T, U> fn,
2334 Executor e) {
2335 if (other == null || fn == null) throw new NullPointerException();
2336 CompletableFuture<U> dst = new CompletableFuture<U>();
2337 OrApplyCompletion<T,U> d = null;
2338 Object r;
2339 if ((r = result) == null && (r = other.result) == null) {
2340 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2341 CompletionNode q = null, p = new CompletionNode(d);
2342 while ((r = result) == null && (r = other.result) == null) {
2343 if (q != null) {
2344 if (UNSAFE.compareAndSwapObject
2345 (other, COMPLETIONS, q.next = other.completions, q))
2346 break;
2347 }
2348 else if (UNSAFE.compareAndSwapObject
2349 (this, COMPLETIONS, p.next = completions, p))
2350 q = new CompletionNode(d);
2351 }
2352 }
2353 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2354 T t; Throwable ex;
2355 if (r instanceof AltResult) {
2356 ex = ((AltResult)r).ex;
2357 t = null;
2358 }
2359 else {
2360 ex = null;
2361 @SuppressWarnings("unchecked") T tr = (T) r;
2362 t = tr;
2363 }
2364 U u = null;
2365 if (ex == null) {
2366 try {
2367 if (e != null)
2368 e.execute(new AsyncApply<T,U>(t, fn, dst));
2369 else
2370 u = fn.apply(t);
2371 } catch (Throwable rex) {
2372 ex = rex;
2373 }
2374 }
2375 if (e == null || ex != null)
2376 dst.internalComplete(u, ex);
2377 }
2378 helpPostComplete();
2379 other.helpPostComplete();
2380 return dst;
2381 }
2382
2383 /**
2384 * Returns a new CompletableFuture that is completed
2385 * when either this or the other given CompletableFuture completes,
2386 * after performing the given action with the result of either this
2387 * or the other CompletableFuture's result.
2388 *
2389 * <p>If this and/or the other CompletableFuture complete exceptionally,
2390 * then the returned CompletableFuture may also do so, with a
2391 * CompletionException holding one of these exceptions as its cause.
2392 * No guarantees are made about which result or exception is used in
2393 * the returned CompletableFuture.
2394 *
2395 * @param other the other CompletableFuture
2396 * @param block the action to perform before completing the
2397 * returned CompletableFuture
2398 * @return the new CompletableFuture
2399 */
2400 public CompletableFuture<Void> acceptEither
2401 (CompletableFuture<? extends T> other,
2402 Consumer<? super T> block) {
2403 return doOrAccept(other, block, null);
2404 }
2405
2406 /**
2407 * Returns a new CompletableFuture that is asynchronously completed
2408 * when either this or the other given CompletableFuture completes,
2409 * after performing the given action with the result of either this
2410 * or the other CompletableFuture's result from a task running in
2411 * the {@link ForkJoinPool#commonPool()}.
2412 *
2413 * <p>If this and/or the other CompletableFuture complete exceptionally,
2414 * then the returned CompletableFuture may also do so, with a
2415 * CompletionException holding one of these exceptions as its cause.
2416 * No guarantees are made about which result or exception is used in
2417 * the returned CompletableFuture.
2418 *
2419 * @param other the other CompletableFuture
2420 * @param block the action to perform before completing the
2421 * returned CompletableFuture
2422 * @return the new CompletableFuture
2423 */
2424 public CompletableFuture<Void> acceptEitherAsync
2425 (CompletableFuture<? extends T> other,
2426 Consumer<? super T> block) {
2427 return doOrAccept(other, block, ForkJoinPool.commonPool());
2428 }
2429
2430 /**
2431 * Returns a new CompletableFuture that is asynchronously completed
2432 * when either this or the other given CompletableFuture completes,
2433 * after performing the given action with the result of either this
2434 * or the other CompletableFuture's result from a task running in
2435 * the given executor.
2436 *
2437 * <p>If this and/or the other CompletableFuture complete exceptionally,
2438 * then the returned CompletableFuture may also do so, with a
2439 * CompletionException holding one of these exceptions as its cause.
2440 * No guarantees are made about which result or exception is used in
2441 * the returned CompletableFuture.
2442 *
2443 * @param other the other CompletableFuture
2444 * @param block the action to perform before completing the
2445 * returned CompletableFuture
2446 * @param executor the executor to use for asynchronous execution
2447 * @return the new CompletableFuture
2448 */
2449 public CompletableFuture<Void> acceptEitherAsync
2450 (CompletableFuture<? extends T> other,
2451 Consumer<? super T> block,
2452 Executor executor) {
2453 if (executor == null) throw new NullPointerException();
2454 return doOrAccept(other, block, executor);
2455 }
2456
2457 private CompletableFuture<Void> doOrAccept
2458 (CompletableFuture<? extends T> other,
2459 Consumer<? super T> fn,
2460 Executor e) {
2461 if (other == null || fn == null) throw new NullPointerException();
2462 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2463 OrAcceptCompletion<T> d = null;
2464 Object r;
2465 if ((r = result) == null && (r = other.result) == null) {
2466 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2467 CompletionNode q = null, p = new CompletionNode(d);
2468 while ((r = result) == null && (r = other.result) == null) {
2469 if (q != null) {
2470 if (UNSAFE.compareAndSwapObject
2471 (other, COMPLETIONS, q.next = other.completions, q))
2472 break;
2473 }
2474 else if (UNSAFE.compareAndSwapObject
2475 (this, COMPLETIONS, p.next = completions, p))
2476 q = new CompletionNode(d);
2477 }
2478 }
2479 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2480 T t; Throwable ex;
2481 if (r instanceof AltResult) {
2482 ex = ((AltResult)r).ex;
2483 t = null;
2484 }
2485 else {
2486 ex = null;
2487 @SuppressWarnings("unchecked") T tr = (T) r;
2488 t = tr;
2489 }
2490 if (ex == null) {
2491 try {
2492 if (e != null)
2493 e.execute(new AsyncAccept<T>(t, fn, dst));
2494 else
2495 fn.accept(t);
2496 } catch (Throwable rex) {
2497 ex = rex;
2498 }
2499 }
2500 if (e == null || ex != null)
2501 dst.internalComplete(null, ex);
2502 }
2503 helpPostComplete();
2504 other.helpPostComplete();
2505 return dst;
2506 }
2507
2508 /**
2509 * Returns a new CompletableFuture that is completed
2510 * when either this or the other given CompletableFuture completes,
2511 * after performing the given action.
2512 *
2513 * <p>If this and/or the other CompletableFuture complete exceptionally,
2514 * then the returned CompletableFuture may also do so, with a
2515 * CompletionException holding one of these exceptions as its cause.
2516 * No guarantees are made about which result or exception is used in
2517 * the returned CompletableFuture.
2518 *
2519 * @param other the other CompletableFuture
2520 * @param action the action to perform before completing the
2521 * returned CompletableFuture
2522 * @return the new CompletableFuture
2523 */
2524 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2525 Runnable action) {
2526 return doOrRun(other, action, null);
2527 }
2528
2529 /**
2530 * Returns a new CompletableFuture that is asynchronously completed
2531 * when either this or the other given CompletableFuture completes,
2532 * after performing the given action from a task running in the
2533 * {@link ForkJoinPool#commonPool()}.
2534 *
2535 * <p>If this and/or the other CompletableFuture complete exceptionally,
2536 * then the returned CompletableFuture may also do so, with a
2537 * CompletionException holding one of these exceptions as its cause.
2538 * No guarantees are made about which result or exception is used in
2539 * the returned CompletableFuture.
2540 *
2541 * @param other the other CompletableFuture
2542 * @param action the action to perform before completing the
2543 * returned CompletableFuture
2544 * @return the new CompletableFuture
2545 */
2546 public CompletableFuture<Void> runAfterEitherAsync
2547 (CompletableFuture<?> other,
2548 Runnable action) {
2549 return doOrRun(other, action, ForkJoinPool.commonPool());
2550 }
2551
2552 /**
2553 * Returns a new CompletableFuture that is asynchronously completed
2554 * when either this or the other given CompletableFuture completes,
2555 * after performing the given action from a task running in the
2556 * given executor.
2557 *
2558 * <p>If this and/or the other CompletableFuture complete exceptionally,
2559 * then the returned CompletableFuture may also do so, with a
2560 * CompletionException holding one of these exceptions as its cause.
2561 * No guarantees are made about which result or exception is used in
2562 * the returned CompletableFuture.
2563 *
2564 * @param other the other CompletableFuture
2565 * @param action the action to perform before completing the
2566 * returned CompletableFuture
2567 * @param executor the executor to use for asynchronous execution
2568 * @return the new CompletableFuture
2569 */
2570 public CompletableFuture<Void> runAfterEitherAsync
2571 (CompletableFuture<?> other,
2572 Runnable action,
2573 Executor executor) {
2574 if (executor == null) throw new NullPointerException();
2575 return doOrRun(other, action, executor);
2576 }
2577
2578 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2579 Runnable action,
2580 Executor e) {
2581 if (other == null || action == null) throw new NullPointerException();
2582 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2583 OrRunCompletion<T> d = null;
2584 Object r;
2585 if ((r = result) == null && (r = other.result) == null) {
2586 d = new OrRunCompletion<T>(this, other, action, dst, e);
2587 CompletionNode q = null, p = new CompletionNode(d);
2588 while ((r = result) == null && (r = other.result) == null) {
2589 if (q != null) {
2590 if (UNSAFE.compareAndSwapObject
2591 (other, COMPLETIONS, q.next = other.completions, q))
2592 break;
2593 }
2594 else if (UNSAFE.compareAndSwapObject
2595 (this, COMPLETIONS, p.next = completions, p))
2596 q = new CompletionNode(d);
2597 }
2598 }
2599 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2600 Throwable ex;
2601 if (r instanceof AltResult)
2602 ex = ((AltResult)r).ex;
2603 else
2604 ex = null;
2605 if (ex == null) {
2606 try {
2607 if (e != null)
2608 e.execute(new AsyncRun(action, dst));
2609 else
2610 action.run();
2611 } catch (Throwable rex) {
2612 ex = rex;
2613 }
2614 }
2615 if (e == null || ex != null)
2616 dst.internalComplete(null, ex);
2617 }
2618 helpPostComplete();
2619 other.helpPostComplete();
2620 return dst;
2621 }
2622
2623 /**
2624 * Returns a CompletableFuture (or an equivalent one) produced by the
2625 * given function of the result of this CompletableFuture when completed.
2626 *
2627 * <p>If this CompletableFuture completes exceptionally,
2628 * then the returned CompletableFuture also does so, with a
2629 * CompletionException holding this exception as its cause.
2630 *
2631 * @param fn the function returning a new CompletableFuture
2632 * @return the CompletableFuture, that {@code isDone()} upon
2633 * return if completed by the given function, or an exception
2634 * occurs
2635 */
2636 public <U> CompletableFuture<U> thenCompose
2637 (Function<? super T, CompletableFuture<U>> fn) {
2638 return doCompose(fn, null);
2639 }
2640
2641 /**
2642 * Returns a CompletableFuture (or an equivalent one) produced
2643 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2644 * the given function of the result of this CompletableFuture when
2645 * completed.
2646 *
2647 * <p>If this CompletableFuture completes exceptionally,
2648 * then the returned CompletableFuture also does so, with a
2649 * CompletionException holding this exception as its cause.
2650 *
2651 * @param fn the function returning a new CompletableFuture
2652 * @return the CompletableFuture, that {@code isDone()} upon
2653 * return if completed by the given function, or an exception
2654 * occurs
2655 */
2656 public <U> CompletableFuture<U> thenComposeAsync
2657 (Function<? super T, CompletableFuture<U>> fn) {
2658 return doCompose(fn, ForkJoinPool.commonPool());
2659 }
2660
2661 /**
2662 * Returns a CompletableFuture (or an equivalent one) produced
2663 * asynchronously using the given executor by the given function
2664 * of the result of this CompletableFuture when completed.
2665 *
2666 * <p>If this CompletableFuture completes exceptionally,
2667 * then the returned CompletableFuture also does so, with a
2668 * CompletionException holding this exception as its cause.
2669 *
2670 * @param fn the function returning a new CompletableFuture
2671 * @param executor the executor to use for asynchronous execution
2672 * @return the CompletableFuture, that {@code isDone()} upon
2673 * return if completed by the given function, or an exception
2674 * occurs
2675 */
2676 public <U> CompletableFuture<U> thenComposeAsync
2677 (Function<? super T, CompletableFuture<U>> fn,
2678 Executor executor) {
2679 if (executor == null) throw new NullPointerException();
2680 return doCompose(fn, executor);
2681 }
2682
2683 private <U> CompletableFuture<U> doCompose
2684 (Function<? super T, CompletableFuture<U>> fn,
2685 Executor e) {
2686 if (fn == null) throw new NullPointerException();
2687 CompletableFuture<U> dst = null;
2688 ComposeCompletion<T,U> d = null;
2689 Object r;
2690 if ((r = result) == null) {
2691 dst = new CompletableFuture<U>();
2692 CompletionNode p = new CompletionNode
2693 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2694 while ((r = result) == null) {
2695 if (UNSAFE.compareAndSwapObject
2696 (this, COMPLETIONS, p.next = completions, p))
2697 break;
2698 }
2699 }
2700 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2701 T t; Throwable ex;
2702 if (r instanceof AltResult) {
2703 ex = ((AltResult)r).ex;
2704 t = null;
2705 }
2706 else {
2707 ex = null;
2708 @SuppressWarnings("unchecked") T tr = (T) r;
2709 t = tr;
2710 }
2711 if (ex == null) {
2712 if (e != null) {
2713 if (dst == null)
2714 dst = new CompletableFuture<U>();
2715 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2716 }
2717 else {
2718 try {
2719 if ((dst = fn.apply(t)) == null)
2720 ex = new NullPointerException();
2721 } catch (Throwable rex) {
2722 ex = rex;
2723 }
2724 if (dst == null)
2725 dst = new CompletableFuture<U>();
2726 }
2727 }
2728 if (e == null && ex != null)
2729 dst.internalComplete(null, ex);
2730 }
2731 helpPostComplete();
2732 dst.helpPostComplete();
2733 return dst;
2734 }
2735
2736 /**
2737 * Returns a new CompletableFuture that is completed when this
2738 * CompletableFuture completes, with the result of the given
2739 * function of the exception triggering this CompletableFuture's
2740 * completion when it completes exceptionally; otherwise, if this
2741 * CompletableFuture completes normally, then the returned
2742 * CompletableFuture also completes normally with the same value.
2743 *
2744 * @param fn the function to use to compute the value of the
2745 * returned CompletableFuture if this CompletableFuture completed
2746 * exceptionally
2747 * @return the new CompletableFuture
2748 */
2749 public CompletableFuture<T> exceptionally
2750 (Function<Throwable, ? extends T> fn) {
2751 if (fn == null) throw new NullPointerException();
2752 CompletableFuture<T> dst = new CompletableFuture<T>();
2753 ExceptionCompletion<T> d = null;
2754 Object r;
2755 if ((r = result) == null) {
2756 CompletionNode p =
2757 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2758 while ((r = result) == null) {
2759 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2760 p.next = completions, p))
2761 break;
2762 }
2763 }
2764 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2765 T t = null; Throwable ex, dx = null;
2766 if (r instanceof AltResult) {
2767 if ((ex = ((AltResult)r).ex) != null) {
2768 try {
2769 t = fn.apply(ex);
2770 } catch (Throwable rex) {
2771 dx = rex;
2772 }
2773 }
2774 }
2775 else {
2776 @SuppressWarnings("unchecked") T tr = (T) r;
2777 t = tr;
2778 }
2779 dst.internalComplete(t, dx);
2780 }
2781 helpPostComplete();
2782 return dst;
2783 }
2784
2785 /**
2786 * Returns a new CompletableFuture that is completed when this
2787 * CompletableFuture completes, with the result of the given
2788 * function of the result and exception of this CompletableFuture's
2789 * completion. The given function is invoked with the result (or
2790 * {@code null} if none) and the exception (or {@code null} if none)
2791 * of this CompletableFuture when complete.
2792 *
2793 * @param fn the function to use to compute the value of the
2794 * returned CompletableFuture
2795 * @return the new CompletableFuture
2796 */
2797 public <U> CompletableFuture<U> handle
2798 (BiFunction<? super T, Throwable, ? extends U> fn) {
2799 if (fn == null) throw new NullPointerException();
2800 CompletableFuture<U> dst = new CompletableFuture<U>();
2801 HandleCompletion<T,U> d = null;
2802 Object r;
2803 if ((r = result) == null) {
2804 CompletionNode p =
2805 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2806 while ((r = result) == null) {
2807 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2808 p.next = completions, p))
2809 break;
2810 }
2811 }
2812 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2813 T t; Throwable ex;
2814 if (r instanceof AltResult) {
2815 ex = ((AltResult)r).ex;
2816 t = null;
2817 }
2818 else {
2819 ex = null;
2820 @SuppressWarnings("unchecked") T tr = (T) r;
2821 t = tr;
2822 }
2823 U u; Throwable dx;
2824 try {
2825 u = fn.apply(t, ex);
2826 dx = null;
2827 } catch (Throwable rex) {
2828 dx = rex;
2829 u = null;
2830 }
2831 dst.internalComplete(u, dx);
2832 }
2833 helpPostComplete();
2834 return dst;
2835 }
2836
2837
2838 /* ------------- Arbitrary-arity constructions -------------- */
2839
2840 /*
2841 * The basic plan of attack is to recursively form binary
2842 * completion trees of elements. This can be overkill for small
2843 * sets, but scales nicely. The And/All vs Or/Any forms use the
2844 * same idea, but details differ.
2845 */
2846
2847 /**
2848 * Returns a new CompletableFuture that is completed when all of
2849 * the given CompletableFutures complete. If any of the given
2850 * CompletableFutures complete exceptionally, then so does the
2851 * returned CompletableFuture. Otherwise, the results, if any, of
2852 * the given CompletableFutures are not reflected in the returned
2853 * CompletableFuture, but may be obtained by inspecting them
2854 * individually. If no CompletableFutures are provided, returns a
2855 * CompletableFuture completed with the value {@code null}.
2856 *
2857 * <p>Among the applications of this method is to await completion
2858 * of a set of independent CompletableFutures before continuing a
2859 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2860 * c3).join();}.
2861 *
2862 * @param cfs the CompletableFutures
2863 * @return a new CompletableFuture that is completed when all of the
2864 * given CompletableFutures complete
2865 * @throws NullPointerException if the array or any of its elements are
2866 * {@code null}
2867 */
2868 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2869 int len = cfs.length; // Directly handle empty and singleton cases
2870 if (len > 1)
2871 return allTree(cfs, 0, len - 1);
2872 else {
2873 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2874 CompletableFuture<?> f;
2875 if (len == 0)
2876 dst.result = NIL;
2877 else if ((f = cfs[0]) == null)
2878 throw new NullPointerException();
2879 else {
2880 ThenCopy d = null;
2881 CompletionNode p = null;
2882 Object r;
2883 while ((r = f.result) == null) {
2884 if (d == null)
2885 d = new ThenCopy(f, dst);
2886 else if (p == null)
2887 p = new CompletionNode(d);
2888 else if (UNSAFE.compareAndSwapObject
2889 (f, COMPLETIONS, p.next = f.completions, p))
2890 break;
2891 }
2892 if (r != null && (d == null || d.compareAndSet(0, 1)))
2893 dst.internalComplete(null, (r instanceof AltResult) ?
2894 ((AltResult)r).ex : null);
2895 f.helpPostComplete();
2896 }
2897 return dst;
2898 }
2899 }
2900
2901 /**
2902 * Recursively constructs an And'ed tree of CompletableFutures.
2903 * Called only when array known to have at least two elements.
2904 */
2905 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2906 int lo, int hi) {
2907 CompletableFuture<?> fst, snd;
2908 int mid = (lo + hi) >>> 1;
2909 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2910 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2911 throw new NullPointerException();
2912 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2913 AndCompletion d = null;
2914 CompletionNode p = null, q = null;
2915 Object r = null, s = null;
2916 while ((r = fst.result) == null || (s = snd.result) == null) {
2917 if (d == null)
2918 d = new AndCompletion(fst, snd, dst);
2919 else if (p == null)
2920 p = new CompletionNode(d);
2921 else if (q == null) {
2922 if (UNSAFE.compareAndSwapObject
2923 (fst, COMPLETIONS, p.next = fst.completions, p))
2924 q = new CompletionNode(d);
2925 }
2926 else if (UNSAFE.compareAndSwapObject
2927 (snd, COMPLETIONS, q.next = snd.completions, q))
2928 break;
2929 }
2930 if ((r != null || (r = fst.result) != null) &&
2931 (s != null || (s = snd.result) != null) &&
2932 (d == null || d.compareAndSet(0, 1))) {
2933 Throwable ex;
2934 if (r instanceof AltResult)
2935 ex = ((AltResult)r).ex;
2936 else
2937 ex = null;
2938 if (ex == null && (s instanceof AltResult))
2939 ex = ((AltResult)s).ex;
2940 dst.internalComplete(null, ex);
2941 }
2942 fst.helpPostComplete();
2943 snd.helpPostComplete();
2944 return dst;
2945 }
2946
2947 /**
2948 * Returns a new CompletableFuture that is completed when any of
2949 * the given CompletableFutures complete; with the same result if
2950 * it completed normally, otherwise exceptionally. If no
2951 * CompletableFutures are provided, returns an incomplete
2952 * CompletableFuture.
2953 *
2954 * @param cfs the CompletableFutures
2955 * @return a new CompletableFuture that is completed when any of the
2956 * given CompletableFutures complete
2957 * @throws NullPointerException if the array or any of its elements are
2958 * {@code null}
2959 */
2960 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2961 int len = cfs.length; // Same idea as allOf
2962 if (len > 1)
2963 return anyTree(cfs, 0, len - 1);
2964 else {
2965 CompletableFuture<?> dst = new CompletableFuture<Object>();
2966 CompletableFuture<?> f;
2967 if (len == 0)
2968 ; // skip
2969 else if ((f = cfs[0]) == null)
2970 throw new NullPointerException();
2971 else {
2972 ThenCopy d = null;
2973 CompletionNode p = null;
2974 Object r;
2975 while ((r = f.result) == null) {
2976 if (d == null)
2977 d = new ThenCopy(f, dst);
2978 else if (p == null)
2979 p = new CompletionNode(d);
2980 else if (UNSAFE.compareAndSwapObject
2981 (f, COMPLETIONS, p.next = f.completions, p))
2982 break;
2983 }
2984 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2985 Throwable ex; Object t;
2986 if (r instanceof AltResult) {
2987 ex = ((AltResult)r).ex;
2988 t = null;
2989 }
2990 else {
2991 ex = null;
2992 t = r;
2993 }
2994 dst.internalComplete(t, ex);
2995 }
2996 f.helpPostComplete();
2997 }
2998 return dst;
2999 }
3000 }
3001
3002 /**
3003 * Recursively constructs an Or'ed tree of CompletableFutures.
3004 */
3005 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
3006 int lo, int hi) {
3007 CompletableFuture<?> fst, snd;
3008 int mid = (lo + hi) >>> 1;
3009 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3010 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3011 throw new NullPointerException();
3012 CompletableFuture<?> dst = new CompletableFuture<Object>();
3013 OrCompletion d = null;
3014 CompletionNode p = null, q = null;
3015 Object r;
3016 while ((r = fst.result) == null && (r = snd.result) == null) {
3017 if (d == null)
3018 d = new OrCompletion(fst, snd, dst);
3019 else if (p == null)
3020 p = new CompletionNode(d);
3021 else if (q == null) {
3022 if (UNSAFE.compareAndSwapObject
3023 (fst, COMPLETIONS, p.next = fst.completions, p))
3024 q = new CompletionNode(d);
3025 }
3026 else if (UNSAFE.compareAndSwapObject
3027 (snd, COMPLETIONS, q.next = snd.completions, q))
3028 break;
3029 }
3030 if ((r != null || (r = fst.result) != null ||
3031 (r = snd.result) != null) &&
3032 (d == null || d.compareAndSet(0, 1))) {
3033 Throwable ex; Object t;
3034 if (r instanceof AltResult) {
3035 ex = ((AltResult)r).ex;
3036 t = null;
3037 }
3038 else {
3039 ex = null;
3040 t = r;
3041 }
3042 dst.internalComplete(t, ex);
3043 }
3044 fst.helpPostComplete();
3045 snd.helpPostComplete();
3046 return dst;
3047 }
3048
3049
3050 /* ------------- Control and status methods -------------- */
3051
3052 /**
3053 * If not already completed, completes this CompletableFuture with
3054 * a {@link CancellationException}. Dependent CompletableFutures
3055 * that have not already completed will also complete
3056 * exceptionally, with a {@link CompletionException} caused by
3057 * this {@code CancellationException}.
3058 *
3059 * @param mayInterruptIfRunning this value has no effect in this
3060 * implementation because interrupts are not used to control
3061 * processing.
3062 *
3063 * @return {@code true} if this task is now cancelled
3064 */
3065 public boolean cancel(boolean mayInterruptIfRunning) {
3066 boolean cancelled = (result == null) &&
3067 UNSAFE.compareAndSwapObject
3068 (this, RESULT, null, new AltResult(new CancellationException()));
3069 postComplete();
3070 return cancelled || isCancelled();
3071 }
3072
3073 /**
3074 * Returns {@code true} if this CompletableFuture was cancelled
3075 * before it completed normally.
3076 *
3077 * @return {@code true} if this CompletableFuture was cancelled
3078 * before it completed normally
3079 */
3080 public boolean isCancelled() {
3081 Object r;
3082 return ((r = result) instanceof AltResult) &&
3083 (((AltResult)r).ex instanceof CancellationException);
3084 }
3085
3086 /**
3087 * Forcibly sets or resets the value subsequently returned by
3088 * method {@link #get()} and related methods, whether or not
3089 * already completed. This method is designed for use only in
3090 * error recovery actions, and even in such situations may result
3091 * in ongoing dependent completions using established versus
3092 * overwritten outcomes.
3093 *
3094 * @param value the completion value
3095 */
3096 public void obtrudeValue(T value) {
3097 result = (value == null) ? NIL : value;
3098 postComplete();
3099 }
3100
3101 /**
3102 * Forcibly causes subsequent invocations of method {@link #get()}
3103 * and related methods to throw the given exception, whether or
3104 * not already completed. This method is designed for use only in
3105 * recovery actions, and even in such situations may result in
3106 * ongoing dependent completions using established versus
3107 * overwritten outcomes.
3108 *
3109 * @param ex the exception
3110 */
3111 public void obtrudeException(Throwable ex) {
3112 if (ex == null) throw new NullPointerException();
3113 result = new AltResult(ex);
3114 postComplete();
3115 }
3116
3117 /**
3118 * Returns the estimated number of CompletableFutures whose
3119 * completions are awaiting completion of this CompletableFuture.
3120 * This method is designed for use in monitoring system state, not
3121 * for synchronization control.
3122 *
3123 * @return the number of dependent CompletableFutures
3124 */
3125 public int getNumberOfDependents() {
3126 int count = 0;
3127 for (CompletionNode p = completions; p != null; p = p.next)
3128 ++count;
3129 return count;
3130 }
3131
3132 /**
3133 * Returns a string identifying this CompletableFuture, as well as
3134 * its completion state. The state, in brackets, contains the
3135 * String {@code "Completed Normally"} or the String {@code
3136 * "Completed Exceptionally"}, or the String {@code "Not
3137 * completed"} followed by the number of CompletableFutures
3138 * dependent upon its completion, if any.
3139 *
3140 * @return a string identifying this CompletableFuture, as well as its state
3141 */
3142 public String toString() {
3143 Object r = result;
3144 int count;
3145 return super.toString() +
3146 ((r == null) ?
3147 (((count = getNumberOfDependents()) == 0) ?
3148 "[Not completed]" :
3149 "[Not completed, " + count + " dependents]") :
3150 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3151 "[Completed exceptionally]" :
3152 "[Completed normally]"));
3153 }
3154
3155 // Unsafe mechanics
3156 private static final sun.misc.Unsafe UNSAFE;
3157 private static final long RESULT;
3158 private static final long WAITERS;
3159 private static final long COMPLETIONS;
3160 static {
3161 try {
3162 UNSAFE = sun.misc.Unsafe.getUnsafe();
3163 Class<?> k = CompletableFuture.class;
3164 RESULT = UNSAFE.objectFieldOffset
3165 (k.getDeclaredField("result"));
3166 WAITERS = UNSAFE.objectFieldOffset
3167 (k.getDeclaredField("waiters"));
3168 COMPLETIONS = UNSAFE.objectFieldOffset
3169 (k.getDeclaredField("completions"));
3170 } catch (Exception e) {
3171 throw new Error(e);
3172 }
3173 }
3174 }