root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.57
Committed: Sun Mar 17 19:20:21 2013 UTC by jsr166
Branch: MAIN
Changes since 1.56: +2 -0 lines
Log Message:
add missing @since

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.function.Supplier;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may include dependent functions and actions
 * that trigger upon its completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>Methods are available for adding dependents based on Functions,
 * Consumers, and Runnables. The appropriate form to use depends on
 * whether actions require arguments and/or produce results. Actions
 * may also be triggered after either or both the current and another
 * CompletableFuture complete. Multiple CompletableFutures may also
 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
 * {@link #allOf(CompletableFuture...)}.
 *
 * <p>Actions supplied for dependent completions (mainly using methods
 * with prefix {@code then}) may be performed by the thread that
 * completes the current CompletableFuture, or by any other caller of
 * these methods. There are no guarantees about the order of
 * processing completions unless constrained by these methods.
 *
 * <p>Since (unlike {@link FutureTask}) this class has no direct
 * control over the computation that causes it to be completed,
 * cancellation is treated as just another form of exceptional completion.
 * Method {@link #cancel cancel} has the same effect as
 * {@code completeExceptionally(new CancellationException())}.
 *
 * <p>Upon exceptional completion (including cancellation), or when a
 * completion entails an additional computation which terminates
 * abruptly with an (unchecked) exception or error, then all of their
 * dependent completions (and their dependents in turn) generally act
 * as {@code completeExceptionally} with a {@link CompletionException}
 * holding that exception as its cause. However, the {@link
 * #exceptionally exceptionally} and {@link #handle handle}
 * completions <em>are</em> able to handle exceptional completions of
 * the CompletableFutures they depend on.
 *
 * <p>In case of exceptional completion with a CompletionException,
 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
 * {@link ExecutionException} with the same cause as held in the
 * corresponding CompletionException. However, in these cases,
 * methods {@link #join()} and {@link #getNow} throw the
 * CompletionException, which simplifies usage.
 *
 * <p>CompletableFutures themselves do not execute asynchronously.
 * However, the {@code async} methods provide commonly useful ways to
 * commence asynchronous processing, using either a given {@link
 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
 * function or action that will result in the completion of a new
 * CompletableFuture. To simplify monitoring, debugging, and tracking,
 * all generated asynchronous tasks are instances of the tagging
 * interface {@link AsynchronousCompletionTask}.
 *
 * @author Doug Lea
 * @since 1.8
 */
public class CompletableFuture<T> implements Future<T> {

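Usage illustration (not part of the original source): the class documentation above describes single-winner completion, the different wrappers thrown by get() and join(), and cancellation as a form of exceptional completion. The sketch below exercises those three points through the public API, assuming the behavior of the class as released in JDK 8. The class name CompletionSemanticsDemo and its helper methods are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; assumes the public API as released in JDK 8.
class CompletionSemanticsDemo {

    // Only the first of several completion attempts takes effect.
    static String firstCompletionWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("a");
        boolean second = f.complete("b");
        return f.join() + ":" + first + ":" + second;
    }

    // A dependent of a failed future fails too; join() and get() wrap the cause differently.
    static String wrapperNames() {
        CompletableFuture<String> src = new CompletableFuture<>();
        src.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<String> dep = src.thenApply(String::trim);
        String viaJoin = "none";
        String viaGet = "none";
        try {
            dep.join();
        } catch (CompletionException e) {
            viaJoin = "CompletionException/" + e.getCause().getClass().getSimpleName();
        }
        try {
            dep.get();
        } catch (ExecutionException e) {
            viaGet = "ExecutionException/" + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            viaGet = "interrupted";
        }
        return viaJoin + " " + viaGet;
    }

    // cancel() leaves the future both cancelled and exceptionally completed.
    static boolean cancelIsExceptional() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.cancel(true);
        try {
            f.join();
            return false;
        } catch (CancellationException e) {
            return f.isCancelled() && f.isCompletedExceptionally();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstCompletionWins());
        System.out.println(wrapperNames());
        System.out.println(cancelIsExceptional());
    }
}
```

The helpers catch checked exceptions internally so that each one returns a plain value that is easy to inspect.
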
    /*
     * Overview:
     *
     * 1. Non-nullness of field result (set via CAS) indicates done.
     * An AltResult is used to box null as a result, as well as to
     * hold exceptions. Using a single field makes completion fast
     * and simple to detect and trigger, at the expense of a lot of
     * encoding and decoding that infiltrates many methods. One minor
     * simplification relies on the (static) NIL (to box null results)
     * being the only AltResult with a null exception field, so we
     * don't usually need explicit comparisons with NIL. The CF
     * exception propagation mechanics surrounding decoding rely on
     * unchecked casts of decoded results really being unchecked,
     * where user type errors are caught at point of use, as is
     * currently the case in Java. These are highlighted by using
     * SuppressWarnings-annotated temporaries.
     *
     * 2. Waiters are held in a Treiber stack similar to the one used
     * in FutureTask, Phaser, and SynchronousQueue. See their
     * internal documentation for algorithmic details.
     *
     * 3. Completions are also kept in a list/stack, and pulled off
     * and run when completion is triggered. (We could even use the
     * same stack as for waiters, but would give up the potential
     * parallelism obtained because woken waiters help release/run
     * others -- see method postComplete). Because post-processing
     * may race with direct calls, class Completion opportunistically
     * extends AtomicInteger so callers can claim the action via
     * compareAndSet(0, 1). The Completion.run methods are all
     * written in a boringly similar uniform way (that sometimes
     * includes unnecessary-looking checks, kept to maintain
     * uniformity). There are enough dimensions upon which they
     * differ that attempts to factor commonalities while maintaining
     * efficiency require more lines of code than they would save.
     *
     * 4. The exported then/and/or methods do support a bit of
     * factoring (see doThenApply etc). They must cope with the
     * intrinsic races surrounding addition of a dependent action
     * versus performing the action directly because the task is
     * already complete. For example, a CF may not be complete upon
     * entry, so a dependent completion is added, but by the time it
     * is added, the target CF is complete, so must be directly
     * executed. This is all done while avoiding unnecessary object
     * construction in safe-bypass cases.
     */

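Illustration (not part of the original source): point 1 of the overview can be sketched in miniature. The class below keeps one result field that is null until completion, boxes a null value as a NIL sentinel, and boxes exceptions in a small wrapper. It is only a sketch under stated assumptions: ResultCell, Alt, fail and getNow are hypothetical names, it uses AtomicReference where the source uses Unsafe CAS, and rethrowing as IllegalStateException is an arbitrary choice made for the example.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical miniature of the single-field result encoding.
class ResultCell<T> {
    // Boxes an exception; only the NIL sentinel has a null ex.
    static final class Alt {
        final Throwable ex;
        Alt(Throwable ex) { this.ex = ex; }
    }
    static final Alt NIL = new Alt(null);

    // null means "not done"; otherwise the value, NIL, or a boxed exception.
    private final AtomicReference<Object> result = new AtomicReference<>();

    boolean isDone() { return result.get() != null; }

    // Returns true only for the caller whose CAS from null succeeds.
    boolean complete(T value) {
        return result.compareAndSet(null, (value == null) ? NIL : value);
    }

    boolean fail(Throwable ex) {
        Objects.requireNonNull(ex);
        return result.compareAndSet(null, new Alt(ex));
    }

    // Decodes the field: null if not done or completed with null, throws if failed.
    T getNow() {
        Object r = result.get();
        if (r instanceof Alt) {
            Throwable ex = ((Alt) r).ex;
            if (ex != null)
                throw new IllegalStateException(ex);
            return null; // r is NIL
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

    public static void main(String[] args) {
        ResultCell<String> cell = new ResultCell<>();
        System.out.println(cell.complete(null) + " " + cell.isDone() + " " + cell.getNow());
    }
}
```

Because NIL is the only wrapper with a null exception field, the decoder needs no explicit comparison with NIL, which mirrors the simplification described in the overview.
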
    // preliminaries

    static final class AltResult {
        final Throwable ex; // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }

    static final AltResult NIL = new AltResult(null);

    // Fields

    volatile Object result;    // Either the result or boxed AltResult
    volatile WaitNode waiters; // Treiber stack of threads blocked on get()
    volatile CompletionNode completions; // list (Treiber stack) of completions

    // Basic utilities for triggering and processing completions

    /**
     * Removes and signals all waiting threads and runs all completions.
     */
    final void postComplete() {
        WaitNode q; Thread t;
        while ((q = waiters) != null) {
            if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }

        CompletionNode h; Completion c;
        while ((h = completions) != null) {
            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
                (c = h.completion) != null)
                c.run();
        }
    }

    /**
     * Triggers completion with the encoding of the given arguments:
     * if the exception is non-null, encodes it as a wrapped
     * CompletionException unless it is one already. Otherwise uses
     * the given result, boxed as NIL if null.
     */
    final void internalComplete(Object v, Throwable ex) {
        if (result == null)
            UNSAFE.compareAndSwapObject
                (this, RESULT, null,
                 (ex == null) ? (v == null) ? NIL : v :
                 new AltResult((ex instanceof CompletionException) ? ex :
                               new CompletionException(ex)));
        postComplete(); // help out even if not triggered
    }

    /**
     * If triggered, helps release and/or process completions.
     */
    final void helpPostComplete() {
        if (result != null)
            postComplete();
    }

    /* ------------- waiting for completions -------------- */

    /** Number of processors, for spin control */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Heuristic spin value for waitingGet() before blocking on
     * multiprocessors
     */
    static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;

    /**
     * Linked nodes to record waiting threads in a Treiber stack. See
     * other classes such as Phaser and SynchronousQueue for more
     * detailed explanation. This class implements ManagedBlocker to
     * avoid starvation when blocking actions pile up in
     * ForkJoinPools.
     */
    static final class WaitNode implements ForkJoinPool.ManagedBlocker {
        long nanos;          // wait time if timed
        final long deadline; // non-zero if timed
        volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode(boolean interruptible, long nanos, long deadline) {
            this.thread = Thread.currentThread();
            this.interruptControl = interruptible ? 1 : 0;
            this.nanos = nanos;
            this.deadline = deadline;
        }
        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (Thread.interrupted()) {
                int i = interruptControl;
                interruptControl = -1;
                if (i > 0)
                    return true;
            }
            if (deadline != 0L &&
                (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
                thread = null;
                return true;
            }
            return false;
        }
        public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }

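Illustration (not part of the original source): WaitNode above implements ForkJoinPool.ManagedBlocker so that a ForkJoinPool can compensate for a blocked worker. The same public interface can be used from application code. The sketch below wraps a CountDownLatch; LatchBlocker and awaitManaged are hypothetical names, and the latch is an assumption chosen only to keep the example short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Hypothetical blocker that waits for a CountDownLatch to reach zero.
class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    // True when blocking is unnecessary.
    @Override public boolean isReleasable() {
        return latch.getCount() == 0;
    }

    // Blocks until the latch opens; returning true means no further blocking is needed.
    @Override public boolean block() throws InterruptedException {
        latch.await();
        return true;
    }

    // Waits via managedBlock; returns false if the calling thread was interrupted.
    static boolean awaitManaged(CountDownLatch latch) {
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        System.out.println(awaitManaged(latch));
    }
}
```

When called from a thread that is not a ForkJoinPool worker, managedBlock simply alternates isReleasable() and block() until one of them returns true.
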
    /**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
     */
    private Object waitingGet(boolean interruptible) {
        WaitNode q = null;
        boolean queued = false;
        int spins = SPINS;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) { // suppress unpark
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        if (interruptible) {
                            removeWaiter(q);
                            return null;
                        }
                        Thread.currentThread().interrupt();
                    }
                }
                postComplete(); // help release others
                return r;
            }
            else if (spins > 0) {
                int rnd = ThreadLocalRandom.nextSecondarySeed();
                if (rnd == 0)
                    rnd = ThreadLocalRandom.current().nextInt();
                if (rnd >= 0)
                    --spins;
            }
            else if (q == null)
                q = new WaitNode(interruptible, 0L, 0L);
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (interruptible && q.interruptControl < 0) {
                removeWaiter(q);
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param nanos time to wait
     * @return raw result
     */
    private Object timedAwaitDone(long nanos)
        throws InterruptedException, TimeoutException {
        WaitNode q = null;
        boolean queued = false;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) {
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        removeWaiter(q);
                        throw new InterruptedException();
                    }
                }
                postComplete();
                return r;
            }
            else if (q == null) {
                if (nanos <= 0L)
                    throw new TimeoutException();
                long d = System.nanoTime() + nanos;
                q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
            }
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (q.interruptControl < 0) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q.nanos <= 0L) {
                if (result == null) {
                    removeWaiter(q);
                    throw new TimeoutException();
                }
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage. Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers. To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race. This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) { // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

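Illustration (not part of the original source): the waiter list handled above is a Treiber stack, pushed with a CAS on the head (as in waitingGet) and popped one node at a time with a CAS (as in postComplete). The generic sketch below shows just those two operations. TreiberStack, push and drain are hypothetical names, and AtomicReference stands in for the Unsafe-based CAS used in the source; unlinking of interior nodes, as done by removeWaiter, is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal Treiber stack: lock-free push and pop via CAS on the head.
class TreiberStack<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    // Links the new node in front of the current head, retrying if the head moved.
    void push(E item) {
        Node<E> n = new Node<>(item);
        do {
            n.next = head.get();
        } while (!head.compareAndSet(n.next, n));
    }

    // Pops nodes one at a time until the stack is empty, newest first.
    List<E> drain() {
        List<E> out = new ArrayList<>();
        Node<E> h;
        while ((h = head.get()) != null) {
            if (head.compareAndSet(h, h.next))
                out.add(h.item);
        }
        return out;
    }

    public static void main(String[] args) {
        TreiberStack<Integer> s = new TreiberStack<>();
        s.push(1);
        s.push(2);
        s.push(3);
        System.out.println(s.drain());
    }
}
```

Popping node by node rather than swapping out the whole list lets several threads help drain concurrently, which is the kind of helping that the overview comment attributes to woken waiters.
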
    /* ------------- Async tasks -------------- */

    /**
     * A marker interface identifying asynchronous tasks produced by
     * {@code async} methods. This may be useful for monitoring,
     * debugging, and tracking asynchronous activities.
     *
     * @since 1.8
     */
    public static interface AsynchronousCompletionTask {
    }

    /** Base class can act as either FJ or plain Runnable */
    abstract static class Async extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        public final void run() { exec(); }
    }

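Usage illustration (not part of the original source): because tasks created by the async methods implement the marker interface above, a custom Executor can recognize them. The sketch below counts such tasks, assuming the public API as released in JDK 8. AsyncTaskCounter and its members are hypothetical names, and the executor runs each task in the calling thread only to keep the example deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor that counts tasks tagged as AsynchronousCompletionTask.
class AsyncTaskCounter implements Executor {
    final AtomicInteger tagged = new AtomicInteger();

    @Override public void execute(Runnable task) {
        if (task instanceof CompletableFuture.AsynchronousCompletionTask)
            tagged.incrementAndGet();
        task.run(); // caller-runs, so the demo needs no thread pool
    }

    // Runs a two-stage pipeline on the given executor and returns its value.
    static int runPipeline(AsyncTaskCounter executor) {
        return CompletableFuture.supplyAsync(() -> 21, executor)
                                .thenApplyAsync(x -> x * 2, executor)
                                .join();
    }

    public static void main(String[] args) {
        AsyncTaskCounter executor = new AsyncTaskCounter();
        System.out.println(runPipeline(executor));
        System.out.println(executor.tagged.get() > 0);
    }
}
```

A production executor would hand the task to a worker thread; the instanceof check works the same way there.
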
    static final class AsyncRun extends Async {
        final Runnable fn;
        final CompletableFuture<Void> dst;
        AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.run();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncSupply<U> extends Async {
        final Supplier<U> fn;
        final CompletableFuture<U> dst;
        AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.get();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncApply<T,U> extends Async {
        final Function<? super T,? extends U> fn;
        final T arg;
        final CompletableFuture<U> dst;
        AsyncApply(T arg, Function<? super T,? extends U> fn,
                   CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.apply(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncBiApply<T,U,V> extends Async {
        final BiFunction<? super T,? super U,? extends V> fn;
        final T arg1;
        final U arg2;
        final CompletableFuture<V> dst;
        AsyncBiApply(T arg1, U arg2,
                     BiFunction<? super T,? super U,? extends V> fn,
                     CompletableFuture<V> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<V> d; V v; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    v = fn.apply(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    v = null;
                }
                d.internalComplete(v, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncAccept<T> extends Async {
        final Consumer<? super T> fn;
        final T arg;
        final CompletableFuture<Void> dst;
        AsyncAccept(T arg, Consumer<? super T> fn,
                    CompletableFuture<Void> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncBiAccept<T,U> extends Async {
        final BiConsumer<? super T,? super U> fn;
        final T arg1;
        final U arg2;
        final CompletableFuture<Void> dst;
        AsyncBiAccept(T arg1, U arg2,
                      BiConsumer<? super T,? super U> fn,
                      CompletableFuture<Void> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncCompose<T,U> extends Async {
        final Function<? super T, CompletableFuture<U>> fn;
        final T arg;
        final CompletableFuture<U> dst;
        AsyncCompose(T arg,
                     Function<? super T, CompletableFuture<U>> fn,
                     CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d, fr; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fr = fn.apply(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    fr = null;
                }
                if (ex != null)
                    u = null;
                else if (fr == null) {
                    ex = new NullPointerException();
                    u = null;
                }
                else {
                    Object r = fr.result;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        u = null;
                    }
                    else {
                        @SuppressWarnings("unchecked") U ur = (U) r;
                        u = ur;
                    }
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /* ------------- Completions -------------- */

    /**
     * Simple linked list nodes to record completions, used in
     * basically the same way as WaitNodes. (We separate nodes from
     * the Completions themselves mainly because for the And and Or
     * methods, the same Completion object resides in two lists.)
     */
    static final class CompletionNode {
        final Completion completion;
        volatile CompletionNode next;
        CompletionNode(Completion completion) { this.completion = completion; }
    }

    // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
    abstract static class Completion extends AtomicInteger implements Runnable {
    }

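Illustration (not part of the original source): the claim idiom named in the comment above can be shown in isolation. A task that extends AtomicInteger runs its action only in the caller that wins compareAndSet(0, 1), so racing or repeated calls to run() are harmless. OneShotAction is a hypothetical name, and the wrapped Runnable is an assumption made for this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-shot task: the integer state 0 means unclaimed, 1 means claimed.
class OneShotAction extends AtomicInteger implements Runnable {
    private static final long serialVersionUID = 1L;
    private final transient Runnable action;

    OneShotAction(Runnable action) { this.action = action; }

    // Only the caller whose CAS succeeds performs the action.
    @Override public void run() {
        if (compareAndSet(0, 1))
            action.run();
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        OneShotAction task = new OneShotAction(count::incrementAndGet);
        task.run();
        task.run();
        System.out.println(count.get());
    }
}
```

Subclassing AtomicInteger avoids allocating a separate flag object per completion, at the cost of exposing the inherited AtomicInteger methods on the task.
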
    static final class ApplyCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ApplyCompletion(CompletableFuture<? extends T> src,
                        Function<? super T,? extends U> fn,
                        CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        AcceptCompletion(CompletableFuture<? extends T> src,
                         Consumer<? super T> fn,
                         CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class RunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        RunCompletion(CompletableFuture<? extends T> src,
                      Runnable fn,
                      CompletableFuture<Void> dst,
                      Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiApplyCompletion<T,U,V> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiFunction<? super T,? super U,? extends V> fn;
        final CompletableFuture<V> dst;
        final Executor executor;
        BiApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends U> snd,
                          BiFunction<? super T,? super U,? extends V> fn,
                          CompletableFuture<V> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiFunction<? super T,? super U,? extends V> fn;
            final CompletableFuture<V> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                V v = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                        else
                            v = fn.apply(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(v, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiAcceptCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiConsumer<? super T,? super U> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends U> snd,
                           BiConsumer<? super T,? super U> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiConsumer<? super T,? super U> fn;
            final CompletableFuture<Void> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
                        else
                            fn.accept(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AndCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<Void> dst;
        AndCompletion(CompletableFuture<?> src,
                      CompletableFuture<?> snd,
                      CompletableFuture<Void> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrApplyCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends T> snd;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        OrApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends T> snd,
                          Function<? super T,? extends U> fn,
                          CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrAcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends T> snd;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends T> snd,
                           Consumer<? super T> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

1070 static final class OrRunCompletion<T> extends Completion {
1071 final CompletableFuture<? extends T> src;
1072 final CompletableFuture<?> snd;
1073 final Runnable fn;
1074 final CompletableFuture<Void> dst;
1075 final Executor executor;
1076 OrRunCompletion(CompletableFuture<? extends T> src,
1077 CompletableFuture<?> snd,
1078 Runnable fn,
1079 CompletableFuture<Void> dst, Executor executor) {
1080 this.src = src; this.snd = snd;
1081 this.fn = fn; this.dst = dst;
1082 this.executor = executor;
1083 }
1084 public final void run() {
1085 final CompletableFuture<? extends T> a;
1086 final CompletableFuture<?> b;
1087 final Runnable fn;
1088 final CompletableFuture<Void> dst;
1089 Object r; Throwable ex;
1090 if ((dst = this.dst) != null &&
1091 (fn = this.fn) != null &&
1092 (((a = this.src) != null && (r = a.result) != null) ||
1093 ((b = this.snd) != null && (r = b.result) != null)) &&
1094 compareAndSet(0, 1)) {
1095 if (r instanceof AltResult)
1096 ex = ((AltResult)r).ex;
1097 else
1098 ex = null;
1099 Executor e = executor;
1100 if (ex == null) {
1101 try {
1102 if (e != null)
1103 e.execute(new AsyncRun(fn, dst));
1104 else
1105 fn.run();
1106 } catch (Throwable rex) {
1107 ex = rex;
1108 }
1109 }
1110 if (e == null || ex != null)
1111 dst.internalComplete(null, ex);
1112 }
1113 }
1114 private static final long serialVersionUID = 5232453952276885070L;
1115 }
1116
1117 static final class OrCompletion extends Completion {
1118 final CompletableFuture<?> src;
1119 final CompletableFuture<?> snd;
1120 final CompletableFuture<?> dst;
1121 OrCompletion(CompletableFuture<?> src,
1122 CompletableFuture<?> snd,
1123 CompletableFuture<?> dst) {
1124 this.src = src; this.snd = snd; this.dst = dst;
1125 }
1126 public final void run() {
1127 final CompletableFuture<?> a;
1128 final CompletableFuture<?> b;
1129 final CompletableFuture<?> dst;
1130 Object r, t; Throwable ex;
1131 if ((dst = this.dst) != null &&
1132 (((a = this.src) != null && (r = a.result) != null) ||
1133 ((b = this.snd) != null && (r = b.result) != null)) &&
1134 compareAndSet(0, 1)) {
1135 if (r instanceof AltResult) {
1136 ex = ((AltResult)r).ex;
1137 t = null;
1138 }
1139 else {
1140 ex = null;
1141 t = r;
1142 }
1143 dst.internalComplete(t, ex);
1144 }
1145 }
1146 private static final long serialVersionUID = 5232453952276885070L;
1147 }
1148
1149 static final class ExceptionCompletion<T> extends Completion {
1150 final CompletableFuture<? extends T> src;
1151 final Function<? super Throwable, ? extends T> fn;
1152 final CompletableFuture<T> dst;
1153 ExceptionCompletion(CompletableFuture<? extends T> src,
1154 Function<? super Throwable, ? extends T> fn,
1155 CompletableFuture<T> dst) {
1156 this.src = src; this.fn = fn; this.dst = dst;
1157 }
1158 public final void run() {
1159 final CompletableFuture<? extends T> a;
1160 final Function<? super Throwable, ? extends T> fn;
1161 final CompletableFuture<T> dst;
1162 Object r; T t = null; Throwable ex, dx = null;
1163 if ((dst = this.dst) != null &&
1164 (fn = this.fn) != null &&
1165 (a = this.src) != null &&
1166 (r = a.result) != null &&
1167 compareAndSet(0, 1)) {
1168 if ((r instanceof AltResult) &&
1169 (ex = ((AltResult)r).ex) != null) {
1170 try {
1171 t = fn.apply(ex);
1172 } catch (Throwable rex) {
1173 dx = rex;
1174 }
1175 }
1176 else {
1177 @SuppressWarnings("unchecked") T tr = (T) r;
1178 t = tr;
1179 }
1180 dst.internalComplete(t, dx);
1181 }
1182 }
1183 private static final long serialVersionUID = 5232453952276885070L;
1184 }
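As a usage-level illustration of the recovery path in the class above, here is a minimal sketch. It assumes this completion backs the public `exceptionally` method, which is not shown in this part of the listing; the class and method names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallySketch {
    // Source fails: the recovery function receives the exception and supplies a value.
    static String recovered() {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<String> dst =
            src.exceptionally(ex -> "fallback:" + ex.getMessage());
        src.completeExceptionally(new IllegalStateException("boom"));
        return dst.join();
    }

    // Source succeeds: the recovery function is skipped and the value passes through.
    static String passedThrough() {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<String> dst = src.exceptionally(ex -> "fallback");
        src.complete("ok");
        return dst.join();
    }

    public static void main(String[] args) {
        System.out.println(recovered());
        System.out.println(passedThrough());
    }
}
```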
1185
1186 static final class ThenCopy extends Completion {
1187 final CompletableFuture<?> src;
1188 final CompletableFuture<?> dst;
1189 ThenCopy(CompletableFuture<?> src,
1190 CompletableFuture<?> dst) {
1191 this.src = src; this.dst = dst;
1192 }
1193 public final void run() {
1194 final CompletableFuture<?> a;
1195 final CompletableFuture<?> dst;
1196 Object r; Object t; Throwable ex;
1197 if ((dst = this.dst) != null &&
1198 (a = this.src) != null &&
1199 (r = a.result) != null &&
1200 compareAndSet(0, 1)) {
1201 if (r instanceof AltResult) {
1202 ex = ((AltResult)r).ex;
1203 t = null;
1204 }
1205 else {
1206 ex = null;
1207 t = r;
1208 }
1209 dst.internalComplete(t, ex);
1210 }
1211 }
1212 private static final long serialVersionUID = 5232453952276885070L;
1213 }
1214
1215 static final class HandleCompletion<T,U> extends Completion {
1216 final CompletableFuture<? extends T> src;
1217 final BiFunction<? super T, Throwable, ? extends U> fn;
1218 final CompletableFuture<U> dst;
1219 HandleCompletion(CompletableFuture<? extends T> src,
1220 BiFunction<? super T, Throwable, ? extends U> fn,
1221 final CompletableFuture<U> dst) {
1222 this.src = src; this.fn = fn; this.dst = dst;
1223 }
1224 public final void run() {
1225 final CompletableFuture<? extends T> a;
1226 final BiFunction<? super T, Throwable, ? extends U> fn;
1227 final CompletableFuture<U> dst;
1228 Object r; T t; Throwable ex;
1229 if ((dst = this.dst) != null &&
1230 (fn = this.fn) != null &&
1231 (a = this.src) != null &&
1232 (r = a.result) != null &&
1233 compareAndSet(0, 1)) {
1234 if (r instanceof AltResult) {
1235 ex = ((AltResult)r).ex;
1236 t = null;
1237 }
1238 else {
1239 ex = null;
1240 @SuppressWarnings("unchecked") T tr = (T) r;
1241 t = tr;
1242 }
1243 U u = null; Throwable dx = null;
1244 try {
1245 u = fn.apply(t, ex);
1246 } catch (Throwable rex) {
1247 dx = rex;
1248 }
1249 dst.internalComplete(u, dx);
1250 }
1251 }
1252 private static final long serialVersionUID = 5232453952276885070L;
1253 }
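The class above calls its function with both the value and the exception, one of which is null. A minimal sketch of how that looks from the caller's side, assuming it backs the public `handle` method (not shown in this part of the listing); the names below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class HandleSketch {
    // Maps either outcome of the source future to a description.
    static String describe(CompletableFuture<Integer> src) {
        return src
            .handle((v, ex) -> ex == null ? "value=" + v : "error=" + ex.getMessage())
            .join();
    }

    static String onSuccess() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        src.complete(7);
        return describe(src);
    }

    static String onFailure() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        src.completeExceptionally(new RuntimeException("bad"));
        return describe(src);
    }

    public static void main(String[] args) {
        System.out.println(onSuccess());
        System.out.println(onFailure());
    }
}
```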
1254
1255 static final class ComposeCompletion<T,U> extends Completion {
1256 final CompletableFuture<? extends T> src;
1257 final Function<? super T, CompletableFuture<U>> fn;
1258 final CompletableFuture<U> dst;
1259 final Executor executor;
1260 ComposeCompletion(CompletableFuture<? extends T> src,
1261 Function<? super T, CompletableFuture<U>> fn,
1262 final CompletableFuture<U> dst, Executor executor) {
1263 this.src = src; this.fn = fn; this.dst = dst;
1264 this.executor = executor;
1265 }
1266 public final void run() {
1267 final CompletableFuture<? extends T> a;
1268 final Function<? super T, CompletableFuture<U>> fn;
1269 final CompletableFuture<U> dst;
1270 Object r; T t; Throwable ex; Executor e;
1271 if ((dst = this.dst) != null &&
1272 (fn = this.fn) != null &&
1273 (a = this.src) != null &&
1274 (r = a.result) != null &&
1275 compareAndSet(0, 1)) {
1276 if (r instanceof AltResult) {
1277 ex = ((AltResult)r).ex;
1278 t = null;
1279 }
1280 else {
1281 ex = null;
1282 @SuppressWarnings("unchecked") T tr = (T) r;
1283 t = tr;
1284 }
1285 CompletableFuture<U> c = null;
1286 U u = null;
1287 boolean complete = false;
1288 if (ex == null) {
1289 if ((e = executor) != null)
1290 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1291 else {
1292 try {
1293 if ((c = fn.apply(t)) == null)
1294 ex = new NullPointerException();
1295 } catch (Throwable rex) {
1296 ex = rex;
1297 }
1298 }
1299 }
1300 if (c != null) {
1301 ThenCopy d = null;
1302 Object s;
1303 if ((s = c.result) == null) {
1304 CompletionNode p = new CompletionNode
1305 (d = new ThenCopy(c, dst));
1306 while ((s = c.result) == null) {
1307 if (UNSAFE.compareAndSwapObject
1308 (c, COMPLETIONS, p.next = c.completions, p))
1309 break;
1310 }
1311 }
1312 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1313 complete = true;
1314 if (s instanceof AltResult) {
1315 ex = ((AltResult)s).ex; // no rewrap
1316 u = null;
1317 }
1318 else {
1319 @SuppressWarnings("unchecked") U us = (U) s;
1320 u = us;
1321 }
1322 }
1323 }
1324 if (complete || ex != null)
1325 dst.internalComplete(u, ex);
1326 if (c != null)
1327 c.helpPostComplete();
1328 }
1329 }
1330 private static final long serialVersionUID = 5232453952276885070L;
1331 }
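The class above relays the result of the future returned by its function into `dst`, and records a NullPointerException when the function returns null. A minimal caller-side sketch, assuming it backs the public `thenCompose` method (not shown in this part of the listing); the names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ComposeSketch {
    // The inner future's value becomes the value of the composed future.
    static int chained() {
        CompletableFuture<Integer> first = new CompletableFuture<>();
        CompletableFuture<Integer> dst =
            first.thenCompose(v -> CompletableFuture.supplyAsync(() -> v + 1));
        first.complete(41);
        return dst.join();
    }

    // A function that returns null makes the composed future fail.
    static boolean nullFutureFails() {
        CompletableFuture<Integer> first = new CompletableFuture<>();
        first.complete(1);
        CompletableFuture<String> dst = first.<String>thenCompose(v -> null);
        try {
            dst.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof NullPointerException;
        }
    }

    public static void main(String[] args) {
        System.out.println(chained());
        System.out.println(nullFutureFails());
    }
}
```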
1332
1333 // public methods
1334
1335 /**
1336 * Creates a new incomplete CompletableFuture.
1337 */
1338 public CompletableFuture() {
1339 }
1340
1341 /**
1342 * Asynchronously executes, in the {@link
1343 * ForkJoinPool#commonPool()}, a task that completes the returned
1344 * CompletableFuture with the result of the given Supplier.
1345 *
1346 * @param supplier a function returning the value to be used
1347 * to complete the returned CompletableFuture
1348 * @return the CompletableFuture
1349 */
1350 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1351 if (supplier == null) throw new NullPointerException();
1352 CompletableFuture<U> f = new CompletableFuture<U>();
1353 ForkJoinPool.commonPool().
1354 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1355 return f;
1356 }
1357
1358 /**
1359 * Asynchronously executes, using the given executor, a task that
1360 * completes the returned CompletableFuture with the result of the
1361 * given Supplier.
1362 *
1363 * @param supplier a function returning the value to be used
1364 * to complete the returned CompletableFuture
1365 * @param executor the executor to use for asynchronous execution
1366 * @return the CompletableFuture
1367 */
1368 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1369 Executor executor) {
1370 if (executor == null || supplier == null)
1371 throw new NullPointerException();
1372 CompletableFuture<U> f = new CompletableFuture<U>();
1373 executor.execute(new AsyncSupply<U>(supplier, f));
1374 return f;
1375 }
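A minimal usage sketch of the two `supplyAsync` overloads above. The single-thread pool and the class name are assumptions made for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SupplyAsyncSketch {
    // Runs the supplier on the common pool and waits for its value.
    static int onCommonPool() {
        CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 6 * 7);
        return f.join();
    }

    // Runs the supplier on a caller-supplied executor.
    static String onExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> "done", pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(onCommonPool());
        System.out.println(onExecutor());
    }
}
```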
1376
1377 /**
1378 * Asynchronously executes in the {@link
1379 * ForkJoinPool#commonPool()} a task that runs the given action,
1380 * and then completes the returned CompletableFuture.
1381 *
1382 * @param runnable the action to run before completing the
1383 * returned CompletableFuture
1384 * @return the CompletableFuture
1385 */
1386 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1387 if (runnable == null) throw new NullPointerException();
1388 CompletableFuture<Void> f = new CompletableFuture<Void>();
1389 ForkJoinPool.commonPool().
1390 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1391 return f;
1392 }
1393
1394 /**
1395 * Asynchronously executes, using the given executor, a task that
1396 * runs the given action, and then completes the returned
1397 * CompletableFuture.
1398 *
1399 * @param runnable the action to run before completing the
1400 * returned CompletableFuture
1401 * @param executor the executor to use for asynchronous execution
1402 * @return the CompletableFuture
1403 */
1404 public static CompletableFuture<Void> runAsync(Runnable runnable,
1405 Executor executor) {
1406 if (executor == null || runnable == null)
1407 throw new NullPointerException();
1408 CompletableFuture<Void> f = new CompletableFuture<Void>();
1409 executor.execute(new AsyncRun(runnable, f));
1410 return f;
1411 }
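A minimal usage sketch of `runAsync` as documented above: the returned future carries no value, so callers typically observe a side effect of the action. The flag and class name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class RunAsyncSketch {
    // Runs an action on the common pool, waits, and reports whether it ran.
    static boolean runAndWait() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> ran.set(true));
        Void nothing = f.join(); // a CompletableFuture<Void> completes with null
        return ran.get() && f.isDone() && nothing == null;
    }

    public static void main(String[] args) {
        System.out.println(runAndWait());
    }
}
```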
1412
1413 /**
1414 * Returns {@code true} if completed in any fashion: normally,
1415 * exceptionally, or via cancellation.
1416 *
1417 * @return {@code true} if completed
1418 */
1419 public boolean isDone() {
1420 return result != null;
1421 }
1422
1423 /**
1424 * Waits if necessary for this future to complete, and then
1425 * returns its result.
1426 *
1427 * @return the result value
1428 * @throws CancellationException if this future was cancelled
1429 * @throws ExecutionException if this future completed exceptionally
1430 * @throws InterruptedException if the current thread was interrupted
1431 * while waiting
1432 */
1433 public T get() throws InterruptedException, ExecutionException {
1434 Object r; Throwable ex, cause;
1435 if ((r = result) == null && (r = waitingGet(true)) == null)
1436 throw new InterruptedException(); // a null from waitingGet(true) is treated as an interrupt
1437 if (!(r instanceof AltResult)) {
1438 @SuppressWarnings("unchecked") T tr = (T) r;
1439 return tr;
1440 }
1441 if ((ex = ((AltResult)r).ex) == null)
1442 return null;
1443 if (ex instanceof CancellationException)
1444 throw (CancellationException)ex;
1445 if ((ex instanceof CompletionException) &&
1446 (cause = ex.getCause()) != null)
1447 ex = cause;
1448 throw new ExecutionException(ex);
1449 }
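The tail of `get()` above strips a wrapping CompletionException before throwing ExecutionException, so callers see the original exception as the cause whether the failure was direct or inherited from a source future. A small sketch of that behaviour, with hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class GetSketch {
    // Returns the cause reported by get(), or null if get() returned normally.
    static Throwable causeFromGet(CompletableFuture<?> f) {
        try {
            f.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    // The future itself was completed exceptionally.
    static boolean directFailure() {
        IllegalStateException boom = new IllegalStateException("boom");
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(boom);
        return causeFromGet(f) == boom;
    }

    // The failure was inherited from a source future via thenApply.
    static boolean dependentFailure() {
        IllegalStateException boom = new IllegalStateException("boom");
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(boom);
        CompletableFuture<String> dep = f.thenApply(s -> s + "!");
        return causeFromGet(dep) == boom;
    }

    public static void main(String[] args) {
        System.out.println(directFailure());
        System.out.println(dependentFailure());
    }
}
```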
1450
1451 /**
1452 * Waits if necessary for at most the given time for this future
1453 * to complete, and then returns its result, if available.
1454 *
1455 * @param timeout the maximum time to wait
1456 * @param unit the time unit of the timeout argument
1457 * @return the result value
1458 * @throws CancellationException if this future was cancelled
1459 * @throws ExecutionException if this future completed exceptionally
1460 * @throws InterruptedException if the current thread was interrupted
1461 * while waiting
1462 * @throws TimeoutException if the wait timed out
1463 */
1464 public T get(long timeout, TimeUnit unit)
1465 throws InterruptedException, ExecutionException, TimeoutException {
1466 Object r; Throwable ex, cause;
1467 long nanos = unit.toNanos(timeout);
1468 if (Thread.interrupted())
1469 throw new InterruptedException();
1470 if ((r = result) == null)
1471 r = timedAwaitDone(nanos);
1472 if (!(r instanceof AltResult)) {
1473 @SuppressWarnings("unchecked") T tr = (T) r;
1474 return tr;
1475 }
1476 if ((ex = ((AltResult)r).ex) == null)
1477 return null;
1478 if (ex instanceof CancellationException)
1479 throw (CancellationException)ex;
1480 if ((ex instanceof CompletionException) &&
1481 (cause = ex.getCause()) != null)
1482 ex = cause;
1483 throw new ExecutionException(ex);
1484 }
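A minimal sketch of the timed `get` above, covering a future that never completes and one that is already complete. The timeouts and names are arbitrary choices for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedGetSketch {
    // Waiting on a future that is never completed ends in TimeoutException.
    static boolean timesOut() {
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            never.get(20, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    // An already completed future returns its value without waiting.
    static String inTime() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("ready");
        try {
            return f.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut());
        System.out.println(inTime());
    }
}
```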
1485
1486 /**
1487 * Returns the result value when complete, or throws an
1488 * (unchecked) exception if completed exceptionally. To better
1489 * conform with the use of common functional forms, if a
1490 * computation involved in the completion of this
1491 * CompletableFuture threw an exception, this method throws an
1492 * (unchecked) {@link CompletionException} with the underlying
1493 * exception as its cause.
1494 *
1495 * @return the result value
1496 * @throws CancellationException if the computation was cancelled
1497 * @throws CompletionException if this future completed
1498 * exceptionally or a completion computation threw an exception
1499 */
1500 public T join() {
1501 Object r; Throwable ex;
1502 if ((r = result) == null)
1503 r = waitingGet(false);
1504 if (!(r instanceof AltResult)) {
1505 @SuppressWarnings("unchecked") T tr = (T) r;
1506 return tr;
1507 }
1508 if ((ex = ((AltResult)r).ex) == null)
1509 return null;
1510 if (ex instanceof CancellationException)
1511 throw (CancellationException)ex;
1512 if (ex instanceof CompletionException)
1513 throw (CompletionException)ex;
1514 throw new CompletionException(ex);
1515 }
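Unlike `get()`, `join()` above throws only unchecked exceptions: CancellationException for a cancelled future, and CompletionException (wrapping the original cause) otherwise. A small sketch with hypothetical names:

```java
import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class JoinSketch {
    // Exceptional completion surfaces as CompletionException with the original cause.
    static String failedJoin() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IOException("disk"));
        try {
            f.join();
            return "no exception";
        } catch (CompletionException e) {
            return "CompletionException:" + e.getCause().getClass().getSimpleName();
        }
    }

    // Cancellation surfaces as CancellationException, not wrapped.
    static String cancelledJoin() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.cancel(true);
        try {
            f.join();
            return "no exception";
        } catch (CancellationException e) {
            return "CancellationException";
        }
    }

    public static void main(String[] args) {
        System.out.println(failedJoin());
        System.out.println(cancelledJoin());
    }
}
```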
1516
1517 /**
1518 * Returns the result value (or throws any encountered exception)
1519 * if completed, else returns the given valueIfAbsent.
1520 *
1521 * @param valueIfAbsent the value to return if not completed
1522 * @return the result value, if completed, else the given valueIfAbsent
1523 * @throws CancellationException if the computation was cancelled
1524 * @throws CompletionException if this future completed
1525 * exceptionally or a completion computation threw an exception
1526 */
1527 public T getNow(T valueIfAbsent) {
1528 Object r; Throwable ex;
1529 if ((r = result) == null)
1530 return valueIfAbsent;
1531 if (!(r instanceof AltResult)) {
1532 @SuppressWarnings("unchecked") T tr = (T) r;
1533 return tr;
1534 }
1535 if ((ex = ((AltResult)r).ex) == null)
1536 return null;
1537 if (ex instanceof CancellationException)
1538 throw (CancellationException)ex;
1539 if (ex instanceof CompletionException)
1540 throw (CompletionException)ex;
1541 throw new CompletionException(ex);
1542 }
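A minimal sketch of `getNow` as implemented above. Note that a future completed with null counts as completed, so the fallback is not used. The names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class GetNowSketch {
    // The fallback is returned only while the future is incomplete.
    static String beforeAndAfter() {
        CompletableFuture<String> f = new CompletableFuture<>();
        String before = f.getNow("absent");
        f.complete("present");
        String after = f.getNow("absent");
        return before + "," + after;
    }

    // A null result is still a result: getNow returns null, not the fallback.
    static boolean nullResultIsNotAbsent() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete(null);
        return f.getNow("absent") == null;
    }

    public static void main(String[] args) {
        System.out.println(beforeAndAfter());
        System.out.println(nullResultIsNotAbsent());
    }
}
```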
1543
1544 /**
1545 * If not already completed, sets the value returned by {@link
1546 * #get()} and related methods to the given value.
1547 *
1548 * @param value the result value
1549 * @return {@code true} if this invocation caused this CompletableFuture
1550 * to transition to a completed state, else {@code false}
1551 */
1552 public boolean complete(T value) {
1553 boolean triggered = result == null &&
1554 UNSAFE.compareAndSwapObject(this, RESULT, null,
1555 value == null ? NIL : value);
1556 postComplete();
1557 return triggered;
1558 }
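Because `complete` above installs the result with a single compare-and-swap from null, only the first completion takes effect. A minimal single-threaded sketch of the return values, with hypothetical names:

```java
import java.util.concurrent.CompletableFuture;

final class CompleteSketch {
    // The first call wins; later calls return false and leave the value unchanged.
    static String firstWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("a");
        boolean second = f.complete("b");
        return first + "," + second + "," + f.join();
    }

    public static void main(String[] args) {
        System.out.println(firstWins());
    }
}
```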
1559
1560 /**
1561 * If not already completed, causes invocations of {@link #get()}
1562 * and related methods to throw the given exception.
1563 *
1564 * @param ex the exception
1565 * @return {@code true} if this invocation caused this CompletableFuture
1566 * to transition to a completed state, else {@code false}
1567 */
1568 public boolean completeExceptionally(Throwable ex) {
1569 if (ex == null) throw new NullPointerException();
1570 boolean triggered = result == null &&
1571 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1572 postComplete();
1573 return triggered;
1574 }
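A minimal sketch of `completeExceptionally` as shown above: it competes with `complete` for the same one-time transition, and it rejects a null exception. The names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class CompleteExceptionallySketch {
    // Once failed, the future is done and a later complete() has no effect.
    static String failThenComplete() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean failed = f.completeExceptionally(new IllegalArgumentException("bad input"));
        boolean late = f.complete("late");
        return failed + "," + late + "," + f.isDone();
    }

    // Passing null is rejected with NullPointerException.
    static boolean rejectsNull() {
        try {
            new CompletableFuture<String>().completeExceptionally(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failThenComplete());
        System.out.println(rejectsNull());
    }
}
```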
1575
1576 /**
1577 * Creates and returns a CompletableFuture that is completed with
1578 * the result of the given function of this CompletableFuture.
1579 * If this CompletableFuture completes exceptionally,
1580 * then the returned CompletableFuture also does so,
1581 * with a CompletionException holding this exception as
1582 * its cause.
1583 *
1584 * @param fn the function to use to compute the value of
1585 * the returned CompletableFuture
1586 * @return the new CompletableFuture
1587 */
1588 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1589 return doThenApply(fn, null);
1590 }
1591
1592 /**
1593 * Creates and returns a CompletableFuture that is asynchronously
1594 * completed using the {@link ForkJoinPool#commonPool()} with the
1595 * result of the given function of this CompletableFuture. If
1596 * this CompletableFuture completes exceptionally, then the
1597 * returned CompletableFuture also does so, with a
1598 * CompletionException holding this exception as its cause.
1599 *
1600 * @param fn the function to use to compute the value of
1601 * the returned CompletableFuture
1602 * @return the new CompletableFuture
1603 */
1604 public <U> CompletableFuture<U> thenApplyAsync
1605 (Function<? super T,? extends U> fn) {
1606 return doThenApply(fn, ForkJoinPool.commonPool());
1607 }
1608
1609 /**
1610 * Creates and returns a CompletableFuture that is asynchronously
1611 * completed using the given executor with the result of the given
1612 * function of this CompletableFuture. If this CompletableFuture
1613 * completes exceptionally, then the returned CompletableFuture
1614 * also does so, with a CompletionException holding this exception as
1615 * its cause.
1616 *
1617 * @param fn the function to use to compute the value of
1618 * the returned CompletableFuture
1619 * @param executor the executor to use for asynchronous execution
1620 * @return the new CompletableFuture
1621 */
1622 public <U> CompletableFuture<U> thenApplyAsync
1623 (Function<? super T,? extends U> fn,
1624 Executor executor) {
1625 if (executor == null) throw new NullPointerException();
1626 return doThenApply(fn, executor);
1627 }
1628
1629 private <U> CompletableFuture<U> doThenApply
1630 (Function<? super T,? extends U> fn,
1631 Executor e) {
1632 if (fn == null) throw new NullPointerException();
1633 CompletableFuture<U> dst = new CompletableFuture<U>();
1634 ApplyCompletion<T,U> d = null;
1635 Object r;
1636 if ((r = result) == null) {
1637 CompletionNode p = new CompletionNode
1638 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1639 while ((r = result) == null) {
1640 if (UNSAFE.compareAndSwapObject
1641 (this, COMPLETIONS, p.next = completions, p))
1642 break;
1643 }
1644 }
1645 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1646 T t; Throwable ex;
1647 if (r instanceof AltResult) {
1648 ex = ((AltResult)r).ex;
1649 t = null;
1650 }
1651 else {
1652 ex = null;
1653 @SuppressWarnings("unchecked") T tr = (T) r;
1654 t = tr;
1655 }
1656 U u = null;
1657 if (ex == null) {
1658 try {
1659 if (e != null)
1660 e.execute(new AsyncApply<T,U>(t, fn, dst));
1661 else
1662 u = fn.apply(t);
1663 } catch (Throwable rex) {
1664 ex = rex;
1665 }
1666 }
1667 if (e == null || ex != null)
1668 dst.internalComplete(u, ex);
1669 }
1670 helpPostComplete();
1671 return dst;
1672 }
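`doThenApply` above handles both orderings: the dependent may be registered before the source completes, or the source may already hold a result. When the source failed, the function is not called and the failure is passed on wrapped in a CompletionException. A small sketch with hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

final class ThenApplySketch {
    // Dependent registered first, source completed afterwards.
    static int registeredBefore() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dst = src.thenApply(x -> x * 2);
        src.complete(21);
        return dst.join();
    }

    // Source already complete when the dependent is added.
    static int registeredAfter() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        src.complete(21);
        return src.thenApply(x -> x * 2).join();
    }

    // A failed source skips the function and propagates the cause.
    static boolean skipsFunctionOnFailure() {
        AtomicBoolean called = new AtomicBoolean(false);
        CompletableFuture<Integer> src = new CompletableFuture<>();
        src.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<Integer> dst = src.thenApply(x -> {
            called.set(true);
            return x;
        });
        try {
            dst.join();
            return false;
        } catch (CompletionException e) {
            return !called.get() && e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(registeredBefore());
        System.out.println(registeredAfter());
        System.out.println(skipsFunctionOnFailure());
    }
}
```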
1673
1674 /**
1675 * Creates and returns a CompletableFuture that is completed after
1676 * performing the given action with this CompletableFuture's
1677 * result when it completes. If this CompletableFuture
1678 * completes exceptionally, then the returned CompletableFuture
1679 * also does so, with a CompletionException holding this exception as
1680 * its cause.
1681 *
1682 * @param block the action to perform before completing the
1683 * returned CompletableFuture
1684 * @return the new CompletableFuture
1685 */
1686 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1687 return doThenAccept(block, null);
1688 }
1689
1690 /**
1691 * Creates and returns a CompletableFuture that is asynchronously
1692 * completed using the {@link ForkJoinPool#commonPool()} after performing
1693 * the given action with this CompletableFuture's result when it completes.
1694 * If this CompletableFuture completes exceptionally, then the returned
1695 * CompletableFuture also does so, with a CompletionException holding
1696 * this exception as its cause.
1697 *
1698 * @param block the action to perform before completing the
1699 * returned CompletableFuture
1700 * @return the new CompletableFuture
1701 */
1702 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1703 return doThenAccept(block, ForkJoinPool.commonPool());
1704 }
1705
1706 /**
1707 * Creates and returns a CompletableFuture that is asynchronously
1708 * completed using the given executor after performing the given action
1709 * with this CompletableFuture's result when it completes. If this
1710 * CompletableFuture completes exceptionally, then the returned
1711 * CompletableFuture also does so, with a CompletionException holding
1712 * this exception as its cause.
1713 *
1714 * @param block the action to perform before completing the
1715 * returned CompletableFuture
1716 * @param executor the executor to use for asynchronous execution
1717 * @return the new CompletableFuture
1718 */
1719 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1720 Executor executor) {
1721 if (executor == null) throw new NullPointerException();
1722 return doThenAccept(block, executor);
1723 }
1724
1725 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1726 Executor e) {
1727 if (fn == null) throw new NullPointerException();
1728 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1729 AcceptCompletion<T> d = null;
1730 Object r;
1731 if ((r = result) == null) {
1732 CompletionNode p = new CompletionNode
1733 (d = new AcceptCompletion<T>(this, fn, dst, e));
1734 while ((r = result) == null) {
1735 if (UNSAFE.compareAndSwapObject
1736 (this, COMPLETIONS, p.next = completions, p))
1737 break;
1738 }
1739 }
1740 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1741 T t; Throwable ex;
1742 if (r instanceof AltResult) {
1743 ex = ((AltResult)r).ex;
1744 t = null;
1745 }
1746 else {
1747 ex = null;
1748 @SuppressWarnings("unchecked") T tr = (T) r;
1749 t = tr;
1750 }
1751 if (ex == null) {
1752 try {
1753 if (e != null)
1754 e.execute(new AsyncAccept<T>(t, fn, dst));
1755 else
1756 fn.accept(t);
1757 } catch (Throwable rex) {
1758 ex = rex;
1759 }
1760 }
1761 if (e == null || ex != null)
1762 dst.internalComplete(null, ex);
1763 }
1764 helpPostComplete();
1765 return dst;
1766 }
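A minimal usage sketch of `thenAccept`, which `doThenAccept` above implements: the consumer receives the source's value, and the returned future completes with null afterwards. The names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class ThenAcceptSketch {
    // The consumer sees the value the source was completed with.
    static String consumed() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Void> dst = src.thenAccept(seen::set);
        src.complete("payload");
        dst.join();
        return seen.get();
    }

    // The dependent future itself carries no value.
    static boolean resultIsNull() {
        CompletableFuture<String> src = new CompletableFuture<>();
        src.complete("payload");
        return src.thenAccept(s -> { }).join() == null;
    }

    public static void main(String[] args) {
        System.out.println(consumed());
        System.out.println(resultIsNull());
    }
}
```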
1767
1768 /**
1769 * Creates and returns a CompletableFuture that is completed after
1770 * performing the given action when this CompletableFuture
1771 * completes. If this CompletableFuture completes exceptionally,
1772 * then the returned CompletableFuture also does so, with a
1773 * CompletionException holding this exception as its cause.
1774 *
1775 * @param action the action to perform before completing the
1776 * returned CompletableFuture
1777 * @return the new CompletableFuture
1778 */
1779 public CompletableFuture<Void> thenRun(Runnable action) {
1780 return doThenRun(action, null);
1781 }
1782
1783 /**
1784 * Creates and returns a CompletableFuture that is asynchronously
1785 * completed using the {@link ForkJoinPool#commonPool()} after
1786 * performing the given action when this CompletableFuture
1787 * completes. If this CompletableFuture completes exceptionally,
1788 * then the returned CompletableFuture also does so, with a
1789 * CompletionException holding this exception as its cause.
1790 *
1791 * @param action the action to perform before completing the
1792 * returned CompletableFuture
1793 * @return the new CompletableFuture
1794 */
1795 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1796 return doThenRun(action, ForkJoinPool.commonPool());
1797 }
1798
1799 /**
1800 * Creates and returns a CompletableFuture that is asynchronously
1801 * completed using the given executor after performing the given
1802 * action when this CompletableFuture completes. If this
1803 * CompletableFuture completes exceptionally, then the returned
1804 * CompletableFuture also does so, with a CompletionException holding
1805 * this exception as its cause.
1806 *
1807 * @param action the action to perform before completing the
1808 * returned CompletableFuture
1809 * @param executor the executor to use for asynchronous execution
1810 * @return the new CompletableFuture
1811 */
1812 public CompletableFuture<Void> thenRunAsync(Runnable action,
1813 Executor executor) {
1814 if (executor == null) throw new NullPointerException();
1815 return doThenRun(action, executor);
1816 }
1817
1818 private CompletableFuture<Void> doThenRun(Runnable action,
1819 Executor e) {
1820 if (action == null) throw new NullPointerException();
1821 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1822 RunCompletion<T> d = null;
1823 Object r;
1824 if ((r = result) == null) {
1825 CompletionNode p = new CompletionNode
1826 (d = new RunCompletion<T>(this, action, dst, e));
1827 while ((r = result) == null) {
1828 if (UNSAFE.compareAndSwapObject
1829 (this, COMPLETIONS, p.next = completions, p))
1830 break;
1831 }
1832 }
1833 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1834 Throwable ex;
1835 if (r instanceof AltResult)
1836 ex = ((AltResult)r).ex;
1837 else
1838 ex = null;
1839 if (ex == null) {
1840 try {
1841 if (e != null)
1842 e.execute(new AsyncRun(action, dst));
1843 else
1844 action.run();
1845 } catch (Throwable rex) {
1846 ex = rex;
1847 }
1848 }
1849 if (e == null || ex != null)
1850 dst.internalComplete(null, ex);
1851 }
1852 helpPostComplete();
1853 return dst;
1854 }
1855
1856 /**
1857 * Creates and returns a CompletableFuture that is completed with
1858 * the result of the given function of this and the other given
1859 * CompletableFuture's results when both complete. If this or
1860 * the other CompletableFuture completes exceptionally, then the
1861 * returned CompletableFuture also does so, with a
1862 * CompletionException holding the exception as its cause.
1863 *
1864 * @param other the other CompletableFuture
1865 * @param fn the function to use to compute the value of
1866 * the returned CompletableFuture
1867 * @return the new CompletableFuture
1868 */
1869 public <U,V> CompletableFuture<V> thenCombine
1870 (CompletableFuture<? extends U> other,
1871 BiFunction<? super T,? super U,? extends V> fn) {
1872 return doThenBiApply(other, fn, null);
1873 }
1874
1875 /**
1876 * Creates and returns a CompletableFuture that is asynchronously
1877 * completed using the {@link ForkJoinPool#commonPool()} with
1878 * the result of the given function of this and the other given
1879 * CompletableFuture's results when both complete. If this or
1880 * the other CompletableFuture completes exceptionally, then the
1881 * returned CompletableFuture also does so, with a
1882 * CompletionException holding the exception as its cause.
1883 *
1884 * @param other the other CompletableFuture
1885 * @param fn the function to use to compute the value of
1886 * the returned CompletableFuture
1887 * @return the new CompletableFuture
1888 */
1889 public <U,V> CompletableFuture<V> thenCombineAsync
1890 (CompletableFuture<? extends U> other,
1891 BiFunction<? super T,? super U,? extends V> fn) {
1892 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1893 }
1894
1895 /**
1896 * Creates and returns a CompletableFuture that is
1897 * asynchronously completed using the given executor with the
1898 * result of the given function of this and the other given
1899 * CompletableFuture's results when both complete. If this or
1900 * the other CompletableFuture completes exceptionally, then the
1901 * returned CompletableFuture also does so, with a
1902 * CompletionException holding the exception as its cause.
1903 *
1904 * @param other the other CompletableFuture
1905 * @param fn the function to use to compute the value of
1906 * the returned CompletableFuture
1907 * @param executor the executor to use for asynchronous execution
1908 * @return the new CompletableFuture
1909 */
1910 public <U,V> CompletableFuture<V> thenCombineAsync
1911 (CompletableFuture<? extends U> other,
1912 BiFunction<? super T,? super U,? extends V> fn,
1913 Executor executor) {
1914 if (executor == null) throw new NullPointerException();
1915 return doThenBiApply(other, fn, executor);
1916 }
1917
1918 private <U,V> CompletableFuture<V> doThenBiApply
1919 (CompletableFuture<? extends U> other,
1920 BiFunction<? super T,? super U,? extends V> fn,
1921 Executor e) {
1922 if (other == null || fn == null) throw new NullPointerException();
1923 CompletableFuture<V> dst = new CompletableFuture<V>();
1924 BiApplyCompletion<T,U,V> d = null;
1925 Object r, s = null;
1926 if ((r = result) == null || (s = other.result) == null) {
1927 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1928 CompletionNode q = null, p = new CompletionNode(d);
1929 while ((r == null && (r = result) == null) ||
1930 (s == null && (s = other.result) == null)) {
1931 if (q != null) {
1932 if (s != null ||
1933 UNSAFE.compareAndSwapObject
1934 (other, COMPLETIONS, q.next = other.completions, q))
1935 break;
1936 }
1937 else if (r != null ||
1938 UNSAFE.compareAndSwapObject
1939 (this, COMPLETIONS, p.next = completions, p)) {
1940 if (s != null)
1941 break;
1942 q = new CompletionNode(d);
1943 }
1944 }
1945 }
1946 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1947 T t; U u; Throwable ex;
1948 if (r instanceof AltResult) {
1949 ex = ((AltResult)r).ex;
1950 t = null;
1951 }
1952 else {
1953 ex = null;
1954 @SuppressWarnings("unchecked") T tr = (T) r;
1955 t = tr;
1956 }
1957 if (ex != null)
1958 u = null;
1959 else if (s instanceof AltResult) {
1960 ex = ((AltResult)s).ex;
1961 u = null;
1962 }
1963 else {
1964 @SuppressWarnings("unchecked") U us = (U) s;
1965 u = us;
1966 }
1967 V v = null;
1968 if (ex == null) {
1969 try {
1970 if (e != null)
1971 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1972 else
1973 v = fn.apply(t, u);
1974 } catch (Throwable rex) {
1975 ex = rex;
1976 }
1977 }
1978 if (e == null || ex != null)
1979 dst.internalComplete(v, ex);
1980 }
1981 helpPostComplete();
1982 other.helpPostComplete();
1983 return dst;
1984 }
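
Usage sketch for the thenCombine family above. This is an illustration only, not part of the original source. It assumes the `java.util.concurrent.CompletableFuture` API as released in JDK 8 and later, which may differ in detail from this revision. The class and method names (`ThenCombineSketch`, `sum`, `causeWhenLeftFails`) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ThenCombineSketch {
    // The function runs once both sources hold a result.
    static int sum(int a, int b) {
        CompletableFuture<Integer> left = CompletableFuture.completedFuture(a);
        CompletableFuture<Integer> right = CompletableFuture.completedFuture(b);
        CompletableFuture<Integer> total = left.thenCombine(right, (x, y) -> x + y);
        return total.join();
    }

    // If a source completed exceptionally, the function is skipped and the
    // dependent future fails with a CompletionException wrapping the cause.
    static Throwable causeWhenLeftFails(RuntimeException failure) {
        CompletableFuture<Integer> left = new CompletableFuture<>();
        left.completeExceptionally(failure);
        CompletableFuture<Integer> right = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> total = left.thenCombine(right, (x, y) -> x + y);
        try {
            total.join();
            return null;
        } catch (CompletionException wrapped) {
            return wrapped.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(2, 3));
        System.out.println(causeWhenLeftFails(new IllegalStateException("boom")));
    }
}
```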
1985
1986 /**
1987 * Creates and returns a CompletableFuture that is completed with
1988 * the results of this and the other given CompletableFuture when
1989 * both complete. If this and/or the other CompletableFuture
1990 * complete exceptionally, then the returned CompletableFuture
1991 * also does so, with a CompletionException holding one of these
1992 * exceptions as its cause.
1993 *
1994 * @param other the other CompletableFuture
1995 * @param block the action to perform before completing the
1996 * returned CompletableFuture
1997 * @return the new CompletableFuture
1998 */
1999 public <U> CompletableFuture<Void> thenAcceptBoth
2000 (CompletableFuture<? extends U> other,
2001 BiConsumer<? super T, ? super U> block) {
2002 return doThenBiAccept(other, block, null);
2003 }
2004
2005 /**
2006 * Creates and returns a CompletableFuture that is completed
2007 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2008 * the results of this and the other given CompletableFuture when
2009 * both complete. If this and/or the other CompletableFuture
2010 * complete exceptionally, then the returned CompletableFuture
2011 * also does so, with a CompletionException holding one of these
2012 * exceptions as its cause.
2013 *
2014 * @param other the other CompletableFuture
2015 * @param block the action to perform before completing the
2016 * returned CompletableFuture
2017 * @return the new CompletableFuture
2018 */
2019 public <U> CompletableFuture<Void> thenAcceptBothAsync
2020 (CompletableFuture<? extends U> other,
2021 BiConsumer<? super T, ? super U> block) {
2022 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2023 }
2024
2025 /**
2026 * Creates and returns a CompletableFuture that is completed
2027 * asynchronously using the given executor with the results of
2028 * this and the other given CompletableFuture when both complete.
2029 * If this and/or the other CompletableFuture complete exceptionally,
2030 * then the returned CompletableFuture also does so, with a
2031 * CompletionException holding one of these exceptions as its cause.
2032 *
2033 * @param other the other CompletableFuture
2034 * @param block the action to perform before completing the
2035 * returned CompletableFuture
2036 * @param executor the executor to use for asynchronous execution
2037 * @return the new CompletableFuture
2038 */
2039 public <U> CompletableFuture<Void> thenAcceptBothAsync
2040 (CompletableFuture<? extends U> other,
2041 BiConsumer<? super T, ? super U> block,
2042 Executor executor) {
2043 if (executor == null) throw new NullPointerException();
2044 return doThenBiAccept(other, block, executor);
2045 }
2046
2047 private <U> CompletableFuture<Void> doThenBiAccept
2048 (CompletableFuture<? extends U> other,
2049 BiConsumer<? super T,? super U> fn,
2050 Executor e) {
2051 if (other == null || fn == null) throw new NullPointerException();
2052 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2053 BiAcceptCompletion<T,U> d = null;
2054 Object r, s = null;
2055 if ((r = result) == null || (s = other.result) == null) {
2056 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2057 CompletionNode q = null, p = new CompletionNode(d);
2058 while ((r == null && (r = result) == null) ||
2059 (s == null && (s = other.result) == null)) {
2060 if (q != null) {
2061 if (s != null ||
2062 UNSAFE.compareAndSwapObject
2063 (other, COMPLETIONS, q.next = other.completions, q))
2064 break;
2065 }
2066 else if (r != null ||
2067 UNSAFE.compareAndSwapObject
2068 (this, COMPLETIONS, p.next = completions, p)) {
2069 if (s != null)
2070 break;
2071 q = new CompletionNode(d);
2072 }
2073 }
2074 }
2075 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2076 T t; U u; Throwable ex;
2077 if (r instanceof AltResult) {
2078 ex = ((AltResult)r).ex;
2079 t = null;
2080 }
2081 else {
2082 ex = null;
2083 @SuppressWarnings("unchecked") T tr = (T) r;
2084 t = tr;
2085 }
2086 if (ex != null)
2087 u = null;
2088 else if (s instanceof AltResult) {
2089 ex = ((AltResult)s).ex;
2090 u = null;
2091 }
2092 else {
2093 @SuppressWarnings("unchecked") U us = (U) s;
2094 u = us;
2095 }
2096 if (ex == null) {
2097 try {
2098 if (e != null)
2099 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2100 else
2101 fn.accept(t, u);
2102 } catch (Throwable rex) {
2103 ex = rex;
2104 }
2105 }
2106 if (e == null || ex != null)
2107 dst.internalComplete(null, ex);
2108 }
2109 helpPostComplete();
2110 other.helpPostComplete();
2111 return dst;
2112 }
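
Usage sketch for thenAcceptBoth, again assuming the released JDK 8+ API rather than this exact revision. It shows that the action is deferred until the second source completes. `ThenAcceptBothSketch` and `pair` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenAcceptBothSketch {
    static List<String> pair(String name, int count) {
        List<String> sink = new ArrayList<>();
        CompletableFuture<String> first = CompletableFuture.completedFuture(name);
        CompletableFuture<Integer> second = new CompletableFuture<>(); // still pending
        CompletableFuture<Void> done =
            first.thenAcceptBoth(second, (s, i) -> sink.add(s + "=" + i));
        // Completing the second source triggers the action in this thread.
        second.complete(count);
        done.join();
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(pair("x", 3));
    }
}
```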
2113
2114 /**
2115 * Creates and returns a CompletableFuture that is completed when
2116 * this and the other given CompletableFuture both complete.
2117 * If this and/or the other CompletableFuture complete exceptionally,
2118 * then the returned CompletableFuture also does so, with a
2119 * CompletionException holding one of these exceptions as its cause.
2120 *
2121 * @param other the other CompletableFuture
2122 * @param action the action to perform before completing the
2123 * returned CompletableFuture
2124 * @return the new CompletableFuture
2125 */
2126 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2127 Runnable action) {
2128 return doThenBiRun(other, action, null);
2129 }
2130
2131 /**
2132 * Creates and returns a CompletableFuture that is completed
2133 * asynchronously using the {@link ForkJoinPool#commonPool()}
2134 * when this and the other given CompletableFuture both complete.
2135 * If this and/or the other CompletableFuture complete exceptionally,
2136 * then the returned CompletableFuture also does so, with a
2137 * CompletionException holding one of these exceptions as its cause.
2138 *
2139 * @param other the other CompletableFuture
2140 * @param action the action to perform before completing the
2141 * returned CompletableFuture
2142 * @return the new CompletableFuture
2143 */
2144 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2145 Runnable action) {
2146 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2147 }
2148
2149 /**
2150 * Creates and returns a CompletableFuture that is completed
2151 * asynchronously using the given executor when this and the
2152 * other given CompletableFuture both complete.
2153 * If this and/or the other CompletableFuture complete exceptionally,
2154 * then the returned CompletableFuture also does so, with a
2155 * CompletionException holding one of these exceptions as its cause.
2156 *
2157 * @param other the other CompletableFuture
2158 * @param action the action to perform before completing the
2159 * returned CompletableFuture
2160 * @param executor the executor to use for asynchronous execution
2161 * @return the new CompletableFuture
2162 */
2163 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2164 Runnable action,
2165 Executor executor) {
2166 if (executor == null) throw new NullPointerException();
2167 return doThenBiRun(other, action, executor);
2168 }
2169
2170 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2171 Runnable action,
2172 Executor e) {
2173 if (other == null || action == null) throw new NullPointerException();
2174 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2175 BiRunCompletion<T> d = null;
2176 Object r, s = null;
2177 if ((r = result) == null || (s = other.result) == null) {
2178 d = new BiRunCompletion<T>(this, other, action, dst, e);
2179 CompletionNode q = null, p = new CompletionNode(d);
2180 while ((r == null && (r = result) == null) ||
2181 (s == null && (s = other.result) == null)) {
2182 if (q != null) {
2183 if (s != null ||
2184 UNSAFE.compareAndSwapObject
2185 (other, COMPLETIONS, q.next = other.completions, q))
2186 break;
2187 }
2188 else if (r != null ||
2189 UNSAFE.compareAndSwapObject
2190 (this, COMPLETIONS, p.next = completions, p)) {
2191 if (s != null)
2192 break;
2193 q = new CompletionNode(d);
2194 }
2195 }
2196 }
2197 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2198 Throwable ex;
2199 if (r instanceof AltResult)
2200 ex = ((AltResult)r).ex;
2201 else
2202 ex = null;
2203 if (ex == null && (s instanceof AltResult))
2204 ex = ((AltResult)s).ex;
2205 if (ex == null) {
2206 try {
2207 if (e != null)
2208 e.execute(new AsyncRun(action, dst));
2209 else
2210 action.run();
2211 } catch (Throwable rex) {
2212 ex = rex;
2213 }
2214 }
2215 if (e == null || ex != null)
2216 dst.internalComplete(null, ex);
2217 }
2218 helpPostComplete();
2219 other.helpPostComplete();
2220 return dst;
2221 }
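
Usage sketch for runAfterBoth under the same assumption (released JDK 8+ API). The counter records that the action has not run after only one source completes. `RunAfterBothSketch` and `runs` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class RunAfterBothSketch {
    // Returns the action count observed after the first and second completion.
    static int[] runs() {
        AtomicInteger count = new AtomicInteger();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Void> both = a.runAfterBoth(b, () -> count.incrementAndGet());
        a.complete("ready");
        int afterFirst = count.get();  // one source done: action not yet run
        b.complete(42);
        both.join();
        int afterSecond = count.get(); // both done: action ran once
        return new int[] { afterFirst, afterSecond };
    }

    public static void main(String[] args) {
        int[] r = runs();
        System.out.println(r[0] + " " + r[1]);
    }
}
```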
2222
2223 /**
2224 * Creates and returns a CompletableFuture that is completed with
2225 * the result of the given function of either this or the other
2226 * given CompletableFuture's results when either completes.
2227 * If this and/or the other CompletableFuture complete exceptionally,
2228 * then the returned CompletableFuture may also do so, with a
2229 * CompletionException holding one of these exceptions as its cause.
2230 * No guarantees are made about which result or exception is used
2231 * in the returned CompletableFuture.
2232 *
2233 * @param other the other CompletableFuture
2234 * @param fn the function to use to compute the value of
2235 * the returned CompletableFuture
2236 * @return the new CompletableFuture
2237 */
2238 public <U> CompletableFuture<U> applyToEither
2239 (CompletableFuture<? extends T> other,
2240 Function<? super T, U> fn) {
2241 return doOrApply(other, fn, null);
2242 }
2243
2244 /**
2245 * Creates and returns a CompletableFuture that is completed
2246 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2247 * the result of the given function of either this or the other
2248 * given CompletableFuture's results when either completes.
2249 * If this and/or the other CompletableFuture complete exceptionally,
2250 * then the returned CompletableFuture may also do so, with a
2251 * CompletionException holding one of these exceptions as its cause.
2252 * No guarantees are made about which result or exception is used
2253 * in the returned CompletableFuture.
2254 *
2255 * @param other the other CompletableFuture
2256 * @param fn the function to use to compute the value of
2257 * the returned CompletableFuture
2258 * @return the new CompletableFuture
2259 */
2260 public <U> CompletableFuture<U> applyToEitherAsync
2261 (CompletableFuture<? extends T> other,
2262 Function<? super T, U> fn) {
2263 return doOrApply(other, fn, ForkJoinPool.commonPool());
2264 }
2265
2266 /**
2267 * Creates and returns a CompletableFuture that is completed
2268 * asynchronously using the given executor with the result of the
2269 * given function of either this or the other given
2270 * CompletableFuture's results when either completes. If this
2271 * and/or the other CompletableFuture complete exceptionally, then
2272 * the returned CompletableFuture may also do so, with a
2273 * CompletionException holding one of these exceptions as its cause.
2274 * No guarantees are made about which result or exception is used
2275 * in the returned CompletableFuture.
2276 *
2277 * @param other the other CompletableFuture
2278 * @param fn the function to use to compute the value of
2279 * the returned CompletableFuture
2280 * @param executor the executor to use for asynchronous execution
2281 * @return the new CompletableFuture
2282 */
2283 public <U> CompletableFuture<U> applyToEitherAsync
2284 (CompletableFuture<? extends T> other,
2285 Function<? super T, U> fn,
2286 Executor executor) {
2287 if (executor == null) throw new NullPointerException();
2288 return doOrApply(other, fn, executor);
2289 }
2290
2291 private <U> CompletableFuture<U> doOrApply
2292 (CompletableFuture<? extends T> other,
2293 Function<? super T, U> fn,
2294 Executor e) {
2295 if (other == null || fn == null) throw new NullPointerException();
2296 CompletableFuture<U> dst = new CompletableFuture<U>();
2297 OrApplyCompletion<T,U> d = null;
2298 Object r;
2299 if ((r = result) == null && (r = other.result) == null) {
2300 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2301 CompletionNode q = null, p = new CompletionNode(d);
2302 while ((r = result) == null && (r = other.result) == null) {
2303 if (q != null) {
2304 if (UNSAFE.compareAndSwapObject
2305 (other, COMPLETIONS, q.next = other.completions, q))
2306 break;
2307 }
2308 else if (UNSAFE.compareAndSwapObject
2309 (this, COMPLETIONS, p.next = completions, p))
2310 q = new CompletionNode(d);
2311 }
2312 }
2313 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2314 T t; Throwable ex;
2315 if (r instanceof AltResult) {
2316 ex = ((AltResult)r).ex;
2317 t = null;
2318 }
2319 else {
2320 ex = null;
2321 @SuppressWarnings("unchecked") T tr = (T) r;
2322 t = tr;
2323 }
2324 U u = null;
2325 if (ex == null) {
2326 try {
2327 if (e != null)
2328 e.execute(new AsyncApply<T,U>(t, fn, dst));
2329 else
2330 u = fn.apply(t);
2331 } catch (Throwable rex) {
2332 ex = rex;
2333 }
2334 }
2335 if (e == null || ex != null)
2336 dst.internalComplete(u, ex);
2337 }
2338 helpPostComplete();
2339 other.helpPostComplete();
2340 return dst;
2341 }
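
Usage sketch for applyToEither, assuming the released JDK 8+ API. Only one source ever completes here, so the outcome is deterministic. When both sources may complete, no guarantee is made about which result is used. `ApplyToEitherSketch` and `firstUpper` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class ApplyToEitherSketch {
    static String firstUpper(String fromOther) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> fast = CompletableFuture.completedFuture(fromOther);
        // The function is applied to whichever result is available.
        CompletableFuture<String> result = slow.applyToEither(fast, s -> s.toUpperCase());
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(firstUpper("abc"));
    }
}
```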
2342
2343 /**
2344 * Creates and returns a CompletableFuture that is completed after
2345 * performing the given action with the result of either this or the
2346 * other given CompletableFuture, when either completes.
2347 * If this and/or the other CompletableFuture complete exceptionally,
2348 * then the returned CompletableFuture may also do so, with a
2349 * CompletionException holding one of these exceptions as its cause.
2350 * No guarantees are made about which exception is used in the
2351 * returned CompletableFuture.
2352 *
2353 * @param other the other CompletableFuture
2354 * @param block the action to perform before completing the
2355 * returned CompletableFuture
2356 * @return the new CompletableFuture
2357 */
2358 public CompletableFuture<Void> acceptEither
2359 (CompletableFuture<? extends T> other,
2360 Consumer<? super T> block) {
2361 return doOrAccept(other, block, null);
2362 }
2363
2364 /**
2365 * Creates and returns a CompletableFuture that is completed
2366 * asynchronously using the {@link ForkJoinPool#commonPool()},
2367 * performing the given action with the result of either this or
2368 * the other given CompletableFuture, when either completes.
2369 * If this and/or the other CompletableFuture complete exceptionally,
2370 * then the returned CompletableFuture may also do so, with a
2371 * CompletionException holding one of these exceptions as its cause.
2372 * No guarantees are made about which exception is used in the
2373 * returned CompletableFuture.
2374 *
2375 * @param other the other CompletableFuture
2376 * @param block the action to perform before completing the
2377 * returned CompletableFuture
2378 * @return the new CompletableFuture
2379 */
2380 public CompletableFuture<Void> acceptEitherAsync
2381 (CompletableFuture<? extends T> other,
2382 Consumer<? super T> block) {
2383 return doOrAccept(other, block, ForkJoinPool.commonPool());
2384 }
2385
2386 /**
2387 * Creates and returns a CompletableFuture that is completed
2388 * asynchronously using the given executor, performing the given
2389 * action with the result of either this or the other given
2390 * CompletableFuture, when either completes.
2391 * If this and/or the other CompletableFuture complete exceptionally,
2392 * then the returned CompletableFuture may also do so, with a
2393 * CompletionException holding one of these exceptions as its cause.
2394 * No guarantees are made about which exception is used in the
2395 * returned CompletableFuture.
2396 *
2397 * @param other the other CompletableFuture
2398 * @param block the action to perform before completing the
2399 * returned CompletableFuture
2400 * @param executor the executor to use for asynchronous execution
2401 * @return the new CompletableFuture
2402 */
2403 public CompletableFuture<Void> acceptEitherAsync
2404 (CompletableFuture<? extends T> other,
2405 Consumer<? super T> block,
2406 Executor executor) {
2407 if (executor == null) throw new NullPointerException();
2408 return doOrAccept(other, block, executor);
2409 }
2410
2411 private CompletableFuture<Void> doOrAccept
2412 (CompletableFuture<? extends T> other,
2413 Consumer<? super T> fn,
2414 Executor e) {
2415 if (other == null || fn == null) throw new NullPointerException();
2416 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2417 OrAcceptCompletion<T> d = null;
2418 Object r;
2419 if ((r = result) == null && (r = other.result) == null) {
2420 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2421 CompletionNode q = null, p = new CompletionNode(d);
2422 while ((r = result) == null && (r = other.result) == null) {
2423 if (q != null) {
2424 if (UNSAFE.compareAndSwapObject
2425 (other, COMPLETIONS, q.next = other.completions, q))
2426 break;
2427 }
2428 else if (UNSAFE.compareAndSwapObject
2429 (this, COMPLETIONS, p.next = completions, p))
2430 q = new CompletionNode(d);
2431 }
2432 }
2433 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2434 T t; Throwable ex;
2435 if (r instanceof AltResult) {
2436 ex = ((AltResult)r).ex;
2437 t = null;
2438 }
2439 else {
2440 ex = null;
2441 @SuppressWarnings("unchecked") T tr = (T) r;
2442 t = tr;
2443 }
2444 if (ex == null) {
2445 try {
2446 if (e != null)
2447 e.execute(new AsyncAccept<T>(t, fn, dst));
2448 else
2449 fn.accept(t);
2450 } catch (Throwable rex) {
2451 ex = rex;
2452 }
2453 }
2454 if (e == null || ex != null)
2455 dst.internalComplete(null, ex);
2456 }
2457 helpPostComplete();
2458 other.helpPostComplete();
2459 return dst;
2460 }
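
Usage sketch for acceptEither, assuming the released JDK 8+ API. The action runs for the first completion only, and a later completion of the other source does not run it again. `AcceptEitherSketch` and `firstWins` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AcceptEitherSketch {
    static List<String> firstWins() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> primary = new CompletableFuture<>();
        CompletableFuture<String> backup = new CompletableFuture<>();
        CompletableFuture<Void> done = primary.acceptEither(backup, v -> seen.add(v));
        backup.complete("backup");   // triggers the action
        primary.complete("primary"); // too late: the action was already claimed
        done.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(firstWins());
    }
}
```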
2461
2462 /**
2463 * Creates and returns a CompletableFuture that is completed
2464 * after either this or the other given CompletableFuture completes.
2465 * If this and/or the other CompletableFuture complete exceptionally,
2466 * then the returned CompletableFuture may also do so, with a
2467 * CompletionException holding one of these exceptions as its cause.
2468 * No guarantees are made about which exception is used in the
2469 * returned CompletableFuture.
2470 *
2471 * @param other the other CompletableFuture
2472 * @param action the action to perform before completing the
2473 * returned CompletableFuture
2474 * @return the new CompletableFuture
2475 */
2476 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2477 Runnable action) {
2478 return doOrRun(other, action, null);
2479 }
2480
2481 /**
2482 * Creates and returns a CompletableFuture that is completed
2483 * asynchronously using the {@link ForkJoinPool#commonPool()}
2484 * after either this or the other given CompletableFuture completes.
2485 * If this and/or the other CompletableFuture complete exceptionally,
2486 * then the returned CompletableFuture may also do so, with a
2487 * CompletionException holding one of these exceptions as its cause.
2488 * No guarantees are made about which exception is used in the
2489 * returned CompletableFuture.
2490 *
2491 * @param other the other CompletableFuture
2492 * @param action the action to perform before completing the
2493 * returned CompletableFuture
2494 * @return the new CompletableFuture
2495 */
2496 public CompletableFuture<Void> runAfterEitherAsync
2497 (CompletableFuture<?> other,
2498 Runnable action) {
2499 return doOrRun(other, action, ForkJoinPool.commonPool());
2500 }
2501
2502 /**
2503 * Creates and returns a CompletableFuture that is completed
2504 * asynchronously using the given executor after this or the other
2505 * given CompletableFuture completes.
2506 * If this and/or the other CompletableFuture complete exceptionally,
2507 * then the returned CompletableFuture may also do so, with a
2508 * CompletionException holding one of these exceptions as its cause.
2509 * No guarantees are made about which exception is used in the
2510 * returned CompletableFuture.
2511 *
2512 * @param other the other CompletableFuture
2513 * @param action the action to perform before completing the
2514 * returned CompletableFuture
2515 * @param executor the executor to use for asynchronous execution
2516 * @return the new CompletableFuture
2517 */
2518 public CompletableFuture<Void> runAfterEitherAsync
2519 (CompletableFuture<?> other,
2520 Runnable action,
2521 Executor executor) {
2522 if (executor == null) throw new NullPointerException();
2523 return doOrRun(other, action, executor);
2524 }
2525
2526 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2527 Runnable action,
2528 Executor e) {
2529 if (other == null || action == null) throw new NullPointerException();
2530 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2531 OrRunCompletion<T> d = null;
2532 Object r;
2533 if ((r = result) == null && (r = other.result) == null) {
2534 d = new OrRunCompletion<T>(this, other, action, dst, e);
2535 CompletionNode q = null, p = new CompletionNode(d);
2536 while ((r = result) == null && (r = other.result) == null) {
2537 if (q != null) {
2538 if (UNSAFE.compareAndSwapObject
2539 (other, COMPLETIONS, q.next = other.completions, q))
2540 break;
2541 }
2542 else if (UNSAFE.compareAndSwapObject
2543 (this, COMPLETIONS, p.next = completions, p))
2544 q = new CompletionNode(d);
2545 }
2546 }
2547 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2548 Throwable ex;
2549 if (r instanceof AltResult)
2550 ex = ((AltResult)r).ex;
2551 else
2552 ex = null;
2553 if (ex == null) {
2554 try {
2555 if (e != null)
2556 e.execute(new AsyncRun(action, dst));
2557 else
2558 action.run();
2559 } catch (Throwable rex) {
2560 ex = rex;
2561 }
2562 }
2563 if (e == null || ex != null)
2564 dst.internalComplete(null, ex);
2565 }
2566 helpPostComplete();
2567 other.helpPostComplete();
2568 return dst;
2569 }
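
The helpers above register one completion object on both sources and guard the action with `d.compareAndSet(0, 1)`, so it runs at most once no matter which source or thread triggers it. Below is a minimal standalone sketch of that claim-once idea. It is not the library's actual Completion class, and `ClaimOnceSketch` and `triggerTwice` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// The integer state is 0 while unclaimed and 1 once some caller has claimed it.
class ClaimOnceSketch extends AtomicInteger implements Runnable {
    private final Runnable action;

    ClaimOnceSketch(Runnable action) {
        this.action = action;
    }

    @Override
    public void run() {
        // Only the caller that moves the state from 0 to 1 runs the action.
        if (compareAndSet(0, 1))
            action.run();
    }

    static int triggerTwice() {
        AtomicInteger runs = new AtomicInteger();
        ClaimOnceSketch c = new ClaimOnceSketch(() -> runs.incrementAndGet());
        c.run(); // e.g. triggered when the first source completes
        c.run(); // e.g. triggered when the second source completes
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(triggerTwice());
    }
}
```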
2570
2571 /**
2572 * Returns a CompletableFuture (or an equivalent one) produced by
2573 * the given function of the result of this CompletableFuture when
2574 * completed. If this CompletableFuture completes exceptionally,
2575 * then the returned CompletableFuture also does so, with a
2576 * CompletionException holding this exception as its cause.
2577 *
2578 * @param fn the function returning a new CompletableFuture
2579 * @return the CompletableFuture, which {@code isDone()} upon
2580 * return if it was completed by the given function or if an
2581 * exception occurred
2582 */
2583 public <U> CompletableFuture<U> thenCompose
2584 (Function<? super T, CompletableFuture<U>> fn) {
2585 return doCompose(fn, null);
2586 }
2587
2588 /**
2589 * Returns a CompletableFuture (or an equivalent one) produced
2590 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2591 * the given function of the result of this CompletableFuture when
2592 * completed. If this CompletableFuture completes exceptionally,
2593 * then the returned CompletableFuture also does so, with a
2594 * CompletionException holding this exception as its cause.
2595 *
2596 * @param fn the function returning a new CompletableFuture
2597 * @return the CompletableFuture, which {@code isDone()} upon
2598 * return if it was completed by the given function or if an
2599 * exception occurred
2600 */
2601 public <U> CompletableFuture<U> thenComposeAsync
2602 (Function<? super T, CompletableFuture<U>> fn) {
2603 return doCompose(fn, ForkJoinPool.commonPool());
2604 }
2605
2606 /**
2607 * Returns a CompletableFuture (or an equivalent one) produced
2608 * asynchronously using the given executor by the given function
2609 * of the result of this CompletableFuture when completed.
2610 * If this CompletableFuture completes exceptionally, then the
2611 * returned CompletableFuture also does so, with a
2612 * CompletionException holding this exception as its cause.
2613 *
2614 * @param fn the function returning a new CompletableFuture
2615 * @param executor the executor to use for asynchronous execution
2616 * @return the CompletableFuture, which {@code isDone()} upon
2617 * return if it was completed by the given function or if an
2618 * exception occurred
2619 */
2620 public <U> CompletableFuture<U> thenComposeAsync
2621 (Function<? super T, CompletableFuture<U>> fn,
2622 Executor executor) {
2623 if (executor == null) throw new NullPointerException();
2624 return doCompose(fn, executor);
2625 }
2626
2627 private <U> CompletableFuture<U> doCompose
2628 (Function<? super T, CompletableFuture<U>> fn,
2629 Executor e) {
2630 if (fn == null) throw new NullPointerException();
2631 CompletableFuture<U> dst = null;
2632 ComposeCompletion<T,U> d = null;
2633 Object r;
2634 if ((r = result) == null) {
2635 dst = new CompletableFuture<U>();
2636 CompletionNode p = new CompletionNode
2637 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2638 while ((r = result) == null) {
2639 if (UNSAFE.compareAndSwapObject
2640 (this, COMPLETIONS, p.next = completions, p))
2641 break;
2642 }
2643 }
2644 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2645 T t; Throwable ex;
2646 if (r instanceof AltResult) {
2647 ex = ((AltResult)r).ex;
2648 t = null;
2649 }
2650 else {
2651 ex = null;
2652 @SuppressWarnings("unchecked") T tr = (T) r;
2653 t = tr;
2654 }
2655 if (ex == null) {
2656 if (e != null) {
2657 if (dst == null)
2658 dst = new CompletableFuture<U>();
2659 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2660 }
2661 else {
2662 try {
2663 dst = fn.apply(t);
2664 } catch (Throwable rex) {
2665 ex = rex;
2666 }
2667 if (dst == null) {
2668 dst = new CompletableFuture<U>();
2669 if (ex == null)
2670 ex = new NullPointerException();
2671 }
2672 }
2673 }
2674 if (dst == null) dst = new CompletableFuture<U>(); // source had already failed, so no dst exists yet
2675 if (ex != null) dst.internalComplete(null, ex);
2676 }
2677 helpPostComplete();
2678 dst.helpPostComplete();
2679 return dst;
2680 }
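
Usage sketch for thenCompose, assuming the released JDK 8+ API. It covers the normal path and the case where the source has already completed exceptionally before thenCompose is called. `ThenComposeSketch`, `lengthOf` and the other method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ThenComposeSketch {
    // Hypothetical second step that itself returns a future.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.completedFuture(s.length());
    }

    static int composedLength(String s) {
        CompletableFuture<String> source = CompletableFuture.completedFuture(s);
        CompletableFuture<Integer> len = source.thenCompose(v -> lengthOf(v));
        return len.join();
    }

    // The function is never invoked when the source failed; the returned
    // future fails with a CompletionException holding the original cause.
    static Throwable causeWhenSourceAlreadyFailed(RuntimeException failure) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(failure);
        CompletableFuture<Integer> len = failed.thenCompose(v -> lengthOf(v));
        try {
            len.join();
            return null;
        } catch (CompletionException wrapped) {
            return wrapped.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(composedLength("hello"));
        System.out.println(causeWhenSourceAlreadyFailed(new IllegalStateException("boom")));
    }
}
```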
2681
2682 /**
2683 * Creates and returns a CompletableFuture that is completed with
2684 * the result of the given function of the exception triggering
2685 * this CompletableFuture's completion when it completes
2686 * exceptionally; otherwise, if this CompletableFuture completes
2687 * normally, then the returned CompletableFuture also completes
2688 * normally with the same value.
2689 *
2690 * @param fn the function to use to compute the value of the
2691 * returned CompletableFuture if this CompletableFuture completed
2692 * exceptionally
2693 * @return the new CompletableFuture
2694 */
2695 public CompletableFuture<T> exceptionally
2696 (Function<Throwable, ? extends T> fn) {
2697 if (fn == null) throw new NullPointerException();
2698 CompletableFuture<T> dst = new CompletableFuture<T>();
2699 ExceptionCompletion<T> d = null;
2700 Object r;
2701 if ((r = result) == null) {
2702 CompletionNode p =
2703 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2704 while ((r = result) == null) {
2705 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2706 p.next = completions, p))
2707 break;
2708 }
2709 }
2710 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2711 T t = null; Throwable ex, dx = null;
2712 if (r instanceof AltResult) {
2713 if ((ex = ((AltResult)r).ex) != null) {
2714 try {
2715 t = fn.apply(ex);
2716 } catch (Throwable rex) {
2717 dx = rex;
2718 }
2719 }
2720 }
2721 else {
2722 @SuppressWarnings("unchecked") T tr = (T) r;
2723 t = tr;
2724 }
2725 dst.internalComplete(t, dx);
2726 }
2727 helpPostComplete();
2728 return dst;
2729 }
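
Usage sketch for exceptionally, assuming the released JDK 8+ API. The recovery function runs only when the source failed, and a normal value passes through unchanged. `ExceptionallySketch` and `recover` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {
    static String recover(boolean fail) {
        CompletableFuture<String> source = new CompletableFuture<>();
        if (fail)
            source.completeExceptionally(new IllegalStateException("boom"));
        else
            source.complete("ok");
        // The source was failed directly, so fn sees the original exception.
        CompletableFuture<String> safe =
            source.exceptionally(ex -> "fallback: " + ex.getMessage());
        return safe.join();
    }

    public static void main(String[] args) {
        System.out.println(recover(true));
        System.out.println(recover(false));
    }
}
```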
2730
2731 /**
2732 * Creates and returns a CompletableFuture that is completed with
2733 * the result of the given function of the result and exception of
2734 * this CompletableFuture's completion when it completes. The
2735 * given function is invoked with the result (or {@code null} if
2736 * none) and the exception (or {@code null} if none) of this
2737 * CompletableFuture when complete.
2738 *
2739 * @param fn the function to use to compute the value of the
2740 * returned CompletableFuture
2741 * @return the new CompletableFuture
2742 */
2743 public <U> CompletableFuture<U> handle
2744 (BiFunction<? super T, Throwable, ? extends U> fn) {
2745 if (fn == null) throw new NullPointerException();
2746 CompletableFuture<U> dst = new CompletableFuture<U>();
2747 HandleCompletion<T,U> d = null;
2748 Object r;
2749 if ((r = result) == null) {
2750 CompletionNode p =
2751 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2752 while ((r = result) == null) {
2753 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2754 p.next = completions, p))
2755 break;
2756 }
2757 }
2758 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2759 T t; Throwable ex;
2760 if (r instanceof AltResult) {
2761 ex = ((AltResult)r).ex;
2762 t = null;
2763 }
2764 else {
2765 ex = null;
2766 @SuppressWarnings("unchecked") T tr = (T) r;
2767 t = tr;
2768 }
2769 U u; Throwable dx;
2770 try {
2771 u = fn.apply(t, ex);
2772 dx = null;
2773 } catch (Throwable rex) {
2774 dx = rex;
2775 u = null;
2776 }
2777 dst.internalComplete(u, dx);
2778 }
2779 helpPostComplete();
2780 return dst;
2781 }
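
Usage sketch for handle, assuming the released JDK 8+ API. The function is called in both outcomes. In this sketch exactly one of its two arguments is non-null, because the normal value used is itself non-null. `HandleSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class HandleSketch {
    static String describe(CompletableFuture<Integer> source) {
        CompletableFuture<String> out = source.handle(
            (value, ex) -> ex == null ? "value " + value : "error " + ex.getMessage());
        return out.join();
    }

    static String ok() {
        return describe(CompletableFuture.completedFuture(7));
    }

    static String failed() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalArgumentException("bad"));
        return describe(f);
    }

    public static void main(String[] args) {
        System.out.println(ok());
        System.out.println(failed());
    }
}
```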
2782
2783
2784 /* ------------- Arbitrary-arity constructions -------------- */
2785
2786 /*
2787 * The basic plan of attack is to recursively form binary
2788 * completion trees of elements. This can be overkill for small
2789 * sets, but scales nicely. The And/All vs Or/Any forms use the
2790 * same idea, but details differ.
2791 */
2792
2793 /**
2794 * Returns a new CompletableFuture that is completed when all of
2795 * the given CompletableFutures complete. If any of the component
2796 * CompletableFutures complete exceptionally, then so does the
2797 * returned CompletableFuture. Otherwise, the results, if any, of
2798 * the component CompletableFutures are not reflected in the
2799 * returned CompletableFuture, but may be obtained by inspecting
2800 * them individually. If the number of components is zero, returns
2801 * a CompletableFuture completed with the value {@code null}.
2802 *
2803 * <p>Among the applications of this method is to await completion
2804 * of a set of independent CompletableFutures before continuing a
2805 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2806 * c3).join();}.
2807 *
2808 * @param cfs the CompletableFutures
2809 * @return a CompletableFuture that is complete when all of the
2810 * given CompletableFutures complete
2811 * @throws NullPointerException if the array or any of its elements are
2812 * {@code null}
2813 */
2814 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2815 int len = cfs.length; // Directly handle empty and singleton cases
2816 if (len > 1)
2817 return allTree(cfs, 0, len - 1);
2818 else {
2819 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2820 CompletableFuture<?> f;
2821 if (len == 0)
2822 dst.result = NIL;
2823 else if ((f = cfs[0]) == null)
2824 throw new NullPointerException();
2825 else {
2826 ThenCopy d = null;
2827 CompletionNode p = null;
2828 Object r;
2829 while ((r = f.result) == null) {
2830 if (d == null)
2831 d = new ThenCopy(f, dst);
2832 else if (p == null)
2833 p = new CompletionNode(d);
2834 else if (UNSAFE.compareAndSwapObject
2835 (f, COMPLETIONS, p.next = f.completions, p))
2836 break;
2837 }
2838 if (r != null && (d == null || d.compareAndSet(0, 1)))
2839 dst.internalComplete(null, (r instanceof AltResult) ?
2840 ((AltResult)r).ex : null);
2841 f.helpPostComplete();
2842 }
2843 return dst;
2844 }
2845 }
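
Usage sketch for allOf, assuming the released JDK 8+ API. The returned future carries no value, so the component results are read from the components themselves after the join. `AllOfSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class AllOfSketch {
    static int sumAfterAll() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture.allOf(a, b, c).join(); // waits for all three
        return a.join() + b.join() + c.join();   // inspect components individually
    }

    // With zero components the returned future is already complete.
    static boolean emptyIsDone() {
        return CompletableFuture.allOf().isDone();
    }

    public static void main(String[] args) {
        System.out.println(sumAfterAll());
        System.out.println(emptyIsDone());
    }
}
```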

    /**
     * Recursively constructs an And'ed tree of CompletableFutures.
     * Called only when array known to have at least two elements.
     */
    private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                                   int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AndCompletion d = null;
        CompletionNode p = null, q = null;
        Object r = null, s = null;
        while ((r = fst.result) == null || (s = snd.result) == null) {
            if (d == null)
                d = new AndCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null) &&
            (s != null || (s = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            dst.internalComplete(null, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed when any of
     * the component CompletableFutures complete, with the same result
     * if that component completed normally, otherwise exceptionally.
     * If the number of components is zero, returns an incomplete
     * CompletableFuture.
     *
     * @param cfs the CompletableFutures
     * @return a CompletableFuture that is complete when any of the
     * given CompletableFutures complete
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Same idea as allOf
        if (len > 1)
            return anyTree(cfs, 0, len - 1);
        else {
            CompletableFuture<?> dst = new CompletableFuture<Object>();
            CompletableFuture<?> f;
            if (len == 0)
                ; // skip
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenCopy d = null;
                CompletionNode p = null;
                Object r;
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenCopy(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                if (r != null && (d == null || d.compareAndSet(0, 1))) {
                    Throwable ex; Object t;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        t = null;
                    }
                    else {
                        ex = null;
                        t = r;
                    }
                    dst.internalComplete(t, ex);
                }
                f.helpPostComplete();
            }
            return dst;
        }
    }

    /**
     * Recursively constructs an Or'ed tree of CompletableFutures.
     * Called only when array known to have at least two elements.
     */
    private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
                                                int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<?> dst = new CompletableFuture<Object>();
        OrCompletion d = null;
        CompletionNode p = null, q = null;
        Object r;
        while ((r = fst.result) == null && (r = snd.result) == null) {
            if (d == null)
                d = new OrCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null ||
             (r = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex; Object t;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                t = r;
            }
            dst.internalComplete(t, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }


    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            UNSAFE.compareAndSwapObject
            (this, RESULT, null, new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }

    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    public boolean isCancelled() {
        Object r;
        return ((r = result) instanceof AltResult) &&
            (((AltResult)r).ex instanceof CancellationException);
    }

    /**
     * Forcibly sets or resets the value subsequently returned by
     * method {@link #get()} and related methods, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param value the completion value
     */
    public void obtrudeValue(T value) {
        result = (value == null) ? NIL : value;
        postComplete();
    }

    /**
     * Forcibly causes subsequent invocations of method {@link #get()}
     * and related methods to throw the given exception, whether or
     * not already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param ex the exception
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int count = 0;
        for (CompletionNode p = completions; p != null; p = p.next)
            ++count;
        return count;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, contains the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        Object r = result;
        int count;
        return super.toString() +
            ((r == null) ?
             (((count = getNumberOfDependents()) == 0) ?
              "[Not completed]" :
              "[Not completed, " + count + " dependents]") :
             (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
              "[Completed exceptionally]" :
              "[Completed normally]"));
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long WAITERS;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (k.getDeclaredField("result"));
            WAITERS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("completions"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
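
The following is a minimal client-side sketch of the composition and control methods documented above (allOf, anyOf, cancel, obtrudeValue, toString). It is not part of this file. It assumes a released JDK (8 or later) whose java.util.concurrent.CompletableFuture behaves like this revision for these calls. The class name CompletableFutureSketch and its helper method names are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; not part of java.util.concurrent.
public class CompletableFutureSketch {

    // Blocks until both inputs are complete, then reads their values.
    // The future returned by allOf carries no value of its own, so the
    // results are fetched from the component futures.
    static int sumWhenAllDone(CompletableFuture<Integer> a,
                              CompletableFuture<Integer> b) {
        CompletableFuture.allOf(a, b).join();
        return a.join() + b.join();
    }

    // Returns the result of whichever input completes first.
    static Object firstResult(CompletableFuture<?> a, CompletableFuture<?> b) {
        return CompletableFuture.anyOf(a, b).join();
    }

    // Cancels a future and reports whether it ended up cancelled.
    // The boolean argument has no effect, per the cancel documentation.
    static boolean cancelAndCheck(CompletableFuture<?> f) {
        boolean cancelled = f.cancel(true);
        return cancelled && f.isCancelled() && f.isDone();
    }

    // Overwrites whatever outcome the future had (error recovery only).
    static String recoverWith(CompletableFuture<String> f, String fallback) {
        f.obtrudeValue(fallback);
        return f.join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> one = new CompletableFuture<>();
        CompletableFuture<Integer> two = new CompletableFuture<>();
        one.complete(1);
        two.complete(2);
        System.out.println(sumWhenAllDone(one, two));

        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ready = new CompletableFuture<>();
        ready.complete("ready");
        System.out.println(firstResult(pending, ready));

        // pending never completed on its own, so cancel succeeds.
        System.out.println(cancelAndCheck(pending));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(recoverWith(failed, "fallback"));
        System.out.println(failed); // state suffix produced by toString()
    }
}
```

The helpers take already-constructed futures so that each call exercises one documented behavior. In real code the component futures would typically be completed by other threads while the caller waits in join().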