root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.52
Committed: Mon Feb 11 07:46:34 2013 UTC by jsr166
Branch: MAIN
Changes since 1.51: +1 -1 lines
Log Message:
javadoc link readability

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on Functions,
37 * Consumers, and Runnables. The appropriate form to use depends on
38 * whether actions require arguments and/or produce results. Actions
39 * may also be triggered after either or both the current and another
40 * CompletableFuture complete. Multiple CompletableFutures may also
41 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
42 * {@link #allOf(CompletableFuture...)}.
43 *
44 * <p>Actions supplied for dependent completions (mainly using methods
45 * with prefix {@code then}) may be performed by the thread that
46 * completes the current CompletableFuture, or by any other caller of
47 * these methods. There are no guarantees about the order of
48 * processing completions unless constrained by these methods.
49 *
50 * <p>Upon exceptional completion (including cancellation), or when a
51 * completion entails a computation that terminates abruptly with an
52 * (unchecked) exception or error, further completions act as
53 * {@code completeExceptionally} with a {@link CompletionException}
54 * holding that exception as its cause. If a CompletableFuture
55 * completes exceptionally, and is not followed by a {@link
56 * #exceptionally} or {@link #handle} completion, then all of its
57 * dependents (and their dependents) also complete exceptionally with
58 * CompletionExceptions holding the ultimate cause. In case of a
59 * CompletionException, methods {@link #get()} and {@link #get(long,
60 * TimeUnit)} throw an {@link ExecutionException} with the same cause
61 * as would be held in the corresponding CompletionException. However,
62 * in these cases, methods {@link #join()} and {@link #getNow} throw
63 * the CompletionException, which simplifies usage.
64 *
65 * <p>CompletableFutures themselves do not execute asynchronously.
66 * However, the {@code async} methods provide commonly useful ways to
67 * commence asynchronous processing, using either a given {@link
68 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
69 * function or action that will result in the completion of a new
70 * CompletableFuture. To simplify monitoring, debugging, and tracking,
71 * all generated asynchronous tasks are instances of the tagging
72 * interface {@link AsynchronousCompletionTask}.
73 *
74 * @author Doug Lea
75 * @since 1.8
76 */
77 public class CompletableFuture<T> implements Future<T> {
78
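The class documentation above distinguishes how get() and join() report an exceptional completion. The following is a minimal sketch of that difference using only the public API; the class name ExceptionalCompletionDemo and its helper methods are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionalCompletionDemo {
    /** Returns whatever get() throws for the given future, or null if it returns normally. */
    static Throwable thrownByGet(CompletableFuture<?> f) {
        try {
            f.get();
            return null;
        } catch (Exception e) {   // ExecutionException, InterruptedException, ...
            return e;
        }
    }

    /** Returns whatever join() throws for the given future, or null if it returns normally. */
    static Throwable thrownByJoin(CompletableFuture<?> f) {
        try {
            f.join();
            return null;
        } catch (RuntimeException e) {   // CompletionException, CancellationException
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        // The dependent never runs its function; it only inherits the failure.
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        source.completeExceptionally(new IllegalStateException("boom"));

        Throwable viaGet = thrownByGet(dependent);
        Throwable viaJoin = thrownByJoin(dependent);
        System.out.println(viaGet.getClass().getSimpleName() + " caused by " + viaGet.getCause());
        System.out.println(viaJoin.getClass().getSimpleName() + " caused by " + viaJoin.getCause());
    }
}
```

Both calls report the same underlying cause; they differ only in the wrapper type, which is checked for get() and unchecked for join().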
79 /*
80 * Overview:
81 *
82 * 1. Non-nullness of field result (set via CAS) indicates done.
83 * An AltResult is used to box null as a result, as well as to
84 * hold exceptions. Using a single field makes completion fast
85 * and simple to detect and trigger, at the expense of a lot of
86 * encoding and decoding that infiltrates many methods. One minor
87 * simplification relies on the (static) NIL (to box null results)
88 * being the only AltResult with a null exception field, so we
89 * don't usually need explicit comparisons with NIL. The CF
90 * exception propagation mechanics surrounding decoding rely on
91 * unchecked casts of decoded results really being unchecked,
92 * where user type errors are caught at point of use, as is
93 * currently the case in Java. These are highlighted by using
94 * SuppressWarnings-annotated temporaries.
95 *
96 * 2. Waiters are held in a Treiber stack similar to the one used
97 * in FutureTask, Phaser, and SynchronousQueue. See their
98 * internal documentation for algorithmic details.
99 *
100 * 3. Completions are also kept in a list/stack, and pulled off
101 * and run when completion is triggered. (We could even use the
102 * same stack as for waiters, but would give up the potential
103 * parallelism obtained because woken waiters help release/run
104 * others -- see method postComplete). Because post-processing
105 * may race with direct calls, class Completion opportunistically
106 * extends AtomicInteger so callers can claim the action via
107 * compareAndSet(0, 1). The Completion.run methods are all
108 * written in a boringly similar, uniform way (that sometimes includes
109 * unnecessary-looking checks, kept to maintain uniformity). There
110 * are enough dimensions upon which they differ that attempts to
111 * factor commonalities while maintaining efficiency require more
112 * lines of code than they would save.
113 *
114 * 4. The exported then/and/or methods do support a bit of
115 * factoring (see doThenApply etc). They must cope with the
116 * intrinsic races surrounding addition of a dependent action
117 * versus performing the action directly because the task is
118 * already complete. For example, a CF may not be complete upon
119 * entry, so a dependent completion is added, but by the time it
120 * is added, the target CF is complete, so the action must be
121 * executed directly. This is all done while avoiding unnecessary object
122 * construction in safe-bypass cases.
123 */
124
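Item 1 of the overview (a single result field whose non-nullness means "done", with a sentinel boxing null) can be sketched outside this class as follows. This is an illustration only: it uses AtomicReference rather than the Unsafe-based CAS of the real code, and the names SingleFieldResult, Alt, fail, and decode are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class SingleFieldResult<T> {
    /** Hypothetical stand-in for AltResult: holds an exception, or null for NIL. */
    static final class Alt {
        final Throwable ex;
        Alt(Throwable ex) { this.ex = ex; }
    }

    /** Boxes a null result so that a non-null field always means "done". */
    static final Alt NIL = new Alt(null);

    private final AtomicReference<Object> result = new AtomicReference<>();

    /** Only the first completion attempt succeeds. */
    boolean complete(T value) {
        return result.compareAndSet(null, value == null ? NIL : value);
    }

    boolean fail(Throwable ex) {
        return result.compareAndSet(null, new Alt(ex));
    }

    boolean isDone() {
        return result.get() != null;
    }

    /** Decodes the raw field back into a value, null, or a thrown exception. */
    T decode() {
        Object r = result.get();
        if (r == null)
            throw new IllegalStateException("not completed");
        if (r instanceof Alt) {
            Throwable ex = ((Alt) r).ex;
            if (ex == null)
                return null;                 // NIL: a genuine null result
            throw new RuntimeException(ex);  // simplified; the real class uses CompletionException
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }
}
```

Because NIL is the only Alt with a null exception field, checking the exception field is enough to tell a boxed null from a failure, which is the simplification the overview mentions.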
125 // preliminaries
126
127 static final class AltResult {
128 final Throwable ex; // null only for NIL
129 AltResult(Throwable ex) { this.ex = ex; }
130 }
131
132 static final AltResult NIL = new AltResult(null);
133
134 // Fields
135
136 volatile Object result; // Either the result or boxed AltResult
137 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
138 volatile CompletionNode completions; // list (Treiber stack) of completions
139
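Both the waiters and completions fields above are heads of Treiber stacks. For readers unfamiliar with the structure, here is a minimal generic sketch built on AtomicReference; the class TreiberStack is hypothetical and omits the thread parking and node unsplicing that the real fields need.

```java
import java.util.concurrent.atomic.AtomicReference;

class TreiberStack<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    /** Links the new node to the current head, retrying if another thread got in first. */
    void push(E item) {
        Node<E> n = new Node<>(item);
        do {
            n.next = head.get();
        } while (!head.compareAndSet(n.next, n));
    }

    /** Swings the head to its successor; returns null when the stack is empty. */
    E pop() {
        for (;;) {
            Node<E> h = head.get();
            if (h == null)
                return null;
            if (head.compareAndSet(h, h.next))
                return h.item;
        }
    }
}
```

The pop loop has the same shape as the loops in postComplete, which repeatedly CAS the head to its next node and then act on the removed node.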
140 // Basic utilities for triggering and processing completions
141
142 /**
143 * Removes and signals all waiting threads and runs all completions.
144 */
145 final void postComplete() {
146 WaitNode q; Thread t;
147 while ((q = waiters) != null) {
148 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
149 (t = q.thread) != null) {
150 q.thread = null;
151 LockSupport.unpark(t);
152 }
153 }
154
155 CompletionNode h; Completion c;
156 while ((h = completions) != null) {
157 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
158 (c = h.completion) != null)
159 c.run();
160 }
161 }
162
163 /**
164 * Triggers completion with the encoding of the given arguments:
165 * if the exception is non-null, encodes it as a wrapped
166 * CompletionException unless it is one already. Otherwise uses
167 * the given result, boxed as NIL if null.
168 */
169 final void internalComplete(Object v, Throwable ex) {
170 if (result == null)
171 UNSAFE.compareAndSwapObject
172 (this, RESULT, null,
173 (ex == null) ? (v == null) ? NIL : v :
174 new AltResult((ex instanceof CompletionException) ? ex :
175 new CompletionException(ex)));
176 postComplete(); // help out even if not triggered
177 }
178
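The compare-and-swap on the result field in internalComplete is what ensures that only one completion attempt takes effect. The sketch below observes the same guarantee through the public complete method; the class FirstCompletionWins and its countWinners helper are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class FirstCompletionWins {
    /** Starts the given number of threads that all try to complete f; returns how many succeeded. */
    static int countWinners(CompletableFuture<Integer> f, int threads) {
        AtomicInteger winners = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            ts[i] = new Thread(() -> {
                if (f.complete(id))          // true only for the thread whose CAS succeeded
                    winners.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        System.out.println("winners: " + countWinners(f, 8));
        System.out.println("value set by thread: " + f.join());
    }
}
```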
179 /**
180 * If triggered, helps release and/or process completions.
181 */
182 final void helpPostComplete() {
183 if (result != null)
184 postComplete();
185 }
186
187 /* ------------- waiting for completions -------------- */
188
189 /** Number of processors, for spin control */
190 static final int NCPU = Runtime.getRuntime().availableProcessors();
191
192 /**
193 * Heuristic spin value for waitingGet() before blocking on
194 * multiprocessors
195 */
196 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
197
198 /**
199 * Linked nodes to record waiting threads in a Treiber stack. See
200 * other classes such as Phaser and SynchronousQueue for more
201 * detailed explanation. This class implements ManagedBlocker to
202 * avoid starvation when blocking actions pile up in
203 * ForkJoinPools.
204 */
205 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
206 long nanos; // wait time if timed
207 final long deadline; // non-zero if timed
208 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
209 volatile Thread thread;
210 volatile WaitNode next;
211 WaitNode(boolean interruptible, long nanos, long deadline) {
212 this.thread = Thread.currentThread();
213 this.interruptControl = interruptible ? 1 : 0;
214 this.nanos = nanos;
215 this.deadline = deadline;
216 }
217 public boolean isReleasable() {
218 if (thread == null)
219 return true;
220 if (Thread.interrupted()) {
221 int i = interruptControl;
222 interruptControl = -1;
223 if (i > 0)
224 return true;
225 }
226 if (deadline != 0L &&
227 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
228 thread = null;
229 return true;
230 }
231 return false;
232 }
233 public boolean block() {
234 if (isReleasable())
235 return true;
236 else if (deadline == 0L)
237 LockSupport.park(this);
238 else if (nanos > 0L)
239 LockSupport.parkNanos(this, nanos);
240 return isReleasable();
241 }
242 }
243
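WaitNode implements ForkJoinPool.ManagedBlocker so that a blocked worker can be compensated for by the pool. As a simpler illustration of the same two-method contract, here is a sketch of a blocker that waits on a CountDownLatch; LatchBlocker and awaitManaged are hypothetical names, and the latch stands in for the result field.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    /** True when blocking is unnecessary, so managedBlock can skip block() entirely. */
    @Override public boolean isReleasable() {
        return latch.getCount() == 0;
    }

    /** Blocks until released; returning true tells managedBlock no further blocking is needed. */
    @Override public boolean block() throws InterruptedException {
        latch.await();
        return true;
    }

    /** Waits for the latch via managedBlock; returns false if the wait was interrupted. */
    static boolean awaitManaged(CountDownLatch latch) {
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve interrupt status for the caller
            return false;
        }
    }
}
```

When called from a thread that is not a ForkJoinPool worker, managedBlock simply alternates isReleasable and block until one of them returns true, so the helper is usable from any thread.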
244 /**
245 * Returns raw result after waiting, or null if interruptible and
246 * interrupted.
247 */
248 private Object waitingGet(boolean interruptible) {
249 WaitNode q = null;
250 boolean queued = false;
251 int spins = SPINS;
252 for (Object r;;) {
253 if ((r = result) != null) {
254 if (q != null) { // suppress unpark
255 q.thread = null;
256 if (q.interruptControl < 0) {
257 if (interruptible) {
258 removeWaiter(q);
259 return null;
260 }
261 Thread.currentThread().interrupt();
262 }
263 }
264 postComplete(); // help release others
265 return r;
266 }
267 else if (spins > 0) {
268 int rnd = ThreadLocalRandom.nextSecondarySeed();
269 if (rnd == 0)
270 rnd = ThreadLocalRandom.current().nextInt();
271 if (rnd >= 0)
272 --spins;
273 }
274 else if (q == null)
275 q = new WaitNode(interruptible, 0L, 0L);
276 else if (!queued)
277 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
278 q.next = waiters, q);
279 else if (interruptible && q.interruptControl < 0) {
280 removeWaiter(q);
281 return null;
282 }
283 else if (q.thread != null && result == null) {
284 try {
285 ForkJoinPool.managedBlock(q);
286 } catch (InterruptedException ex) {
287 q.interruptControl = -1;
288 }
289 }
290 }
291 }
292
293 /**
294 * Awaits completion or aborts on interrupt or timeout.
295 *
296 * @param nanos time to wait
297 * @return raw result
298 */
299 private Object timedAwaitDone(long nanos)
300 throws InterruptedException, TimeoutException {
301 WaitNode q = null;
302 boolean queued = false;
303 for (Object r;;) {
304 if ((r = result) != null) {
305 if (q != null) {
306 q.thread = null;
307 if (q.interruptControl < 0) {
308 removeWaiter(q);
309 throw new InterruptedException();
310 }
311 }
312 postComplete();
313 return r;
314 }
315 else if (q == null) {
316 if (nanos <= 0L)
317 throw new TimeoutException();
318 long d = System.nanoTime() + nanos;
319 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
320 }
321 else if (!queued)
322 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
323 q.next = waiters, q);
324 else if (q.interruptControl < 0) {
325 removeWaiter(q);
326 throw new InterruptedException();
327 }
328 else if (q.nanos <= 0L) {
329 if (result == null) {
330 removeWaiter(q);
331 throw new TimeoutException();
332 }
333 }
334 else if (q.thread != null && result == null) {
335 try {
336 ForkJoinPool.managedBlock(q);
337 } catch (InterruptedException ex) {
338 q.interruptControl = -1;
339 }
340 }
341 }
342 }
343
344 /**
345 * Tries to unlink a timed-out or interrupted wait node to avoid
346 * accumulating garbage. Internal nodes are simply unspliced
347 * without CAS since it is harmless if they are traversed anyway
348 * by releasers. To avoid effects of unsplicing from already
349 * removed nodes, the list is retraversed in case of an apparent
350 * race. This is slow when there are a lot of nodes, but we don't
351 * expect lists to be long enough to outweigh higher-overhead
352 * schemes.
353 */
354 private void removeWaiter(WaitNode node) {
355 if (node != null) {
356 node.thread = null;
357 retry:
358 for (;;) { // restart on removeWaiter race
359 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
360 s = q.next;
361 if (q.thread != null)
362 pred = q;
363 else if (pred != null) {
364 pred.next = s;
365 if (pred.thread == null) // check for race
366 continue retry;
367 }
368 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
369 continue retry;
370 }
371 break;
372 }
373 }
374 }
375
376 /* ------------- Async tasks -------------- */
377
378 /**
379 * A tagging interface identifying asynchronous tasks produced by
380 * {@code async} methods. This may be useful for monitoring,
381 * debugging, and tracking asynchronous activities.
382 */
383 public static interface AsynchronousCompletionTask {
384 }
385
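One way the tagging interface can be used for monitoring is sketched below: an Executor that counts how many submitted tasks were generated by async methods. TaggingExecutor and its counter fields are hypothetical, and the sketch runs each task inline purely to keep the example deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class TaggingExecutor implements Executor {
    final AtomicInteger asyncCompletionTasks = new AtomicInteger();
    final AtomicInteger otherTasks = new AtomicInteger();

    @Override public void execute(Runnable task) {
        // Tasks created by the async methods carry the marker interface.
        if (task instanceof CompletableFuture.AsynchronousCompletionTask)
            asyncCompletionTasks.incrementAndGet();
        else
            otherTasks.incrementAndGet();
        task.run();   // inline execution; a real executor would hand off to a thread
    }

    public static void main(String[] args) {
        TaggingExecutor executor = new TaggingExecutor();
        int value = CompletableFuture.supplyAsync(() -> 42, executor).join();
        executor.execute(() -> { });   // an ordinary Runnable, not tagged
        System.out.println(value + " async=" + executor.asyncCompletionTasks
                           + " other=" + executor.otherTasks);
    }
}
```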
386 /** Base class that can act as either a ForkJoinTask or a plain Runnable */
387 abstract static class Async extends ForkJoinTask<Void>
388 implements Runnable, AsynchronousCompletionTask {
389 public final Void getRawResult() { return null; }
390 public final void setRawResult(Void v) { }
391 public final void run() { exec(); }
392 }
393
394 static final class AsyncRun extends Async {
395 final Runnable fn;
396 final CompletableFuture<Void> dst;
397 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
398 this.fn = fn; this.dst = dst;
399 }
400 public final boolean exec() {
401 CompletableFuture<Void> d; Throwable ex;
402 if ((d = this.dst) != null && d.result == null) {
403 try {
404 fn.run();
405 ex = null;
406 } catch (Throwable rex) {
407 ex = rex;
408 }
409 d.internalComplete(null, ex);
410 }
411 return true;
412 }
413 private static final long serialVersionUID = 5232453952276885070L;
414 }
415
416 static final class AsyncSupply<U> extends Async {
417 final Supplier<U> fn;
418 final CompletableFuture<U> dst;
419 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
420 this.fn = fn; this.dst = dst;
421 }
422 public final boolean exec() {
423 CompletableFuture<U> d; U u; Throwable ex;
424 if ((d = this.dst) != null && d.result == null) {
425 try {
426 u = fn.get();
427 ex = null;
428 } catch (Throwable rex) {
429 ex = rex;
430 u = null;
431 }
432 d.internalComplete(u, ex);
433 }
434 return true;
435 }
436 private static final long serialVersionUID = 5232453952276885070L;
437 }
438
439 static final class AsyncApply<T,U> extends Async {
440 final Function<? super T,? extends U> fn;
441 final T arg;
442 final CompletableFuture<U> dst;
443 AsyncApply(T arg, Function<? super T,? extends U> fn,
444 CompletableFuture<U> dst) {
445 this.arg = arg; this.fn = fn; this.dst = dst;
446 }
447 public final boolean exec() {
448 CompletableFuture<U> d; U u; Throwable ex;
449 if ((d = this.dst) != null && d.result == null) {
450 try {
451 u = fn.apply(arg);
452 ex = null;
453 } catch (Throwable rex) {
454 ex = rex;
455 u = null;
456 }
457 d.internalComplete(u, ex);
458 }
459 return true;
460 }
461 private static final long serialVersionUID = 5232453952276885070L;
462 }
463
464 static final class AsyncBiApply<T,U,V> extends Async {
465 final BiFunction<? super T,? super U,? extends V> fn;
466 final T arg1;
467 final U arg2;
468 final CompletableFuture<V> dst;
469 AsyncBiApply(T arg1, U arg2,
470 BiFunction<? super T,? super U,? extends V> fn,
471 CompletableFuture<V> dst) {
472 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
473 }
474 public final boolean exec() {
475 CompletableFuture<V> d; V v; Throwable ex;
476 if ((d = this.dst) != null && d.result == null) {
477 try {
478 v = fn.apply(arg1, arg2);
479 ex = null;
480 } catch (Throwable rex) {
481 ex = rex;
482 v = null;
483 }
484 d.internalComplete(v, ex);
485 }
486 return true;
487 }
488 private static final long serialVersionUID = 5232453952276885070L;
489 }
490
491 static final class AsyncAccept<T> extends Async {
492 final Consumer<? super T> fn;
493 final T arg;
494 final CompletableFuture<Void> dst;
495 AsyncAccept(T arg, Consumer<? super T> fn,
496 CompletableFuture<Void> dst) {
497 this.arg = arg; this.fn = fn; this.dst = dst;
498 }
499 public final boolean exec() {
500 CompletableFuture<Void> d; Throwable ex;
501 if ((d = this.dst) != null && d.result == null) {
502 try {
503 fn.accept(arg);
504 ex = null;
505 } catch (Throwable rex) {
506 ex = rex;
507 }
508 d.internalComplete(null, ex);
509 }
510 return true;
511 }
512 private static final long serialVersionUID = 5232453952276885070L;
513 }
514
515 static final class AsyncBiAccept<T,U> extends Async {
516 final BiConsumer<? super T,? super U> fn;
517 final T arg1;
518 final U arg2;
519 final CompletableFuture<Void> dst;
520 AsyncBiAccept(T arg1, U arg2,
521 BiConsumer<? super T,? super U> fn,
522 CompletableFuture<Void> dst) {
523 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
524 }
525 public final boolean exec() {
526 CompletableFuture<Void> d; Throwable ex;
527 if ((d = this.dst) != null && d.result == null) {
528 try {
529 fn.accept(arg1, arg2);
530 ex = null;
531 } catch (Throwable rex) {
532 ex = rex;
533 }
534 d.internalComplete(null, ex);
535 }
536 return true;
537 }
538 private static final long serialVersionUID = 5232453952276885070L;
539 }
540
541 static final class AsyncCompose<T,U> extends Async {
542 final Function<? super T, CompletableFuture<U>> fn;
543 final T arg;
544 final CompletableFuture<U> dst;
545 AsyncCompose(T arg,
546 Function<? super T, CompletableFuture<U>> fn,
547 CompletableFuture<U> dst) {
548 this.arg = arg; this.fn = fn; this.dst = dst;
549 }
550 public final boolean exec() {
551 CompletableFuture<U> d, fr; U u; Throwable ex;
552 if ((d = this.dst) != null && d.result == null) {
553 try {
554 fr = fn.apply(arg);
555 ex = null;
556 } catch (Throwable rex) {
557 ex = rex;
558 fr = null;
559 }
560 if (ex != null)
561 u = null;
562 else if (fr == null) {
563 ex = new NullPointerException();
564 u = null;
565 }
566 else {
567 Object r = fr.result;
568 if (r instanceof AltResult) {
569 ex = ((AltResult)r).ex;
570 u = null;
571 }
572 else {
573 @SuppressWarnings("unchecked") U ur = (U) r;
574 u = ur;
575 }
576 }
577 d.internalComplete(u, ex);
578 }
579 return true;
580 }
581 private static final long serialVersionUID = 5232453952276885070L;
582 }
583
584 /* ------------- Completions -------------- */
585
586 /**
587 * Simple linked list nodes to record completions, used in
588 * basically the same way as WaitNodes. (We separate nodes from
589 * the Completions themselves mainly because for the And and Or
590 * methods, the same Completion object resides in two lists.)
591 */
592 static final class CompletionNode {
593 final Completion completion;
594 volatile CompletionNode next;
595 CompletionNode(Completion completion) { this.completion = completion; }
596 }
597
598 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
599 abstract static class Completion extends AtomicInteger implements Runnable {
600 }
601
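The claim protocol that Completion inherits from AtomicInteger can be shown in isolation. In the sketch below, ClaimOnceAction is a hypothetical class whose run method may be called from several places, as when a completion sits in two lists, yet performs its action at most once.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ClaimOnceAction extends AtomicInteger implements Runnable {
    private static final long serialVersionUID = 1L;

    private final transient Runnable action;

    ClaimOnceAction(Runnable action) { this.action = action; }

    /** Runs the action only for the caller that moves the state from 0 to 1. */
    @Override public void run() {
        if (compareAndSet(0, 1))
            action.run();
    }

    public static void main(String[] args) {
        AtomicInteger hits = new AtomicInteger();
        ClaimOnceAction once = new ClaimOnceAction(hits::incrementAndGet);
        once.run();
        once.run();   // already claimed, so this call does nothing
        System.out.println("action ran " + hits.get() + " time(s)");
    }
}
```

Extending AtomicInteger avoids allocating a separate flag object per completion, which appears to be the motivation for the "opportunistic" subclassing noted in the comment above.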
602 static final class ApplyCompletion<T,U> extends Completion {
603 final CompletableFuture<? extends T> src;
604 final Function<? super T,? extends U> fn;
605 final CompletableFuture<U> dst;
606 final Executor executor;
607 ApplyCompletion(CompletableFuture<? extends T> src,
608 Function<? super T,? extends U> fn,
609 CompletableFuture<U> dst, Executor executor) {
610 this.src = src; this.fn = fn; this.dst = dst;
611 this.executor = executor;
612 }
613 public final void run() {
614 final CompletableFuture<? extends T> a;
615 final Function<? super T,? extends U> fn;
616 final CompletableFuture<U> dst;
617 Object r; T t; Throwable ex;
618 if ((dst = this.dst) != null &&
619 (fn = this.fn) != null &&
620 (a = this.src) != null &&
621 (r = a.result) != null &&
622 compareAndSet(0, 1)) {
623 if (r instanceof AltResult) {
624 ex = ((AltResult)r).ex;
625 t = null;
626 }
627 else {
628 ex = null;
629 @SuppressWarnings("unchecked") T tr = (T) r;
630 t = tr;
631 }
632 Executor e = executor;
633 U u = null;
634 if (ex == null) {
635 try {
636 if (e != null)
637 e.execute(new AsyncApply<T,U>(t, fn, dst));
638 else
639 u = fn.apply(t);
640 } catch (Throwable rex) {
641 ex = rex;
642 }
643 }
644 if (e == null || ex != null)
645 dst.internalComplete(u, ex);
646 }
647 }
648 private static final long serialVersionUID = 5232453952276885070L;
649 }
650
651 static final class AcceptCompletion<T> extends Completion {
652 final CompletableFuture<? extends T> src;
653 final Consumer<? super T> fn;
654 final CompletableFuture<Void> dst;
655 final Executor executor;
656 AcceptCompletion(CompletableFuture<? extends T> src,
657 Consumer<? super T> fn,
658 CompletableFuture<Void> dst, Executor executor) {
659 this.src = src; this.fn = fn; this.dst = dst;
660 this.executor = executor;
661 }
662 public final void run() {
663 final CompletableFuture<? extends T> a;
664 final Consumer<? super T> fn;
665 final CompletableFuture<Void> dst;
666 Object r; T t; Throwable ex;
667 if ((dst = this.dst) != null &&
668 (fn = this.fn) != null &&
669 (a = this.src) != null &&
670 (r = a.result) != null &&
671 compareAndSet(0, 1)) {
672 if (r instanceof AltResult) {
673 ex = ((AltResult)r).ex;
674 t = null;
675 }
676 else {
677 ex = null;
678 @SuppressWarnings("unchecked") T tr = (T) r;
679 t = tr;
680 }
681 Executor e = executor;
682 if (ex == null) {
683 try {
684 if (e != null)
685 e.execute(new AsyncAccept<T>(t, fn, dst));
686 else
687 fn.accept(t);
688 } catch (Throwable rex) {
689 ex = rex;
690 }
691 }
692 if (e == null || ex != null)
693 dst.internalComplete(null, ex);
694 }
695 }
696 private static final long serialVersionUID = 5232453952276885070L;
697 }
698
699 static final class RunCompletion<T> extends Completion {
700 final CompletableFuture<? extends T> src;
701 final Runnable fn;
702 final CompletableFuture<Void> dst;
703 final Executor executor;
704 RunCompletion(CompletableFuture<? extends T> src,
705 Runnable fn,
706 CompletableFuture<Void> dst,
707 Executor executor) {
708 this.src = src; this.fn = fn; this.dst = dst;
709 this.executor = executor;
710 }
711 public final void run() {
712 final CompletableFuture<? extends T> a;
713 final Runnable fn;
714 final CompletableFuture<Void> dst;
715 Object r; Throwable ex;
716 if ((dst = this.dst) != null &&
717 (fn = this.fn) != null &&
718 (a = this.src) != null &&
719 (r = a.result) != null &&
720 compareAndSet(0, 1)) {
721 if (r instanceof AltResult)
722 ex = ((AltResult)r).ex;
723 else
724 ex = null;
725 Executor e = executor;
726 if (ex == null) {
727 try {
728 if (e != null)
729 e.execute(new AsyncRun(fn, dst));
730 else
731 fn.run();
732 } catch (Throwable rex) {
733 ex = rex;
734 }
735 }
736 if (e == null || ex != null)
737 dst.internalComplete(null, ex);
738 }
739 }
740 private static final long serialVersionUID = 5232453952276885070L;
741 }
742
743 static final class BiApplyCompletion<T,U,V> extends Completion {
744 final CompletableFuture<? extends T> src;
745 final CompletableFuture<? extends U> snd;
746 final BiFunction<? super T,? super U,? extends V> fn;
747 final CompletableFuture<V> dst;
748 final Executor executor;
749 BiApplyCompletion(CompletableFuture<? extends T> src,
750 CompletableFuture<? extends U> snd,
751 BiFunction<? super T,? super U,? extends V> fn,
752 CompletableFuture<V> dst, Executor executor) {
753 this.src = src; this.snd = snd;
754 this.fn = fn; this.dst = dst;
755 this.executor = executor;
756 }
757 public final void run() {
758 final CompletableFuture<? extends T> a;
759 final CompletableFuture<? extends U> b;
760 final BiFunction<? super T,? super U,? extends V> fn;
761 final CompletableFuture<V> dst;
762 Object r, s; T t; U u; Throwable ex;
763 if ((dst = this.dst) != null &&
764 (fn = this.fn) != null &&
765 (a = this.src) != null &&
766 (r = a.result) != null &&
767 (b = this.snd) != null &&
768 (s = b.result) != null &&
769 compareAndSet(0, 1)) {
770 if (r instanceof AltResult) {
771 ex = ((AltResult)r).ex;
772 t = null;
773 }
774 else {
775 ex = null;
776 @SuppressWarnings("unchecked") T tr = (T) r;
777 t = tr;
778 }
779 if (ex != null)
780 u = null;
781 else if (s instanceof AltResult) {
782 ex = ((AltResult)s).ex;
783 u = null;
784 }
785 else {
786 @SuppressWarnings("unchecked") U us = (U) s;
787 u = us;
788 }
789 Executor e = executor;
790 V v = null;
791 if (ex == null) {
792 try {
793 if (e != null)
794 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
795 else
796 v = fn.apply(t, u);
797 } catch (Throwable rex) {
798 ex = rex;
799 }
800 }
801 if (e == null || ex != null)
802 dst.internalComplete(v, ex);
803 }
804 }
805 private static final long serialVersionUID = 5232453952276885070L;
806 }
807
808 static final class BiAcceptCompletion<T,U> extends Completion {
809 final CompletableFuture<? extends T> src;
810 final CompletableFuture<? extends U> snd;
811 final BiConsumer<? super T,? super U> fn;
812 final CompletableFuture<Void> dst;
813 final Executor executor;
814 BiAcceptCompletion(CompletableFuture<? extends T> src,
815 CompletableFuture<? extends U> snd,
816 BiConsumer<? super T,? super U> fn,
817 CompletableFuture<Void> dst, Executor executor) {
818 this.src = src; this.snd = snd;
819 this.fn = fn; this.dst = dst;
820 this.executor = executor;
821 }
822 public final void run() {
823 final CompletableFuture<? extends T> a;
824 final CompletableFuture<? extends U> b;
825 final BiConsumer<? super T,? super U> fn;
826 final CompletableFuture<Void> dst;
827 Object r, s; T t; U u; Throwable ex;
828 if ((dst = this.dst) != null &&
829 (fn = this.fn) != null &&
830 (a = this.src) != null &&
831 (r = a.result) != null &&
832 (b = this.snd) != null &&
833 (s = b.result) != null &&
834 compareAndSet(0, 1)) {
835 if (r instanceof AltResult) {
836 ex = ((AltResult)r).ex;
837 t = null;
838 }
839 else {
840 ex = null;
841 @SuppressWarnings("unchecked") T tr = (T) r;
842 t = tr;
843 }
844 if (ex != null)
845 u = null;
846 else if (s instanceof AltResult) {
847 ex = ((AltResult)s).ex;
848 u = null;
849 }
850 else {
851 @SuppressWarnings("unchecked") U us = (U) s;
852 u = us;
853 }
854 Executor e = executor;
855 if (ex == null) {
856 try {
857 if (e != null)
858 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
859 else
860 fn.accept(t, u);
861 } catch (Throwable rex) {
862 ex = rex;
863 }
864 }
865 if (e == null || ex != null)
866 dst.internalComplete(null, ex);
867 }
868 }
869 private static final long serialVersionUID = 5232453952276885070L;
870 }
871
872 static final class BiRunCompletion<T> extends Completion {
873 final CompletableFuture<? extends T> src;
874 final CompletableFuture<?> snd;
875 final Runnable fn;
876 final CompletableFuture<Void> dst;
877 final Executor executor;
878 BiRunCompletion(CompletableFuture<? extends T> src,
879 CompletableFuture<?> snd,
880 Runnable fn,
881 CompletableFuture<Void> dst, Executor executor) {
882 this.src = src; this.snd = snd;
883 this.fn = fn; this.dst = dst;
884 this.executor = executor;
885 }
886 public final void run() {
887 final CompletableFuture<? extends T> a;
888 final CompletableFuture<?> b;
889 final Runnable fn;
890 final CompletableFuture<Void> dst;
891 Object r, s; Throwable ex;
892 if ((dst = this.dst) != null &&
893 (fn = this.fn) != null &&
894 (a = this.src) != null &&
895 (r = a.result) != null &&
896 (b = this.snd) != null &&
897 (s = b.result) != null &&
898 compareAndSet(0, 1)) {
899 if (r instanceof AltResult)
900 ex = ((AltResult)r).ex;
901 else
902 ex = null;
903 if (ex == null && (s instanceof AltResult))
904 ex = ((AltResult)s).ex;
905 Executor e = executor;
906 if (ex == null) {
907 try {
908 if (e != null)
909 e.execute(new AsyncRun(fn, dst));
910 else
911 fn.run();
912 } catch (Throwable rex) {
913 ex = rex;
914 }
915 }
916 if (e == null || ex != null)
917 dst.internalComplete(null, ex);
918 }
919 }
920 private static final long serialVersionUID = 5232453952276885070L;
921 }
922
923 static final class AndCompletion extends Completion {
924 final CompletableFuture<?> src;
925 final CompletableFuture<?> snd;
926 final CompletableFuture<Void> dst;
927 AndCompletion(CompletableFuture<?> src,
928 CompletableFuture<?> snd,
929 CompletableFuture<Void> dst) {
930 this.src = src; this.snd = snd; this.dst = dst;
931 }
932 public final void run() {
933 final CompletableFuture<?> a;
934 final CompletableFuture<?> b;
935 final CompletableFuture<Void> dst;
936 Object r, s; Throwable ex;
937 if ((dst = this.dst) != null &&
938 (a = this.src) != null &&
939 (r = a.result) != null &&
940 (b = this.snd) != null &&
941 (s = b.result) != null &&
942 compareAndSet(0, 1)) {
943 if (r instanceof AltResult)
944 ex = ((AltResult)r).ex;
945 else
946 ex = null;
947 if (ex == null && (s instanceof AltResult))
948 ex = ((AltResult)s).ex;
949 dst.internalComplete(null, ex);
950 }
951 }
952 private static final long serialVersionUID = 5232453952276885070L;
953 }
954
955 static final class OrApplyCompletion<T,U> extends Completion {
956 final CompletableFuture<? extends T> src;
957 final CompletableFuture<? extends T> snd;
958 final Function<? super T,? extends U> fn;
959 final CompletableFuture<U> dst;
960 final Executor executor;
961 OrApplyCompletion(CompletableFuture<? extends T> src,
962 CompletableFuture<? extends T> snd,
963 Function<? super T,? extends U> fn,
964 CompletableFuture<U> dst, Executor executor) {
965 this.src = src; this.snd = snd;
966 this.fn = fn; this.dst = dst;
967 this.executor = executor;
968 }
969 public final void run() {
970 final CompletableFuture<? extends T> a;
971 final CompletableFuture<? extends T> b;
972 final Function<? super T,? extends U> fn;
973 final CompletableFuture<U> dst;
974 Object r; T t; Throwable ex;
975 if ((dst = this.dst) != null &&
976 (fn = this.fn) != null &&
977 (((a = this.src) != null && (r = a.result) != null) ||
978 ((b = this.snd) != null && (r = b.result) != null)) &&
979 compareAndSet(0, 1)) {
980 if (r instanceof AltResult) {
981 ex = ((AltResult)r).ex;
982 t = null;
983 }
984 else {
985 ex = null;
986 @SuppressWarnings("unchecked") T tr = (T) r;
987 t = tr;
988 }
989 Executor e = executor;
990 U u = null;
991 if (ex == null) {
992 try {
993 if (e != null)
994 e.execute(new AsyncApply<T,U>(t, fn, dst));
995 else
996 u = fn.apply(t);
997 } catch (Throwable rex) {
998 ex = rex;
999 }
1000 }
1001 if (e == null || ex != null)
1002 dst.internalComplete(u, ex);
1003 }
1004 }
1005 private static final long serialVersionUID = 5232453952276885070L;
1006 }

    // Passes the result of whichever of src or snd completes first to fn.
    static final class OrAcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends T> snd;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends T> snd,
                           Consumer<? super T> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // Runs fn once either src or snd has completed.
    static final class OrRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // Copies the result of whichever of src or snd completes first into dst.
    static final class OrCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<?> dst;
        OrCompletion(CompletableFuture<?> src,
                     CompletableFuture<?> snd,
                     CompletableFuture<?> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<?> dst;
            Object r, t; Throwable ex;
            if ((dst = this.dst) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // Maps an exceptional result of src through fn; passes a normal result on unchanged.
    static final class ExceptionCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super Throwable, ? extends T> fn;
        final CompletableFuture<T> dst;
        ExceptionCompletion(CompletableFuture<? extends T> src,
                            Function<? super Throwable, ? extends T> fn,
                            CompletableFuture<T> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super Throwable, ? extends T> fn;
            final CompletableFuture<T> dst;
            Object r; T t = null; Throwable ex, dx = null;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if ((r instanceof AltResult) &&
                    (ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
                else {
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                dst.internalComplete(t, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }
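/*
Usage sketch (editorial illustration, not part of the original source).
ExceptionCompletion applies its function only when the source holds an
exception and otherwise forwards the value. Assuming the public entry point is
the exceptionally method of the released API, a hypothetical
ExceptionallySketch class could show both paths:

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {
    // Attaches a recovery function and waits for the dependent's value.
    static String recover(CompletableFuture<String> src) {
        CompletableFuture<String> safe =
            src.exceptionally(ex -> "recovered: " + ex.getMessage());
        return safe.join();
    }

    static String onFailure() {
        CompletableFuture<String> src = new CompletableFuture<>();
        src.completeExceptionally(new IllegalStateException("boom"));
        return recover(src);
    }

    static String onSuccess() {
        CompletableFuture<String> src = new CompletableFuture<>();
        src.complete("ok");
        return recover(src);
    }

    public static void main(String[] args) {
        System.out.println(onFailure());
        System.out.println(onSuccess());
    }
}
```
*/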

    // Copies the result (value or exception) of src into dst.
    static final class ThenCopy extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> dst;
        ThenCopy(CompletableFuture<?> src,
                 CompletableFuture<?> dst) {
            this.src = src; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> dst;
            Object r; Object t; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // Invokes fn with both the value and the exception of src (one of them null).
    static final class HandleCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final BiFunction<? super T, Throwable, ? extends U> fn;
        final CompletableFuture<U> dst;
        HandleCompletion(CompletableFuture<? extends T> src,
                         BiFunction<? super T, Throwable, ? extends U> fn,
                         final CompletableFuture<U> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final BiFunction<? super T, Throwable, ? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                U u = null; Throwable dx = null;
                try {
                    u = fn.apply(t, ex);
                } catch (Throwable rex) {
                    dx = rex;
                }
                dst.internalComplete(u, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }
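/*
Usage sketch (editorial illustration, not part of the original source).
HandleCompletion always calls its BiFunction, passing a null exception on
success and a null value on failure. Assuming the public handle method of the
released API is the entry point, a hypothetical HandleSketch class might look
like this:

```java
import java.util.concurrent.CompletableFuture;

class HandleSketch {
    // Turns either outcome of src into a description string.
    static String describe(CompletableFuture<Integer> src) {
        CompletableFuture<String> out = src.handle((value, ex) ->
            ex == null ? "value " + value : "error " + ex.getMessage());
        return out.join();
    }

    static String ok() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        src.complete(7);
        return describe(src);
    }

    static String failed() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        src.completeExceptionally(new IllegalStateException("boom"));
        return describe(src);
    }

    public static void main(String[] args) {
        System.out.println(ok());
        System.out.println(failed());
    }
}
```
*/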

    // Applies fn to the result of src, then relays the outcome of the
    // CompletableFuture that fn returns into dst.
    static final class ComposeCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T, CompletableFuture<U>> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ComposeCompletion(CompletableFuture<? extends T> src,
                          Function<? super T, CompletableFuture<U>> fn,
                          final CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T, CompletableFuture<U>> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex; Executor e;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                CompletableFuture<U> c = null;
                U u = null;
                boolean complete = false;
                if (ex == null) {
                    if ((e = executor) != null)
                        e.execute(new AsyncCompose<T,U>(t, fn, dst));
                    else {
                        try {
                            if ((c = fn.apply(t)) == null)
                                ex = new NullPointerException();
                        } catch (Throwable rex) {
                            ex = rex;
                        }
                    }
                }
                if (c != null) {
                    ThenCopy d = null;
                    Object s;
                    if ((s = c.result) == null) {
                        CompletionNode p = new CompletionNode
                            (d = new ThenCopy(c, dst));
                        while ((s = c.result) == null) {
                            if (UNSAFE.compareAndSwapObject
                                (c, COMPLETIONS, p.next = c.completions, p))
                                break;
                        }
                    }
                    if (s != null && (d == null || d.compareAndSet(0, 1))) {
                        complete = true;
                        if (s instanceof AltResult) {
                            ex = ((AltResult)s).ex;  // no rewrap
                            u = null;
                        }
                        else {
                            @SuppressWarnings("unchecked") U us = (U) s;
                            u = us;
                        }
                    }
                }
                if (complete || ex != null)
                    dst.internalComplete(u, ex);
                if (c != null)
                    c.helpPostComplete();
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }
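/*
Usage sketch (editorial illustration, not part of the original source).
ComposeCompletion feeds the source's value to a function that itself returns a
CompletableFuture and then forwards that inner future's outcome. Assuming the
public thenCompose method of the released API is the entry point, a
hypothetical ComposeSketch class could flatten two asynchronous steps:

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {
    // Second asynchronous step, started from the first step's value.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    static int flattened() {
        CompletableFuture<String> name =
            CompletableFuture.supplyAsync(() -> "compose");
        // Yields CompletableFuture<Integer>, not a nested future.
        CompletableFuture<Integer> len = name.thenCompose(ComposeSketch::lengthOf);
        return len.join();
    }

    public static void main(String[] args) {
        System.out.println(flattened());
    }
}
```
*/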

    // public methods

    /**
     * Creates a new incomplete CompletableFuture.
     */
    public CompletableFuture() {
    }

    /**
     * Asynchronously executes, in the {@link
     * ForkJoinPool#commonPool()}, a task that completes the returned
     * CompletableFuture with the result of the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @return the CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        if (supplier == null) throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
        return f;
    }

    /**
     * Asynchronously executes, using the given executor, a task that
     * completes the returned CompletableFuture with the result of the
     * given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        if (executor == null || supplier == null)
            throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        executor.execute(new AsyncSupply<U>(supplier, f));
        return f;
    }
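/*
Usage sketch (editorial illustration, not part of the original source). The
two supplyAsync overloads differ only in where the supplier runs. The
SupplyAsyncSketch class and its same-thread executor are assumptions made for
the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class SupplyAsyncSketch {
    // Supplier runs in ForkJoinPool.commonPool(); join() waits for the value.
    static int viaCommonPool() {
        CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 6 * 7);
        return f.join();
    }

    // Supplier runs on a caller-chosen executor; a trivial same-thread
    // executor stands in for a real thread pool here.
    static String viaExecutor() {
        Executor direct = Runnable::run;
        CompletableFuture<String> f =
            CompletableFuture.supplyAsync(() -> "done", direct);
        return f.join();
    }

    public static void main(String[] args) {
        System.out.println(viaCommonPool());
        System.out.println(viaExecutor());
    }
}
```
*/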

    /**
     * Asynchronously executes, in the {@link
     * ForkJoinPool#commonPool()}, a task that runs the given action
     * and then completes the returned CompletableFuture.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
        return f;
    }

    /**
     * Asynchronously executes, using the given executor, a task that
     * runs the given action and then completes the returned
     * CompletableFuture.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        if (executor == null || runnable == null)
            throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        executor.execute(new AsyncRun(runnable, f));
        return f;
    }
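/*
Usage sketch (editorial illustration, not part of the original source).
runAsync completes the returned future only after the action has run, so a
side effect of the action should be visible once join() returns. The
RunAsyncSketch class is a hypothetical name for the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAsyncSketch {
    static boolean ranBeforeCompletion() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> f =
            CompletableFuture.runAsync(() -> ran.set(true));
        f.join();          // returns null once the action has finished
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(ranBeforeCompletion());
    }
}
```
*/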

    /**
     * Returns {@code true} if completed in any fashion: normally,
     * exceptionally, or via cancellation.
     *
     * @return {@code true} if completed
     */
    public boolean isDone() {
        return result != null;
    }

    /**
     * Waits if necessary for this future to complete, and then
     * returns its result.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object r; Throwable ex, cause;
        if ((r = result) == null && (r = waitingGet(true)) == null)
            throw new InterruptedException();
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

    /**
     * Waits if necessary for at most the given time for this future
     * to complete, and then returns its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r; Throwable ex, cause;
        long nanos = unit.toNanos(timeout);
        if (Thread.interrupted())
            throw new InterruptedException();
        if ((r = result) == null)
            r = timedAwaitDone(nanos);
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

    /**
     * Returns the result value when complete, or throws an
     * (unchecked) exception if completed exceptionally. To better
     * conform with the use of common functional forms, if a
     * computation involved in the completion of this
     * CompletableFuture threw an exception, this method throws an
     * (unchecked) {@link CompletionException} with the underlying
     * exception as its cause.
     *
     * @return the result value
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if a completion computation threw
     * an exception
     */
    public T join() {
        Object r; Throwable ex;
        if ((r = result) == null)
            r = waitingGet(false);
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if (ex instanceof CompletionException)
            throw (CompletionException)ex;
        throw new CompletionException(ex);
    }
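/*
Usage sketch (editorial illustration, not part of the original source). get()
reports a failure as a checked ExecutionException, while join() reports it as
an unchecked CompletionException; both carry the original exception as the
cause. The GetVersusJoinSketch class is a hypothetical name for the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class GetVersusJoinSketch {
    static CompletableFuture<String> failed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    static String viaGet() {
        try {
            failed().get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by "
                + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String viaJoin() {
        try {
            failed().join();
            return "no exception";
        } catch (CompletionException e) {
            return "CompletionException caused by "
                + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
    }
}
```
*/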

    /**
     * Returns the result value (or throws any encountered exception)
     * if completed, else returns the given valueIfAbsent.
     *
     * @param valueIfAbsent the value to return if not completed
     * @return the result value, if completed, else the given valueIfAbsent
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if a completion computation threw
     * an exception
     */
    public T getNow(T valueIfAbsent) {
        Object r; Throwable ex;
        if ((r = result) == null)
            return valueIfAbsent;
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if (ex instanceof CompletionException)
            throw (CompletionException)ex;
        throw new CompletionException(ex);
    }
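/*
Usage sketch (editorial illustration, not part of the original source).
getNow never blocks: it returns the fallback while the future is incomplete
and the real value afterwards. The GetNowSketch class is a hypothetical name
for the example:

```java
import java.util.concurrent.CompletableFuture;

class GetNowSketch {
    static String beforeAndAfter() {
        CompletableFuture<String> f = new CompletableFuture<>();
        String before = f.getNow("fallback"); // not yet completed
        f.complete("value");
        String after = f.getNow("fallback");  // completed, fallback ignored
        return before + "," + after;
    }

    public static void main(String[] args) {
        System.out.println(beforeAndAfter());
    }
}
```
*/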

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null,
                                        value == null ? NIL : value);
        postComplete();
        return triggered;
    }

    /**
     * If not already completed, causes invocations of {@link #get()}
     * and related methods to throw the given exception.
     *
     * @param ex the exception
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean completeExceptionally(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
        postComplete();
        return triggered;
    }
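/*
Usage sketch (editorial illustration, not part of the original source). Both
completion methods rely on a single compare-and-swap of the result field, so
only the first attempt can return true. The CompleteOnceSketch class is a
hypothetical name for the example:

```java
import java.util.concurrent.CompletableFuture;

class CompleteOnceSketch {
    static String attempts() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("first");           // wins
        boolean second = f.complete("second");         // ignored
        boolean third =
            f.completeExceptionally(new RuntimeException("late")); // ignored
        return first + "," + second + "," + third + "," + f.join();
    }

    public static void main(String[] args) {
        System.out.println(attempts());
    }
}
```
*/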

    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of applying the given function to this
     * CompletableFuture's result.
     * If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so,
     * with a CompletionException holding this exception as
     * its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
        return doThenApply(fn, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} with the
     * result of applying the given function to this
     * CompletableFuture's result. If
     * this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync
        (Function<? super T,? extends U> fn) {
        return doThenApply(fn, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor with the result of applying
     * the given function to this CompletableFuture's result. If this
     * CompletableFuture completes exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenApply(fn, executor);
    }

    private <U> CompletableFuture<U> doThenApply
        (Function<? super T,? extends U> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        ApplyCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new ApplyCompletion<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        return dst;
    }
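/*
Usage sketch (editorial illustration, not part of the original source).
thenApply can be registered before the source completes; the function then
runs when the source gets its value, and a failure of the source reaches the
dependent wrapped in a CompletionException. The ThenApplySketch class is a
hypothetical name for the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ThenApplySketch {
    static int mapped() {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Integer> len = src.thenApply(String::length);
        src.complete("hello");   // triggers the dependent function
        return len.join();
    }

    static boolean failurePropagates() {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Integer> len = src.thenApply(String::length);
        src.completeExceptionally(new IllegalArgumentException("bad"));
        try {
            len.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalArgumentException;
        }
    }

    public static void main(String[] args) {
        System.out.println(mapped());
        System.out.println(failurePropagates());
    }
}
```
*/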

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action with this CompletableFuture's
     * result when it completes. If this CompletableFuture
     * completes exceptionally, then the returned CompletableFuture
     * also does so, with a CompletionException holding this exception as
     * its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
        return doThenAccept(block, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} after
     * performing the given action with this CompletableFuture's
     * result when it completes. If this
     * CompletableFuture completes exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException holding
     * this exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
        return doThenAccept(block, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor after performing the given
     * action with this CompletableFuture's result when it completes.
     * If this
     * CompletableFuture completes exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException holding
     * this exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
                                                   Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenAccept(block, executor);
    }

    private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
                                                 Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AcceptCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new AcceptCompletion<T>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }
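/*
Usage sketch (editorial illustration, not part of the original source).
thenAccept hands the source's value to a consumer and yields a
CompletableFuture<Void> that completes once the consumer has run. The
ThenAcceptSketch class is a hypothetical name for the example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenAcceptSketch {
    static List<String> consumed() {
        List<String> sink = new ArrayList<>();
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Void> done = src.thenAccept(s -> sink.add(s));
        src.complete("payload");  // the consumer runs in the completing thread
        done.join();
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(consumed());
    }
}
```
*/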

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action when this CompletableFuture
     * completes. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRun(Runnable action) {
        return doThenRun(action, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} after
     * performing the given action when this CompletableFuture
     * completes. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return doThenRun(action, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor after performing the given
     * action when this CompletableFuture completes. If this
     * CompletableFuture completes exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException holding
     * this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenRun(action, executor);
    }

    private CompletableFuture<Void> doThenRun(Runnable action,
                                              Executor e) {
        if (action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        RunCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new RunCompletion<T>(this, action, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }
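/*
Usage sketch (editorial illustration, not part of the original source).
thenRun ignores the source's value; the action simply waits for the source to
complete. The ThenRunSketch class is a hypothetical name for the example:

```java
import java.util.concurrent.CompletableFuture;

class ThenRunSketch {
    static String order() {
        StringBuilder log = new StringBuilder();
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Void> after = src.thenRun(() -> log.append("ran"));
        boolean doneBefore = after.isDone(); // source not completed yet
        src.complete("ignored value");       // action runs in this thread
        after.join();
        return doneBefore + ":" + log;
    }

    public static void main(String[] args) {
        System.out.println(order());
    }
}
```
*/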

    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombine
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} with
     * the result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is
     * asynchronously completed using the given executor with the
     * result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiApply(other, fn, executor);
    }

    private <U,V> CompletableFuture<V> doThenBiApply
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<V> dst = new CompletableFuture<V>();
        BiApplyCompletion<T,U,V> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            V v = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                    else
                        v = fn.apply(t, u);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(v, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action with the results of this and the
     * other given CompletableFuture when both complete. If this and/or
     * the other CompletableFuture complete exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding one of these exceptions as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBoth
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block) {
        return doThenBiAccept(other, block, null);
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action asynchronously, using the
     * {@link ForkJoinPool#commonPool()}, with the results of this and
     * the other given CompletableFuture when both complete. If this
     * and/or the other CompletableFuture complete exceptionally, then
     * the returned CompletableFuture also does so, with a
     * CompletionException holding one of these exceptions as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBothAsync
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block) {
        return doThenBiAccept(other, block, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action asynchronously, using the given
     * executor, with the results of this and the other given
     * CompletableFuture when both complete. If this and/or the other
     * CompletableFuture complete exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException
     * holding one of these exceptions as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBothAsync
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiAccept(other, block, executor);
    }

    private <U> CompletableFuture<Void> doThenBiAccept
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T,? super U> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        BiAcceptCompletion<T,U> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
                    else
                        fn.accept(t, u);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
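
    /*
     * Usage sketch, not part of the original source. It illustrates the
     * thenAcceptBoth contract described above, assuming the
     * java.util.concurrent.CompletableFuture released with JDK 8 or
     * later. AcceptBothSketch and describe are hypothetical names
     * introduced only for this example.
     */

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AcceptBothSketch {
    // Completes two source futures one after the other and reports
    // what the two-argument action observed.
    static String describe(int width, int height) {
        CompletableFuture<Integer> w = new CompletableFuture<>();
        CompletableFuture<Integer> h = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("not run");

        // Registered while both sources are still incomplete.
        CompletableFuture<Void> done =
            w.thenAcceptBoth(h, (a, b) -> seen.set(a + "x" + b));

        w.complete(width);   // one source done: the action cannot run yet
        h.complete(height);  // both done: the action is triggered
        done.join();         // waits until the action has finished
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(describe(3, 4));
    }
}
```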

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action when this and the other given
     * CompletableFuture both complete.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding one of these exceptions as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
                                                Runnable action) {
        return doThenBiRun(other, action, null);
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action asynchronously, using the
     * {@link ForkJoinPool#commonPool()}, when this and the other
     * given CompletableFuture both complete.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding one of these exceptions as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
                                                     Runnable action) {
        return doThenBiRun(other, action, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action asynchronously, using the given
     * executor, when this and the other given CompletableFuture both
     * complete.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding one of these exceptions as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
                                                     Runnable action,
                                                     Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiRun(other, action, executor);
    }

    private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
                                                Runnable action,
                                                Executor e) {
        if (other == null || action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        BiRunCompletion<T> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiRunCompletion<T>(this, other, action, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
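
    /*
     * Usage sketch, not part of the original source. It shows that the
     * runAfterBoth action is skipped when one source completes
     * exceptionally, assuming the CompletableFuture released with
     * JDK 8 or later. RunAfterBothSketch and actionRan are
     * hypothetical names.
     */

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAfterBothSketch {
    // Returns whether the action ran after both sources completed.
    static boolean actionRan(boolean failSecond) {
        CompletableFuture<String> a = CompletableFuture.completedFuture("a");
        CompletableFuture<String> b = new CompletableFuture<>();
        AtomicBoolean ran = new AtomicBoolean(false);

        CompletableFuture<Void> both = a.runAfterBoth(b, () -> ran.set(true));

        if (failSecond)
            b.completeExceptionally(new IllegalStateException("boom"));
        else
            b.complete("b");

        try {
            both.join();
        } catch (CompletionException expected) {
            // Reached only when failSecond is true; the cause is the
            // IllegalStateException supplied above.
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(actionRan(false));
        System.out.println(actionRan(true));
    }
}
```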

    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of the given function of either this or the other
     * given CompletableFuture's results when either completes.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which result or exception is used
     * in the returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEither
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn) {
        return doOrApply(other, fn, null);
    }

    /**
     * Creates and returns a CompletableFuture that is completed
     * asynchronously using the {@link ForkJoinPool#commonPool()} with
     * the result of the given function of either this or the other
     * given CompletableFuture's results when either completes.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which result or exception is used
     * in the returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEitherAsync
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn) {
        return doOrApply(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is completed
     * asynchronously using the given executor with the result of the
     * given function of either this or the other given
     * CompletableFuture's results when either completes. If this
     * and/or the other CompletableFuture complete exceptionally, then
     * the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which result or exception is used
     * in the returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEitherAsync
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrApply(other, fn, executor);
    }

    private <U> CompletableFuture<U> doOrApply
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        OrApplyCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
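
    /*
     * Usage sketch, not part of the original source. Only one source is
     * ever completed here, so the outcome does not depend on which
     * result the implementation would pick when both are available.
     * Assumes the CompletableFuture released with JDK 8 or later;
     * ApplyToEitherSketch and firstLength are hypothetical names.
     */

```java
import java.util.concurrent.CompletableFuture;

class ApplyToEitherSketch {
    // Applies a function to whichever source has a result available.
    static int firstLength() {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("cache");
        CompletableFuture<String> slow = new CompletableFuture<>(); // left incomplete

        // The function runs on the only available result, "cache".
        return fast.applyToEither(slow, String::length).join();
    }

    public static void main(String[] args) {
        System.out.println(firstLength());
    }
}
```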

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action with the result of either this or the
     * other given CompletableFuture when either completes.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which exception is used in the
     * returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEither
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block) {
        return doOrAccept(other, block, null);
    }

    /**
     * Creates and returns a CompletableFuture that is completed
     * asynchronously using the {@link ForkJoinPool#commonPool()},
     * performing the given action with the result of either this or
     * the other given CompletableFuture when either completes.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which exception is used in the
     * returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEitherAsync
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block) {
        return doOrAccept(other, block, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is completed
     * asynchronously using the given executor, performing the given
     * action with the result of either this or the other given
     * CompletableFuture when either completes.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which exception is used in the
     * returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEitherAsync
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrAccept(other, block, executor);
    }

    private CompletableFuture<Void> doOrAccept
        (CompletableFuture<? extends T> other,
         Consumer<? super T> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        OrAcceptCompletion<T> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
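
    /*
     * Usage sketch, not part of the original source. Both sources are
     * completed from a single thread, in a fixed order, so the action
     * sees the first completion. Assumes the CompletableFuture released
     * with JDK 8 or later; AcceptEitherSketch and firstSeen are
     * hypothetical names.
     */

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AcceptEitherSketch {
    // Reports which value the single-argument action received.
    static String firstSeen() {
        CompletableFuture<String> primary = new CompletableFuture<>();
        CompletableFuture<String> backup = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();

        CompletableFuture<Void> done = primary.acceptEither(backup, seen::set);

        backup.complete("backup");    // first completion triggers the action
        primary.complete("primary");  // the dependent has already fired
        done.join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(firstSeen());
    }
}
```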

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action when either this or the other given
     * CompletableFuture completes.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which exception is used in the
     * returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
                                                  Runnable action) {
        return doOrRun(other, action, null);
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action asynchronously, using the
     * {@link ForkJoinPool#commonPool()}, when either this or the
     * other given CompletableFuture completes.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which exception is used in the
     * returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEitherAsync
        (CompletableFuture<?> other,
         Runnable action) {
        return doOrRun(other, action, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action asynchronously, using the given
     * executor, when either this or the other given CompletableFuture
     * completes.
     * If this and/or the other CompletableFuture complete exceptionally,
     * then the returned CompletableFuture may also do so, with a
     * CompletionException holding one of these exceptions as its cause.
     * No guarantees are made about which exception is used in the
     * returned CompletableFuture.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEitherAsync
        (CompletableFuture<?> other,
         Runnable action,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrRun(other, action, executor);
    }

    private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
                                            Runnable action,
                                            Executor e) {
        if (other == null || action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        OrRunCompletion<T> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrRunCompletion<T>(this, other, action, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
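
    /*
     * Usage sketch, not part of the original source. It illustrates the
     * "may also do so" clause above: when the first source to complete
     * fails, the returned future fails even though the other source
     * later succeeds. Assumes the CompletableFuture released with
     * JDK 8 or later; RunAfterEitherSketch and failsWhenFirstFails are
     * hypothetical names.
     */

```java
import java.util.concurrent.CompletableFuture;

class RunAfterEitherSketch {
    // Returns true if the dependent future completed exceptionally.
    static boolean failsWhenFirstFails() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();

        CompletableFuture<Void> either = a.runAfterEither(b, () -> { });

        a.completeExceptionally(new RuntimeException("first"));
        b.complete("late");  // arrives after the dependent has fired
        return either.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(failsWhenFirstFails());
    }
}
```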

    /**
     * Returns a CompletableFuture (or an equivalent one) produced by
     * the given function of the result of this CompletableFuture when
     * completed. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param fn the function returning a new CompletableFuture
     * @return the CompletableFuture, which {@code isDone()} upon
     * return if it was completed by the given function or if an
     * exception occurred
     */
    public <U> CompletableFuture<U> thenCompose
        (Function<? super T, CompletableFuture<U>> fn) {
        return doCompose(fn, null);
    }

    /**
     * Returns a CompletableFuture (or an equivalent one) produced
     * asynchronously using the {@link ForkJoinPool#commonPool()} by
     * the given function of the result of this CompletableFuture when
     * completed. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param fn the function returning a new CompletableFuture
     * @return the CompletableFuture, which {@code isDone()} upon
     * return if it was completed by the given function or if an
     * exception occurred
     */
    public <U> CompletableFuture<U> thenComposeAsync
        (Function<? super T, CompletableFuture<U>> fn) {
        return doCompose(fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a CompletableFuture (or an equivalent one) produced
     * asynchronously using the given executor by the given function
     * of the result of this CompletableFuture when completed.
     * If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param fn the function returning a new CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture, which {@code isDone()} upon
     * return if it was completed by the given function or if an
     * exception occurred
     */
    public <U> CompletableFuture<U> thenComposeAsync
        (Function<? super T, CompletableFuture<U>> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doCompose(fn, executor);
    }

    private <U> CompletableFuture<U> doCompose
        (Function<? super T, CompletableFuture<U>> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = null;
        ComposeCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            dst = new CompletableFuture<U>();
            CompletionNode p = new CompletionNode
                (d = new ComposeCompletion<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                if (e != null) {
                    if (dst == null)
                        dst = new CompletableFuture<U>();
                    e.execute(new AsyncCompose<T,U>(t, fn, dst));
                }
                else {
                    try {
                        if ((dst = fn.apply(t)) == null)
                            ex = new NullPointerException();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
            }
            // dst is still null here if this future had already completed
            // exceptionally on entry, or if fn returned null.
            if (dst == null)
                dst = new CompletableFuture<U>();
            // Propagate the exception in both the sync and async cases.
            if (ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        dst.helpPostComplete();
        return dst;
    }
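
    /*
     * Usage sketch, not part of the original source. It chains two
     * lookups with thenCompose so that the caller receives a flat
     * CompletableFuture<String> rather than a nested future. Assumes
     * the CompletableFuture released with JDK 8 or later. ComposeSketch,
     * lookupUserId, lookupEmail and the address format are all
     * hypothetical.
     */

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {
    // Hypothetical first stage: resolves a name to a numeric id.
    static CompletableFuture<Integer> lookupUserId(String name) {
        return CompletableFuture.completedFuture(name.length());
    }

    // Hypothetical second stage: resolves an id to an address.
    static CompletableFuture<String> lookupEmail(Integer id) {
        return CompletableFuture.completedFuture("user" + id + "@example.invalid");
    }

    // The second lookup starts only once the first has produced its id.
    static String emailFor(String name) {
        return lookupUserId(name).thenCompose(ComposeSketch::lookupEmail).join();
    }

    public static void main(String[] args) {
        System.out.println(emailFor("ada"));
    }
}
```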

    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of the given function of the exception triggering
     * this CompletableFuture's completion when it completes
     * exceptionally; otherwise, if this CompletableFuture completes
     * normally, then the returned CompletableFuture also completes
     * normally with the same value.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture if this CompletableFuture completed
     * exceptionally
     * @return the new CompletableFuture
     */
    public CompletableFuture<T> exceptionally
        (Function<Throwable, ? extends T> fn) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<T> dst = new CompletableFuture<T>();
        ExceptionCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p =
                new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                                p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t = null; Throwable ex, dx = null;
            if (r instanceof AltResult) {
                if ((ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
            }
            else {
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            dst.internalComplete(t, dx);
        }
        helpPostComplete();
        return dst;
    }
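
    /*
     * Usage sketch, not part of the original source. It uses
     * exceptionally to substitute a fallback value when the source
     * future failed, and to pass a normal value through unchanged.
     * Assumes the CompletableFuture released with JDK 8 or later;
     * ExceptionallySketch and parseOrDefault are hypothetical names.
     */

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {
    // Parses text into a future, recovering with the fallback on failure.
    static int parseOrDefault(String text, int fallback) {
        CompletableFuture<Integer> parsed = new CompletableFuture<>();
        try {
            parsed.complete(Integer.parseInt(text));
        } catch (NumberFormatException e) {
            parsed.completeExceptionally(e);
        }
        // fn is consulted only when parsed completed exceptionally.
        return parsed.exceptionally(ex -> fallback).join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42", -1));
        System.out.println(parseOrDefault("not a number", -1));
    }
}
```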

    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of the given function of the result and exception of
     * this CompletableFuture's completion when it completes. The
     * given function is invoked with the result (or {@code null} if
     * none) and the exception (or {@code null} if none) of this
     * CompletableFuture when complete.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> handle
        (BiFunction<? super T, Throwable, ? extends U> fn) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        HandleCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p =
                new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                                p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u; Throwable dx;
            try {
                u = fn.apply(t, ex);
                dx = null;
            } catch (Throwable rex) {
                dx = rex;
                u = null;
            }
            dst.internalComplete(u, dx);
        }
        helpPostComplete();
        return dst;
    }
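
    /*
     * Usage sketch, not part of the original source. The handle
     * function receives either a value or an exception, never both, and
     * its return value completes the new future normally in both cases.
     * Assumes the CompletableFuture released with JDK 8 or later;
     * HandleSketch, summarize and demoFailure are hypothetical names.
     */

```java
import java.util.concurrent.CompletableFuture;

class HandleSketch {
    // Maps both outcomes of the source onto a plain String.
    static String summarize(CompletableFuture<Integer> source) {
        return source.handle((value, ex) ->
            ex == null ? "ok:" + value : "failed:" + ex.getMessage()).join();
    }

    // Builds a source that failed directly, then summarizes it.
    static String demoFailure() {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return summarize(failed);
    }

    public static void main(String[] args) {
        System.out.println(summarize(CompletableFuture.completedFuture(7)));
        System.out.println(demoFailure());
    }
}
```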
2774
2775
2776 /* ------------- Arbitrary-arity constructions -------------- */
2777
2778 /*
2779 * The basic plan of attack is to recursively form binary
2780 * completion trees of elements. This can be overkill for small
2781 * sets, but scales nicely. The And/All vs Or/Any forms use the
2782 * same idea, but details differ.
2783 */
2784
2785 /**
2786 * Returns a new CompletableFuture that is completed when all of
2787 * the given CompletableFutures complete. If any of the component
2788 * CompletableFutures complete exceptionally, then so does the
2789 * returned CompletableFuture. Otherwise, the results, if any, of
2790 * the component CompletableFutures are not reflected in the
2791 * returned CompletableFuture, but may be obtained by inspecting
2792 * them individually. If the number of components is zero, returns
2793 * a CompletableFuture completed with the value {@code null}.
2794 *
2795 * <p>Among the applications of this method is to await completion
2796 * of a set of independent CompletableFutures before continuing a
2797 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2798 * c3).join();}.
2799 *
2800 * @param cfs the CompletableFutures
2801 * @return a CompletableFuture that is complete when all of the
2802 * given CompletableFutures complete
2803 * @throws NullPointerException if the array or any of its elements are
2804 * {@code null}
2805 */
2806 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2807 int len = cfs.length; // Directly handle empty and singleton cases
2808 if (len > 1)
2809 return allTree(cfs, 0, len - 1);
2810 else {
2811 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2812 CompletableFuture<?> f;
2813 if (len == 0)
2814 dst.result = NIL;
2815 else if ((f = cfs[0]) == null)
2816 throw new NullPointerException();
2817 else {
2818 ThenCopy d = null;
2819 CompletionNode p = null;
2820 Object r;
2821 while ((r = f.result) == null) {
2822 if (d == null)
2823 d = new ThenCopy(f, dst);
2824 else if (p == null)
2825 p = new CompletionNode(d);
2826 else if (UNSAFE.compareAndSwapObject
2827 (f, COMPLETIONS, p.next = f.completions, p))
2828 break;
2829 }
2830 if (r != null && (d == null || d.compareAndSet(0, 1)))
2831 dst.internalComplete(null, (r instanceof AltResult) ?
2832 ((AltResult)r).ex : null);
2833 f.helpPostComplete();
2834 }
2835 return dst;
2836 }
2837 }

    /**
     * Recursively constructs an And'ed tree of CompletableFutures.
     * Called only when the array is known to have at least two elements.
     */
    private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                                   int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AndCompletion d = null;
        CompletionNode p = null, q = null;
        Object r = null, s = null;
        // While either source is incomplete, lazily create the completion,
        // then push one node onto each source's completions in turn.
        while ((r = fst.result) == null || (s = snd.result) == null) {
            if (d == null)
                d = new AndCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        // If both sources are complete and d is unclaimed (or was never
        // created), complete dst; an exception from fst takes precedence.
        if ((r != null || (r = fst.result) != null) &&
            (s != null || (s = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            dst.internalComplete(null, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }
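
The divide-and-conquer shape of allTree can be approximated with public API alone. The sketch below is a simplified analogue, not the implementation above: it assumes JDK 8 or later and substitutes runAfterBoth for the internal AndCompletion nodes. AllTreeSketch is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;

class AllTreeSketch {
    // Combines cfs[lo..hi] (inclusive, at least two elements) into a
    // balanced binary tree of pairwise "both complete" futures.
    static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        int mid = (lo + hi) >>> 1;
        CompletableFuture<?> fst =
            (lo == mid) ? cfs[lo] : allTree(cfs, lo, mid);
        CompletableFuture<?> snd =
            (hi == mid + 1) ? cfs[hi] : allTree(cfs, mid + 1, hi);
        if (fst == null || snd == null)
            throw new NullPointerException();
        // runAfterBoth completes once both inputs have completed; the
        // no-op action means only completion status is propagated.
        return fst.runAfterBoth(snd, () -> { });
    }

    public static void main(String[] args) {
        CompletableFuture<?>[] cfs = new CompletableFuture<?>[5];
        for (int i = 0; i < cfs.length; i++)
            cfs[i] = new CompletableFuture<String>();
        CompletableFuture<Void> all = allTree(cfs, 0, cfs.length - 1);

        for (int i = 0; i < cfs.length - 1; i++)
            cfs[i].complete(null);
        System.out.println("done with one input pending? " + all.isDone());
        cfs[cfs.length - 1].complete(null);
        System.out.println("done after last input? " + all.isDone());
    }
}
```

Splitting at the midpoint keeps the tree depth logarithmic in the number of inputs, so no single completion has to walk a long chain of dependents.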

    /**
     * Returns a new CompletableFuture that is completed when any of
     * the given CompletableFutures completes, with the same result if
     * that CompletableFuture completed normally, otherwise exceptionally.
     * If no CompletableFutures are provided, returns an incomplete
     * CompletableFuture.
     *
     * @param cfs the CompletableFutures
     * @return a CompletableFuture that is complete when any of the
     * given CompletableFutures completes
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Same idea as allOf
        if (len > 1)
            return anyTree(cfs, 0, len - 1);
        else {
            CompletableFuture<?> dst = new CompletableFuture<Object>();
            CompletableFuture<?> f;
            if (len == 0)
                ; // skip
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenCopy d = null;
                CompletionNode p = null;
                Object r;
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenCopy(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                if (r != null && (d == null || d.compareAndSet(0, 1))) {
                    Throwable ex; Object t;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        t = null;
                    }
                    else {
                        ex = null;
                        t = r;
                    }
                    dst.internalComplete(t, ex);
                }
                f.helpPostComplete();
            }
            return dst;
        }
    }
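
A short usage sketch for anyOf follows (not part of the original source). It assumes the released CompletableFuture of JDK 8 or later, where anyOf yields an untyped result that the caller must convert; AnyOfDemo and firstAsString are illustrative names.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfDemo {
    // Completes with the string form of whichever input finishes first.
    static CompletableFuture<String> firstAsString(CompletableFuture<?>... cfs) {
        // The result type is not tied to the inputs, so it is
        // converted explicitly before use.
        return CompletableFuture.anyOf(cfs).thenApply(v -> String.valueOf(v));
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<String> first = firstAsString(slow, fast);

        fast.complete("fast");
        System.out.println("winner: " + first.join());
        slow.complete("slow");
        // A later completion of another input does not change the outcome.
        System.out.println("still: " + first.join());

        // With no arguments the returned future is left incomplete.
        System.out.println("empty anyOf done? " + CompletableFuture.anyOf().isDone());
    }
}
```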

    /**
     * Recursively constructs an Or'ed tree of CompletableFutures.
     */
    private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
                                                int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<?> dst = new CompletableFuture<Object>();
        OrCompletion d = null;
        CompletionNode p = null, q = null;
        Object r;
        while ((r = fst.result) == null && (r = snd.result) == null) {
            if (d == null)
                d = new OrCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null ||
             (r = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex; Object t;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                t = r;
            }
            dst.internalComplete(t, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }


    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            UNSAFE.compareAndSwapObject
            (this, RESULT, null, new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }

    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    public boolean isCancelled() {
        Object r;
        return ((r = result) instanceof AltResult) &&
            (((AltResult)r).ex instanceof CancellationException);
    }
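
To make the cancellation contract concrete, the sketch below (not part of the original source) cancels a future and inspects both it and a dependent. It assumes the released CompletableFuture of JDK 8 or later; CancelDemo and joinFailure are illustrative names.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CancelDemo {
    // Returns the exception thrown by join(), or null if join() returned.
    static Throwable joinFailure(CompletableFuture<?> f) {
        try {
            f.join();
            return null;
        } catch (CancellationException | CompletionException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);

        // The boolean argument has no effect, as documented above.
        System.out.println("cancel:      " + source.cancel(true));
        System.out.println("isCancelled: " + source.isCancelled());

        // The source itself reports the CancellationException directly.
        System.out.println("source:      " + joinFailure(source));

        // The dependent sees it wrapped as the cause of a CompletionException,
        // so it completed exceptionally but does not count as cancelled.
        Throwable t = joinFailure(dependent);
        System.out.println("dependent:   " + t + ", cause " + t.getCause());
        System.out.println("dependent.isCancelled: " + dependent.isCancelled());

        // Cancelling a future that already completed normally fails.
        System.out.println("late cancel: "
            + CompletableFuture.completedFuture("x").cancel(true));
    }
}
```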

    /**
     * Forcibly sets or resets the value subsequently returned by
     * method {@link #get()} and related methods, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions, and even in such situations
     * in-progress dependent completions may observe either the
     * previously established outcome or the overwritten one.
     *
     * @param value the completion value
     */
    public void obtrudeValue(T value) {
        result = (value == null) ? NIL : value;
        postComplete();
    }

    /**
     * Forcibly causes subsequent invocations of method {@link #get()}
     * and related methods to throw the given exception, whether or
     * not already completed. This method is designed for use only in
     * error recovery actions, and even in such situations
     * in-progress dependent completions may observe either the
     * previously established outcome or the overwritten one.
     *
     * @param ex the exception
     * @throws NullPointerException if the exception is {@code null}
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }
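
The sketch below (not part of the original source) contrasts complete with the two obtrude methods and shows a dependent that keeps the outcome it already computed. It assumes the released CompletableFuture of JDK 8 or later; ObtrudeDemo and outcome are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ObtrudeDemo {
    // Reports what join() currently yields: the value, or the failure cause.
    static String outcome(CompletableFuture<?> f) {
        try {
            return "value=" + f.join();
        } catch (CompletionException e) {
            return "failed=" + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("first");
        // This dependent runs immediately, using the established value.
        CompletableFuture<Integer> length = f.thenApply(String::length);

        // complete() never overwrites an existing outcome...
        System.out.println("second complete: " + f.complete("second"));
        // ...whereas obtrudeValue replaces it unconditionally.
        f.obtrudeValue("forced");
        System.out.println(outcome(f));

        // The dependent is not recomputed after the overwrite.
        System.out.println("length " + outcome(length));

        f.obtrudeException(new IllegalStateException("boom"));
        System.out.println(outcome(f));
    }
}
```

The unchanged dependent is why both methods are documented as suitable only for error recovery.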

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int count = 0;
        for (CompletionNode p = completions; p != null; p = p.next)
            ++count;
        return count;
    }
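
As a monitoring sketch (not part of the original source), the example below registers two dependents on an incomplete future and reads the count before and after completion. It assumes the released CompletableFuture of JDK 8 or later and a single thread; under concurrency the value is only an estimate, as documented above. DependentsDemo and pendingAfterRegistering are illustrative names.

```java
import java.util.concurrent.CompletableFuture;

class DependentsDemo {
    // Registers n trivial dependents on f and returns the reported count.
    static int pendingAfterRegistering(CompletableFuture<String> f, int n) {
        for (int i = 0; i < n; i++)
            f.thenAccept(s -> { });
        return f.getNumberOfDependents();
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = new CompletableFuture<>();
        System.out.println("pending: " + pendingAfterRegistering(f, 2));

        // Completing the future triggers the dependents, after which
        // they are no longer awaiting it.
        f.complete("done");
        System.out.println("after completion: " + f.getNumberOfDependents());
    }
}
```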

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, contains the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        Object r = result;
        int count;
        return super.toString() +
            ((r == null) ?
             (((count = getNumberOfDependents()) == 0) ?
              "[Not completed]" :
              "[Not completed, " + count + " dependents]") :
             (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
              "[Completed exceptionally]" :
              "[Completed normally]"));
    }
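
The bracketed state can be observed with a small sketch (not part of the original source). It assumes the released CompletableFuture of JDK 8 or later; some later JDK releases append the exception to the exceptional state, so the example only checks that prefix. ToStringDemo and state are illustrative names.

```java
import java.util.concurrent.CompletableFuture;

class ToStringDemo {
    // Extracts the bracketed state suffix from toString(); the
    // Object.toString() prefix (class name, '@', hash) has no '['.
    static String state(CompletableFuture<?> f) {
        String s = f.toString();
        return s.substring(s.indexOf('['));
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = new CompletableFuture<>();
        System.out.println(state(f));

        f.thenAccept(s -> { });   // one dependent now awaits f
        System.out.println(state(f));

        f.complete("ok");
        System.out.println(state(f));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(state(failed));
    }
}
```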

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long WAITERS;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (k.getDeclaredField("result"));
            WAITERS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("completions"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
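
The two compare-and-swap idioms used throughout this file (install a result only if none exists, and push a node onto the completions list) can be sketched with public atomics. This is a simplified analogue under stated assumptions, not the class above: AtomicReference stands in for the sun.misc.Unsafe field offsets, null values are rejected instead of being encoded as NIL, and registered actions are never run. CasSketch and its members are illustrative names.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

class CasSketch<T> {
    static final class Node {
        final Runnable action;
        Node next;
        Node(Runnable action) { this.action = action; }
    }

    private final AtomicReference<Object> result = new AtomicReference<>();
    private final AtomicReference<Node> completions = new AtomicReference<>();

    // Only the first caller installs a result; later callers get false.
    boolean complete(T value) {
        return result.compareAndSet(null, Objects.requireNonNull(value));
    }

    Object result() { return result.get(); }

    // Treiber-stack push: link the node to the current head, then CAS the
    // head from that value to the node, retrying if another thread won.
    void push(Runnable action) {
        Node p = new Node(action);
        do {
            p.next = completions.get();
        } while (!completions.compareAndSet(p.next, p));
    }

    // Counts registered nodes, in the manner of getNumberOfDependents.
    int pending() {
        int n = 0;
        for (Node p = completions.get(); p != null; p = p.next)
            ++n;
        return n;
    }

    public static void main(String[] args) {
        CasSketch<String> c = new CasSketch<>();
        c.push(() -> { });
        c.push(() -> { });
        System.out.println("pending: " + c.pending());
        System.out.println("first complete:  " + c.complete("a"));
        System.out.println("second complete: " + c.complete("b"));
        System.out.println("result: " + c.result());
    }
}
```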