root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.36
Committed: Sat Feb 2 20:39:34 2013 UTC by jsr166
Branch: MAIN
Changes since 1.35: +1 -1 lines
Log Message:
whitespace

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to {@link #complete} or {@link
31 * #completeExceptionally} a CompletableFuture, only one of them
32 * succeeds.
33 *
34 * <p>Methods are available for adding dependents based on Functions,
35 * Consumers, and Runnables. The appropriate form to use depends on
36 * whether actions require arguments and/or produce results. Actions
37 * may also be triggered after either or both the current and another
38 * CompletableFuture complete. Multiple CompletableFutures may also
39 * be grouped as one using {@link #anyOf} and {@link #allOf}.
40 *
41 * <p>Actions supplied for dependent completions (mainly using methods
42 * with prefix {@code then}) may be performed by the thread that
43 * completes the current CompletableFuture, or by any other caller of
44 * these methods. There are no guarantees about the order of
45 * processing completions unless constrained by these methods.
46 *
47 * <p>Upon exceptional completion, or when a completion entails
48 * computation that terminates abruptly with an (unchecked)
49 * exception or error, further completions act as {@code
50 * completeExceptionally} with a {@link CompletionException} holding
51 * that exception as its cause. If a CompletableFuture completes
52 * exceptionally, and is not followed by a {@link #exceptionally} or
53 * {@link #handle} completion, then all of its dependents (and their
54 * dependents) also complete exceptionally with CompletionExceptions
55 * holding the ultimate cause. In case of a CompletionException,
56 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
57 * {@link ExecutionException} with the same cause as would be held in
58 * the corresponding CompletionException. However, in these cases,
59 * methods {@link #join()} and {@link #getNow} throw the
60 * CompletionException, which simplifies usage especially within other
61 * completion functions.
62 *
63 * <p>CompletableFutures themselves do not execute asynchronously.
64 * However, the {@code async} methods provide commonly useful ways to
65 * commence asynchronous processing, using either a given {@link
66 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
67 * function or action that will result in the completion of a new
68 * CompletableFuture. To simplify monitoring, debugging, and tracking,
69 * all generated asynchronous tasks are instances of the tagging
70 * interface {@link AsynchronousCompletionTask}.
71 *
72 * @author Doug Lea
73 * @since 1.8
74 */
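The completion and exception rules in the class comment above can be tried out with a short sketch. It assumes the API as later released in JDK 8 (this revision predates the release, so details may differ); the class and method names `ExceptionalCompletionSketch`, `firstCompletionWins`, and `thrownTypes` are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class, not part of the library.
class ExceptionalCompletionSketch {

    /** Two complete() calls on the same future: only the first one succeeds. */
    static boolean firstCompletionWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("a");
        boolean second = f.complete("b");
        return first && !second && "a".equals(f.join());
    }

    /** Reports what join() and get() throw for a dependent of a failed future. */
    static String[] thrownTypes() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep = src.thenApply(x -> x + 1);
        src.completeExceptionally(new IllegalStateException("boom"));

        String viaJoin = "none", viaGet = "none";
        try {
            dep.join();
        } catch (CompletionException e) {
            // join() surfaces the unchecked CompletionException itself
            viaJoin = e.getClass().getSimpleName() + ":"
                + e.getCause().getClass().getSimpleName();
        }
        try {
            dep.get();
        } catch (ExecutionException e) {
            // get() rewraps the same cause in a checked ExecutionException
            viaGet = e.getClass().getSimpleName() + ":"
                + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] { viaJoin, viaGet };
    }

    public static void main(String[] args) {
        System.out.println(firstCompletionWins());
        for (String s : thrownTypes())
            System.out.println(s);
    }
}
```

Because `join()` throws an unchecked exception, it can be called inside lambdas passed to other completion methods without a try/catch for checked exceptions.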
75 public class CompletableFuture<T> implements Future<T> {
76
77 /*
78 * Overview:
79 *
80 * 1. Non-nullness of field result (set via CAS) indicates done.
81 * An AltResult is used to box null as a result, as well as to
82 * hold exceptions. Using a single field makes completion fast
83 * and simple to detect and trigger, at the expense of a lot of
84 * encoding and decoding that infiltrates many methods. One minor
85 * simplification relies on the (static) NIL (to box null results)
86 * being the only AltResult with a null exception field, so we
87 * don't usually need explicit comparisons with NIL. The CF
88 * exception propagation mechanics surrounding decoding rely on
89 * unchecked casts of decoded results really being unchecked,
90 * where user type errors are caught at point of use, as is
91 * currently the case in Java. These are highlighted by using
92 * SuppressWarnings-annotated temporaries.
93 *
94 * 2. Waiters are held in a Treiber stack similar to the one used
95 * in FutureTask, Phaser, and SynchronousQueue. See their
96 * internal documentation for algorithmic details.
97 *
98 * 3. Completions are also kept in a list/stack, and pulled off
99 * and run when completion is triggered. (We could even use the
100 * same stack as for waiters, but would give up the potential
101 * parallelism obtained because woken waiters help release/run
102 * others -- see method postComplete). Because post-processing
103 * may race with direct calls, class Completion opportunistically
104 * extends AtomicInteger so callers can claim the action via
105 * compareAndSet(0, 1). The Completion.run methods are all
106 * written in a boringly similar, uniform way (that sometimes includes
107 * unnecessary-looking checks, kept to maintain uniformity). There
108 * are enough dimensions upon which they differ that attempts to
109 * factor commonalities while maintaining efficiency require more
110 * lines of code than they would save.
111 *
112 * 4. The exported then/and/or methods do support a bit of
113 * factoring (see doThenApply etc). They must cope with the
114 * intrinsic races surrounding addition of a dependent action
115 * versus performing the action directly because the task is
116 * already complete. For example, a CF may not be complete upon
117 * entry, so a dependent completion is added, but by the time it
118 * is added, the target CF is complete, so must be directly
119 * executed. This is all done while avoiding unnecessary object
120 * construction in safe-bypass cases.
121 */
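Point 1 of the overview, the single-field result encoding, might look like the following minimal sketch. It substitutes `AtomicReference` for the `Unsafe`-based CAS used in this file, and `ResultEncodingSketch` and its methods are hypothetical names; only `AltResult` and `NIL` mirror identifiers from the source.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of encoding "done", null results, and
// exceptions in one field; uses AtomicReference instead of Unsafe.
class ResultEncodingSketch<T> {
    static final class AltResult {
        final Throwable ex; // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }
    static final AltResult NIL = new AltResult(null);

    // null means "not yet complete"; anything else means done
    private final AtomicReference<Object> result = new AtomicReference<>();

    boolean complete(T value) {
        // box a null value as NIL so that non-null still means done
        return result.compareAndSet(null, (value == null) ? NIL : value);
    }

    boolean completeExceptionally(Throwable ex) {
        return result.compareAndSet(null, new AltResult(ex));
    }

    boolean isDone() { return result.get() != null; }

    /** Decodes the field; throws if incomplete or completed exceptionally. */
    T getNow() {
        Object r = result.get();
        if (r == null)
            throw new IllegalStateException("not complete");
        if (r instanceof AltResult) {
            Throwable ex = ((AltResult) r).ex;
            if (ex == null)
                return null;                 // NIL decodes back to null
            throw new RuntimeException(ex);  // simplified wrapping for the sketch
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

    public static void main(String[] args) {
        ResultEncodingSketch<String> f = new ResultEncodingSketch<>();
        System.out.println(f.complete(null)); // first completion wins
        System.out.println(f.complete("x"));  // later attempts fail
        System.out.println(f.getNow());
    }
}
```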
122
123 // preliminaries
124
125 static final class AltResult {
126 final Throwable ex; // null only for NIL
127 AltResult(Throwable ex) { this.ex = ex; }
128 }
129
130 static final AltResult NIL = new AltResult(null);
131
132 // Fields
133
134 volatile Object result; // Either the result or boxed AltResult
135 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
136 volatile CompletionNode completions; // list (Treiber stack) of completions
137
138 // Basic utilities for triggering and processing completions
139
140 /**
141 * Removes and signals all waiting threads and runs all completions.
142 */
143 final void postComplete() {
144 WaitNode q; Thread t;
145 while ((q = waiters) != null) {
146 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
147 (t = q.thread) != null) {
148 q.thread = null;
149 LockSupport.unpark(t);
150 }
151 }
152
153 CompletionNode h; Completion c;
154 while ((h = completions) != null) {
155 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
156 (c = h.completion) != null)
157 c.run();
158 }
159 }
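The pop loops in `postComplete` follow the usual Treiber-stack pattern: read the head, then CAS it to its successor, retrying on failure. A minimal standalone sketch of that pattern is given below, using `AtomicReference` rather than `Unsafe`; `TreiberStackSketch`, `push`, and `drain` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lock-free stack showing the CAS-based push and pop-all loops.
class TreiberStackSketch<E> {
    static final class Node<E> {
        final E item;
        Node<E> next; // written before the node is published by CAS
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    void push(E item) {
        Node<E> n = new Node<>(item);
        do {
            n.next = head.get();
        } while (!head.compareAndSet(n.next, n)); // retry if head moved
    }

    /** Pops every node, as postComplete does for waiters and completions. */
    List<E> drain() {
        List<E> out = new ArrayList<>();
        Node<E> h;
        while ((h = head.get()) != null) {
            // only the thread whose CAS succeeds gets to process the node
            if (head.compareAndSet(h, h.next))
                out.add(h.item);
        }
        return out;
    }

    public static void main(String[] args) {
        TreiberStackSketch<Integer> s = new TreiberStackSketch<>();
        s.push(1); s.push(2); s.push(3);
        System.out.println(s.drain());
    }
}
```

Several threads may call `drain` at once; each node is handed to exactly one of them, which is what lets woken waiters help release others.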
160
161 /**
162 * Triggers completion with the encoding of the given arguments:
163 * if the exception is non-null, encodes it as a wrapped
164 * CompletionException unless it is one already. Otherwise uses
165 * the given result, boxed as NIL if null.
166 */
167 final void internalComplete(Object v, Throwable ex) {
168 if (result == null)
169 UNSAFE.compareAndSwapObject
170 (this, RESULT, null,
171 (ex == null) ? (v == null) ? NIL : v :
172 new AltResult((ex instanceof CompletionException) ? ex :
173 new CompletionException(ex)));
174 postComplete(); // help out even if not triggered
175 }
176
177 /**
178 * If triggered, helps release and/or process completions.
179 */
180 final void helpPostComplete() {
181 if (result != null)
182 postComplete();
183 }
184
185 /* ------------- waiting for completions -------------- */
186
187 /** Number of processors, for spin control */
188 static final int NCPU = Runtime.getRuntime().availableProcessors();
189
190 /**
191 * Heuristic spin value for waitingGet() before blocking on
192 * multiprocessors
193 */
194 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
195
196 /**
197 * Linked nodes to record waiting threads in a Treiber stack. See
198 * other classes such as Phaser and SynchronousQueue for more
199 * detailed explanation. This class implements ManagedBlocker to
200 * avoid starvation when blocking actions pile up in
201 * ForkJoinPools.
202 */
203 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
204 long nanos; // wait time if timed
205 final long deadline; // non-zero if timed
206 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
207 volatile Thread thread;
208 volatile WaitNode next;
209 WaitNode(boolean interruptible, long nanos, long deadline) {
210 this.thread = Thread.currentThread();
211 this.interruptControl = interruptible ? 1 : 0;
212 this.nanos = nanos;
213 this.deadline = deadline;
214 }
215 public boolean isReleasable() {
216 if (thread == null)
217 return true;
218 if (Thread.interrupted()) {
219 int i = interruptControl;
220 interruptControl = -1;
221 if (i > 0)
222 return true;
223 }
224 if (deadline != 0L &&
225 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
226 thread = null;
227 return true;
228 }
229 return false;
230 }
231 public boolean block() {
232 if (isReleasable())
233 return true;
234 else if (deadline == 0L)
235 LockSupport.park(this);
236 else if (nanos > 0L)
237 LockSupport.parkNanos(this, nanos);
238 return isReleasable();
239 }
240 }
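`WaitNode` implements `ForkJoinPool.ManagedBlocker` so that a blocked pool thread can be compensated for. A much simpler blocker built on a `CountDownLatch` illustrates the same two-method contract; `LatchBlockerSketch` and `awaitLatch` are hypothetical names and not part of this file.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Hypothetical ManagedBlocker that waits for a latch to reach zero.
class LatchBlockerSketch implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlockerSketch(CountDownLatch latch) { this.latch = latch; }

    /** True when blocking is unnecessary. */
    public boolean isReleasable() {
        return latch.getCount() == 0;
    }

    /** Blocks if needed; returns true when no further blocking is required. */
    public boolean block() throws InterruptedException {
        if (!isReleasable())
            latch.await();
        return true;
    }

    /** Waits via managedBlock; returns false if interrupted while waiting. */
    static boolean awaitLatch(CountDownLatch latch) {
        try {
            ForkJoinPool.managedBlock(new LatchBlockerSketch(latch));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return latch.getCount() == 0;
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        System.out.println(awaitLatch(latch));
    }
}
```

`managedBlock` may be called from any thread; when the caller is a `ForkJoinPool` worker, the pool may add a spare thread while the caller is blocked.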
241
242 /**
243 * Returns raw result after waiting, or null if interruptible and
244 * interrupted.
245 */
246 private Object waitingGet(boolean interruptible) {
247 WaitNode q = null;
248 boolean queued = false;
249 int spins = SPINS;
250 for (Object r;;) {
251 if ((r = result) != null) {
252 if (q != null) { // suppress unpark
253 q.thread = null;
254 if (q.interruptControl < 0) {
255 if (interruptible) {
256 removeWaiter(q);
257 return null;
258 }
259 Thread.currentThread().interrupt();
260 }
261 }
262 postComplete(); // help release others
263 return r;
264 }
265 else if (spins > 0) {
266 int rnd = ThreadLocalRandom.nextSecondarySeed();
267 if (rnd == 0)
268 rnd = ThreadLocalRandom.current().nextInt();
269 if (rnd >= 0)
270 --spins;
271 }
272 else if (q == null)
273 q = new WaitNode(interruptible, 0L, 0L);
274 else if (!queued)
275 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
276 q.next = waiters, q);
277 else if (interruptible && q.interruptControl < 0) {
278 removeWaiter(q);
279 return null;
280 }
281 else if (q.thread != null && result == null) {
282 try {
283 ForkJoinPool.managedBlock(q);
284 } catch (InterruptedException ex) {
285 q.interruptControl = -1;
286 }
287 }
288 }
289 }
290
291 /**
292 * Awaits completion or aborts on interrupt or timeout.
293 *
294 * @param nanos time to wait
295 * @return raw result
296 */
297 private Object timedAwaitDone(long nanos)
298 throws InterruptedException, TimeoutException {
299 WaitNode q = null;
300 boolean queued = false;
301 for (Object r;;) {
302 if ((r = result) != null) {
303 if (q != null) {
304 q.thread = null;
305 if (q.interruptControl < 0) {
306 removeWaiter(q);
307 throw new InterruptedException();
308 }
309 }
310 postComplete();
311 return r;
312 }
313 else if (q == null) {
314 if (nanos <= 0L)
315 throw new TimeoutException();
316 long d = System.nanoTime() + nanos;
317 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
318 }
319 else if (!queued)
320 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
321 q.next = waiters, q);
322 else if (q.interruptControl < 0) {
323 removeWaiter(q);
324 throw new InterruptedException();
325 }
326 else if (q.nanos <= 0L) {
327 if (result == null) {
328 removeWaiter(q);
329 throw new TimeoutException();
330 }
331 }
332 else if (q.thread != null && result == null) {
333 try {
334 ForkJoinPool.managedBlock(q);
335 } catch (InterruptedException ex) {
336 q.interruptControl = -1;
337 }
338 }
339 }
340 }
341
342 /**
343 * Tries to unlink a timed-out or interrupted wait node to avoid
344 * accumulating garbage. Internal nodes are simply unspliced
345 * without CAS since it is harmless if they are traversed anyway
346 * by releasers. To avoid effects of unsplicing from already
347 * removed nodes, the list is retraversed in case of an apparent
348 * race. This is slow when there are a lot of nodes, but we don't
349 * expect lists to be long enough to outweigh higher-overhead
350 * schemes.
351 */
352 private void removeWaiter(WaitNode node) {
353 if (node != null) {
354 node.thread = null;
355 retry:
356 for (;;) { // restart on removeWaiter race
357 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
358 s = q.next;
359 if (q.thread != null)
360 pred = q;
361 else if (pred != null) {
362 pred.next = s;
363 if (pred.thread == null) // check for race
364 continue retry;
365 }
366 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
367 continue retry;
368 }
369 break;
370 }
371 }
372 }
373
374 /* ------------- Async tasks -------------- */
375
376 /**
377 * A tagging interface identifying asynchronous tasks produced by
378 * {@code async} methods. This may be useful for monitoring,
379 * debugging, and tracking asynchronous activities.
380 */
381 public static interface AsynchronousCompletionTask {
382 }
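One way the tagging interface could be used for monitoring is an `Executor` that counts the tasks carrying the tag. The sketch below assumes the released JDK 8 API, where tasks submitted by `supplyAsync` implement `CompletableFuture.AsynchronousCompletionTask`; `TaggedTaskCounterSketch` and `countTaggedTasks` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical monitor that distinguishes CompletableFuture-generated tasks.
class TaggedTaskCounterSketch {

    /** Runs one async supplier and one plain task; returns how many were tagged. */
    static int countTaggedTasks() {
        AtomicInteger tagged = new AtomicInteger();
        // Executor that inspects each task, then runs it in the calling thread
        Executor counting = task -> {
            if (task instanceof CompletableFuture.AsynchronousCompletionTask)
                tagged.incrementAndGet();
            task.run();
        };
        CompletableFuture.supplyAsync(() -> 42, counting).join(); // tagged task
        counting.execute(() -> { });                              // plain Runnable
        return tagged.get();
    }

    public static void main(String[] args) {
        System.out.println(countTaggedTasks());
    }
}
```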
383
384 /** Base class can act as either FJ or plain Runnable */
385 abstract static class Async extends ForkJoinTask<Void>
386 implements Runnable, AsynchronousCompletionTask {
387 public final Void getRawResult() { return null; }
388 public final void setRawResult(Void v) { }
389 public final void run() { exec(); }
390 }
391
392 static final class AsyncRun extends Async {
393 final Runnable fn;
394 final CompletableFuture<Void> dst;
395 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
396 this.fn = fn; this.dst = dst;
397 }
398 public final boolean exec() {
399 CompletableFuture<Void> d; Throwable ex;
400 if ((d = this.dst) != null && d.result == null) {
401 try {
402 fn.run();
403 ex = null;
404 } catch (Throwable rex) {
405 ex = rex;
406 }
407 d.internalComplete(null, ex);
408 }
409 return true;
410 }
411 private static final long serialVersionUID = 5232453952276885070L;
412 }
413
414 static final class AsyncSupply<U> extends Async {
415 final Supplier<U> fn;
416 final CompletableFuture<U> dst;
417 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
418 this.fn = fn; this.dst = dst;
419 }
420 public final boolean exec() {
421 CompletableFuture<U> d; U u; Throwable ex;
422 if ((d = this.dst) != null && d.result == null) {
423 try {
424 u = fn.get();
425 ex = null;
426 } catch (Throwable rex) {
427 ex = rex;
428 u = null;
429 }
430 d.internalComplete(u, ex);
431 }
432 return true;
433 }
434 private static final long serialVersionUID = 5232453952276885070L;
435 }
436
437 static final class AsyncApply<T,U> extends Async {
438 final Function<? super T,? extends U> fn;
439 final T arg;
440 final CompletableFuture<U> dst;
441 AsyncApply(T arg, Function<? super T,? extends U> fn,
442 CompletableFuture<U> dst) {
443 this.arg = arg; this.fn = fn; this.dst = dst;
444 }
445 public final boolean exec() {
446 CompletableFuture<U> d; U u; Throwable ex;
447 if ((d = this.dst) != null && d.result == null) {
448 try {
449 u = fn.apply(arg);
450 ex = null;
451 } catch (Throwable rex) {
452 ex = rex;
453 u = null;
454 }
455 d.internalComplete(u, ex);
456 }
457 return true;
458 }
459 private static final long serialVersionUID = 5232453952276885070L;
460 }
461
462 static final class AsyncBiApply<T,U,V> extends Async {
463 final BiFunction<? super T,? super U,? extends V> fn;
464 final T arg1;
465 final U arg2;
466 final CompletableFuture<V> dst;
467 AsyncBiApply(T arg1, U arg2,
468 BiFunction<? super T,? super U,? extends V> fn,
469 CompletableFuture<V> dst) {
470 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
471 }
472 public final boolean exec() {
473 CompletableFuture<V> d; V v; Throwable ex;
474 if ((d = this.dst) != null && d.result == null) {
475 try {
476 v = fn.apply(arg1, arg2);
477 ex = null;
478 } catch (Throwable rex) {
479 ex = rex;
480 v = null;
481 }
482 d.internalComplete(v, ex);
483 }
484 return true;
485 }
486 private static final long serialVersionUID = 5232453952276885070L;
487 }
488
489 static final class AsyncAccept<T> extends Async {
490 final Consumer<? super T> fn;
491 final T arg;
492 final CompletableFuture<Void> dst;
493 AsyncAccept(T arg, Consumer<? super T> fn,
494 CompletableFuture<Void> dst) {
495 this.arg = arg; this.fn = fn; this.dst = dst;
496 }
497 public final boolean exec() {
498 CompletableFuture<Void> d; Throwable ex;
499 if ((d = this.dst) != null && d.result == null) {
500 try {
501 fn.accept(arg);
502 ex = null;
503 } catch (Throwable rex) {
504 ex = rex;
505 }
506 d.internalComplete(null, ex);
507 }
508 return true;
509 }
510 private static final long serialVersionUID = 5232453952276885070L;
511 }
512
513 static final class AsyncBiAccept<T,U> extends Async {
514 final BiConsumer<? super T,? super U> fn;
515 final T arg1;
516 final U arg2;
517 final CompletableFuture<Void> dst;
518 AsyncBiAccept(T arg1, U arg2,
519 BiConsumer<? super T,? super U> fn,
520 CompletableFuture<Void> dst) {
521 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
522 }
523 public final boolean exec() {
524 CompletableFuture<Void> d; Throwable ex;
525 if ((d = this.dst) != null && d.result == null) {
526 try {
527 fn.accept(arg1, arg2);
528 ex = null;
529 } catch (Throwable rex) {
530 ex = rex;
531 }
532 d.internalComplete(null, ex);
533 }
534 return true;
535 }
536 private static final long serialVersionUID = 5232453952276885070L;
537 }
538
539 /* ------------- Completions -------------- */
540
541 /**
542 * Simple linked list nodes to record completions, used in
543 * basically the same way as WaitNodes. (We separate nodes from
544 * the Completions themselves mainly because for the And and Or
545 * methods, the same Completion object resides in two lists.)
546 */
547 static final class CompletionNode {
548 final Completion completion;
549 volatile CompletionNode next;
550 CompletionNode(Completion completion) { this.completion = completion; }
551 }
552
553 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
554 abstract static class Completion extends AtomicInteger implements Runnable {
555 }
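The claim step that `Completion` inherits from `AtomicInteger` can be sketched in isolation: whichever caller wins `compareAndSet(0, 1)` runs the action, and every other caller returns without doing anything. `ClaimOnceSketch` and `runFromThreads` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical run-at-most-once wrapper using the AtomicInteger claim idiom.
class ClaimOnceSketch extends AtomicInteger implements Runnable {
    private static final long serialVersionUID = 1L;
    private final transient Runnable action;

    ClaimOnceSketch(Runnable action) { this.action = action; }

    public void run() {
        // state 0 = unclaimed, 1 = claimed; only one caller can flip it
        if (compareAndSet(0, 1))
            action.run();
    }

    /** Races n threads on one claim; returns how many times the action ran. */
    static int runFromThreads(int n) {
        AtomicInteger runs = new AtomicInteger();
        ClaimOnceSketch claim = new ClaimOnceSketch(runs::incrementAndGet);
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(claim);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runFromThreads(8));
    }
}
```

This is why a completion may safely sit in two lists, or be run both by `postComplete` and by a direct caller: extra `run()` calls are harmless.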
556
557 static final class ApplyCompletion<T,U> extends Completion {
558 final CompletableFuture<? extends T> src;
559 final Function<? super T,? extends U> fn;
560 final CompletableFuture<U> dst;
561 final Executor executor;
562 ApplyCompletion(CompletableFuture<? extends T> src,
563 Function<? super T,? extends U> fn,
564 CompletableFuture<U> dst, Executor executor) {
565 this.src = src; this.fn = fn; this.dst = dst;
566 this.executor = executor;
567 }
568 public final void run() {
569 final CompletableFuture<? extends T> a;
570 final Function<? super T,? extends U> fn;
571 final CompletableFuture<U> dst;
572 Object r; T t; Throwable ex;
573 if ((dst = this.dst) != null &&
574 (fn = this.fn) != null &&
575 (a = this.src) != null &&
576 (r = a.result) != null &&
577 compareAndSet(0, 1)) {
578 if (r instanceof AltResult) {
579 ex = ((AltResult)r).ex;
580 t = null;
581 }
582 else {
583 ex = null;
584 @SuppressWarnings("unchecked") T tr = (T) r;
585 t = tr;
586 }
587 Executor e = executor;
588 U u = null;
589 if (ex == null) {
590 try {
591 if (e != null)
592 e.execute(new AsyncApply<T,U>(t, fn, dst));
593 else
594 u = fn.apply(t);
595 } catch (Throwable rex) {
596 ex = rex;
597 }
598 }
599 if (e == null || ex != null)
600 dst.internalComplete(u, ex);
601 }
602 }
603 private static final long serialVersionUID = 5232453952276885070L;
604 }
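From the caller's side, the behavior that `ApplyCompletion` implements can be observed through `thenApply`. The sketch below assumes the released JDK 8 API; it shows that the dependent function runs whether it is registered before or after the source completes, and that it is skipped when the source completes exceptionally. `DependentTimingSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of dependent-function behavior as seen through thenApply.
class DependentTimingSketch {

    /** Dependent registered first, source completed afterwards. */
    static int registeredBefore() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dst = src.thenApply(x -> x * 2);
        src.complete(21); // completing thread runs the dependent
        return dst.join();
    }

    /** Source already complete, so the dependent runs in the registering call. */
    static int registeredAfter() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        src.complete(21);
        return src.thenApply(x -> x * 2).join();
    }

    /** On exceptional completion the function is never invoked. */
    static boolean functionSkippedOnFailure() {
        AtomicBoolean called = new AtomicBoolean();
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dst = src.thenApply(x -> {
            called.set(true);
            return x;
        });
        src.completeExceptionally(new RuntimeException("boom"));
        return dst.isCompletedExceptionally() && !called.get();
    }

    public static void main(String[] args) {
        System.out.println(registeredBefore());
        System.out.println(registeredAfter());
        System.out.println(functionSkippedOnFailure());
    }
}
```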
605
606 static final class AcceptCompletion<T> extends Completion {
607 final CompletableFuture<? extends T> src;
608 final Consumer<? super T> fn;
609 final CompletableFuture<Void> dst;
610 final Executor executor;
611 AcceptCompletion(CompletableFuture<? extends T> src,
612 Consumer<? super T> fn,
613 CompletableFuture<Void> dst, Executor executor) {
614 this.src = src; this.fn = fn; this.dst = dst;
615 this.executor = executor;
616 }
617 public final void run() {
618 final CompletableFuture<? extends T> a;
619 final Consumer<? super T> fn;
620 final CompletableFuture<Void> dst;
621 Object r; T t; Throwable ex;
622 if ((dst = this.dst) != null &&
623 (fn = this.fn) != null &&
624 (a = this.src) != null &&
625 (r = a.result) != null &&
626 compareAndSet(0, 1)) {
627 if (r instanceof AltResult) {
628 ex = ((AltResult)r).ex;
629 t = null;
630 }
631 else {
632 ex = null;
633 @SuppressWarnings("unchecked") T tr = (T) r;
634 t = tr;
635 }
636 Executor e = executor;
637 if (ex == null) {
638 try {
639 if (e != null)
640 e.execute(new AsyncAccept<T>(t, fn, dst));
641 else
642 fn.accept(t);
643 } catch (Throwable rex) {
644 ex = rex;
645 }
646 }
647 if (e == null || ex != null)
648 dst.internalComplete(null, ex);
649 }
650 }
651 private static final long serialVersionUID = 5232453952276885070L;
652 }
653
654 static final class RunCompletion<T> extends Completion {
655 final CompletableFuture<? extends T> src;
656 final Runnable fn;
657 final CompletableFuture<Void> dst;
658 final Executor executor;
659 RunCompletion(CompletableFuture<? extends T> src,
660 Runnable fn,
661 CompletableFuture<Void> dst,
662 Executor executor) {
663 this.src = src; this.fn = fn; this.dst = dst;
664 this.executor = executor;
665 }
666 public final void run() {
667 final CompletableFuture<? extends T> a;
668 final Runnable fn;
669 final CompletableFuture<Void> dst;
670 Object r; Throwable ex;
671 if ((dst = this.dst) != null &&
672 (fn = this.fn) != null &&
673 (a = this.src) != null &&
674 (r = a.result) != null &&
675 compareAndSet(0, 1)) {
676 if (r instanceof AltResult)
677 ex = ((AltResult)r).ex;
678 else
679 ex = null;
680 Executor e = executor;
681 if (ex == null) {
682 try {
683 if (e != null)
684 e.execute(new AsyncRun(fn, dst));
685 else
686 fn.run();
687 } catch (Throwable rex) {
688 ex = rex;
689 }
690 }
691 if (e == null || ex != null)
692 dst.internalComplete(null, ex);
693 }
694 }
695 private static final long serialVersionUID = 5232453952276885070L;
696 }
697
698 static final class BiApplyCompletion<T,U,V> extends Completion {
699 final CompletableFuture<? extends T> src;
700 final CompletableFuture<? extends U> snd;
701 final BiFunction<? super T,? super U,? extends V> fn;
702 final CompletableFuture<V> dst;
703 final Executor executor;
704 BiApplyCompletion(CompletableFuture<? extends T> src,
705 CompletableFuture<? extends U> snd,
706 BiFunction<? super T,? super U,? extends V> fn,
707 CompletableFuture<V> dst, Executor executor) {
708 this.src = src; this.snd = snd;
709 this.fn = fn; this.dst = dst;
710 this.executor = executor;
711 }
712 public final void run() {
713 final CompletableFuture<? extends T> a;
714 final CompletableFuture<? extends U> b;
715 final BiFunction<? super T,? super U,? extends V> fn;
716 final CompletableFuture<V> dst;
717 Object r, s; T t; U u; Throwable ex;
718 if ((dst = this.dst) != null &&
719 (fn = this.fn) != null &&
720 (a = this.src) != null &&
721 (r = a.result) != null &&
722 (b = this.snd) != null &&
723 (s = b.result) != null &&
724 compareAndSet(0, 1)) {
725 if (r instanceof AltResult) {
726 ex = ((AltResult)r).ex;
727 t = null;
728 }
729 else {
730 ex = null;
731 @SuppressWarnings("unchecked") T tr = (T) r;
732 t = tr;
733 }
734 if (ex != null)
735 u = null;
736 else if (s instanceof AltResult) {
737 ex = ((AltResult)s).ex;
738 u = null;
739 }
740 else {
741 @SuppressWarnings("unchecked") U us = (U) s;
742 u = us;
743 }
744 Executor e = executor;
745 V v = null;
746 if (ex == null) {
747 try {
748 if (e != null)
749 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
750 else
751 v = fn.apply(t, u);
752 } catch (Throwable rex) {
753 ex = rex;
754 }
755 }
756 if (e == null || ex != null)
757 dst.internalComplete(v, ex);
758 }
759 }
760 private static final long serialVersionUID = 5232453952276885070L;
761 }
762
763 static final class BiAcceptCompletion<T,U> extends Completion {
764 final CompletableFuture<? extends T> src;
765 final CompletableFuture<? extends U> snd;
766 final BiConsumer<? super T,? super U> fn;
767 final CompletableFuture<Void> dst;
768 final Executor executor;
769 BiAcceptCompletion(CompletableFuture<? extends T> src,
770 CompletableFuture<? extends U> snd,
771 BiConsumer<? super T,? super U> fn,
772 CompletableFuture<Void> dst, Executor executor) {
773 this.src = src; this.snd = snd;
774 this.fn = fn; this.dst = dst;
775 this.executor = executor;
776 }
777 public final void run() {
778 final CompletableFuture<? extends T> a;
779 final CompletableFuture<? extends U> b;
780 final BiConsumer<? super T,? super U> fn;
781 final CompletableFuture<Void> dst;
782 Object r, s; T t; U u; Throwable ex;
783 if ((dst = this.dst) != null &&
784 (fn = this.fn) != null &&
785 (a = this.src) != null &&
786 (r = a.result) != null &&
787 (b = this.snd) != null &&
788 (s = b.result) != null &&
789 compareAndSet(0, 1)) {
790 if (r instanceof AltResult) {
791 ex = ((AltResult)r).ex;
792 t = null;
793 }
794 else {
795 ex = null;
796 @SuppressWarnings("unchecked") T tr = (T) r;
797 t = tr;
798 }
799 if (ex != null)
800 u = null;
801 else if (s instanceof AltResult) {
802 ex = ((AltResult)s).ex;
803 u = null;
804 }
805 else {
806 @SuppressWarnings("unchecked") U us = (U) s;
807 u = us;
808 }
809 Executor e = executor;
810 if (ex == null) {
811 try {
812 if (e != null)
813 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
814 else
815 fn.accept(t, u);
816 } catch (Throwable rex) {
817 ex = rex;
818 }
819 }
820 if (e == null || ex != null)
821 dst.internalComplete(null, ex);
822 }
823 }
824 private static final long serialVersionUID = 5232453952276885070L;
825 }
826
827 static final class BiRunCompletion<T> extends Completion {
828 final CompletableFuture<? extends T> src;
829 final CompletableFuture<?> snd;
830 final Runnable fn;
831 final CompletableFuture<Void> dst;
832 final Executor executor;
833 BiRunCompletion(CompletableFuture<? extends T> src,
834 CompletableFuture<?> snd,
835 Runnable fn,
836 CompletableFuture<Void> dst, Executor executor) {
837 this.src = src; this.snd = snd;
838 this.fn = fn; this.dst = dst;
839 this.executor = executor;
840 }
841 public final void run() {
842 final CompletableFuture<? extends T> a;
843 final CompletableFuture<?> b;
844 final Runnable fn;
845 final CompletableFuture<Void> dst;
846 Object r, s; Throwable ex;
847 if ((dst = this.dst) != null &&
848 (fn = this.fn) != null &&
849 (a = this.src) != null &&
850 (r = a.result) != null &&
851 (b = this.snd) != null &&
852 (s = b.result) != null &&
853 compareAndSet(0, 1)) {
854 if (r instanceof AltResult)
855 ex = ((AltResult)r).ex;
856 else
857 ex = null;
858 if (ex == null && (s instanceof AltResult))
859 ex = ((AltResult)s).ex;
860 Executor e = executor;
861 if (ex == null) {
862 try {
863 if (e != null)
864 e.execute(new AsyncRun(fn, dst));
865 else
866 fn.run();
867 } catch (Throwable rex) {
868 ex = rex;
869 }
870 }
871 if (e == null || ex != null)
872 dst.internalComplete(null, ex);
873 }
874 }
875 private static final long serialVersionUID = 5232453952276885070L;
876 }
877
878 static final class AndCompletion extends Completion {
879 final CompletableFuture<?> src;
880 final CompletableFuture<?> snd;
881 final CompletableFuture<Void> dst;
882 AndCompletion(CompletableFuture<?> src,
883 CompletableFuture<?> snd,
884 CompletableFuture<Void> dst) {
885 this.src = src; this.snd = snd; this.dst = dst;
886 }
887 public final void run() {
888 final CompletableFuture<?> a;
889 final CompletableFuture<?> b;
890 final CompletableFuture<Void> dst;
891 Object r, s; Throwable ex;
892 if ((dst = this.dst) != null &&
893 (a = this.src) != null &&
894 (r = a.result) != null &&
895 (b = this.snd) != null &&
896 (s = b.result) != null &&
897 compareAndSet(0, 1)) {
898 if (r instanceof AltResult)
899 ex = ((AltResult)r).ex;
900 else
901 ex = null;
902 if (ex == null && (s instanceof AltResult))
903 ex = ((AltResult)s).ex;
904 dst.internalComplete(null, ex);
905 }
906 }
907 private static final long serialVersionUID = 5232453952276885070L;
908 }
909
910 static final class OrApplyCompletion<T,U> extends Completion {
911 final CompletableFuture<? extends T> src;
912 final CompletableFuture<? extends T> snd;
913 final Function<? super T,? extends U> fn;
914 final CompletableFuture<U> dst;
915 final Executor executor;
916 OrApplyCompletion(CompletableFuture<? extends T> src,
917 CompletableFuture<? extends T> snd,
918 Function<? super T,? extends U> fn,
919 CompletableFuture<U> dst, Executor executor) {
920 this.src = src; this.snd = snd;
921 this.fn = fn; this.dst = dst;
922 this.executor = executor;
923 }
924 public final void run() {
925 final CompletableFuture<? extends T> a;
926 final CompletableFuture<? extends T> b;
927 final Function<? super T,? extends U> fn;
928 final CompletableFuture<U> dst;
929 Object r; T t; Throwable ex;
930 if ((dst = this.dst) != null &&
931 (fn = this.fn) != null &&
932 (((a = this.src) != null && (r = a.result) != null) ||
933 ((b = this.snd) != null && (r = b.result) != null)) &&
934 compareAndSet(0, 1)) {
935 if (r instanceof AltResult) {
936 ex = ((AltResult)r).ex;
937 t = null;
938 }
939 else {
940 ex = null;
941 @SuppressWarnings("unchecked") T tr = (T) r;
942 t = tr;
943 }
944 Executor e = executor;
945 U u = null;
946 if (ex == null) {
947 try {
948 if (e != null)
949 e.execute(new AsyncApply<T,U>(t, fn, dst));
950 else
951 u = fn.apply(t);
952 } catch (Throwable rex) {
953 ex = rex;
954 }
955 }
956 if (e == null || ex != null)
957 dst.internalComplete(u, ex);
958 }
959 }
960 private static final long serialVersionUID = 5232453952276885070L;
961 }
962
963 static final class OrAcceptCompletion<T> extends Completion {
964 final CompletableFuture<? extends T> src;
965 final CompletableFuture<? extends T> snd;
966 final Consumer<? super T> fn;
967 final CompletableFuture<Void> dst;
968 final Executor executor;
969 OrAcceptCompletion(CompletableFuture<? extends T> src,
970 CompletableFuture<? extends T> snd,
971 Consumer<? super T> fn,
972 CompletableFuture<Void> dst, Executor executor) {
973 this.src = src; this.snd = snd;
974 this.fn = fn; this.dst = dst;
975 this.executor = executor;
976 }
977 public final void run() {
978 final CompletableFuture<? extends T> a;
979 final CompletableFuture<? extends T> b;
980 final Consumer<? super T> fn;
981 final CompletableFuture<Void> dst;
982 Object r; T t; Throwable ex;
983 if ((dst = this.dst) != null &&
984 (fn = this.fn) != null &&
985 (((a = this.src) != null && (r = a.result) != null) ||
986 ((b = this.snd) != null && (r = b.result) != null)) &&
987 compareAndSet(0, 1)) {
988 if (r instanceof AltResult) {
989 ex = ((AltResult)r).ex;
990 t = null;
991 }
992 else {
993 ex = null;
994 @SuppressWarnings("unchecked") T tr = (T) r;
995 t = tr;
996 }
997 Executor e = executor;
998 if (ex == null) {
999 try {
1000 if (e != null)
1001 e.execute(new AsyncAccept<T>(t, fn, dst));
1002 else
1003 fn.accept(t);
1004 } catch (Throwable rex) {
1005 ex = rex;
1006 }
1007 }
1008 if (e == null || ex != null)
1009 dst.internalComplete(null, ex);
1010 }
1011 }
1012 private static final long serialVersionUID = 5232453952276885070L;
1013 }
1014
    static final class OrRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<?> dst;
        OrCompletion(CompletableFuture<?> src,
                     CompletableFuture<?> snd,
                     CompletableFuture<?> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<?> dst;
            Object r, t; Throwable ex;
            if ((dst = this.dst) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ExceptionCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super Throwable, ? extends T> fn;
        final CompletableFuture<T> dst;
        ExceptionCompletion(CompletableFuture<? extends T> src,
                            Function<? super Throwable, ? extends T> fn,
                            CompletableFuture<T> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super Throwable, ? extends T> fn;
            final CompletableFuture<T> dst;
            Object r; T t = null; Throwable ex, dx = null;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if ((r instanceof AltResult) &&
                    (ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
                else {
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                dst.internalComplete(t, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ThenCopy extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> dst;
        ThenCopy(CompletableFuture<?> src,
                 CompletableFuture<?> dst) {
            this.src = src; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> dst;
            Object r; Object t; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class HandleCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final BiFunction<? super T, Throwable, ? extends U> fn;
        final CompletableFuture<U> dst;
        HandleCompletion(CompletableFuture<? extends T> src,
                         BiFunction<? super T, Throwable, ? extends U> fn,
                         final CompletableFuture<U> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final BiFunction<? super T, Throwable, ? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                U u = null; Throwable dx = null;
                try {
                    u = fn.apply(t, ex);
                } catch (Throwable rex) {
                    dx = rex;
                }
                dst.internalComplete(u, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ComposeCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T, CompletableFuture<U>> fn;
        final CompletableFuture<U> dst;
        ComposeCompletion(CompletableFuture<? extends T> src,
                          Function<? super T, CompletableFuture<U>> fn,
                          final CompletableFuture<U> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T, CompletableFuture<U>> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                CompletableFuture<U> c = null;
                U u = null;
                boolean complete = false;
                if (ex == null) {
                    try {
                        c = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (ex != null || c == null) {
                    if (ex == null)
                        ex = new NullPointerException();
                }
                else {
                    ThenCopy d = null;
                    Object s;
                    if ((s = c.result) == null) {
                        CompletionNode p = new CompletionNode
                            (d = new ThenCopy(c, dst));
                        while ((s = c.result) == null) {
                            if (UNSAFE.compareAndSwapObject
                                (c, COMPLETIONS, p.next = c.completions, p))
                                break;
                        }
                    }
                    if (s != null && (d == null || d.compareAndSet(0, 1))) {
                        complete = true;
                        if (s instanceof AltResult) {
                            ex = ((AltResult)s).ex;  // no rewrap
                            u = null;
                        }
                        else {
                            @SuppressWarnings("unchecked") U us = (U) s;
                            u = us;
                        }
                    }
                }
                if (complete || ex != null)
                    dst.internalComplete(u, ex);
                if (c != null)
                    c.helpPostComplete();
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // public methods

    /**
     * Creates a new incomplete CompletableFuture.
     */
    public CompletableFuture() {
    }

    /**
     * Asynchronously executes in the {@link
     * ForkJoinPool#commonPool()} a task that completes the returned
     * CompletableFuture with the result of the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @return the CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        if (supplier == null) throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
        return f;
    }

    /**
     * Asynchronously executes, using the given executor, a task that
     * completes the returned CompletableFuture with the result of the
     * given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        if (executor == null || supplier == null)
            throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        executor.execute(new AsyncSupply<U>(supplier, f));
        return f;
    }

    /**
     * Asynchronously executes in the {@link
     * ForkJoinPool#commonPool()} a task that runs the given action,
     * and then completes the returned CompletableFuture.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
        return f;
    }

    /**
     * Asynchronously executes, using the given executor, a task that
     * runs the given action, and then completes the returned
     * CompletableFuture.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        if (executor == null || runnable == null)
            throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        executor.execute(new AsyncRun(runnable, f));
        return f;
    }

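    /*
     * Usage sketch for the factory methods above. This is illustrative
     * only and not part of the original source; the lambdas and the
     * variable names are hypothetical.
     *
     *   CompletableFuture<Integer> sum =
     *       CompletableFuture.supplyAsync(() -> 1 + 2);  // common pool
     *   CompletableFuture<Void> logged =
     *       CompletableFuture.runAsync(() -> System.out.println("started"));
     *   int value = sum.join();   // 3, once the supplier has run
     *   logged.join();            // returns null after the action has run
     */
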
    /**
     * Returns {@code true} if completed in any fashion: normally,
     * exceptionally, or via cancellation.
     *
     * @return {@code true} if completed
     */
    public boolean isDone() {
        return result != null;
    }

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object r; Throwable ex, cause;
        if ((r = result) == null && (r = waitingGet(true)) == null)
            throw new InterruptedException();
        if (r instanceof AltResult) {
            if ((ex = ((AltResult)r).ex) != null) {
                if (ex instanceof CancellationException)
                    throw (CancellationException)ex;
                if ((ex instanceof CompletionException) &&
                    (cause = ex.getCause()) != null)
                    ex = cause;
                throw new ExecutionException(ex);
            }
            return null;
        }
        @SuppressWarnings("unchecked") T tr = (T) r;
        return tr;
    }

    /**
     * Waits if necessary for at most the given time for completion,
     * and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r; Throwable ex, cause;
        long nanos = unit.toNanos(timeout);
        if (Thread.interrupted())
            throw new InterruptedException();
        if ((r = result) == null)
            r = timedAwaitDone(nanos);
        if (r instanceof AltResult) {
            if ((ex = ((AltResult)r).ex) != null) {
                if (ex instanceof CancellationException)
                    throw (CancellationException)ex;
                if ((ex instanceof CompletionException) &&
                    (cause = ex.getCause()) != null)
                    ex = cause;
                throw new ExecutionException(ex);
            }
            return null;
        }
        @SuppressWarnings("unchecked") T tr = (T) r;
        return tr;
    }

    /**
     * Returns the result value when complete, or throws an
     * (unchecked) exception if completed exceptionally. To better
     * conform with the use of common functional forms, if a
     * computation involved in the completion of this
     * CompletableFuture threw an exception, this method throws an
     * (unchecked) {@link CompletionException} with the underlying
     * exception as its cause.
     *
     * @return the result value
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if a completion computation threw
     * an exception
     */
    public T join() {
        Object r; Throwable ex;
        if ((r = result) == null)
            r = waitingGet(false);
        if (r instanceof AltResult) {
            if ((ex = ((AltResult)r).ex) != null) {
                if (ex instanceof CancellationException)
                    throw (CancellationException)ex;
                if (ex instanceof CompletionException)
                    throw (CompletionException)ex;
                throw new CompletionException(ex);
            }
            return null;
        }
        @SuppressWarnings("unchecked") T tr = (T) r;
        return tr;
    }

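    /*
     * Sketch of how get() and join() report the same failure. This is
     * illustrative only and not part of the original source; the future
     * and the exception are hypothetical.
     *
     *   CompletableFuture<String> f = new CompletableFuture<String>();
     *   f.completeExceptionally(new IllegalStateException("boom"));
     *   try {
     *       f.get();
     *   } catch (ExecutionException ee) {
     *       // checked wrapper; ee.getCause() is the IllegalStateException
     *   } catch (InterruptedException ie) {
     *       Thread.currentThread().interrupt();
     *   }
     *   try {
     *       f.join();
     *   } catch (CompletionException ce) {
     *       // unchecked wrapper; ce.getCause() is the same exception
     *   }
     */
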
    /**
     * Returns the result value (or throws any encountered exception)
     * if completed, else returns the given valueIfAbsent.
     *
     * @param valueIfAbsent the value to return if not completed
     * @return the result value, if completed, else the given valueIfAbsent
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if a completion computation threw
     * an exception
     */
    public T getNow(T valueIfAbsent) {
        Object r; Throwable ex;
        if ((r = result) == null)
            return valueIfAbsent;
        if (r instanceof AltResult) {
            if ((ex = ((AltResult)r).ex) != null) {
                if (ex instanceof CancellationException)
                    throw (CancellationException)ex;
                if (ex instanceof CompletionException)
                    throw (CompletionException)ex;
                throw new CompletionException(ex);
            }
            return null;
        }
        @SuppressWarnings("unchecked") T tr = (T) r;
        return tr;
    }

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null,
                                        value == null ? NIL : value);
        postComplete();
        return triggered;
    }

    /**
     * If not already completed, causes invocations of {@link #get()}
     * and related methods to throw the given exception.
     *
     * @param ex the exception
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean completeExceptionally(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
        postComplete();
        return triggered;
    }

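    /*
     * Sketch of the "only one completion succeeds" rule implemented by
     * the compare-and-swap in complete and completeExceptionally. This
     * is illustrative only and not part of the original source; the
     * values are hypothetical.
     *
     *   CompletableFuture<String> f = new CompletableFuture<String>();
     *   boolean first  = f.complete("a");          // true: result was unset
     *   boolean second = f.complete("b");          // false: already completed
     *   boolean third  =
     *       f.completeExceptionally(new RuntimeException());  // false
     *   String v = f.getNow("absent");             // "a"
     */
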
    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of the given function of this CompletableFuture.
     * If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so,
     * with a CompletionException holding this exception as
     * its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
        return doThenApply(fn, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} with the
     * result of the given function of this CompletableFuture. If
     * this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
        return doThenApply(fn, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor with the result of the given
     * function of this CompletableFuture. If this CompletableFuture
     * completes exceptionally, then the returned CompletableFuture
     * also does so, with a CompletionException holding this exception as
     * its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
                                                   Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenApply(fn, executor);
    }

    private <U> CompletableFuture<U> doThenApply(Function<? super T,? extends U> fn,
                                                 Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        ApplyCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new ApplyCompletion<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        return dst;
    }

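    /*
     * Usage sketch for thenApply. This is illustrative only and not
     * part of the original source; the names and values are
     * hypothetical. The dependent function is registered while src is
     * still incomplete and runs once src is completed.
     *
     *   CompletableFuture<Integer> src = new CompletableFuture<Integer>();
     *   CompletableFuture<String> dst = src.thenApply(n -> "n=" + n);
     *   src.complete(42);
     *   String s = dst.join();    // "n=42"
     */
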
    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action with this CompletableFuture's
     * result when it completes. If this CompletableFuture
     * completes exceptionally, then the returned CompletableFuture
     * also does so, with a CompletionException holding this exception as
     * its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
        return doThenAccept(block, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} after
     * performing the given action with this CompletableFuture's
     * result when it completes. If this CompletableFuture completes
     * exceptionally, then the returned CompletableFuture also does
     * so, with a CompletionException holding this exception as its
     * cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
        return doThenAccept(block, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor after performing the given
     * action with this CompletableFuture's result when it completes.
     * If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
                                                   Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenAccept(block, executor);
    }

    private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
                                                 Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AcceptCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new AcceptCompletion<T>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action when this CompletableFuture
     * completes. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRun(Runnable action) {
        return doThenRun(action, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} after
     * performing the given action when this CompletableFuture
     * completes. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return doThenRun(action, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor after performing the given
     * action when this CompletableFuture completes. If this
     * CompletableFuture completes exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException holding
     * this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenRun(action, executor);
    }

    private CompletableFuture<Void> doThenRun(Runnable action,
                                              Executor e) {
        if (action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        RunCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new RunCompletion<T>(this, action, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }

    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
                                                  BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} with
     * the result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
                                                       BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is
     * asynchronously completed using the given executor with the
     * result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
                                                       BiFunction<? super T,? super U,? extends V> fn,
                                                       Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiApply(other, fn, executor);
    }

    private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
                                                     BiFunction<? super T,? super U,? extends V> fn,
                                                     Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<V> dst = new CompletableFuture<V>();
        BiApplyCompletion<T,U,V> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            V v = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                    else
                        v = fn.apply(t, u);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(v, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

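    /*
     * Usage sketch for thenCombine. This is illustrative only and not
     * part of the original source; the suppliers and names are
     * hypothetical. The combining function runs only after both
     * inputs have completed.
     *
     *   CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 6);
     *   CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 7);
     *   CompletableFuture<Integer> product =
     *       a.thenCombine(b, (x, y) -> x * y);
     *   int p = product.join();   // 42
     */
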
1928 /**
1929 * Creates and returns a CompletableFuture that is completed with
1930 * the results of this and the other given CompletableFuture if
1931 * both complete. If this and/or the other CompletableFuture
1932 * complete exceptionally, then the returned CompletableFuture
1933 * also does so, with a CompletionException holding one of these
1934 * exceptions as its cause.
1935 *
1936 * @param other the other CompletableFuture
1937 * @param block the action to perform before completing the
1938 * returned CompletableFuture
1939 * @return the new CompletableFuture
1940 */
1941 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1942 BiConsumer<? super T, ? super U> block) {
1943 return doThenBiAccept(other, block, null);
1944 }
1945
1946 /**
1947 * Creates and returns a CompletableFuture that is completed
1948 * asynchronously using the {@link ForkJoinPool#commonPool()} with
1949 * the results of this and the other given CompletableFuture when
1950 * both complete. If this and/or the other CompletableFuture
1951 * complete exceptionally, then the returned CompletableFuture
1952 * also does so, with a CompletionException holding one of these
1953 * exceptions as its cause.
1954 *
1955 * @param other the other CompletableFuture
1956 * @param block the action to perform before completing the
1957 * returned CompletableFuture
1958 * @return the new CompletableFuture
1959 */
1960 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1961 BiConsumer<? super T, ? super U> block) {
1962 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1963 }
1964
1965 /**
1966 * Creates and returns a CompletableFuture that is completed
1967 * asynchronously using the given executor after performing the
1968 * given action with the results of this and the other given
1969 * CompletableFuture when both complete. If this and/or the other
1970 * CompletableFuture complete exceptionally, then the returned
1971 * CompletableFuture also does so, with a CompletionException
1972 * holding one of these exceptions as its cause.
1973 *
1974 * @param other the other CompletableFuture
1975 * @param block the action to perform before completing the
1976 * returned CompletableFuture
1977 * @param executor the executor to use for asynchronous execution
1978 * @return the new CompletableFuture
1979 */
1980 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1981 BiConsumer<? super T, ? super U> block,
1982 Executor executor) {
1983 if (executor == null) throw new NullPointerException();
1984 return doThenBiAccept(other, block, executor);
1985 }
1986
1987 private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
1988 BiConsumer<? super T,? super U> fn,
1989 Executor e) {
1990 if (other == null || fn == null) throw new NullPointerException();
1991 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1992 BiAcceptCompletion<T,U> d = null;
1993 Object r, s = null;
1994 if ((r = result) == null || (s = other.result) == null) {
1995 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1996 CompletionNode q = null, p = new CompletionNode(d);
1997 while ((r == null && (r = result) == null) ||
1998 (s == null && (s = other.result) == null)) {
1999 if (q != null) {
2000 if (s != null ||
2001 UNSAFE.compareAndSwapObject
2002 (other, COMPLETIONS, q.next = other.completions, q))
2003 break;
2004 }
2005 else if (r != null ||
2006 UNSAFE.compareAndSwapObject
2007 (this, COMPLETIONS, p.next = completions, p)) {
2008 if (s != null)
2009 break;
2010 q = new CompletionNode(d);
2011 }
2012 }
2013 }
2014 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2015 T t; U u; Throwable ex;
2016 if (r instanceof AltResult) {
2017 ex = ((AltResult)r).ex;
2018 t = null;
2019 }
2020 else {
2021 ex = null;
2022 @SuppressWarnings("unchecked") T tr = (T) r;
2023 t = tr;
2024 }
2025 if (ex != null)
2026 u = null;
2027 else if (s instanceof AltResult) {
2028 ex = ((AltResult)s).ex;
2029 u = null;
2030 }
2031 else {
2032 @SuppressWarnings("unchecked") U us = (U) s;
2033 u = us;
2034 }
2035 if (ex == null) {
2036 try {
2037 if (e != null)
2038 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2039 else
2040 fn.accept(t, u);
2041 } catch (Throwable rex) {
2042 ex = rex;
2043 }
2044 }
2045 if (e == null || ex != null)
2046 dst.internalComplete(null, ex);
2047 }
2048 helpPostComplete();
2049 other.helpPostComplete();
2050 return dst;
2051 }
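/*
 * Usage sketch (not part of the original source). It illustrates the
 * thenAcceptBoth behavior documented above: the action runs once both
 * sources have completed, and is skipped when either source completed
 * exceptionally. The class name ThenAcceptBothSketch and its helper
 * methods are hypothetical, and the sketch assumes the API as released
 * in Java 8 (for example the completedFuture factory), which may differ
 * from this revision.
 *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ThenAcceptBothSketch {
    // Combines two already-completed futures and returns what the action saw.
    static String combine(String left, int right) {
        CompletableFuture<String> a = CompletableFuture.completedFuture(left);
        CompletableFuture<Integer> b = CompletableFuture.completedFuture(right);
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done =
            a.thenAcceptBoth(b, (s, n) -> seen.set(s + ":" + n));
        done.join(); // yields null; it only signals that the action has run
        return seen.get();
    }

    // If either source fails, the action is skipped and the failure propagates.
    static boolean actionSkippedOnFailure() {
        CompletableFuture<String> a = new CompletableFuture<>();
        a.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<Integer> b = CompletableFuture.completedFuture(1);
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done =
            a.thenAcceptBoth(b, (s, n) -> seen.set(s + ":" + n));
        return done.isCompletedExceptionally() && seen.get() == null;
    }

    public static void main(String[] args) {
        System.out.println(combine("x", 2));
        System.out.println(actionSkippedOnFailure());
    }
}
```
 */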
2052
2053 /**
2054 * Creates and returns a CompletableFuture that is completed
2055 * when this and the other given CompletableFuture both
2056 * complete. If this and/or the other CompletableFuture complete
2057 * exceptionally, then the returned CompletableFuture also does
2058 * so, with a CompletionException holding one of these exceptions as
2059 * its cause.
2060 *
2061 * @param other the other CompletableFuture
2062 * @param action the action to perform before completing the
2063 * returned CompletableFuture
2064 * @return the new CompletableFuture
2065 */
2066 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2067 Runnable action) {
2068 return doThenBiRun(other, action, null);
2069 }
2070
2071 /**
2072 * Creates and returns a CompletableFuture that is completed
2073 * asynchronously using the {@link ForkJoinPool#commonPool()}
2074 * when this and the other given CompletableFuture both
2075 * complete. If this and/or the other CompletableFuture complete
2076 * exceptionally, then the returned CompletableFuture also does
2077 * so, with a CompletionException holding one of these exceptions as
2078 * its cause.
2079 *
2080 * @param other the other CompletableFuture
2081 * @param action the action to perform before completing the
2082 * returned CompletableFuture
2083 * @return the new CompletableFuture
2084 */
2085 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2086 Runnable action) {
2087 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2088 }
2089
2090 /**
2091 * Creates and returns a CompletableFuture that is completed
2092 * asynchronously using the given executor
2093 * when this and the other given CompletableFuture both
2094 * complete. If this and/or the other CompletableFuture complete
2095 * exceptionally, then the returned CompletableFuture also does
2096 * so, with a CompletionException holding one of these exceptions as
2097 * its cause.
2098 *
2099 * @param other the other CompletableFuture
2100 * @param action the action to perform before completing the
2101 * returned CompletableFuture
2102 * @param executor the executor to use for asynchronous execution
2103 * @return the new CompletableFuture
2104 */
2105 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2106 Runnable action,
2107 Executor executor) {
2108 if (executor == null) throw new NullPointerException();
2109 return doThenBiRun(other, action, executor);
2110 }
2111
2112 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2113 Runnable action,
2114 Executor e) {
2115 if (other == null || action == null) throw new NullPointerException();
2116 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2117 BiRunCompletion<T> d = null;
2118 Object r, s = null;
2119 if ((r = result) == null || (s = other.result) == null) {
2120 d = new BiRunCompletion<T>(this, other, action, dst, e);
2121 CompletionNode q = null, p = new CompletionNode(d);
2122 while ((r == null && (r = result) == null) ||
2123 (s == null && (s = other.result) == null)) {
2124 if (q != null) {
2125 if (s != null ||
2126 UNSAFE.compareAndSwapObject
2127 (other, COMPLETIONS, q.next = other.completions, q))
2128 break;
2129 }
2130 else if (r != null ||
2131 UNSAFE.compareAndSwapObject
2132 (this, COMPLETIONS, p.next = completions, p)) {
2133 if (s != null)
2134 break;
2135 q = new CompletionNode(d);
2136 }
2137 }
2138 }
2139 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2140 Throwable ex;
2141 if (r instanceof AltResult)
2142 ex = ((AltResult)r).ex;
2143 else
2144 ex = null;
2145 if (ex == null && (s instanceof AltResult))
2146 ex = ((AltResult)s).ex;
2147 if (ex == null) {
2148 try {
2149 if (e != null)
2150 e.execute(new AsyncRun(action, dst));
2151 else
2152 action.run();
2153 } catch (Throwable rex) {
2154 ex = rex;
2155 }
2156 }
2157 if (e == null || ex != null)
2158 dst.internalComplete(null, ex);
2159 }
2160 helpPostComplete();
2161 other.helpPostComplete();
2162 return dst;
2163 }
2164
2165 /**
2166 * Creates and returns a CompletableFuture that is completed with
2167 * the result of the given function of either this or the other
2168 * given CompletableFuture's results when either completes. If
2169 * this and/or the other CompletableFuture complete exceptionally,
2170 * then the returned CompletableFuture may also do so, with a
2171 * CompletionException holding one of these exceptions as its cause.
2172 * No guarantees are made about which result or exception is used
2173 * in the returned CompletableFuture.
2174 *
2175 * @param other the other CompletableFuture
2176 * @param fn the function to use to compute the value of
2177 * the returned CompletableFuture
2178 * @return the new CompletableFuture
2179 */
2180 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2181 Function<? super T, U> fn) {
2182 return doOrApply(other, fn, null);
2183 }
2184
2185 /**
2186 * Creates and returns a CompletableFuture that is completed
2187 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2188 * the result of the given function of either this or the other
2189 * given CompletableFuture's results when either completes. If
2190 * this and/or the other CompletableFuture complete exceptionally,
2191 * then the returned CompletableFuture may also do so, with a
2192 * CompletionException holding one of these exceptions as its cause.
2193 * No guarantees are made about which result or exception is used
2194 * in the returned CompletableFuture.
2195 *
2196 * @param other the other CompletableFuture
2197 * @param fn the function to use to compute the value of
2198 * the returned CompletableFuture
2199 * @return the new CompletableFuture
2200 */
2201 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2202 Function<? super T, U> fn) {
2203 return doOrApply(other, fn, ForkJoinPool.commonPool());
2204 }
2205
2206 /**
2207 * Creates and returns a CompletableFuture that is completed
2208 * asynchronously using the given executor with the result of the
2209 * given function of either this or the other given
2210 * CompletableFuture's results when either completes. If this
2211 * and/or the other CompletableFuture complete exceptionally, then
2212 * the returned CompletableFuture may also do so, with a
2213 * CompletionException holding one of these exceptions as its cause.
2214 * No guarantees are made about which result or exception is used
2215 * in the returned CompletableFuture.
2216 *
2217 * @param other the other CompletableFuture
2218 * @param fn the function to use to compute the value of
2219 * the returned CompletableFuture
2220 * @param executor the executor to use for asynchronous execution
2221 * @return the new CompletableFuture
2222 */
2223 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2224 Function<? super T, U> fn,
2225 Executor executor) {
2226 if (executor == null) throw new NullPointerException();
2227 return doOrApply(other, fn, executor);
2228 }
2229
2230 private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2231 Function<? super T, U> fn,
2232 Executor e) {
2233 if (other == null || fn == null) throw new NullPointerException();
2234 CompletableFuture<U> dst = new CompletableFuture<U>();
2235 OrApplyCompletion<T,U> d = null;
2236 Object r;
2237 if ((r = result) == null && (r = other.result) == null) {
2238 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2239 CompletionNode q = null, p = new CompletionNode(d);
2240 while ((r = result) == null && (r = other.result) == null) {
2241 if (q != null) {
2242 if (UNSAFE.compareAndSwapObject
2243 (other, COMPLETIONS, q.next = other.completions, q))
2244 break;
2245 }
2246 else if (UNSAFE.compareAndSwapObject
2247 (this, COMPLETIONS, p.next = completions, p))
2248 q = new CompletionNode(d);
2249 }
2250 }
2251 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2252 T t; Throwable ex;
2253 if (r instanceof AltResult) {
2254 ex = ((AltResult)r).ex;
2255 t = null;
2256 }
2257 else {
2258 ex = null;
2259 @SuppressWarnings("unchecked") T tr = (T) r;
2260 t = tr;
2261 }
2262 U u = null;
2263 if (ex == null) {
2264 try {
2265 if (e != null)
2266 e.execute(new AsyncApply<T,U>(t, fn, dst));
2267 else
2268 u = fn.apply(t);
2269 } catch (Throwable rex) {
2270 ex = rex;
2271 }
2272 }
2273 if (e == null || ex != null)
2274 dst.internalComplete(u, ex);
2275 }
2276 helpPostComplete();
2277 other.helpPostComplete();
2278 return dst;
2279 }
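/*
 * Usage sketch (not part of the original source). It shows the "either"
 * form documented above: the function is applied to whichever source
 * has a result available, here the only one that ever completes. The
 * class name ApplyToEitherSketch and the method shout are hypothetical,
 * and the sketch assumes the API as released in Java 8.
 *
```java
import java.util.concurrent.CompletableFuture;

public class ApplyToEitherSketch {
    // Applies the function to the result of whichever future completes.
    static String shout(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.applyToEither(b, String::toUpperCase).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> ready = CompletableFuture.completedFuture("hi");
        System.out.println(shout(pending, ready));
    }
}
```
 *
 * When both sources are already complete, no guarantee is made about
 * which result is used, so callers should not depend on the choice.
 */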
2280
2281 /**
2282 * Creates and returns a CompletableFuture that is completed after
2283 * performing the given action with the result of either this or the
2284 * other given CompletableFuture, when either completes.
2285 * If this and/or the other CompletableFuture complete
2286 * exceptionally, then the returned CompletableFuture may also do
2287 * so, with a CompletionException holding one of these exceptions as
2288 * its cause. No guarantees are made about which exception is
2289 * used in the returned CompletableFuture.
2290 *
2291 * @param other the other CompletableFuture
2292 * @param block the action to perform before completing the
2293 * returned CompletableFuture
2294 * @return the new CompletableFuture
2295 */
2296 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2297 Consumer<? super T> block) {
2298 return doOrAccept(other, block, null);
2299 }
2300
2301 /**
2302 * Creates and returns a CompletableFuture that is completed
2303 * asynchronously using the {@link ForkJoinPool#commonPool()},
2304 * performing the given action with the result of either this or
2305 * the other given CompletableFuture, when either
2306 * completes. If this and/or the other CompletableFuture complete
2307 * exceptionally, then the returned CompletableFuture may also do
2308 * so, with a CompletionException holding one of these exceptions as
2309 * its cause. No guarantees are made about which exception is
2310 * used in the returned CompletableFuture.
2311 *
2312 * @param other the other CompletableFuture
2313 * @param block the action to perform before completing the
2314 * returned CompletableFuture
2315 * @return the new CompletableFuture
2316 */
2317 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2318 Consumer<? super T> block) {
2319 return doOrAccept(other, block, ForkJoinPool.commonPool());
2320 }
2321
2322 /**
2323 * Creates and returns a CompletableFuture that is completed
2324 * asynchronously using the given executor,
2325 * performing the given action with the result of either this or
2326 * the other given CompletableFuture, when either
2327 * completes. If this and/or the other CompletableFuture complete
2328 * exceptionally, then the returned CompletableFuture may also do
2329 * so, with a CompletionException holding one of these exceptions as
2330 * its cause. No guarantees are made about which exception is
2331 * used in the returned CompletableFuture.
2332 *
2333 * @param other the other CompletableFuture
2334 * @param block the action to perform before completing the
2335 * returned CompletableFuture
2336 * @param executor the executor to use for asynchronous execution
2337 * @return the new CompletableFuture
2338 */
2339 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2340 Consumer<? super T> block,
2341 Executor executor) {
2342 if (executor == null) throw new NullPointerException();
2343 return doOrAccept(other, block, executor);
2344 }
2345
2346 private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2347 Consumer<? super T> fn,
2348 Executor e) {
2349 if (other == null || fn == null) throw new NullPointerException();
2350 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2351 OrAcceptCompletion<T> d = null;
2352 Object r;
2353 if ((r = result) == null && (r = other.result) == null) {
2354 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2355 CompletionNode q = null, p = new CompletionNode(d);
2356 while ((r = result) == null && (r = other.result) == null) {
2357 if (q != null) {
2358 if (UNSAFE.compareAndSwapObject
2359 (other, COMPLETIONS, q.next = other.completions, q))
2360 break;
2361 }
2362 else if (UNSAFE.compareAndSwapObject
2363 (this, COMPLETIONS, p.next = completions, p))
2364 q = new CompletionNode(d);
2365 }
2366 }
2367 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2368 T t; Throwable ex;
2369 if (r instanceof AltResult) {
2370 ex = ((AltResult)r).ex;
2371 t = null;
2372 }
2373 else {
2374 ex = null;
2375 @SuppressWarnings("unchecked") T tr = (T) r;
2376 t = tr;
2377 }
2378 if (ex == null) {
2379 try {
2380 if (e != null)
2381 e.execute(new AsyncAccept<T>(t, fn, dst));
2382 else
2383 fn.accept(t);
2384 } catch (Throwable rex) {
2385 ex = rex;
2386 }
2387 }
2388 if (e == null || ex != null)
2389 dst.internalComplete(null, ex);
2390 }
2391 helpPostComplete();
2392 other.helpPostComplete();
2393 return dst;
2394 }
2395
2396 /**
2397 * Creates and returns a CompletableFuture that is completed
2398 * after this or the other given CompletableFuture completes. If
2399 * this and/or the other CompletableFuture complete exceptionally,
2400 * then the returned CompletableFuture may also do so, with a
2401 * CompletionException holding one of these exceptions as its cause.
2402 * No guarantees are made about which exception is used in the
2403 * returned CompletableFuture.
2404 *
2405 * @param other the other CompletableFuture
2406 * @param action the action to perform before completing the
2407 * returned CompletableFuture
2408 * @return the new CompletableFuture
2409 */
2410 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2411 Runnable action) {
2412 return doOrRun(other, action, null);
2413 }
2414
2415 /**
2416 * Creates and returns a CompletableFuture that is completed
2417 * asynchronously using the {@link ForkJoinPool#commonPool()}
2418 * after this or the other given CompletableFuture completes. If
2419 * this and/or the other CompletableFuture complete exceptionally,
2420 * then the returned CompletableFuture may also do so, with a
2421 * CompletionException holding one of these exceptions as its cause.
2422 * No guarantees are made about which exception is used in the
2423 * returned CompletableFuture.
2424 *
2425 * @param other the other CompletableFuture
2426 * @param action the action to perform before completing the
2427 * returned CompletableFuture
2428 * @return the new CompletableFuture
2429 */
2430 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2431 Runnable action) {
2432 return doOrRun(other, action, ForkJoinPool.commonPool());
2433 }
2434
2435 /**
2436 * Creates and returns a CompletableFuture that is completed
2437 * asynchronously using the given executor after this or the other
2438 * given CompletableFuture completes. If this and/or the other
2439 * CompletableFuture complete exceptionally, then the returned
2440 * CompletableFuture may also do so, with a CompletionException
2441 * holding one of these exceptions as its cause. No guarantees are
2442 * made about which exception is used in the returned
2443 * CompletableFuture.
2444 *
2445 * @param other the other CompletableFuture
2446 * @param action the action to perform before completing the
2447 * returned CompletableFuture
2448 * @param executor the executor to use for asynchronous execution
2449 * @return the new CompletableFuture
2450 */
2451 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2452 Runnable action,
2453 Executor executor) {
2454 if (executor == null) throw new NullPointerException();
2455 return doOrRun(other, action, executor);
2456 }
2457
2458 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2459 Runnable action,
2460 Executor e) {
2461 if (other == null || action == null) throw new NullPointerException();
2462 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2463 OrRunCompletion<T> d = null;
2464 Object r;
2465 if ((r = result) == null && (r = other.result) == null) {
2466 d = new OrRunCompletion<T>(this, other, action, dst, e);
2467 CompletionNode q = null, p = new CompletionNode(d);
2468 while ((r = result) == null && (r = other.result) == null) {
2469 if (q != null) {
2470 if (UNSAFE.compareAndSwapObject
2471 (other, COMPLETIONS, q.next = other.completions, q))
2472 break;
2473 }
2474 else if (UNSAFE.compareAndSwapObject
2475 (this, COMPLETIONS, p.next = completions, p))
2476 q = new CompletionNode(d);
2477 }
2478 }
2479 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2480 Throwable ex;
2481 if (r instanceof AltResult)
2482 ex = ((AltResult)r).ex;
2483 else
2484 ex = null;
2485 if (ex == null) {
2486 try {
2487 if (e != null)
2488 e.execute(new AsyncRun(action, dst));
2489 else
2490 action.run();
2491 } catch (Throwable rex) {
2492 ex = rex;
2493 }
2494 }
2495 if (e == null || ex != null)
2496 dst.internalComplete(null, ex);
2497 }
2498 helpPostComplete();
2499 other.helpPostComplete();
2500 return dst;
2501 }
2502
2503 /**
2504 * Returns a CompletableFuture (or an equivalent one) produced by
2505 * the given function of the result of this CompletableFuture when
2506 * completed. If this CompletableFuture completes exceptionally,
2507 * then the returned CompletableFuture also does so, with a
2508 * CompletionException holding this exception as its cause.
2509 *
2510 * @param fn the function returning a new CompletableFuture
2511 * @return the CompletableFuture, which {@code isDone()} upon
2512 * return if it was completed by the given function or if an
2513 * exception occurred
2514 */
2515 public <U> CompletableFuture<U> thenCompose(Function<? super T,
2516 CompletableFuture<U>> fn) {
2517 if (fn == null) throw new NullPointerException();
2518 CompletableFuture<U> dst = null;
2519 ComposeCompletion<T,U> d = null;
2520 Object r;
2521 if ((r = result) == null) {
2522 dst = new CompletableFuture<U>();
2523 CompletionNode p = new CompletionNode
2524 (d = new ComposeCompletion<T,U>(this, fn, dst));
2525 while ((r = result) == null) {
2526 if (UNSAFE.compareAndSwapObject
2527 (this, COMPLETIONS, p.next = completions, p))
2528 break;
2529 }
2530 }
2531 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2532 T t; Throwable ex;
2533 if (r instanceof AltResult) {
2534 ex = ((AltResult)r).ex;
2535 t = null;
2536 }
2537 else {
2538 ex = null;
2539 @SuppressWarnings("unchecked") T tr = (T) r;
2540 t = tr;
2541 }
2542 if (ex == null) {
2543 try {
2544 dst = fn.apply(t);
2545 } catch (Throwable rex) {
2546 ex = rex;
2547 }
2548 }
2549 if (dst == null) {
2550 dst = new CompletableFuture<U>();
2551 if (ex == null)
2552 ex = new NullPointerException();
2553 }
2554 if (ex != null)
2555 dst.internalComplete(null, ex);
2556 }
2557 helpPostComplete();
2558 dst.helpPostComplete();
2559 return dst;
2560 }
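/*
 * Usage sketch (not part of the original source). It illustrates
 * thenCompose as documented above: the function supplies the next
 * CompletableFuture, and a function that returns null leaves the
 * result completed exceptionally (the code above substitutes a
 * NullPointerException in that case). The class name ThenComposeSketch
 * and the helper lengthOf are hypothetical, and the sketch assumes the
 * API as released in Java 8.
 *
```java
import java.util.concurrent.CompletableFuture;

public class ThenComposeSketch {
    // Hypothetical second stage that itself returns a future.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.completedFuture(s.length());
    }

    // Chains the two stages without nesting futures.
    static int composedLength(String s) {
        return CompletableFuture.completedFuture(s)
            .thenCompose(ThenComposeSketch::lengthOf)
            .join();
    }

    // A function that returns null yields an exceptionally completed future.
    static boolean nullStageFails() {
        CompletableFuture<Integer> r = CompletableFuture.completedFuture("x")
            .thenCompose(s -> (CompletableFuture<Integer>) null);
        return r.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(composedLength("abcd"));
        System.out.println(nullStageFails());
    }
}
```
 */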
2561
2562 /**
2563 * Creates and returns a CompletableFuture that is completed with
2564 * the result of the given function of the exception triggering
2565 * this CompletableFuture's completion when it completes
2566 * exceptionally. Otherwise, if this CompletableFuture completes
2567 * normally, then the returned CompletableFuture also completes
2568 * normally with the same value.
2569 *
2570 * @param fn the function to use to compute the value of the
2571 * returned CompletableFuture if this CompletableFuture completed
2572 * exceptionally
2573 * @return the new CompletableFuture
2574 */
2575 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
2576 if (fn == null) throw new NullPointerException();
2577 CompletableFuture<T> dst = new CompletableFuture<T>();
2578 ExceptionCompletion<T> d = null;
2579 Object r;
2580 if ((r = result) == null) {
2581 CompletionNode p =
2582 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2583 while ((r = result) == null) {
2584 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2585 p.next = completions, p))
2586 break;
2587 }
2588 }
2589 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2590 T t = null; Throwable ex, dx = null;
2591 if (r instanceof AltResult) {
2592 if ((ex = ((AltResult)r).ex) != null) {
2593 try {
2594 t = fn.apply(ex);
2595 } catch (Throwable rex) {
2596 dx = rex;
2597 }
2598 }
2599 }
2600 else {
2601 @SuppressWarnings("unchecked") T tr = (T) r;
2602 t = tr;
2603 }
2604 dst.internalComplete(t, dx);
2605 }
2606 helpPostComplete();
2607 return dst;
2608 }
2609
2610 /**
2611 * Creates and returns a CompletableFuture that is completed with
2612 * the result of the given function of the result and exception of
2613 * this CompletableFuture's completion when it completes. The
2614 * given function is invoked with the result (or {@code null} if
2615 * none) and the exception (or {@code null} if none) of this
2616 * CompletableFuture when complete.
2617 *
2618 * @param fn the function to use to compute the value of the
2619 * returned CompletableFuture
2620 *
2621 * @return the new CompletableFuture
2622 */
2623 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
2624 if (fn == null) throw new NullPointerException();
2625 CompletableFuture<U> dst = new CompletableFuture<U>();
2626 HandleCompletion<T,U> d = null;
2627 Object r;
2628 if ((r = result) == null) {
2629 CompletionNode p =
2630 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2631 while ((r = result) == null) {
2632 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2633 p.next = completions, p))
2634 break;
2635 }
2636 }
2637 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2638 T t; Throwable ex;
2639 if (r instanceof AltResult) {
2640 ex = ((AltResult)r).ex;
2641 t = null;
2642 }
2643 else {
2644 ex = null;
2645 @SuppressWarnings("unchecked") T tr = (T) r;
2646 t = tr;
2647 }
2648 U u; Throwable dx;
2649 try {
2650 u = fn.apply(t, ex);
2651 dx = null;
2652 } catch (Throwable rex) {
2653 dx = rex;
2654 u = null;
2655 }
2656 dst.internalComplete(u, dx);
2657 }
2658 helpPostComplete();
2659 return dst;
2660 }
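/*
 * Usage sketch (not part of the original source). It contrasts the two
 * recovery methods documented above: exceptionally invokes its function
 * only on failure and otherwise passes the value through, while handle
 * is always invoked with a (result, exception) pair in which one side
 * is null. The class name RecoverySketch and its helpers are
 * hypothetical, and the sketch assumes the API as released in Java 8.
 *
```java
import java.util.concurrent.CompletableFuture;

public class RecoverySketch {
    // Builds a source that either fails with "boom" or completes with "ok".
    static CompletableFuture<String> source(boolean fail) {
        CompletableFuture<String> src = new CompletableFuture<>();
        if (fail)
            src.completeExceptionally(new IllegalStateException("boom"));
        else
            src.complete("ok");
        return src;
    }

    // exceptionally: the fallback is used only when the source failed.
    static String recover(boolean fail) {
        return source(fail).exceptionally(ex -> "fallback").join();
    }

    // handle: called in both cases; exactly one of (v, ex) is meaningful.
    static String describe(boolean fail) {
        return source(fail)
            .handle((v, ex) -> ex == null ? "value=" + v
                                          : "error=" + ex.getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(recover(true) + " " + recover(false));
        System.out.println(describe(true) + " " + describe(false));
    }
}
```
 */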
2661
2662
2663 /* ------------- Arbitrary-arity constructions -------------- */
2664
2665 /*
2666 * The basic plan of attack is to recursively form binary
2667 * completion trees of elements. This can be overkill for small
2668 * sets, but scales nicely. The And/All vs Or/Any forms use the
2669 * same idea, but details differ.
2670 */
2671
2672 /**
2673 * Returns a new CompletableFuture that is completed when all of
2674 * the given CompletableFutures complete. If any of the component
2675 * CompletableFutures complete exceptionally, then so does the
2676 * returned CompletableFuture. Otherwise, the results, if any, of
2677 * the component CompletableFutures are not reflected in the
2678 * returned CompletableFuture, but may be obtained by inspecting
2679 * them individually. If the number of components is zero, returns
2680 * a completed CompletableFuture.
2681 *
2682 * <p>Among the applications of this method is to await completion
2683 * of a set of independent CompletableFutures before continuing a
2684 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2685 * c3).join();}.
2686 *
2687 * @param cfs the CompletableFutures
2688 * @return a CompletableFuture that is complete when all of the
2689 * given CompletableFutures complete
2690 * @throws NullPointerException if the array or any of its elements are
2691 * {@code null}
2692 */
2693 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2694 int len = cfs.length; // Directly handle empty and singleton cases
2695 if (len > 1)
2696 return allTree(cfs, 0, len - 1);
2697 else {
2698 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2699 CompletableFuture<?> f;
2700 if (len == 0)
2701 dst.result = NIL;
2702 else if ((f = cfs[0]) == null)
2703 throw new NullPointerException();
2704 else {
2705 ThenCopy d = null;
2706 CompletionNode p = null;
2707 Object r;
2708 while ((r = f.result) == null) {
2709 if (d == null)
2710 d = new ThenCopy(f, dst);
2711 else if (p == null)
2712 p = new CompletionNode(d);
2713 else if (UNSAFE.compareAndSwapObject
2714 (f, COMPLETIONS, p.next = f.completions, p))
2715 break;
2716 }
2717 if (r != null && (d == null || d.compareAndSet(0, 1)))
2718 dst.internalComplete(null, (r instanceof AltResult) ?
2719 ((AltResult)r).ex : null);
2720 f.helpPostComplete();
2721 }
2722 return dst;
2723 }
2724 }
2725
2726 /**
2727 * Recursively constructs an And'ed tree of CompletableFutures.
2728 * Called only when the array is known to have at least two elements.
2729 */
2730 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2731 int lo, int hi) {
2732 CompletableFuture<?> fst, snd;
2733 int mid = (lo + hi) >>> 1;
2734 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2735 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2736 throw new NullPointerException();
2737 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2738 AndCompletion d = null;
2739 CompletionNode p = null, q = null;
2740 Object r = null, s = null;
2741 while ((r = fst.result) == null || (s = snd.result) == null) {
2742 if (d == null)
2743 d = new AndCompletion(fst, snd, dst);
2744 else if (p == null)
2745 p = new CompletionNode(d);
2746 else if (q == null) {
2747 if (UNSAFE.compareAndSwapObject
2748 (fst, COMPLETIONS, p.next = fst.completions, p))
2749 q = new CompletionNode(d);
2750 }
2751 else if (UNSAFE.compareAndSwapObject
2752 (snd, COMPLETIONS, q.next = snd.completions, q))
2753 break;
2754 }
2755 if ((r != null || (r = fst.result) != null) &&
2756 (s != null || (s = snd.result) != null) &&
2757 (d == null || d.compareAndSet(0, 1))) {
2758 Throwable ex;
2759 if (r instanceof AltResult)
2760 ex = ((AltResult)r).ex;
2761 else
2762 ex = null;
2763 if (ex == null && (s instanceof AltResult))
2764 ex = ((AltResult)s).ex;
2765 dst.internalComplete(null, ex);
2766 }
2767 fst.helpPostComplete();
2768 snd.helpPostComplete();
2769 return dst;
2770 }
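/*
 * Usage sketch (not part of the original source). As the allOf
 * documentation above notes, the returned future carries no values, so
 * a common pattern is to wait on it and then read each component
 * individually. The class name AllOfSketch and its helpers are
 * hypothetical, and the sketch assumes the API as released in Java 8.
 *
```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfSketch {
    // Waits for every part, then collects the individual results in order.
    static List<Integer> gather(List<CompletableFuture<Integer>> parts) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(parts.toArray(new CompletableFuture<?>[0]));
        all.join(); // throws CompletionException if any part failed
        return parts.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
    }

    // With zero components the returned future is already complete.
    static boolean emptyIsComplete() {
        return CompletableFuture.allOf().isDone();
    }

    public static void main(String[] args) {
        System.out.println(gather(Arrays.asList(
            CompletableFuture.completedFuture(1),
            CompletableFuture.completedFuture(2))));
        System.out.println(emptyIsComplete());
    }
}
```
 */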
2771
2772 /**
2773 * Returns a new CompletableFuture that is completed when any of
2774 * the given CompletableFutures complete, with the same result if
2775 * that one completed normally, otherwise exceptionally. If the number
2776 * of components is zero, returns a completed CompletableFuture.
2777 *
2778 * @param cfs the CompletableFutures
2779 * @return a CompletableFuture that is complete when any of the
2780 * given CompletableFutures complete
2781 * @throws NullPointerException if the array or any of its elements are
2782 * {@code null}
2783 */
2784 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2785 int len = cfs.length; // Same idea as allOf
2786 if (len > 1)
2787 return anyTree(cfs, 0, len - 1);
2788 else {
2789 CompletableFuture<?> dst = new CompletableFuture<Object>();
2790 CompletableFuture<?> f;
2791 if (len == 0)
2792 dst.result = NIL;
2793 else if ((f = cfs[0]) == null)
2794 throw new NullPointerException();
2795 else {
2796 ThenCopy d = null;
2797 CompletionNode p = null;
2798 Object r;
2799 while ((r = f.result) == null) {
2800 if (d == null)
2801 d = new ThenCopy(f, dst);
2802 else if (p == null)
2803 p = new CompletionNode(d);
2804 else if (UNSAFE.compareAndSwapObject
2805 (f, COMPLETIONS, p.next = f.completions, p))
2806 break;
2807 }
2808 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2809 Throwable ex; Object t;
2810 if (r instanceof AltResult) {
2811 ex = ((AltResult)r).ex;
2812 t = null;
2813 }
2814 else {
2815 ex = null;
2816 t = r;
2817 }
2818 dst.internalComplete(t, ex);
2819 }
2820 f.helpPostComplete();
2821 }
2822 return dst;
2823 }
2824 }
2825
2826 /**
2827 * Recursively constructs an Or'ed tree of CompletableFutures.
2828 */
    private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
                                                int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<?> dst = new CompletableFuture<Object>();
        OrCompletion d = null;
        CompletionNode p = null, q = null;
        Object r;
        while ((r = fst.result) == null && (r = snd.result) == null) {
            if (d == null)
                d = new OrCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null ||
             (r = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex; Object t;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                t = r;
            }
            dst.internalComplete(t, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }

    /* ------------- Control and status methods -------------- */

    /**
     * Attempts to complete this CompletableFuture with
     * a {@link CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        Object r;
        while ((r = result) == null) {
            r = new AltResult(new CancellationException());
            if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
                postComplete();
                return true;
            }
        }
        return ((r instanceof AltResult) &&
                (((AltResult)r).ex instanceof CancellationException));
    }

    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    public boolean isCancelled() {
        Object r;
        return ((r = result) != null &&
                (r instanceof AltResult) &&
                (((AltResult)r).ex instanceof CancellationException));
    }

    /**
     * Forcibly sets or resets the value subsequently returned by
     * method get() and related methods, whether or not already
     * completed. This method is designed for use only in error
     * recovery actions, and even in such situations may result in
     * ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param value the completion value
     */
    public void obtrudeValue(T value) {
        result = (value == null) ? NIL : value;
        postComplete();
    }

    /**
     * Forcibly causes subsequent invocations of method get() and
     * related methods to throw the given exception, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param ex the exception
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int count = 0;
        for (CompletionNode p = completions; p != null; p = p.next)
            ++count;
        return count;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, includes the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        String id = super.toString();
        int count = getNumberOfDependents();
        Object r = result;
        Throwable ex = (r != null && (r instanceof AltResult) ?
                        ((AltResult)r).ex : null);
        return id + (ex != null ? "[Completed exceptionally]" :
                     (r != null ? "[Completed normally]" :
                      (count == 0 ? "[Not completed]" :
                       ("[Not completed, " + count + " dependents]"))));
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long WAITERS;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (k.getDeclaredField("result"));
            WAITERS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("completions"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
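
Usage sketch for the control and status methods (cancel, isCancelled, obtrudeValue, obtrudeException, getNumberOfDependents, toString). This is a minimal illustration, not part of the source file above. It assumes the public java.util.concurrent.CompletableFuture API as shipped in Java 8 and later rather than this exact revision. The class ControlMethodsDemo and its helper methods are hypothetical names introduced only for this example. Later JDK releases may append extra detail to the toString() state, so the sketch only looks for substrings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; not part of CompletableFuture.java.
final class ControlMethodsDemo {

    // cancel() on an incomplete future completes it with a CancellationException.
    static boolean cancelIncomplete() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean cancelled = f.cancel(true); // the flag has no effect here
        return cancelled && f.isCancelled() && f.isDone();
    }

    // cancel() cannot replace a result that was already set normally.
    static boolean cancelAfterComplete() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("done");
        return f.cancel(false) || f.isCancelled();
    }

    // obtrudeValue() overwrites an already established result.
    static String obtrudeValueOverwrites() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("first");
        f.obtrudeValue("second");
        return f.join();
    }

    // obtrudeException() makes later join() calls throw, wrapping the cause.
    static boolean obtrudeExceptionOverwrites() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("ok");
        f.obtrudeException(new IllegalStateException("boom"));
        try {
            f.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    // Dependents registered before completion are counted (an estimate only).
    static int dependentsWhileIncomplete() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.thenApply(String::length);
        f.thenAccept(s -> { });
        return f.getNumberOfDependents();
    }

    // Returns the identity-plus-state string produced by toString().
    static String stateOf(CompletableFuture<?> f) {
        return f.toString();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("x"));

        System.out.println("cancel incomplete:     " + cancelIncomplete());
        System.out.println("cancel after complete: " + cancelAfterComplete());
        System.out.println("obtruded value:        " + obtrudeValueOverwrites());
        System.out.println("obtruded exception:    " + obtrudeExceptionOverwrites());
        System.out.println("dependents:            " + dependentsWhileIncomplete());
        System.out.println(stateOf(new CompletableFuture<String>()));
        System.out.println(stateOf(CompletableFuture.completedFuture("v")));
        System.out.println(stateOf(failed));
    }
}
```

The helpers use join() rather than get() only to avoid checked exceptions in the sketch. Both report the obtruded outcome, with get() wrapping the cause in an ExecutionException instead of a CompletionException.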