root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.54
Committed: Sat Feb 16 20:50:29 2013 UTC by jsr166
Branch: MAIN
Changes since 1.53: +0 -1 lines
Log Message:
javadoc comment correctness

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on Functions,
37 * Consumers, and Runnables. The appropriate form to use depends on
38 * whether actions require arguments and/or produce results. Actions
39 * may also be triggered after either or both the current and another
40 * CompletableFuture complete. Multiple CompletableFutures may also
41 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
42 * {@link #allOf(CompletableFuture...)}.
43 *
44 * <p>Actions supplied for dependent completions (mainly using methods
45 * with prefix {@code then}) may be performed by the thread that
46 * completes the current CompletableFuture, or by any other caller of
47 * these methods. There are no guarantees about the order of
48 * processing completions unless constrained by these methods.
49 *
50 * <p>Upon exceptional completion (including cancellation), or when
51 * a completion entails a computation that terminates abruptly with
52 * an (unchecked) exception or error, all further completions act as
53 * {@code completeExceptionally} with a {@link CompletionException}
54 * holding that exception as its cause. If a CompletableFuture
55 * completes exceptionally, and is not followed by a {@link
56 * #exceptionally} or {@link #handle} completion, then all of its
57 * dependents (and their dependents) also complete exceptionally with
58 * CompletionExceptions holding the ultimate cause. In case of a
59 * CompletionException, methods {@link #get()} and {@link #get(long,
60 * TimeUnit)} throw an {@link ExecutionException} with the same cause
61 * as would be held in the corresponding CompletionException. However,
62 * in these cases, methods {@link #join()} and {@link #getNow} throw
63 * the CompletionException, which simplifies usage.
64 *
65 * <p>CompletableFutures themselves do not execute asynchronously.
66 * However, the {@code async} methods provide commonly useful ways to
67 * commence asynchronous processing, using either a given {@link
68 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
69 * function or action that will result in the completion of a new
70 * CompletableFuture. To simplify monitoring, debugging, and tracking,
71 * all generated asynchronous tasks are instances of the tagging
72 * interface {@link AsynchronousCompletionTask}.
73 *
74 * @author Doug Lea
75 * @since 1.8
76 */
77 public class CompletableFuture<T> implements Future<T> {
78
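As a usage illustration of the exception behavior described in the class documentation above, the following is a minimal sketch written against the released `java.util.concurrent.CompletableFuture` API (Java 8 and later), which may differ in detail from this revision. The class `ExceptionWrappingDemo` and its method names are hypothetical and not part of the source. It shows a dependent created with `thenApply` completing exceptionally when its source does, and how `get()` and `join()` report that failure differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; not part of CompletableFuture.java.
class ExceptionWrappingDemo {

    /** Builds a dependent future whose source is completed exceptionally. */
    static CompletableFuture<Integer> failedDependent() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        source.completeExceptionally(new IllegalStateException("boom"));
        return dependent;
    }

    /** Describes what get() throws for the given future. */
    static String describeGet(CompletableFuture<?> f) {
        try {
            f.get();
            return "no exception";
        } catch (ExecutionException e) {
            // get() reports failures as a checked ExecutionException.
            return "ExecutionException caused by "
                + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** Describes what join() throws for the given future. */
    static String describeJoin(CompletableFuture<?> f) {
        try {
            f.join();
            return "no exception";
        } catch (CompletionException e) {
            // join() reports failures as an unchecked CompletionException.
            return "CompletionException caused by "
                + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeGet(failedDependent()));
        System.out.println(describeJoin(failedDependent()));
    }
}
```

Because `join()` throws an unchecked exception, it tends to be the more convenient choice inside lambdas, where checked exceptions cannot be propagated directly.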
79 /*
80 * Overview:
81 *
82 * 1. Non-nullness of field result (set via CAS) indicates done.
83 * An AltResult is used to box null as a result, as well as to
84 * hold exceptions. Using a single field makes completion fast
85 * and simple to detect and trigger, at the expense of a lot of
86 * encoding and decoding that infiltrates many methods. One minor
87 * simplification relies on the (static) NIL (to box null results)
88 * being the only AltResult with a null exception field, so we
89 * don't usually need explicit comparisons with NIL. The CF
90 * exception propagation mechanics surrounding decoding rely on
91 * unchecked casts of decoded results really being unchecked,
92 * where user type errors are caught at point of use, as is
93 * currently the case in Java. These are highlighted by using
94 * SuppressWarnings-annotated temporaries.
95 *
96 * 2. Waiters are held in a Treiber stack similar to the one used
97 * in FutureTask, Phaser, and SynchronousQueue. See their
98 * internal documentation for algorithmic details.
99 *
100 * 3. Completions are also kept in a list/stack, and pulled off
101 * and run when completion is triggered. (We could even use the
102 * same stack as for waiters, but would give up the potential
103 * parallelism obtained because woken waiters help release/run
104 * others -- see method postComplete). Because post-processing
105 * may race with direct calls, class Completion opportunistically
106 * extends AtomicInteger so callers can claim the action via
107 * compareAndSet(0, 1). The Completion.run methods are all
108 * written in a boringly similar, uniform way (that sometimes includes
109 * unnecessary-looking checks, kept to maintain uniformity). There
110 * are enough dimensions upon which they differ that attempts to
111 * factor commonalities while maintaining efficiency require more
112 * lines of code than they would save.
113 *
114 * 4. The exported then/and/or methods do support a bit of
115 * factoring (see doThenApply etc). They must cope with the
116 * intrinsic races surrounding addition of a dependent action
117 * versus performing the action directly because the task is
118 * already complete. For example, a CF may not be complete upon
119 * entry, so a dependent completion is added, but by the time it
120 * is added, the target CF is complete, so the action must be directly
121 * executed. This is all done while avoiding unnecessary object
122 * construction in safe-bypass cases.
123 */
124
125 // preliminaries
126
127 static final class AltResult {
128 final Throwable ex; // null only for NIL
129 AltResult(Throwable ex) { this.ex = ex; }
130 }
131
132 static final AltResult NIL = new AltResult(null);
133
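The single-field encoding described in the overview (a non-null `result` means done, `NIL` boxes a null value, and an `AltResult` with a non-null `ex` holds an exception) can be illustrated in isolation. This is a simplified sketch, not the class's actual code: `ResultEncodingDemo`, `Alt`, `encode`, `isDone`, and `decode` are hypothetical names, and wrapping the failure in `IllegalStateException` is an arbitrary choice made for the example.

```java
// Hypothetical stand-alone sketch of the result encoding; not the real class.
class ResultEncodingDemo {

    /** Stand-in for AltResult: ex is null only for NIL. */
    static final class Alt {
        final Throwable ex;
        Alt(Throwable ex) { this.ex = ex; }
    }

    /** Stand-in for NIL, the box for a null (but completed) result. */
    static final Alt NIL = new Alt(null);

    /** Encodes an outcome into one non-null object. */
    static Object encode(Object value, Throwable ex) {
        if (ex != null)
            return new Alt(ex);
        return (value == null) ? NIL : value;
    }

    /** A raw field value of null means "not yet complete". */
    static boolean isDone(Object raw) {
        return raw != null;
    }

    /** Decodes a completed raw value: returns the value or throws. */
    static Object decode(Object raw) {
        if (raw instanceof Alt) {
            Throwable ex = ((Alt) raw).ex;
            if (ex != null)
                throw new IllegalStateException(ex); // exceptional outcome
            return null;                             // NIL: completed with null
        }
        return raw;                                  // ordinary value
    }

    public static void main(String[] args) {
        System.out.println(isDone(null));               // not complete
        System.out.println(decode(encode(null, null))); // completed with null
        System.out.println(decode(encode("v", null)));  // completed with "v"
    }
}
```

Note that only `NIL` has a null `ex`, so a decoder needs just one `instanceof` test followed by a null check, without comparing against `NIL` explicitly.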
134 // Fields
135
136 volatile Object result; // Either the result or boxed AltResult
137 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
138 volatile CompletionNode completions; // list (Treiber stack) of completions
139
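The `waiters` and `completions` fields above are heads of Treiber stacks. For readers unfamiliar with the structure, here is a minimal generic sketch. It assumes `AtomicReference` in place of the `UNSAFE.compareAndSwapObject` calls used in this file, and the class `TreiberStackDemo` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal Treiber stack; the real code CASes fields via Unsafe.
class TreiberStackDemo<E> {

    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    /** Pushes by CASing the head from the observed value to the new node. */
    void push(E item) {
        Node<E> n = new Node<>(item);
        do {
            n.next = head.get();
        } while (!head.compareAndSet(n.next, n));
    }

    /** Pops by CASing the head to its successor; returns null if empty. */
    E pop() {
        for (;;) {
            Node<E> h = head.get();
            if (h == null)
                return null;
            if (head.compareAndSet(h, h.next))
                return h.item;
        }
    }

    public static void main(String[] args) {
        TreiberStackDemo<String> s = new TreiberStackDemo<>();
        s.push("a");
        s.push("b");
        System.out.println(s.pop() + s.pop());
    }
}
```

The loops in `postComplete` follow the same pop pattern: read the head, CAS it to `next`, and act on the removed node only if the CAS succeeded.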
140 // Basic utilities for triggering and processing completions
141
142 /**
143 * Removes and signals all waiting threads and runs all completions.
144 */
145 final void postComplete() {
146 WaitNode q; Thread t;
147 while ((q = waiters) != null) {
148 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
149 (t = q.thread) != null) {
150 q.thread = null;
151 LockSupport.unpark(t);
152 }
153 }
154
155 CompletionNode h; Completion c;
156 while ((h = completions) != null) {
157 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
158 (c = h.completion) != null)
159 c.run();
160 }
161 }
162
163 /**
164 * Triggers completion with the encoding of the given arguments:
165 * if the exception is non-null, encodes it as a wrapped
166 * CompletionException unless it is one already. Otherwise uses
167 * the given result, boxed as NIL if null.
168 */
169 final void internalComplete(Object v, Throwable ex) {
170 if (result == null)
171 UNSAFE.compareAndSwapObject
172 (this, RESULT, null,
173 (ex == null) ? (v == null) ? NIL : v :
174 new AltResult((ex instanceof CompletionException) ? ex :
175 new CompletionException(ex)));
176 postComplete(); // help out even if not triggered
177 }
178
179 /**
180 * If triggered, helps release and/or process completions.
181 */
182 final void helpPostComplete() {
183 if (result != null)
184 postComplete();
185 }
186
187 /* ------------- waiting for completions -------------- */
188
189 /** Number of processors, for spin control */
190 static final int NCPU = Runtime.getRuntime().availableProcessors();
191
192 /**
193 * Heuristic spin value for waitingGet() before blocking on
194 * multiprocessors
195 */
196 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
197
198 /**
199 * Linked nodes to record waiting threads in a Treiber stack. See
200 * other classes such as Phaser and SynchronousQueue for more
201 * detailed explanation. This class implements ManagedBlocker to
202 * avoid starvation when blocking actions pile up in
203 * ForkJoinPools.
204 */
205 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
206 long nanos; // wait time if timed
207 final long deadline; // non-zero if timed
208 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
209 volatile Thread thread;
210 volatile WaitNode next;
211 WaitNode(boolean interruptible, long nanos, long deadline) {
212 this.thread = Thread.currentThread();
213 this.interruptControl = interruptible ? 1 : 0;
214 this.nanos = nanos;
215 this.deadline = deadline;
216 }
217 public boolean isReleasable() {
218 if (thread == null)
219 return true;
220 if (Thread.interrupted()) {
221 int i = interruptControl;
222 interruptControl = -1;
223 if (i > 0)
224 return true;
225 }
226 if (deadline != 0L &&
227 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
228 thread = null;
229 return true;
230 }
231 return false;
232 }
233 public boolean block() {
234 if (isReleasable())
235 return true;
236 else if (deadline == 0L)
237 LockSupport.park(this);
238 else if (nanos > 0L)
239 LockSupport.parkNanos(this, nanos);
240 return isReleasable();
241 }
242 }
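`WaitNode` implements `ForkJoinPool.ManagedBlocker` so that waiting threads cooperate with ForkJoinPool's compensation mechanism. The contract is easier to see in a smaller setting. The sketch below is an illustration only: `LatchBlockerDemo` and `LatchBlocker` are hypothetical names, and a `CountDownLatch` stands in for the completion signal.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Hypothetical example of the ManagedBlocker contract; not part of the source.
class LatchBlockerDemo {

    static final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        final CountDownLatch latch;
        LatchBlocker(CountDownLatch latch) { this.latch = latch; }

        /** True when blocking is unnecessary. */
        public boolean isReleasable() {
            return latch.getCount() == 0;
        }

        /** Blocks until released; returns true if no further blocking is needed. */
        public boolean block() throws InterruptedException {
            latch.await();
            return true;
        }
    }

    /** Waits for another thread to count down, going through managedBlock. */
    static boolean awaitViaManagedBlock() {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return latch.getCount() == 0;
    }

    public static void main(String[] args) {
        System.out.println(awaitViaManagedBlock());
    }
}
```

When the caller is a ForkJoinPool worker, `managedBlock` may arrange for a spare thread to be activated while the worker is blocked; from any other thread it simply calls `isReleasable()` and `block()` until one of them returns true.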
243
244 /**
245 * Returns raw result after waiting, or null if interruptible and
246 * interrupted.
247 */
248 private Object waitingGet(boolean interruptible) {
249 WaitNode q = null;
250 boolean queued = false;
251 int spins = SPINS;
252 for (Object r;;) {
253 if ((r = result) != null) {
254 if (q != null) { // suppress unpark
255 q.thread = null;
256 if (q.interruptControl < 0) {
257 if (interruptible) {
258 removeWaiter(q);
259 return null;
260 }
261 Thread.currentThread().interrupt();
262 }
263 }
264 postComplete(); // help release others
265 return r;
266 }
267 else if (spins > 0) {
268 int rnd = ThreadLocalRandom.nextSecondarySeed();
269 if (rnd == 0)
270 rnd = ThreadLocalRandom.current().nextInt();
271 if (rnd >= 0)
272 --spins;
273 }
274 else if (q == null)
275 q = new WaitNode(interruptible, 0L, 0L);
276 else if (!queued)
277 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
278 q.next = waiters, q);
279 else if (interruptible && q.interruptControl < 0) {
280 removeWaiter(q);
281 return null;
282 }
283 else if (q.thread != null && result == null) {
284 try {
285 ForkJoinPool.managedBlock(q);
286 } catch (InterruptedException ex) {
287 q.interruptControl = -1;
288 }
289 }
290 }
291 }
292
293 /**
294 * Awaits completion or aborts on interrupt or timeout.
295 *
296 * @param nanos time to wait
297 * @return raw result
298 */
299 private Object timedAwaitDone(long nanos)
300 throws InterruptedException, TimeoutException {
301 WaitNode q = null;
302 boolean queued = false;
303 for (Object r;;) {
304 if ((r = result) != null) {
305 if (q != null) {
306 q.thread = null;
307 if (q.interruptControl < 0) {
308 removeWaiter(q);
309 throw new InterruptedException();
310 }
311 }
312 postComplete();
313 return r;
314 }
315 else if (q == null) {
316 if (nanos <= 0L)
317 throw new TimeoutException();
318 long d = System.nanoTime() + nanos;
319 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
320 }
321 else if (!queued)
322 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
323 q.next = waiters, q);
324 else if (q.interruptControl < 0) {
325 removeWaiter(q);
326 throw new InterruptedException();
327 }
328 else if (q.nanos <= 0L) {
329 if (result == null) {
330 removeWaiter(q);
331 throw new TimeoutException();
332 }
333 }
334 else if (q.thread != null && result == null) {
335 try {
336 ForkJoinPool.managedBlock(q);
337 } catch (InterruptedException ex) {
338 q.interruptControl = -1;
339 }
340 }
341 }
342 }
343
344 /**
345 * Tries to unlink a timed-out or interrupted wait node to avoid
346 * accumulating garbage. Internal nodes are simply unspliced
347 * without CAS since it is harmless if they are traversed anyway
348 * by releasers. To avoid effects of unsplicing from already
349 * removed nodes, the list is retraversed in case of an apparent
350 * race. This is slow when there are a lot of nodes, but we don't
351 * expect lists to be long enough to outweigh higher-overhead
352 * schemes.
353 */
354 private void removeWaiter(WaitNode node) {
355 if (node != null) {
356 node.thread = null;
357 retry:
358 for (;;) { // restart on removeWaiter race
359 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
360 s = q.next;
361 if (q.thread != null)
362 pred = q;
363 else if (pred != null) {
364 pred.next = s;
365 if (pred.thread == null) // check for race
366 continue retry;
367 }
368 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
369 continue retry;
370 }
371 break;
372 }
373 }
374 }
375
376 /* ------------- Async tasks -------------- */
377
378 /**
379 * A tagging interface identifying asynchronous tasks produced by
380 * {@code async} methods. This may be useful for monitoring,
381 * debugging, and tracking asynchronous activities.
382 */
383 public static interface AsynchronousCompletionTask {
384 }
385
386 /** Base class can act as either FJ or plain Runnable */
387 abstract static class Async extends ForkJoinTask<Void>
388 implements Runnable, AsynchronousCompletionTask {
389 public final Void getRawResult() { return null; }
390 public final void setRawResult(Void v) { }
391 public final void run() { exec(); }
392 }
393
394 static final class AsyncRun extends Async {
395 final Runnable fn;
396 final CompletableFuture<Void> dst;
397 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
398 this.fn = fn; this.dst = dst;
399 }
400 public final boolean exec() {
401 CompletableFuture<Void> d; Throwable ex;
402 if ((d = this.dst) != null && d.result == null) {
403 try {
404 fn.run();
405 ex = null;
406 } catch (Throwable rex) {
407 ex = rex;
408 }
409 d.internalComplete(null, ex);
410 }
411 return true;
412 }
413 private static final long serialVersionUID = 5232453952276885070L;
414 }
415
416 static final class AsyncSupply<U> extends Async {
417 final Supplier<U> fn;
418 final CompletableFuture<U> dst;
419 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
420 this.fn = fn; this.dst = dst;
421 }
422 public final boolean exec() {
423 CompletableFuture<U> d; U u; Throwable ex;
424 if ((d = this.dst) != null && d.result == null) {
425 try {
426 u = fn.get();
427 ex = null;
428 } catch (Throwable rex) {
429 ex = rex;
430 u = null;
431 }
432 d.internalComplete(u, ex);
433 }
434 return true;
435 }
436 private static final long serialVersionUID = 5232453952276885070L;
437 }
438
439 static final class AsyncApply<T,U> extends Async {
440 final Function<? super T,? extends U> fn;
441 final T arg;
442 final CompletableFuture<U> dst;
443 AsyncApply(T arg, Function<? super T,? extends U> fn,
444 CompletableFuture<U> dst) {
445 this.arg = arg; this.fn = fn; this.dst = dst;
446 }
447 public final boolean exec() {
448 CompletableFuture<U> d; U u; Throwable ex;
449 if ((d = this.dst) != null && d.result == null) {
450 try {
451 u = fn.apply(arg);
452 ex = null;
453 } catch (Throwable rex) {
454 ex = rex;
455 u = null;
456 }
457 d.internalComplete(u, ex);
458 }
459 return true;
460 }
461 private static final long serialVersionUID = 5232453952276885070L;
462 }
463
464 static final class AsyncBiApply<T,U,V> extends Async {
465 final BiFunction<? super T,? super U,? extends V> fn;
466 final T arg1;
467 final U arg2;
468 final CompletableFuture<V> dst;
469 AsyncBiApply(T arg1, U arg2,
470 BiFunction<? super T,? super U,? extends V> fn,
471 CompletableFuture<V> dst) {
472 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
473 }
474 public final boolean exec() {
475 CompletableFuture<V> d; V v; Throwable ex;
476 if ((d = this.dst) != null && d.result == null) {
477 try {
478 v = fn.apply(arg1, arg2);
479 ex = null;
480 } catch (Throwable rex) {
481 ex = rex;
482 v = null;
483 }
484 d.internalComplete(v, ex);
485 }
486 return true;
487 }
488 private static final long serialVersionUID = 5232453952276885070L;
489 }
490
491 static final class AsyncAccept<T> extends Async {
492 final Consumer<? super T> fn;
493 final T arg;
494 final CompletableFuture<Void> dst;
495 AsyncAccept(T arg, Consumer<? super T> fn,
496 CompletableFuture<Void> dst) {
497 this.arg = arg; this.fn = fn; this.dst = dst;
498 }
499 public final boolean exec() {
500 CompletableFuture<Void> d; Throwable ex;
501 if ((d = this.dst) != null && d.result == null) {
502 try {
503 fn.accept(arg);
504 ex = null;
505 } catch (Throwable rex) {
506 ex = rex;
507 }
508 d.internalComplete(null, ex);
509 }
510 return true;
511 }
512 private static final long serialVersionUID = 5232453952276885070L;
513 }
514
515 static final class AsyncBiAccept<T,U> extends Async {
516 final BiConsumer<? super T,? super U> fn;
517 final T arg1;
518 final U arg2;
519 final CompletableFuture<Void> dst;
520 AsyncBiAccept(T arg1, U arg2,
521 BiConsumer<? super T,? super U> fn,
522 CompletableFuture<Void> dst) {
523 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
524 }
525 public final boolean exec() {
526 CompletableFuture<Void> d; Throwable ex;
527 if ((d = this.dst) != null && d.result == null) {
528 try {
529 fn.accept(arg1, arg2);
530 ex = null;
531 } catch (Throwable rex) {
532 ex = rex;
533 }
534 d.internalComplete(null, ex);
535 }
536 return true;
537 }
538 private static final long serialVersionUID = 5232453952276885070L;
539 }
540
541 static final class AsyncCompose<T,U> extends Async {
542 final Function<? super T, CompletableFuture<U>> fn;
543 final T arg;
544 final CompletableFuture<U> dst;
545 AsyncCompose(T arg,
546 Function<? super T, CompletableFuture<U>> fn,
547 CompletableFuture<U> dst) {
548 this.arg = arg; this.fn = fn; this.dst = dst;
549 }
550 public final boolean exec() {
551 CompletableFuture<U> d, fr; U u; Throwable ex;
552 if ((d = this.dst) != null && d.result == null) {
553 try {
554 fr = fn.apply(arg);
555 ex = null;
556 } catch (Throwable rex) {
557 ex = rex;
558 fr = null;
559 }
560 if (ex != null)
561 u = null;
562 else if (fr == null) {
563 ex = new NullPointerException();
564 u = null;
565 }
566 else {
567 Object r = fr.result;
568 if (r instanceof AltResult) {
569 ex = ((AltResult)r).ex;
570 u = null;
571 }
572 else {
573 @SuppressWarnings("unchecked") U ur = (U) r;
574 u = ur;
575 }
576 }
577 d.internalComplete(u, ex);
578 }
579 return true;
580 }
581 private static final long serialVersionUID = 5232453952276885070L;
582 }
583
584 /* ------------- Completions -------------- */
585
586 /**
587 * Simple linked list nodes to record completions, used in
588 * basically the same way as WaitNodes. (We separate nodes from
589 * the Completions themselves mainly because for the And and Or
590 * methods, the same Completion object resides in two lists.)
591 */
592 static final class CompletionNode {
593 final Completion completion;
594 volatile CompletionNode next;
595 CompletionNode(Completion completion) { this.completion = completion; }
596 }
597
598 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
599 abstract static class Completion extends AtomicInteger implements Runnable {
600 }
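The claim idiom behind `Completion` can be shown on its own: because the object is itself an `AtomicInteger`, `compareAndSet(0, 1)` lets exactly one of several racing callers run the action. This is a sketch under assumptions; `ClaimDemo`, `OneShot`, and `raceAndCount` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of claiming an action via compareAndSet(0, 1).
class ClaimDemo {

    /** Stand-in for a Completion: its body runs at most once. */
    @SuppressWarnings("serial")
    static final class OneShot extends AtomicInteger implements Runnable {
        final AtomicInteger runs = new AtomicInteger();

        public void run() {
            if (compareAndSet(0, 1))      // only the first caller wins the claim
                runs.incrementAndGet();   // the claimed action
        }
    }

    /** Races nThreads callers on one action; returns how often it ran. */
    static int raceAndCount(int nThreads) {
        OneShot action = new OneShot();
        Thread[] threads = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new Thread(action);
            threads[i].start();
        }
        try {
            for (Thread t : threads)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return action.runs.get();
    }

    public static void main(String[] args) {
        System.out.println(raceAndCount(8));
    }
}
```

Extending `AtomicInteger` rather than holding one as a field saves an object per completion, which is presumably why the source calls the subclassing "opportunistic".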
601
602 static final class ApplyCompletion<T,U> extends Completion {
603 final CompletableFuture<? extends T> src;
604 final Function<? super T,? extends U> fn;
605 final CompletableFuture<U> dst;
606 final Executor executor;
607 ApplyCompletion(CompletableFuture<? extends T> src,
608 Function<? super T,? extends U> fn,
609 CompletableFuture<U> dst, Executor executor) {
610 this.src = src; this.fn = fn; this.dst = dst;
611 this.executor = executor;
612 }
613 public final void run() {
614 final CompletableFuture<? extends T> a;
615 final Function<? super T,? extends U> fn;
616 final CompletableFuture<U> dst;
617 Object r; T t; Throwable ex;
618 if ((dst = this.dst) != null &&
619 (fn = this.fn) != null &&
620 (a = this.src) != null &&
621 (r = a.result) != null &&
622 compareAndSet(0, 1)) {
623 if (r instanceof AltResult) {
624 ex = ((AltResult)r).ex;
625 t = null;
626 }
627 else {
628 ex = null;
629 @SuppressWarnings("unchecked") T tr = (T) r;
630 t = tr;
631 }
632 Executor e = executor;
633 U u = null;
634 if (ex == null) {
635 try {
636 if (e != null)
637 e.execute(new AsyncApply<T,U>(t, fn, dst));
638 else
639 u = fn.apply(t);
640 } catch (Throwable rex) {
641 ex = rex;
642 }
643 }
644 if (e == null || ex != null)
645 dst.internalComplete(u, ex);
646 }
647 }
648 private static final long serialVersionUID = 5232453952276885070L;
649 }
650
651 static final class AcceptCompletion<T> extends Completion {
652 final CompletableFuture<? extends T> src;
653 final Consumer<? super T> fn;
654 final CompletableFuture<Void> dst;
655 final Executor executor;
656 AcceptCompletion(CompletableFuture<? extends T> src,
657 Consumer<? super T> fn,
658 CompletableFuture<Void> dst, Executor executor) {
659 this.src = src; this.fn = fn; this.dst = dst;
660 this.executor = executor;
661 }
662 public final void run() {
663 final CompletableFuture<? extends T> a;
664 final Consumer<? super T> fn;
665 final CompletableFuture<Void> dst;
666 Object r; T t; Throwable ex;
667 if ((dst = this.dst) != null &&
668 (fn = this.fn) != null &&
669 (a = this.src) != null &&
670 (r = a.result) != null &&
671 compareAndSet(0, 1)) {
672 if (r instanceof AltResult) {
673 ex = ((AltResult)r).ex;
674 t = null;
675 }
676 else {
677 ex = null;
678 @SuppressWarnings("unchecked") T tr = (T) r;
679 t = tr;
680 }
681 Executor e = executor;
682 if (ex == null) {
683 try {
684 if (e != null)
685 e.execute(new AsyncAccept<T>(t, fn, dst));
686 else
687 fn.accept(t);
688 } catch (Throwable rex) {
689 ex = rex;
690 }
691 }
692 if (e == null || ex != null)
693 dst.internalComplete(null, ex);
694 }
695 }
696 private static final long serialVersionUID = 5232453952276885070L;
697 }
698
699 static final class RunCompletion<T> extends Completion {
700 final CompletableFuture<? extends T> src;
701 final Runnable fn;
702 final CompletableFuture<Void> dst;
703 final Executor executor;
704 RunCompletion(CompletableFuture<? extends T> src,
705 Runnable fn,
706 CompletableFuture<Void> dst,
707 Executor executor) {
708 this.src = src; this.fn = fn; this.dst = dst;
709 this.executor = executor;
710 }
711 public final void run() {
712 final CompletableFuture<? extends T> a;
713 final Runnable fn;
714 final CompletableFuture<Void> dst;
715 Object r; Throwable ex;
716 if ((dst = this.dst) != null &&
717 (fn = this.fn) != null &&
718 (a = this.src) != null &&
719 (r = a.result) != null &&
720 compareAndSet(0, 1)) {
721 if (r instanceof AltResult)
722 ex = ((AltResult)r).ex;
723 else
724 ex = null;
725 Executor e = executor;
726 if (ex == null) {
727 try {
728 if (e != null)
729 e.execute(new AsyncRun(fn, dst));
730 else
731 fn.run();
732 } catch (Throwable rex) {
733 ex = rex;
734 }
735 }
736 if (e == null || ex != null)
737 dst.internalComplete(null, ex);
738 }
739 }
740 private static final long serialVersionUID = 5232453952276885070L;
741 }
742
743 static final class BiApplyCompletion<T,U,V> extends Completion {
744 final CompletableFuture<? extends T> src;
745 final CompletableFuture<? extends U> snd;
746 final BiFunction<? super T,? super U,? extends V> fn;
747 final CompletableFuture<V> dst;
748 final Executor executor;
749 BiApplyCompletion(CompletableFuture<? extends T> src,
750 CompletableFuture<? extends U> snd,
751 BiFunction<? super T,? super U,? extends V> fn,
752 CompletableFuture<V> dst, Executor executor) {
753 this.src = src; this.snd = snd;
754 this.fn = fn; this.dst = dst;
755 this.executor = executor;
756 }
757 public final void run() {
758 final CompletableFuture<? extends T> a;
759 final CompletableFuture<? extends U> b;
760 final BiFunction<? super T,? super U,? extends V> fn;
761 final CompletableFuture<V> dst;
762 Object r, s; T t; U u; Throwable ex;
763 if ((dst = this.dst) != null &&
764 (fn = this.fn) != null &&
765 (a = this.src) != null &&
766 (r = a.result) != null &&
767 (b = this.snd) != null &&
768 (s = b.result) != null &&
769 compareAndSet(0, 1)) {
770 if (r instanceof AltResult) {
771 ex = ((AltResult)r).ex;
772 t = null;
773 }
774 else {
775 ex = null;
776 @SuppressWarnings("unchecked") T tr = (T) r;
777 t = tr;
778 }
779 if (ex != null)
780 u = null;
781 else if (s instanceof AltResult) {
782 ex = ((AltResult)s).ex;
783 u = null;
784 }
785 else {
786 @SuppressWarnings("unchecked") U us = (U) s;
787 u = us;
788 }
789 Executor e = executor;
790 V v = null;
791 if (ex == null) {
792 try {
793 if (e != null)
794 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
795 else
796 v = fn.apply(t, u);
797 } catch (Throwable rex) {
798 ex = rex;
799 }
800 }
801 if (e == null || ex != null)
802 dst.internalComplete(v, ex);
803 }
804 }
805 private static final long serialVersionUID = 5232453952276885070L;
806 }
807
808 static final class BiAcceptCompletion<T,U> extends Completion {
809 final CompletableFuture<? extends T> src;
810 final CompletableFuture<? extends U> snd;
811 final BiConsumer<? super T,? super U> fn;
812 final CompletableFuture<Void> dst;
813 final Executor executor;
814 BiAcceptCompletion(CompletableFuture<? extends T> src,
815 CompletableFuture<? extends U> snd,
816 BiConsumer<? super T,? super U> fn,
817 CompletableFuture<Void> dst, Executor executor) {
818 this.src = src; this.snd = snd;
819 this.fn = fn; this.dst = dst;
820 this.executor = executor;
821 }
822 public final void run() {
823 final CompletableFuture<? extends T> a;
824 final CompletableFuture<? extends U> b;
825 final BiConsumer<? super T,? super U> fn;
826 final CompletableFuture<Void> dst;
827 Object r, s; T t; U u; Throwable ex;
828 if ((dst = this.dst) != null &&
829 (fn = this.fn) != null &&
830 (a = this.src) != null &&
831 (r = a.result) != null &&
832 (b = this.snd) != null &&
833 (s = b.result) != null &&
834 compareAndSet(0, 1)) {
835 if (r instanceof AltResult) {
836 ex = ((AltResult)r).ex;
837 t = null;
838 }
839 else {
840 ex = null;
841 @SuppressWarnings("unchecked") T tr = (T) r;
842 t = tr;
843 }
844 if (ex != null)
845 u = null;
846 else if (s instanceof AltResult) {
847 ex = ((AltResult)s).ex;
848 u = null;
849 }
850 else {
851 @SuppressWarnings("unchecked") U us = (U) s;
852 u = us;
853 }
854 Executor e = executor;
855 if (ex == null) {
856 try {
857 if (e != null)
858 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
859 else
860 fn.accept(t, u);
861 } catch (Throwable rex) {
862 ex = rex;
863 }
864 }
865 if (e == null || ex != null)
866 dst.internalComplete(null, ex);
867 }
868 }
869 private static final long serialVersionUID = 5232453952276885070L;
870 }
871
872 static final class BiRunCompletion<T> extends Completion {
873 final CompletableFuture<? extends T> src;
874 final CompletableFuture<?> snd;
875 final Runnable fn;
876 final CompletableFuture<Void> dst;
877 final Executor executor;
878 BiRunCompletion(CompletableFuture<? extends T> src,
879 CompletableFuture<?> snd,
880 Runnable fn,
881 CompletableFuture<Void> dst, Executor executor) {
882 this.src = src; this.snd = snd;
883 this.fn = fn; this.dst = dst;
884 this.executor = executor;
885 }
886 public final void run() {
887 final CompletableFuture<? extends T> a;
888 final CompletableFuture<?> b;
889 final Runnable fn;
890 final CompletableFuture<Void> dst;
891 Object r, s; Throwable ex;
892 if ((dst = this.dst) != null &&
893 (fn = this.fn) != null &&
894 (a = this.src) != null &&
895 (r = a.result) != null &&
896 (b = this.snd) != null &&
897 (s = b.result) != null &&
898 compareAndSet(0, 1)) {
899 if (r instanceof AltResult)
900 ex = ((AltResult)r).ex;
901 else
902 ex = null;
903 if (ex == null && (s instanceof AltResult))
904 ex = ((AltResult)s).ex;
905 Executor e = executor;
906 if (ex == null) {
907 try {
908 if (e != null)
909 e.execute(new AsyncRun(fn, dst));
910 else
911 fn.run();
912 } catch (Throwable rex) {
913 ex = rex;
914 }
915 }
916 if (e == null || ex != null)
917 dst.internalComplete(null, ex);
918 }
919 }
920 private static final long serialVersionUID = 5232453952276885070L;
921 }
922
923 static final class AndCompletion extends Completion {
924 final CompletableFuture<?> src;
925 final CompletableFuture<?> snd;
926 final CompletableFuture<Void> dst;
927 AndCompletion(CompletableFuture<?> src,
928 CompletableFuture<?> snd,
929 CompletableFuture<Void> dst) {
930 this.src = src; this.snd = snd; this.dst = dst;
931 }
932 public final void run() {
933 final CompletableFuture<?> a;
934 final CompletableFuture<?> b;
935 final CompletableFuture<Void> dst;
936 Object r, s; Throwable ex;
937 if ((dst = this.dst) != null &&
938 (a = this.src) != null &&
939 (r = a.result) != null &&
940 (b = this.snd) != null &&
941 (s = b.result) != null &&
942 compareAndSet(0, 1)) {
943 if (r instanceof AltResult)
944 ex = ((AltResult)r).ex;
945 else
946 ex = null;
947 if (ex == null && (s instanceof AltResult))
948 ex = ((AltResult)s).ex;
949 dst.internalComplete(null, ex);
950 }
951 }
952 private static final long serialVersionUID = 5232453952276885070L;
953 }
954
955 static final class OrApplyCompletion<T,U> extends Completion {
956 final CompletableFuture<? extends T> src;
957 final CompletableFuture<? extends T> snd;
958 final Function<? super T,? extends U> fn;
959 final CompletableFuture<U> dst;
960 final Executor executor;
961 OrApplyCompletion(CompletableFuture<? extends T> src,
962 CompletableFuture<? extends T> snd,
963 Function<? super T,? extends U> fn,
964 CompletableFuture<U> dst, Executor executor) {
965 this.src = src; this.snd = snd;
966 this.fn = fn; this.dst = dst;
967 this.executor = executor;
968 }
969 public final void run() {
970 final CompletableFuture<? extends T> a;
971 final CompletableFuture<? extends T> b;
972 final Function<? super T,? extends U> fn;
973 final CompletableFuture<U> dst;
974 Object r; T t; Throwable ex;
975 if ((dst = this.dst) != null &&
976 (fn = this.fn) != null &&
977 (((a = this.src) != null && (r = a.result) != null) ||
978 ((b = this.snd) != null && (r = b.result) != null)) &&
979 compareAndSet(0, 1)) {
980 if (r instanceof AltResult) {
981 ex = ((AltResult)r).ex;
982 t = null;
983 }
984 else {
985 ex = null;
986 @SuppressWarnings("unchecked") T tr = (T) r;
987 t = tr;
988 }
989 Executor e = executor;
990 U u = null;
991 if (ex == null) {
992 try {
993 if (e != null)
994 e.execute(new AsyncApply<T,U>(t, fn, dst));
995 else
996 u = fn.apply(t);
997 } catch (Throwable rex) {
998 ex = rex;
999 }
1000 }
1001 if (e == null || ex != null)
1002 dst.internalComplete(u, ex);
1003 }
1004 }
1005 private static final long serialVersionUID = 5232453952276885070L;
1006 }
1007
1008 static final class OrAcceptCompletion<T> extends Completion {
1009 final CompletableFuture<? extends T> src;
1010 final CompletableFuture<? extends T> snd;
1011 final Consumer<? super T> fn;
1012 final CompletableFuture<Void> dst;
1013 final Executor executor;
1014 OrAcceptCompletion(CompletableFuture<? extends T> src,
1015 CompletableFuture<? extends T> snd,
1016 Consumer<? super T> fn,
1017 CompletableFuture<Void> dst, Executor executor) {
1018 this.src = src; this.snd = snd;
1019 this.fn = fn; this.dst = dst;
1020 this.executor = executor;
1021 }
1022 public final void run() {
1023 final CompletableFuture<? extends T> a;
1024 final CompletableFuture<? extends T> b;
1025 final Consumer<? super T> fn;
1026 final CompletableFuture<Void> dst;
1027 Object r; T t; Throwable ex;
1028 if ((dst = this.dst) != null &&
1029 (fn = this.fn) != null &&
1030 (((a = this.src) != null && (r = a.result) != null) ||
1031 ((b = this.snd) != null && (r = b.result) != null)) &&
1032 compareAndSet(0, 1)) {
1033 if (r instanceof AltResult) {
1034 ex = ((AltResult)r).ex;
1035 t = null;
1036 }
1037 else {
1038 ex = null;
1039 @SuppressWarnings("unchecked") T tr = (T) r;
1040 t = tr;
1041 }
1042 Executor e = executor;
1043 if (ex == null) {
1044 try {
1045 if (e != null)
1046 e.execute(new AsyncAccept<T>(t, fn, dst));
1047 else
1048 fn.accept(t);
1049 } catch (Throwable rex) {
1050 ex = rex;
1051 }
1052 }
1053 if (e == null || ex != null)
1054 dst.internalComplete(null, ex);
1055 }
1056 }
1057 private static final long serialVersionUID = 5232453952276885070L;
1058 }
1059
1060 static final class OrRunCompletion<T> extends Completion {
1061 final CompletableFuture<? extends T> src;
1062 final CompletableFuture<?> snd;
1063 final Runnable fn;
1064 final CompletableFuture<Void> dst;
1065 final Executor executor;
1066 OrRunCompletion(CompletableFuture<? extends T> src,
1067 CompletableFuture<?> snd,
1068 Runnable fn,
1069 CompletableFuture<Void> dst, Executor executor) {
1070 this.src = src; this.snd = snd;
1071 this.fn = fn; this.dst = dst;
1072 this.executor = executor;
1073 }
1074 public final void run() {
1075 final CompletableFuture<? extends T> a;
1076 final CompletableFuture<?> b;
1077 final Runnable fn;
1078 final CompletableFuture<Void> dst;
1079 Object r; Throwable ex;
1080 if ((dst = this.dst) != null &&
1081 (fn = this.fn) != null &&
1082 (((a = this.src) != null && (r = a.result) != null) ||
1083 ((b = this.snd) != null && (r = b.result) != null)) &&
1084 compareAndSet(0, 1)) {
1085 if (r instanceof AltResult)
1086 ex = ((AltResult)r).ex;
1087 else
1088 ex = null;
1089 Executor e = executor;
1090 if (ex == null) {
1091 try {
1092 if (e != null)
1093 e.execute(new AsyncRun(fn, dst));
1094 else
1095 fn.run();
1096 } catch (Throwable rex) {
1097 ex = rex;
1098 }
1099 }
1100 if (e == null || ex != null)
1101 dst.internalComplete(null, ex);
1102 }
1103 }
1104 private static final long serialVersionUID = 5232453952276885070L;
1105 }
1106
1107 static final class OrCompletion extends Completion {
1108 final CompletableFuture<?> src;
1109 final CompletableFuture<?> snd;
1110 final CompletableFuture<?> dst;
1111 OrCompletion(CompletableFuture<?> src,
1112 CompletableFuture<?> snd,
1113 CompletableFuture<?> dst) {
1114 this.src = src; this.snd = snd; this.dst = dst;
1115 }
1116 public final void run() {
1117 final CompletableFuture<?> a;
1118 final CompletableFuture<?> b;
1119 final CompletableFuture<?> dst;
1120 Object r, t; Throwable ex;
1121 if ((dst = this.dst) != null &&
1122 (((a = this.src) != null && (r = a.result) != null) ||
1123 ((b = this.snd) != null && (r = b.result) != null)) &&
1124 compareAndSet(0, 1)) {
1125 if (r instanceof AltResult) {
1126 ex = ((AltResult)r).ex;
1127 t = null;
1128 }
1129 else {
1130 ex = null;
1131 t = r;
1132 }
1133 dst.internalComplete(t, ex);
1134 }
1135 }
1136 private static final long serialVersionUID = 5232453952276885070L;
1137 }
1138
1139 static final class ExceptionCompletion<T> extends Completion {
1140 final CompletableFuture<? extends T> src;
1141 final Function<? super Throwable, ? extends T> fn;
1142 final CompletableFuture<T> dst;
1143 ExceptionCompletion(CompletableFuture<? extends T> src,
1144 Function<? super Throwable, ? extends T> fn,
1145 CompletableFuture<T> dst) {
1146 this.src = src; this.fn = fn; this.dst = dst;
1147 }
1148 public final void run() {
1149 final CompletableFuture<? extends T> a;
1150 final Function<? super Throwable, ? extends T> fn;
1151 final CompletableFuture<T> dst;
1152 Object r; T t = null; Throwable ex, dx = null;
1153 if ((dst = this.dst) != null &&
1154 (fn = this.fn) != null &&
1155 (a = this.src) != null &&
1156 (r = a.result) != null &&
1157 compareAndSet(0, 1)) {
1158 if ((r instanceof AltResult) &&
1159 (ex = ((AltResult)r).ex) != null) {
1160 try {
1161 t = fn.apply(ex);
1162 } catch (Throwable rex) {
1163 dx = rex;
1164 }
1165 }
1166 else {
1167 @SuppressWarnings("unchecked") T tr = (T) r;
1168 t = tr;
1169 }
1170 dst.internalComplete(t, dx);
1171 }
1172 }
1173 private static final long serialVersionUID = 5232453952276885070L;
1174 }
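/*
Usage sketch, not part of the original source. ExceptionCompletion applies its function only when the source holds an exception and otherwise passes the value through. Assuming the released JDK 8+ method exceptionally, the effect from a caller's side might look like this. ExceptionallySketch and recover are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallySketch {
    // Maps a failure to a fallback string; a normal result passes through.
    static String recover(CompletableFuture<String> src) {
        CompletableFuture<String> safe =
            src.exceptionally(ex -> "recovered: " + ex.getMessage());
        return safe.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(recover(failed));

        CompletableFuture<String> ok = new CompletableFuture<>();
        ok.complete("ok");
        System.out.println(recover(ok));
    }
}
```
*/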
1175
1176 static final class ThenCopy extends Completion {
1177 final CompletableFuture<?> src;
1178 final CompletableFuture<?> dst;
1179 ThenCopy(CompletableFuture<?> src,
1180 CompletableFuture<?> dst) {
1181 this.src = src; this.dst = dst;
1182 }
1183 public final void run() {
1184 final CompletableFuture<?> a;
1185 final CompletableFuture<?> dst;
1186 Object r; Object t; Throwable ex;
1187 if ((dst = this.dst) != null &&
1188 (a = this.src) != null &&
1189 (r = a.result) != null &&
1190 compareAndSet(0, 1)) {
1191 if (r instanceof AltResult) {
1192 ex = ((AltResult)r).ex;
1193 t = null;
1194 }
1195 else {
1196 ex = null;
1197 t = r;
1198 }
1199 dst.internalComplete(t, ex);
1200 }
1201 }
1202 private static final long serialVersionUID = 5232453952276885070L;
1203 }
1204
1205 static final class HandleCompletion<T,U> extends Completion {
1206 final CompletableFuture<? extends T> src;
1207 final BiFunction<? super T, Throwable, ? extends U> fn;
1208 final CompletableFuture<U> dst;
1209 HandleCompletion(CompletableFuture<? extends T> src,
1210 BiFunction<? super T, Throwable, ? extends U> fn,
1211 final CompletableFuture<U> dst) {
1212 this.src = src; this.fn = fn; this.dst = dst;
1213 }
1214 public final void run() {
1215 final CompletableFuture<? extends T> a;
1216 final BiFunction<? super T, Throwable, ? extends U> fn;
1217 final CompletableFuture<U> dst;
1218 Object r; T t; Throwable ex;
1219 if ((dst = this.dst) != null &&
1220 (fn = this.fn) != null &&
1221 (a = this.src) != null &&
1222 (r = a.result) != null &&
1223 compareAndSet(0, 1)) {
1224 if (r instanceof AltResult) {
1225 ex = ((AltResult)r).ex;
1226 t = null;
1227 }
1228 else {
1229 ex = null;
1230 @SuppressWarnings("unchecked") T tr = (T) r;
1231 t = tr;
1232 }
1233 U u = null; Throwable dx = null;
1234 try {
1235 u = fn.apply(t, ex);
1236 } catch (Throwable rex) {
1237 dx = rex;
1238 }
1239 dst.internalComplete(u, dx);
1240 }
1241 }
1242 private static final long serialVersionUID = 5232453952276885070L;
1243 }
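/*
Usage sketch, not part of the original source. HandleCompletion always calls its BiFunction, passing either the value or the exception (the other argument is null). Assuming the released JDK 8+ method handle, a caller could branch on the two arguments as below. HandleSketch and describe are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

public class HandleSketch {
    // Turns either outcome of src into a descriptive string.
    static String describe(CompletableFuture<Integer> src) {
        CompletableFuture<String> described = src.handle(
            (value, ex) -> ex == null ? "value=" + value
                                      : "error=" + ex.getMessage());
        return described.join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> good = new CompletableFuture<>();
        good.complete(7);
        System.out.println(describe(good));

        CompletableFuture<Integer> bad = new CompletableFuture<>();
        bad.completeExceptionally(new RuntimeException("bad"));
        System.out.println(describe(bad));
    }
}
```
*/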
1244
1245 static final class ComposeCompletion<T,U> extends Completion {
1246 final CompletableFuture<? extends T> src;
1247 final Function<? super T, CompletableFuture<U>> fn;
1248 final CompletableFuture<U> dst;
1249 final Executor executor;
1250 ComposeCompletion(CompletableFuture<? extends T> src,
1251 Function<? super T, CompletableFuture<U>> fn,
1252 final CompletableFuture<U> dst, Executor executor) {
1253 this.src = src; this.fn = fn; this.dst = dst;
1254 this.executor = executor;
1255 }
1256 public final void run() {
1257 final CompletableFuture<? extends T> a;
1258 final Function<? super T, CompletableFuture<U>> fn;
1259 final CompletableFuture<U> dst;
1260 Object r; T t; Throwable ex; Executor e;
1261 if ((dst = this.dst) != null &&
1262 (fn = this.fn) != null &&
1263 (a = this.src) != null &&
1264 (r = a.result) != null &&
1265 compareAndSet(0, 1)) {
1266 if (r instanceof AltResult) {
1267 ex = ((AltResult)r).ex;
1268 t = null;
1269 }
1270 else {
1271 ex = null;
1272 @SuppressWarnings("unchecked") T tr = (T) r;
1273 t = tr;
1274 }
1275 CompletableFuture<U> c = null;
1276 U u = null;
1277 boolean complete = false;
1278 if (ex == null) {
1279 // Keep execute() inside the try: a rejected task must
1280 // complete dst exceptionally rather than escape run().
1281 try {
1282 if ((e = executor) != null)
1283 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1284 else if ((c = fn.apply(t)) == null)
1285 ex = new NullPointerException();
1286 } catch (Throwable rex) {
1287 ex = rex;
1288 }
1289 }
1290 if (c != null) {
1291 ThenCopy d = null;
1292 Object s;
1293 if ((s = c.result) == null) {
1294 CompletionNode p = new CompletionNode
1295 (d = new ThenCopy(c, dst));
1296 while ((s = c.result) == null) {
1297 if (UNSAFE.compareAndSwapObject
1298 (c, COMPLETIONS, p.next = c.completions, p))
1299 break;
1300 }
1301 }
1302 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1303 complete = true;
1304 if (s instanceof AltResult) {
1305 ex = ((AltResult)s).ex; // no rewrap
1306 u = null;
1307 }
1308 else {
1309 @SuppressWarnings("unchecked") U us = (U) s;
1310 u = us;
1311 }
1312 }
1313 }
1314 if (complete || ex != null)
1315 dst.internalComplete(u, ex);
1316 if (c != null)
1317 c.helpPostComplete();
1318 }
1319 }
1320 private static final long serialVersionUID = 5232453952276885070L;
1321 }
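/*
Usage sketch, not part of the original source. ComposeCompletion runs a function that itself returns a CompletableFuture and then relays that inner future's outcome to dst. Assuming the released JDK 8+ method thenCompose, the flattening might be used as follows. ComposeSketch, lengthOf and composedLength are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeSketch {
    // Hypothetical second stage: computes a value asynchronously.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(() -> s.length());
    }

    // Chains the second stage onto the first without nesting futures.
    static int composedLength(CompletableFuture<String> name) {
        CompletableFuture<Integer> flat = name.thenCompose(n -> lengthOf(n));
        return flat.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> name = new CompletableFuture<>();
        name.complete("hello");
        System.out.println(composedLength(name));
    }
}
```

With thenApply the same function would yield a future of a future; thenCompose yields a single-level CompletableFuture of Integer.
*/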
1322
1323 // public methods
1324
1325 /**
1326 * Creates a new incomplete CompletableFuture.
1327 */
1328 public CompletableFuture() {
1329 }
1330
1331 /**
1332 * Asynchronously executes in the {@link
1333 * ForkJoinPool#commonPool()} a task that completes the returned
1334 * CompletableFuture with the result of the given Supplier.
1335 *
1336 * @param supplier a function returning the value to be used
1337 * to complete the returned CompletableFuture
1338 * @return the CompletableFuture
1339 */
1340 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1341 if (supplier == null) throw new NullPointerException();
1342 CompletableFuture<U> f = new CompletableFuture<U>();
1343 ForkJoinPool.commonPool().
1344 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1345 return f;
1346 }
1347
1348 /**
1349 * Asynchronously executes, using the given executor, a task that
1350 * completes the returned CompletableFuture with the result of the
1351 * given Supplier.
1352 *
1353 * @param supplier a function returning the value to be used
1354 * to complete the returned CompletableFuture
1355 * @param executor the executor to use for asynchronous execution
1356 * @return the CompletableFuture
1357 */
1358 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1359 Executor executor) {
1360 if (executor == null || supplier == null)
1361 throw new NullPointerException();
1362 CompletableFuture<U> f = new CompletableFuture<U>();
1363 executor.execute(new AsyncSupply<U>(supplier, f));
1364 return f;
1365 }
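/*
Usage sketch, not part of the original source. The two supplyAsync overloads differ only in where the supplier runs. A minimal example of the executor variant, assuming the released JDK 8+ API, could look like this. SupplyAsyncSketch and computeOn are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncSketch {
    // Runs the supplier on a caller-provided executor and waits for the value.
    static int computeOn(ExecutorService pool) {
        CompletableFuture<Integer> f =
            CompletableFuture.supplyAsync(() -> 6 * 7, pool);
        return f.join(); // blocks until the supplier has run
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(computeOn(pool));
        } finally {
            pool.shutdown(); // the pool is owned by the caller
        }
    }
}
```
*/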
1366
1367 /**
1368 * Asynchronously executes in the {@link
1369 * ForkJoinPool#commonPool()} a task that runs the given action,
1370 * and then completes the returned CompletableFuture.
1371 *
1372 * @param runnable the action to run before completing the
1373 * returned CompletableFuture
1374 * @return the CompletableFuture
1375 */
1376 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1377 if (runnable == null) throw new NullPointerException();
1378 CompletableFuture<Void> f = new CompletableFuture<Void>();
1379 ForkJoinPool.commonPool().
1380 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1381 return f;
1382 }
1383
1384 /**
1385 * Asynchronously executes, using the given executor, a task that
1386 * runs the given action, and then completes the returned
1387 * CompletableFuture.
1388 *
1389 * @param runnable the action to run before completing the
1390 * returned CompletableFuture
1391 * @param executor the executor to use for asynchronous execution
1392 * @return the CompletableFuture
1393 */
1394 public static CompletableFuture<Void> runAsync(Runnable runnable,
1395 Executor executor) {
1396 if (executor == null || runnable == null)
1397 throw new NullPointerException();
1398 CompletableFuture<Void> f = new CompletableFuture<Void>();
1399 executor.execute(new AsyncRun(runnable, f));
1400 return f;
1401 }
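/*
Usage sketch, not part of the original source. runAsync yields a CompletableFuture of Void, so the returned future signals only that the action has finished. A small illustration, assuming the released JDK 8+ API (RunAsyncSketch and runAndCount are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class RunAsyncSketch {
    // Runs one side-effecting action in the common pool and waits for it.
    static int runAndCount() {
        AtomicInteger counter = new AtomicInteger();
        CompletableFuture<Void> f =
            CompletableFuture.runAsync(() -> counter.incrementAndGet());
        f.join(); // returns null; used here only to wait
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCount());
    }
}
```
*/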
1402
1403 /**
1404 * Returns {@code true} if completed in any fashion: normally,
1405 * exceptionally, or via cancellation.
1406 *
1407 * @return {@code true} if completed
1408 */
1409 public boolean isDone() {
1410 return result != null;
1411 }
1412
1413 /**
1414 * Waits if necessary for this future to complete, and then
1415 * returns its result.
1416 *
1417 * @return the result value
1418 * @throws CancellationException if this future was cancelled
1419 * @throws ExecutionException if this future completed exceptionally
1420 * @throws InterruptedException if the current thread was interrupted
1421 * while waiting
1422 */
1423 public T get() throws InterruptedException, ExecutionException {
1424 Object r; Throwable ex, cause;
1425 if ((r = result) == null && (r = waitingGet(true)) == null)
1426 throw new InterruptedException();
1427 if (!(r instanceof AltResult)) {
1428 @SuppressWarnings("unchecked") T tr = (T) r;
1429 return tr;
1430 }
1431 if ((ex = ((AltResult)r).ex) == null)
1432 return null;
1433 if (ex instanceof CancellationException)
1434 throw (CancellationException)ex;
1435 if ((ex instanceof CompletionException) &&
1436 (cause = ex.getCause()) != null)
1437 ex = cause;
1438 throw new ExecutionException(ex);
1439 }
1440
1441 /**
1442 * Waits if necessary for at most the given time for this future
1443 * to complete, and then returns its result, if available.
1444 *
1445 * @param timeout the maximum time to wait
1446 * @param unit the time unit of the timeout argument
1447 * @return the result value
1448 * @throws CancellationException if this future was cancelled
1449 * @throws ExecutionException if this future completed exceptionally
1450 * @throws InterruptedException if the current thread was interrupted
1451 * while waiting
1452 * @throws TimeoutException if the wait timed out
1453 */
1454 public T get(long timeout, TimeUnit unit)
1455 throws InterruptedException, ExecutionException, TimeoutException {
1456 Object r; Throwable ex, cause;
1457 long nanos = unit.toNanos(timeout);
1458 if (Thread.interrupted())
1459 throw new InterruptedException();
1460 if ((r = result) == null)
1461 r = timedAwaitDone(nanos);
1462 if (!(r instanceof AltResult)) {
1463 @SuppressWarnings("unchecked") T tr = (T) r;
1464 return tr;
1465 }
1466 if ((ex = ((AltResult)r).ex) == null)
1467 return null;
1468 if (ex instanceof CancellationException)
1469 throw (CancellationException)ex;
1470 if ((ex instanceof CompletionException) &&
1471 (cause = ex.getCause()) != null)
1472 ex = cause;
1473 throw new ExecutionException(ex);
1474 }
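/*
Usage sketch, not part of the original source. The timed get throws TimeoutException when the wait expires but leaves the future itself untouched. One way a caller might wrap that, assuming the released JDK 8+ API (TimedGetSketch and getOrFallback are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {
    // Returns the value, or the given fallback if it is not ready in time.
    static String getOrFallback(CompletableFuture<String> f,
                                long millis, String fallback) {
        try {
            return f.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback; // the future stays incomplete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>();
        System.out.println(getOrFallback(never, 10, "late"));
    }
}
```
*/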
1475
1476 /**
1477 * Returns the result value when complete, or throws an
1478 * (unchecked) exception if completed exceptionally. To better
1479 * conform with the use of common functional forms, if a
1480 * computation involved in the completion of this
1481 * CompletableFuture threw an exception, this method throws an
1482 * (unchecked) {@link CompletionException} with the underlying
1483 * exception as its cause.
1484 *
1485 * @return the result value
1486 * @throws CancellationException if the computation was cancelled
1487 * @throws CompletionException if a completion computation threw
1488 * an exception
1489 */
1490 public T join() {
1491 Object r; Throwable ex;
1492 if ((r = result) == null)
1493 r = waitingGet(false);
1494 if (!(r instanceof AltResult)) {
1495 @SuppressWarnings("unchecked") T tr = (T) r;
1496 return tr;
1497 }
1498 if ((ex = ((AltResult)r).ex) == null)
1499 return null;
1500 if (ex instanceof CancellationException)
1501 throw (CancellationException)ex;
1502 if (ex instanceof CompletionException)
1503 throw (CompletionException)ex;
1504 throw new CompletionException(ex);
1505 }
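/*
Usage sketch, not part of the original source. get() and join() report the same failure through different exception types: get() throws a checked ExecutionException, join() an unchecked CompletionException, each carrying the original exception as its cause. The comparison below assumes the released JDK 8+ API; GetVersusJoinSketch and exceptionTypes are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class GetVersusJoinSketch {
    // Describes what get() and join() throw for the same failed future.
    static String exceptionTypes() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        String viaGet = "none";
        String viaJoin = "none";
        try {
            f.get();
        } catch (ExecutionException e) {
            viaGet = "ExecutionException:"
                + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        try {
            f.join();
        } catch (CompletionException e) {
            viaJoin = "CompletionException:"
                + e.getCause().getClass().getSimpleName();
        }
        return viaGet + "," + viaJoin;
    }

    public static void main(String[] args) {
        System.out.println(exceptionTypes());
    }
}
```
*/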
1506
1507 /**
1508 * Returns the result value (or throws any encountered exception)
1509 * if completed, else returns the given valueIfAbsent.
1510 *
1511 * @param valueIfAbsent the value to return if not completed
1512 * @return the result value, if completed, else the given valueIfAbsent
1513 * @throws CancellationException if the computation was cancelled
1514 * @throws CompletionException if a completion computation threw
1515 * an exception
1516 */
1517 public T getNow(T valueIfAbsent) {
1518 Object r; Throwable ex;
1519 if ((r = result) == null)
1520 return valueIfAbsent;
1521 if (!(r instanceof AltResult)) {
1522 @SuppressWarnings("unchecked") T tr = (T) r;
1523 return tr;
1524 }
1525 if ((ex = ((AltResult)r).ex) == null)
1526 return null;
1527 if (ex instanceof CancellationException)
1528 throw (CancellationException)ex;
1529 if (ex instanceof CompletionException)
1530 throw (CompletionException)ex;
1531 throw new CompletionException(ex);
1532 }
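/*
Usage sketch, not part of the original source. getNow never blocks: it returns the supplied default while the future is pending and the real result afterwards. A short illustration, assuming the released JDK 8+ API (GetNowSketch and beforeAndAfter are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;

public class GetNowSketch {
    // Samples getNow once before and once after completion.
    static String[] beforeAndAfter() {
        CompletableFuture<String> f = new CompletableFuture<>();
        String before = f.getNow("fallback"); // still pending
        f.complete("done");
        String after = f.getNow("fallback");  // now completed
        return new String[] { before, after };
    }

    public static void main(String[] args) {
        String[] r = beforeAndAfter();
        System.out.println(r[0] + " then " + r[1]);
    }
}
```
*/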
1533
1534 /**
1535 * If not already completed, sets the value returned by {@link
1536 * #get()} and related methods to the given value.
1537 *
1538 * @param value the result value
1539 * @return {@code true} if this invocation caused this CompletableFuture
1540 * to transition to a completed state, else {@code false}
1541 */
1542 public boolean complete(T value) {
1543 boolean triggered = result == null &&
1544 UNSAFE.compareAndSwapObject(this, RESULT, null,
1545 value == null ? NIL : value);
1546 postComplete();
1547 return triggered;
1548 }
1549
1550 /**
1551 * If not already completed, causes invocations of {@link #get()}
1552 * and related methods to throw the given exception.
1553 *
1554 * @param ex the exception
1555 * @return {@code true} if this invocation caused this CompletableFuture
1556 * to transition to a completed state, else {@code false}
1557 */
1558 public boolean completeExceptionally(Throwable ex) {
1559 if (ex == null) throw new NullPointerException();
1560 boolean triggered = result == null &&
1561 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1562 postComplete();
1563 return triggered;
1564 }
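/*
Usage sketch, not part of the original source. Both complete and completeExceptionally rely on a single compare-and-swap of the result field, so only the first attempt takes effect and later attempts return false. The sketch below assumes the released JDK 8+ API; CompleteOnceSketch and attempts are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

public class CompleteOnceSketch {
    // Tries to complete the same future three times and reports each outcome.
    static boolean[] attempts(CompletableFuture<String> f) {
        boolean first = f.complete("first");
        boolean second = f.complete("second");
        boolean third = f.completeExceptionally(new RuntimeException("late"));
        return new boolean[] { first, second, third };
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean[] r = attempts(f);
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + f.join());
    }
}
```
*/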
1565
1566 /**
1567 * Creates and returns a CompletableFuture that is completed with
1568 * the result of the given function of this CompletableFuture.
1569 * If this CompletableFuture completes exceptionally,
1570 * then the returned CompletableFuture also does so,
1571 * with a CompletionException holding this exception as
1572 * its cause.
1573 *
1574 * @param fn the function to use to compute the value of
1575 * the returned CompletableFuture
1576 * @return the new CompletableFuture
1577 */
1578 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1579 return doThenApply(fn, null);
1580 }
1581
1582 /**
1583 * Creates and returns a CompletableFuture that is asynchronously
1584 * completed using the {@link ForkJoinPool#commonPool()} with the
1585 * result of the given function of this CompletableFuture. If
1586 * this CompletableFuture completes exceptionally, then the
1587 * returned CompletableFuture also does so, with a
1588 * CompletionException holding this exception as its cause.
1589 *
1590 * @param fn the function to use to compute the value of
1591 * the returned CompletableFuture
1592 * @return the new CompletableFuture
1593 */
1594 public <U> CompletableFuture<U> thenApplyAsync
1595 (Function<? super T,? extends U> fn) {
1596 return doThenApply(fn, ForkJoinPool.commonPool());
1597 }
1598
1599 /**
1600 * Creates and returns a CompletableFuture that is asynchronously
1601 * completed using the given executor with the result of the given
1602 * function of this CompletableFuture. If this CompletableFuture
1603 * completes exceptionally, then the returned CompletableFuture
1604 * also does so, with a CompletionException holding this exception as
1605 * its cause.
1606 *
1607 * @param fn the function to use to compute the value of
1608 * the returned CompletableFuture
1609 * @param executor the executor to use for asynchronous execution
1610 * @return the new CompletableFuture
1611 */
1612 public <U> CompletableFuture<U> thenApplyAsync
1613 (Function<? super T,? extends U> fn,
1614 Executor executor) {
1615 if (executor == null) throw new NullPointerException();
1616 return doThenApply(fn, executor);
1617 }
1618
1619 private <U> CompletableFuture<U> doThenApply
1620 (Function<? super T,? extends U> fn,
1621 Executor e) {
1622 if (fn == null) throw new NullPointerException();
1623 CompletableFuture<U> dst = new CompletableFuture<U>();
1624 ApplyCompletion<T,U> d = null;
1625 Object r;
1626 if ((r = result) == null) {
1627 CompletionNode p = new CompletionNode
1628 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1629 while ((r = result) == null) {
1630 if (UNSAFE.compareAndSwapObject
1631 (this, COMPLETIONS, p.next = completions, p))
1632 break;
1633 }
1634 }
1635 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1636 T t; Throwable ex;
1637 if (r instanceof AltResult) {
1638 ex = ((AltResult)r).ex;
1639 t = null;
1640 }
1641 else {
1642 ex = null;
1643 @SuppressWarnings("unchecked") T tr = (T) r;
1644 t = tr;
1645 }
1646 U u = null;
1647 if (ex == null) {
1648 try {
1649 if (e != null)
1650 e.execute(new AsyncApply<T,U>(t, fn, dst));
1651 else
1652 u = fn.apply(t);
1653 } catch (Throwable rex) {
1654 ex = rex;
1655 }
1656 }
1657 if (e == null || ex != null)
1658 dst.internalComplete(u, ex);
1659 }
1660 helpPostComplete();
1661 return dst;
1662 }
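/*
Usage sketch, not part of the original source. The thenApply Javadoc above states that a source failure reaches the dependent future wrapped in a CompletionException. The following illustration, assuming the released JDK 8+ API, shows both the normal and the failing path. ThenApplySketch and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ThenApplySketch {
    // Normal path: two chained transformations of the source value.
    static int trimmedLength(CompletableFuture<String> src) {
        CompletableFuture<Integer> len =
            src.thenApply(s -> s.trim()).thenApply(s -> s.length());
        return len.join();
    }

    // Failing path: returns the cause observed through the dependent future.
    static Throwable causeSeenByDependent(Throwable failure) {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Integer> dst = src.thenApply(s -> s.length());
        src.completeExceptionally(failure);
        try {
            dst.join();
            return null; // not reached when src failed
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> src = new CompletableFuture<>();
        src.complete("  abc ");
        System.out.println(trimmedLength(src));
    }
}
```
*/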
1663
1664 /**
1665 * Creates and returns a CompletableFuture that is completed after
1666 * performing the given action with this CompletableFuture's
1667 * result when it completes. If this CompletableFuture
1668 * completes exceptionally, then the returned CompletableFuture
1669 * also does so, with a CompletionException holding this exception as
1670 * its cause.
1671 *
1672 * @param block the action to perform before completing the
1673 * returned CompletableFuture
1674 * @return the new CompletableFuture
1675 */
1676 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1677 return doThenAccept(block, null);
1678 }
1679
1680 /**
1681 * Creates and returns a CompletableFuture that is asynchronously
1682 * completed using the {@link ForkJoinPool#commonPool()} after
1683 * performing the given action with this CompletableFuture's
1684 * result when it completes. If this CompletableFuture completes
1685 * exceptionally, then the returned CompletableFuture also does so,
1686 * with a CompletionException holding this exception as its cause.
1687 *
1688 * @param block the action to perform before completing the
1689 * returned CompletableFuture
1690 * @return the new CompletableFuture
1691 */
1692 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1693 return doThenAccept(block, ForkJoinPool.commonPool());
1694 }
1695
1696 /**
1697 * Creates and returns a CompletableFuture that is asynchronously
1698 * completed using the given executor after performing the given
1699 * action with this CompletableFuture's result when it completes.
1700 * If this CompletableFuture completes exceptionally, then the
1701 * returned CompletableFuture also does so, with a
1702 * CompletionException holding this exception as its cause.
1703 *
1704 * @param block the action to perform before completing the
1705 * returned CompletableFuture
1706 * @param executor the executor to use for asynchronous execution
1707 * @return the new CompletableFuture
1708 */
1709 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1710 Executor executor) {
1711 if (executor == null) throw new NullPointerException();
1712 return doThenAccept(block, executor);
1713 }
1714
1715 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1716 Executor e) {
1717 if (fn == null) throw new NullPointerException();
1718 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1719 AcceptCompletion<T> d = null;
1720 Object r;
1721 if ((r = result) == null) {
1722 CompletionNode p = new CompletionNode
1723 (d = new AcceptCompletion<T>(this, fn, dst, e));
1724 while ((r = result) == null) {
1725 if (UNSAFE.compareAndSwapObject
1726 (this, COMPLETIONS, p.next = completions, p))
1727 break;
1728 }
1729 }
1730 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1731 T t; Throwable ex;
1732 if (r instanceof AltResult) {
1733 ex = ((AltResult)r).ex;
1734 t = null;
1735 }
1736 else {
1737 ex = null;
1738 @SuppressWarnings("unchecked") T tr = (T) r;
1739 t = tr;
1740 }
1741 if (ex == null) {
1742 try {
1743 if (e != null)
1744 e.execute(new AsyncAccept<T>(t, fn, dst));
1745 else
1746 fn.accept(t);
1747 } catch (Throwable rex) {
1748 ex = rex;
1749 }
1750 }
1751 if (e == null || ex != null)
1752 dst.internalComplete(null, ex);
1753 }
1754 helpPostComplete();
1755 return dst;
1756 }
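/*
Usage sketch, not part of the original source. thenAccept consumes the result and returns a CompletableFuture of Void whose only purpose is to signal that the action has finished. A minimal illustration, assuming the released JDK 8+ API (ThenAcceptSketch and collect are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ThenAcceptSketch {
    // Registers a consumer before completion and returns what it received.
    static List<String> collect(String value) {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Void> done = src.thenAccept(s -> seen.add(s));
        src.complete(value); // the consumer runs once src has a result
        done.join();         // returns null; used only to wait
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect("payload"));
    }
}
```
*/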
1757
1758 /**
1759 * Creates and returns a CompletableFuture that is completed after
1760 * performing the given action when this CompletableFuture
1761 * completes. If this CompletableFuture completes exceptionally,
1762 * then the returned CompletableFuture also does so, with a
1763 * CompletionException holding this exception as its cause.
1764 *
1765 * @param action the action to perform before completing the
1766 * returned CompletableFuture
1767 * @return the new CompletableFuture
1768 */
1769 public CompletableFuture<Void> thenRun(Runnable action) {
1770 return doThenRun(action, null);
1771 }
1772
1773 /**
1774 * Creates and returns a CompletableFuture that is asynchronously
1775 * completed using the {@link ForkJoinPool#commonPool()} after
1776 * performing the given action when this CompletableFuture
1777 * completes. If this CompletableFuture completes exceptionally,
1778 * then the returned CompletableFuture also does so, with a
1779 * CompletionException holding this exception as its cause.
1780 *
1781 * @param action the action to perform before completing the
1782 * returned CompletableFuture
1783 * @return the new CompletableFuture
1784 */
1785 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1786 return doThenRun(action, ForkJoinPool.commonPool());
1787 }
1788
1789 /**
1790 * Creates and returns a CompletableFuture that is asynchronously
1791 * completed using the given executor after performing the given
1792 * action when this CompletableFuture completes. If this
1793 * CompletableFuture completes exceptionally, then the returned
1794 * CompletableFuture also does so, with a CompletionException holding
1795 * this exception as its cause.
1796 *
1797 * @param action the action to perform before completing the
1798 * returned CompletableFuture
1799 * @param executor the executor to use for asynchronous execution
1800 * @return the new CompletableFuture
1801 */
1802 public CompletableFuture<Void> thenRunAsync(Runnable action,
1803 Executor executor) {
1804 if (executor == null) throw new NullPointerException();
1805 return doThenRun(action, executor);
1806 }
1807
1808 private CompletableFuture<Void> doThenRun(Runnable action,
1809 Executor e) {
1810 if (action == null) throw new NullPointerException();
1811 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1812 RunCompletion<T> d = null;
1813 Object r;
1814 if ((r = result) == null) {
1815 CompletionNode p = new CompletionNode
1816 (d = new RunCompletion<T>(this, action, dst, e));
1817 while ((r = result) == null) {
1818 if (UNSAFE.compareAndSwapObject
1819 (this, COMPLETIONS, p.next = completions, p))
1820 break;
1821 }
1822 }
1823 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1824 Throwable ex;
1825 if (r instanceof AltResult)
1826 ex = ((AltResult)r).ex;
1827 else
1828 ex = null;
1829 if (ex == null) {
1830 try {
1831 if (e != null)
1832 e.execute(new AsyncRun(action, dst));
1833 else
1834 action.run();
1835 } catch (Throwable rex) {
1836 ex = rex;
1837 }
1838 }
1839 if (e == null || ex != null)
1840 dst.internalComplete(null, ex);
1841 }
1842 helpPostComplete();
1843 return dst;
1844 }
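/*
Usage sketch, not part of the original source. As doThenRun shows, the action is executed only when the source result carries no exception; otherwise the dependent future is completed exceptionally and the action is skipped. The sketch below checks both cases, assuming the released JDK 8+ API. ThenRunSketch and actionRan are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThenRunSketch {
    // Reports whether the dependent action ran for a successful or failed source.
    static boolean actionRan(boolean succeed) {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Void> dst = src.thenRun(() -> ran.set(true));
        if (succeed)
            src.complete("ok");
        else
            src.completeExceptionally(new RuntimeException("failed"));
        try {
            dst.join(); // wait until dst has settled either way
        } catch (CompletionException expectedOnFailure) {
            // dst mirrors the source failure; the action was skipped
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(actionRan(true) + " " + actionRan(false));
    }
}
```
*/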
1845
1846 /**
1847 * Creates and returns a CompletableFuture that is completed with
1848 * the result of the given function of this and the other given
1849 * CompletableFuture's results when both complete. If this or
1850 * the other CompletableFuture completes exceptionally, then the
1851 * returned CompletableFuture also does so, with a
1852 * CompletionException holding the exception as its cause.
1853 *
1854 * @param other the other CompletableFuture
1855 * @param fn the function to use to compute the value of
1856 * the returned CompletableFuture
1857 * @return the new CompletableFuture
1858 */
1859 public <U,V> CompletableFuture<V> thenCombine
1860 (CompletableFuture<? extends U> other,
1861 BiFunction<? super T,? super U,? extends V> fn) {
1862 return doThenBiApply(other, fn, null);
1863 }
1864
1865 /**
1866 * Creates and returns a CompletableFuture that is asynchronously
1867 * completed using the {@link ForkJoinPool#commonPool()} with
1868 * the result of the given function of this and the other given
1869 * CompletableFuture's results when both complete. If this or
1870 * the other CompletableFuture completes exceptionally, then the
1871 * returned CompletableFuture also does so, with a
1872 * CompletionException holding the exception as its cause.
1873 *
1874 * @param other the other CompletableFuture
1875 * @param fn the function to use to compute the value of
1876 * the returned CompletableFuture
1877 * @return the new CompletableFuture
1878 */
1879 public <U,V> CompletableFuture<V> thenCombineAsync
1880 (CompletableFuture<? extends U> other,
1881 BiFunction<? super T,? super U,? extends V> fn) {
1882 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1883 }
1884
1885 /**
1886 * Creates and returns a CompletableFuture that is
1887 * asynchronously completed using the given executor with the
1888 * result of the given function of this and the other given
1889 * CompletableFuture's results when both complete. If this or
1890 * the other CompletableFuture completes exceptionally, then the
1891 * returned CompletableFuture also does so, with a
1892 * CompletionException holding the exception as its cause.
1893 *
1894 * @param other the other CompletableFuture
1895 * @param fn the function to use to compute the value of
1896 * the returned CompletableFuture
1897 * @param executor the executor to use for asynchronous execution
1898 * @return the new CompletableFuture
1899 */
1900 public <U,V> CompletableFuture<V> thenCombineAsync
1901 (CompletableFuture<? extends U> other,
1902 BiFunction<? super T,? super U,? extends V> fn,
1903 Executor executor) {
1904 if (executor == null) throw new NullPointerException();
1905 return doThenBiApply(other, fn, executor);
1906 }
1907
1908 private <U,V> CompletableFuture<V> doThenBiApply
1909 (CompletableFuture<? extends U> other,
1910 BiFunction<? super T,? super U,? extends V> fn,
1911 Executor e) {
1912 if (other == null || fn == null) throw new NullPointerException();
1913 CompletableFuture<V> dst = new CompletableFuture<V>();
1914 BiApplyCompletion<T,U,V> d = null;
1915 Object r, s = null;
1916 if ((r = result) == null || (s = other.result) == null) {
1917 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1918 CompletionNode q = null, p = new CompletionNode(d);
1919 while ((r == null && (r = result) == null) ||
1920 (s == null && (s = other.result) == null)) {
1921 if (q != null) {
1922 if (s != null ||
1923 UNSAFE.compareAndSwapObject
1924 (other, COMPLETIONS, q.next = other.completions, q))
1925 break;
1926 }
1927 else if (r != null ||
1928 UNSAFE.compareAndSwapObject
1929 (this, COMPLETIONS, p.next = completions, p)) {
1930 if (s != null)
1931 break;
1932 q = new CompletionNode(d);
1933 }
1934 }
1935 }
1936 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1937 T t; U u; Throwable ex;
1938 if (r instanceof AltResult) {
1939 ex = ((AltResult)r).ex;
1940 t = null;
1941 }
1942 else {
1943 ex = null;
1944 @SuppressWarnings("unchecked") T tr = (T) r;
1945 t = tr;
1946 }
1947 if (ex != null)
1948 u = null;
1949 else if (s instanceof AltResult) {
1950 ex = ((AltResult)s).ex;
1951 u = null;
1952 }
1953 else {
1954 @SuppressWarnings("unchecked") U us = (U) s;
1955 u = us;
1956 }
1957 V v = null;
1958 if (ex == null) {
1959 try {
1960 if (e != null)
1961 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1962 else
1963 v = fn.apply(t, u);
1964 } catch (Throwable rex) {
1965 ex = rex;
1966 }
1967 }
1968 if (e == null || ex != null)
1969 dst.internalComplete(v, ex);
1970 }
1971 helpPostComplete();
1972 other.helpPostComplete();
1973 return dst;
1974 }
1975
1976 /**
1977 * Creates and returns a CompletableFuture that is completed after
1978 * performing the given action with the results of this and the
1979 * other given CompletableFuture when both complete. If this and/or
1980 * the other CompletableFuture complete exceptionally, then the
1981 * returned CompletableFuture also does so, with a CompletionException
1982 * holding one of these exceptions as its cause.
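*
* <p>A minimal usage sketch, assuming two hypothetical futures
* {@code price} and {@code qty} that are completed elsewhere:
* <pre> {@code
* CompletableFuture<Integer> price = new CompletableFuture<Integer>();
* CompletableFuture<Integer> qty = new CompletableFuture<Integer>();
* CompletableFuture<Void> done = price.thenAcceptBoth(
*     qty, (p, q) -> System.out.println(p * q));
* price.complete(3);
* qty.complete(4);   // both inputs are now complete; prints 12
* done.join();
* }</pre>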
1983 *
1984 * @param other the other CompletableFuture
1985 * @param block the action to perform before completing the
1986 * returned CompletableFuture
1987 * @return the new CompletableFuture
1988 */
1989 public <U> CompletableFuture<Void> thenAcceptBoth
1990 (CompletableFuture<? extends U> other,
1991 BiConsumer<? super T, ? super U> block) {
1992 return doThenBiAccept(other, block, null);
1993 }
1994
1995 /**
1996 * Creates and returns a CompletableFuture that is completed
1997 * asynchronously using the {@link ForkJoinPool#commonPool()} after
1998 * performing the given action with the results of this and the
1999 * other given CompletableFuture when both complete. If this and/or
2000 * the other CompletableFuture complete exceptionally, then the
2001 * returned CompletableFuture also does so, with a CompletionException
2002 * holding one of these exceptions as its cause.
2003 *
2004 * @param other the other CompletableFuture
2005 * @param block the action to perform before completing the
2006 * returned CompletableFuture
2007 * @return the new CompletableFuture
2008 */
2009 public <U> CompletableFuture<Void> thenAcceptBothAsync
2010 (CompletableFuture<? extends U> other,
2011 BiConsumer<? super T, ? super U> block) {
2012 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2013 }
2014
2015 /**
2016 * Creates and returns a CompletableFuture that is completed
2017 * asynchronously using the given executor after performing the given
2018 * action with the results of this and the other given CompletableFuture
2019 * when both complete. If this and/or the other CompletableFuture
2020 * complete exceptionally, then the returned CompletableFuture also does
2021 * so, with a CompletionException holding one of these exceptions as its cause.
2022 *
2023 * @param other the other CompletableFuture
2024 * @param block the action to perform before completing the
2025 * returned CompletableFuture
2026 * @param executor the executor to use for asynchronous execution
2027 * @return the new CompletableFuture
2028 */
2029 public <U> CompletableFuture<Void> thenAcceptBothAsync
2030 (CompletableFuture<? extends U> other,
2031 BiConsumer<? super T, ? super U> block,
2032 Executor executor) {
2033 if (executor == null) throw new NullPointerException();
2034 return doThenBiAccept(other, block, executor);
2035 }
2036
2037 private <U> CompletableFuture<Void> doThenBiAccept
2038 (CompletableFuture<? extends U> other,
2039 BiConsumer<? super T,? super U> fn,
2040 Executor e) {
2041 if (other == null || fn == null) throw new NullPointerException();
2042 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2043 BiAcceptCompletion<T,U> d = null;
2044 Object r, s = null;
2045 if ((r = result) == null || (s = other.result) == null) {
2046 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2047 CompletionNode q = null, p = new CompletionNode(d);
2048 while ((r == null && (r = result) == null) ||
2049 (s == null && (s = other.result) == null)) {
2050 if (q != null) {
2051 if (s != null ||
2052 UNSAFE.compareAndSwapObject
2053 (other, COMPLETIONS, q.next = other.completions, q))
2054 break;
2055 }
2056 else if (r != null ||
2057 UNSAFE.compareAndSwapObject
2058 (this, COMPLETIONS, p.next = completions, p)) {
2059 if (s != null)
2060 break;
2061 q = new CompletionNode(d);
2062 }
2063 }
2064 }
2065 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2066 T t; U u; Throwable ex;
2067 if (r instanceof AltResult) {
2068 ex = ((AltResult)r).ex;
2069 t = null;
2070 }
2071 else {
2072 ex = null;
2073 @SuppressWarnings("unchecked") T tr = (T) r;
2074 t = tr;
2075 }
2076 if (ex != null)
2077 u = null;
2078 else if (s instanceof AltResult) {
2079 ex = ((AltResult)s).ex;
2080 u = null;
2081 }
2082 else {
2083 @SuppressWarnings("unchecked") U us = (U) s;
2084 u = us;
2085 }
2086 if (ex == null) {
2087 try {
2088 if (e != null)
2089 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2090 else
2091 fn.accept(t, u);
2092 } catch (Throwable rex) {
2093 ex = rex;
2094 }
2095 }
2096 if (e == null || ex != null)
2097 dst.internalComplete(null, ex);
2098 }
2099 helpPostComplete();
2100 other.helpPostComplete();
2101 return dst;
2102 }
2103
2104 /**
2105 * Creates and returns a CompletableFuture that is completed when
2106 * this and the other given CompletableFuture both complete.
2107 * If this and/or the other CompletableFuture complete exceptionally,
2108 * then the returned CompletableFuture also does so, with a
2109 * CompletionException holding one of these exceptions as its cause.
2110 *
2111 * @param other the other CompletableFuture
2112 * @param action the action to perform before completing the
2113 * returned CompletableFuture
2114 * @return the new CompletableFuture
2115 */
2116 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2117 Runnable action) {
2118 return doThenBiRun(other, action, null);
2119 }
2120
2121 /**
2122 * Creates and returns a CompletableFuture that is completed
2123 * asynchronously using the {@link ForkJoinPool#commonPool()}
2124 * when this and the other given CompletableFuture both complete.
2125 * If this and/or the other CompletableFuture complete exceptionally,
2126 * then the returned CompletableFuture also does so, with a
2127 * CompletionException holding one of these exceptions as its cause.
2128 *
2129 * @param other the other CompletableFuture
2130 * @param action the action to perform before completing the
2131 * returned CompletableFuture
2132 * @return the new CompletableFuture
2133 */
2134 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2135 Runnable action) {
2136 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2137 }
2138
2139 /**
2140 * Creates and returns a CompletableFuture that is completed
2141 * asynchronously using the given executor when this and the
2142 * other given CompletableFuture both complete.
2143 * If this and/or the other CompletableFuture complete exceptionally,
2144 * then the returned CompletableFuture also does so, with a
2145 * CompletionException holding one of these exceptions as its cause.
2146 *
2147 * @param other the other CompletableFuture
2148 * @param action the action to perform before completing the
2149 * returned CompletableFuture
2150 * @param executor the executor to use for asynchronous execution
2151 * @return the new CompletableFuture
2152 */
2153 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2154 Runnable action,
2155 Executor executor) {
2156 if (executor == null) throw new NullPointerException();
2157 return doThenBiRun(other, action, executor);
2158 }
2159
2160 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2161 Runnable action,
2162 Executor e) {
2163 if (other == null || action == null) throw new NullPointerException();
2164 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2165 BiRunCompletion<T> d = null;
2166 Object r, s = null;
2167 if ((r = result) == null || (s = other.result) == null) {
2168 d = new BiRunCompletion<T>(this, other, action, dst, e);
2169 CompletionNode q = null, p = new CompletionNode(d);
2170 while ((r == null && (r = result) == null) ||
2171 (s == null && (s = other.result) == null)) {
2172 if (q != null) {
2173 if (s != null ||
2174 UNSAFE.compareAndSwapObject
2175 (other, COMPLETIONS, q.next = other.completions, q))
2176 break;
2177 }
2178 else if (r != null ||
2179 UNSAFE.compareAndSwapObject
2180 (this, COMPLETIONS, p.next = completions, p)) {
2181 if (s != null)
2182 break;
2183 q = new CompletionNode(d);
2184 }
2185 }
2186 }
2187 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2188 Throwable ex;
2189 if (r instanceof AltResult)
2190 ex = ((AltResult)r).ex;
2191 else
2192 ex = null;
2193 if (ex == null && (s instanceof AltResult))
2194 ex = ((AltResult)s).ex;
2195 if (ex == null) {
2196 try {
2197 if (e != null)
2198 e.execute(new AsyncRun(action, dst));
2199 else
2200 action.run();
2201 } catch (Throwable rex) {
2202 ex = rex;
2203 }
2204 }
2205 if (e == null || ex != null)
2206 dst.internalComplete(null, ex);
2207 }
2208 helpPostComplete();
2209 other.helpPostComplete();
2210 return dst;
2211 }
2212
2213 /**
2214 * Creates and returns a CompletableFuture that is completed with
2215 * the result of the given function of either this or the other
2216 * given CompletableFuture's result when either completes.
2217 * If this and/or the other CompletableFuture complete exceptionally,
2218 * then the returned CompletableFuture may also do so, with a
2219 * CompletionException holding one of these exceptions as its cause.
2220 * No guarantees are made about which result or exception is used
2221 * in the returned CompletableFuture.
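*
* <p>A minimal sketch, assuming two hypothetical sources
* {@code primary} and {@code mirror} that race to supply a value:
* <pre> {@code
* CompletableFuture<String> primary = new CompletableFuture<String>();
* CompletableFuture<String> mirror = new CompletableFuture<String>();
* CompletableFuture<Integer> len =
*     primary.applyToEither(mirror, s -> s.length());
* mirror.complete("fast");   // only mirror has completed
* int n = len.join();        // 4, the length of "fast"
* }</pre>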
2222 *
2223 * @param other the other CompletableFuture
2224 * @param fn the function to use to compute the value of
2225 * the returned CompletableFuture
2226 * @return the new CompletableFuture
2227 */
2228 public <U> CompletableFuture<U> applyToEither
2229 (CompletableFuture<? extends T> other,
2230 Function<? super T, U> fn) {
2231 return doOrApply(other, fn, null);
2232 }
2233
2234 /**
2235 * Creates and returns a CompletableFuture that is completed
2236 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2237 * the result of the given function of either this or the other
2238 * given CompletableFuture's result when either completes.
2239 * If this and/or the other CompletableFuture complete exceptionally,
2240 * then the returned CompletableFuture may also do so, with a
2241 * CompletionException holding one of these exceptions as its cause.
2242 * No guarantees are made about which result or exception is used
2243 * in the returned CompletableFuture.
2244 *
2245 * @param other the other CompletableFuture
2246 * @param fn the function to use to compute the value of
2247 * the returned CompletableFuture
2248 * @return the new CompletableFuture
2249 */
2250 public <U> CompletableFuture<U> applyToEitherAsync
2251 (CompletableFuture<? extends T> other,
2252 Function<? super T, U> fn) {
2253 return doOrApply(other, fn, ForkJoinPool.commonPool());
2254 }
2255
2256 /**
2257 * Creates and returns a CompletableFuture that is completed
2258 * asynchronously using the given executor with the result of the
2259 * given function of either this or the other given
2260 * CompletableFuture's result when either completes. If this
2261 * and/or the other CompletableFuture complete exceptionally, then
2262 * the returned CompletableFuture may also do so, with a
2263 * CompletionException holding one of these exceptions as its cause.
2264 * No guarantees are made about which result or exception is used
2265 * in the returned CompletableFuture.
2266 *
2267 * @param other the other CompletableFuture
2268 * @param fn the function to use to compute the value of
2269 * the returned CompletableFuture
2270 * @param executor the executor to use for asynchronous execution
2271 * @return the new CompletableFuture
2272 */
2273 public <U> CompletableFuture<U> applyToEitherAsync
2274 (CompletableFuture<? extends T> other,
2275 Function<? super T, U> fn,
2276 Executor executor) {
2277 if (executor == null) throw new NullPointerException();
2278 return doOrApply(other, fn, executor);
2279 }
2280
2281 private <U> CompletableFuture<U> doOrApply
2282 (CompletableFuture<? extends T> other,
2283 Function<? super T, U> fn,
2284 Executor e) {
2285 if (other == null || fn == null) throw new NullPointerException();
2286 CompletableFuture<U> dst = new CompletableFuture<U>();
2287 OrApplyCompletion<T,U> d = null;
2288 Object r;
2289 if ((r = result) == null && (r = other.result) == null) {
2290 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2291 CompletionNode q = null, p = new CompletionNode(d);
2292 while ((r = result) == null && (r = other.result) == null) {
2293 if (q != null) {
2294 if (UNSAFE.compareAndSwapObject
2295 (other, COMPLETIONS, q.next = other.completions, q))
2296 break;
2297 }
2298 else if (UNSAFE.compareAndSwapObject
2299 (this, COMPLETIONS, p.next = completions, p))
2300 q = new CompletionNode(d);
2301 }
2302 }
2303 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2304 T t; Throwable ex;
2305 if (r instanceof AltResult) {
2306 ex = ((AltResult)r).ex;
2307 t = null;
2308 }
2309 else {
2310 ex = null;
2311 @SuppressWarnings("unchecked") T tr = (T) r;
2312 t = tr;
2313 }
2314 U u = null;
2315 if (ex == null) {
2316 try {
2317 if (e != null)
2318 e.execute(new AsyncApply<T,U>(t, fn, dst));
2319 else
2320 u = fn.apply(t);
2321 } catch (Throwable rex) {
2322 ex = rex;
2323 }
2324 }
2325 if (e == null || ex != null)
2326 dst.internalComplete(u, ex);
2327 }
2328 helpPostComplete();
2329 other.helpPostComplete();
2330 return dst;
2331 }
2332
2333 /**
2334 * Creates and returns a CompletableFuture that is completed after
2335 * performing the given action with the result of either this or the
2336 * other given CompletableFuture, when either completes.
2337 * If this and/or the other CompletableFuture complete exceptionally,
2338 * then the returned CompletableFuture may also do so, with a
2339 * CompletionException holding one of these exceptions as its cause.
2340 * No guarantees are made about which result or exception is used in the
2341 * returned CompletableFuture.
2342 *
2343 * @param other the other CompletableFuture
2344 * @param block the action to perform before completing the
2345 * returned CompletableFuture
2346 * @return the new CompletableFuture
2347 */
2348 public CompletableFuture<Void> acceptEither
2349 (CompletableFuture<? extends T> other,
2350 Consumer<? super T> block) {
2351 return doOrAccept(other, block, null);
2352 }
2353
2354 /**
2355 * Creates and returns a CompletableFuture that is completed
2356 * asynchronously using the {@link ForkJoinPool#commonPool()},
2357 * performing the given action with the result of either this or
2358 * the other given CompletableFuture, when either completes.
2359 * If this and/or the other CompletableFuture complete exceptionally,
2360 * then the returned CompletableFuture may also do so, with a
2361 * CompletionException holding one of these exceptions as its cause.
2362 * No guarantees are made about which result or exception is used in the
2363 * returned CompletableFuture.
2364 *
2365 * @param other the other CompletableFuture
2366 * @param block the action to perform before completing the
2367 * returned CompletableFuture
2368 * @return the new CompletableFuture
2369 */
2370 public CompletableFuture<Void> acceptEitherAsync
2371 (CompletableFuture<? extends T> other,
2372 Consumer<? super T> block) {
2373 return doOrAccept(other, block, ForkJoinPool.commonPool());
2374 }
2375
2376 /**
2377 * Creates and returns a CompletableFuture that is completed
2378 * asynchronously using the given executor, performing the given
2379 * action with the result of either this or the other given
2380 * CompletableFuture, when either completes.
2381 * If this and/or the other CompletableFuture complete exceptionally,
2382 * then the returned CompletableFuture may also do so, with a
2383 * CompletionException holding one of these exceptions as its cause.
2384 * No guarantees are made about which result or exception is used in the
2385 * returned CompletableFuture.
2386 *
2387 * @param other the other CompletableFuture
2388 * @param block the action to perform before completing the
2389 * returned CompletableFuture
2390 * @param executor the executor to use for asynchronous execution
2391 * @return the new CompletableFuture
2392 */
2393 public CompletableFuture<Void> acceptEitherAsync
2394 (CompletableFuture<? extends T> other,
2395 Consumer<? super T> block,
2396 Executor executor) {
2397 if (executor == null) throw new NullPointerException();
2398 return doOrAccept(other, block, executor);
2399 }
2400
2401 private CompletableFuture<Void> doOrAccept
2402 (CompletableFuture<? extends T> other,
2403 Consumer<? super T> fn,
2404 Executor e) {
2405 if (other == null || fn == null) throw new NullPointerException();
2406 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2407 OrAcceptCompletion<T> d = null;
2408 Object r;
2409 if ((r = result) == null && (r = other.result) == null) {
2410 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2411 CompletionNode q = null, p = new CompletionNode(d);
2412 while ((r = result) == null && (r = other.result) == null) {
2413 if (q != null) {
2414 if (UNSAFE.compareAndSwapObject
2415 (other, COMPLETIONS, q.next = other.completions, q))
2416 break;
2417 }
2418 else if (UNSAFE.compareAndSwapObject
2419 (this, COMPLETIONS, p.next = completions, p))
2420 q = new CompletionNode(d);
2421 }
2422 }
2423 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2424 T t; Throwable ex;
2425 if (r instanceof AltResult) {
2426 ex = ((AltResult)r).ex;
2427 t = null;
2428 }
2429 else {
2430 ex = null;
2431 @SuppressWarnings("unchecked") T tr = (T) r;
2432 t = tr;
2433 }
2434 if (ex == null) {
2435 try {
2436 if (e != null)
2437 e.execute(new AsyncAccept<T>(t, fn, dst));
2438 else
2439 fn.accept(t);
2440 } catch (Throwable rex) {
2441 ex = rex;
2442 }
2443 }
2444 if (e == null || ex != null)
2445 dst.internalComplete(null, ex);
2446 }
2447 helpPostComplete();
2448 other.helpPostComplete();
2449 return dst;
2450 }
2451
2452 /**
2453 * Creates and returns a CompletableFuture that is completed
2454 * after either this or the other given CompletableFuture completes.
2455 * If this and/or the other CompletableFuture complete exceptionally,
2456 * then the returned CompletableFuture may also do so, with a
2457 * CompletionException holding one of these exceptions as its cause.
2458 * No guarantees are made about which exception is used in the
2459 * returned CompletableFuture.
2460 *
2461 * @param other the other CompletableFuture
2462 * @param action the action to perform before completing the
2463 * returned CompletableFuture
2464 * @return the new CompletableFuture
2465 */
2466 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2467 Runnable action) {
2468 return doOrRun(other, action, null);
2469 }
2470
2471 /**
2472 * Creates and returns a CompletableFuture that is completed
2473 * asynchronously using the {@link ForkJoinPool#commonPool()}
2474 * after either this or the other given CompletableFuture completes.
2475 * If this and/or the other CompletableFuture complete exceptionally,
2476 * then the returned CompletableFuture may also do so, with a
2477 * CompletionException holding one of these exceptions as its cause.
2478 * No guarantees are made about which exception is used in the
2479 * returned CompletableFuture.
2480 *
2481 * @param other the other CompletableFuture
2482 * @param action the action to perform before completing the
2483 * returned CompletableFuture
2484 * @return the new CompletableFuture
2485 */
2486 public CompletableFuture<Void> runAfterEitherAsync
2487 (CompletableFuture<?> other,
2488 Runnable action) {
2489 return doOrRun(other, action, ForkJoinPool.commonPool());
2490 }
2491
2492 /**
2493 * Creates and returns a CompletableFuture that is completed
2494 * asynchronously using the given executor after this or the other
2495 * given CompletableFuture completes.
2496 * If this and/or the other CompletableFuture complete exceptionally,
2497 * then the returned CompletableFuture may also do so, with a
2498 * CompletionException holding one of these exceptions as its cause.
2499 * No guarantees are made about which exception is used in the
2500 * returned CompletableFuture.
2501 *
2502 * @param other the other CompletableFuture
2503 * @param action the action to perform before completing the
2504 * returned CompletableFuture
2505 * @param executor the executor to use for asynchronous execution
2506 * @return the new CompletableFuture
2507 */
2508 public CompletableFuture<Void> runAfterEitherAsync
2509 (CompletableFuture<?> other,
2510 Runnable action,
2511 Executor executor) {
2512 if (executor == null) throw new NullPointerException();
2513 return doOrRun(other, action, executor);
2514 }
2515
2516 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2517 Runnable action,
2518 Executor e) {
2519 if (other == null || action == null) throw new NullPointerException();
2520 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2521 OrRunCompletion<T> d = null;
2522 Object r;
2523 if ((r = result) == null && (r = other.result) == null) {
2524 d = new OrRunCompletion<T>(this, other, action, dst, e);
2525 CompletionNode q = null, p = new CompletionNode(d);
2526 while ((r = result) == null && (r = other.result) == null) {
2527 if (q != null) {
2528 if (UNSAFE.compareAndSwapObject
2529 (other, COMPLETIONS, q.next = other.completions, q))
2530 break;
2531 }
2532 else if (UNSAFE.compareAndSwapObject
2533 (this, COMPLETIONS, p.next = completions, p))
2534 q = new CompletionNode(d);
2535 }
2536 }
2537 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2538 Throwable ex;
2539 if (r instanceof AltResult)
2540 ex = ((AltResult)r).ex;
2541 else
2542 ex = null;
2543 if (ex == null) {
2544 try {
2545 if (e != null)
2546 e.execute(new AsyncRun(action, dst));
2547 else
2548 action.run();
2549 } catch (Throwable rex) {
2550 ex = rex;
2551 }
2552 }
2553 if (e == null || ex != null)
2554 dst.internalComplete(null, ex);
2555 }
2556 helpPostComplete();
2557 other.helpPostComplete();
2558 return dst;
2559 }
2560
2561 /**
2562 * Returns a CompletableFuture (or an equivalent one) produced by
2563 * the given function of the result of this CompletableFuture when
2564 * completed. If this CompletableFuture completes exceptionally,
2565 * then the returned CompletableFuture also does so, with a
2566 * CompletionException holding this exception as its cause.
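*
* <p>A minimal sketch, assuming a hypothetical method
* {@code lookup(int)} that itself returns a
* {@code CompletableFuture<String>}:
* <pre> {@code
* CompletableFuture<Integer> id = new CompletableFuture<Integer>();
* CompletableFuture<String> name = id.thenCompose(i -> lookup(i));
* id.complete(42);   // lookup(42) is invoked; name reflects its outcome
* }</pre>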
2567 *
2568 * @param fn the function returning a new CompletableFuture
2569 * @return the CompletableFuture, which {@code isDone()} upon
2570 * return if it was completed by the given function or if an
2571 * exception occurred
2572 */
2573 public <U> CompletableFuture<U> thenCompose
2574 (Function<? super T, CompletableFuture<U>> fn) {
2575 return doCompose(fn, null);
2576 }
2577
2578 /**
2579 * Returns a CompletableFuture (or an equivalent one) produced
2580 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2581 * the given function of the result of this CompletableFuture when
2582 * completed. If this CompletableFuture completes exceptionally,
2583 * then the returned CompletableFuture also does so, with a
2584 * CompletionException holding this exception as its cause.
2585 *
2586 * @param fn the function returning a new CompletableFuture
2587 * @return the CompletableFuture, which {@code isDone()} upon
2588 * return if it was completed by the given function or if an
2589 * exception occurred
2590 */
2591 public <U> CompletableFuture<U> thenComposeAsync
2592 (Function<? super T, CompletableFuture<U>> fn) {
2593 return doCompose(fn, ForkJoinPool.commonPool());
2594 }
2595
2596 /**
2597 * Returns a CompletableFuture (or an equivalent one) produced
2598 * asynchronously using the given executor by the given function
2599 * of the result of this CompletableFuture when completed.
2600 * If this CompletableFuture completes exceptionally, then the
2601 * returned CompletableFuture also does so, with a
2602 * CompletionException holding this exception as its cause.
2603 *
2604 * @param fn the function returning a new CompletableFuture
2605 * @param executor the executor to use for asynchronous execution
2606 * @return the CompletableFuture, which {@code isDone()} upon
2607 * return if it was completed by the given function or if an
2608 * exception occurred
2609 */
2610 public <U> CompletableFuture<U> thenComposeAsync
2611 (Function<? super T, CompletableFuture<U>> fn,
2612 Executor executor) {
2613 if (executor == null) throw new NullPointerException();
2614 return doCompose(fn, executor);
2615 }
2616
2617 private <U> CompletableFuture<U> doCompose
2618 (Function<? super T, CompletableFuture<U>> fn,
2619 Executor e) {
2620 if (fn == null) throw new NullPointerException();
2621 CompletableFuture<U> dst = null;
2622 ComposeCompletion<T,U> d = null;
2623 Object r;
2624 if ((r = result) == null) {
2625 dst = new CompletableFuture<U>();
2626 CompletionNode p = new CompletionNode
2627 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2628 while ((r = result) == null) {
2629 if (UNSAFE.compareAndSwapObject
2630 (this, COMPLETIONS, p.next = completions, p))
2631 break;
2632 }
2633 }
2634 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2635 T t; Throwable ex;
2636 if (r instanceof AltResult) {
2637 ex = ((AltResult)r).ex;
2638 t = null;
2639 }
2640 else {
2641 ex = null;
2642 @SuppressWarnings("unchecked") T tr = (T) r;
2643 t = tr;
2644 }
2645 if (ex == null) {
2646 if (e != null) {
2647 if (dst == null)
2648 dst = new CompletableFuture<U>();
2649 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2650 }
2651 else {
2652 try {
2653 dst = fn.apply(t);
2654 } catch (Throwable rex) {
2655 ex = rex;
2656 }
2657 if (dst == null) {
2658 dst = new CompletableFuture<U>();
2659 if (ex == null)
2660 ex = new NullPointerException();
2661 }
2662 }
2663 }
2664 if (dst == null) dst = new CompletableFuture<U>();
2665 if (ex != null) dst.internalComplete(null, ex);
2666 }
2667 helpPostComplete();
2668 dst.helpPostComplete();
2669 return dst;
2670 }
2671
2672 /**
2673 * Creates and returns a CompletableFuture that is completed with
2674 * the result of the given function of the exception triggering
2675 * this CompletableFuture's completion when it completes
2676 * exceptionally; otherwise, if this CompletableFuture completes
2677 * normally, then the returned CompletableFuture also completes
2678 * normally with the same value.
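*
* <p>A minimal sketch of supplying a fallback value, assuming a
* hypothetical future {@code f} that fails:
* <pre> {@code
* CompletableFuture<String> f = new CompletableFuture<String>();
* CompletableFuture<String> safe = f.exceptionally(ex -> "default");
* f.completeExceptionally(new IllegalStateException());
* String s = safe.join();   // "default"
* }</pre>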
2679 *
2680 * @param fn the function to use to compute the value of the
2681 * returned CompletableFuture if this CompletableFuture completed
2682 * exceptionally
2683 * @return the new CompletableFuture
2684 */
2685 public CompletableFuture<T> exceptionally
2686 (Function<Throwable, ? extends T> fn) {
2687 if (fn == null) throw new NullPointerException();
2688 CompletableFuture<T> dst = new CompletableFuture<T>();
2689 ExceptionCompletion<T> d = null;
2690 Object r;
2691 if ((r = result) == null) {
2692 CompletionNode p =
2693 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2694 while ((r = result) == null) {
2695 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2696 p.next = completions, p))
2697 break;
2698 }
2699 }
2700 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2701 T t = null; Throwable ex, dx = null;
2702 if (r instanceof AltResult) {
2703 if ((ex = ((AltResult)r).ex) != null) {
2704 try {
2705 t = fn.apply(ex);
2706 } catch (Throwable rex) {
2707 dx = rex;
2708 }
2709 }
2710 }
2711 else {
2712 @SuppressWarnings("unchecked") T tr = (T) r;
2713 t = tr;
2714 }
2715 dst.internalComplete(t, dx);
2716 }
2717 helpPostComplete();
2718 return dst;
2719 }
2720
2721 /**
2722 * Creates and returns a CompletableFuture that is completed with
2723 * the result of the given function of the result and exception of
2724 * this CompletableFuture's completion when it completes. The
2725 * given function is invoked with the result (or {@code null} if
2726 * none) and the exception (or {@code null} if none) of this
2727 * CompletableFuture when complete.
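*
* <p>A minimal sketch, assuming a hypothetical future {@code f};
* the function sees either a value or an exception, never both:
* <pre> {@code
* CompletableFuture<Integer> f = new CompletableFuture<Integer>();
* CompletableFuture<String> shown = f.handle(
*     (v, ex) -> (ex == null) ? "value " + v : "failed: " + ex);
* f.complete(7);
* String s = shown.join();   // "value 7"
* }</pre>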
2728 *
2729 * @param fn the function to use to compute the value of the
2730 * returned CompletableFuture
2731 * @return the new CompletableFuture
2732 */
2733 public <U> CompletableFuture<U> handle
2734 (BiFunction<? super T, Throwable, ? extends U> fn) {
2735 if (fn == null) throw new NullPointerException();
2736 CompletableFuture<U> dst = new CompletableFuture<U>();
2737 HandleCompletion<T,U> d = null;
2738 Object r;
2739 if ((r = result) == null) {
2740 CompletionNode p =
2741 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2742 while ((r = result) == null) {
2743 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2744 p.next = completions, p))
2745 break;
2746 }
2747 }
2748 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2749 T t; Throwable ex;
2750 if (r instanceof AltResult) {
2751 ex = ((AltResult)r).ex;
2752 t = null;
2753 }
2754 else {
2755 ex = null;
2756 @SuppressWarnings("unchecked") T tr = (T) r;
2757 t = tr;
2758 }
2759 U u; Throwable dx;
2760 try {
2761 u = fn.apply(t, ex);
2762 dx = null;
2763 } catch (Throwable rex) {
2764 dx = rex;
2765 u = null;
2766 }
2767 dst.internalComplete(u, dx);
2768 }
2769 helpPostComplete();
2770 return dst;
2771 }
2772
2773
2774 /* ------------- Arbitrary-arity constructions -------------- */
2775
2776 /*
2777 * The basic plan of attack is to recursively form binary
2778 * completion trees of elements. This can be overkill for small
2779 * sets, but scales nicely. The And/All vs Or/Any forms use the
2780 * same idea, but details differ.
2781 */
2782
2783 /**
2784 * Returns a new CompletableFuture that is completed when all of
2785 * the given CompletableFutures complete. If any of the component
2786 * CompletableFutures complete exceptionally, then so does the
2787 * returned CompletableFuture. Otherwise, the results, if any, of
2788 * the component CompletableFutures are not reflected in the
2789 * returned CompletableFuture, but may be obtained by inspecting
2790 * them individually. If the number of components is zero, returns
2791 * a CompletableFuture completed with the value {@code null}.
2792 *
2793 * <p>Among the applications of this method is to await completion
2794 * of a set of independent CompletableFutures before continuing a
2795 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2796 * c3).join();}.
2797 *
2798 * @param cfs the CompletableFutures
2799 * @return a CompletableFuture that is complete when all of the
2800 * given CompletableFutures complete
2801 * @throws NullPointerException if the array or any of its elements are
2802 * {@code null}
2803 */
2804 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2805 int len = cfs.length; // Directly handle empty and singleton cases
2806 if (len > 1)
2807 return allTree(cfs, 0, len - 1);
2808 else {
2809 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2810 CompletableFuture<?> f;
2811 if (len == 0)
2812 dst.result = NIL;
2813 else if ((f = cfs[0]) == null)
2814 throw new NullPointerException();
2815 else {
2816 ThenCopy d = null;
2817 CompletionNode p = null;
2818 Object r;
2819 while ((r = f.result) == null) {
2820 if (d == null)
2821 d = new ThenCopy(f, dst);
2822 else if (p == null)
2823 p = new CompletionNode(d);
2824 else if (UNSAFE.compareAndSwapObject
2825 (f, COMPLETIONS, p.next = f.completions, p))
2826 break;
2827 }
2828 if (r != null && (d == null || d.compareAndSet(0, 1)))
2829 dst.internalComplete(null, (r instanceof AltResult) ?
2830 ((AltResult)r).ex : null);
2831 f.helpPostComplete();
2832 }
2833 return dst;
2834 }
2835 }
2836
2837 /**
2838 * Recursively constructs an And'ed tree of CompletableFutures.
2839 * Called only when the array is known to have at least two elements.
2840 */
2841 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2842 int lo, int hi) {
2843 CompletableFuture<?> fst, snd;
2844 int mid = (lo + hi) >>> 1;
2845 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2846 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2847 throw new NullPointerException();
2848 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2849 AndCompletion d = null;
2850 CompletionNode p = null, q = null;
2851 Object r = null, s = null;
2852 while ((r = fst.result) == null || (s = snd.result) == null) {
2853 if (d == null)
2854 d = new AndCompletion(fst, snd, dst);
2855 else if (p == null)
2856 p = new CompletionNode(d);
2857 else if (q == null) {
2858 if (UNSAFE.compareAndSwapObject
2859 (fst, COMPLETIONS, p.next = fst.completions, p))
2860 q = new CompletionNode(d);
2861 }
2862 else if (UNSAFE.compareAndSwapObject
2863 (snd, COMPLETIONS, q.next = snd.completions, q))
2864 break;
2865 }
2866 if ((r != null || (r = fst.result) != null) &&
2867 (s != null || (s = snd.result) != null) &&
2868 (d == null || d.compareAndSet(0, 1))) {
2869 Throwable ex;
2870 if (r instanceof AltResult)
2871 ex = ((AltResult)r).ex;
2872 else
2873 ex = null;
2874 if (ex == null && (s instanceof AltResult))
2875 ex = ((AltResult)s).ex;
2876 dst.internalComplete(null, ex);
2877 }
2878 fst.helpPostComplete();
2879 snd.helpPostComplete();
2880 return dst;
2881 }
2882
    /**
     * Returns a new CompletableFuture that is completed when any of
     * the given CompletableFutures completes: with the same result if
     * that one completed normally, otherwise exceptionally. If the
     * array is empty, returns an incomplete CompletableFuture.
     *
     * @param cfs the CompletableFutures
     * @return a CompletableFuture that is complete when any of the
     * given CompletableFutures completes
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Same idea as allOf
        if (len > 1)
            return anyTree(cfs, 0, len - 1);
        else {
            CompletableFuture<?> dst = new CompletableFuture<Object>();
            CompletableFuture<?> f;
            if (len == 0)
                ; // skip: dst is returned incomplete
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenCopy d = null;
                CompletionNode p = null;
                Object r;
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenCopy(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                if (r != null && (d == null || d.compareAndSet(0, 1))) {
                    Throwable ex; Object t;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        t = null;
                    }
                    else {
                        ex = null;
                        t = r;
                    }
                    dst.internalComplete(t, ex);
                }
                f.helpPostComplete();
            }
            return dst;
        }
    }

    /**
     * Recursively constructs an Or'ed tree of CompletableFutures.
     */
    private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
                                                int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<?> dst = new CompletableFuture<Object>();
        OrCompletion d = null;
        CompletionNode p = null, q = null;
        Object r;
        while ((r = fst.result) == null && (r = snd.result) == null) {
            if (d == null)
                d = new OrCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null ||
             (r = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex; Object t;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                t = r;
            }
            dst.internalComplete(t, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }


    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            UNSAFE.compareAndSwapObject
            (this, RESULT, null, new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }

    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    public boolean isCancelled() {
        Object r;
        return ((r = result) instanceof AltResult) &&
            (((AltResult)r).ex instanceof CancellationException);
    }

    /**
     * Forcibly sets or resets the value subsequently returned by
     * method {@link #get()} and related methods, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions; even then, dependent completions
     * already in progress may use the previously established outcome
     * rather than the overwritten one.
     *
     * @param value the completion value
     */
    public void obtrudeValue(T value) {
        result = (value == null) ? NIL : value;
        postComplete();
    }

    /**
     * Forcibly causes subsequent invocations of method {@link #get()}
     * and related methods to throw the given exception, whether or
     * not already completed. This method is designed for use only in
     * error recovery actions; even then, dependent completions
     * already in progress may use the previously established outcome
     * rather than the overwritten one.
     *
     * @param ex the exception
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the estimated number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int count = 0;
        for (CompletionNode p = completions; p != null; p = p.next)
            ++count;
        return count;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, contains the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        Object r = result;
        int count;
        return super.toString() +
            ((r == null) ?
             (((count = getNumberOfDependents()) == 0) ?
              "[Not completed]" :
              "[Not completed, " + count + " dependents]") :
             (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
              "[Completed exceptionally]" :
              "[Completed normally]"));
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long WAITERS;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (k.getDeclaredField("result"));
            WAITERS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("completions"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
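
The javadoc for allOf, anyOf, cancel, obtrudeValue and toString above describes behavior that is easier to see from the caller's side. The following is a minimal usage sketch, not part of this file. It assumes the CompletableFuture class as released in java.util.concurrent (Java 8 or later), whose details may differ from this revision; for example, the released anyOf returns CompletableFuture<Object>. The class name CompletableFutureSketch and its method names are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; assumes the released Java 8+ CompletableFuture API.
class CompletableFutureSketch {

    /** allOf stays pending until every input has completed. */
    static boolean allOfWaitsForEveryInput() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<Void> all = CompletableFuture.allOf(a, b);
        a.complete("a");
        boolean pendingAfterFirst = !all.isDone();
        b.complete("b");
        return pendingAfterFirst && all.isDone() && all.join() == null;
    }

    /** anyOf takes the result of whichever input completes first. */
    static Object anyOfTakesFirstResult() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<?> any = CompletableFuture.anyOf(slow, fast);
        fast.complete("fast");
        slow.complete("slow"); // too late to change the outcome of 'any'
        return any.join();
    }

    /** cancel() reaches dependents as a CompletionException with a CancellationException cause. */
    static boolean cancelPropagatesToDependents() {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep = src.thenApply(String::length);
        boolean cancelled = src.cancel(true); // the flag has no effect
        try {
            dep.join();
            return false;
        } catch (CompletionException e) {
            return cancelled && src.isCancelled()
                && e.getCause() instanceof CancellationException;
        }
    }

    /** obtrudeValue overwrites an outcome that was already established. */
    static String obtrudeOverwritesOutcome() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("first");
        f.obtrudeValue("second");
        return f.join();
    }

    /** toString reports the completion state in brackets. */
    static String stateOfCompleted() {
        return CompletableFuture.completedFuture(1).toString();
    }

    public static void main(String[] args) {
        System.out.println(allOfWaitsForEveryInput());
        System.out.println(anyOfTakesFirstResult());
        System.out.println(cancelPropagatesToDependents());
        System.out.println(obtrudeOverwritesOutcome());
        System.out.println(stateOfCompleted());
    }
}
```

Every future in the sketch is completed on the calling thread, so no executor or timing assumptions are involved. The obtrudeValue case only shows the overwrite itself; it does not exercise the race with in-progress dependents that the javadoc warns about.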