ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.46
Committed: Wed Feb 6 13:55:22 2013 UTC (11 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.45: +7 -9 lines
Log Message:
Use symmetrical construction for cancel

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to {@link #complete} or {@link
31 * #completeExceptionally} a CompletableFuture, only one of them
32 * succeeds.
33 *
34 * <p>Methods are available for adding dependents based on Functions,
35 * Consumers, and Runnables. The appropriate form to use depends on
36 * whether actions require arguments and/or produce results. Actions
37 * may also be triggered after either or both the current and another
38 * CompletableFuture complete. Multiple CompletableFutures may also
39 * be grouped as one using {@link #anyOf} and {@link #allOf}.
40 *
41 * <p>Actions supplied for dependent completions (mainly using methods
42 * with prefix {@code then}) may be performed by the thread that
43 * completes the current CompletableFuture, or by any other caller of
44 * these methods. There are no guarantees about the order of
45 * processing completions unless constrained by these methods.
46 *
47 * <p>Upon exceptional completion, or when a completion entails
48 * computation, and it terminates abruptly with an (unchecked)
49 * exception or error, then further completions act as {@code
50 * completeExceptionally} with a {@link CompletionException} holding
51 * that exception as its cause. If a CompletableFuture completes
52 * exceptionally, and is not followed by a {@link #exceptionally} or
53 * {@link #handle} completion, then all of its dependents (and their
54 * dependents) also complete exceptionally with CompletionExceptions
55 * holding the ultimate cause. In case of a CompletionException,
56 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
57 * {@link ExecutionException} with the same cause as would be held in
58 * the corresponding CompletionException. However, in these cases,
59 * methods {@link #join()} and {@link #getNow} throw the
60 * CompletionException, which simplifies usage especially within other
61 * completion functions.
62 *
63 * <p>CompletableFutures themselves do not execute asynchronously.
64 * However, the {@code async} methods provide commonly useful ways to
65 * commence asynchronous processing, using either a given {@link
66 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
67 * function or action that will result in the completion of a new
68 * CompletableFuture. To simplify monitoring, debugging, and tracking,
69 * all generated asynchronous tasks are instances of the tagging
70 * interface {@link AsynchronousCompletionTask}.
71 *
72 * @author Doug Lea
73 * @since 1.8
74 */
75 public class CompletableFuture<T> implements Future<T> {
76
77 /*
78 * Overview:
79 *
80 * 1. Non-nullness of field result (set via CAS) indicates done.
81 * An AltResult is used to box null as a result, as well as to
82 * hold exceptions. Using a single field makes completion fast
83 * and simple to detect and trigger, at the expense of a lot of
84 * encoding and decoding that infiltrates many methods. One minor
85 * simplification relies on the (static) NIL (to box null results)
86 * being the only AltResult with a null exception field, so we
87 * don't usually need explicit comparisons with NIL. The CF
88 * exception propagation mechanics surrounding decoding rely on
89 * unchecked casts of decoded results really being unchecked,
90 * where user type errors are caught at point of use, as is
91 * currently the case in Java. These are highlighted by using
92 * SuppressWarnings-annotated temporaries.
93 *
94 * 2. Waiters are held in a Treiber stack similar to the one used
95 * in FutureTask, Phaser, and SynchronousQueue. See their
96 * internal documentation for algorithmic details.
97 *
98 * 3. Completions are also kept in a list/stack, and pulled off
99 * and run when completion is triggered. (We could even use the
100 * same stack as for waiters, but would give up the potential
101 * parallelism obtained because woken waiters help release/run
102 * others -- see method postComplete). Because post-processing
103 * may race with direct calls, class Completion opportunistically
104 * extends AtomicInteger so callers can claim the action via
105 * compareAndSet(0, 1). The Completion.run methods are all
106 * written a boringly similar uniform way (that sometimes includes
107 * unnecessary-looking checks, kept to maintain uniformity). There
108 * are enough dimensions upon which they differ that attempts to
109 * factor commonalities while maintaining efficiency require more
110 * lines of code than they would save.
111 *
112 * 4. The exported then/and/or methods do support a bit of
113 * factoring (see doThenApply etc). They must cope with the
114 * intrinsic races surrounding addition of a dependent action
115 * versus performing the action directly because the task is
116 * already complete. For example, a CF may not be complete upon
117 * entry, so a dependent completion is added, but by the time it
118 * is added, the target CF is complete, so must be directly
119 * executed. This is all done while avoiding unnecessary object
120 * construction in safe-bypass cases.
121 */
122
123 // preliminaries
124
125 static final class AltResult {
126 final Throwable ex; // null only for NIL
127 AltResult(Throwable ex) { this.ex = ex; }
128 }
129
130 static final AltResult NIL = new AltResult(null);
131
132 // Fields
133
134 volatile Object result; // Either the result or boxed AltResult
135 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
136 volatile CompletionNode completions; // list (Treiber stack) of completions
137
138 // Basic utilities for triggering and processing completions
139
140 /**
141 * Removes and signals all waiting threads and runs all completions.
142 */
143 final void postComplete() {
144 WaitNode q; Thread t;
145 while ((q = waiters) != null) {
146 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
147 (t = q.thread) != null) {
148 q.thread = null;
149 LockSupport.unpark(t);
150 }
151 }
152
153 CompletionNode h; Completion c;
154 while ((h = completions) != null) {
155 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
156 (c = h.completion) != null)
157 c.run();
158 }
159 }
160
161 /**
162 * Triggers completion with the encoding of the given arguments:
163 * if the exception is non-null, encodes it as a wrapped
164 * CompletionException unless it is one already. Otherwise uses
165 * the given result, boxed as NIL if null.
166 */
167 final void internalComplete(Object v, Throwable ex) {
168 if (result == null)
169 UNSAFE.compareAndSwapObject
170 (this, RESULT, null,
171 (ex == null) ? (v == null) ? NIL : v :
172 new AltResult((ex instanceof CompletionException) ? ex :
173 new CompletionException(ex)));
174 postComplete(); // help out even if not triggered
175 }
176
177 /**
178 * If triggered, helps release and/or process completions.
179 */
180 final void helpPostComplete() {
181 if (result != null)
182 postComplete();
183 }
184
185 /* ------------- waiting for completions -------------- */
186
187 /** Number of processors, for spin control */
188 static final int NCPU = Runtime.getRuntime().availableProcessors();
189
190 /**
191 * Heuristic spin value for waitingGet() before blocking on
192 * multiprocessors
193 */
194 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
195
196 /**
197 * Linked nodes to record waiting threads in a Treiber stack. See
198 * other classes such as Phaser and SynchronousQueue for more
199 * detailed explanation. This class implements ManagedBlocker to
200 * avoid starvation when blocking actions pile up in
201 * ForkJoinPools.
202 */
203 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
204 long nanos; // wait time if timed
205 final long deadline; // non-zero if timed
206 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
207 volatile Thread thread;
208 volatile WaitNode next;
209 WaitNode(boolean interruptible, long nanos, long deadline) {
210 this.thread = Thread.currentThread();
211 this.interruptControl = interruptible ? 1 : 0;
212 this.nanos = nanos;
213 this.deadline = deadline;
214 }
215 public boolean isReleasable() {
216 if (thread == null)
217 return true;
218 if (Thread.interrupted()) {
219 int i = interruptControl;
220 interruptControl = -1;
221 if (i > 0)
222 return true;
223 }
224 if (deadline != 0L &&
225 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
226 thread = null;
227 return true;
228 }
229 return false;
230 }
231 public boolean block() {
232 if (isReleasable())
233 return true;
234 else if (deadline == 0L)
235 LockSupport.park(this);
236 else if (nanos > 0L)
237 LockSupport.parkNanos(this, nanos);
238 return isReleasable();
239 }
240 }
241
242 /**
243 * Returns raw result after waiting, or null if interruptible and
244 * interrupted.
245 */
246 private Object waitingGet(boolean interruptible) {
247 WaitNode q = null;
248 boolean queued = false;
249 int spins = SPINS;
250 for (Object r;;) {
251 if ((r = result) != null) {
252 if (q != null) { // suppress unpark
253 q.thread = null;
254 if (q.interruptControl < 0) {
255 if (interruptible) {
256 removeWaiter(q);
257 return null;
258 }
259 Thread.currentThread().interrupt();
260 }
261 }
262 postComplete(); // help release others
263 return r;
264 }
265 else if (spins > 0) {
266 int rnd = ThreadLocalRandom.nextSecondarySeed();
267 if (rnd == 0)
268 rnd = ThreadLocalRandom.current().nextInt();
269 if (rnd >= 0)
270 --spins;
271 }
272 else if (q == null)
273 q = new WaitNode(interruptible, 0L, 0L);
274 else if (!queued)
275 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
276 q.next = waiters, q);
277 else if (interruptible && q.interruptControl < 0) {
278 removeWaiter(q);
279 return null;
280 }
281 else if (q.thread != null && result == null) {
282 try {
283 ForkJoinPool.managedBlock(q);
284 } catch (InterruptedException ex) {
285 q.interruptControl = -1;
286 }
287 }
288 }
289 }
290
291 /**
292 * Awaits completion or aborts on interrupt or timeout.
293 *
294 * @param nanos time to wait
295 * @return raw result
296 */
297 private Object timedAwaitDone(long nanos)
298 throws InterruptedException, TimeoutException {
299 WaitNode q = null;
300 boolean queued = false;
301 for (Object r;;) {
302 if ((r = result) != null) {
303 if (q != null) {
304 q.thread = null;
305 if (q.interruptControl < 0) {
306 removeWaiter(q);
307 throw new InterruptedException();
308 }
309 }
310 postComplete();
311 return r;
312 }
313 else if (q == null) {
314 if (nanos <= 0L)
315 throw new TimeoutException();
316 long d = System.nanoTime() + nanos;
317 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
318 }
319 else if (!queued)
320 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
321 q.next = waiters, q);
322 else if (q.interruptControl < 0) {
323 removeWaiter(q);
324 throw new InterruptedException();
325 }
326 else if (q.nanos <= 0L) {
327 if (result == null) {
328 removeWaiter(q);
329 throw new TimeoutException();
330 }
331 }
332 else if (q.thread != null && result == null) {
333 try {
334 ForkJoinPool.managedBlock(q);
335 } catch (InterruptedException ex) {
336 q.interruptControl = -1;
337 }
338 }
339 }
340 }
341
342 /**
343 * Tries to unlink a timed-out or interrupted wait node to avoid
344 * accumulating garbage. Internal nodes are simply unspliced
345 * without CAS since it is harmless if they are traversed anyway
346 * by releasers. To avoid effects of unsplicing from already
347 * removed nodes, the list is retraversed in case of an apparent
348 * race. This is slow when there are a lot of nodes, but we don't
349 * expect lists to be long enough to outweigh higher-overhead
350 * schemes.
351 */
352 private void removeWaiter(WaitNode node) {
353 if (node != null) {
354 node.thread = null;
355 retry:
356 for (;;) { // restart on removeWaiter race
357 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
358 s = q.next;
359 if (q.thread != null)
360 pred = q;
361 else if (pred != null) {
362 pred.next = s;
363 if (pred.thread == null) // check for race
364 continue retry;
365 }
366 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
367 continue retry;
368 }
369 break;
370 }
371 }
372 }
373
374 /* ------------- Async tasks -------------- */
375
376 /**
377 * A tagging interface identifying asynchronous tasks produced by
378 * {@code async} methods. This may be useful for monitoring,
379 * debugging, and tracking asynchronous activities.
380 */
381 public static interface AsynchronousCompletionTask {
382 }
383
384 /** Base class can act as either FJ or plain Runnable */
385 abstract static class Async extends ForkJoinTask<Void>
386 implements Runnable, AsynchronousCompletionTask {
387 public final Void getRawResult() { return null; }
388 public final void setRawResult(Void v) { }
389 public final void run() { exec(); }
390 }
391
392 static final class AsyncRun extends Async {
393 final Runnable fn;
394 final CompletableFuture<Void> dst;
395 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
396 this.fn = fn; this.dst = dst;
397 }
398 public final boolean exec() {
399 CompletableFuture<Void> d; Throwable ex;
400 if ((d = this.dst) != null && d.result == null) {
401 try {
402 fn.run();
403 ex = null;
404 } catch (Throwable rex) {
405 ex = rex;
406 }
407 d.internalComplete(null, ex);
408 }
409 return true;
410 }
411 private static final long serialVersionUID = 5232453952276885070L;
412 }
413
414 static final class AsyncSupply<U> extends Async {
415 final Supplier<U> fn;
416 final CompletableFuture<U> dst;
417 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
418 this.fn = fn; this.dst = dst;
419 }
420 public final boolean exec() {
421 CompletableFuture<U> d; U u; Throwable ex;
422 if ((d = this.dst) != null && d.result == null) {
423 try {
424 u = fn.get();
425 ex = null;
426 } catch (Throwable rex) {
427 ex = rex;
428 u = null;
429 }
430 d.internalComplete(u, ex);
431 }
432 return true;
433 }
434 private static final long serialVersionUID = 5232453952276885070L;
435 }
436
437 static final class AsyncApply<T,U> extends Async {
438 final Function<? super T,? extends U> fn;
439 final T arg;
440 final CompletableFuture<U> dst;
441 AsyncApply(T arg, Function<? super T,? extends U> fn,
442 CompletableFuture<U> dst) {
443 this.arg = arg; this.fn = fn; this.dst = dst;
444 }
445 public final boolean exec() {
446 CompletableFuture<U> d; U u; Throwable ex;
447 if ((d = this.dst) != null && d.result == null) {
448 try {
449 u = fn.apply(arg);
450 ex = null;
451 } catch (Throwable rex) {
452 ex = rex;
453 u = null;
454 }
455 d.internalComplete(u, ex);
456 }
457 return true;
458 }
459 private static final long serialVersionUID = 5232453952276885070L;
460 }
461
462 static final class AsyncBiApply<T,U,V> extends Async {
463 final BiFunction<? super T,? super U,? extends V> fn;
464 final T arg1;
465 final U arg2;
466 final CompletableFuture<V> dst;
467 AsyncBiApply(T arg1, U arg2,
468 BiFunction<? super T,? super U,? extends V> fn,
469 CompletableFuture<V> dst) {
470 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
471 }
472 public final boolean exec() {
473 CompletableFuture<V> d; V v; Throwable ex;
474 if ((d = this.dst) != null && d.result == null) {
475 try {
476 v = fn.apply(arg1, arg2);
477 ex = null;
478 } catch (Throwable rex) {
479 ex = rex;
480 v = null;
481 }
482 d.internalComplete(v, ex);
483 }
484 return true;
485 }
486 private static final long serialVersionUID = 5232453952276885070L;
487 }
488
489 static final class AsyncAccept<T> extends Async {
490 final Consumer<? super T> fn;
491 final T arg;
492 final CompletableFuture<Void> dst;
493 AsyncAccept(T arg, Consumer<? super T> fn,
494 CompletableFuture<Void> dst) {
495 this.arg = arg; this.fn = fn; this.dst = dst;
496 }
497 public final boolean exec() {
498 CompletableFuture<Void> d; Throwable ex;
499 if ((d = this.dst) != null && d.result == null) {
500 try {
501 fn.accept(arg);
502 ex = null;
503 } catch (Throwable rex) {
504 ex = rex;
505 }
506 d.internalComplete(null, ex);
507 }
508 return true;
509 }
510 private static final long serialVersionUID = 5232453952276885070L;
511 }
512
513 static final class AsyncBiAccept<T,U> extends Async {
514 final BiConsumer<? super T,? super U> fn;
515 final T arg1;
516 final U arg2;
517 final CompletableFuture<Void> dst;
518 AsyncBiAccept(T arg1, U arg2,
519 BiConsumer<? super T,? super U> fn,
520 CompletableFuture<Void> dst) {
521 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
522 }
523 public final boolean exec() {
524 CompletableFuture<Void> d; Throwable ex;
525 if ((d = this.dst) != null && d.result == null) {
526 try {
527 fn.accept(arg1, arg2);
528 ex = null;
529 } catch (Throwable rex) {
530 ex = rex;
531 }
532 d.internalComplete(null, ex);
533 }
534 return true;
535 }
536 private static final long serialVersionUID = 5232453952276885070L;
537 }
538
539 static final class AsyncCompose<T,U> extends Async {
540 final Function<? super T, CompletableFuture<U>> fn;
541 final T arg;
542 final CompletableFuture<U> dst;
543 AsyncCompose(T arg,
544 Function<? super T, CompletableFuture<U>> fn,
545 CompletableFuture<U> dst) {
546 this.arg = arg; this.fn = fn; this.dst = dst;
547 }
548 public final boolean exec() {
549 CompletableFuture<U> d, fr; U u; Throwable ex;
550 if ((d = this.dst) != null && d.result == null) {
551 try {
552 fr = fn.apply(arg);
553 ex = null;
554 } catch (Throwable rex) {
555 ex = rex;
556 fr = null;
557 }
558 if (ex != null)
559 u = null;
560 else if (fr == null) {
561 ex = new NullPointerException();
562 u = null;
563 }
564 else {
565 Object r = fr.result;
566 if (r instanceof AltResult) {
567 ex = ((AltResult)r).ex;
568 u = null;
569 }
570 else {
571 @SuppressWarnings("unchecked") U ur = (U) r;
572 u = ur;
573 }
574 }
575 d.internalComplete(u, ex);
576 }
577 return true;
578 }
579 private static final long serialVersionUID = 5232453952276885070L;
580 }
581
582 /* ------------- Completions -------------- */
583
584 /**
585 * Simple linked list nodes to record completions, used in
586 * basically the same way as WaitNodes. (We separate nodes from
587 * the Completions themselves mainly because for the And and Or
588 * methods, the same Completion object resides in two lists.)
589 */
590 static final class CompletionNode {
591 final Completion completion;
592 volatile CompletionNode next;
593 CompletionNode(Completion completion) { this.completion = completion; }
594 }
595
596 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
597 abstract static class Completion extends AtomicInteger implements Runnable {
598 }
599
600 static final class ApplyCompletion<T,U> extends Completion {
601 final CompletableFuture<? extends T> src;
602 final Function<? super T,? extends U> fn;
603 final CompletableFuture<U> dst;
604 final Executor executor;
605 ApplyCompletion(CompletableFuture<? extends T> src,
606 Function<? super T,? extends U> fn,
607 CompletableFuture<U> dst, Executor executor) {
608 this.src = src; this.fn = fn; this.dst = dst;
609 this.executor = executor;
610 }
611 public final void run() {
612 final CompletableFuture<? extends T> a;
613 final Function<? super T,? extends U> fn;
614 final CompletableFuture<U> dst;
615 Object r; T t; Throwable ex;
616 if ((dst = this.dst) != null &&
617 (fn = this.fn) != null &&
618 (a = this.src) != null &&
619 (r = a.result) != null &&
620 compareAndSet(0, 1)) {
621 if (r instanceof AltResult) {
622 ex = ((AltResult)r).ex;
623 t = null;
624 }
625 else {
626 ex = null;
627 @SuppressWarnings("unchecked") T tr = (T) r;
628 t = tr;
629 }
630 Executor e = executor;
631 U u = null;
632 if (ex == null) {
633 try {
634 if (e != null)
635 e.execute(new AsyncApply<T,U>(t, fn, dst));
636 else
637 u = fn.apply(t);
638 } catch (Throwable rex) {
639 ex = rex;
640 }
641 }
642 if (e == null || ex != null)
643 dst.internalComplete(u, ex);
644 }
645 }
646 private static final long serialVersionUID = 5232453952276885070L;
647 }
648
649 static final class AcceptCompletion<T> extends Completion {
650 final CompletableFuture<? extends T> src;
651 final Consumer<? super T> fn;
652 final CompletableFuture<Void> dst;
653 final Executor executor;
654 AcceptCompletion(CompletableFuture<? extends T> src,
655 Consumer<? super T> fn,
656 CompletableFuture<Void> dst, Executor executor) {
657 this.src = src; this.fn = fn; this.dst = dst;
658 this.executor = executor;
659 }
660 public final void run() {
661 final CompletableFuture<? extends T> a;
662 final Consumer<? super T> fn;
663 final CompletableFuture<Void> dst;
664 Object r; T t; Throwable ex;
665 if ((dst = this.dst) != null &&
666 (fn = this.fn) != null &&
667 (a = this.src) != null &&
668 (r = a.result) != null &&
669 compareAndSet(0, 1)) {
670 if (r instanceof AltResult) {
671 ex = ((AltResult)r).ex;
672 t = null;
673 }
674 else {
675 ex = null;
676 @SuppressWarnings("unchecked") T tr = (T) r;
677 t = tr;
678 }
679 Executor e = executor;
680 if (ex == null) {
681 try {
682 if (e != null)
683 e.execute(new AsyncAccept<T>(t, fn, dst));
684 else
685 fn.accept(t);
686 } catch (Throwable rex) {
687 ex = rex;
688 }
689 }
690 if (e == null || ex != null)
691 dst.internalComplete(null, ex);
692 }
693 }
694 private static final long serialVersionUID = 5232453952276885070L;
695 }
696
697 static final class RunCompletion<T> extends Completion {
698 final CompletableFuture<? extends T> src;
699 final Runnable fn;
700 final CompletableFuture<Void> dst;
701 final Executor executor;
702 RunCompletion(CompletableFuture<? extends T> src,
703 Runnable fn,
704 CompletableFuture<Void> dst,
705 Executor executor) {
706 this.src = src; this.fn = fn; this.dst = dst;
707 this.executor = executor;
708 }
709 public final void run() {
710 final CompletableFuture<? extends T> a;
711 final Runnable fn;
712 final CompletableFuture<Void> dst;
713 Object r; Throwable ex;
714 if ((dst = this.dst) != null &&
715 (fn = this.fn) != null &&
716 (a = this.src) != null &&
717 (r = a.result) != null &&
718 compareAndSet(0, 1)) {
719 if (r instanceof AltResult)
720 ex = ((AltResult)r).ex;
721 else
722 ex = null;
723 Executor e = executor;
724 if (ex == null) {
725 try {
726 if (e != null)
727 e.execute(new AsyncRun(fn, dst));
728 else
729 fn.run();
730 } catch (Throwable rex) {
731 ex = rex;
732 }
733 }
734 if (e == null || ex != null)
735 dst.internalComplete(null, ex);
736 }
737 }
738 private static final long serialVersionUID = 5232453952276885070L;
739 }
740
741 static final class BiApplyCompletion<T,U,V> extends Completion {
742 final CompletableFuture<? extends T> src;
743 final CompletableFuture<? extends U> snd;
744 final BiFunction<? super T,? super U,? extends V> fn;
745 final CompletableFuture<V> dst;
746 final Executor executor;
747 BiApplyCompletion(CompletableFuture<? extends T> src,
748 CompletableFuture<? extends U> snd,
749 BiFunction<? super T,? super U,? extends V> fn,
750 CompletableFuture<V> dst, Executor executor) {
751 this.src = src; this.snd = snd;
752 this.fn = fn; this.dst = dst;
753 this.executor = executor;
754 }
755 public final void run() {
756 final CompletableFuture<? extends T> a;
757 final CompletableFuture<? extends U> b;
758 final BiFunction<? super T,? super U,? extends V> fn;
759 final CompletableFuture<V> dst;
760 Object r, s; T t; U u; Throwable ex;
761 if ((dst = this.dst) != null &&
762 (fn = this.fn) != null &&
763 (a = this.src) != null &&
764 (r = a.result) != null &&
765 (b = this.snd) != null &&
766 (s = b.result) != null &&
767 compareAndSet(0, 1)) {
768 if (r instanceof AltResult) {
769 ex = ((AltResult)r).ex;
770 t = null;
771 }
772 else {
773 ex = null;
774 @SuppressWarnings("unchecked") T tr = (T) r;
775 t = tr;
776 }
777 if (ex != null)
778 u = null;
779 else if (s instanceof AltResult) {
780 ex = ((AltResult)s).ex;
781 u = null;
782 }
783 else {
784 @SuppressWarnings("unchecked") U us = (U) s;
785 u = us;
786 }
787 Executor e = executor;
788 V v = null;
789 if (ex == null) {
790 try {
791 if (e != null)
792 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
793 else
794 v = fn.apply(t, u);
795 } catch (Throwable rex) {
796 ex = rex;
797 }
798 }
799 if (e == null || ex != null)
800 dst.internalComplete(v, ex);
801 }
802 }
803 private static final long serialVersionUID = 5232453952276885070L;
804 }
805
806 static final class BiAcceptCompletion<T,U> extends Completion {
807 final CompletableFuture<? extends T> src;
808 final CompletableFuture<? extends U> snd;
809 final BiConsumer<? super T,? super U> fn;
810 final CompletableFuture<Void> dst;
811 final Executor executor;
812 BiAcceptCompletion(CompletableFuture<? extends T> src,
813 CompletableFuture<? extends U> snd,
814 BiConsumer<? super T,? super U> fn,
815 CompletableFuture<Void> dst, Executor executor) {
816 this.src = src; this.snd = snd;
817 this.fn = fn; this.dst = dst;
818 this.executor = executor;
819 }
820 public final void run() {
821 final CompletableFuture<? extends T> a;
822 final CompletableFuture<? extends U> b;
823 final BiConsumer<? super T,? super U> fn;
824 final CompletableFuture<Void> dst;
825 Object r, s; T t; U u; Throwable ex;
826 if ((dst = this.dst) != null &&
827 (fn = this.fn) != null &&
828 (a = this.src) != null &&
829 (r = a.result) != null &&
830 (b = this.snd) != null &&
831 (s = b.result) != null &&
832 compareAndSet(0, 1)) {
833 if (r instanceof AltResult) {
834 ex = ((AltResult)r).ex;
835 t = null;
836 }
837 else {
838 ex = null;
839 @SuppressWarnings("unchecked") T tr = (T) r;
840 t = tr;
841 }
842 if (ex != null)
843 u = null;
844 else if (s instanceof AltResult) {
845 ex = ((AltResult)s).ex;
846 u = null;
847 }
848 else {
849 @SuppressWarnings("unchecked") U us = (U) s;
850 u = us;
851 }
852 Executor e = executor;
853 if (ex == null) {
854 try {
855 if (e != null)
856 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
857 else
858 fn.accept(t, u);
859 } catch (Throwable rex) {
860 ex = rex;
861 }
862 }
863 if (e == null || ex != null)
864 dst.internalComplete(null, ex);
865 }
866 }
867 private static final long serialVersionUID = 5232453952276885070L;
868 }
869
870 static final class BiRunCompletion<T> extends Completion {
871 final CompletableFuture<? extends T> src;
872 final CompletableFuture<?> snd;
873 final Runnable fn;
874 final CompletableFuture<Void> dst;
875 final Executor executor;
876 BiRunCompletion(CompletableFuture<? extends T> src,
877 CompletableFuture<?> snd,
878 Runnable fn,
879 CompletableFuture<Void> dst, Executor executor) {
880 this.src = src; this.snd = snd;
881 this.fn = fn; this.dst = dst;
882 this.executor = executor;
883 }
884 public final void run() {
885 final CompletableFuture<? extends T> a;
886 final CompletableFuture<?> b;
887 final Runnable fn;
888 final CompletableFuture<Void> dst;
889 Object r, s; Throwable ex;
890 if ((dst = this.dst) != null &&
891 (fn = this.fn) != null &&
892 (a = this.src) != null &&
893 (r = a.result) != null &&
894 (b = this.snd) != null &&
895 (s = b.result) != null &&
896 compareAndSet(0, 1)) {
897 if (r instanceof AltResult)
898 ex = ((AltResult)r).ex;
899 else
900 ex = null;
901 if (ex == null && (s instanceof AltResult))
902 ex = ((AltResult)s).ex;
903 Executor e = executor;
904 if (ex == null) {
905 try {
906 if (e != null)
907 e.execute(new AsyncRun(fn, dst));
908 else
909 fn.run();
910 } catch (Throwable rex) {
911 ex = rex;
912 }
913 }
914 if (e == null || ex != null)
915 dst.internalComplete(null, ex);
916 }
917 }
918 private static final long serialVersionUID = 5232453952276885070L;
919 }
920
921 static final class AndCompletion extends Completion {
922 final CompletableFuture<?> src;
923 final CompletableFuture<?> snd;
924 final CompletableFuture<Void> dst;
925 AndCompletion(CompletableFuture<?> src,
926 CompletableFuture<?> snd,
927 CompletableFuture<Void> dst) {
928 this.src = src; this.snd = snd; this.dst = dst;
929 }
930 public final void run() {
931 final CompletableFuture<?> a;
932 final CompletableFuture<?> b;
933 final CompletableFuture<Void> dst;
934 Object r, s; Throwable ex;
935 if ((dst = this.dst) != null &&
936 (a = this.src) != null &&
937 (r = a.result) != null &&
938 (b = this.snd) != null &&
939 (s = b.result) != null &&
940 compareAndSet(0, 1)) {
941 if (r instanceof AltResult)
942 ex = ((AltResult)r).ex;
943 else
944 ex = null;
945 if (ex == null && (s instanceof AltResult))
946 ex = ((AltResult)s).ex;
947 dst.internalComplete(null, ex);
948 }
949 }
950 private static final long serialVersionUID = 5232453952276885070L;
951 }
952
953 static final class OrApplyCompletion<T,U> extends Completion {
954 final CompletableFuture<? extends T> src;
955 final CompletableFuture<? extends T> snd;
956 final Function<? super T,? extends U> fn;
957 final CompletableFuture<U> dst;
958 final Executor executor;
959 OrApplyCompletion(CompletableFuture<? extends T> src,
960 CompletableFuture<? extends T> snd,
961 Function<? super T,? extends U> fn,
962 CompletableFuture<U> dst, Executor executor) {
963 this.src = src; this.snd = snd;
964 this.fn = fn; this.dst = dst;
965 this.executor = executor;
966 }
967 public final void run() {
968 final CompletableFuture<? extends T> a;
969 final CompletableFuture<? extends T> b;
970 final Function<? super T,? extends U> fn;
971 final CompletableFuture<U> dst;
972 Object r; T t; Throwable ex;
973 if ((dst = this.dst) != null &&
974 (fn = this.fn) != null &&
975 (((a = this.src) != null && (r = a.result) != null) ||
976 ((b = this.snd) != null && (r = b.result) != null)) &&
977 compareAndSet(0, 1)) {
978 if (r instanceof AltResult) {
979 ex = ((AltResult)r).ex;
980 t = null;
981 }
982 else {
983 ex = null;
984 @SuppressWarnings("unchecked") T tr = (T) r;
985 t = tr;
986 }
987 Executor e = executor;
988 U u = null;
989 if (ex == null) {
990 try {
991 if (e != null)
992 e.execute(new AsyncApply<T,U>(t, fn, dst));
993 else
994 u = fn.apply(t);
995 } catch (Throwable rex) {
996 ex = rex;
997 }
998 }
999 if (e == null || ex != null)
1000 dst.internalComplete(u, ex);
1001 }
1002 }
1003 private static final long serialVersionUID = 5232453952276885070L;
1004 }
1005
1006 static final class OrAcceptCompletion<T> extends Completion {
1007 final CompletableFuture<? extends T> src;
1008 final CompletableFuture<? extends T> snd;
1009 final Consumer<? super T> fn;
1010 final CompletableFuture<Void> dst;
1011 final Executor executor;
1012 OrAcceptCompletion(CompletableFuture<? extends T> src,
1013 CompletableFuture<? extends T> snd,
1014 Consumer<? super T> fn,
1015 CompletableFuture<Void> dst, Executor executor) {
1016 this.src = src; this.snd = snd;
1017 this.fn = fn; this.dst = dst;
1018 this.executor = executor;
1019 }
1020 public final void run() {
1021 final CompletableFuture<? extends T> a;
1022 final CompletableFuture<? extends T> b;
1023 final Consumer<? super T> fn;
1024 final CompletableFuture<Void> dst;
1025 Object r; T t; Throwable ex;
1026 if ((dst = this.dst) != null &&
1027 (fn = this.fn) != null &&
1028 (((a = this.src) != null && (r = a.result) != null) ||
1029 ((b = this.snd) != null && (r = b.result) != null)) &&
1030 compareAndSet(0, 1)) {
1031 if (r instanceof AltResult) {
1032 ex = ((AltResult)r).ex;
1033 t = null;
1034 }
1035 else {
1036 ex = null;
1037 @SuppressWarnings("unchecked") T tr = (T) r;
1038 t = tr;
1039 }
1040 Executor e = executor;
1041 if (ex == null) {
1042 try {
1043 if (e != null)
1044 e.execute(new AsyncAccept<T>(t, fn, dst));
1045 else
1046 fn.accept(t);
1047 } catch (Throwable rex) {
1048 ex = rex;
1049 }
1050 }
1051 if (e == null || ex != null)
1052 dst.internalComplete(null, ex);
1053 }
1054 }
1055 private static final long serialVersionUID = 5232453952276885070L;
1056 }
1057
1058 static final class OrRunCompletion<T> extends Completion {
1059 final CompletableFuture<? extends T> src;
1060 final CompletableFuture<?> snd;
1061 final Runnable fn;
1062 final CompletableFuture<Void> dst;
1063 final Executor executor;
1064 OrRunCompletion(CompletableFuture<? extends T> src,
1065 CompletableFuture<?> snd,
1066 Runnable fn,
1067 CompletableFuture<Void> dst, Executor executor) {
1068 this.src = src; this.snd = snd;
1069 this.fn = fn; this.dst = dst;
1070 this.executor = executor;
1071 }
1072 public final void run() {
1073 final CompletableFuture<? extends T> a;
1074 final CompletableFuture<?> b;
1075 final Runnable fn;
1076 final CompletableFuture<Void> dst;
1077 Object r; Throwable ex;
1078 if ((dst = this.dst) != null &&
1079 (fn = this.fn) != null &&
1080 (((a = this.src) != null && (r = a.result) != null) ||
1081 ((b = this.snd) != null && (r = b.result) != null)) &&
1082 compareAndSet(0, 1)) {
1083 if (r instanceof AltResult)
1084 ex = ((AltResult)r).ex;
1085 else
1086 ex = null;
1087 Executor e = executor;
1088 if (ex == null) {
1089 try {
1090 if (e != null)
1091 e.execute(new AsyncRun(fn, dst));
1092 else
1093 fn.run();
1094 } catch (Throwable rex) {
1095 ex = rex;
1096 }
1097 }
1098 if (e == null || ex != null)
1099 dst.internalComplete(null, ex);
1100 }
1101 }
1102 private static final long serialVersionUID = 5232453952276885070L;
1103 }
1104
1105 static final class OrCompletion extends Completion {
1106 final CompletableFuture<?> src;
1107 final CompletableFuture<?> snd;
1108 final CompletableFuture<?> dst;
1109 OrCompletion(CompletableFuture<?> src,
1110 CompletableFuture<?> snd,
1111 CompletableFuture<?> dst) {
1112 this.src = src; this.snd = snd; this.dst = dst;
1113 }
1114 public final void run() {
1115 final CompletableFuture<?> a;
1116 final CompletableFuture<?> b;
1117 final CompletableFuture<?> dst;
1118 Object r, t; Throwable ex;
1119 if ((dst = this.dst) != null &&
1120 (((a = this.src) != null && (r = a.result) != null) ||
1121 ((b = this.snd) != null && (r = b.result) != null)) &&
1122 compareAndSet(0, 1)) {
1123 if (r instanceof AltResult) {
1124 ex = ((AltResult)r).ex;
1125 t = null;
1126 }
1127 else {
1128 ex = null;
1129 t = r;
1130 }
1131 dst.internalComplete(t, ex);
1132 }
1133 }
1134 private static final long serialVersionUID = 5232453952276885070L;
1135 }
1136
1137 static final class ExceptionCompletion<T> extends Completion {
1138 final CompletableFuture<? extends T> src;
1139 final Function<? super Throwable, ? extends T> fn;
1140 final CompletableFuture<T> dst;
1141 ExceptionCompletion(CompletableFuture<? extends T> src,
1142 Function<? super Throwable, ? extends T> fn,
1143 CompletableFuture<T> dst) {
1144 this.src = src; this.fn = fn; this.dst = dst;
1145 }
1146 public final void run() {
1147 final CompletableFuture<? extends T> a;
1148 final Function<? super Throwable, ? extends T> fn;
1149 final CompletableFuture<T> dst;
1150 Object r; T t = null; Throwable ex, dx = null;
1151 if ((dst = this.dst) != null &&
1152 (fn = this.fn) != null &&
1153 (a = this.src) != null &&
1154 (r = a.result) != null &&
1155 compareAndSet(0, 1)) {
1156 if ((r instanceof AltResult) &&
1157 (ex = ((AltResult)r).ex) != null) {
1158 try {
1159 t = fn.apply(ex);
1160 } catch (Throwable rex) {
1161 dx = rex;
1162 }
1163 }
1164 else {
1165 @SuppressWarnings("unchecked") T tr = (T) r;
1166 t = tr;
1167 }
1168 dst.internalComplete(t, dx);
1169 }
1170 }
1171 private static final long serialVersionUID = 5232453952276885070L;
1172 }
1173
1174 static final class ThenCopy extends Completion {
1175 final CompletableFuture<?> src;
1176 final CompletableFuture<?> dst;
1177 ThenCopy(CompletableFuture<?> src,
1178 CompletableFuture<?> dst) {
1179 this.src = src; this.dst = dst;
1180 }
1181 public final void run() {
1182 final CompletableFuture<?> a;
1183 final CompletableFuture<?> dst;
1184 Object r; Object t; Throwable ex;
1185 if ((dst = this.dst) != null &&
1186 (a = this.src) != null &&
1187 (r = a.result) != null &&
1188 compareAndSet(0, 1)) {
1189 if (r instanceof AltResult) {
1190 ex = ((AltResult)r).ex;
1191 t = null;
1192 }
1193 else {
1194 ex = null;
1195 t = r;
1196 }
1197 dst.internalComplete(t, ex);
1198 }
1199 }
1200 private static final long serialVersionUID = 5232453952276885070L;
1201 }
1202
1203 static final class HandleCompletion<T,U> extends Completion {
1204 final CompletableFuture<? extends T> src;
1205 final BiFunction<? super T, Throwable, ? extends U> fn;
1206 final CompletableFuture<U> dst;
1207 HandleCompletion(CompletableFuture<? extends T> src,
1208 BiFunction<? super T, Throwable, ? extends U> fn,
1209 final CompletableFuture<U> dst) {
1210 this.src = src; this.fn = fn; this.dst = dst;
1211 }
1212 public final void run() {
1213 final CompletableFuture<? extends T> a;
1214 final BiFunction<? super T, Throwable, ? extends U> fn;
1215 final CompletableFuture<U> dst;
1216 Object r; T t; Throwable ex;
1217 if ((dst = this.dst) != null &&
1218 (fn = this.fn) != null &&
1219 (a = this.src) != null &&
1220 (r = a.result) != null &&
1221 compareAndSet(0, 1)) {
1222 if (r instanceof AltResult) {
1223 ex = ((AltResult)r).ex;
1224 t = null;
1225 }
1226 else {
1227 ex = null;
1228 @SuppressWarnings("unchecked") T tr = (T) r;
1229 t = tr;
1230 }
1231 U u = null; Throwable dx = null;
1232 try {
1233 u = fn.apply(t, ex);
1234 } catch (Throwable rex) {
1235 dx = rex;
1236 }
1237 dst.internalComplete(u, dx);
1238 }
1239 }
1240 private static final long serialVersionUID = 5232453952276885070L;
1241 }
1242
1243 static final class ComposeCompletion<T,U> extends Completion {
1244 final CompletableFuture<? extends T> src;
1245 final Function<? super T, CompletableFuture<U>> fn;
1246 final CompletableFuture<U> dst;
1247 final Executor executor;
1248 ComposeCompletion(CompletableFuture<? extends T> src,
1249 Function<? super T, CompletableFuture<U>> fn,
1250 final CompletableFuture<U> dst, Executor executor) {
1251 this.src = src; this.fn = fn; this.dst = dst;
1252 this.executor = executor;
1253 }
1254 public final void run() {
1255 final CompletableFuture<? extends T> a;
1256 final Function<? super T, CompletableFuture<U>> fn;
1257 final CompletableFuture<U> dst;
1258 Object r; T t; Throwable ex; Executor e;
1259 if ((dst = this.dst) != null &&
1260 (fn = this.fn) != null &&
1261 (a = this.src) != null &&
1262 (r = a.result) != null &&
1263 compareAndSet(0, 1)) {
1264 if (r instanceof AltResult) {
1265 ex = ((AltResult)r).ex;
1266 t = null;
1267 }
1268 else {
1269 ex = null;
1270 @SuppressWarnings("unchecked") T tr = (T) r;
1271 t = tr;
1272 }
1273 CompletableFuture<U> c = null;
1274 U u = null;
1275 boolean complete = false;
1276 if (ex == null) {
1277 if ((e = executor) != null)
1278 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1279 else {
1280 try {
1281 if ((c = fn.apply(t)) == null)
1282 ex = new NullPointerException();
1283 } catch (Throwable rex) {
1284 ex = rex;
1285 }
1286 }
1287 }
1288 if (c != null) {
1289 ThenCopy d = null;
1290 Object s;
1291 if ((s = c.result) == null) {
1292 CompletionNode p = new CompletionNode
1293 (d = new ThenCopy(c, dst));
1294 while ((s = c.result) == null) {
1295 if (UNSAFE.compareAndSwapObject
1296 (c, COMPLETIONS, p.next = c.completions, p))
1297 break;
1298 }
1299 }
1300 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1301 complete = true;
1302 if (s instanceof AltResult) {
1303 ex = ((AltResult)s).ex; // no rewrap
1304 u = null;
1305 }
1306 else {
1307 @SuppressWarnings("unchecked") U us = (U) s;
1308 u = us;
1309 }
1310 }
1311 }
1312 if (complete || ex != null)
1313 dst.internalComplete(u, ex);
1314 if (c != null)
1315 c.helpPostComplete();
1316 }
1317 }
1318 private static final long serialVersionUID = 5232453952276885070L;
1319 }
1320
1321 // public methods
1322
1323 /**
1324 * Creates a new incomplete CompletableFuture.
1325 */
1326 public CompletableFuture() {
1327 }
1328
1329 /**
1330 * Asynchronously executes in the {@link
1331 * ForkJoinPool#commonPool()}, a task that completes the returned
1332 * CompletableFuture with the result of the given Supplier.
1333 *
1334 * @param supplier a function returning the value to be used
1335 * to complete the returned CompletableFuture
1336 * @return the CompletableFuture
1337 */
1338 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1339 if (supplier == null) throw new NullPointerException();
1340 CompletableFuture<U> f = new CompletableFuture<U>();
1341 ForkJoinPool.commonPool().
1342 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1343 return f;
1344 }
1345
1346 /**
1347 * Asynchronously executes using the given executor, a task that
1348 * completes the returned CompletableFuture with the result of the
1349 * given Supplier.
1350 *
1351 * @param supplier a function returning the value to be used
1352 * to complete the returned CompletableFuture
1353 * @param executor the executor to use for asynchronous execution
1354 * @return the CompletableFuture
1355 */
1356 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1357 Executor executor) {
1358 if (executor == null || supplier == null)
1359 throw new NullPointerException();
1360 CompletableFuture<U> f = new CompletableFuture<U>();
1361 executor.execute(new AsyncSupply<U>(supplier, f));
1362 return f;
1363 }
1364
1365 /**
1366 * Asynchronously executes in the {@link
1367 * ForkJoinPool#commonPool()} a task that runs the given action,
1368 * and then completes the returned CompletableFuture.
1369 *
1370 * @param runnable the action to run before completing the
1371 * returned CompletableFuture
1372 * @return the CompletableFuture
1373 */
1374 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1375 if (runnable == null) throw new NullPointerException();
1376 CompletableFuture<Void> f = new CompletableFuture<Void>();
1377 ForkJoinPool.commonPool().
1378 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1379 return f;
1380 }
1381
1382 /**
1383 * Asynchronously executes using the given executor, a task that
1384 * runs the given action, and then completes the returned
1385 * CompletableFuture.
1386 *
1387 * @param runnable the action to run before completing the
1388 * returned CompletableFuture
1389 * @param executor the executor to use for asynchronous execution
1390 * @return the CompletableFuture
1391 */
1392 public static CompletableFuture<Void> runAsync(Runnable runnable,
1393 Executor executor) {
1394 if (executor == null || runnable == null)
1395 throw new NullPointerException();
1396 CompletableFuture<Void> f = new CompletableFuture<Void>();
1397 executor.execute(new AsyncRun(runnable, f));
1398 return f;
1399 }
1400
1401 /**
1402 * Returns {@code true} if completed in any fashion: normally,
1403 * exceptionally, or via cancellation.
1404 *
1405 * @return {@code true} if completed
1406 */
1407 public boolean isDone() {
1408 return result != null;
1409 }
1410
1411 /**
1412 * Waits if necessary for the computation to complete, and then
1413 * retrieves its result.
1414 *
1415 * @return the computed result
1416 * @throws CancellationException if the computation was cancelled
1417 * @throws ExecutionException if the computation threw an
1418 * exception
1419 * @throws InterruptedException if the current thread was interrupted
1420 * while waiting
1421 */
1422 public T get() throws InterruptedException, ExecutionException {
1423 Object r; Throwable ex, cause;
1424 if ((r = result) == null && (r = waitingGet(true)) == null)
1425 throw new InterruptedException();
1426 if (!(r instanceof AltResult)) {
1427 @SuppressWarnings("unchecked") T tr = (T) r;
1428 return tr;
1429 }
1430 if ((ex = ((AltResult)r).ex) == null)
1431 return null;
1432 if (ex instanceof CancellationException)
1433 throw (CancellationException)ex;
1434 if ((ex instanceof CompletionException) &&
1435 (cause = ex.getCause()) != null)
1436 ex = cause;
1437 throw new ExecutionException(ex);
1438 }
1439
1440 /**
1441 * Waits if necessary for at most the given time for completion,
1442 * and then retrieves its result, if available.
1443 *
1444 * @param timeout the maximum time to wait
1445 * @param unit the time unit of the timeout argument
1446 * @return the computed result
1447 * @throws CancellationException if the computation was cancelled
1448 * @throws ExecutionException if the computation threw an
1449 * exception
1450 * @throws InterruptedException if the current thread was interrupted
1451 * while waiting
1452 * @throws TimeoutException if the wait timed out
1453 */
1454 public T get(long timeout, TimeUnit unit)
1455 throws InterruptedException, ExecutionException, TimeoutException {
1456 Object r; Throwable ex, cause;
1457 long nanos = unit.toNanos(timeout);
1458 if (Thread.interrupted())
1459 throw new InterruptedException();
1460 if ((r = result) == null)
1461 r = timedAwaitDone(nanos);
1462 if (!(r instanceof AltResult)) {
1463 @SuppressWarnings("unchecked") T tr = (T) r;
1464 return tr;
1465 }
1466 if ((ex = ((AltResult)r).ex) == null)
1467 return null;
1468 if (ex instanceof CancellationException)
1469 throw (CancellationException)ex;
1470 if ((ex instanceof CompletionException) &&
1471 (cause = ex.getCause()) != null)
1472 ex = cause;
1473 throw new ExecutionException(ex);
1474 }
1475
1476 /**
1477 * Returns the result value when complete, or throws an
1478 * (unchecked) exception if completed exceptionally. To better
1479 * conform with the use of common functional forms, if a
1480 * computation involved in the completion of this
1481 * CompletableFuture threw an exception, this method throws an
1482 * (unchecked) {@link CompletionException} with the underlying
1483 * exception as its cause.
1484 *
1485 * @return the result value
1486 * @throws CancellationException if the computation was cancelled
1487 * @throws CompletionException if a completion computation threw
1488 * an exception
1489 */
1490 public T join() {
1491 Object r; Throwable ex;
1492 if ((r = result) == null)
1493 r = waitingGet(false);
1494 if (!(r instanceof AltResult)) {
1495 @SuppressWarnings("unchecked") T tr = (T) r;
1496 return tr;
1497 }
1498 if ((ex = ((AltResult)r).ex) == null)
1499 return null;
1500 if (ex instanceof CancellationException)
1501 throw (CancellationException)ex;
1502 if (ex instanceof CompletionException)
1503 throw (CompletionException)ex;
1504 throw new CompletionException(ex);
1505 }
1506
1507 /**
1508 * Returns the result value (or throws any encountered exception)
1509 * if completed, else returns the given valueIfAbsent.
1510 *
1511 * @param valueIfAbsent the value to return if not completed
1512 * @return the result value, if completed, else the given valueIfAbsent
1513 * @throws CancellationException if the computation was cancelled
1514 * @throws CompletionException if a completion computation threw
1515 * an exception
1516 */
1517 public T getNow(T valueIfAbsent) {
1518 Object r; Throwable ex;
1519 if ((r = result) == null)
1520 return valueIfAbsent;
1521 if (!(r instanceof AltResult)) {
1522 @SuppressWarnings("unchecked") T tr = (T) r;
1523 return tr;
1524 }
1525 if ((ex = ((AltResult)r).ex) == null)
1526 return null;
1527 if (ex instanceof CancellationException)
1528 throw (CancellationException)ex;
1529 if (ex instanceof CompletionException)
1530 throw (CompletionException)ex;
1531 throw new CompletionException(ex);
1532 }
1533
1534 /**
1535 * If not already completed, sets the value returned by {@link
1536 * #get()} and related methods to the given value.
1537 *
1538 * @param value the result value
1539 * @return {@code true} if this invocation caused this CompletableFuture
1540 * to transition to a completed state, else {@code false}
1541 */
1542 public boolean complete(T value) {
1543 boolean triggered = result == null &&
1544 UNSAFE.compareAndSwapObject(this, RESULT, null,
1545 value == null ? NIL : value);
1546 postComplete();
1547 return triggered;
1548 }
1549
1550 /**
1551 * If not already completed, causes invocations of {@link #get()}
1552 * and related methods to throw the given exception.
1553 *
1554 * @param ex the exception
1555 * @return {@code true} if this invocation caused this CompletableFuture
1556 * to transition to a completed state, else {@code false}
1557 */
1558 public boolean completeExceptionally(Throwable ex) {
1559 if (ex == null) throw new NullPointerException();
1560 boolean triggered = result == null &&
1561 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1562 postComplete();
1563 return triggered;
1564 }
1565
1566 /**
1567 * Creates and returns a CompletableFuture that is completed with
1568 * the result of the given function of this CompletableFuture.
1569 * If this CompletableFuture completes exceptionally,
1570 * then the returned CompletableFuture also does so,
1571 * with a CompletionException holding this exception as
1572 * its cause.
1573 *
1574 * @param fn the function to use to compute the value of
1575 * the returned CompletableFuture
1576 * @return the new CompletableFuture
1577 */
1578 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1579 return doThenApply(fn, null);
1580 }
1581
1582 /**
1583 * Creates and returns a CompletableFuture that is asynchronously
1584 * completed using the {@link ForkJoinPool#commonPool()} with the
1585 * result of the given function of this CompletableFuture. If
1586 * this CompletableFuture completes exceptionally, then the
1587 * returned CompletableFuture also does so, with a
1588 * CompletionException holding this exception as its cause.
1589 *
1590 * @param fn the function to use to compute the value of
1591 * the returned CompletableFuture
1592 * @return the new CompletableFuture
1593 */
1594 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
1595 return doThenApply(fn, ForkJoinPool.commonPool());
1596 }
1597
1598 /**
1599 * Creates and returns a CompletableFuture that is asynchronously
1600 * completed using the given executor with the result of the given
1601 * function of this CompletableFuture. If this CompletableFuture
1602 * completes exceptionally, then the returned CompletableFuture
1603 * also does so, with a CompletionException holding this exception as
1604 * its cause.
1605 *
1606 * @param fn the function to use to compute the value of
1607 * the returned CompletableFuture
1608 * @param executor the executor to use for asynchronous execution
1609 * @return the new CompletableFuture
1610 */
1611 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
1612 Executor executor) {
1613 if (executor == null) throw new NullPointerException();
1614 return doThenApply(fn, executor);
1615 }
1616
1617 private <U> CompletableFuture<U> doThenApply(Function<? super T,? extends U> fn,
1618 Executor e) {
1619 if (fn == null) throw new NullPointerException();
1620 CompletableFuture<U> dst = new CompletableFuture<U>();
1621 ApplyCompletion<T,U> d = null;
1622 Object r;
1623 if ((r = result) == null) {
1624 CompletionNode p = new CompletionNode
1625 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1626 while ((r = result) == null) {
1627 if (UNSAFE.compareAndSwapObject
1628 (this, COMPLETIONS, p.next = completions, p))
1629 break;
1630 }
1631 }
1632 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1633 T t; Throwable ex;
1634 if (r instanceof AltResult) {
1635 ex = ((AltResult)r).ex;
1636 t = null;
1637 }
1638 else {
1639 ex = null;
1640 @SuppressWarnings("unchecked") T tr = (T) r;
1641 t = tr;
1642 }
1643 U u = null;
1644 if (ex == null) {
1645 try {
1646 if (e != null)
1647 e.execute(new AsyncApply<T,U>(t, fn, dst));
1648 else
1649 u = fn.apply(t);
1650 } catch (Throwable rex) {
1651 ex = rex;
1652 }
1653 }
1654 if (e == null || ex != null)
1655 dst.internalComplete(u, ex);
1656 }
1657 helpPostComplete();
1658 return dst;
1659 }
1660
1661 /**
1662 * Creates and returns a CompletableFuture that is completed after
1663 * performing the given action with this CompletableFuture's
1664 * result when it completes. If this CompletableFuture
1665 * completes exceptionally, then the returned CompletableFuture
1666 * also does so, with a CompletionException holding this exception as
1667 * its cause.
1668 *
1669 * @param block the action to perform before completing the
1670 * returned CompletableFuture
1671 * @return the new CompletableFuture
1672 */
1673 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1674 return doThenAccept(block, null);
1675 }
1676
1677 /**
1678 * Creates and returns a CompletableFuture that is asynchronously
1679 * completed using the {@link ForkJoinPool#commonPool()} with this
1680 * CompletableFuture's result when it completes. If this
1681 * CompletableFuture completes exceptionally, then the returned
1682 * CompletableFuture also does so, with a CompletionException holding
1683 * this exception as its cause.
1684 *
1685 * @param block the action to perform before completing the
1686 * returned CompletableFuture
1687 * @return the new CompletableFuture
1688 */
1689 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1690 return doThenAccept(block, ForkJoinPool.commonPool());
1691 }
1692
1693 /**
1694 * Creates and returns a CompletableFuture that is asynchronously
1695 * completed using the given executor with this
1696 * CompletableFuture's result when it completes. If this
1697 * CompletableFuture completes exceptionally, then the returned
1698 * CompletableFuture also does so, with a CompletionException holding
1699 * this exception as its cause.
1700 *
1701 * @param block the action to perform before completing the
1702 * returned CompletableFuture
1703 * @param executor the executor to use for asynchronous execution
1704 * @return the new CompletableFuture
1705 */
1706 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1707 Executor executor) {
1708 if (executor == null) throw new NullPointerException();
1709 return doThenAccept(block, executor);
1710 }
1711
1712 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1713 Executor e) {
1714 if (fn == null) throw new NullPointerException();
1715 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1716 AcceptCompletion<T> d = null;
1717 Object r;
1718 if ((r = result) == null) {
1719 CompletionNode p = new CompletionNode
1720 (d = new AcceptCompletion<T>(this, fn, dst, e));
1721 while ((r = result) == null) {
1722 if (UNSAFE.compareAndSwapObject
1723 (this, COMPLETIONS, p.next = completions, p))
1724 break;
1725 }
1726 }
1727 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1728 T t; Throwable ex;
1729 if (r instanceof AltResult) {
1730 ex = ((AltResult)r).ex;
1731 t = null;
1732 }
1733 else {
1734 ex = null;
1735 @SuppressWarnings("unchecked") T tr = (T) r;
1736 t = tr;
1737 }
1738 if (ex == null) {
1739 try {
1740 if (e != null)
1741 e.execute(new AsyncAccept<T>(t, fn, dst));
1742 else
1743 fn.accept(t);
1744 } catch (Throwable rex) {
1745 ex = rex;
1746 }
1747 }
1748 if (e == null || ex != null)
1749 dst.internalComplete(null, ex);
1750 }
1751 helpPostComplete();
1752 return dst;
1753 }
1754
1755 /**
1756 * Creates and returns a CompletableFuture that is completed after
1757 * performing the given action when this CompletableFuture
1758 * completes. If this CompletableFuture completes exceptionally,
1759 * then the returned CompletableFuture also does so, with a
1760 * CompletionException holding this exception as its cause.
1761 *
1762 * @param action the action to perform before completing the
1763 * returned CompletableFuture
1764 * @return the new CompletableFuture
1765 */
1766 public CompletableFuture<Void> thenRun(Runnable action) {
1767 return doThenRun(action, null);
1768 }
1769
1770 /**
1771 * Creates and returns a CompletableFuture that is asynchronously
1772 * completed using the {@link ForkJoinPool#commonPool()} after
1773 * performing the given action when this CompletableFuture
1774 * completes. If this CompletableFuture completes exceptionally,
1775 * then the returned CompletableFuture also does so, with a
1776 * CompletionException holding this exception as its cause.
1777 *
1778 * @param action the action to perform before completing the
1779 * returned CompletableFuture
1780 * @return the new CompletableFuture
1781 */
1782 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1783 return doThenRun(action, ForkJoinPool.commonPool());
1784 }
1785
1786 /**
1787 * Creates and returns a CompletableFuture that is asynchronously
1788 * completed using the given executor after performing the given
1789 * action when this CompletableFuture completes. If this
1790 * CompletableFuture completes exceptionally, then the returned
1791 * CompletableFuture also does so, with a CompletionException holding
1792 * this exception as its cause.
1793 *
1794 * @param action the action to perform before completing the
1795 * returned CompletableFuture
1796 * @param executor the executor to use for asynchronous execution
1797 * @return the new CompletableFuture
1798 */
1799 public CompletableFuture<Void> thenRunAsync(Runnable action,
1800 Executor executor) {
1801 if (executor == null) throw new NullPointerException();
1802 return doThenRun(action, executor);
1803 }
1804
1805 private CompletableFuture<Void> doThenRun(Runnable action,
1806 Executor e) {
1807 if (action == null) throw new NullPointerException();
1808 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1809 RunCompletion<T> d = null;
1810 Object r;
1811 if ((r = result) == null) {
1812 CompletionNode p = new CompletionNode
1813 (d = new RunCompletion<T>(this, action, dst, e));
1814 while ((r = result) == null) {
1815 if (UNSAFE.compareAndSwapObject
1816 (this, COMPLETIONS, p.next = completions, p))
1817 break;
1818 }
1819 }
1820 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1821 Throwable ex;
1822 if (r instanceof AltResult)
1823 ex = ((AltResult)r).ex;
1824 else
1825 ex = null;
1826 if (ex == null) {
1827 try {
1828 if (e != null)
1829 e.execute(new AsyncRun(action, dst));
1830 else
1831 action.run();
1832 } catch (Throwable rex) {
1833 ex = rex;
1834 }
1835 }
1836 if (e == null || ex != null)
1837 dst.internalComplete(null, ex);
1838 }
1839 helpPostComplete();
1840 return dst;
1841 }
1842
1843 /**
1844 * Creates and returns a CompletableFuture that is completed with
1845 * the result of the given function of this and the other given
1846 * CompletableFuture's results when both complete. If this or
1847 * the other CompletableFuture complete exceptionally, then the
1848 * returned CompletableFuture also does so, with a
1849 * CompletionException holding the exception as its cause.
1850 *
1851 * @param other the other CompletableFuture
1852 * @param fn the function to use to compute the value of
1853 * the returned CompletableFuture
1854 * @return the new CompletableFuture
1855 */
1856 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1857 BiFunction<? super T,? super U,? extends V> fn) {
1858 return doThenBiApply(other, fn, null);
1859 }
1860
1861 /**
1862 * Creates and returns a CompletableFuture that is asynchronously
1863 * completed using the {@link ForkJoinPool#commonPool()} with
1864 * the result of the given function of this and the other given
1865 * CompletableFuture's results when both complete. If this or
1866 * the other CompletableFuture complete exceptionally, then the
1867 * returned CompletableFuture also does so, with a
1868 * CompletionException holding the exception as its cause.
1869 *
1870 * @param other the other CompletableFuture
1871 * @param fn the function to use to compute the value of
1872 * the returned CompletableFuture
1873 * @return the new CompletableFuture
1874 */
1875 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1876 BiFunction<? super T,? super U,? extends V> fn) {
1877 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1878 }
1879
1880 /**
1881 * Creates and returns a CompletableFuture that is
1882 * asynchronously completed using the given executor with the
1883 * result of the given function of this and the other given
1884 * CompletableFuture's results when both complete. If this or
1885 * the other CompletableFuture complete exceptionally, then the
1886 * returned CompletableFuture also does so, with a
1887 * CompletionException holding the exception as its cause.
1888 *
1889 * @param other the other CompletableFuture
1890 * @param fn the function to use to compute the value of
1891 * the returned CompletableFuture
1892 * @param executor the executor to use for asynchronous execution
1893 * @return the new CompletableFuture
1894 */
1895
1896 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1897 BiFunction<? super T,? super U,? extends V> fn,
1898 Executor executor) {
1899 if (executor == null) throw new NullPointerException();
1900 return doThenBiApply(other, fn, executor);
1901 }
1902
1903 private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1904 BiFunction<? super T,? super U,? extends V> fn,
1905 Executor e) {
1906 if (other == null || fn == null) throw new NullPointerException();
1907 CompletableFuture<V> dst = new CompletableFuture<V>();
1908 BiApplyCompletion<T,U,V> d = null;
1909 Object r, s = null;
1910 if ((r = result) == null || (s = other.result) == null) {
1911 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1912 CompletionNode q = null, p = new CompletionNode(d);
1913 while ((r == null && (r = result) == null) ||
1914 (s == null && (s = other.result) == null)) {
1915 if (q != null) {
1916 if (s != null ||
1917 UNSAFE.compareAndSwapObject
1918 (other, COMPLETIONS, q.next = other.completions, q))
1919 break;
1920 }
1921 else if (r != null ||
1922 UNSAFE.compareAndSwapObject
1923 (this, COMPLETIONS, p.next = completions, p)) {
1924 if (s != null)
1925 break;
1926 q = new CompletionNode(d);
1927 }
1928 }
1929 }
1930 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1931 T t; U u; Throwable ex;
1932 if (r instanceof AltResult) {
1933 ex = ((AltResult)r).ex;
1934 t = null;
1935 }
1936 else {
1937 ex = null;
1938 @SuppressWarnings("unchecked") T tr = (T) r;
1939 t = tr;
1940 }
1941 if (ex != null)
1942 u = null;
1943 else if (s instanceof AltResult) {
1944 ex = ((AltResult)s).ex;
1945 u = null;
1946 }
1947 else {
1948 @SuppressWarnings("unchecked") U us = (U) s;
1949 u = us;
1950 }
1951 V v = null;
1952 if (ex == null) {
1953 try {
1954 if (e != null)
1955 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1956 else
1957 v = fn.apply(t, u);
1958 } catch (Throwable rex) {
1959 ex = rex;
1960 }
1961 }
1962 if (e == null || ex != null)
1963 dst.internalComplete(v, ex);
1964 }
1965 helpPostComplete();
1966 other.helpPostComplete();
1967 return dst;
1968 }
1969
1970 /**
1971 * Creates and returns a CompletableFuture that is completed with
1972 * the results of this and the other given CompletableFuture if
1973 * both complete. If this and/or the other CompletableFuture
1974 * complete exceptionally, then the returned CompletableFuture
1975 * also does so, with a CompletionException holding one of these
1976 * exceptions as its cause.
1977 *
1978 * @param other the other CompletableFuture
1979 * @param block the action to perform before completing the
1980 * returned CompletableFuture
1981 * @return the new CompletableFuture
1982 */
1983 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1984 BiConsumer<? super T, ? super U> block) {
1985 return doThenBiAccept(other, block, null);
1986 }
1987
1988 /**
1989 * Creates and returns a CompletableFuture that is completed
1990 * asynchronously using the {@link ForkJoinPool#commonPool()} with
1991 * the results of this and the other given CompletableFuture when
1992 * both complete. If this and/or the other CompletableFuture
1993 * complete exceptionally, then the returned CompletableFuture
1994 * also does so, with a CompletionException holding one of these
1995 * exceptions as its cause.
1996 *
1997 * @param other the other CompletableFuture
1998 * @param block the action to perform before completing the
1999 * returned CompletableFuture
2000 * @return the new CompletableFuture
2001 */
2002 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
2003 BiConsumer<? super T, ? super U> block) {
2004 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2005 }
2006
2007 /**
2008 * Creates and returns a CompletableFuture that is completed
2009 * asynchronously using the given executor with the results of
2010 * this and the other given CompletableFuture when both complete.
2011 * If this and/or the other CompletableFuture complete
2012 * exceptionally, then the returned CompletableFuture also does
2013 * so, with a CompletionException holding one of these exceptions as
2014 * its cause.
2015 *
2016 * @param other the other CompletableFuture
2017 * @param block the action to perform before completing the
2018 * returned CompletableFuture
2019 * @param executor the executor to use for asynchronous execution
2020 * @return the new CompletableFuture
2021 */
2022 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
2023 BiConsumer<? super T, ? super U> block,
2024 Executor executor) {
2025 if (executor == null) throw new NullPointerException();
2026 return doThenBiAccept(other, block, executor);
2027 }
2028
2029 private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
2030 BiConsumer<? super T,? super U> fn,
2031 Executor e) {
2032 if (other == null || fn == null) throw new NullPointerException();
2033 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2034 BiAcceptCompletion<T,U> d = null;
2035 Object r, s = null;
2036 if ((r = result) == null || (s = other.result) == null) {
2037 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2038 CompletionNode q = null, p = new CompletionNode(d);
2039 while ((r == null && (r = result) == null) ||
2040 (s == null && (s = other.result) == null)) {
2041 if (q != null) {
2042 if (s != null ||
2043 UNSAFE.compareAndSwapObject
2044 (other, COMPLETIONS, q.next = other.completions, q))
2045 break;
2046 }
2047 else if (r != null ||
2048 UNSAFE.compareAndSwapObject
2049 (this, COMPLETIONS, p.next = completions, p)) {
2050 if (s != null)
2051 break;
2052 q = new CompletionNode(d);
2053 }
2054 }
2055 }
2056 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2057 T t; U u; Throwable ex;
2058 if (r instanceof AltResult) {
2059 ex = ((AltResult)r).ex;
2060 t = null;
2061 }
2062 else {
2063 ex = null;
2064 @SuppressWarnings("unchecked") T tr = (T) r;
2065 t = tr;
2066 }
2067 if (ex != null)
2068 u = null;
2069 else if (s instanceof AltResult) {
2070 ex = ((AltResult)s).ex;
2071 u = null;
2072 }
2073 else {
2074 @SuppressWarnings("unchecked") U us = (U) s;
2075 u = us;
2076 }
2077 if (ex == null) {
2078 try {
2079 if (e != null)
2080 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2081 else
2082 fn.accept(t, u);
2083 } catch (Throwable rex) {
2084 ex = rex;
2085 }
2086 }
2087 if (e == null || ex != null)
2088 dst.internalComplete(null, ex);
2089 }
2090 helpPostComplete();
2091 other.helpPostComplete();
2092 return dst;
2093 }
2094
2095 /**
2096 * Creates and returns a CompletableFuture that is completed
2097 * when this and the other given CompletableFuture both
2098 * complete. If this and/or the other CompletableFuture complete
2099 * exceptionally, then the returned CompletableFuture also does
2100 * so, with a CompletionException holding one of these exceptions as
2101 * its cause.
2102 *
2103 * @param other the other CompletableFuture
2104 * @param action the action to perform before completing the
2105 * returned CompletableFuture
2106 * @return the new CompletableFuture
2107 */
2108 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2109 Runnable action) {
2110 return doThenBiRun(other, action, null);
2111 }
2112
2113 /**
2114 * Creates and returns a CompletableFuture that is completed
2115 * asynchronously using the {@link ForkJoinPool#commonPool()}
2116 * when this and the other given CompletableFuture both
2117 * complete. If this and/or the other CompletableFuture complete
2118 * exceptionally, then the returned CompletableFuture also does
2119 * so, with a CompletionException holding one of these exceptions as
2120 * its cause.
2121 *
2122 * @param other the other CompletableFuture
2123 * @param action the action to perform before completing the
2124 * returned CompletableFuture
2125 * @return the new CompletableFuture
2126 */
2127 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2128 Runnable action) {
2129 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2130 }
2131
2132 /**
2133 * Creates and returns a CompletableFuture that is completed
2134 * asynchronously using the given executor
2135 * when this and the other given CompletableFuture both
2136 * complete. If this and/or the other CompletableFuture complete
2137 * exceptionally, then the returned CompletableFuture also does
2138 * so, with a CompletionException holding one of these exceptions as
2139 * its cause.
2140 *
2141 * @param other the other CompletableFuture
2142 * @param action the action to perform before completing the
2143 * returned CompletableFuture
2144 * @param executor the executor to use for asynchronous execution
2145 * @return the new CompletableFuture
2146 */
2147 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2148 Runnable action,
2149 Executor executor) {
2150 if (executor == null) throw new NullPointerException();
2151 return doThenBiRun(other, action, executor);
2152 }
2153
2154 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2155 Runnable action,
2156 Executor e) {
2157 if (other == null || action == null) throw new NullPointerException();
2158 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2159 BiRunCompletion<T> d = null;
2160 Object r, s = null;
2161 if ((r = result) == null || (s = other.result) == null) {
2162 d = new BiRunCompletion<T>(this, other, action, dst, e);
2163 CompletionNode q = null, p = new CompletionNode(d);
2164 while ((r == null && (r = result) == null) ||
2165 (s == null && (s = other.result) == null)) {
2166 if (q != null) {
2167 if (s != null ||
2168 UNSAFE.compareAndSwapObject
2169 (other, COMPLETIONS, q.next = other.completions, q))
2170 break;
2171 }
2172 else if (r != null ||
2173 UNSAFE.compareAndSwapObject
2174 (this, COMPLETIONS, p.next = completions, p)) {
2175 if (s != null)
2176 break;
2177 q = new CompletionNode(d);
2178 }
2179 }
2180 }
2181 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2182 Throwable ex;
2183 if (r instanceof AltResult)
2184 ex = ((AltResult)r).ex;
2185 else
2186 ex = null;
2187 if (ex == null && (s instanceof AltResult))
2188 ex = ((AltResult)s).ex;
2189 if (ex == null) {
2190 try {
2191 if (e != null)
2192 e.execute(new AsyncRun(action, dst));
2193 else
2194 action.run();
2195 } catch (Throwable rex) {
2196 ex = rex;
2197 }
2198 }
2199 if (e == null || ex != null)
2200 dst.internalComplete(null, ex);
2201 }
2202 helpPostComplete();
2203 other.helpPostComplete();
2204 return dst;
2205 }
2206
2207 /**
2208 * Creates and returns a CompletableFuture that is completed with
2209 * the result of the given function of either this or the other
2210 * given CompletableFuture's results when either complete. If
2211 * this and/or the other CompletableFuture complete exceptionally,
2212 * then the returned CompletableFuture may also do so, with a
2213 * CompletionException holding one of these exceptions as its cause.
2214 * No guarantees are made about which result or exception is used
2215 * in the returned CompletableFuture.
2216 *
2217 * @param other the other CompletableFuture
2218 * @param fn the function to use to compute the value of
2219 * the returned CompletableFuture
2220 * @return the new CompletableFuture
2221 */
2222 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2223 Function<? super T, U> fn) {
2224 return doOrApply(other, fn, null);
2225 }
2226
2227 /**
2228 * Creates and returns a CompletableFuture that is completed
2229 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2230 * the result of the given function of either this or the other
2231 * given CompletableFuture's results when either complete. If
2232 * this and/or the other CompletableFuture complete exceptionally,
2233 * then the returned CompletableFuture may also do so, with a
2234 * CompletionException holding one of these exceptions as its cause.
2235 * No guarantees are made about which result or exception is used
2236 * in the returned CompletableFuture.
2237 *
2238 * @param other the other CompletableFuture
2239 * @param fn the function to use to compute the value of
2240 * the returned CompletableFuture
2241 * @return the new CompletableFuture
2242 */
2243 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2244 Function<? super T, U> fn) {
2245 return doOrApply(other, fn, ForkJoinPool.commonPool());
2246 }
2247
2248 /**
2249 * Creates and returns a CompletableFuture that is completed
2250 * asynchronously using the given executor with the result of the
2251 * given function of either this or the other given
2252 * CompletableFuture's results when either complete. If this
2253 * and/or the other CompletableFuture complete exceptionally, then
2254 * the returned CompletableFuture may also do so, with a
2255 * CompletionException holding one of these exceptions as its cause.
2256 * No guarantees are made about which result or exception is used
2257 * in the returned CompletableFuture.
2258 *
2259 * @param other the other CompletableFuture
2260 * @param fn the function to use to compute the value of
2261 * the returned CompletableFuture
2262 * @param executor the executor to use for asynchronous execution
2263 * @return the new CompletableFuture
2264 */
2265 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2266 Function<? super T, U> fn,
2267 Executor executor) {
2268 if (executor == null) throw new NullPointerException();
2269 return doOrApply(other, fn, executor);
2270 }
2271
2272 private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2273 Function<? super T, U> fn,
2274 Executor e) {
2275 if (other == null || fn == null) throw new NullPointerException();
2276 CompletableFuture<U> dst = new CompletableFuture<U>();
2277 OrApplyCompletion<T,U> d = null;
2278 Object r;
2279 if ((r = result) == null && (r = other.result) == null) {
2280 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2281 CompletionNode q = null, p = new CompletionNode(d);
2282 while ((r = result) == null && (r = other.result) == null) {
2283 if (q != null) {
2284 if (UNSAFE.compareAndSwapObject
2285 (other, COMPLETIONS, q.next = other.completions, q))
2286 break;
2287 }
2288 else if (UNSAFE.compareAndSwapObject
2289 (this, COMPLETIONS, p.next = completions, p))
2290 q = new CompletionNode(d);
2291 }
2292 }
2293 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2294 T t; Throwable ex;
2295 if (r instanceof AltResult) {
2296 ex = ((AltResult)r).ex;
2297 t = null;
2298 }
2299 else {
2300 ex = null;
2301 @SuppressWarnings("unchecked") T tr = (T) r;
2302 t = tr;
2303 }
2304 U u = null;
2305 if (ex == null) {
2306 try {
2307 if (e != null)
2308 e.execute(new AsyncApply<T,U>(t, fn, dst));
2309 else
2310 u = fn.apply(t);
2311 } catch (Throwable rex) {
2312 ex = rex;
2313 }
2314 }
2315 if (e == null || ex != null)
2316 dst.internalComplete(u, ex);
2317 }
2318 helpPostComplete();
2319 other.helpPostComplete();
2320 return dst;
2321 }
2322
2323 /**
2324 * Creates and returns a CompletableFuture that is completed after
2325 * performing the given action with the result of either this or the
2326 * other given CompletableFuture's result, when either complete.
2327 * If this and/or the other CompletableFuture complete
2328 * exceptionally, then the returned CompletableFuture may also do
2329 * so, with a CompletionException holding one of these exceptions as
2330 * its cause. No guarantees are made about which exception is
2331 * used in the returned CompletableFuture.
2332 *
2333 * @param other the other CompletableFuture
2334 * @param block the action to perform before completing the
2335 * returned CompletableFuture
2336 * @return the new CompletableFuture
2337 */
2338 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2339 Consumer<? super T> block) {
2340 return doOrAccept(other, block, null);
2341 }
2342
2343 /**
2344 * Creates and returns a CompletableFuture that is completed
2345 * asynchronously using the {@link ForkJoinPool#commonPool()},
2346 * performing the given action with the result of either this or
2347 * the other given CompletableFuture's result, when either
2348 * complete. If this and/or the other CompletableFuture complete
2349 * exceptionally, then the returned CompletableFuture may also do
2350 * so, with a CompletionException holding one of these exceptions as
2351 * its cause. No guarantees are made about which exception is
2352 * used in the returned CompletableFuture.
2353 *
2354 * @param other the other CompletableFuture
2355 * @param block the action to perform before completing the
2356 * returned CompletableFuture
2357 * @return the new CompletableFuture
2358 */
2359 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2360 Consumer<? super T> block) {
2361 return doOrAccept(other, block, ForkJoinPool.commonPool());
2362 }
2363
2364 /**
2365 * Creates and returns a CompletableFuture that is completed
2366 * asynchronously using the given executor,
2367 * performing the given action with the result of either this or
2368 * the other given CompletableFuture's result, when either
2369 * complete. If this and/or the other CompletableFuture complete
2370 * exceptionally, then the returned CompletableFuture may also do
2371 * so, with a CompletionException holding one of these exceptions as
2372 * its cause. No guarantees are made about which exception is
2373 * used in the returned CompletableFuture.
2374 *
2375 * @param other the other CompletableFuture
2376 * @param block the action to perform before completing the
2377 * returned CompletableFuture
2378 * @param executor the executor to use for asynchronous execution
2379 * @return the new CompletableFuture
2380 */
2381 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2382 Consumer<? super T> block,
2383 Executor executor) {
2384 if (executor == null) throw new NullPointerException();
2385 return doOrAccept(other, block, executor);
2386 }
2387
2388 private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2389 Consumer<? super T> fn,
2390 Executor e) {
2391 if (other == null || fn == null) throw new NullPointerException();
2392 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2393 OrAcceptCompletion<T> d = null;
2394 Object r;
2395 if ((r = result) == null && (r = other.result) == null) {
2396 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2397 CompletionNode q = null, p = new CompletionNode(d);
2398 while ((r = result) == null && (r = other.result) == null) {
2399 if (q != null) {
2400 if (UNSAFE.compareAndSwapObject
2401 (other, COMPLETIONS, q.next = other.completions, q))
2402 break;
2403 }
2404 else if (UNSAFE.compareAndSwapObject
2405 (this, COMPLETIONS, p.next = completions, p))
2406 q = new CompletionNode(d);
2407 }
2408 }
2409 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2410 T t; Throwable ex;
2411 if (r instanceof AltResult) {
2412 ex = ((AltResult)r).ex;
2413 t = null;
2414 }
2415 else {
2416 ex = null;
2417 @SuppressWarnings("unchecked") T tr = (T) r;
2418 t = tr;
2419 }
2420 if (ex == null) {
2421 try {
2422 if (e != null)
2423 e.execute(new AsyncAccept<T>(t, fn, dst));
2424 else
2425 fn.accept(t);
2426 } catch (Throwable rex) {
2427 ex = rex;
2428 }
2429 }
2430 if (e == null || ex != null)
2431 dst.internalComplete(null, ex);
2432 }
2433 helpPostComplete();
2434 other.helpPostComplete();
2435 return dst;
2436 }
2437
2438 /**
2439 * Creates and returns a CompletableFuture that is completed
2440 * after this or the other given CompletableFuture complete. If
2441 * this and/or the other CompletableFuture complete exceptionally,
2442 * then the returned CompletableFuture may also do so, with a
2443 * CompletionException holding one of these exceptions as its cause.
2444 * No guarantees are made about which exception is used in the
2445 * returned CompletableFuture.
2446 *
2447 * @param other the other CompletableFuture
2448 * @param action the action to perform before completing the
2449 * returned CompletableFuture
2450 * @return the new CompletableFuture
2451 */
2452 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2453 Runnable action) {
2454 return doOrRun(other, action, null);
2455 }
2456
2457 /**
2458 * Creates and returns a CompletableFuture that is completed
2459 * asynchronously using the {@link ForkJoinPool#commonPool()}
2460 * after this or the other given CompletableFuture complete. If
2461 * this and/or the other CompletableFuture complete exceptionally,
2462 * then the returned CompletableFuture may also do so, with a
2463 * CompletionException holding one of these exceptions as its cause.
2464 * No guarantees are made about which exception is used in the
2465 * returned CompletableFuture.
2466 *
2467 * @param other the other CompletableFuture
2468 * @param action the action to perform before completing the
2469 * returned CompletableFuture
2470 * @return the new CompletableFuture
2471 */
2472 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2473 Runnable action) {
2474 return doOrRun(other, action, ForkJoinPool.commonPool());
2475 }
2476
2477 /**
2478 * Creates and returns a CompletableFuture that is completed
2479 * asynchronously using the given executor after this or the other
2480 * given CompletableFuture complete. If this and/or the other
2481 * CompletableFuture complete exceptionally, then the returned
2482 * CompletableFuture may also do so, with a CompletionException
2483 * holding one of these exceptions as its cause. No guarantees are
2484 * made about which exception is used in the returned
2485 * CompletableFuture.
2486 *
2487 * @param other the other CompletableFuture
2488 * @param action the action to perform before completing the
2489 * returned CompletableFuture
2490 * @param executor the executor to use for asynchronous execution
2491 * @return the new CompletableFuture
2492 */
2493 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2494 Runnable action,
2495 Executor executor) {
2496 if (executor == null) throw new NullPointerException();
2497 return doOrRun(other, action, executor);
2498 }
2499
2500 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2501 Runnable action,
2502 Executor e) {
2503 if (other == null || action == null) throw new NullPointerException();
2504 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2505 OrRunCompletion<T> d = null;
2506 Object r;
2507 if ((r = result) == null && (r = other.result) == null) {
2508 d = new OrRunCompletion<T>(this, other, action, dst, e);
2509 CompletionNode q = null, p = new CompletionNode(d);
2510 while ((r = result) == null && (r = other.result) == null) {
2511 if (q != null) {
2512 if (UNSAFE.compareAndSwapObject
2513 (other, COMPLETIONS, q.next = other.completions, q))
2514 break;
2515 }
2516 else if (UNSAFE.compareAndSwapObject
2517 (this, COMPLETIONS, p.next = completions, p))
2518 q = new CompletionNode(d);
2519 }
2520 }
2521 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2522 Throwable ex;
2523 if (r instanceof AltResult)
2524 ex = ((AltResult)r).ex;
2525 else
2526 ex = null;
2527 if (ex == null) {
2528 try {
2529 if (e != null)
2530 e.execute(new AsyncRun(action, dst));
2531 else
2532 action.run();
2533 } catch (Throwable rex) {
2534 ex = rex;
2535 }
2536 }
2537 if (e == null || ex != null)
2538 dst.internalComplete(null, ex);
2539 }
2540 helpPostComplete();
2541 other.helpPostComplete();
2542 return dst;
2543 }
2544
2545 /**
2546 * Returns a CompletableFuture (or an equivalent one) produced by
2547 * the given function of the result of this CompletableFuture when
2548 * completed. If this CompletableFuture completes exceptionally,
2549 * then the returned CompletableFuture also does so, with a
2550 * CompletionException holding this exception as its cause.
2551 *
2552 * @param fn the function returning a new CompletableFuture
2553 * @return the CompletableFuture, that {@code isDone()} upon
2554 * return if completed by the given function, or an exception
2555 * occurs
2556 */
2557 public <U> CompletableFuture<U> thenCompose(Function<? super T,
2558 CompletableFuture<U>> fn) {
2559 return doCompose(fn, null);
2560 }
2561
2562 /**
2563 * Returns a CompletableFuture (or an equivalent one) produced
2564 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2565 * the given function of the result of this CompletableFuture when
2566 * completed. If this CompletableFuture completes exceptionally,
2567 * then the returned CompletableFuture also does so, with a
2568 * CompletionException holding this exception as its cause.
2569 *
2570 * @param fn the function returning a new CompletableFuture
2571 * @return the CompletableFuture, that {@code isDone()} upon
2572 * return if completed by the given function, or an exception
2573 * occurs
2574 */
2575 public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,
2576 CompletableFuture<U>> fn) {
2577 return doCompose(fn, ForkJoinPool.commonPool());
2578 }
2579
2580 /**
2581 * Returns a CompletableFuture (or an equivalent one) produced
2582 * asynchronously using the given executor by the given function
2583 * of the result of this CompletableFuture when completed.
2584 * If this CompletableFuture completes exceptionally, then the
2585 * returned CompletableFuture also does so, with a
2586 * CompletionException holding this exception as its cause.
2587 *
2588 * @param fn the function returning a new CompletableFuture
2589 * @param executor the executor to use for asynchronous execution
2590 * @return the CompletableFuture, that {@code isDone()} upon
2591 * return if completed by the given function, or an exception
2592 * occurs
2593 */
2594 public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,
2595 CompletableFuture<U>> fn,
2596 Executor executor) {
2597 if (executor == null) throw new NullPointerException();
2598 return doCompose(fn, executor);
2599 }
2600
2601 private <U> CompletableFuture<U> doCompose(Function<? super T,
2602 CompletableFuture<U>> fn,
2603 Executor e) {
2604 if (fn == null) throw new NullPointerException();
2605 CompletableFuture<U> dst = null;
2606 ComposeCompletion<T,U> d = null;
2607 Object r;
2608 if ((r = result) == null) {
2609 dst = new CompletableFuture<U>();
2610 CompletionNode p = new CompletionNode
2611 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2612 while ((r = result) == null) {
2613 if (UNSAFE.compareAndSwapObject
2614 (this, COMPLETIONS, p.next = completions, p))
2615 break;
2616 }
2617 }
2618 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2619 T t; Throwable ex;
2620 if (r instanceof AltResult) {
2621 ex = ((AltResult)r).ex;
2622 t = null;
2623 }
2624 else {
2625 ex = null;
2626 @SuppressWarnings("unchecked") T tr = (T) r;
2627 t = tr;
2628 }
2629 if (ex == null) {
2630 if (e != null) {
2631 if (dst == null)
2632 dst = new CompletableFuture<U>();
2633 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2634 }
2635 else {
2636 try {
2637 dst = fn.apply(t);
2638 } catch (Throwable rex) {
2639 ex = rex;
2640 }
2641 if (dst == null) {
2642 dst = new CompletableFuture<U>();
2643 if (ex == null)
2644 ex = new NullPointerException();
2645 }
2646 }
2647 }
2648 if (e == null && ex != null)
2649 dst.internalComplete(null, ex);
2650 }
2651 helpPostComplete();
2652 dst.helpPostComplete();
2653 return dst;
2654 }
2655
2656 /**
2657 * Creates and returns a CompletableFuture that is completed with
2658 * the result of the given function of the exception triggering
2659 * this CompletableFuture's completion when it completes
2660 * exceptionally; Otherwise, if this CompletableFuture completes
2661 * normally, then the returned CompletableFuture also completes
2662 * normally with the same value.
2663 *
2664 * @param fn the function to use to compute the value of the
2665 * returned CompletableFuture if this CompletableFuture completed
2666 * exceptionally
2667 * @return the new CompletableFuture
2668 */
2669 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
2670 if (fn == null) throw new NullPointerException();
2671 CompletableFuture<T> dst = new CompletableFuture<T>();
2672 ExceptionCompletion<T> d = null;
2673 Object r;
2674 if ((r = result) == null) {
2675 CompletionNode p =
2676 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2677 while ((r = result) == null) {
2678 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2679 p.next = completions, p))
2680 break;
2681 }
2682 }
2683 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2684 T t = null; Throwable ex, dx = null;
2685 if (r instanceof AltResult) {
2686 if ((ex = ((AltResult)r).ex) != null) {
2687 try {
2688 t = fn.apply(ex);
2689 } catch (Throwable rex) {
2690 dx = rex;
2691 }
2692 }
2693 }
2694 else {
2695 @SuppressWarnings("unchecked") T tr = (T) r;
2696 t = tr;
2697 }
2698 dst.internalComplete(t, dx);
2699 }
2700 helpPostComplete();
2701 return dst;
2702 }
2703
2704 /**
2705 * Creates and returns a CompletableFuture that is completed with
2706 * the result of the given function of the result and exception of
2707 * this CompletableFuture's completion when it completes. The
2708 * given function is invoked with the result (or {@code null} if
2709 * none) and the exception (or {@code null} if none) of this
2710 * CompletableFuture when complete.
2711 *
2712 * @param fn the function to use to compute the value of the
2713 * returned CompletableFuture
2714
2715 * @return the new CompletableFuture
2716 */
2717 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
2718 if (fn == null) throw new NullPointerException();
2719 CompletableFuture<U> dst = new CompletableFuture<U>();
2720 HandleCompletion<T,U> d = null;
2721 Object r;
2722 if ((r = result) == null) {
2723 CompletionNode p =
2724 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2725 while ((r = result) == null) {
2726 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2727 p.next = completions, p))
2728 break;
2729 }
2730 }
2731 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2732 T t; Throwable ex;
2733 if (r instanceof AltResult) {
2734 ex = ((AltResult)r).ex;
2735 t = null;
2736 }
2737 else {
2738 ex = null;
2739 @SuppressWarnings("unchecked") T tr = (T) r;
2740 t = tr;
2741 }
2742 U u; Throwable dx;
2743 try {
2744 u = fn.apply(t, ex);
2745 dx = null;
2746 } catch (Throwable rex) {
2747 dx = rex;
2748 u = null;
2749 }
2750 dst.internalComplete(u, dx);
2751 }
2752 helpPostComplete();
2753 return dst;
2754 }
2755
2756
2757 /* ------------- Arbitrary-arity constructions -------------- */
2758
2759 /*
2760 * The basic plan of attack is to recursively form binary
2761 * completion trees of elements. This can be overkill for small
2762 * sets, but scales nicely. The And/All vs Or/Any forms use the
2763 * same idea, but details differ.
2764 */
2765
2766 /**
2767 * Returns a new CompletableFuture that is completed when all of
2768 * the given CompletableFutures complete. If any of the component
2769 * CompletableFuture complete exceptionally, then so does the
2770 * returned CompletableFuture. Otherwise, the results, if any, of
2771 * the component CompletableFutures are not reflected in the
2772 * returned CompletableFuture, but may be obtained by inspecting
2773 * them individually. If the number of components is zero, returns
2774 * a completed CompletableFuture.
2775 *
2776 * <p>Among the applications of this method is to await completion
2777 * of a set of independent CompletableFutures before continuing a
2778 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2779 * c3).join();}.
2780 *
2781 * @param cfs the CompletableFutures
2782 * @return a CompletableFuture that is complete when all of the
2783 * given CompletableFutures complete
2784 * @throws NullPointerException if the array or any of its elements are
2785 * {@code null}
2786 */
2787 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2788 int len = cfs.length; // Directly handle empty and singleton cases
2789 if (len > 1)
2790 return allTree(cfs, 0, len - 1);
2791 else {
2792 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2793 CompletableFuture<?> f;
2794 if (len == 0)
2795 dst.result = NIL;
2796 else if ((f = cfs[0]) == null)
2797 throw new NullPointerException();
2798 else {
2799 ThenCopy d = null;
2800 CompletionNode p = null;
2801 Object r;
2802 while ((r = f.result) == null) {
2803 if (d == null)
2804 d = new ThenCopy(f, dst);
2805 else if (p == null)
2806 p = new CompletionNode(d);
2807 else if (UNSAFE.compareAndSwapObject
2808 (f, COMPLETIONS, p.next = f.completions, p))
2809 break;
2810 }
2811 if (r != null && (d == null || d.compareAndSet(0, 1)))
2812 dst.internalComplete(null, (r instanceof AltResult) ?
2813 ((AltResult)r).ex : null);
2814 f.helpPostComplete();
2815 }
2816 return dst;
2817 }
2818 }
2819
2820 /**
2821 * Recursively constructs an And'ed tree of CompletableFutures.
2822 * Called only when array known to have at least two elements.
2823 */
2824 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2825 int lo, int hi) {
2826 CompletableFuture<?> fst, snd;
2827 int mid = (lo + hi) >>> 1;
2828 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2829 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2830 throw new NullPointerException();
2831 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2832 AndCompletion d = null;
2833 CompletionNode p = null, q = null;
2834 Object r = null, s = null;
2835 while ((r = fst.result) == null || (s = snd.result) == null) {
2836 if (d == null)
2837 d = new AndCompletion(fst, snd, dst);
2838 else if (p == null)
2839 p = new CompletionNode(d);
2840 else if (q == null) {
2841 if (UNSAFE.compareAndSwapObject
2842 (fst, COMPLETIONS, p.next = fst.completions, p))
2843 q = new CompletionNode(d);
2844 }
2845 else if (UNSAFE.compareAndSwapObject
2846 (snd, COMPLETIONS, q.next = snd.completions, q))
2847 break;
2848 }
2849 if ((r != null || (r = fst.result) != null) &&
2850 (s != null || (s = snd.result) != null) &&
2851 (d == null || d.compareAndSet(0, 1))) {
2852 Throwable ex;
2853 if (r instanceof AltResult)
2854 ex = ((AltResult)r).ex;
2855 else
2856 ex = null;
2857 if (ex == null && (s instanceof AltResult))
2858 ex = ((AltResult)s).ex;
2859 dst.internalComplete(null, ex);
2860 }
2861 fst.helpPostComplete();
2862 snd.helpPostComplete();
2863 return dst;
2864 }
2865
2866 /**
2867 * Returns a new CompletableFuture that is completed when any of
2868 * the component CompletableFutures complete; with the same result if
2869 * it completed normally, otherwise exceptionally. If the number
2870 * of components is zero, returns a completed CompletableFuture.
2871 *
2872 * @param cfs the CompletableFutures
2873 * @return a CompletableFuture that is complete when any of the
2874 * given CompletableFutures complete
2875 * @throws NullPointerException if the array or any of its elements are
2876 * {@code null}
2877 */
2878 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2879 int len = cfs.length; // Same idea as allOf
2880 if (len > 1)
2881 return anyTree(cfs, 0, len - 1);
2882 else {
2883 CompletableFuture<?> dst = new CompletableFuture<Object>();
2884 CompletableFuture<?> f;
2885 if (len == 0)
2886 dst.result = NIL;
2887 else if ((f = cfs[0]) == null)
2888 throw new NullPointerException();
2889 else {
2890 ThenCopy d = null;
2891 CompletionNode p = null;
2892 Object r;
2893 while ((r = f.result) == null) {
2894 if (d == null)
2895 d = new ThenCopy(f, dst);
2896 else if (p == null)
2897 p = new CompletionNode(d);
2898 else if (UNSAFE.compareAndSwapObject
2899 (f, COMPLETIONS, p.next = f.completions, p))
2900 break;
2901 }
2902 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2903 Throwable ex; Object t;
2904 if (r instanceof AltResult) {
2905 ex = ((AltResult)r).ex;
2906 t = null;
2907 }
2908 else {
2909 ex = null;
2910 t = r;
2911 }
2912 dst.internalComplete(t, ex);
2913 }
2914 f.helpPostComplete();
2915 }
2916 return dst;
2917 }
2918 }
2919
2920 /**
2921 * Recursively constructs an Or'ed tree of CompletableFutures.
2922 */
2923 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
2924 int lo, int hi) {
2925 CompletableFuture<?> fst, snd;
2926 int mid = (lo + hi) >>> 1;
2927 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2928 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2929 throw new NullPointerException();
2930 CompletableFuture<?> dst = new CompletableFuture<Object>();
2931 OrCompletion d = null;
2932 CompletionNode p = null, q = null;
2933 Object r;
2934 while ((r = fst.result) == null && (r = snd.result) == null) {
2935 if (d == null)
2936 d = new OrCompletion(fst, snd, dst);
2937 else if (p == null)
2938 p = new CompletionNode(d);
2939 else if (q == null) {
2940 if (UNSAFE.compareAndSwapObject
2941 (fst, COMPLETIONS, p.next = fst.completions, p))
2942 q = new CompletionNode(d);
2943 }
2944 else if (UNSAFE.compareAndSwapObject
2945 (snd, COMPLETIONS, q.next = snd.completions, q))
2946 break;
2947 }
2948 if ((r != null || (r = fst.result) != null ||
2949 (r = snd.result) != null) &&
2950 (d == null || d.compareAndSet(0, 1))) {
2951 Throwable ex; Object t;
2952 if (r instanceof AltResult) {
2953 ex = ((AltResult)r).ex;
2954 t = null;
2955 }
2956 else {
2957 ex = null;
2958 t = r;
2959 }
2960 dst.internalComplete(t, ex);
2961 }
2962 fst.helpPostComplete();
2963 snd.helpPostComplete();
2964 return dst;
2965 }
2966
2967
2968 /* ------------- Control and status methods -------------- */
2969
2970 /**
2971 * If not already completed, completes this CompletableFuture with
2972 * a {@link CancellationException}. Dependent CompletableFutures
2973 * that have not already completed will also complete
2974 * exceptionally, with a {@link CompletionException} caused by
2975 * this {@code CancellationException}.
2976 *
2977 * @param mayInterruptIfRunning this value has no effect in this
2978 * implementation because interrupts are not used to control
2979 * processing.
2980 *
2981 * @return {@code true} if this task is now cancelled
2982 */
2983 public boolean cancel(boolean mayInterruptIfRunning) {
2984 Object r;
2985 boolean cancelled = (result == null) &&
2986 UNSAFE.compareAndSwapObject
2987 (this, RESULT, null, new AltResult(new CancellationException()));
2988 postComplete();
2989 return cancelled ||
2990 (((r = result) instanceof AltResult) &&
2991 (((AltResult)r).ex instanceof CancellationException));
2992 }
2993
2994 /**
2995 * Returns {@code true} if this CompletableFuture was cancelled
2996 * before it completed normally.
2997 *
2998 * @return {@code true} if this CompletableFuture was cancelled
2999 * before it completed normally
3000 */
3001 public boolean isCancelled() {
3002 Object r;
3003 return ((r = result) instanceof AltResult) &&
3004 (((AltResult)r).ex instanceof CancellationException);
3005 }
3006
3007 /**
3008 * Forcibly sets or resets the value subsequently returned by
3009 * method {@link #get()} and related methods, whether or not
3010 * already completed. This method is designed for use only in
3011 * error recovery actions, and even in such situations may result
3012 * in ongoing dependent completions using established versus
3013 * overwritten outcomes.
3014 *
3015 * @param value the completion value
3016 */
3017 public void obtrudeValue(T value) {
3018 result = (value == null) ? NIL : value;
3019 postComplete();
3020 }
3021
3022 /**
3023 * Forcibly causes subsequent invocations of method {@link #get()}
3024 * and related methods to throw the given exception, whether or
3025 * not already completed. This method is designed for use only in
3026 * recovery actions, and even in such situations may result in
3027 * ongoing dependent completions using established versus
3028 * overwritten outcomes.
3029 *
3030 * @param ex the exception
3031 */
3032 public void obtrudeException(Throwable ex) {
3033 if (ex == null) throw new NullPointerException();
3034 result = new AltResult(ex);
3035 postComplete();
3036 }
3037
3038 /**
3039 * Returns the estimated number of CompletableFutures whose
3040 * completions are awaiting completion of this CompletableFuture.
3041 * This method is designed for use in monitoring system state, not
3042 * for synchronization control.
3043 *
3044 * @return the number of dependent CompletableFutures
3045 */
3046 public int getNumberOfDependents() {
3047 int count = 0;
3048 for (CompletionNode p = completions; p != null; p = p.next)
3049 ++count;
3050 return count;
3051 }
3052
3053 /**
3054 * Returns a string identifying this CompletableFuture, as well as
3055 * its completion state. The state, in brackets, contains the
3056 * String {@code "Completed Normally"} or the String {@code
3057 * "Completed Exceptionally"}, or the String {@code "Not
3058 * completed"} followed by the number of CompletableFutures
3059 * dependent upon its completion, if any.
3060 *
3061 * @return a string identifying this CompletableFuture, as well as its state
3062 */
3063 public String toString() {
3064 Object r = result;
3065 int count;
3066 return super.toString() +
3067 ((r == null) ?
3068 (((count = getNumberOfDependents()) == 0) ?
3069 "[Not completed]" :
3070 "[Not completed, " + count + " dependents]") :
3071 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3072 "[Completed exceptionally]" :
3073 "[Completed normally]"));
3074 }
3075
3076 // Unsafe mechanics
3077 private static final sun.misc.Unsafe UNSAFE;
3078 private static final long RESULT;
3079 private static final long WAITERS;
3080 private static final long COMPLETIONS;
3081 static {
3082 try {
3083 UNSAFE = sun.misc.Unsafe.getUnsafe();
3084 Class<?> k = CompletableFuture.class;
3085 RESULT = UNSAFE.objectFieldOffset
3086 (k.getDeclaredField("result"));
3087 WAITERS = UNSAFE.objectFieldOffset
3088 (k.getDeclaredField("waiters"));
3089 COMPLETIONS = UNSAFE.objectFieldOffset
3090 (k.getDeclaredField("completions"));
3091 } catch (Exception e) {
3092 throw new Error(e);
3093 }
3094 }
3095 }