ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.49
Committed: Sun Feb 10 14:30:47 2013 UTC (11 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.48: +3 -3 lines
Log Message:
Use consistent wording

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to {@link #complete}, {@link
31 * #completeExceptionally}, or {@link #cancel} a CompletableFuture,
32 * only one of them succeeds.
33 *
34 * <p>Methods are available for adding dependents based on Functions,
35 * Consumers, and Runnables. The appropriate form to use depends on
36 * whether actions require arguments and/or produce results. Actions
37 * may also be triggered after either or both the current and another
38 * CompletableFuture complete. Multiple CompletableFutures may also
39 * be grouped as one using {@link #anyOf} and {@link #allOf}.
40 *
41 * <p>Actions supplied for dependent completions (mainly using methods
42 * with prefix {@code then}) may be performed by the thread that
43 * completes the current CompletableFuture, or by any other caller of
44 * these methods. There are no guarantees about the order of
45 * processing completions unless constrained by these methods.
46 *
47 * <p>Upon exceptional completion (including cancellation), or when a
48 * completion entails computation, and it terminates abruptly with an
49 * (unchecked) exception or error, then further completions act as
50 * {@code completeExceptionally} with a {@link CompletionException}
51 * holding that exception as its cause. If a CompletableFuture
52 * completes exceptionally, and is not followed by a {@link
53 * #exceptionally} or {@link #handle} completion, then all of its
54 * dependents (and their dependents) also complete exceptionally with
55 * CompletionExceptions holding the ultimate cause. In case of a
56 * CompletionException, methods {@link #get()} and {@link #get(long,
57 * TimeUnit)} throw an {@link ExecutionException} with the same cause
58 * as would be held in the corresponding CompletionException. However,
59 * in these cases, methods {@link #join()} and {@link #getNow} throw
60 * the CompletionException, which simplifies usage.
61 *
62 * <p>CompletableFutures themselves do not execute asynchronously.
63 * However, the {@code async} methods provide commonly useful ways to
64 * commence asynchronous processing, using either a given {@link
65 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
66 * function or action that will result in the completion of a new
67 * CompletableFuture. To simplify monitoring, debugging, and tracking,
68 * all generated asynchronous tasks are instances of the tagging
69 * interface {@link AsynchronousCompletionTask}.
70 *
71 * @author Doug Lea
72 * @since 1.8
73 */
74 public class CompletableFuture<T> implements Future<T> {
75
76 /*
77 * Overview:
78 *
79 * 1. Non-nullness of field result (set via CAS) indicates done.
80 * An AltResult is used to box null as a result, as well as to
81 * hold exceptions. Using a single field makes completion fast
82 * and simple to detect and trigger, at the expense of a lot of
83 * encoding and decoding that infiltrates many methods. One minor
84 * simplification relies on the (static) NIL (to box null results)
85 * being the only AltResult with a null exception field, so we
86 * don't usually need explicit comparisons with NIL. The CF
87 * exception propagation mechanics surrounding decoding rely on
88 * unchecked casts of decoded results really being unchecked,
89 * where user type errors are caught at point of use, as is
90 * currently the case in Java. These are highlighted by using
91 * SuppressWarnings-annotated temporaries.
92 *
93 * 2. Waiters are held in a Treiber stack similar to the one used
94 * in FutureTask, Phaser, and SynchronousQueue. See their
95 * internal documentation for algorithmic details.
96 *
97 * 3. Completions are also kept in a list/stack, and pulled off
98 * and run when completion is triggered. (We could even use the
99 * same stack as for waiters, but would give up the potential
100 * parallelism obtained because woken waiters help release/run
101 * others -- see method postComplete). Because post-processing
102 * may race with direct calls, class Completion opportunistically
103 * extends AtomicInteger so callers can claim the action via
104 * compareAndSet(0, 1). The Completion.run methods are all
105 * written a boringly similar uniform way (that sometimes includes
106 * unnecessary-looking checks, kept to maintain uniformity). There
107 * are enough dimensions upon which they differ that attempts to
108 * factor commonalities while maintaining efficiency require more
109 * lines of code than they would save.
110 *
111 * 4. The exported then/and/or methods do support a bit of
112 * factoring (see doThenApply etc). They must cope with the
113 * intrinsic races surrounding addition of a dependent action
114 * versus performing the action directly because the task is
115 * already complete. For example, a CF may not be complete upon
116 * entry, so a dependent completion is added, but by the time it
117 * is added, the target CF is complete, so must be directly
118 * executed. This is all done while avoiding unnecessary object
119 * construction in safe-bypass cases.
120 */
121
122 // preliminaries
123
124 static final class AltResult {
125 final Throwable ex; // null only for NIL
126 AltResult(Throwable ex) { this.ex = ex; }
127 }
128
129 static final AltResult NIL = new AltResult(null);
130
131 // Fields
132
133 volatile Object result; // Either the result or boxed AltResult
134 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
135 volatile CompletionNode completions; // list (Treiber stack) of completions
136
137 // Basic utilities for triggering and processing completions
138
139 /**
140 * Removes and signals all waiting threads and runs all completions.
141 */
142 final void postComplete() {
143 WaitNode q; Thread t;
144 while ((q = waiters) != null) {
145 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
146 (t = q.thread) != null) {
147 q.thread = null;
148 LockSupport.unpark(t);
149 }
150 }
151
152 CompletionNode h; Completion c;
153 while ((h = completions) != null) {
154 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
155 (c = h.completion) != null)
156 c.run();
157 }
158 }
159
160 /**
161 * Triggers completion with the encoding of the given arguments:
162 * if the exception is non-null, encodes it as a wrapped
163 * CompletionException unless it is one already. Otherwise uses
164 * the given result, boxed as NIL if null.
165 */
166 final void internalComplete(Object v, Throwable ex) {
167 if (result == null)
168 UNSAFE.compareAndSwapObject
169 (this, RESULT, null,
170 (ex == null) ? (v == null) ? NIL : v :
171 new AltResult((ex instanceof CompletionException) ? ex :
172 new CompletionException(ex)));
173 postComplete(); // help out even if not triggered
174 }
175
176 /**
177 * If triggered, helps release and/or process completions.
178 */
179 final void helpPostComplete() {
180 if (result != null)
181 postComplete();
182 }
183
184 /* ------------- waiting for completions -------------- */
185
186 /** Number of processors, for spin control */
187 static final int NCPU = Runtime.getRuntime().availableProcessors();
188
189 /**
190 * Heuristic spin value for waitingGet() before blocking on
191 * multiprocessors
192 */
193 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
194
195 /**
196 * Linked nodes to record waiting threads in a Treiber stack. See
197 * other classes such as Phaser and SynchronousQueue for more
198 * detailed explanation. This class implements ManagedBlocker to
199 * avoid starvation when blocking actions pile up in
200 * ForkJoinPools.
201 */
202 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
203 long nanos; // wait time if timed
204 final long deadline; // non-zero if timed
205 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
206 volatile Thread thread;
207 volatile WaitNode next;
208 WaitNode(boolean interruptible, long nanos, long deadline) {
209 this.thread = Thread.currentThread();
210 this.interruptControl = interruptible ? 1 : 0;
211 this.nanos = nanos;
212 this.deadline = deadline;
213 }
214 public boolean isReleasable() {
215 if (thread == null)
216 return true;
217 if (Thread.interrupted()) {
218 int i = interruptControl;
219 interruptControl = -1;
220 if (i > 0)
221 return true;
222 }
223 if (deadline != 0L &&
224 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
225 thread = null;
226 return true;
227 }
228 return false;
229 }
230 public boolean block() {
231 if (isReleasable())
232 return true;
233 else if (deadline == 0L)
234 LockSupport.park(this);
235 else if (nanos > 0L)
236 LockSupport.parkNanos(this, nanos);
237 return isReleasable();
238 }
239 }
240
241 /**
242 * Returns raw result after waiting, or null if interruptible and
243 * interrupted.
244 */
245 private Object waitingGet(boolean interruptible) {
246 WaitNode q = null;
247 boolean queued = false;
248 int spins = SPINS;
249 for (Object r;;) {
250 if ((r = result) != null) {
251 if (q != null) { // suppress unpark
252 q.thread = null;
253 if (q.interruptControl < 0) {
254 if (interruptible) {
255 removeWaiter(q);
256 return null;
257 }
258 Thread.currentThread().interrupt();
259 }
260 }
261 postComplete(); // help release others
262 return r;
263 }
264 else if (spins > 0) {
265 int rnd = ThreadLocalRandom.nextSecondarySeed();
266 if (rnd == 0)
267 rnd = ThreadLocalRandom.current().nextInt();
268 if (rnd >= 0)
269 --spins;
270 }
271 else if (q == null)
272 q = new WaitNode(interruptible, 0L, 0L);
273 else if (!queued)
274 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
275 q.next = waiters, q);
276 else if (interruptible && q.interruptControl < 0) {
277 removeWaiter(q);
278 return null;
279 }
280 else if (q.thread != null && result == null) {
281 try {
282 ForkJoinPool.managedBlock(q);
283 } catch (InterruptedException ex) {
284 q.interruptControl = -1;
285 }
286 }
287 }
288 }
289
290 /**
291 * Awaits completion or aborts on interrupt or timeout.
292 *
293 * @param nanos time to wait
294 * @return raw result
295 */
296 private Object timedAwaitDone(long nanos)
297 throws InterruptedException, TimeoutException {
298 WaitNode q = null;
299 boolean queued = false;
300 for (Object r;;) {
301 if ((r = result) != null) {
302 if (q != null) {
303 q.thread = null;
304 if (q.interruptControl < 0) {
305 removeWaiter(q);
306 throw new InterruptedException();
307 }
308 }
309 postComplete();
310 return r;
311 }
312 else if (q == null) {
313 if (nanos <= 0L)
314 throw new TimeoutException();
315 long d = System.nanoTime() + nanos;
316 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
317 }
318 else if (!queued)
319 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
320 q.next = waiters, q);
321 else if (q.interruptControl < 0) {
322 removeWaiter(q);
323 throw new InterruptedException();
324 }
325 else if (q.nanos <= 0L) {
326 if (result == null) {
327 removeWaiter(q);
328 throw new TimeoutException();
329 }
330 }
331 else if (q.thread != null && result == null) {
332 try {
333 ForkJoinPool.managedBlock(q);
334 } catch (InterruptedException ex) {
335 q.interruptControl = -1;
336 }
337 }
338 }
339 }
340
341 /**
342 * Tries to unlink a timed-out or interrupted wait node to avoid
343 * accumulating garbage. Internal nodes are simply unspliced
344 * without CAS since it is harmless if they are traversed anyway
345 * by releasers. To avoid effects of unsplicing from already
346 * removed nodes, the list is retraversed in case of an apparent
347 * race. This is slow when there are a lot of nodes, but we don't
348 * expect lists to be long enough to outweigh higher-overhead
349 * schemes.
350 */
351 private void removeWaiter(WaitNode node) {
352 if (node != null) {
353 node.thread = null;
354 retry:
355 for (;;) { // restart on removeWaiter race
356 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
357 s = q.next;
358 if (q.thread != null)
359 pred = q;
360 else if (pred != null) {
361 pred.next = s;
362 if (pred.thread == null) // check for race
363 continue retry;
364 }
365 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
366 continue retry;
367 }
368 break;
369 }
370 }
371 }
372
373 /* ------------- Async tasks -------------- */
374
375 /**
376 * A tagging interface identifying asynchronous tasks produced by
377 * {@code async} methods. This may be useful for monitoring,
378 * debugging, and tracking asynchronous activities.
379 */
380 public static interface AsynchronousCompletionTask {
381 }
382
383 /** Base class can act as either FJ or plain Runnable */
384 abstract static class Async extends ForkJoinTask<Void>
385 implements Runnable, AsynchronousCompletionTask {
386 public final Void getRawResult() { return null; }
387 public final void setRawResult(Void v) { }
388 public final void run() { exec(); }
389 }
390
391 static final class AsyncRun extends Async {
392 final Runnable fn;
393 final CompletableFuture<Void> dst;
394 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
395 this.fn = fn; this.dst = dst;
396 }
397 public final boolean exec() {
398 CompletableFuture<Void> d; Throwable ex;
399 if ((d = this.dst) != null && d.result == null) {
400 try {
401 fn.run();
402 ex = null;
403 } catch (Throwable rex) {
404 ex = rex;
405 }
406 d.internalComplete(null, ex);
407 }
408 return true;
409 }
410 private static final long serialVersionUID = 5232453952276885070L;
411 }
412
413 static final class AsyncSupply<U> extends Async {
414 final Supplier<U> fn;
415 final CompletableFuture<U> dst;
416 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
417 this.fn = fn; this.dst = dst;
418 }
419 public final boolean exec() {
420 CompletableFuture<U> d; U u; Throwable ex;
421 if ((d = this.dst) != null && d.result == null) {
422 try {
423 u = fn.get();
424 ex = null;
425 } catch (Throwable rex) {
426 ex = rex;
427 u = null;
428 }
429 d.internalComplete(u, ex);
430 }
431 return true;
432 }
433 private static final long serialVersionUID = 5232453952276885070L;
434 }
435
436 static final class AsyncApply<T,U> extends Async {
437 final Function<? super T,? extends U> fn;
438 final T arg;
439 final CompletableFuture<U> dst;
440 AsyncApply(T arg, Function<? super T,? extends U> fn,
441 CompletableFuture<U> dst) {
442 this.arg = arg; this.fn = fn; this.dst = dst;
443 }
444 public final boolean exec() {
445 CompletableFuture<U> d; U u; Throwable ex;
446 if ((d = this.dst) != null && d.result == null) {
447 try {
448 u = fn.apply(arg);
449 ex = null;
450 } catch (Throwable rex) {
451 ex = rex;
452 u = null;
453 }
454 d.internalComplete(u, ex);
455 }
456 return true;
457 }
458 private static final long serialVersionUID = 5232453952276885070L;
459 }
460
461 static final class AsyncBiApply<T,U,V> extends Async {
462 final BiFunction<? super T,? super U,? extends V> fn;
463 final T arg1;
464 final U arg2;
465 final CompletableFuture<V> dst;
466 AsyncBiApply(T arg1, U arg2,
467 BiFunction<? super T,? super U,? extends V> fn,
468 CompletableFuture<V> dst) {
469 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
470 }
471 public final boolean exec() {
472 CompletableFuture<V> d; V v; Throwable ex;
473 if ((d = this.dst) != null && d.result == null) {
474 try {
475 v = fn.apply(arg1, arg2);
476 ex = null;
477 } catch (Throwable rex) {
478 ex = rex;
479 v = null;
480 }
481 d.internalComplete(v, ex);
482 }
483 return true;
484 }
485 private static final long serialVersionUID = 5232453952276885070L;
486 }
487
488 static final class AsyncAccept<T> extends Async {
489 final Consumer<? super T> fn;
490 final T arg;
491 final CompletableFuture<Void> dst;
492 AsyncAccept(T arg, Consumer<? super T> fn,
493 CompletableFuture<Void> dst) {
494 this.arg = arg; this.fn = fn; this.dst = dst;
495 }
496 public final boolean exec() {
497 CompletableFuture<Void> d; Throwable ex;
498 if ((d = this.dst) != null && d.result == null) {
499 try {
500 fn.accept(arg);
501 ex = null;
502 } catch (Throwable rex) {
503 ex = rex;
504 }
505 d.internalComplete(null, ex);
506 }
507 return true;
508 }
509 private static final long serialVersionUID = 5232453952276885070L;
510 }
511
512 static final class AsyncBiAccept<T,U> extends Async {
513 final BiConsumer<? super T,? super U> fn;
514 final T arg1;
515 final U arg2;
516 final CompletableFuture<Void> dst;
517 AsyncBiAccept(T arg1, U arg2,
518 BiConsumer<? super T,? super U> fn,
519 CompletableFuture<Void> dst) {
520 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
521 }
522 public final boolean exec() {
523 CompletableFuture<Void> d; Throwable ex;
524 if ((d = this.dst) != null && d.result == null) {
525 try {
526 fn.accept(arg1, arg2);
527 ex = null;
528 } catch (Throwable rex) {
529 ex = rex;
530 }
531 d.internalComplete(null, ex);
532 }
533 return true;
534 }
535 private static final long serialVersionUID = 5232453952276885070L;
536 }
537
538 static final class AsyncCompose<T,U> extends Async {
539 final Function<? super T, CompletableFuture<U>> fn;
540 final T arg;
541 final CompletableFuture<U> dst;
542 AsyncCompose(T arg,
543 Function<? super T, CompletableFuture<U>> fn,
544 CompletableFuture<U> dst) {
545 this.arg = arg; this.fn = fn; this.dst = dst;
546 }
547 public final boolean exec() {
548 CompletableFuture<U> d, fr; U u; Throwable ex;
549 if ((d = this.dst) != null && d.result == null) {
550 try {
551 fr = fn.apply(arg);
552 ex = null;
553 } catch (Throwable rex) {
554 ex = rex;
555 fr = null;
556 }
557 if (ex != null)
558 u = null;
559 else if (fr == null) {
560 ex = new NullPointerException();
561 u = null;
562 }
563 else {
564 Object r = fr.result;
565 if (r instanceof AltResult) {
566 ex = ((AltResult)r).ex;
567 u = null;
568 }
569 else {
570 @SuppressWarnings("unchecked") U ur = (U) r;
571 u = ur;
572 }
573 }
574 d.internalComplete(u, ex);
575 }
576 return true;
577 }
578 private static final long serialVersionUID = 5232453952276885070L;
579 }
580
581 /* ------------- Completions -------------- */
582
583 /**
584 * Simple linked list nodes to record completions, used in
585 * basically the same way as WaitNodes. (We separate nodes from
586 * the Completions themselves mainly because for the And and Or
587 * methods, the same Completion object resides in two lists.)
588 */
589 static final class CompletionNode {
590 final Completion completion;
591 volatile CompletionNode next;
592 CompletionNode(Completion completion) { this.completion = completion; }
593 }
594
595 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
596 abstract static class Completion extends AtomicInteger implements Runnable {
597 }
598
599 static final class ApplyCompletion<T,U> extends Completion {
600 final CompletableFuture<? extends T> src;
601 final Function<? super T,? extends U> fn;
602 final CompletableFuture<U> dst;
603 final Executor executor;
604 ApplyCompletion(CompletableFuture<? extends T> src,
605 Function<? super T,? extends U> fn,
606 CompletableFuture<U> dst, Executor executor) {
607 this.src = src; this.fn = fn; this.dst = dst;
608 this.executor = executor;
609 }
610 public final void run() {
611 final CompletableFuture<? extends T> a;
612 final Function<? super T,? extends U> fn;
613 final CompletableFuture<U> dst;
614 Object r; T t; Throwable ex;
615 if ((dst = this.dst) != null &&
616 (fn = this.fn) != null &&
617 (a = this.src) != null &&
618 (r = a.result) != null &&
619 compareAndSet(0, 1)) {
620 if (r instanceof AltResult) {
621 ex = ((AltResult)r).ex;
622 t = null;
623 }
624 else {
625 ex = null;
626 @SuppressWarnings("unchecked") T tr = (T) r;
627 t = tr;
628 }
629 Executor e = executor;
630 U u = null;
631 if (ex == null) {
632 try {
633 if (e != null)
634 e.execute(new AsyncApply<T,U>(t, fn, dst));
635 else
636 u = fn.apply(t);
637 } catch (Throwable rex) {
638 ex = rex;
639 }
640 }
641 if (e == null || ex != null)
642 dst.internalComplete(u, ex);
643 }
644 }
645 private static final long serialVersionUID = 5232453952276885070L;
646 }
647
648 static final class AcceptCompletion<T> extends Completion {
649 final CompletableFuture<? extends T> src;
650 final Consumer<? super T> fn;
651 final CompletableFuture<Void> dst;
652 final Executor executor;
653 AcceptCompletion(CompletableFuture<? extends T> src,
654 Consumer<? super T> fn,
655 CompletableFuture<Void> dst, Executor executor) {
656 this.src = src; this.fn = fn; this.dst = dst;
657 this.executor = executor;
658 }
659 public final void run() {
660 final CompletableFuture<? extends T> a;
661 final Consumer<? super T> fn;
662 final CompletableFuture<Void> dst;
663 Object r; T t; Throwable ex;
664 if ((dst = this.dst) != null &&
665 (fn = this.fn) != null &&
666 (a = this.src) != null &&
667 (r = a.result) != null &&
668 compareAndSet(0, 1)) {
669 if (r instanceof AltResult) {
670 ex = ((AltResult)r).ex;
671 t = null;
672 }
673 else {
674 ex = null;
675 @SuppressWarnings("unchecked") T tr = (T) r;
676 t = tr;
677 }
678 Executor e = executor;
679 if (ex == null) {
680 try {
681 if (e != null)
682 e.execute(new AsyncAccept<T>(t, fn, dst));
683 else
684 fn.accept(t);
685 } catch (Throwable rex) {
686 ex = rex;
687 }
688 }
689 if (e == null || ex != null)
690 dst.internalComplete(null, ex);
691 }
692 }
693 private static final long serialVersionUID = 5232453952276885070L;
694 }
695
696 static final class RunCompletion<T> extends Completion {
697 final CompletableFuture<? extends T> src;
698 final Runnable fn;
699 final CompletableFuture<Void> dst;
700 final Executor executor;
701 RunCompletion(CompletableFuture<? extends T> src,
702 Runnable fn,
703 CompletableFuture<Void> dst,
704 Executor executor) {
705 this.src = src; this.fn = fn; this.dst = dst;
706 this.executor = executor;
707 }
708 public final void run() {
709 final CompletableFuture<? extends T> a;
710 final Runnable fn;
711 final CompletableFuture<Void> dst;
712 Object r; Throwable ex;
713 if ((dst = this.dst) != null &&
714 (fn = this.fn) != null &&
715 (a = this.src) != null &&
716 (r = a.result) != null &&
717 compareAndSet(0, 1)) {
718 if (r instanceof AltResult)
719 ex = ((AltResult)r).ex;
720 else
721 ex = null;
722 Executor e = executor;
723 if (ex == null) {
724 try {
725 if (e != null)
726 e.execute(new AsyncRun(fn, dst));
727 else
728 fn.run();
729 } catch (Throwable rex) {
730 ex = rex;
731 }
732 }
733 if (e == null || ex != null)
734 dst.internalComplete(null, ex);
735 }
736 }
737 private static final long serialVersionUID = 5232453952276885070L;
738 }
739
740 static final class BiApplyCompletion<T,U,V> extends Completion {
741 final CompletableFuture<? extends T> src;
742 final CompletableFuture<? extends U> snd;
743 final BiFunction<? super T,? super U,? extends V> fn;
744 final CompletableFuture<V> dst;
745 final Executor executor;
746 BiApplyCompletion(CompletableFuture<? extends T> src,
747 CompletableFuture<? extends U> snd,
748 BiFunction<? super T,? super U,? extends V> fn,
749 CompletableFuture<V> dst, Executor executor) {
750 this.src = src; this.snd = snd;
751 this.fn = fn; this.dst = dst;
752 this.executor = executor;
753 }
754 public final void run() {
755 final CompletableFuture<? extends T> a;
756 final CompletableFuture<? extends U> b;
757 final BiFunction<? super T,? super U,? extends V> fn;
758 final CompletableFuture<V> dst;
759 Object r, s; T t; U u; Throwable ex;
760 if ((dst = this.dst) != null &&
761 (fn = this.fn) != null &&
762 (a = this.src) != null &&
763 (r = a.result) != null &&
764 (b = this.snd) != null &&
765 (s = b.result) != null &&
766 compareAndSet(0, 1)) {
767 if (r instanceof AltResult) {
768 ex = ((AltResult)r).ex;
769 t = null;
770 }
771 else {
772 ex = null;
773 @SuppressWarnings("unchecked") T tr = (T) r;
774 t = tr;
775 }
776 if (ex != null)
777 u = null;
778 else if (s instanceof AltResult) {
779 ex = ((AltResult)s).ex;
780 u = null;
781 }
782 else {
783 @SuppressWarnings("unchecked") U us = (U) s;
784 u = us;
785 }
786 Executor e = executor;
787 V v = null;
788 if (ex == null) {
789 try {
790 if (e != null)
791 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
792 else
793 v = fn.apply(t, u);
794 } catch (Throwable rex) {
795 ex = rex;
796 }
797 }
798 if (e == null || ex != null)
799 dst.internalComplete(v, ex);
800 }
801 }
802 private static final long serialVersionUID = 5232453952276885070L;
803 }
804
805 static final class BiAcceptCompletion<T,U> extends Completion {
806 final CompletableFuture<? extends T> src;
807 final CompletableFuture<? extends U> snd;
808 final BiConsumer<? super T,? super U> fn;
809 final CompletableFuture<Void> dst;
810 final Executor executor;
811 BiAcceptCompletion(CompletableFuture<? extends T> src,
812 CompletableFuture<? extends U> snd,
813 BiConsumer<? super T,? super U> fn,
814 CompletableFuture<Void> dst, Executor executor) {
815 this.src = src; this.snd = snd;
816 this.fn = fn; this.dst = dst;
817 this.executor = executor;
818 }
819 public final void run() {
820 final CompletableFuture<? extends T> a;
821 final CompletableFuture<? extends U> b;
822 final BiConsumer<? super T,? super U> fn;
823 final CompletableFuture<Void> dst;
824 Object r, s; T t; U u; Throwable ex;
825 if ((dst = this.dst) != null &&
826 (fn = this.fn) != null &&
827 (a = this.src) != null &&
828 (r = a.result) != null &&
829 (b = this.snd) != null &&
830 (s = b.result) != null &&
831 compareAndSet(0, 1)) {
832 if (r instanceof AltResult) {
833 ex = ((AltResult)r).ex;
834 t = null;
835 }
836 else {
837 ex = null;
838 @SuppressWarnings("unchecked") T tr = (T) r;
839 t = tr;
840 }
841 if (ex != null)
842 u = null;
843 else if (s instanceof AltResult) {
844 ex = ((AltResult)s).ex;
845 u = null;
846 }
847 else {
848 @SuppressWarnings("unchecked") U us = (U) s;
849 u = us;
850 }
851 Executor e = executor;
852 if (ex == null) {
853 try {
854 if (e != null)
855 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
856 else
857 fn.accept(t, u);
858 } catch (Throwable rex) {
859 ex = rex;
860 }
861 }
862 if (e == null || ex != null)
863 dst.internalComplete(null, ex);
864 }
865 }
866 private static final long serialVersionUID = 5232453952276885070L;
867 }
868
869 static final class BiRunCompletion<T> extends Completion {
870 final CompletableFuture<? extends T> src;
871 final CompletableFuture<?> snd;
872 final Runnable fn;
873 final CompletableFuture<Void> dst;
874 final Executor executor;
875 BiRunCompletion(CompletableFuture<? extends T> src,
876 CompletableFuture<?> snd,
877 Runnable fn,
878 CompletableFuture<Void> dst, Executor executor) {
879 this.src = src; this.snd = snd;
880 this.fn = fn; this.dst = dst;
881 this.executor = executor;
882 }
883 public final void run() {
884 final CompletableFuture<? extends T> a;
885 final CompletableFuture<?> b;
886 final Runnable fn;
887 final CompletableFuture<Void> dst;
888 Object r, s; Throwable ex;
889 if ((dst = this.dst) != null &&
890 (fn = this.fn) != null &&
891 (a = this.src) != null &&
892 (r = a.result) != null &&
893 (b = this.snd) != null &&
894 (s = b.result) != null &&
895 compareAndSet(0, 1)) {
896 if (r instanceof AltResult)
897 ex = ((AltResult)r).ex;
898 else
899 ex = null;
900 if (ex == null && (s instanceof AltResult))
901 ex = ((AltResult)s).ex;
902 Executor e = executor;
903 if (ex == null) {
904 try {
905 if (e != null)
906 e.execute(new AsyncRun(fn, dst));
907 else
908 fn.run();
909 } catch (Throwable rex) {
910 ex = rex;
911 }
912 }
913 if (e == null || ex != null)
914 dst.internalComplete(null, ex);
915 }
916 }
917 private static final long serialVersionUID = 5232453952276885070L;
918 }
919
920 static final class AndCompletion extends Completion {
921 final CompletableFuture<?> src;
922 final CompletableFuture<?> snd;
923 final CompletableFuture<Void> dst;
924 AndCompletion(CompletableFuture<?> src,
925 CompletableFuture<?> snd,
926 CompletableFuture<Void> dst) {
927 this.src = src; this.snd = snd; this.dst = dst;
928 }
929 public final void run() {
930 final CompletableFuture<?> a;
931 final CompletableFuture<?> b;
932 final CompletableFuture<Void> dst;
933 Object r, s; Throwable ex;
934 if ((dst = this.dst) != null &&
935 (a = this.src) != null &&
936 (r = a.result) != null &&
937 (b = this.snd) != null &&
938 (s = b.result) != null &&
939 compareAndSet(0, 1)) {
940 if (r instanceof AltResult)
941 ex = ((AltResult)r).ex;
942 else
943 ex = null;
944 if (ex == null && (s instanceof AltResult))
945 ex = ((AltResult)s).ex;
946 dst.internalComplete(null, ex);
947 }
948 }
949 private static final long serialVersionUID = 5232453952276885070L;
950 }
951
952 static final class OrApplyCompletion<T,U> extends Completion {
953 final CompletableFuture<? extends T> src;
954 final CompletableFuture<? extends T> snd;
955 final Function<? super T,? extends U> fn;
956 final CompletableFuture<U> dst;
957 final Executor executor;
958 OrApplyCompletion(CompletableFuture<? extends T> src,
959 CompletableFuture<? extends T> snd,
960 Function<? super T,? extends U> fn,
961 CompletableFuture<U> dst, Executor executor) {
962 this.src = src; this.snd = snd;
963 this.fn = fn; this.dst = dst;
964 this.executor = executor;
965 }
966 public final void run() {
967 final CompletableFuture<? extends T> a;
968 final CompletableFuture<? extends T> b;
969 final Function<? super T,? extends U> fn;
970 final CompletableFuture<U> dst;
971 Object r; T t; Throwable ex;
972 if ((dst = this.dst) != null &&
973 (fn = this.fn) != null &&
974 (((a = this.src) != null && (r = a.result) != null) ||
975 ((b = this.snd) != null && (r = b.result) != null)) &&
976 compareAndSet(0, 1)) {
977 if (r instanceof AltResult) {
978 ex = ((AltResult)r).ex;
979 t = null;
980 }
981 else {
982 ex = null;
983 @SuppressWarnings("unchecked") T tr = (T) r;
984 t = tr;
985 }
986 Executor e = executor;
987 U u = null;
988 if (ex == null) {
989 try {
990 if (e != null)
991 e.execute(new AsyncApply<T,U>(t, fn, dst));
992 else
993 u = fn.apply(t);
994 } catch (Throwable rex) {
995 ex = rex;
996 }
997 }
998 if (e == null || ex != null)
999 dst.internalComplete(u, ex);
1000 }
1001 }
1002 private static final long serialVersionUID = 5232453952276885070L;
1003 }
1004
1005 static final class OrAcceptCompletion<T> extends Completion {
1006 final CompletableFuture<? extends T> src;
1007 final CompletableFuture<? extends T> snd;
1008 final Consumer<? super T> fn;
1009 final CompletableFuture<Void> dst;
1010 final Executor executor;
1011 OrAcceptCompletion(CompletableFuture<? extends T> src,
1012 CompletableFuture<? extends T> snd,
1013 Consumer<? super T> fn,
1014 CompletableFuture<Void> dst, Executor executor) {
1015 this.src = src; this.snd = snd;
1016 this.fn = fn; this.dst = dst;
1017 this.executor = executor;
1018 }
1019 public final void run() {
1020 final CompletableFuture<? extends T> a;
1021 final CompletableFuture<? extends T> b;
1022 final Consumer<? super T> fn;
1023 final CompletableFuture<Void> dst;
1024 Object r; T t; Throwable ex;
1025 if ((dst = this.dst) != null &&
1026 (fn = this.fn) != null &&
1027 (((a = this.src) != null && (r = a.result) != null) ||
1028 ((b = this.snd) != null && (r = b.result) != null)) &&
1029 compareAndSet(0, 1)) {
1030 if (r instanceof AltResult) {
1031 ex = ((AltResult)r).ex;
1032 t = null;
1033 }
1034 else {
1035 ex = null;
1036 @SuppressWarnings("unchecked") T tr = (T) r;
1037 t = tr;
1038 }
1039 Executor e = executor;
1040 if (ex == null) {
1041 try {
1042 if (e != null)
1043 e.execute(new AsyncAccept<T>(t, fn, dst));
1044 else
1045 fn.accept(t);
1046 } catch (Throwable rex) {
1047 ex = rex;
1048 }
1049 }
1050 if (e == null || ex != null)
1051 dst.internalComplete(null, ex);
1052 }
1053 }
1054 private static final long serialVersionUID = 5232453952276885070L;
1055 }
1056
1057 static final class OrRunCompletion<T> extends Completion {
1058 final CompletableFuture<? extends T> src;
1059 final CompletableFuture<?> snd;
1060 final Runnable fn;
1061 final CompletableFuture<Void> dst;
1062 final Executor executor;
1063 OrRunCompletion(CompletableFuture<? extends T> src,
1064 CompletableFuture<?> snd,
1065 Runnable fn,
1066 CompletableFuture<Void> dst, Executor executor) {
1067 this.src = src; this.snd = snd;
1068 this.fn = fn; this.dst = dst;
1069 this.executor = executor;
1070 }
1071 public final void run() {
1072 final CompletableFuture<? extends T> a;
1073 final CompletableFuture<?> b;
1074 final Runnable fn;
1075 final CompletableFuture<Void> dst;
1076 Object r; Throwable ex;
1077 if ((dst = this.dst) != null &&
1078 (fn = this.fn) != null &&
1079 (((a = this.src) != null && (r = a.result) != null) ||
1080 ((b = this.snd) != null && (r = b.result) != null)) &&
1081 compareAndSet(0, 1)) {
1082 if (r instanceof AltResult)
1083 ex = ((AltResult)r).ex;
1084 else
1085 ex = null;
1086 Executor e = executor;
1087 if (ex == null) {
1088 try {
1089 if (e != null)
1090 e.execute(new AsyncRun(fn, dst));
1091 else
1092 fn.run();
1093 } catch (Throwable rex) {
1094 ex = rex;
1095 }
1096 }
1097 if (e == null || ex != null)
1098 dst.internalComplete(null, ex);
1099 }
1100 }
1101 private static final long serialVersionUID = 5232453952276885070L;
1102 }
1103
1104 static final class OrCompletion extends Completion {
1105 final CompletableFuture<?> src;
1106 final CompletableFuture<?> snd;
1107 final CompletableFuture<?> dst;
1108 OrCompletion(CompletableFuture<?> src,
1109 CompletableFuture<?> snd,
1110 CompletableFuture<?> dst) {
1111 this.src = src; this.snd = snd; this.dst = dst;
1112 }
1113 public final void run() {
1114 final CompletableFuture<?> a;
1115 final CompletableFuture<?> b;
1116 final CompletableFuture<?> dst;
1117 Object r, t; Throwable ex;
1118 if ((dst = this.dst) != null &&
1119 (((a = this.src) != null && (r = a.result) != null) ||
1120 ((b = this.snd) != null && (r = b.result) != null)) &&
1121 compareAndSet(0, 1)) {
1122 if (r instanceof AltResult) {
1123 ex = ((AltResult)r).ex;
1124 t = null;
1125 }
1126 else {
1127 ex = null;
1128 t = r;
1129 }
1130 dst.internalComplete(t, ex);
1131 }
1132 }
1133 private static final long serialVersionUID = 5232453952276885070L;
1134 }
1135
1136 static final class ExceptionCompletion<T> extends Completion {
1137 final CompletableFuture<? extends T> src;
1138 final Function<? super Throwable, ? extends T> fn;
1139 final CompletableFuture<T> dst;
1140 ExceptionCompletion(CompletableFuture<? extends T> src,
1141 Function<? super Throwable, ? extends T> fn,
1142 CompletableFuture<T> dst) {
1143 this.src = src; this.fn = fn; this.dst = dst;
1144 }
1145 public final void run() {
1146 final CompletableFuture<? extends T> a;
1147 final Function<? super Throwable, ? extends T> fn;
1148 final CompletableFuture<T> dst;
1149 Object r; T t = null; Throwable ex, dx = null;
1150 if ((dst = this.dst) != null &&
1151 (fn = this.fn) != null &&
1152 (a = this.src) != null &&
1153 (r = a.result) != null &&
1154 compareAndSet(0, 1)) {
1155 if ((r instanceof AltResult) &&
1156 (ex = ((AltResult)r).ex) != null) {
1157 try {
1158 t = fn.apply(ex);
1159 } catch (Throwable rex) {
1160 dx = rex;
1161 }
1162 }
1163 else {
1164 @SuppressWarnings("unchecked") T tr = (T) r;
1165 t = tr;
1166 }
1167 dst.internalComplete(t, dx);
1168 }
1169 }
1170 private static final long serialVersionUID = 5232453952276885070L;
1171 }
1172
1173 static final class ThenCopy extends Completion {
1174 final CompletableFuture<?> src;
1175 final CompletableFuture<?> dst;
1176 ThenCopy(CompletableFuture<?> src,
1177 CompletableFuture<?> dst) {
1178 this.src = src; this.dst = dst;
1179 }
1180 public final void run() {
1181 final CompletableFuture<?> a;
1182 final CompletableFuture<?> dst;
1183 Object r; Object t; Throwable ex;
1184 if ((dst = this.dst) != null &&
1185 (a = this.src) != null &&
1186 (r = a.result) != null &&
1187 compareAndSet(0, 1)) {
1188 if (r instanceof AltResult) {
1189 ex = ((AltResult)r).ex;
1190 t = null;
1191 }
1192 else {
1193 ex = null;
1194 t = r;
1195 }
1196 dst.internalComplete(t, ex);
1197 }
1198 }
1199 private static final long serialVersionUID = 5232453952276885070L;
1200 }
1201
1202 static final class HandleCompletion<T,U> extends Completion {
1203 final CompletableFuture<? extends T> src;
1204 final BiFunction<? super T, Throwable, ? extends U> fn;
1205 final CompletableFuture<U> dst;
1206 HandleCompletion(CompletableFuture<? extends T> src,
1207 BiFunction<? super T, Throwable, ? extends U> fn,
1208 final CompletableFuture<U> dst) {
1209 this.src = src; this.fn = fn; this.dst = dst;
1210 }
1211 public final void run() {
1212 final CompletableFuture<? extends T> a;
1213 final BiFunction<? super T, Throwable, ? extends U> fn;
1214 final CompletableFuture<U> dst;
1215 Object r; T t; Throwable ex;
1216 if ((dst = this.dst) != null &&
1217 (fn = this.fn) != null &&
1218 (a = this.src) != null &&
1219 (r = a.result) != null &&
1220 compareAndSet(0, 1)) {
1221 if (r instanceof AltResult) {
1222 ex = ((AltResult)r).ex;
1223 t = null;
1224 }
1225 else {
1226 ex = null;
1227 @SuppressWarnings("unchecked") T tr = (T) r;
1228 t = tr;
1229 }
1230 U u = null; Throwable dx = null;
1231 try {
1232 u = fn.apply(t, ex);
1233 } catch (Throwable rex) {
1234 dx = rex;
1235 }
1236 dst.internalComplete(u, dx);
1237 }
1238 }
1239 private static final long serialVersionUID = 5232453952276885070L;
1240 }
1241
1242 static final class ComposeCompletion<T,U> extends Completion {
1243 final CompletableFuture<? extends T> src;
1244 final Function<? super T, CompletableFuture<U>> fn;
1245 final CompletableFuture<U> dst;
1246 final Executor executor;
1247 ComposeCompletion(CompletableFuture<? extends T> src,
1248 Function<? super T, CompletableFuture<U>> fn,
1249 final CompletableFuture<U> dst, Executor executor) {
1250 this.src = src; this.fn = fn; this.dst = dst;
1251 this.executor = executor;
1252 }
1253 public final void run() {
1254 final CompletableFuture<? extends T> a;
1255 final Function<? super T, CompletableFuture<U>> fn;
1256 final CompletableFuture<U> dst;
1257 Object r; T t; Throwable ex; Executor e;
1258 if ((dst = this.dst) != null &&
1259 (fn = this.fn) != null &&
1260 (a = this.src) != null &&
1261 (r = a.result) != null &&
1262 compareAndSet(0, 1)) {
1263 if (r instanceof AltResult) {
1264 ex = ((AltResult)r).ex;
1265 t = null;
1266 }
1267 else {
1268 ex = null;
1269 @SuppressWarnings("unchecked") T tr = (T) r;
1270 t = tr;
1271 }
1272 CompletableFuture<U> c = null;
1273 U u = null;
1274 boolean complete = false;
1275 if (ex == null) {
1276 if ((e = executor) != null)
1277 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1278 else {
1279 try {
1280 if ((c = fn.apply(t)) == null)
1281 ex = new NullPointerException();
1282 } catch (Throwable rex) {
1283 ex = rex;
1284 }
1285 }
1286 }
1287 if (c != null) {
1288 ThenCopy d = null;
1289 Object s;
1290 if ((s = c.result) == null) {
1291 CompletionNode p = new CompletionNode
1292 (d = new ThenCopy(c, dst));
1293 while ((s = c.result) == null) {
1294 if (UNSAFE.compareAndSwapObject
1295 (c, COMPLETIONS, p.next = c.completions, p))
1296 break;
1297 }
1298 }
1299 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1300 complete = true;
1301 if (s instanceof AltResult) {
1302 ex = ((AltResult)s).ex; // no rewrap
1303 u = null;
1304 }
1305 else {
1306 @SuppressWarnings("unchecked") U us = (U) s;
1307 u = us;
1308 }
1309 }
1310 }
1311 if (complete || ex != null)
1312 dst.internalComplete(u, ex);
1313 if (c != null)
1314 c.helpPostComplete();
1315 }
1316 }
1317 private static final long serialVersionUID = 5232453952276885070L;
1318 }
1319
1320 // public methods
1321
1322 /**
1323 * Creates a new incomplete CompletableFuture.
1324 */
1325 public CompletableFuture() {
1326 }
1327
1328 /**
1329 * Asynchronously executes in the {@link
1330 * ForkJoinPool#commonPool()}, a task that completes the returned
1331 * CompletableFuture with the result of the given Supplier.
1332 *
1333 * @param supplier a function returning the value to be used
1334 * to complete the returned CompletableFuture
1335 * @return the CompletableFuture
1336 */
1337 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1338 if (supplier == null) throw new NullPointerException();
1339 CompletableFuture<U> f = new CompletableFuture<U>();
1340 ForkJoinPool.commonPool().
1341 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1342 return f;
1343 }
1344
1345 /**
1346 * Asynchronously executes using the given executor, a task that
1347 * completes the returned CompletableFuture with the result of the
1348 * given Supplier.
1349 *
1350 * @param supplier a function returning the value to be used
1351 * to complete the returned CompletableFuture
1352 * @param executor the executor to use for asynchronous execution
1353 * @return the CompletableFuture
1354 */
1355 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1356 Executor executor) {
1357 if (executor == null || supplier == null)
1358 throw new NullPointerException();
1359 CompletableFuture<U> f = new CompletableFuture<U>();
1360 executor.execute(new AsyncSupply<U>(supplier, f));
1361 return f;
1362 }
1363
1364 /**
1365 * Asynchronously executes in the {@link
1366 * ForkJoinPool#commonPool()} a task that runs the given action,
1367 * and then completes the returned CompletableFuture.
1368 *
1369 * @param runnable the action to run before completing the
1370 * returned CompletableFuture
1371 * @return the CompletableFuture
1372 */
1373 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1374 if (runnable == null) throw new NullPointerException();
1375 CompletableFuture<Void> f = new CompletableFuture<Void>();
1376 ForkJoinPool.commonPool().
1377 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1378 return f;
1379 }
1380
1381 /**
1382 * Asynchronously executes using the given executor, a task that
1383 * runs the given action, and then completes the returned
1384 * CompletableFuture.
1385 *
1386 * @param runnable the action to run before completing the
1387 * returned CompletableFuture
1388 * @param executor the executor to use for asynchronous execution
1389 * @return the CompletableFuture
1390 */
1391 public static CompletableFuture<Void> runAsync(Runnable runnable,
1392 Executor executor) {
1393 if (executor == null || runnable == null)
1394 throw new NullPointerException();
1395 CompletableFuture<Void> f = new CompletableFuture<Void>();
1396 executor.execute(new AsyncRun(runnable, f));
1397 return f;
1398 }
1399
1400 /**
1401 * Returns {@code true} if completed in any fashion: normally,
1402 * exceptionally, or via cancellation.
1403 *
1404 * @return {@code true} if completed
1405 */
1406 public boolean isDone() {
1407 return result != null;
1408 }
1409
1410 /**
1411 * Waits if necessary for this future to complete, and then
1412 * returns its result.
1413 *
1414 * @return the result value
1415 * @throws CancellationException if this future was cancelled
1416 * @throws ExecutionException if this future completed exceptionally
1417 * @throws InterruptedException if the current thread was interrupted
1418 * while waiting
1419 */
1420 public T get() throws InterruptedException, ExecutionException {
1421 Object r; Throwable ex, cause;
1422 if ((r = result) == null && (r = waitingGet(true)) == null)
1423 throw new InterruptedException();
1424 if (!(r instanceof AltResult)) {
1425 @SuppressWarnings("unchecked") T tr = (T) r;
1426 return tr;
1427 }
1428 if ((ex = ((AltResult)r).ex) == null)
1429 return null;
1430 if (ex instanceof CancellationException)
1431 throw (CancellationException)ex;
1432 if ((ex instanceof CompletionException) &&
1433 (cause = ex.getCause()) != null)
1434 ex = cause;
1435 throw new ExecutionException(ex);
1436 }
1437
1438 /**
1439 * Waits if necessary for at most the given time for this future
1440 * to complete, and then returns its result, if available.
1441 *
1442 * @param timeout the maximum time to wait
1443 * @param unit the time unit of the timeout argument
1444 * @return the result value
1445 * @throws CancellationException if this future was cancelled
1446 * @throws ExecutionException if this future completed exceptionally
1447 * @throws InterruptedException if the current thread was interrupted
1448 * while waiting
1449 * @throws TimeoutException if the wait timed out
1450 */
1451 public T get(long timeout, TimeUnit unit)
1452 throws InterruptedException, ExecutionException, TimeoutException {
1453 Object r; Throwable ex, cause;
1454 long nanos = unit.toNanos(timeout);
1455 if (Thread.interrupted())
1456 throw new InterruptedException();
1457 if ((r = result) == null)
1458 r = timedAwaitDone(nanos);
1459 if (!(r instanceof AltResult)) {
1460 @SuppressWarnings("unchecked") T tr = (T) r;
1461 return tr;
1462 }
1463 if ((ex = ((AltResult)r).ex) == null)
1464 return null;
1465 if (ex instanceof CancellationException)
1466 throw (CancellationException)ex;
1467 if ((ex instanceof CompletionException) &&
1468 (cause = ex.getCause()) != null)
1469 ex = cause;
1470 throw new ExecutionException(ex);
1471 }
1472
1473 /**
1474 * Returns the result value when complete, or throws an
1475 * (unchecked) exception if completed exceptionally. To better
1476 * conform with the use of common functional forms, if a
1477 * computation involved in the completion of this
1478 * CompletableFuture threw an exception, this method throws an
1479 * (unchecked) {@link CompletionException} with the underlying
1480 * exception as its cause.
1481 *
1482 * @return the result value
1483 * @throws CancellationException if the computation was cancelled
1484 * @throws CompletionException if a completion computation threw
1485 * an exception
1486 */
1487 public T join() {
1488 Object r; Throwable ex;
1489 if ((r = result) == null)
1490 r = waitingGet(false);
1491 if (!(r instanceof AltResult)) {
1492 @SuppressWarnings("unchecked") T tr = (T) r;
1493 return tr;
1494 }
1495 if ((ex = ((AltResult)r).ex) == null)
1496 return null;
1497 if (ex instanceof CancellationException)
1498 throw (CancellationException)ex;
1499 if (ex instanceof CompletionException)
1500 throw (CompletionException)ex;
1501 throw new CompletionException(ex);
1502 }
1503
1504 /**
1505 * Returns the result value (or throws any encountered exception)
1506 * if completed, else returns the given valueIfAbsent.
1507 *
1508 * @param valueIfAbsent the value to return if not completed
1509 * @return the result value, if completed, else the given valueIfAbsent
1510 * @throws CancellationException if the computation was cancelled
1511 * @throws CompletionException if a completion computation threw
1512 * an exception
1513 */
1514 public T getNow(T valueIfAbsent) {
1515 Object r; Throwable ex;
1516 if ((r = result) == null)
1517 return valueIfAbsent;
1518 if (!(r instanceof AltResult)) {
1519 @SuppressWarnings("unchecked") T tr = (T) r;
1520 return tr;
1521 }
1522 if ((ex = ((AltResult)r).ex) == null)
1523 return null;
1524 if (ex instanceof CancellationException)
1525 throw (CancellationException)ex;
1526 if (ex instanceof CompletionException)
1527 throw (CompletionException)ex;
1528 throw new CompletionException(ex);
1529 }
1530
1531 /**
1532 * If not already completed, sets the value returned by {@link
1533 * #get()} and related methods to the given value.
1534 *
1535 * @param value the result value
1536 * @return {@code true} if this invocation caused this CompletableFuture
1537 * to transition to a completed state, else {@code false}
1538 */
1539 public boolean complete(T value) {
1540 boolean triggered = result == null &&
1541 UNSAFE.compareAndSwapObject(this, RESULT, null,
1542 value == null ? NIL : value);
1543 postComplete();
1544 return triggered;
1545 }
1546
1547 /**
1548 * If not already completed, causes invocations of {@link #get()}
1549 * and related methods to throw the given exception.
1550 *
1551 * @param ex the exception
1552 * @return {@code true} if this invocation caused this CompletableFuture
1553 * to transition to a completed state, else {@code false}
1554 */
1555 public boolean completeExceptionally(Throwable ex) {
1556 if (ex == null) throw new NullPointerException();
1557 boolean triggered = result == null &&
1558 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1559 postComplete();
1560 return triggered;
1561 }
1562
1563 /**
1564 * Creates and returns a CompletableFuture that is completed with
1565 * the result of the given function of this CompletableFuture.
1566 * If this CompletableFuture completes exceptionally,
1567 * then the returned CompletableFuture also does so,
1568 * with a CompletionException holding this exception as
1569 * its cause.
1570 *
1571 * @param fn the function to use to compute the value of
1572 * the returned CompletableFuture
1573 * @return the new CompletableFuture
1574 */
1575 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1576 return doThenApply(fn, null);
1577 }
1578
1579 /**
1580 * Creates and returns a CompletableFuture that is asynchronously
1581 * completed using the {@link ForkJoinPool#commonPool()} with the
1582 * result of the given function of this CompletableFuture. If
1583 * this CompletableFuture completes exceptionally, then the
1584 * returned CompletableFuture also does so, with a
1585 * CompletionException holding this exception as its cause.
1586 *
1587 * @param fn the function to use to compute the value of
1588 * the returned CompletableFuture
1589 * @return the new CompletableFuture
1590 */
1591 public <U> CompletableFuture<U> thenApplyAsync
1592 (Function<? super T,? extends U> fn) {
1593 return doThenApply(fn, ForkJoinPool.commonPool());
1594 }
1595
1596 /**
1597 * Creates and returns a CompletableFuture that is asynchronously
1598 * completed using the given executor with the result of the given
1599 * function of this CompletableFuture. If this CompletableFuture
1600 * completes exceptionally, then the returned CompletableFuture
1601 * also does so, with a CompletionException holding this exception as
1602 * its cause.
1603 *
1604 * @param fn the function to use to compute the value of
1605 * the returned CompletableFuture
1606 * @param executor the executor to use for asynchronous execution
1607 * @return the new CompletableFuture
1608 */
1609 public <U> CompletableFuture<U> thenApplyAsync
1610 (Function<? super T,? extends U> fn,
1611 Executor executor) {
1612 if (executor == null) throw new NullPointerException();
1613 return doThenApply(fn, executor);
1614 }
1615
1616 private <U> CompletableFuture<U> doThenApply
1617 (Function<? super T,? extends U> fn,
1618 Executor e) {
1619 if (fn == null) throw new NullPointerException();
1620 CompletableFuture<U> dst = new CompletableFuture<U>();
1621 ApplyCompletion<T,U> d = null;
1622 Object r;
1623 if ((r = result) == null) {
1624 CompletionNode p = new CompletionNode
1625 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1626 while ((r = result) == null) {
1627 if (UNSAFE.compareAndSwapObject
1628 (this, COMPLETIONS, p.next = completions, p))
1629 break;
1630 }
1631 }
1632 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1633 T t; Throwable ex;
1634 if (r instanceof AltResult) {
1635 ex = ((AltResult)r).ex;
1636 t = null;
1637 }
1638 else {
1639 ex = null;
1640 @SuppressWarnings("unchecked") T tr = (T) r;
1641 t = tr;
1642 }
1643 U u = null;
1644 if (ex == null) {
1645 try {
1646 if (e != null)
1647 e.execute(new AsyncApply<T,U>(t, fn, dst));
1648 else
1649 u = fn.apply(t);
1650 } catch (Throwable rex) {
1651 ex = rex;
1652 }
1653 }
1654 if (e == null || ex != null)
1655 dst.internalComplete(u, ex);
1656 }
1657 helpPostComplete();
1658 return dst;
1659 }
1660
1661 /**
1662 * Creates and returns a CompletableFuture that is completed after
1663 * performing the given action with this CompletableFuture's
1664 * result when it completes. If this CompletableFuture
1665 * completes exceptionally, then the returned CompletableFuture
1666 * also does so, with a CompletionException holding this exception as
1667 * its cause.
1668 *
1669 * @param block the action to perform before completing the
1670 * returned CompletableFuture
1671 * @return the new CompletableFuture
1672 */
1673 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1674 return doThenAccept(block, null);
1675 }
1676
1677 /**
1678 * Creates and returns a CompletableFuture that is asynchronously
1679 * completed using the {@link ForkJoinPool#commonPool()} with this
1680 * CompletableFuture's result when it completes. If this
1681 * CompletableFuture completes exceptionally, then the returned
1682 * CompletableFuture also does so, with a CompletionException holding
1683 * this exception as its cause.
1684 *
1685 * @param block the action to perform before completing the
1686 * returned CompletableFuture
1687 * @return the new CompletableFuture
1688 */
1689 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1690 return doThenAccept(block, ForkJoinPool.commonPool());
1691 }
1692
1693 /**
1694 * Creates and returns a CompletableFuture that is asynchronously
1695 * completed using the given executor with this
1696 * CompletableFuture's result when it completes. If this
1697 * CompletableFuture completes exceptionally, then the returned
1698 * CompletableFuture also does so, with a CompletionException holding
1699 * this exception as its cause.
1700 *
1701 * @param block the action to perform before completing the
1702 * returned CompletableFuture
1703 * @param executor the executor to use for asynchronous execution
1704 * @return the new CompletableFuture
1705 */
1706 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1707 Executor executor) {
1708 if (executor == null) throw new NullPointerException();
1709 return doThenAccept(block, executor);
1710 }
1711
1712 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1713 Executor e) {
1714 if (fn == null) throw new NullPointerException();
1715 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1716 AcceptCompletion<T> d = null;
1717 Object r;
1718 if ((r = result) == null) {
1719 CompletionNode p = new CompletionNode
1720 (d = new AcceptCompletion<T>(this, fn, dst, e));
1721 while ((r = result) == null) {
1722 if (UNSAFE.compareAndSwapObject
1723 (this, COMPLETIONS, p.next = completions, p))
1724 break;
1725 }
1726 }
1727 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1728 T t; Throwable ex;
1729 if (r instanceof AltResult) {
1730 ex = ((AltResult)r).ex;
1731 t = null;
1732 }
1733 else {
1734 ex = null;
1735 @SuppressWarnings("unchecked") T tr = (T) r;
1736 t = tr;
1737 }
1738 if (ex == null) {
1739 try {
1740 if (e != null)
1741 e.execute(new AsyncAccept<T>(t, fn, dst));
1742 else
1743 fn.accept(t);
1744 } catch (Throwable rex) {
1745 ex = rex;
1746 }
1747 }
1748 if (e == null || ex != null)
1749 dst.internalComplete(null, ex);
1750 }
1751 helpPostComplete();
1752 return dst;
1753 }
1754
1755 /**
1756 * Creates and returns a CompletableFuture that is completed after
1757 * performing the given action when this CompletableFuture
1758 * completes. If this CompletableFuture completes exceptionally,
1759 * then the returned CompletableFuture also does so, with a
1760 * CompletionException holding this exception as its cause.
1761 *
1762 * @param action the action to perform before completing the
1763 * returned CompletableFuture
1764 * @return the new CompletableFuture
1765 */
1766 public CompletableFuture<Void> thenRun(Runnable action) {
1767 return doThenRun(action, null);
1768 }
1769
1770 /**
1771 * Creates and returns a CompletableFuture that is asynchronously
1772 * completed using the {@link ForkJoinPool#commonPool()} after
1773 * performing the given action when this CompletableFuture
1774 * completes. If this CompletableFuture completes exceptionally,
1775 * then the returned CompletableFuture also does so, with a
1776 * CompletionException holding this exception as its cause.
1777 *
1778 * @param action the action to perform before completing the
1779 * returned CompletableFuture
1780 * @return the new CompletableFuture
1781 */
1782 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1783 return doThenRun(action, ForkJoinPool.commonPool());
1784 }
1785
1786 /**
1787 * Creates and returns a CompletableFuture that is asynchronously
1788 * completed using the given executor after performing the given
1789 * action when this CompletableFuture completes. If this
1790 * CompletableFuture completes exceptionally, then the returned
1791 * CompletableFuture also does so, with a CompletionException holding
1792 * this exception as its cause.
1793 *
1794 * @param action the action to perform before completing the
1795 * returned CompletableFuture
1796 * @param executor the executor to use for asynchronous execution
1797 * @return the new CompletableFuture
1798 */
1799 public CompletableFuture<Void> thenRunAsync(Runnable action,
1800 Executor executor) {
1801 if (executor == null) throw new NullPointerException();
1802 return doThenRun(action, executor);
1803 }
1804
1805 private CompletableFuture<Void> doThenRun(Runnable action,
1806 Executor e) {
1807 if (action == null) throw new NullPointerException();
1808 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1809 RunCompletion<T> d = null;
1810 Object r;
1811 if ((r = result) == null) {
1812 CompletionNode p = new CompletionNode
1813 (d = new RunCompletion<T>(this, action, dst, e));
1814 while ((r = result) == null) {
1815 if (UNSAFE.compareAndSwapObject
1816 (this, COMPLETIONS, p.next = completions, p))
1817 break;
1818 }
1819 }
1820 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1821 Throwable ex;
1822 if (r instanceof AltResult)
1823 ex = ((AltResult)r).ex;
1824 else
1825 ex = null;
1826 if (ex == null) {
1827 try {
1828 if (e != null)
1829 e.execute(new AsyncRun(action, dst));
1830 else
1831 action.run();
1832 } catch (Throwable rex) {
1833 ex = rex;
1834 }
1835 }
1836 if (e == null || ex != null)
1837 dst.internalComplete(null, ex);
1838 }
1839 helpPostComplete();
1840 return dst;
1841 }
1842
1843 /**
1844 * Creates and returns a CompletableFuture that is completed with
1845 * the result of the given function of this and the other given
1846 * CompletableFuture's results when both complete. If this or
1847 * the other CompletableFuture complete exceptionally, then the
1848 * returned CompletableFuture also does so, with a
1849 * CompletionException holding the exception as its cause.
1850 *
1851 * @param other the other CompletableFuture
1852 * @param fn the function to use to compute the value of
1853 * the returned CompletableFuture
1854 * @return the new CompletableFuture
1855 */
1856 public <U,V> CompletableFuture<V> thenCombine
1857 (CompletableFuture<? extends U> other,
1858 BiFunction<? super T,? super U,? extends V> fn) {
1859 return doThenBiApply(other, fn, null);
1860 }
1861
1862 /**
1863 * Creates and returns a CompletableFuture that is asynchronously
1864 * completed using the {@link ForkJoinPool#commonPool()} with
1865 * the result of the given function of this and the other given
1866 * CompletableFuture's results when both complete. If this or
1867 * the other CompletableFuture complete exceptionally, then the
1868 * returned CompletableFuture also does so, with a
1869 * CompletionException holding the exception as its cause.
1870 *
1871 * @param other the other CompletableFuture
1872 * @param fn the function to use to compute the value of
1873 * the returned CompletableFuture
1874 * @return the new CompletableFuture
1875 */
1876 public <U,V> CompletableFuture<V> thenCombineAsync
1877 (CompletableFuture<? extends U> other,
1878 BiFunction<? super T,? super U,? extends V> fn) {
1879 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1880 }
1881
1882 /**
1883 * Creates and returns a CompletableFuture that is
1884 * asynchronously completed using the given executor with the
1885 * result of the given function of this and the other given
1886 * CompletableFuture's results when both complete. If this or
1887 * the other CompletableFuture complete exceptionally, then the
1888 * returned CompletableFuture also does so, with a
1889 * CompletionException holding the exception as its cause.
1890 *
1891 * @param other the other CompletableFuture
1892 * @param fn the function to use to compute the value of
1893 * the returned CompletableFuture
1894 * @param executor the executor to use for asynchronous execution
1895 * @return the new CompletableFuture
1896 */
1897
1898 public <U,V> CompletableFuture<V> thenCombineAsync
1899 (CompletableFuture<? extends U> other,
1900 BiFunction<? super T,? super U,? extends V> fn,
1901 Executor executor) {
1902 if (executor == null) throw new NullPointerException();
1903 return doThenBiApply(other, fn, executor);
1904 }
1905
1906 private <U,V> CompletableFuture<V> doThenBiApply
1907 (CompletableFuture<? extends U> other,
1908 BiFunction<? super T,? super U,? extends V> fn,
1909 Executor e) {
1910 if (other == null || fn == null) throw new NullPointerException();
1911 CompletableFuture<V> dst = new CompletableFuture<V>();
1912 BiApplyCompletion<T,U,V> d = null;
1913 Object r, s = null;
1914 if ((r = result) == null || (s = other.result) == null) {
1915 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1916 CompletionNode q = null, p = new CompletionNode(d);
1917 while ((r == null && (r = result) == null) ||
1918 (s == null && (s = other.result) == null)) {
1919 if (q != null) {
1920 if (s != null ||
1921 UNSAFE.compareAndSwapObject
1922 (other, COMPLETIONS, q.next = other.completions, q))
1923 break;
1924 }
1925 else if (r != null ||
1926 UNSAFE.compareAndSwapObject
1927 (this, COMPLETIONS, p.next = completions, p)) {
1928 if (s != null)
1929 break;
1930 q = new CompletionNode(d);
1931 }
1932 }
1933 }
1934 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1935 T t; U u; Throwable ex;
1936 if (r instanceof AltResult) {
1937 ex = ((AltResult)r).ex;
1938 t = null;
1939 }
1940 else {
1941 ex = null;
1942 @SuppressWarnings("unchecked") T tr = (T) r;
1943 t = tr;
1944 }
1945 if (ex != null)
1946 u = null;
1947 else if (s instanceof AltResult) {
1948 ex = ((AltResult)s).ex;
1949 u = null;
1950 }
1951 else {
1952 @SuppressWarnings("unchecked") U us = (U) s;
1953 u = us;
1954 }
1955 V v = null;
1956 if (ex == null) {
1957 try {
1958 if (e != null)
1959 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1960 else
1961 v = fn.apply(t, u);
1962 } catch (Throwable rex) {
1963 ex = rex;
1964 }
1965 }
1966 if (e == null || ex != null)
1967 dst.internalComplete(v, ex);
1968 }
1969 helpPostComplete();
1970 other.helpPostComplete();
1971 return dst;
1972 }
1973
1974 /**
1975 * Creates and returns a CompletableFuture that is completed with
1976 * the results of this and the other given CompletableFuture if
1977 * both complete. If this and/or the other CompletableFuture
1978 * complete exceptionally, then the returned CompletableFuture
1979 * also does so, with a CompletionException holding one of these
1980 * exceptions as its cause.
1981 *
1982 * @param other the other CompletableFuture
1983 * @param block the action to perform before completing the
1984 * returned CompletableFuture
1985 * @return the new CompletableFuture
1986 */
1987 public <U> CompletableFuture<Void> thenAcceptBoth
1988 (CompletableFuture<? extends U> other,
1989 BiConsumer<? super T, ? super U> block) {
1990 return doThenBiAccept(other, block, null);
1991 }
1992
1993 /**
1994 * Creates and returns a CompletableFuture that is completed
1995 * asynchronously using the {@link ForkJoinPool#commonPool()} with
1996 * the results of this and the other given CompletableFuture when
1997 * both complete. If this and/or the other CompletableFuture
1998 * complete exceptionally, then the returned CompletableFuture
1999 * also does so, with a CompletionException holding one of these
2000 * exceptions as its cause.
2001 *
2002 * @param other the other CompletableFuture
2003 * @param block the action to perform before completing the
2004 * returned CompletableFuture
2005 * @return the new CompletableFuture
2006 */
2007 public <U> CompletableFuture<Void> thenAcceptBothAsync
2008 (CompletableFuture<? extends U> other,
2009 BiConsumer<? super T, ? super U> block) {
2010 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2011 }
2012
2013 /**
2014 * Creates and returns a CompletableFuture that is completed
2015 * asynchronously using the given executor with the results of
2016 * this and the other given CompletableFuture when both complete.
2017 * If this and/or the other CompletableFuture complete exceptionally,
2018 * then the returned CompletableFuture also does so, with a
2019 * CompletionException holding one of these exceptions as its cause.
2020 *
2021 * @param other the other CompletableFuture
2022 * @param block the action to perform before completing the
2023 * returned CompletableFuture
2024 * @param executor the executor to use for asynchronous execution
2025 * @return the new CompletableFuture
2026 */
2027 public <U> CompletableFuture<Void> thenAcceptBothAsync
2028 (CompletableFuture<? extends U> other,
2029 BiConsumer<? super T, ? super U> block,
2030 Executor executor) {
2031 if (executor == null) throw new NullPointerException();
2032 return doThenBiAccept(other, block, executor);
2033 }
2034
2035 private <U> CompletableFuture<Void> doThenBiAccept
2036 (CompletableFuture<? extends U> other,
2037 BiConsumer<? super T,? super U> fn,
2038 Executor e) {
2039 if (other == null || fn == null) throw new NullPointerException();
2040 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2041 BiAcceptCompletion<T,U> d = null;
2042 Object r, s = null;
2043 if ((r = result) == null || (s = other.result) == null) {
2044 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2045 CompletionNode q = null, p = new CompletionNode(d);
2046 while ((r == null && (r = result) == null) ||
2047 (s == null && (s = other.result) == null)) {
2048 if (q != null) {
2049 if (s != null ||
2050 UNSAFE.compareAndSwapObject
2051 (other, COMPLETIONS, q.next = other.completions, q))
2052 break;
2053 }
2054 else if (r != null ||
2055 UNSAFE.compareAndSwapObject
2056 (this, COMPLETIONS, p.next = completions, p)) {
2057 if (s != null)
2058 break;
2059 q = new CompletionNode(d);
2060 }
2061 }
2062 }
2063 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2064 T t; U u; Throwable ex;
2065 if (r instanceof AltResult) {
2066 ex = ((AltResult)r).ex;
2067 t = null;
2068 }
2069 else {
2070 ex = null;
2071 @SuppressWarnings("unchecked") T tr = (T) r;
2072 t = tr;
2073 }
2074 if (ex != null)
2075 u = null;
2076 else if (s instanceof AltResult) {
2077 ex = ((AltResult)s).ex;
2078 u = null;
2079 }
2080 else {
2081 @SuppressWarnings("unchecked") U us = (U) s;
2082 u = us;
2083 }
2084 if (ex == null) {
2085 try {
2086 if (e != null)
2087 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2088 else
2089 fn.accept(t, u);
2090 } catch (Throwable rex) {
2091 ex = rex;
2092 }
2093 }
2094 if (e == null || ex != null)
2095 dst.internalComplete(null, ex);
2096 }
2097 helpPostComplete();
2098 other.helpPostComplete();
2099 return dst;
2100 }
2101
2102 /**
2103 * Creates and returns a CompletableFuture that is completed when
2104 * this and the other given CompletableFuture both complete.
2105 * If this and/or the other CompletableFuture complete exceptionally,
2106 * then the returned CompletableFuture also does so, with a
2107 * CompletionException holding one of these exceptions as its cause.
2108 *
2109 * @param other the other CompletableFuture
2110 * @param action the action to perform before completing the
2111 * returned CompletableFuture
2112 * @return the new CompletableFuture
2113 */
2114 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2115 Runnable action) {
2116 return doThenBiRun(other, action, null);
2117 }
2118
2119 /**
2120 * Creates and returns a CompletableFuture that is completed
2121 * asynchronously using the {@link ForkJoinPool#commonPool()}
2122 * when this and the other given CompletableFuture both complete.
2123 * If this and/or the other CompletableFuture complete exceptionally,
2124 * then the returned CompletableFuture also does so, with a
2125 * CompletionException holding one of these exceptions as its cause.
2126 *
2127 * @param other the other CompletableFuture
2128 * @param action the action to perform before completing the
2129 * returned CompletableFuture
2130 * @return the new CompletableFuture
2131 */
2132 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2133 Runnable action) {
2134 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2135 }
2136
2137 /**
2138 * Creates and returns a CompletableFuture that is completed
2139 * asynchronously using the given executor when this and the
2140 * other given CompletableFuture both complete.
2141 * If this and/or the other CompletableFuture complete exceptionally,
2142 * then the returned CompletableFuture also does so, with a
2143 * CompletionException holding one of these exceptions as its cause.
2144 *
2145 * @param other the other CompletableFuture
2146 * @param action the action to perform before completing the
2147 * returned CompletableFuture
2148 * @param executor the executor to use for asynchronous execution
2149 * @return the new CompletableFuture
2150 */
2151 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2152 Runnable action,
2153 Executor executor) {
2154 if (executor == null) throw new NullPointerException();
2155 return doThenBiRun(other, action, executor);
2156 }
2157
2158 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2159 Runnable action,
2160 Executor e) {
2161 if (other == null || action == null) throw new NullPointerException();
2162 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2163 BiRunCompletion<T> d = null;
2164 Object r, s = null;
2165 if ((r = result) == null || (s = other.result) == null) {
2166 d = new BiRunCompletion<T>(this, other, action, dst, e);
2167 CompletionNode q = null, p = new CompletionNode(d);
2168 while ((r == null && (r = result) == null) ||
2169 (s == null && (s = other.result) == null)) {
2170 if (q != null) {
2171 if (s != null ||
2172 UNSAFE.compareAndSwapObject
2173 (other, COMPLETIONS, q.next = other.completions, q))
2174 break;
2175 }
2176 else if (r != null ||
2177 UNSAFE.compareAndSwapObject
2178 (this, COMPLETIONS, p.next = completions, p)) {
2179 if (s != null)
2180 break;
2181 q = new CompletionNode(d);
2182 }
2183 }
2184 }
2185 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2186 Throwable ex;
2187 if (r instanceof AltResult)
2188 ex = ((AltResult)r).ex;
2189 else
2190 ex = null;
2191 if (ex == null && (s instanceof AltResult))
2192 ex = ((AltResult)s).ex;
2193 if (ex == null) {
2194 try {
2195 if (e != null)
2196 e.execute(new AsyncRun(action, dst));
2197 else
2198 action.run();
2199 } catch (Throwable rex) {
2200 ex = rex;
2201 }
2202 }
2203 if (e == null || ex != null)
2204 dst.internalComplete(null, ex);
2205 }
2206 helpPostComplete();
2207 other.helpPostComplete();
2208 return dst;
2209 }
2210
2211 /**
2212 * Creates and returns a CompletableFuture that is completed with
2213 * the result of the given function of either this or the other
2214 * given CompletableFuture's results when either complete.
2215 * If this and/or the other CompletableFuture complete exceptionally,
2216 * then the returned CompletableFuture may also do so, with a
2217 * CompletionException holding one of these exceptions as its cause.
2218 * No guarantees are made about which result or exception is used
2219 * in the returned CompletableFuture.
2220 *
2221 * @param other the other CompletableFuture
2222 * @param fn the function to use to compute the value of
2223 * the returned CompletableFuture
2224 * @return the new CompletableFuture
2225 */
2226 public <U> CompletableFuture<U> applyToEither
2227 (CompletableFuture<? extends T> other,
2228 Function<? super T, U> fn) {
2229 return doOrApply(other, fn, null);
2230 }
2231
2232 /**
2233 * Creates and returns a CompletableFuture that is completed
2234 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2235 * the result of the given function of either this or the other
2236 * given CompletableFuture's results when either complete.
2237 * If this and/or the other CompletableFuture complete exceptionally,
2238 * then the returned CompletableFuture may also do so, with a
2239 * CompletionException holding one of these exceptions as its cause.
2240 * No guarantees are made about which result or exception is used
2241 * in the returned CompletableFuture.
2242 *
2243 * @param other the other CompletableFuture
2244 * @param fn the function to use to compute the value of
2245 * the returned CompletableFuture
2246 * @return the new CompletableFuture
2247 */
2248 public <U> CompletableFuture<U> applyToEitherAsync
2249 (CompletableFuture<? extends T> other,
2250 Function<? super T, U> fn) {
2251 return doOrApply(other, fn, ForkJoinPool.commonPool());
2252 }
2253
2254 /**
2255 * Creates and returns a CompletableFuture that is completed
2256 * asynchronously using the given executor with the result of the
2257 * given function of either this or the other given
2258 * CompletableFuture's results when either complete. If this
2259 * and/or the other CompletableFuture complete exceptionally, then
2260 * the returned CompletableFuture may also do so, with a
2261 * CompletionException holding one of these exceptions as its cause.
2262 * No guarantees are made about which result or exception is used
2263 * in the returned CompletableFuture.
2264 *
2265 * @param other the other CompletableFuture
2266 * @param fn the function to use to compute the value of
2267 * the returned CompletableFuture
2268 * @param executor the executor to use for asynchronous execution
2269 * @return the new CompletableFuture
2270 */
2271 public <U> CompletableFuture<U> applyToEitherAsync
2272 (CompletableFuture<? extends T> other,
2273 Function<? super T, U> fn,
2274 Executor executor) {
2275 if (executor == null) throw new NullPointerException();
2276 return doOrApply(other, fn, executor);
2277 }
2278
2279 private <U> CompletableFuture<U> doOrApply
2280 (CompletableFuture<? extends T> other,
2281 Function<? super T, U> fn,
2282 Executor e) {
2283 if (other == null || fn == null) throw new NullPointerException();
2284 CompletableFuture<U> dst = new CompletableFuture<U>();
2285 OrApplyCompletion<T,U> d = null;
2286 Object r;
2287 if ((r = result) == null && (r = other.result) == null) {
2288 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2289 CompletionNode q = null, p = new CompletionNode(d);
2290 while ((r = result) == null && (r = other.result) == null) {
2291 if (q != null) {
2292 if (UNSAFE.compareAndSwapObject
2293 (other, COMPLETIONS, q.next = other.completions, q))
2294 break;
2295 }
2296 else if (UNSAFE.compareAndSwapObject
2297 (this, COMPLETIONS, p.next = completions, p))
2298 q = new CompletionNode(d);
2299 }
2300 }
2301 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2302 T t; Throwable ex;
2303 if (r instanceof AltResult) {
2304 ex = ((AltResult)r).ex;
2305 t = null;
2306 }
2307 else {
2308 ex = null;
2309 @SuppressWarnings("unchecked") T tr = (T) r;
2310 t = tr;
2311 }
2312 U u = null;
2313 if (ex == null) {
2314 try {
2315 if (e != null)
2316 e.execute(new AsyncApply<T,U>(t, fn, dst));
2317 else
2318 u = fn.apply(t);
2319 } catch (Throwable rex) {
2320 ex = rex;
2321 }
2322 }
2323 if (e == null || ex != null)
2324 dst.internalComplete(u, ex);
2325 }
2326 helpPostComplete();
2327 other.helpPostComplete();
2328 return dst;
2329 }
2330
2331 /**
2332 * Creates and returns a CompletableFuture that is completed after
2333 * performing the given action with the result of either this or the
2334 * other given CompletableFuture's result, when either complete.
2335 * If this and/or the other CompletableFuture complete exceptionally,
2336 * then the returned CompletableFuture may also do so, with a
2337 * CompletionException holding one of these exceptions as its cause.
2338 * No guarantees are made about which exception is used in the
2339 * returned CompletableFuture.
2340 *
2341 * @param other the other CompletableFuture
2342 * @param block the action to perform before completing the
2343 * returned CompletableFuture
2344 * @return the new CompletableFuture
2345 */
2346 public CompletableFuture<Void> acceptEither
2347 (CompletableFuture<? extends T> other,
2348 Consumer<? super T> block) {
2349 return doOrAccept(other, block, null);
2350 }
2351
2352 /**
2353 * Creates and returns a CompletableFuture that is completed
2354 * asynchronously using the {@link ForkJoinPool#commonPool()},
2355 * performing the given action with the result of either this or
2356 * the other given CompletableFuture's result, when either complete.
2357 * If this and/or the other CompletableFuture complete exceptionally,
2358 * then the returned CompletableFuture may also do so, with a
2359 * CompletionException holding one of these exceptions as its cause.
2360 * No guarantees are made about which exception is used in the
2361 * returned CompletableFuture.
2362 *
2363 * @param other the other CompletableFuture
2364 * @param block the action to perform before completing the
2365 * returned CompletableFuture
2366 * @return the new CompletableFuture
2367 */
2368 public CompletableFuture<Void> acceptEitherAsync
2369 (CompletableFuture<? extends T> other,
2370 Consumer<? super T> block) {
2371 return doOrAccept(other, block, ForkJoinPool.commonPool());
2372 }
2373
2374 /**
2375 * Creates and returns a CompletableFuture that is completed
2376 * asynchronously using the given executor, performing the given
2377 * action with the result of either this or the other given
2378 * CompletableFuture's result, when either complete.
2379 * If this and/or the other CompletableFuture complete exceptionally,
2380 * then the returned CompletableFuture may also do so, with a
2381 * CompletionException holding one of these exceptions as its cause.
2382 * No guarantees are made about which exception is used in the
2383 * returned CompletableFuture.
2384 *
2385 * @param other the other CompletableFuture
2386 * @param block the action to perform before completing the
2387 * returned CompletableFuture
2388 * @param executor the executor to use for asynchronous execution
2389 * @return the new CompletableFuture
2390 */
2391 public CompletableFuture<Void> acceptEitherAsync
2392 (CompletableFuture<? extends T> other,
2393 Consumer<? super T> block,
2394 Executor executor) {
2395 if (executor == null) throw new NullPointerException();
2396 return doOrAccept(other, block, executor);
2397 }
2398
2399 private CompletableFuture<Void> doOrAccept
2400 (CompletableFuture<? extends T> other,
2401 Consumer<? super T> fn,
2402 Executor e) {
2403 if (other == null || fn == null) throw new NullPointerException();
2404 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2405 OrAcceptCompletion<T> d = null;
2406 Object r;
2407 if ((r = result) == null && (r = other.result) == null) {
2408 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2409 CompletionNode q = null, p = new CompletionNode(d);
2410 while ((r = result) == null && (r = other.result) == null) {
2411 if (q != null) {
2412 if (UNSAFE.compareAndSwapObject
2413 (other, COMPLETIONS, q.next = other.completions, q))
2414 break;
2415 }
2416 else if (UNSAFE.compareAndSwapObject
2417 (this, COMPLETIONS, p.next = completions, p))
2418 q = new CompletionNode(d);
2419 }
2420 }
2421 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2422 T t; Throwable ex;
2423 if (r instanceof AltResult) {
2424 ex = ((AltResult)r).ex;
2425 t = null;
2426 }
2427 else {
2428 ex = null;
2429 @SuppressWarnings("unchecked") T tr = (T) r;
2430 t = tr;
2431 }
2432 if (ex == null) {
2433 try {
2434 if (e != null)
2435 e.execute(new AsyncAccept<T>(t, fn, dst));
2436 else
2437 fn.accept(t);
2438 } catch (Throwable rex) {
2439 ex = rex;
2440 }
2441 }
2442 if (e == null || ex != null)
2443 dst.internalComplete(null, ex);
2444 }
2445 helpPostComplete();
2446 other.helpPostComplete();
2447 return dst;
2448 }
2449
2450 /**
2451 * Creates and returns a CompletableFuture that is completed
2452 * after this or the other given CompletableFuture complete.
2453 * If this and/or the other CompletableFuture complete exceptionally,
2454 * then the returned CompletableFuture may also do so, with a
2455 * CompletionException holding one of these exceptions as its cause.
2456 * No guarantees are made about which exception is used in the
2457 * returned CompletableFuture.
2458 *
2459 * @param other the other CompletableFuture
2460 * @param action the action to perform before completing the
2461 * returned CompletableFuture
2462 * @return the new CompletableFuture
2463 */
2464 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2465 Runnable action) {
2466 return doOrRun(other, action, null);
2467 }
2468
2469 /**
2470 * Creates and returns a CompletableFuture that is completed
2471 * asynchronously using the {@link ForkJoinPool#commonPool()}
2472 * after this or the other given CompletableFuture complete.
2473 * If this and/or the other CompletableFuture complete exceptionally,
2474 * then the returned CompletableFuture may also do so, with a
2475 * CompletionException holding one of these exceptions as its cause.
2476 * No guarantees are made about which exception is used in the
2477 * returned CompletableFuture.
2478 *
2479 * @param other the other CompletableFuture
2480 * @param action the action to perform before completing the
2481 * returned CompletableFuture
2482 * @return the new CompletableFuture
2483 */
2484 public CompletableFuture<Void> runAfterEitherAsync
2485 (CompletableFuture<?> other,
2486 Runnable action) {
2487 return doOrRun(other, action, ForkJoinPool.commonPool());
2488 }
2489
2490 /**
2491 * Creates and returns a CompletableFuture that is completed
2492 * asynchronously using the given executor after this or the other
2493 * given CompletableFuture complete.
2494 * If this and/or the other CompletableFuture complete exceptionally,
2495 * then the returned CompletableFuture may also do so, with a
2496 * CompletionException holding one of these exceptions as its cause.
2497 * No guarantees are made about which exception is used in the
2498 * returned CompletableFuture.
2499 *
2500 * @param other the other CompletableFuture
2501 * @param action the action to perform before completing the
2502 * returned CompletableFuture
2503 * @param executor the executor to use for asynchronous execution
2504 * @return the new CompletableFuture
2505 */
2506 public CompletableFuture<Void> runAfterEitherAsync
2507 (CompletableFuture<?> other,
2508 Runnable action,
2509 Executor executor) {
2510 if (executor == null) throw new NullPointerException();
2511 return doOrRun(other, action, executor);
2512 }
2513
2514 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2515 Runnable action,
2516 Executor e) {
2517 if (other == null || action == null) throw new NullPointerException();
2518 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2519 OrRunCompletion<T> d = null;
2520 Object r;
2521 if ((r = result) == null && (r = other.result) == null) {
2522 d = new OrRunCompletion<T>(this, other, action, dst, e);
2523 CompletionNode q = null, p = new CompletionNode(d);
2524 while ((r = result) == null && (r = other.result) == null) {
2525 if (q != null) {
2526 if (UNSAFE.compareAndSwapObject
2527 (other, COMPLETIONS, q.next = other.completions, q))
2528 break;
2529 }
2530 else if (UNSAFE.compareAndSwapObject
2531 (this, COMPLETIONS, p.next = completions, p))
2532 q = new CompletionNode(d);
2533 }
2534 }
2535 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2536 Throwable ex;
2537 if (r instanceof AltResult)
2538 ex = ((AltResult)r).ex;
2539 else
2540 ex = null;
2541 if (ex == null) {
2542 try {
2543 if (e != null)
2544 e.execute(new AsyncRun(action, dst));
2545 else
2546 action.run();
2547 } catch (Throwable rex) {
2548 ex = rex;
2549 }
2550 }
2551 if (e == null || ex != null)
2552 dst.internalComplete(null, ex);
2553 }
2554 helpPostComplete();
2555 other.helpPostComplete();
2556 return dst;
2557 }
2558
2559 /**
2560 * Returns a CompletableFuture (or an equivalent one) produced by
2561 * the given function of the result of this CompletableFuture when
2562 * completed. If this CompletableFuture completes exceptionally,
2563 * then the returned CompletableFuture also does so, with a
2564 * CompletionException holding this exception as its cause.
2565 *
2566 * @param fn the function returning a new CompletableFuture
2567 * @return the CompletableFuture, that {@code isDone()} upon
2568 * return if completed by the given function, or an exception
2569 * occurs
2570 */
2571 public <U> CompletableFuture<U> thenCompose
2572 (Function<? super T, CompletableFuture<U>> fn) {
2573 return doCompose(fn, null);
2574 }
2575
2576 /**
2577 * Returns a CompletableFuture (or an equivalent one) produced
2578 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2579 * the given function of the result of this CompletableFuture when
2580 * completed. If this CompletableFuture completes exceptionally,
2581 * then the returned CompletableFuture also does so, with a
2582 * CompletionException holding this exception as its cause.
2583 *
2584 * @param fn the function returning a new CompletableFuture
2585 * @return the CompletableFuture, that {@code isDone()} upon
2586 * return if completed by the given function, or an exception
2587 * occurs
2588 */
2589 public <U> CompletableFuture<U> thenComposeAsync
2590 (Function<? super T, CompletableFuture<U>> fn) {
2591 return doCompose(fn, ForkJoinPool.commonPool());
2592 }
2593
2594 /**
2595 * Returns a CompletableFuture (or an equivalent one) produced
2596 * asynchronously using the given executor by the given function
2597 * of the result of this CompletableFuture when completed.
2598 * If this CompletableFuture completes exceptionally, then the
2599 * returned CompletableFuture also does so, with a
2600 * CompletionException holding this exception as its cause.
2601 *
2602 * @param fn the function returning a new CompletableFuture
2603 * @param executor the executor to use for asynchronous execution
2604 * @return the CompletableFuture, that {@code isDone()} upon
2605 * return if completed by the given function, or an exception
2606 * occurs
2607 */
2608 public <U> CompletableFuture<U> thenComposeAsync
2609 (Function<? super T, CompletableFuture<U>> fn,
2610 Executor executor) {
2611 if (executor == null) throw new NullPointerException();
2612 return doCompose(fn, executor);
2613 }
2614
2615 private <U> CompletableFuture<U> doCompose
2616 (Function<? super T, CompletableFuture<U>> fn,
2617 Executor e) {
2618 if (fn == null) throw new NullPointerException();
2619 CompletableFuture<U> dst = null;
2620 ComposeCompletion<T,U> d = null;
2621 Object r;
2622 if ((r = result) == null) {
2623 dst = new CompletableFuture<U>();
2624 CompletionNode p = new CompletionNode
2625 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2626 while ((r = result) == null) {
2627 if (UNSAFE.compareAndSwapObject
2628 (this, COMPLETIONS, p.next = completions, p))
2629 break;
2630 }
2631 }
2632 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2633 T t; Throwable ex;
2634 if (r instanceof AltResult) {
2635 ex = ((AltResult)r).ex;
2636 t = null;
2637 }
2638 else {
2639 ex = null;
2640 @SuppressWarnings("unchecked") T tr = (T) r;
2641 t = tr;
2642 }
2643 if (ex == null) {
2644 if (e != null) {
2645 if (dst == null)
2646 dst = new CompletableFuture<U>();
2647 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2648 }
2649 else {
2650 try {
2651 dst = fn.apply(t);
2652 } catch (Throwable rex) {
2653 ex = rex;
2654 }
2655 if (dst == null) {
2656 dst = new CompletableFuture<U>();
2657 if (ex == null)
2658 ex = new NullPointerException();
2659 }
2660 }
2661 }
2662 if (e == null && ex != null)
2663 dst.internalComplete(null, ex);
2664 }
2665 helpPostComplete();
2666 dst.helpPostComplete();
2667 return dst;
2668 }
2669
2670 /**
2671 * Creates and returns a CompletableFuture that is completed with
2672 * the result of the given function of the exception triggering
2673 * this CompletableFuture's completion when it completes
2674 * exceptionally; Otherwise, if this CompletableFuture completes
2675 * normally, then the returned CompletableFuture also completes
2676 * normally with the same value.
2677 *
2678 * @param fn the function to use to compute the value of the
2679 * returned CompletableFuture if this CompletableFuture completed
2680 * exceptionally
2681 * @return the new CompletableFuture
2682 */
2683 public CompletableFuture<T> exceptionally
2684 (Function<Throwable, ? extends T> fn) {
2685 if (fn == null) throw new NullPointerException();
2686 CompletableFuture<T> dst = new CompletableFuture<T>();
2687 ExceptionCompletion<T> d = null;
2688 Object r;
2689 if ((r = result) == null) {
2690 CompletionNode p =
2691 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2692 while ((r = result) == null) {
2693 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2694 p.next = completions, p))
2695 break;
2696 }
2697 }
2698 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2699 T t = null; Throwable ex, dx = null;
2700 if (r instanceof AltResult) {
2701 if ((ex = ((AltResult)r).ex) != null) {
2702 try {
2703 t = fn.apply(ex);
2704 } catch (Throwable rex) {
2705 dx = rex;
2706 }
2707 }
2708 }
2709 else {
2710 @SuppressWarnings("unchecked") T tr = (T) r;
2711 t = tr;
2712 }
2713 dst.internalComplete(t, dx);
2714 }
2715 helpPostComplete();
2716 return dst;
2717 }
2718
2719 /**
2720 * Creates and returns a CompletableFuture that is completed with
2721 * the result of the given function of the result and exception of
2722 * this CompletableFuture's completion when it completes. The
2723 * given function is invoked with the result (or {@code null} if
2724 * none) and the exception (or {@code null} if none) of this
2725 * CompletableFuture when complete.
2726 *
2727 * @param fn the function to use to compute the value of the
2728 * returned CompletableFuture
2729
2730 * @return the new CompletableFuture
2731 */
2732 public <U> CompletableFuture<U> handle
2733 (BiFunction<? super T, Throwable, ? extends U> fn) {
2734 if (fn == null) throw new NullPointerException();
2735 CompletableFuture<U> dst = new CompletableFuture<U>();
2736 HandleCompletion<T,U> d = null;
2737 Object r;
2738 if ((r = result) == null) {
2739 CompletionNode p =
2740 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2741 while ((r = result) == null) {
2742 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2743 p.next = completions, p))
2744 break;
2745 }
2746 }
2747 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2748 T t; Throwable ex;
2749 if (r instanceof AltResult) {
2750 ex = ((AltResult)r).ex;
2751 t = null;
2752 }
2753 else {
2754 ex = null;
2755 @SuppressWarnings("unchecked") T tr = (T) r;
2756 t = tr;
2757 }
2758 U u; Throwable dx;
2759 try {
2760 u = fn.apply(t, ex);
2761 dx = null;
2762 } catch (Throwable rex) {
2763 dx = rex;
2764 u = null;
2765 }
2766 dst.internalComplete(u, dx);
2767 }
2768 helpPostComplete();
2769 return dst;
2770 }
2771
2772
2773 /* ------------- Arbitrary-arity constructions -------------- */
2774
2775 /*
2776 * The basic plan of attack is to recursively form binary
2777 * completion trees of elements. This can be overkill for small
2778 * sets, but scales nicely. The And/All vs Or/Any forms use the
2779 * same idea, but details differ.
2780 */
2781
2782 /**
2783 * Returns a new CompletableFuture that is completed when all of
2784 * the given CompletableFutures complete. If any of the component
2785 * CompletableFuture complete exceptionally, then so does the
2786 * returned CompletableFuture. Otherwise, the results, if any, of
2787 * the component CompletableFutures are not reflected in the
2788 * returned CompletableFuture, but may be obtained by inspecting
2789 * them individually. If the number of components is zero, returns
2790 * a CompletableFuture completed with the value {@code null}.
2791 *
2792 * <p>Among the applications of this method is to await completion
2793 * of a set of independent CompletableFutures before continuing a
2794 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2795 * c3).join();}.
2796 *
2797 * @param cfs the CompletableFutures
2798 * @return a CompletableFuture that is complete when all of the
2799 * given CompletableFutures complete
2800 * @throws NullPointerException if the array or any of its elements are
2801 * {@code null}
2802 */
2803 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2804 int len = cfs.length; // Directly handle empty and singleton cases
2805 if (len > 1)
2806 return allTree(cfs, 0, len - 1);
2807 else {
2808 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2809 CompletableFuture<?> f;
2810 if (len == 0)
2811 dst.result = NIL;
2812 else if ((f = cfs[0]) == null)
2813 throw new NullPointerException();
2814 else {
2815 ThenCopy d = null;
2816 CompletionNode p = null;
2817 Object r;
2818 while ((r = f.result) == null) {
2819 if (d == null)
2820 d = new ThenCopy(f, dst);
2821 else if (p == null)
2822 p = new CompletionNode(d);
2823 else if (UNSAFE.compareAndSwapObject
2824 (f, COMPLETIONS, p.next = f.completions, p))
2825 break;
2826 }
2827 if (r != null && (d == null || d.compareAndSet(0, 1)))
2828 dst.internalComplete(null, (r instanceof AltResult) ?
2829 ((AltResult)r).ex : null);
2830 f.helpPostComplete();
2831 }
2832 return dst;
2833 }
2834 }
2835
2836 /**
2837 * Recursively constructs an And'ed tree of CompletableFutures.
2838 * Called only when array known to have at least two elements.
2839 */
2840 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2841 int lo, int hi) {
2842 CompletableFuture<?> fst, snd;
2843 int mid = (lo + hi) >>> 1;
2844 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2845 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2846 throw new NullPointerException();
2847 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2848 AndCompletion d = null;
2849 CompletionNode p = null, q = null;
2850 Object r = null, s = null;
2851 while ((r = fst.result) == null || (s = snd.result) == null) {
2852 if (d == null)
2853 d = new AndCompletion(fst, snd, dst);
2854 else if (p == null)
2855 p = new CompletionNode(d);
2856 else if (q == null) {
2857 if (UNSAFE.compareAndSwapObject
2858 (fst, COMPLETIONS, p.next = fst.completions, p))
2859 q = new CompletionNode(d);
2860 }
2861 else if (UNSAFE.compareAndSwapObject
2862 (snd, COMPLETIONS, q.next = snd.completions, q))
2863 break;
2864 }
2865 if ((r != null || (r = fst.result) != null) &&
2866 (s != null || (s = snd.result) != null) &&
2867 (d == null || d.compareAndSet(0, 1))) {
2868 Throwable ex;
2869 if (r instanceof AltResult)
2870 ex = ((AltResult)r).ex;
2871 else
2872 ex = null;
2873 if (ex == null && (s instanceof AltResult))
2874 ex = ((AltResult)s).ex;
2875 dst.internalComplete(null, ex);
2876 }
2877 fst.helpPostComplete();
2878 snd.helpPostComplete();
2879 return dst;
2880 }
2881
2882 /**
2883 * Returns a new CompletableFuture that is completed when any of
2884 * the component CompletableFutures complete; with the same result if
2885 * it completed normally, otherwise exceptionally. If the number
2886 * of components is zero, returns an incomplete CompletableFuture.
2887 *
2888 * @param cfs the CompletableFutures
2889 * @return a CompletableFuture that is complete when any of the
2890 * given CompletableFutures complete
2891 * @throws NullPointerException if the array or any of its elements are
2892 * {@code null}
2893 */
2894 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2895 int len = cfs.length; // Same idea as allOf
2896 if (len > 1)
2897 return anyTree(cfs, 0, len - 1);
2898 else {
2899 CompletableFuture<?> dst = new CompletableFuture<Object>();
2900 CompletableFuture<?> f;
2901 if (len == 0)
2902 ; // skip
2903 else if ((f = cfs[0]) == null)
2904 throw new NullPointerException();
2905 else {
2906 ThenCopy d = null;
2907 CompletionNode p = null;
2908 Object r;
2909 while ((r = f.result) == null) {
2910 if (d == null)
2911 d = new ThenCopy(f, dst);
2912 else if (p == null)
2913 p = new CompletionNode(d);
2914 else if (UNSAFE.compareAndSwapObject
2915 (f, COMPLETIONS, p.next = f.completions, p))
2916 break;
2917 }
2918 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2919 Throwable ex; Object t;
2920 if (r instanceof AltResult) {
2921 ex = ((AltResult)r).ex;
2922 t = null;
2923 }
2924 else {
2925 ex = null;
2926 t = r;
2927 }
2928 dst.internalComplete(t, ex);
2929 }
2930 f.helpPostComplete();
2931 }
2932 return dst;
2933 }
2934 }
2935
2936 /**
2937 * Recursively constructs an Or'ed tree of CompletableFutures.
2938 */
2939 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
2940 int lo, int hi) {
2941 CompletableFuture<?> fst, snd;
2942 int mid = (lo + hi) >>> 1;
2943 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2944 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2945 throw new NullPointerException();
2946 CompletableFuture<?> dst = new CompletableFuture<Object>();
2947 OrCompletion d = null;
2948 CompletionNode p = null, q = null;
2949 Object r;
2950 while ((r = fst.result) == null && (r = snd.result) == null) {
2951 if (d == null)
2952 d = new OrCompletion(fst, snd, dst);
2953 else if (p == null)
2954 p = new CompletionNode(d);
2955 else if (q == null) {
2956 if (UNSAFE.compareAndSwapObject
2957 (fst, COMPLETIONS, p.next = fst.completions, p))
2958 q = new CompletionNode(d);
2959 }
2960 else if (UNSAFE.compareAndSwapObject
2961 (snd, COMPLETIONS, q.next = snd.completions, q))
2962 break;
2963 }
2964 if ((r != null || (r = fst.result) != null ||
2965 (r = snd.result) != null) &&
2966 (d == null || d.compareAndSet(0, 1))) {
2967 Throwable ex; Object t;
2968 if (r instanceof AltResult) {
2969 ex = ((AltResult)r).ex;
2970 t = null;
2971 }
2972 else {
2973 ex = null;
2974 t = r;
2975 }
2976 dst.internalComplete(t, ex);
2977 }
2978 fst.helpPostComplete();
2979 snd.helpPostComplete();
2980 return dst;
2981 }
2982
2983
2984 /* ------------- Control and status methods -------------- */
2985
2986 /**
2987 * If not already completed, completes this CompletableFuture with
2988 * a {@link CancellationException}. Dependent CompletableFutures
2989 * that have not already completed will also complete
2990 * exceptionally, with a {@link CompletionException} caused by
2991 * this {@code CancellationException}.
2992 *
2993 * @param mayInterruptIfRunning this value has no effect in this
2994 * implementation because interrupts are not used to control
2995 * processing.
2996 *
2997 * @return {@code true} if this task is now cancelled
2998 */
2999 public boolean cancel(boolean mayInterruptIfRunning) {
3000 boolean cancelled = (result == null) &&
3001 UNSAFE.compareAndSwapObject
3002 (this, RESULT, null, new AltResult(new CancellationException()));
3003 postComplete();
3004 return cancelled || isCancelled();
3005 }
3006
3007 /**
3008 * Returns {@code true} if this CompletableFuture was cancelled
3009 * before it completed normally.
3010 *
3011 * @return {@code true} if this CompletableFuture was cancelled
3012 * before it completed normally
3013 */
3014 public boolean isCancelled() {
3015 Object r;
3016 return ((r = result) instanceof AltResult) &&
3017 (((AltResult)r).ex instanceof CancellationException);
3018 }
3019
3020 /**
3021 * Forcibly sets or resets the value subsequently returned by
3022 * method {@link #get()} and related methods, whether or not
3023 * already completed. This method is designed for use only in
3024 * error recovery actions, and even in such situations may result
3025 * in ongoing dependent completions using established versus
3026 * overwritten outcomes.
3027 *
3028 * @param value the completion value
3029 */
3030 public void obtrudeValue(T value) {
3031 result = (value == null) ? NIL : value;
3032 postComplete();
3033 }
3034
3035 /**
3036 * Forcibly causes subsequent invocations of method {@link #get()}
3037 * and related methods to throw the given exception, whether or
3038 * not already completed. This method is designed for use only in
3039 * recovery actions, and even in such situations may result in
3040 * ongoing dependent completions using established versus
3041 * overwritten outcomes.
3042 *
3043 * @param ex the exception
3044 */
3045 public void obtrudeException(Throwable ex) {
3046 if (ex == null) throw new NullPointerException();
3047 result = new AltResult(ex);
3048 postComplete();
3049 }
3050
3051 /**
3052 * Returns the estimated number of CompletableFutures whose
3053 * completions are awaiting completion of this CompletableFuture.
3054 * This method is designed for use in monitoring system state, not
3055 * for synchronization control.
3056 *
3057 * @return the number of dependent CompletableFutures
3058 */
3059 public int getNumberOfDependents() {
3060 int count = 0;
3061 for (CompletionNode p = completions; p != null; p = p.next)
3062 ++count;
3063 return count;
3064 }
3065
3066 /**
3067 * Returns a string identifying this CompletableFuture, as well as
3068 * its completion state. The state, in brackets, contains the
3069 * String {@code "Completed Normally"} or the String {@code
3070 * "Completed Exceptionally"}, or the String {@code "Not
3071 * completed"} followed by the number of CompletableFutures
3072 * dependent upon its completion, if any.
3073 *
3074 * @return a string identifying this CompletableFuture, as well as its state
3075 */
3076 public String toString() {
3077 Object r = result;
3078 int count;
3079 return super.toString() +
3080 ((r == null) ?
3081 (((count = getNumberOfDependents()) == 0) ?
3082 "[Not completed]" :
3083 "[Not completed, " + count + " dependents]") :
3084 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3085 "[Completed exceptionally]" :
3086 "[Completed normally]"));
3087 }
3088
3089 // Unsafe mechanics
3090 private static final sun.misc.Unsafe UNSAFE;
3091 private static final long RESULT;
3092 private static final long WAITERS;
3093 private static final long COMPLETIONS;
3094 static {
3095 try {
3096 UNSAFE = sun.misc.Unsafe.getUnsafe();
3097 Class<?> k = CompletableFuture.class;
3098 RESULT = UNSAFE.objectFieldOffset
3099 (k.getDeclaredField("result"));
3100 WAITERS = UNSAFE.objectFieldOffset
3101 (k.getDeclaredField("waiters"));
3102 COMPLETIONS = UNSAFE.objectFieldOffset
3103 (k.getDeclaredField("completions"));
3104 } catch (Exception e) {
3105 throw new Error(e);
3106 }
3107 }
3108 }