ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.40
Committed: Wed Feb 6 06:57:14 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.39: +10 -9 lines
Log Message:
improve toString()

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to {@link #complete} or {@link
31 * #completeExceptionally} a CompletableFuture, only one of them
32 * succeeds.
33 *
34 * <p>Methods are available for adding dependents based on Functions,
35 * Consumers, and Runnables. The appropriate form to use depends on
36 * whether actions require arguments and/or produce results. Actions
37 * may also be triggered after either or both the current and another
38 * CompletableFuture complete. Multiple CompletableFutures may also
39 * be grouped as one using {@link #anyOf} and {@link #allOf}.
40 *
41 * <p>Actions supplied for dependent completions (mainly using methods
42 * with prefix {@code then}) may be performed by the thread that
43 * completes the current CompletableFuture, or by any other caller of
44 * these methods. There are no guarantees about the order of
45 * processing completions unless constrained by these methods.
46 *
47 * <p>Upon exceptional completion, or when a completion entails
48 * computation, and it terminates abruptly with an (unchecked)
49 * exception or error, then further completions act as {@code
50 * completeExceptionally} with a {@link CompletionException} holding
51 * that exception as its cause. If a CompletableFuture completes
52 * exceptionally, and is not followed by a {@link #exceptionally} or
53 * {@link #handle} completion, then all of its dependents (and their
54 * dependents) also complete exceptionally with CompletionExceptions
55 * holding the ultimate cause. In case of a CompletionException,
56 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
57 * {@link ExecutionException} with the same cause as would be held in
58 * the corresponding CompletionException. However, in these cases,
59 * methods {@link #join()} and {@link #getNow} throw the
60 * CompletionException, which simplifies usage especially within other
61 * completion functions.
62 *
63 * <p>CompletableFutures themselves do not execute asynchronously.
64 * However, the {@code async} methods provide commonly useful ways to
65 * commence asynchronous processing, using either a given {@link
66 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
67 * function or action that will result in the completion of a new
68 * CompletableFuture. To simplify monitoring, debugging, and tracking,
69 * all generated asynchronous tasks are instances of the tagging
70 * interface {@link AsynchronousCompletionTask}.
71 *
72 * @author Doug Lea
73 * @since 1.8
74 */
75 public class CompletableFuture<T> implements Future<T> {
76
77 /*
78 * Overview:
79 *
80 * 1. Non-nullness of field result (set via CAS) indicates done.
81 * An AltResult is used to box null as a result, as well as to
82 * hold exceptions. Using a single field makes completion fast
83 * and simple to detect and trigger, at the expense of a lot of
84 * encoding and decoding that infiltrates many methods. One minor
85 * simplification relies on the (static) NIL (to box null results)
86 * being the only AltResult with a null exception field, so we
87 * don't usually need explicit comparisons with NIL. The CF
88 * exception propagation mechanics surrounding decoding rely on
89 * unchecked casts of decoded results really being unchecked,
90 * where user type errors are caught at point of use, as is
91 * currently the case in Java. These are highlighted by using
92 * SuppressWarnings-annotated temporaries.
93 *
94 * 2. Waiters are held in a Treiber stack similar to the one used
95 * in FutureTask, Phaser, and SynchronousQueue. See their
96 * internal documentation for algorithmic details.
97 *
98 * 3. Completions are also kept in a list/stack, and pulled off
99 * and run when completion is triggered. (We could even use the
100 * same stack as for waiters, but would give up the potential
101 * parallelism obtained because woken waiters help release/run
102 * others -- see method postComplete). Because post-processing
103 * may race with direct calls, class Completion opportunistically
104 * extends AtomicInteger so callers can claim the action via
105 * compareAndSet(0, 1). The Completion.run methods are all
106 * written a boringly similar uniform way (that sometimes includes
107 * unnecessary-looking checks, kept to maintain uniformity). There
108 * are enough dimensions upon which they differ that attempts to
109 * factor commonalities while maintaining efficiency require more
110 * lines of code than they would save.
111 *
112 * 4. The exported then/and/or methods do support a bit of
113 * factoring (see doThenApply etc). They must cope with the
114 * intrinsic races surrounding addition of a dependent action
115 * versus performing the action directly because the task is
116 * already complete. For example, a CF may not be complete upon
117 * entry, so a dependent completion is added, but by the time it
118 * is added, the target CF is complete, so must be directly
119 * executed. This is all done while avoiding unnecessary object
120 * construction in safe-bypass cases.
121 */
122
123 // preliminaries
124
125 static final class AltResult {
126 final Throwable ex; // null only for NIL
127 AltResult(Throwable ex) { this.ex = ex; }
128 }
129
130 static final AltResult NIL = new AltResult(null);
131
132 // Fields
133
134 volatile Object result; // Either the result or boxed AltResult
135 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
136 volatile CompletionNode completions; // list (Treiber stack) of completions
137
138 // Basic utilities for triggering and processing completions
139
140 /**
141 * Removes and signals all waiting threads and runs all completions.
142 */
143 final void postComplete() {
144 WaitNode q; Thread t;
145 while ((q = waiters) != null) {
146 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
147 (t = q.thread) != null) {
148 q.thread = null;
149 LockSupport.unpark(t);
150 }
151 }
152
153 CompletionNode h; Completion c;
154 while ((h = completions) != null) {
155 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
156 (c = h.completion) != null)
157 c.run();
158 }
159 }
160
161 /**
162 * Triggers completion with the encoding of the given arguments:
163 * if the exception is non-null, encodes it as a wrapped
164 * CompletionException unless it is one already. Otherwise uses
165 * the given result, boxed as NIL if null.
166 */
167 final void internalComplete(Object v, Throwable ex) {
168 if (result == null)
169 UNSAFE.compareAndSwapObject
170 (this, RESULT, null,
171 (ex == null) ? (v == null) ? NIL : v :
172 new AltResult((ex instanceof CompletionException) ? ex :
173 new CompletionException(ex)));
174 postComplete(); // help out even if not triggered
175 }
176
177 /**
178 * If triggered, helps release and/or process completions.
179 */
180 final void helpPostComplete() {
181 if (result != null)
182 postComplete();
183 }
184
185 /* ------------- waiting for completions -------------- */
186
187 /** Number of processors, for spin control */
188 static final int NCPU = Runtime.getRuntime().availableProcessors();
189
190 /**
191 * Heuristic spin value for waitingGet() before blocking on
192 * multiprocessors
193 */
194 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
195
196 /**
197 * Linked nodes to record waiting threads in a Treiber stack. See
198 * other classes such as Phaser and SynchronousQueue for more
199 * detailed explanation. This class implements ManagedBlocker to
200 * avoid starvation when blocking actions pile up in
201 * ForkJoinPools.
202 */
203 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
204 long nanos; // wait time if timed
205 final long deadline; // non-zero if timed
206 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
207 volatile Thread thread;
208 volatile WaitNode next;
209 WaitNode(boolean interruptible, long nanos, long deadline) {
210 this.thread = Thread.currentThread();
211 this.interruptControl = interruptible ? 1 : 0;
212 this.nanos = nanos;
213 this.deadline = deadline;
214 }
215 public boolean isReleasable() {
216 if (thread == null)
217 return true;
218 if (Thread.interrupted()) {
219 int i = interruptControl;
220 interruptControl = -1;
221 if (i > 0)
222 return true;
223 }
224 if (deadline != 0L &&
225 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
226 thread = null;
227 return true;
228 }
229 return false;
230 }
231 public boolean block() {
232 if (isReleasable())
233 return true;
234 else if (deadline == 0L)
235 LockSupport.park(this);
236 else if (nanos > 0L)
237 LockSupport.parkNanos(this, nanos);
238 return isReleasable();
239 }
240 }
241
242 /**
243 * Returns raw result after waiting, or null if interruptible and
244 * interrupted.
245 */
246 private Object waitingGet(boolean interruptible) {
247 WaitNode q = null;
248 boolean queued = false;
249 int spins = SPINS;
250 for (Object r;;) {
251 if ((r = result) != null) {
252 if (q != null) { // suppress unpark
253 q.thread = null;
254 if (q.interruptControl < 0) {
255 if (interruptible) {
256 removeWaiter(q);
257 return null;
258 }
259 Thread.currentThread().interrupt();
260 }
261 }
262 postComplete(); // help release others
263 return r;
264 }
265 else if (spins > 0) {
266 int rnd = ThreadLocalRandom.nextSecondarySeed();
267 if (rnd == 0)
268 rnd = ThreadLocalRandom.current().nextInt();
269 if (rnd >= 0)
270 --spins;
271 }
272 else if (q == null)
273 q = new WaitNode(interruptible, 0L, 0L);
274 else if (!queued)
275 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
276 q.next = waiters, q);
277 else if (interruptible && q.interruptControl < 0) {
278 removeWaiter(q);
279 return null;
280 }
281 else if (q.thread != null && result == null) {
282 try {
283 ForkJoinPool.managedBlock(q);
284 } catch (InterruptedException ex) {
285 q.interruptControl = -1;
286 }
287 }
288 }
289 }
290
291 /**
292 * Awaits completion or aborts on interrupt or timeout.
293 *
294 * @param nanos time to wait
295 * @return raw result
296 */
297 private Object timedAwaitDone(long nanos)
298 throws InterruptedException, TimeoutException {
299 WaitNode q = null;
300 boolean queued = false;
301 for (Object r;;) {
302 if ((r = result) != null) {
303 if (q != null) {
304 q.thread = null;
305 if (q.interruptControl < 0) {
306 removeWaiter(q);
307 throw new InterruptedException();
308 }
309 }
310 postComplete();
311 return r;
312 }
313 else if (q == null) {
314 if (nanos <= 0L)
315 throw new TimeoutException();
316 long d = System.nanoTime() + nanos;
317 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
318 }
319 else if (!queued)
320 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
321 q.next = waiters, q);
322 else if (q.interruptControl < 0) {
323 removeWaiter(q);
324 throw new InterruptedException();
325 }
326 else if (q.nanos <= 0L) {
327 if (result == null) {
328 removeWaiter(q);
329 throw new TimeoutException();
330 }
331 }
332 else if (q.thread != null && result == null) {
333 try {
334 ForkJoinPool.managedBlock(q);
335 } catch (InterruptedException ex) {
336 q.interruptControl = -1;
337 }
338 }
339 }
340 }
341
342 /**
343 * Tries to unlink a timed-out or interrupted wait node to avoid
344 * accumulating garbage. Internal nodes are simply unspliced
345 * without CAS since it is harmless if they are traversed anyway
346 * by releasers. To avoid effects of unsplicing from already
347 * removed nodes, the list is retraversed in case of an apparent
348 * race. This is slow when there are a lot of nodes, but we don't
349 * expect lists to be long enough to outweigh higher-overhead
350 * schemes.
351 */
352 private void removeWaiter(WaitNode node) {
353 if (node != null) {
354 node.thread = null;
355 retry:
356 for (;;) { // restart on removeWaiter race
357 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
358 s = q.next;
359 if (q.thread != null)
360 pred = q;
361 else if (pred != null) {
362 pred.next = s;
363 if (pred.thread == null) // check for race
364 continue retry;
365 }
366 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
367 continue retry;
368 }
369 break;
370 }
371 }
372 }
373
374 /* ------------- Async tasks -------------- */
375
376 /**
377 * A tagging interface identifying asynchronous tasks produced by
378 * {@code async} methods. This may be useful for monitoring,
379 * debugging, and tracking asynchronous activities.
380 */
381 public static interface AsynchronousCompletionTask {
382 }
383
384 /** Base class can act as either FJ or plain Runnable */
385 abstract static class Async extends ForkJoinTask<Void>
386 implements Runnable, AsynchronousCompletionTask {
387 public final Void getRawResult() { return null; }
388 public final void setRawResult(Void v) { }
389 public final void run() { exec(); }
390 }
391
392 static final class AsyncRun extends Async {
393 final Runnable fn;
394 final CompletableFuture<Void> dst;
395 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
396 this.fn = fn; this.dst = dst;
397 }
398 public final boolean exec() {
399 CompletableFuture<Void> d; Throwable ex;
400 if ((d = this.dst) != null && d.result == null) {
401 try {
402 fn.run();
403 ex = null;
404 } catch (Throwable rex) {
405 ex = rex;
406 }
407 d.internalComplete(null, ex);
408 }
409 return true;
410 }
411 private static final long serialVersionUID = 5232453952276885070L;
412 }
413
414 static final class AsyncSupply<U> extends Async {
415 final Supplier<U> fn;
416 final CompletableFuture<U> dst;
417 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
418 this.fn = fn; this.dst = dst;
419 }
420 public final boolean exec() {
421 CompletableFuture<U> d; U u; Throwable ex;
422 if ((d = this.dst) != null && d.result == null) {
423 try {
424 u = fn.get();
425 ex = null;
426 } catch (Throwable rex) {
427 ex = rex;
428 u = null;
429 }
430 d.internalComplete(u, ex);
431 }
432 return true;
433 }
434 private static final long serialVersionUID = 5232453952276885070L;
435 }
436
437 static final class AsyncApply<T,U> extends Async {
438 final Function<? super T,? extends U> fn;
439 final T arg;
440 final CompletableFuture<U> dst;
441 AsyncApply(T arg, Function<? super T,? extends U> fn,
442 CompletableFuture<U> dst) {
443 this.arg = arg; this.fn = fn; this.dst = dst;
444 }
445 public final boolean exec() {
446 CompletableFuture<U> d; U u; Throwable ex;
447 if ((d = this.dst) != null && d.result == null) {
448 try {
449 u = fn.apply(arg);
450 ex = null;
451 } catch (Throwable rex) {
452 ex = rex;
453 u = null;
454 }
455 d.internalComplete(u, ex);
456 }
457 return true;
458 }
459 private static final long serialVersionUID = 5232453952276885070L;
460 }
461
462 static final class AsyncBiApply<T,U,V> extends Async {
463 final BiFunction<? super T,? super U,? extends V> fn;
464 final T arg1;
465 final U arg2;
466 final CompletableFuture<V> dst;
467 AsyncBiApply(T arg1, U arg2,
468 BiFunction<? super T,? super U,? extends V> fn,
469 CompletableFuture<V> dst) {
470 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
471 }
472 public final boolean exec() {
473 CompletableFuture<V> d; V v; Throwable ex;
474 if ((d = this.dst) != null && d.result == null) {
475 try {
476 v = fn.apply(arg1, arg2);
477 ex = null;
478 } catch (Throwable rex) {
479 ex = rex;
480 v = null;
481 }
482 d.internalComplete(v, ex);
483 }
484 return true;
485 }
486 private static final long serialVersionUID = 5232453952276885070L;
487 }
488
489 static final class AsyncAccept<T> extends Async {
490 final Consumer<? super T> fn;
491 final T arg;
492 final CompletableFuture<Void> dst;
493 AsyncAccept(T arg, Consumer<? super T> fn,
494 CompletableFuture<Void> dst) {
495 this.arg = arg; this.fn = fn; this.dst = dst;
496 }
497 public final boolean exec() {
498 CompletableFuture<Void> d; Throwable ex;
499 if ((d = this.dst) != null && d.result == null) {
500 try {
501 fn.accept(arg);
502 ex = null;
503 } catch (Throwable rex) {
504 ex = rex;
505 }
506 d.internalComplete(null, ex);
507 }
508 return true;
509 }
510 private static final long serialVersionUID = 5232453952276885070L;
511 }
512
513 static final class AsyncBiAccept<T,U> extends Async {
514 final BiConsumer<? super T,? super U> fn;
515 final T arg1;
516 final U arg2;
517 final CompletableFuture<Void> dst;
518 AsyncBiAccept(T arg1, U arg2,
519 BiConsumer<? super T,? super U> fn,
520 CompletableFuture<Void> dst) {
521 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
522 }
523 public final boolean exec() {
524 CompletableFuture<Void> d; Throwable ex;
525 if ((d = this.dst) != null && d.result == null) {
526 try {
527 fn.accept(arg1, arg2);
528 ex = null;
529 } catch (Throwable rex) {
530 ex = rex;
531 }
532 d.internalComplete(null, ex);
533 }
534 return true;
535 }
536 private static final long serialVersionUID = 5232453952276885070L;
537 }
538
539 static final class AsyncCompose<T,U> extends Async {
540 final Function<? super T, CompletableFuture<U>> fn;
541 final T arg;
542 final CompletableFuture<U> dst;
543 AsyncCompose(T arg,
544 Function<? super T, CompletableFuture<U>> fn,
545 CompletableFuture<U> dst) {
546 this.arg = arg; this.fn = fn; this.dst = dst;
547 }
548 public final boolean exec() {
549 CompletableFuture<U> d, fr; U u; Throwable ex;
550 if ((d = this.dst) != null && d.result == null) {
551 try {
552 fr = fn.apply(arg);
553 ex = null;
554 } catch (Throwable rex) {
555 ex = rex;
556 fr = null;
557 }
558 if (ex != null)
559 u = null;
560 else if (fr == null) {
561 ex = new NullPointerException();
562 u = null;
563 }
564 else {
565 Object r = fr.result;
566 if (r instanceof AltResult) {
567 ex = ((AltResult)r).ex;
568 u = null;
569 }
570 else {
571 @SuppressWarnings("unchecked") U ur = (U) r;
572 u = ur;
573 }
574 }
575 d.internalComplete(u, ex);
576 }
577 return true;
578 }
579 private static final long serialVersionUID = 5232453952276885070L;
580 }
581
582 /* ------------- Completions -------------- */
583
584 /**
585 * Simple linked list nodes to record completions, used in
586 * basically the same way as WaitNodes. (We separate nodes from
587 * the Completions themselves mainly because for the And and Or
588 * methods, the same Completion object resides in two lists.)
589 */
590 static final class CompletionNode {
591 final Completion completion;
592 volatile CompletionNode next;
593 CompletionNode(Completion completion) { this.completion = completion; }
594 }
595
596 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
597 abstract static class Completion extends AtomicInteger implements Runnable {
598 }
599
600 static final class ApplyCompletion<T,U> extends Completion {
601 final CompletableFuture<? extends T> src;
602 final Function<? super T,? extends U> fn;
603 final CompletableFuture<U> dst;
604 final Executor executor;
605 ApplyCompletion(CompletableFuture<? extends T> src,
606 Function<? super T,? extends U> fn,
607 CompletableFuture<U> dst, Executor executor) {
608 this.src = src; this.fn = fn; this.dst = dst;
609 this.executor = executor;
610 }
611 public final void run() {
612 final CompletableFuture<? extends T> a;
613 final Function<? super T,? extends U> fn;
614 final CompletableFuture<U> dst;
615 Object r; T t; Throwable ex;
616 if ((dst = this.dst) != null &&
617 (fn = this.fn) != null &&
618 (a = this.src) != null &&
619 (r = a.result) != null &&
620 compareAndSet(0, 1)) {
621 if (r instanceof AltResult) {
622 ex = ((AltResult)r).ex;
623 t = null;
624 }
625 else {
626 ex = null;
627 @SuppressWarnings("unchecked") T tr = (T) r;
628 t = tr;
629 }
630 Executor e = executor;
631 U u = null;
632 if (ex == null) {
633 try {
634 if (e != null)
635 e.execute(new AsyncApply<T,U>(t, fn, dst));
636 else
637 u = fn.apply(t);
638 } catch (Throwable rex) {
639 ex = rex;
640 }
641 }
642 if (e == null || ex != null)
643 dst.internalComplete(u, ex);
644 }
645 }
646 private static final long serialVersionUID = 5232453952276885070L;
647 }
648
649 static final class AcceptCompletion<T> extends Completion {
650 final CompletableFuture<? extends T> src;
651 final Consumer<? super T> fn;
652 final CompletableFuture<Void> dst;
653 final Executor executor;
654 AcceptCompletion(CompletableFuture<? extends T> src,
655 Consumer<? super T> fn,
656 CompletableFuture<Void> dst, Executor executor) {
657 this.src = src; this.fn = fn; this.dst = dst;
658 this.executor = executor;
659 }
660 public final void run() {
661 final CompletableFuture<? extends T> a;
662 final Consumer<? super T> fn;
663 final CompletableFuture<Void> dst;
664 Object r; T t; Throwable ex;
665 if ((dst = this.dst) != null &&
666 (fn = this.fn) != null &&
667 (a = this.src) != null &&
668 (r = a.result) != null &&
669 compareAndSet(0, 1)) {
670 if (r instanceof AltResult) {
671 ex = ((AltResult)r).ex;
672 t = null;
673 }
674 else {
675 ex = null;
676 @SuppressWarnings("unchecked") T tr = (T) r;
677 t = tr;
678 }
679 Executor e = executor;
680 if (ex == null) {
681 try {
682 if (e != null)
683 e.execute(new AsyncAccept<T>(t, fn, dst));
684 else
685 fn.accept(t);
686 } catch (Throwable rex) {
687 ex = rex;
688 }
689 }
690 if (e == null || ex != null)
691 dst.internalComplete(null, ex);
692 }
693 }
694 private static final long serialVersionUID = 5232453952276885070L;
695 }
696
697 static final class RunCompletion<T> extends Completion {
698 final CompletableFuture<? extends T> src;
699 final Runnable fn;
700 final CompletableFuture<Void> dst;
701 final Executor executor;
702 RunCompletion(CompletableFuture<? extends T> src,
703 Runnable fn,
704 CompletableFuture<Void> dst,
705 Executor executor) {
706 this.src = src; this.fn = fn; this.dst = dst;
707 this.executor = executor;
708 }
709 public final void run() {
710 final CompletableFuture<? extends T> a;
711 final Runnable fn;
712 final CompletableFuture<Void> dst;
713 Object r; Throwable ex;
714 if ((dst = this.dst) != null &&
715 (fn = this.fn) != null &&
716 (a = this.src) != null &&
717 (r = a.result) != null &&
718 compareAndSet(0, 1)) {
719 if (r instanceof AltResult)
720 ex = ((AltResult)r).ex;
721 else
722 ex = null;
723 Executor e = executor;
724 if (ex == null) {
725 try {
726 if (e != null)
727 e.execute(new AsyncRun(fn, dst));
728 else
729 fn.run();
730 } catch (Throwable rex) {
731 ex = rex;
732 }
733 }
734 if (e == null || ex != null)
735 dst.internalComplete(null, ex);
736 }
737 }
738 private static final long serialVersionUID = 5232453952276885070L;
739 }
740
741 static final class BiApplyCompletion<T,U,V> extends Completion {
742 final CompletableFuture<? extends T> src;
743 final CompletableFuture<? extends U> snd;
744 final BiFunction<? super T,? super U,? extends V> fn;
745 final CompletableFuture<V> dst;
746 final Executor executor;
747 BiApplyCompletion(CompletableFuture<? extends T> src,
748 CompletableFuture<? extends U> snd,
749 BiFunction<? super T,? super U,? extends V> fn,
750 CompletableFuture<V> dst, Executor executor) {
751 this.src = src; this.snd = snd;
752 this.fn = fn; this.dst = dst;
753 this.executor = executor;
754 }
755 public final void run() {
756 final CompletableFuture<? extends T> a;
757 final CompletableFuture<? extends U> b;
758 final BiFunction<? super T,? super U,? extends V> fn;
759 final CompletableFuture<V> dst;
760 Object r, s; T t; U u; Throwable ex;
761 if ((dst = this.dst) != null &&
762 (fn = this.fn) != null &&
763 (a = this.src) != null &&
764 (r = a.result) != null &&
765 (b = this.snd) != null &&
766 (s = b.result) != null &&
767 compareAndSet(0, 1)) {
768 if (r instanceof AltResult) {
769 ex = ((AltResult)r).ex;
770 t = null;
771 }
772 else {
773 ex = null;
774 @SuppressWarnings("unchecked") T tr = (T) r;
775 t = tr;
776 }
777 if (ex != null)
778 u = null;
779 else if (s instanceof AltResult) {
780 ex = ((AltResult)s).ex;
781 u = null;
782 }
783 else {
784 @SuppressWarnings("unchecked") U us = (U) s;
785 u = us;
786 }
787 Executor e = executor;
788 V v = null;
789 if (ex == null) {
790 try {
791 if (e != null)
792 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
793 else
794 v = fn.apply(t, u);
795 } catch (Throwable rex) {
796 ex = rex;
797 }
798 }
799 if (e == null || ex != null)
800 dst.internalComplete(v, ex);
801 }
802 }
803 private static final long serialVersionUID = 5232453952276885070L;
804 }
805
806 static final class BiAcceptCompletion<T,U> extends Completion {
807 final CompletableFuture<? extends T> src;
808 final CompletableFuture<? extends U> snd;
809 final BiConsumer<? super T,? super U> fn;
810 final CompletableFuture<Void> dst;
811 final Executor executor;
812 BiAcceptCompletion(CompletableFuture<? extends T> src,
813 CompletableFuture<? extends U> snd,
814 BiConsumer<? super T,? super U> fn,
815 CompletableFuture<Void> dst, Executor executor) {
816 this.src = src; this.snd = snd;
817 this.fn = fn; this.dst = dst;
818 this.executor = executor;
819 }
820 public final void run() {
821 final CompletableFuture<? extends T> a;
822 final CompletableFuture<? extends U> b;
823 final BiConsumer<? super T,? super U> fn;
824 final CompletableFuture<Void> dst;
825 Object r, s; T t; U u; Throwable ex;
826 if ((dst = this.dst) != null &&
827 (fn = this.fn) != null &&
828 (a = this.src) != null &&
829 (r = a.result) != null &&
830 (b = this.snd) != null &&
831 (s = b.result) != null &&
832 compareAndSet(0, 1)) {
833 if (r instanceof AltResult) {
834 ex = ((AltResult)r).ex;
835 t = null;
836 }
837 else {
838 ex = null;
839 @SuppressWarnings("unchecked") T tr = (T) r;
840 t = tr;
841 }
842 if (ex != null)
843 u = null;
844 else if (s instanceof AltResult) {
845 ex = ((AltResult)s).ex;
846 u = null;
847 }
848 else {
849 @SuppressWarnings("unchecked") U us = (U) s;
850 u = us;
851 }
852 Executor e = executor;
853 if (ex == null) {
854 try {
855 if (e != null)
856 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
857 else
858 fn.accept(t, u);
859 } catch (Throwable rex) {
860 ex = rex;
861 }
862 }
863 if (e == null || ex != null)
864 dst.internalComplete(null, ex);
865 }
866 }
867 private static final long serialVersionUID = 5232453952276885070L;
868 }
869
870 static final class BiRunCompletion<T> extends Completion {
871 final CompletableFuture<? extends T> src;
872 final CompletableFuture<?> snd;
873 final Runnable fn;
874 final CompletableFuture<Void> dst;
875 final Executor executor;
876 BiRunCompletion(CompletableFuture<? extends T> src,
877 CompletableFuture<?> snd,
878 Runnable fn,
879 CompletableFuture<Void> dst, Executor executor) {
880 this.src = src; this.snd = snd;
881 this.fn = fn; this.dst = dst;
882 this.executor = executor;
883 }
884 public final void run() {
885 final CompletableFuture<? extends T> a;
886 final CompletableFuture<?> b;
887 final Runnable fn;
888 final CompletableFuture<Void> dst;
889 Object r, s; Throwable ex;
890 if ((dst = this.dst) != null &&
891 (fn = this.fn) != null &&
892 (a = this.src) != null &&
893 (r = a.result) != null &&
894 (b = this.snd) != null &&
895 (s = b.result) != null &&
896 compareAndSet(0, 1)) {
897 if (r instanceof AltResult)
898 ex = ((AltResult)r).ex;
899 else
900 ex = null;
901 if (ex == null && (s instanceof AltResult))
902 ex = ((AltResult)s).ex;
903 Executor e = executor;
904 if (ex == null) {
905 try {
906 if (e != null)
907 e.execute(new AsyncRun(fn, dst));
908 else
909 fn.run();
910 } catch (Throwable rex) {
911 ex = rex;
912 }
913 }
914 if (e == null || ex != null)
915 dst.internalComplete(null, ex);
916 }
917 }
918 private static final long serialVersionUID = 5232453952276885070L;
919 }
920
921 static final class AndCompletion extends Completion {
922 final CompletableFuture<?> src;
923 final CompletableFuture<?> snd;
924 final CompletableFuture<Void> dst;
925 AndCompletion(CompletableFuture<?> src,
926 CompletableFuture<?> snd,
927 CompletableFuture<Void> dst) {
928 this.src = src; this.snd = snd; this.dst = dst;
929 }
930 public final void run() {
931 final CompletableFuture<?> a;
932 final CompletableFuture<?> b;
933 final CompletableFuture<Void> dst;
934 Object r, s; Throwable ex;
935 if ((dst = this.dst) != null &&
936 (a = this.src) != null &&
937 (r = a.result) != null &&
938 (b = this.snd) != null &&
939 (s = b.result) != null &&
940 compareAndSet(0, 1)) {
941 if (r instanceof AltResult)
942 ex = ((AltResult)r).ex;
943 else
944 ex = null;
945 if (ex == null && (s instanceof AltResult))
946 ex = ((AltResult)s).ex;
947 dst.internalComplete(null, ex);
948 }
949 }
950 private static final long serialVersionUID = 5232453952276885070L;
951 }
952
953 static final class OrApplyCompletion<T,U> extends Completion {
954 final CompletableFuture<? extends T> src;
955 final CompletableFuture<? extends T> snd;
956 final Function<? super T,? extends U> fn;
957 final CompletableFuture<U> dst;
958 final Executor executor;
959 OrApplyCompletion(CompletableFuture<? extends T> src,
960 CompletableFuture<? extends T> snd,
961 Function<? super T,? extends U> fn,
962 CompletableFuture<U> dst, Executor executor) {
963 this.src = src; this.snd = snd;
964 this.fn = fn; this.dst = dst;
965 this.executor = executor;
966 }
967 public final void run() {
968 final CompletableFuture<? extends T> a;
969 final CompletableFuture<? extends T> b;
970 final Function<? super T,? extends U> fn;
971 final CompletableFuture<U> dst;
972 Object r; T t; Throwable ex;
973 if ((dst = this.dst) != null &&
974 (fn = this.fn) != null &&
975 (((a = this.src) != null && (r = a.result) != null) ||
976 ((b = this.snd) != null && (r = b.result) != null)) &&
977 compareAndSet(0, 1)) {
978 if (r instanceof AltResult) {
979 ex = ((AltResult)r).ex;
980 t = null;
981 }
982 else {
983 ex = null;
984 @SuppressWarnings("unchecked") T tr = (T) r;
985 t = tr;
986 }
987 Executor e = executor;
988 U u = null;
989 if (ex == null) {
990 try {
991 if (e != null)
992 e.execute(new AsyncApply<T,U>(t, fn, dst));
993 else
994 u = fn.apply(t);
995 } catch (Throwable rex) {
996 ex = rex;
997 }
998 }
999 if (e == null || ex != null)
1000 dst.internalComplete(u, ex);
1001 }
1002 }
1003 private static final long serialVersionUID = 5232453952276885070L;
1004 }
1005
1006 static final class OrAcceptCompletion<T> extends Completion {
1007 final CompletableFuture<? extends T> src;
1008 final CompletableFuture<? extends T> snd;
1009 final Consumer<? super T> fn;
1010 final CompletableFuture<Void> dst;
1011 final Executor executor;
1012 OrAcceptCompletion(CompletableFuture<? extends T> src,
1013 CompletableFuture<? extends T> snd,
1014 Consumer<? super T> fn,
1015 CompletableFuture<Void> dst, Executor executor) {
1016 this.src = src; this.snd = snd;
1017 this.fn = fn; this.dst = dst;
1018 this.executor = executor;
1019 }
1020 public final void run() {
1021 final CompletableFuture<? extends T> a;
1022 final CompletableFuture<? extends T> b;
1023 final Consumer<? super T> fn;
1024 final CompletableFuture<Void> dst;
1025 Object r; T t; Throwable ex;
1026 if ((dst = this.dst) != null &&
1027 (fn = this.fn) != null &&
1028 (((a = this.src) != null && (r = a.result) != null) ||
1029 ((b = this.snd) != null && (r = b.result) != null)) &&
1030 compareAndSet(0, 1)) {
1031 if (r instanceof AltResult) {
1032 ex = ((AltResult)r).ex;
1033 t = null;
1034 }
1035 else {
1036 ex = null;
1037 @SuppressWarnings("unchecked") T tr = (T) r;
1038 t = tr;
1039 }
1040 Executor e = executor;
1041 if (ex == null) {
1042 try {
1043 if (e != null)
1044 e.execute(new AsyncAccept<T>(t, fn, dst));
1045 else
1046 fn.accept(t);
1047 } catch (Throwable rex) {
1048 ex = rex;
1049 }
1050 }
1051 if (e == null || ex != null)
1052 dst.internalComplete(null, ex);
1053 }
1054 }
1055 private static final long serialVersionUID = 5232453952276885070L;
1056 }
1057
1058 static final class OrRunCompletion<T> extends Completion {
1059 final CompletableFuture<? extends T> src;
1060 final CompletableFuture<?> snd;
1061 final Runnable fn;
1062 final CompletableFuture<Void> dst;
1063 final Executor executor;
1064 OrRunCompletion(CompletableFuture<? extends T> src,
1065 CompletableFuture<?> snd,
1066 Runnable fn,
1067 CompletableFuture<Void> dst, Executor executor) {
1068 this.src = src; this.snd = snd;
1069 this.fn = fn; this.dst = dst;
1070 this.executor = executor;
1071 }
1072 public final void run() {
1073 final CompletableFuture<? extends T> a;
1074 final CompletableFuture<?> b;
1075 final Runnable fn;
1076 final CompletableFuture<Void> dst;
1077 Object r; Throwable ex;
1078 if ((dst = this.dst) != null &&
1079 (fn = this.fn) != null &&
1080 (((a = this.src) != null && (r = a.result) != null) ||
1081 ((b = this.snd) != null && (r = b.result) != null)) &&
1082 compareAndSet(0, 1)) {
1083 if (r instanceof AltResult)
1084 ex = ((AltResult)r).ex;
1085 else
1086 ex = null;
1087 Executor e = executor;
1088 if (ex == null) {
1089 try {
1090 if (e != null)
1091 e.execute(new AsyncRun(fn, dst));
1092 else
1093 fn.run();
1094 } catch (Throwable rex) {
1095 ex = rex;
1096 }
1097 }
1098 if (e == null || ex != null)
1099 dst.internalComplete(null, ex);
1100 }
1101 }
1102 private static final long serialVersionUID = 5232453952276885070L;
1103 }
1104
1105 static final class OrCompletion extends Completion {
1106 final CompletableFuture<?> src;
1107 final CompletableFuture<?> snd;
1108 final CompletableFuture<?> dst;
1109 OrCompletion(CompletableFuture<?> src,
1110 CompletableFuture<?> snd,
1111 CompletableFuture<?> dst) {
1112 this.src = src; this.snd = snd; this.dst = dst;
1113 }
1114 public final void run() {
1115 final CompletableFuture<?> a;
1116 final CompletableFuture<?> b;
1117 final CompletableFuture<?> dst;
1118 Object r, t; Throwable ex;
1119 if ((dst = this.dst) != null &&
1120 (((a = this.src) != null && (r = a.result) != null) ||
1121 ((b = this.snd) != null && (r = b.result) != null)) &&
1122 compareAndSet(0, 1)) {
1123 if (r instanceof AltResult) {
1124 ex = ((AltResult)r).ex;
1125 t = null;
1126 }
1127 else {
1128 ex = null;
1129 t = r;
1130 }
1131 dst.internalComplete(t, ex);
1132 }
1133 }
1134 private static final long serialVersionUID = 5232453952276885070L;
1135 }
1136
1137 static final class ExceptionCompletion<T> extends Completion {
1138 final CompletableFuture<? extends T> src;
1139 final Function<? super Throwable, ? extends T> fn;
1140 final CompletableFuture<T> dst;
1141 ExceptionCompletion(CompletableFuture<? extends T> src,
1142 Function<? super Throwable, ? extends T> fn,
1143 CompletableFuture<T> dst) {
1144 this.src = src; this.fn = fn; this.dst = dst;
1145 }
1146 public final void run() {
1147 final CompletableFuture<? extends T> a;
1148 final Function<? super Throwable, ? extends T> fn;
1149 final CompletableFuture<T> dst;
1150 Object r; T t = null; Throwable ex, dx = null;
1151 if ((dst = this.dst) != null &&
1152 (fn = this.fn) != null &&
1153 (a = this.src) != null &&
1154 (r = a.result) != null &&
1155 compareAndSet(0, 1)) {
1156 if ((r instanceof AltResult) &&
1157 (ex = ((AltResult)r).ex) != null) {
1158 try {
1159 t = fn.apply(ex);
1160 } catch (Throwable rex) {
1161 dx = rex;
1162 }
1163 }
1164 else {
1165 @SuppressWarnings("unchecked") T tr = (T) r;
1166 t = tr;
1167 }
1168 dst.internalComplete(t, dx);
1169 }
1170 }
1171 private static final long serialVersionUID = 5232453952276885070L;
1172 }
1173
1174 static final class ThenCopy extends Completion {
1175 final CompletableFuture<?> src;
1176 final CompletableFuture<?> dst;
1177 ThenCopy(CompletableFuture<?> src,
1178 CompletableFuture<?> dst) {
1179 this.src = src; this.dst = dst;
1180 }
1181 public final void run() {
1182 final CompletableFuture<?> a;
1183 final CompletableFuture<?> dst;
1184 Object r; Object t; Throwable ex;
1185 if ((dst = this.dst) != null &&
1186 (a = this.src) != null &&
1187 (r = a.result) != null &&
1188 compareAndSet(0, 1)) {
1189 if (r instanceof AltResult) {
1190 ex = ((AltResult)r).ex;
1191 t = null;
1192 }
1193 else {
1194 ex = null;
1195 t = r;
1196 }
1197 dst.internalComplete(t, ex);
1198 }
1199 }
1200 private static final long serialVersionUID = 5232453952276885070L;
1201 }
1202
1203 static final class HandleCompletion<T,U> extends Completion {
1204 final CompletableFuture<? extends T> src;
1205 final BiFunction<? super T, Throwable, ? extends U> fn;
1206 final CompletableFuture<U> dst;
1207 HandleCompletion(CompletableFuture<? extends T> src,
1208 BiFunction<? super T, Throwable, ? extends U> fn,
1209 final CompletableFuture<U> dst) {
1210 this.src = src; this.fn = fn; this.dst = dst;
1211 }
1212 public final void run() {
1213 final CompletableFuture<? extends T> a;
1214 final BiFunction<? super T, Throwable, ? extends U> fn;
1215 final CompletableFuture<U> dst;
1216 Object r; T t; Throwable ex;
1217 if ((dst = this.dst) != null &&
1218 (fn = this.fn) != null &&
1219 (a = this.src) != null &&
1220 (r = a.result) != null &&
1221 compareAndSet(0, 1)) {
1222 if (r instanceof AltResult) {
1223 ex = ((AltResult)r).ex;
1224 t = null;
1225 }
1226 else {
1227 ex = null;
1228 @SuppressWarnings("unchecked") T tr = (T) r;
1229 t = tr;
1230 }
1231 U u = null; Throwable dx = null;
1232 try {
1233 u = fn.apply(t, ex);
1234 } catch (Throwable rex) {
1235 dx = rex;
1236 }
1237 dst.internalComplete(u, dx);
1238 }
1239 }
1240 private static final long serialVersionUID = 5232453952276885070L;
1241 }
1242
1243 static final class ComposeCompletion<T,U> extends Completion {
1244 final CompletableFuture<? extends T> src;
1245 final Function<? super T, CompletableFuture<U>> fn;
1246 final CompletableFuture<U> dst;
1247 final Executor executor;
1248 ComposeCompletion(CompletableFuture<? extends T> src,
1249 Function<? super T, CompletableFuture<U>> fn,
1250 final CompletableFuture<U> dst, Executor executor) {
1251 this.src = src; this.fn = fn; this.dst = dst;
1252 this.executor = executor;
1253 }
1254 public final void run() {
1255 final CompletableFuture<? extends T> a;
1256 final Function<? super T, CompletableFuture<U>> fn;
1257 final CompletableFuture<U> dst;
1258 Object r; T t; Throwable ex; Executor e;
1259 if ((dst = this.dst) != null &&
1260 (fn = this.fn) != null &&
1261 (a = this.src) != null &&
1262 (r = a.result) != null &&
1263 compareAndSet(0, 1)) {
1264 if (r instanceof AltResult) {
1265 ex = ((AltResult)r).ex;
1266 t = null;
1267 }
1268 else {
1269 ex = null;
1270 @SuppressWarnings("unchecked") T tr = (T) r;
1271 t = tr;
1272 }
1273 CompletableFuture<U> c = null;
1274 U u = null;
1275 boolean complete = false;
1276 if (ex == null) {
1277 if ((e = executor) != null)
1278 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1279 else {
1280 try {
1281 if ((c = fn.apply(t)) == null)
1282 ex = new NullPointerException();
1283 } catch (Throwable rex) {
1284 ex = rex;
1285 }
1286 }
1287 }
1288 if (c != null) {
1289 ThenCopy d = null;
1290 Object s;
1291 if ((s = c.result) == null) {
1292 CompletionNode p = new CompletionNode
1293 (d = new ThenCopy(c, dst));
1294 while ((s = c.result) == null) {
1295 if (UNSAFE.compareAndSwapObject
1296 (c, COMPLETIONS, p.next = c.completions, p))
1297 break;
1298 }
1299 }
1300 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1301 complete = true;
1302 if (s instanceof AltResult) {
1303 ex = ((AltResult)s).ex; // no rewrap
1304 u = null;
1305 }
1306 else {
1307 @SuppressWarnings("unchecked") U us = (U) s;
1308 u = us;
1309 }
1310 }
1311 }
1312 if (complete || ex != null)
1313 dst.internalComplete(u, ex);
1314 if (c != null)
1315 c.helpPostComplete();
1316 }
1317 }
1318 private static final long serialVersionUID = 5232453952276885070L;
1319 }
1320
1321 // public methods
1322
1323 /**
1324 * Creates a new incomplete CompletableFuture.
1325 */
1326 public CompletableFuture() {
1327 }
1328
1329 /**
1330 * Asynchronously executes in the {@link
1331 * ForkJoinPool#commonPool()}, a task that completes the returned
1332 * CompletableFuture with the result of the given Supplier.
1333 *
1334 * @param supplier a function returning the value to be used
1335 * to complete the returned CompletableFuture
1336 * @return the CompletableFuture
1337 */
1338 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1339 if (supplier == null) throw new NullPointerException();
1340 CompletableFuture<U> f = new CompletableFuture<U>();
1341 ForkJoinPool.commonPool().
1342 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1343 return f;
1344 }
1345
1346 /**
1347 * Asynchronously executes using the given executor, a task that
1348 * completes the returned CompletableFuture with the result of the
1349 * given Supplier.
1350 *
1351 * @param supplier a function returning the value to be used
1352 * to complete the returned CompletableFuture
1353 * @param executor the executor to use for asynchronous execution
1354 * @return the CompletableFuture
1355 */
1356 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1357 Executor executor) {
1358 if (executor == null || supplier == null)
1359 throw new NullPointerException();
1360 CompletableFuture<U> f = new CompletableFuture<U>();
1361 executor.execute(new AsyncSupply<U>(supplier, f));
1362 return f;
1363 }
1364
1365 /**
1366 * Asynchronously executes in the {@link
1367 * ForkJoinPool#commonPool()} a task that runs the given action,
1368 * and then completes the returned CompletableFuture.
1369 *
1370 * @param runnable the action to run before completing the
1371 * returned CompletableFuture
1372 * @return the CompletableFuture
1373 */
1374 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1375 if (runnable == null) throw new NullPointerException();
1376 CompletableFuture<Void> f = new CompletableFuture<Void>();
1377 ForkJoinPool.commonPool().
1378 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1379 return f;
1380 }
1381
1382 /**
1383 * Asynchronously executes using the given executor, a task that
1384 * runs the given action, and then completes the returned
1385 * CompletableFuture.
1386 *
1387 * @param runnable the action to run before completing the
1388 * returned CompletableFuture
1389 * @param executor the executor to use for asynchronous execution
1390 * @return the CompletableFuture
1391 */
1392 public static CompletableFuture<Void> runAsync(Runnable runnable,
1393 Executor executor) {
1394 if (executor == null || runnable == null)
1395 throw new NullPointerException();
1396 CompletableFuture<Void> f = new CompletableFuture<Void>();
1397 executor.execute(new AsyncRun(runnable, f));
1398 return f;
1399 }
1400
1401 /**
1402 * Returns {@code true} if completed in any fashion: normally,
1403 * exceptionally, or via cancellation.
1404 *
1405 * @return {@code true} if completed
1406 */
1407 public boolean isDone() {
1408 return result != null;
1409 }
1410
1411 /**
1412 * Waits if necessary for the computation to complete, and then
1413 * retrieves its result.
1414 *
1415 * @return the computed result
1416 * @throws CancellationException if the computation was cancelled
1417 * @throws ExecutionException if the computation threw an
1418 * exception
1419 * @throws InterruptedException if the current thread was interrupted
1420 * while waiting
1421 */
1422 public T get() throws InterruptedException, ExecutionException {
1423 Object r; Throwable ex, cause;
1424 if ((r = result) == null && (r = waitingGet(true)) == null)
1425 throw new InterruptedException();
1426 if (r instanceof AltResult) {
1427 if ((ex = ((AltResult)r).ex) != null) {
1428 if (ex instanceof CancellationException)
1429 throw (CancellationException)ex;
1430 if ((ex instanceof CompletionException) &&
1431 (cause = ex.getCause()) != null)
1432 ex = cause;
1433 throw new ExecutionException(ex);
1434 }
1435 return null;
1436 }
1437 @SuppressWarnings("unchecked") T tr = (T) r;
1438 return tr;
1439 }
1440
1441 /**
1442 * Waits if necessary for at most the given time for completion,
1443 * and then retrieves its result, if available.
1444 *
1445 * @param timeout the maximum time to wait
1446 * @param unit the time unit of the timeout argument
1447 * @return the computed result
1448 * @throws CancellationException if the computation was cancelled
1449 * @throws ExecutionException if the computation threw an
1450 * exception
1451 * @throws InterruptedException if the current thread was interrupted
1452 * while waiting
1453 * @throws TimeoutException if the wait timed out
1454 */
1455 public T get(long timeout, TimeUnit unit)
1456 throws InterruptedException, ExecutionException, TimeoutException {
1457 Object r; Throwable ex, cause;
1458 long nanos = unit.toNanos(timeout);
1459 if (Thread.interrupted())
1460 throw new InterruptedException();
1461 if ((r = result) == null)
1462 r = timedAwaitDone(nanos);
1463 if (r instanceof AltResult) {
1464 if ((ex = ((AltResult)r).ex) != null) {
1465 if (ex instanceof CancellationException)
1466 throw (CancellationException)ex;
1467 if ((ex instanceof CompletionException) &&
1468 (cause = ex.getCause()) != null)
1469 ex = cause;
1470 throw new ExecutionException(ex);
1471 }
1472 return null;
1473 }
1474 @SuppressWarnings("unchecked") T tr = (T) r;
1475 return tr;
1476 }
1477
1478 /**
1479 * Returns the result value when complete, or throws an
1480 * (unchecked) exception if completed exceptionally. To better
1481 * conform with the use of common functional forms, if a
1482 * computation involved in the completion of this
1483 * CompletableFuture threw an exception, this method throws an
1484 * (unchecked) {@link CompletionException} with the underlying
1485 * exception as its cause.
1486 *
1487 * @return the result value
1488 * @throws CancellationException if the computation was cancelled
1489 * @throws CompletionException if a completion computation threw
1490 * an exception
1491 */
1492 public T join() {
1493 Object r; Throwable ex;
1494 if ((r = result) == null)
1495 r = waitingGet(false);
1496 if (r instanceof AltResult) {
1497 if ((ex = ((AltResult)r).ex) != null) {
1498 if (ex instanceof CancellationException)
1499 throw (CancellationException)ex;
1500 if (ex instanceof CompletionException)
1501 throw (CompletionException)ex;
1502 throw new CompletionException(ex);
1503 }
1504 return null;
1505 }
1506 @SuppressWarnings("unchecked") T tr = (T) r;
1507 return tr;
1508 }
1509
1510 /**
1511 * Returns the result value (or throws any encountered exception)
1512 * if completed, else returns the given valueIfAbsent.
1513 *
1514 * @param valueIfAbsent the value to return if not completed
1515 * @return the result value, if completed, else the given valueIfAbsent
1516 * @throws CancellationException if the computation was cancelled
1517 * @throws CompletionException if a completion computation threw
1518 * an exception
1519 */
1520 public T getNow(T valueIfAbsent) {
1521 Object r; Throwable ex;
1522 if ((r = result) == null)
1523 return valueIfAbsent;
1524 if (r instanceof AltResult) {
1525 if ((ex = ((AltResult)r).ex) != null) {
1526 if (ex instanceof CancellationException)
1527 throw (CancellationException)ex;
1528 if (ex instanceof CompletionException)
1529 throw (CompletionException)ex;
1530 throw new CompletionException(ex);
1531 }
1532 return null;
1533 }
1534 @SuppressWarnings("unchecked") T tr = (T) r;
1535 return tr;
1536 }
1537
1538 /**
1539 * If not already completed, sets the value returned by {@link
1540 * #get()} and related methods to the given value.
1541 *
1542 * @param value the result value
1543 * @return {@code true} if this invocation caused this CompletableFuture
1544 * to transition to a completed state, else {@code false}
1545 */
1546 public boolean complete(T value) {
1547 boolean triggered = result == null &&
1548 UNSAFE.compareAndSwapObject(this, RESULT, null,
1549 value == null ? NIL : value);
1550 postComplete();
1551 return triggered;
1552 }
1553
1554 /**
1555 * If not already completed, causes invocations of {@link #get()}
1556 * and related methods to throw the given exception.
1557 *
1558 * @param ex the exception
1559 * @return {@code true} if this invocation caused this CompletableFuture
1560 * to transition to a completed state, else {@code false}
1561 */
1562 public boolean completeExceptionally(Throwable ex) {
1563 if (ex == null) throw new NullPointerException();
1564 boolean triggered = result == null &&
1565 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1566 postComplete();
1567 return triggered;
1568 }
1569
1570 /**
1571 * Creates and returns a CompletableFuture that is completed with
1572 * the result of the given function of this CompletableFuture.
1573 * If this CompletableFuture completes exceptionally,
1574 * then the returned CompletableFuture also does so,
1575 * with a CompletionException holding this exception as
1576 * its cause.
1577 *
1578 * @param fn the function to use to compute the value of
1579 * the returned CompletableFuture
1580 * @return the new CompletableFuture
1581 */
1582 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1583 return doThenApply(fn, null);
1584 }
1585
1586 /**
1587 * Creates and returns a CompletableFuture that is asynchronously
1588 * completed using the {@link ForkJoinPool#commonPool()} with the
1589 * result of the given function of this CompletableFuture. If
1590 * this CompletableFuture completes exceptionally, then the
1591 * returned CompletableFuture also does so, with a
1592 * CompletionException holding this exception as its cause.
1593 *
1594 * @param fn the function to use to compute the value of
1595 * the returned CompletableFuture
1596 * @return the new CompletableFuture
1597 */
1598 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
1599 return doThenApply(fn, ForkJoinPool.commonPool());
1600 }
1601
1602 /**
1603 * Creates and returns a CompletableFuture that is asynchronously
1604 * completed using the given executor with the result of the given
1605 * function of this CompletableFuture. If this CompletableFuture
1606 * completes exceptionally, then the returned CompletableFuture
1607 * also does so, with a CompletionException holding this exception as
1608 * its cause.
1609 *
1610 * @param fn the function to use to compute the value of
1611 * the returned CompletableFuture
1612 * @param executor the executor to use for asynchronous execution
1613 * @return the new CompletableFuture
1614 */
1615 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
1616 Executor executor) {
1617 if (executor == null) throw new NullPointerException();
1618 return doThenApply(fn, executor);
1619 }
1620
1621 private <U> CompletableFuture<U> doThenApply(Function<? super T,? extends U> fn,
1622 Executor e) {
1623 if (fn == null) throw new NullPointerException();
1624 CompletableFuture<U> dst = new CompletableFuture<U>();
1625 ApplyCompletion<T,U> d = null;
1626 Object r;
1627 if ((r = result) == null) {
1628 CompletionNode p = new CompletionNode
1629 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1630 while ((r = result) == null) {
1631 if (UNSAFE.compareAndSwapObject
1632 (this, COMPLETIONS, p.next = completions, p))
1633 break;
1634 }
1635 }
1636 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1637 T t; Throwable ex;
1638 if (r instanceof AltResult) {
1639 ex = ((AltResult)r).ex;
1640 t = null;
1641 }
1642 else {
1643 ex = null;
1644 @SuppressWarnings("unchecked") T tr = (T) r;
1645 t = tr;
1646 }
1647 U u = null;
1648 if (ex == null) {
1649 try {
1650 if (e != null)
1651 e.execute(new AsyncApply<T,U>(t, fn, dst));
1652 else
1653 u = fn.apply(t);
1654 } catch (Throwable rex) {
1655 ex = rex;
1656 }
1657 }
1658 if (e == null || ex != null)
1659 dst.internalComplete(u, ex);
1660 }
1661 helpPostComplete();
1662 return dst;
1663 }
1664
1665 /**
1666 * Creates and returns a CompletableFuture that is completed after
1667 * performing the given action with this CompletableFuture's
1668 * result when it completes. If this CompletableFuture
1669 * completes exceptionally, then the returned CompletableFuture
1670 * also does so, with a CompletionException holding this exception as
1671 * its cause.
1672 *
1673 * @param block the action to perform before completing the
1674 * returned CompletableFuture
1675 * @return the new CompletableFuture
1676 */
1677 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1678 return doThenAccept(block, null);
1679 }
1680
1681 /**
1682 * Creates and returns a CompletableFuture that is asynchronously
1683 * completed using the {@link ForkJoinPool#commonPool()} with this
1684 * CompletableFuture's result when it completes. If this
1685 * CompletableFuture completes exceptionally, then the returned
1686 * CompletableFuture also does so, with a CompletionException holding
1687 * this exception as its cause.
1688 *
1689 * @param block the action to perform before completing the
1690 * returned CompletableFuture
1691 * @return the new CompletableFuture
1692 */
1693 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1694 return doThenAccept(block, ForkJoinPool.commonPool());
1695 }
1696
1697 /**
1698 * Creates and returns a CompletableFuture that is asynchronously
1699 * completed using the given executor with this
1700 * CompletableFuture's result when it completes. If this
1701 * CompletableFuture completes exceptionally, then the returned
1702 * CompletableFuture also does so, with a CompletionException holding
1703 * this exception as its cause.
1704 *
1705 * @param block the action to perform before completing the
1706 * returned CompletableFuture
1707 * @param executor the executor to use for asynchronous execution
1708 * @return the new CompletableFuture
1709 */
1710 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1711 Executor executor) {
1712 if (executor == null) throw new NullPointerException();
1713 return doThenAccept(block, executor);
1714 }
1715
1716 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1717 Executor e) {
1718 if (fn == null) throw new NullPointerException();
1719 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1720 AcceptCompletion<T> d = null;
1721 Object r;
1722 if ((r = result) == null) {
1723 CompletionNode p = new CompletionNode
1724 (d = new AcceptCompletion<T>(this, fn, dst, e));
1725 while ((r = result) == null) {
1726 if (UNSAFE.compareAndSwapObject
1727 (this, COMPLETIONS, p.next = completions, p))
1728 break;
1729 }
1730 }
1731 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1732 T t; Throwable ex;
1733 if (r instanceof AltResult) {
1734 ex = ((AltResult)r).ex;
1735 t = null;
1736 }
1737 else {
1738 ex = null;
1739 @SuppressWarnings("unchecked") T tr = (T) r;
1740 t = tr;
1741 }
1742 if (ex == null) {
1743 try {
1744 if (e != null)
1745 e.execute(new AsyncAccept<T>(t, fn, dst));
1746 else
1747 fn.accept(t);
1748 } catch (Throwable rex) {
1749 ex = rex;
1750 }
1751 }
1752 if (e == null || ex != null)
1753 dst.internalComplete(null, ex);
1754 }
1755 helpPostComplete();
1756 return dst;
1757 }
1758
1759 /**
1760 * Creates and returns a CompletableFuture that is completed after
1761 * performing the given action when this CompletableFuture
1762 * completes. If this CompletableFuture completes exceptionally,
1763 * then the returned CompletableFuture also does so, with a
1764 * CompletionException holding this exception as its cause.
1765 *
1766 * @param action the action to perform before completing the
1767 * returned CompletableFuture
1768 * @return the new CompletableFuture
1769 */
1770 public CompletableFuture<Void> thenRun(Runnable action) {
1771 return doThenRun(action, null);
1772 }
1773
1774 /**
1775 * Creates and returns a CompletableFuture that is asynchronously
1776 * completed using the {@link ForkJoinPool#commonPool()} after
1777 * performing the given action when this CompletableFuture
1778 * completes. If this CompletableFuture completes exceptionally,
1779 * then the returned CompletableFuture also does so, with a
1780 * CompletionException holding this exception as its cause.
1781 *
1782 * @param action the action to perform before completing the
1783 * returned CompletableFuture
1784 * @return the new CompletableFuture
1785 */
1786 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1787 return doThenRun(action, ForkJoinPool.commonPool());
1788 }
1789
1790 /**
1791 * Creates and returns a CompletableFuture that is asynchronously
1792 * completed using the given executor after performing the given
1793 * action when this CompletableFuture completes. If this
1794 * CompletableFuture completes exceptionally, then the returned
1795 * CompletableFuture also does so, with a CompletionException holding
1796 * this exception as its cause.
1797 *
1798 * @param action the action to perform before completing the
1799 * returned CompletableFuture
1800 * @param executor the executor to use for asynchronous execution
1801 * @return the new CompletableFuture
1802 */
1803 public CompletableFuture<Void> thenRunAsync(Runnable action,
1804 Executor executor) {
1805 if (executor == null) throw new NullPointerException();
1806 return doThenRun(action, executor);
1807 }
1808
1809 private CompletableFuture<Void> doThenRun(Runnable action,
1810 Executor e) {
1811 if (action == null) throw new NullPointerException();
1812 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1813 RunCompletion<T> d = null;
1814 Object r;
1815 if ((r = result) == null) {
1816 CompletionNode p = new CompletionNode
1817 (d = new RunCompletion<T>(this, action, dst, e));
1818 while ((r = result) == null) {
1819 if (UNSAFE.compareAndSwapObject
1820 (this, COMPLETIONS, p.next = completions, p))
1821 break;
1822 }
1823 }
1824 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1825 Throwable ex;
1826 if (r instanceof AltResult)
1827 ex = ((AltResult)r).ex;
1828 else
1829 ex = null;
1830 if (ex == null) {
1831 try {
1832 if (e != null)
1833 e.execute(new AsyncRun(action, dst));
1834 else
1835 action.run();
1836 } catch (Throwable rex) {
1837 ex = rex;
1838 }
1839 }
1840 if (e == null || ex != null)
1841 dst.internalComplete(null, ex);
1842 }
1843 helpPostComplete();
1844 return dst;
1845 }
1846
1847 /**
1848 * Creates and returns a CompletableFuture that is completed with
1849 * the result of the given function of this and the other given
1850 * CompletableFuture's results when both complete. If this or
1851 * the other CompletableFuture complete exceptionally, then the
1852 * returned CompletableFuture also does so, with a
1853 * CompletionException holding the exception as its cause.
1854 *
1855 * @param other the other CompletableFuture
1856 * @param fn the function to use to compute the value of
1857 * the returned CompletableFuture
1858 * @return the new CompletableFuture
1859 */
1860 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1861 BiFunction<? super T,? super U,? extends V> fn) {
1862 return doThenBiApply(other, fn, null);
1863 }
1864
1865 /**
1866 * Creates and returns a CompletableFuture that is asynchronously
1867 * completed using the {@link ForkJoinPool#commonPool()} with
1868 * the result of the given function of this and the other given
1869 * CompletableFuture's results when both complete. If this or
1870 * the other CompletableFuture complete exceptionally, then the
1871 * returned CompletableFuture also does so, with a
1872 * CompletionException holding the exception as its cause.
1873 *
1874 * @param other the other CompletableFuture
1875 * @param fn the function to use to compute the value of
1876 * the returned CompletableFuture
1877 * @return the new CompletableFuture
1878 */
1879 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1880 BiFunction<? super T,? super U,? extends V> fn) {
1881 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1882 }
1883
1884 /**
1885 * Creates and returns a CompletableFuture that is
1886 * asynchronously completed using the given executor with the
1887 * result of the given function of this and the other given
1888 * CompletableFuture's results when both complete. If this or
1889 * the other CompletableFuture complete exceptionally, then the
1890 * returned CompletableFuture also does so, with a
1891 * CompletionException holding the exception as its cause.
1892 *
1893 * @param other the other CompletableFuture
1894 * @param fn the function to use to compute the value of
1895 * the returned CompletableFuture
1896 * @param executor the executor to use for asynchronous execution
1897 * @return the new CompletableFuture
1898 */
1899
1900 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1901 BiFunction<? super T,? super U,? extends V> fn,
1902 Executor executor) {
1903 if (executor == null) throw new NullPointerException();
1904 return doThenBiApply(other, fn, executor);
1905 }
1906
1907 private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1908 BiFunction<? super T,? super U,? extends V> fn,
1909 Executor e) {
1910 if (other == null || fn == null) throw new NullPointerException();
1911 CompletableFuture<V> dst = new CompletableFuture<V>();
1912 BiApplyCompletion<T,U,V> d = null;
1913 Object r, s = null;
1914 if ((r = result) == null || (s = other.result) == null) {
1915 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1916 CompletionNode q = null, p = new CompletionNode(d);
1917 while ((r == null && (r = result) == null) ||
1918 (s == null && (s = other.result) == null)) {
1919 if (q != null) {
1920 if (s != null ||
1921 UNSAFE.compareAndSwapObject
1922 (other, COMPLETIONS, q.next = other.completions, q))
1923 break;
1924 }
1925 else if (r != null ||
1926 UNSAFE.compareAndSwapObject
1927 (this, COMPLETIONS, p.next = completions, p)) {
1928 if (s != null)
1929 break;
1930 q = new CompletionNode(d);
1931 }
1932 }
1933 }
1934 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1935 T t; U u; Throwable ex;
1936 if (r instanceof AltResult) {
1937 ex = ((AltResult)r).ex;
1938 t = null;
1939 }
1940 else {
1941 ex = null;
1942 @SuppressWarnings("unchecked") T tr = (T) r;
1943 t = tr;
1944 }
1945 if (ex != null)
1946 u = null;
1947 else if (s instanceof AltResult) {
1948 ex = ((AltResult)s).ex;
1949 u = null;
1950 }
1951 else {
1952 @SuppressWarnings("unchecked") U us = (U) s;
1953 u = us;
1954 }
1955 V v = null;
1956 if (ex == null) {
1957 try {
1958 if (e != null)
1959 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1960 else
1961 v = fn.apply(t, u);
1962 } catch (Throwable rex) {
1963 ex = rex;
1964 }
1965 }
1966 if (e == null || ex != null)
1967 dst.internalComplete(v, ex);
1968 }
1969 helpPostComplete();
1970 other.helpPostComplete();
1971 return dst;
1972 }
1973
1974 /**
1975 * Creates and returns a CompletableFuture that is completed with
1976 * the results of this and the other given CompletableFuture if
1977 * both complete. If this and/or the other CompletableFuture
1978 * complete exceptionally, then the returned CompletableFuture
1979 * also does so, with a CompletionException holding one of these
1980 * exceptions as its cause.
1981 *
1982 * @param other the other CompletableFuture
1983 * @param block the action to perform before completing the
1984 * returned CompletableFuture
1985 * @return the new CompletableFuture
1986 */
1987 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1988 BiConsumer<? super T, ? super U> block) {
1989 return doThenBiAccept(other, block, null);
1990 }
1991
1992 /**
1993 * Creates and returns a CompletableFuture that is completed
1994 * asynchronously using the {@link ForkJoinPool#commonPool()} with
1995 * the results of this and the other given CompletableFuture when
1996 * both complete. If this and/or the other CompletableFuture
1997 * complete exceptionally, then the returned CompletableFuture
1998 * also does so, with a CompletionException holding one of these
1999 * exceptions as its cause.
2000 *
2001 * @param other the other CompletableFuture
2002 * @param block the action to perform before completing the
2003 * returned CompletableFuture
2004 * @return the new CompletableFuture
2005 */
2006 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
2007 BiConsumer<? super T, ? super U> block) {
2008 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2009 }
2010
2011 /**
2012 * Creates and returns a CompletableFuture that is completed
2013 * asynchronously using the given executor with the results of
2014 * this and the other given CompletableFuture when both complete.
2015 * If this and/or the other CompletableFuture complete
2016 * exceptionally, then the returned CompletableFuture also does
2017 * so, with a CompletionException holding one of these exceptions as
2018 * its cause.
2019 *
2020 * @param other the other CompletableFuture
2021 * @param block the action to perform before completing the
2022 * returned CompletableFuture
2023 * @param executor the executor to use for asynchronous execution
2024 * @return the new CompletableFuture
2025 */
2026 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
2027 BiConsumer<? super T, ? super U> block,
2028 Executor executor) {
2029 if (executor == null) throw new NullPointerException();
2030 return doThenBiAccept(other, block, executor);
2031 }
2032
2033 private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
2034 BiConsumer<? super T,? super U> fn,
2035 Executor e) {
2036 if (other == null || fn == null) throw new NullPointerException();
2037 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2038 BiAcceptCompletion<T,U> d = null;
2039 Object r, s = null;
2040 if ((r = result) == null || (s = other.result) == null) {
2041 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2042 CompletionNode q = null, p = new CompletionNode(d);
2043 while ((r == null && (r = result) == null) ||
2044 (s == null && (s = other.result) == null)) {
2045 if (q != null) {
2046 if (s != null ||
2047 UNSAFE.compareAndSwapObject
2048 (other, COMPLETIONS, q.next = other.completions, q))
2049 break;
2050 }
2051 else if (r != null ||
2052 UNSAFE.compareAndSwapObject
2053 (this, COMPLETIONS, p.next = completions, p)) {
2054 if (s != null)
2055 break;
2056 q = new CompletionNode(d);
2057 }
2058 }
2059 }
2060 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2061 T t; U u; Throwable ex;
2062 if (r instanceof AltResult) {
2063 ex = ((AltResult)r).ex;
2064 t = null;
2065 }
2066 else {
2067 ex = null;
2068 @SuppressWarnings("unchecked") T tr = (T) r;
2069 t = tr;
2070 }
2071 if (ex != null)
2072 u = null;
2073 else if (s instanceof AltResult) {
2074 ex = ((AltResult)s).ex;
2075 u = null;
2076 }
2077 else {
2078 @SuppressWarnings("unchecked") U us = (U) s;
2079 u = us;
2080 }
2081 if (ex == null) {
2082 try {
2083 if (e != null)
2084 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2085 else
2086 fn.accept(t, u);
2087 } catch (Throwable rex) {
2088 ex = rex;
2089 }
2090 }
2091 if (e == null || ex != null)
2092 dst.internalComplete(null, ex);
2093 }
2094 helpPostComplete();
2095 other.helpPostComplete();
2096 return dst;
2097 }
2098
2099 /**
2100 * Creates and returns a CompletableFuture that is completed
2101 * when this and the other given CompletableFuture both
2102 * complete. If this and/or the other CompletableFuture complete
2103 * exceptionally, then the returned CompletableFuture also does
2104 * so, with a CompletionException holding one of these exceptions as
2105 * its cause.
2106 *
2107 * @param other the other CompletableFuture
2108 * @param action the action to perform before completing the
2109 * returned CompletableFuture
2110 * @return the new CompletableFuture
2111 */
2112 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2113 Runnable action) {
2114 return doThenBiRun(other, action, null);
2115 }
2116
2117 /**
2118 * Creates and returns a CompletableFuture that is completed
2119 * asynchronously using the {@link ForkJoinPool#commonPool()}
2120 * when this and the other given CompletableFuture both
2121 * complete. If this and/or the other CompletableFuture complete
2122 * exceptionally, then the returned CompletableFuture also does
2123 * so, with a CompletionException holding one of these exceptions as
2124 * its cause.
2125 *
2126 * @param other the other CompletableFuture
2127 * @param action the action to perform before completing the
2128 * returned CompletableFuture
2129 * @return the new CompletableFuture
2130 */
2131 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2132 Runnable action) {
2133 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2134 }
2135
2136 /**
2137 * Creates and returns a CompletableFuture that is completed
2138 * asynchronously using the given executor
2139 * when this and the other given CompletableFuture both
2140 * complete. If this and/or the other CompletableFuture complete
2141 * exceptionally, then the returned CompletableFuture also does
2142 * so, with a CompletionException holding one of these exceptions as
2143 * its cause.
2144 *
2145 * @param other the other CompletableFuture
2146 * @param action the action to perform before completing the
2147 * returned CompletableFuture
2148 * @param executor the executor to use for asynchronous execution
2149 * @return the new CompletableFuture
2150 */
2151 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2152 Runnable action,
2153 Executor executor) {
2154 if (executor == null) throw new NullPointerException();
2155 return doThenBiRun(other, action, executor);
2156 }
2157
2158 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2159 Runnable action,
2160 Executor e) {
2161 if (other == null || action == null) throw new NullPointerException();
2162 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2163 BiRunCompletion<T> d = null;
2164 Object r, s = null;
2165 if ((r = result) == null || (s = other.result) == null) {
2166 d = new BiRunCompletion<T>(this, other, action, dst, e);
2167 CompletionNode q = null, p = new CompletionNode(d);
2168 while ((r == null && (r = result) == null) ||
2169 (s == null && (s = other.result) == null)) {
2170 if (q != null) {
2171 if (s != null ||
2172 UNSAFE.compareAndSwapObject
2173 (other, COMPLETIONS, q.next = other.completions, q))
2174 break;
2175 }
2176 else if (r != null ||
2177 UNSAFE.compareAndSwapObject
2178 (this, COMPLETIONS, p.next = completions, p)) {
2179 if (s != null)
2180 break;
2181 q = new CompletionNode(d);
2182 }
2183 }
2184 }
2185 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2186 Throwable ex;
2187 if (r instanceof AltResult)
2188 ex = ((AltResult)r).ex;
2189 else
2190 ex = null;
2191 if (ex == null && (s instanceof AltResult))
2192 ex = ((AltResult)s).ex;
2193 if (ex == null) {
2194 try {
2195 if (e != null)
2196 e.execute(new AsyncRun(action, dst));
2197 else
2198 action.run();
2199 } catch (Throwable rex) {
2200 ex = rex;
2201 }
2202 }
2203 if (e == null || ex != null)
2204 dst.internalComplete(null, ex);
2205 }
2206 helpPostComplete();
2207 other.helpPostComplete();
2208 return dst;
2209 }
2210
2211 /**
2212 * Creates and returns a CompletableFuture that is completed with
2213 * the result of the given function of either this or the other
2214 * given CompletableFuture's results when either complete. If
2215 * this and/or the other CompletableFuture complete exceptionally,
2216 * then the returned CompletableFuture may also do so, with a
2217 * CompletionException holding one of these exceptions as its cause.
2218 * No guarantees are made about which result or exception is used
2219 * in the returned CompletableFuture.
2220 *
2221 * @param other the other CompletableFuture
2222 * @param fn the function to use to compute the value of
2223 * the returned CompletableFuture
2224 * @return the new CompletableFuture
2225 */
2226 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2227 Function<? super T, U> fn) {
2228 return doOrApply(other, fn, null);
2229 }
2230
2231 /**
2232 * Creates and returns a CompletableFuture that is completed
2233 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2234 * the result of the given function of either this or the other
2235 * given CompletableFuture's results when either complete. If
2236 * this and/or the other CompletableFuture complete exceptionally,
2237 * then the returned CompletableFuture may also do so, with a
2238 * CompletionException holding one of these exceptions as its cause.
2239 * No guarantees are made about which result or exception is used
2240 * in the returned CompletableFuture.
2241 *
2242 * @param other the other CompletableFuture
2243 * @param fn the function to use to compute the value of
2244 * the returned CompletableFuture
2245 * @return the new CompletableFuture
2246 */
2247 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2248 Function<? super T, U> fn) {
2249 return doOrApply(other, fn, ForkJoinPool.commonPool());
2250 }
2251
2252 /**
2253 * Creates and returns a CompletableFuture that is completed
2254 * asynchronously using the given executor with the result of the
2255 * given function of either this or the other given
2256 * CompletableFuture's results when either complete. If this
2257 * and/or the other CompletableFuture complete exceptionally, then
2258 * the returned CompletableFuture may also do so, with a
2259 * CompletionException holding one of these exceptions as its cause.
2260 * No guarantees are made about which result or exception is used
2261 * in the returned CompletableFuture.
2262 *
2263 * @param other the other CompletableFuture
2264 * @param fn the function to use to compute the value of
2265 * the returned CompletableFuture
2266 * @param executor the executor to use for asynchronous execution
2267 * @return the new CompletableFuture
2268 */
2269 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2270 Function<? super T, U> fn,
2271 Executor executor) {
2272 if (executor == null) throw new NullPointerException();
2273 return doOrApply(other, fn, executor);
2274 }
2275
2276 private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2277 Function<? super T, U> fn,
2278 Executor e) {
2279 if (other == null || fn == null) throw new NullPointerException();
2280 CompletableFuture<U> dst = new CompletableFuture<U>();
2281 OrApplyCompletion<T,U> d = null;
2282 Object r;
2283 if ((r = result) == null && (r = other.result) == null) {
2284 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2285 CompletionNode q = null, p = new CompletionNode(d);
2286 while ((r = result) == null && (r = other.result) == null) {
2287 if (q != null) {
2288 if (UNSAFE.compareAndSwapObject
2289 (other, COMPLETIONS, q.next = other.completions, q))
2290 break;
2291 }
2292 else if (UNSAFE.compareAndSwapObject
2293 (this, COMPLETIONS, p.next = completions, p))
2294 q = new CompletionNode(d);
2295 }
2296 }
2297 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2298 T t; Throwable ex;
2299 if (r instanceof AltResult) {
2300 ex = ((AltResult)r).ex;
2301 t = null;
2302 }
2303 else {
2304 ex = null;
2305 @SuppressWarnings("unchecked") T tr = (T) r;
2306 t = tr;
2307 }
2308 U u = null;
2309 if (ex == null) {
2310 try {
2311 if (e != null)
2312 e.execute(new AsyncApply<T,U>(t, fn, dst));
2313 else
2314 u = fn.apply(t);
2315 } catch (Throwable rex) {
2316 ex = rex;
2317 }
2318 }
2319 if (e == null || ex != null)
2320 dst.internalComplete(u, ex);
2321 }
2322 helpPostComplete();
2323 other.helpPostComplete();
2324 return dst;
2325 }
2326
2327 /**
2328 * Creates and returns a CompletableFuture that is completed after
2329 * performing the given action with the result of either this or the
2330 * other given CompletableFuture's result, when either complete.
2331 * If this and/or the other CompletableFuture complete
2332 * exceptionally, then the returned CompletableFuture may also do
2333 * so, with a CompletionException holding one of these exceptions as
2334 * its cause. No guarantees are made about which exception is
2335 * used in the returned CompletableFuture.
2336 *
2337 * @param other the other CompletableFuture
2338 * @param block the action to perform before completing the
2339 * returned CompletableFuture
2340 * @return the new CompletableFuture
2341 */
2342 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2343 Consumer<? super T> block) {
2344 return doOrAccept(other, block, null);
2345 }
2346
2347 /**
2348 * Creates and returns a CompletableFuture that is completed
2349 * asynchronously using the {@link ForkJoinPool#commonPool()},
2350 * performing the given action with the result of either this or
2351 * the other given CompletableFuture's result, when either
2352 * complete. If this and/or the other CompletableFuture complete
2353 * exceptionally, then the returned CompletableFuture may also do
2354 * so, with a CompletionException holding one of these exceptions as
2355 * its cause. No guarantees are made about which exception is
2356 * used in the returned CompletableFuture.
2357 *
2358 * @param other the other CompletableFuture
2359 * @param block the action to perform before completing the
2360 * returned CompletableFuture
2361 * @return the new CompletableFuture
2362 */
2363 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2364 Consumer<? super T> block) {
2365 return doOrAccept(other, block, ForkJoinPool.commonPool());
2366 }
2367
2368 /**
2369 * Creates and returns a CompletableFuture that is completed
2370 * asynchronously using the given executor,
2371 * performing the given action with the result of either this or
2372 * the other given CompletableFuture's result, when either
2373 * complete. If this and/or the other CompletableFuture complete
2374 * exceptionally, then the returned CompletableFuture may also do
2375 * so, with a CompletionException holding one of these exceptions as
2376 * its cause. No guarantees are made about which exception is
2377 * used in the returned CompletableFuture.
2378 *
2379 * @param other the other CompletableFuture
2380 * @param block the action to perform before completing the
2381 * returned CompletableFuture
2382 * @param executor the executor to use for asynchronous execution
2383 * @return the new CompletableFuture
2384 */
2385 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2386 Consumer<? super T> block,
2387 Executor executor) {
2388 if (executor == null) throw new NullPointerException();
2389 return doOrAccept(other, block, executor);
2390 }
2391
2392 private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2393 Consumer<? super T> fn,
2394 Executor e) {
2395 if (other == null || fn == null) throw new NullPointerException();
2396 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2397 OrAcceptCompletion<T> d = null;
2398 Object r;
2399 if ((r = result) == null && (r = other.result) == null) {
2400 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2401 CompletionNode q = null, p = new CompletionNode(d);
2402 while ((r = result) == null && (r = other.result) == null) {
2403 if (q != null) {
2404 if (UNSAFE.compareAndSwapObject
2405 (other, COMPLETIONS, q.next = other.completions, q))
2406 break;
2407 }
2408 else if (UNSAFE.compareAndSwapObject
2409 (this, COMPLETIONS, p.next = completions, p))
2410 q = new CompletionNode(d);
2411 }
2412 }
2413 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2414 T t; Throwable ex;
2415 if (r instanceof AltResult) {
2416 ex = ((AltResult)r).ex;
2417 t = null;
2418 }
2419 else {
2420 ex = null;
2421 @SuppressWarnings("unchecked") T tr = (T) r;
2422 t = tr;
2423 }
2424 if (ex == null) {
2425 try {
2426 if (e != null)
2427 e.execute(new AsyncAccept<T>(t, fn, dst));
2428 else
2429 fn.accept(t);
2430 } catch (Throwable rex) {
2431 ex = rex;
2432 }
2433 }
2434 if (e == null || ex != null)
2435 dst.internalComplete(null, ex);
2436 }
2437 helpPostComplete();
2438 other.helpPostComplete();
2439 return dst;
2440 }
2441
2442 /**
2443 * Creates and returns a CompletableFuture that is completed
2444 * after this or the other given CompletableFuture complete. If
2445 * this and/or the other CompletableFuture complete exceptionally,
2446 * then the returned CompletableFuture may also do so, with a
2447 * CompletionException holding one of these exceptions as its cause.
2448 * No guarantees are made about which exception is used in the
2449 * returned CompletableFuture.
2450 *
2451 * @param other the other CompletableFuture
2452 * @param action the action to perform before completing the
2453 * returned CompletableFuture
2454 * @return the new CompletableFuture
2455 */
2456 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2457 Runnable action) {
2458 return doOrRun(other, action, null);
2459 }
2460
2461 /**
2462 * Creates and returns a CompletableFuture that is completed
2463 * asynchronously using the {@link ForkJoinPool#commonPool()}
2464 * after this or the other given CompletableFuture complete. If
2465 * this and/or the other CompletableFuture complete exceptionally,
2466 * then the returned CompletableFuture may also do so, with a
2467 * CompletionException holding one of these exceptions as its cause.
2468 * No guarantees are made about which exception is used in the
2469 * returned CompletableFuture.
2470 *
2471 * @param other the other CompletableFuture
2472 * @param action the action to perform before completing the
2473 * returned CompletableFuture
2474 * @return the new CompletableFuture
2475 */
2476 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2477 Runnable action) {
2478 return doOrRun(other, action, ForkJoinPool.commonPool());
2479 }
2480
2481 /**
2482 * Creates and returns a CompletableFuture that is completed
2483 * asynchronously using the given executor after this or the other
2484 * given CompletableFuture complete. If this and/or the other
2485 * CompletableFuture complete exceptionally, then the returned
2486 * CompletableFuture may also do so, with a CompletionException
2487 * holding one of these exceptions as its cause. No guarantees are
2488 * made about which exception is used in the returned
2489 * CompletableFuture.
2490 *
2491 * @param other the other CompletableFuture
2492 * @param action the action to perform before completing the
2493 * returned CompletableFuture
2494 * @param executor the executor to use for asynchronous execution
2495 * @return the new CompletableFuture
2496 */
2497 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2498 Runnable action,
2499 Executor executor) {
2500 if (executor == null) throw new NullPointerException();
2501 return doOrRun(other, action, executor);
2502 }
2503
2504 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2505 Runnable action,
2506 Executor e) {
2507 if (other == null || action == null) throw new NullPointerException();
2508 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2509 OrRunCompletion<T> d = null;
2510 Object r;
2511 if ((r = result) == null && (r = other.result) == null) {
2512 d = new OrRunCompletion<T>(this, other, action, dst, e);
2513 CompletionNode q = null, p = new CompletionNode(d);
2514 while ((r = result) == null && (r = other.result) == null) {
2515 if (q != null) {
2516 if (UNSAFE.compareAndSwapObject
2517 (other, COMPLETIONS, q.next = other.completions, q))
2518 break;
2519 }
2520 else if (UNSAFE.compareAndSwapObject
2521 (this, COMPLETIONS, p.next = completions, p))
2522 q = new CompletionNode(d);
2523 }
2524 }
2525 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2526 Throwable ex;
2527 if (r instanceof AltResult)
2528 ex = ((AltResult)r).ex;
2529 else
2530 ex = null;
2531 if (ex == null) {
2532 try {
2533 if (e != null)
2534 e.execute(new AsyncRun(action, dst));
2535 else
2536 action.run();
2537 } catch (Throwable rex) {
2538 ex = rex;
2539 }
2540 }
2541 if (e == null || ex != null)
2542 dst.internalComplete(null, ex);
2543 }
2544 helpPostComplete();
2545 other.helpPostComplete();
2546 return dst;
2547 }
2548
2549 /**
2550 * Returns a CompletableFuture (or an equivalent one) produced by
2551 * the given function of the result of this CompletableFuture when
2552 * completed. If this CompletableFuture completes exceptionally,
2553 * then the returned CompletableFuture also does so, with a
2554 * CompletionException holding this exception as its cause.
2555 *
2556 * @param fn the function returning a new CompletableFuture
2557 * @return the CompletableFuture, that {@code isDone()} upon
2558 * return if completed by the given function, or an exception
2559 * occurs
2560 */
2561 public <U> CompletableFuture<U> thenCompose(Function<? super T,
2562 CompletableFuture<U>> fn) {
2563 return doCompose(fn, null);
2564 }
2565
2566 /**
2567 * Returns a CompletableFuture (or an equivalent one) produced
2568 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2569 * the given function of the result of this CompletableFuture when
2570 * completed. If this CompletableFuture completes exceptionally,
2571 * then the returned CompletableFuture also does so, with a
2572 * CompletionException holding this exception as its cause.
2573 *
2574 * @param fn the function returning a new CompletableFuture
2575 * @return the CompletableFuture, that {@code isDone()} upon
2576 * return if completed by the given function, or an exception
2577 * occurs
2578 */
2579 public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,
2580 CompletableFuture<U>> fn) {
2581 return doCompose(fn, ForkJoinPool.commonPool());
2582 }
2583
2584 /**
2585 * Returns a CompletableFuture (or an equivalent one) produced
2586 * asynchronously using the given executor by the given function
2587 * of the result of this CompletableFuture when completed.
2588 * If this CompletableFuture completes exceptionally, then the
2589 * returned CompletableFuture also does so, with a
2590 * CompletionException holding this exception as its cause.
2591 *
2592 * @param fn the function returning a new CompletableFuture
2593 * @param executor the executor to use for asynchronous execution
2594 * @return the CompletableFuture, that {@code isDone()} upon
2595 * return if completed by the given function, or an exception
2596 * occurs
2597 */
2598 public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,
2599 CompletableFuture<U>> fn,
2600 Executor executor) {
2601 if (executor == null) throw new NullPointerException();
2602 return doCompose(fn, executor);
2603 }
2604
2605 private <U> CompletableFuture<U> doCompose(Function<? super T,
2606 CompletableFuture<U>> fn,
2607 Executor e) {
2608 if (fn == null) throw new NullPointerException();
2609 CompletableFuture<U> dst = null;
2610 ComposeCompletion<T,U> d = null;
2611 Object r;
2612 if ((r = result) == null) {
2613 dst = new CompletableFuture<U>();
2614 CompletionNode p = new CompletionNode
2615 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2616 while ((r = result) == null) {
2617 if (UNSAFE.compareAndSwapObject
2618 (this, COMPLETIONS, p.next = completions, p))
2619 break;
2620 }
2621 }
2622 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2623 T t; Throwable ex;
2624 if (r instanceof AltResult) {
2625 ex = ((AltResult)r).ex;
2626 t = null;
2627 }
2628 else {
2629 ex = null;
2630 @SuppressWarnings("unchecked") T tr = (T) r;
2631 t = tr;
2632 }
2633 if (ex == null) {
2634 if (e != null) {
2635 if (dst == null)
2636 dst = new CompletableFuture<U>();
2637 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2638 }
2639 else {
2640 try {
2641 dst = fn.apply(t);
2642 } catch (Throwable rex) {
2643 ex = rex;
2644 }
2645 if (dst == null) {
2646 dst = new CompletableFuture<U>();
2647 if (ex == null)
2648 ex = new NullPointerException();
2649 }
2650 }
2651 }
2652 if (e == null && ex != null)
2653 dst.internalComplete(null, ex);
2654 }
2655 helpPostComplete();
2656 dst.helpPostComplete();
2657 return dst;
2658 }
2659
2660 /**
2661 * Creates and returns a CompletableFuture that is completed with
2662 * the result of the given function of the exception triggering
2663 * this CompletableFuture's completion when it completes
2664 * exceptionally; Otherwise, if this CompletableFuture completes
2665 * normally, then the returned CompletableFuture also completes
2666 * normally with the same value.
2667 *
2668 * @param fn the function to use to compute the value of the
2669 * returned CompletableFuture if this CompletableFuture completed
2670 * exceptionally
2671 * @return the new CompletableFuture
2672 */
2673 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
2674 if (fn == null) throw new NullPointerException();
2675 CompletableFuture<T> dst = new CompletableFuture<T>();
2676 ExceptionCompletion<T> d = null;
2677 Object r;
2678 if ((r = result) == null) {
2679 CompletionNode p =
2680 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2681 while ((r = result) == null) {
2682 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2683 p.next = completions, p))
2684 break;
2685 }
2686 }
2687 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2688 T t = null; Throwable ex, dx = null;
2689 if (r instanceof AltResult) {
2690 if ((ex = ((AltResult)r).ex) != null) {
2691 try {
2692 t = fn.apply(ex);
2693 } catch (Throwable rex) {
2694 dx = rex;
2695 }
2696 }
2697 }
2698 else {
2699 @SuppressWarnings("unchecked") T tr = (T) r;
2700 t = tr;
2701 }
2702 dst.internalComplete(t, dx);
2703 }
2704 helpPostComplete();
2705 return dst;
2706 }
2707
2708 /**
2709 * Creates and returns a CompletableFuture that is completed with
2710 * the result of the given function of the result and exception of
2711 * this CompletableFuture's completion when it completes. The
2712 * given function is invoked with the result (or {@code null} if
2713 * none) and the exception (or {@code null} if none) of this
2714 * CompletableFuture when complete.
2715 *
2716 * @param fn the function to use to compute the value of the
2717 * returned CompletableFuture
2718
2719 * @return the new CompletableFuture
2720 */
2721 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
2722 if (fn == null) throw new NullPointerException();
2723 CompletableFuture<U> dst = new CompletableFuture<U>();
2724 HandleCompletion<T,U> d = null;
2725 Object r;
2726 if ((r = result) == null) {
2727 CompletionNode p =
2728 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2729 while ((r = result) == null) {
2730 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2731 p.next = completions, p))
2732 break;
2733 }
2734 }
2735 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2736 T t; Throwable ex;
2737 if (r instanceof AltResult) {
2738 ex = ((AltResult)r).ex;
2739 t = null;
2740 }
2741 else {
2742 ex = null;
2743 @SuppressWarnings("unchecked") T tr = (T) r;
2744 t = tr;
2745 }
2746 U u; Throwable dx;
2747 try {
2748 u = fn.apply(t, ex);
2749 dx = null;
2750 } catch (Throwable rex) {
2751 dx = rex;
2752 u = null;
2753 }
2754 dst.internalComplete(u, dx);
2755 }
2756 helpPostComplete();
2757 return dst;
2758 }
2759
2760
2761 /* ------------- Arbitrary-arity constructions -------------- */
2762
2763 /*
2764 * The basic plan of attack is to recursively form binary
2765 * completion trees of elements. This can be overkill for small
2766 * sets, but scales nicely. The And/All vs Or/Any forms use the
2767 * same idea, but details differ.
2768 */
2769
2770 /**
2771 * Returns a new CompletableFuture that is completed when all of
2772 * the given CompletableFutures complete. If any of the component
2773 * CompletableFuture complete exceptionally, then so does the
2774 * returned CompletableFuture. Otherwise, the results, if any, of
2775 * the component CompletableFutures are not reflected in the
2776 * returned CompletableFuture, but may be obtained by inspecting
2777 * them individually. If the number of components is zero, returns
2778 * a completed CompletableFuture.
2779 *
2780 * <p>Among the applications of this method is to await completion
2781 * of a set of independent CompletableFutures before continuing a
2782 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2783 * c3).join();}.
2784 *
2785 * @param cfs the CompletableFutures
2786 * @return a CompletableFuture that is complete when all of the
2787 * given CompletableFutures complete
2788 * @throws NullPointerException if the array or any of its elements are
2789 * {@code null}
2790 */
2791 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2792 int len = cfs.length; // Directly handle empty and singleton cases
2793 if (len > 1)
2794 return allTree(cfs, 0, len - 1);
2795 else {
2796 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2797 CompletableFuture<?> f;
2798 if (len == 0)
2799 dst.result = NIL;
2800 else if ((f = cfs[0]) == null)
2801 throw new NullPointerException();
2802 else {
2803 ThenCopy d = null;
2804 CompletionNode p = null;
2805 Object r;
2806 while ((r = f.result) == null) {
2807 if (d == null)
2808 d = new ThenCopy(f, dst);
2809 else if (p == null)
2810 p = new CompletionNode(d);
2811 else if (UNSAFE.compareAndSwapObject
2812 (f, COMPLETIONS, p.next = f.completions, p))
2813 break;
2814 }
2815 if (r != null && (d == null || d.compareAndSet(0, 1)))
2816 dst.internalComplete(null, (r instanceof AltResult) ?
2817 ((AltResult)r).ex : null);
2818 f.helpPostComplete();
2819 }
2820 return dst;
2821 }
2822 }
2823
2824 /**
2825 * Recursively constructs an And'ed tree of CompletableFutures.
2826 * Called only when array known to have at least two elements.
2827 */
2828 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2829 int lo, int hi) {
2830 CompletableFuture<?> fst, snd;
2831 int mid = (lo + hi) >>> 1;
2832 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2833 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2834 throw new NullPointerException();
2835 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2836 AndCompletion d = null;
2837 CompletionNode p = null, q = null;
2838 Object r = null, s = null;
2839 while ((r = fst.result) == null || (s = snd.result) == null) {
2840 if (d == null)
2841 d = new AndCompletion(fst, snd, dst);
2842 else if (p == null)
2843 p = new CompletionNode(d);
2844 else if (q == null) {
2845 if (UNSAFE.compareAndSwapObject
2846 (fst, COMPLETIONS, p.next = fst.completions, p))
2847 q = new CompletionNode(d);
2848 }
2849 else if (UNSAFE.compareAndSwapObject
2850 (snd, COMPLETIONS, q.next = snd.completions, q))
2851 break;
2852 }
2853 if ((r != null || (r = fst.result) != null) &&
2854 (s != null || (s = snd.result) != null) &&
2855 (d == null || d.compareAndSet(0, 1))) {
2856 Throwable ex;
2857 if (r instanceof AltResult)
2858 ex = ((AltResult)r).ex;
2859 else
2860 ex = null;
2861 if (ex == null && (s instanceof AltResult))
2862 ex = ((AltResult)s).ex;
2863 dst.internalComplete(null, ex);
2864 }
2865 fst.helpPostComplete();
2866 snd.helpPostComplete();
2867 return dst;
2868 }
2869
2870 /**
2871 * Returns a new CompletableFuture that is completed when any of
2872 * the component CompletableFutures complete; with the same result if
2873 * it completed normally, otherwise exceptionally. If the number
2874 * of components is zero, returns a completed CompletableFuture.
2875 *
2876 * @param cfs the CompletableFutures
2877 * @return a CompletableFuture that is complete when any of the
2878 * given CompletableFutures complete
2879 * @throws NullPointerException if the array or any of its elements are
2880 * {@code null}
2881 */
2882 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2883 int len = cfs.length; // Same idea as allOf
2884 if (len > 1)
2885 return anyTree(cfs, 0, len - 1);
2886 else {
2887 CompletableFuture<?> dst = new CompletableFuture<Object>();
2888 CompletableFuture<?> f;
2889 if (len == 0)
2890 dst.result = NIL;
2891 else if ((f = cfs[0]) == null)
2892 throw new NullPointerException();
2893 else {
2894 ThenCopy d = null;
2895 CompletionNode p = null;
2896 Object r;
2897 while ((r = f.result) == null) {
2898 if (d == null)
2899 d = new ThenCopy(f, dst);
2900 else if (p == null)
2901 p = new CompletionNode(d);
2902 else if (UNSAFE.compareAndSwapObject
2903 (f, COMPLETIONS, p.next = f.completions, p))
2904 break;
2905 }
2906 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2907 Throwable ex; Object t;
2908 if (r instanceof AltResult) {
2909 ex = ((AltResult)r).ex;
2910 t = null;
2911 }
2912 else {
2913 ex = null;
2914 t = r;
2915 }
2916 dst.internalComplete(t, ex);
2917 }
2918 f.helpPostComplete();
2919 }
2920 return dst;
2921 }
2922 }
2923
2924 /**
2925 * Recursively constructs an Or'ed tree of CompletableFutures
2926 */
2927 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
2928 int lo, int hi) {
2929 CompletableFuture<?> fst, snd;
2930 int mid = (lo + hi) >>> 1;
2931 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2932 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2933 throw new NullPointerException();
2934 CompletableFuture<?> dst = new CompletableFuture<Object>();
2935 OrCompletion d = null;
2936 CompletionNode p = null, q = null;
2937 Object r;
2938 while ((r = fst.result) == null && (r = snd.result) == null) {
2939 if (d == null)
2940 d = new OrCompletion(fst, snd, dst);
2941 else if (p == null)
2942 p = new CompletionNode(d);
2943 else if (q == null) {
2944 if (UNSAFE.compareAndSwapObject
2945 (fst, COMPLETIONS, p.next = fst.completions, p))
2946 q = new CompletionNode(d);
2947 }
2948 else if (UNSAFE.compareAndSwapObject
2949 (snd, COMPLETIONS, q.next = snd.completions, q))
2950 break;
2951 }
2952 if ((r != null || (r = fst.result) != null ||
2953 (r = snd.result) != null) &&
2954 (d == null || d.compareAndSet(0, 1))) {
2955 Throwable ex; Object t;
2956 if (r instanceof AltResult) {
2957 ex = ((AltResult)r).ex;
2958 t = null;
2959 }
2960 else {
2961 ex = null;
2962 t = r;
2963 }
2964 dst.internalComplete(t, ex);
2965 }
2966 fst.helpPostComplete();
2967 snd.helpPostComplete();
2968 return dst;
2969 }
2970
2971
2972 /* ------------- Control and status methods -------------- */
2973
2974 /**
2975 * If not already completed, completes this CompletableFuture with
2976 * a {@link CancellationException}. Dependent CompletableFutures
2977 * that have not already completed will also complete
2978 * exceptionally, with a {@link CompletionException} caused by
2979 * this {@code CancellationException}.
2980 *
2981 * @param mayInterruptIfRunning this value has no effect in this
2982 * implementation because interrupts are not used to control
2983 * processing.
2984 *
2985 * @return {@code true} if this task is now cancelled
2986 */
2987 public boolean cancel(boolean mayInterruptIfRunning) {
2988 Object r;
2989 while ((r = result) == null) {
2990 r = new AltResult(new CancellationException());
2991 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
2992 postComplete();
2993 return true;
2994 }
2995 }
2996 return ((r instanceof AltResult) &&
2997 (((AltResult)r).ex instanceof CancellationException));
2998 }
2999
3000 /**
3001 * Returns {@code true} if this CompletableFuture was cancelled
3002 * before it completed normally.
3003 *
3004 * @return {@code true} if this CompletableFuture was cancelled
3005 * before it completed normally
3006 */
3007 public boolean isCancelled() {
3008 Object r;
3009 return ((r = result) != null &&
3010 (r instanceof AltResult) &&
3011 (((AltResult)r).ex instanceof CancellationException));
3012 }
3013
3014 /**
3015 * Forcibly sets or resets the value subsequently returned by
3016 * method get() and related methods, whether or not already
3017 * completed. This method is designed for use only in error
3018 * recovery actions, and even in such situations may result in
3019 * ongoing dependent completions using established versus
3020 * overwritten outcomes.
3021 *
3022 * @param value the completion value
3023 */
3024 public void obtrudeValue(T value) {
3025 result = (value == null) ? NIL : value;
3026 postComplete();
3027 }
3028
3029 /**
3030 * Forcibly causes subsequent invocations of method get() and
3031 * related methods to throw the given exception, whether or not
3032 * already completed. This method is designed for use only in
3033 * recovery actions, and even in such situations may result in
3034 * ongoing dependent completions using established versus
3035 * overwritten outcomes.
3036 *
3037 * @param ex the exception
3038 */
3039 public void obtrudeException(Throwable ex) {
3040 if (ex == null) throw new NullPointerException();
3041 result = new AltResult(ex);
3042 postComplete();
3043 }
3044
3045 /**
3046 * Returns the estimated number of CompletableFutures whose
3047 * completions are awaiting completion of this CompletableFuture.
3048 * This method is designed for use in monitoring system state, not
3049 * for synchronization control.
3050 *
3051 * @return the number of dependent CompletableFutures
3052 */
3053 public int getNumberOfDependents() {
3054 int count = 0;
3055 for (CompletionNode p = completions; p != null; p = p.next)
3056 ++count;
3057 return count;
3058 }
3059
3060 /**
3061 * Returns a string identifying this CompletableFuture, as well as
3062 * its completion state. The state, in brackets, contains the
3063 * String {@code "Completed Normally"} or the String {@code
3064 * "Completed Exceptionally"}, or the String {@code "Not
3065 * completed"} followed by the number of CompletableFutures
3066 * dependent upon its completion, if any.
3067 *
3068 * @return a string identifying this CompletableFuture, as well as its state
3069 */
3070 public String toString() {
3071 Object r = result;
3072 int count;
3073 return super.toString() +
3074 ((r == null) ?
3075 (((count = getNumberOfDependents()) == 0) ?
3076 "[Not completed]" :
3077 "[Not completed, " + count + " dependents]") :
3078 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3079 "[Completed exceptionally]" :
3080 "[Completed normally]"));
3081 }
3082
3083 // Unsafe mechanics
3084 private static final sun.misc.Unsafe UNSAFE;
3085 private static final long RESULT;
3086 private static final long WAITERS;
3087 private static final long COMPLETIONS;
3088 static {
3089 try {
3090 UNSAFE = sun.misc.Unsafe.getUnsafe();
3091 Class<?> k = CompletableFuture.class;
3092 RESULT = UNSAFE.objectFieldOffset
3093 (k.getDeclaredField("result"));
3094 WAITERS = UNSAFE.objectFieldOffset
3095 (k.getDeclaredField("waiters"));
3096 COMPLETIONS = UNSAFE.objectFieldOffset
3097 (k.getDeclaredField("completions"));
3098 } catch (Exception e) {
3099 throw new Error(e);
3100 }
3101 }
3102 }