ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.101
Committed: Sun Apr 13 22:25:37 2014 UTC (10 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.100: +238 -265 lines
Log Message:
Avoid StackOverflowError for branching completions

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.locks.LockSupport;
26
27 /**
28 * A {@link Future} that may be explicitly completed (setting its
29 * value and status), and may be used as a {@link CompletionStage},
30 * supporting dependent functions and actions that trigger upon its
31 * completion.
32 *
33 * <p>When two or more threads attempt to
34 * {@link #complete complete},
35 * {@link #completeExceptionally completeExceptionally}, or
36 * {@link #cancel cancel}
37 * a CompletableFuture, only one of them succeeds.
38 *
39 * <p>In addition to these and related methods for directly
40 * manipulating status and results, CompletableFuture implements
41 * interface {@link CompletionStage} with the following policies: <ul>
42 *
43 * <li>Actions supplied for dependent completions of
44 * <em>non-async</em> methods may be performed by the thread that
45 * completes the current CompletableFuture, or by any other caller of
46 * a completion method.</li>
47 *
48 * <li>All <em>async</em> methods without an explicit Executor
49 * argument are performed using the {@link ForkJoinPool#commonPool()}
50 * (unless it does not support a parallelism level of at least two, in
51 * which case, a new Thread is used). To simplify monitoring,
52 * debugging, and tracking, all generated asynchronous tasks are
53 * instances of the marker interface {@link
54 * AsynchronousCompletionTask}. </li>
55 *
56 * <li>All CompletionStage methods are implemented independently of
57 * other public methods, so the behavior of one method is not impacted
58 * by overrides of others in subclasses. </li> </ul>
59 *
60 * <p>CompletableFuture also implements {@link Future} with the following
61 * policies: <ul>
62 *
63 * <li>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional
66 * completion. Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}. Method
68 * {@link #isCompletedExceptionally} can be used to determine if a
69 * CompletableFuture completed in any exceptional fashion.</li>
70 *
71 * <li>In case of exceptional completion with a CompletionException,
72 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
73 * {@link ExecutionException} with the same cause as held in the
74 * corresponding CompletionException. To simplify usage in most
75 * contexts, this class also defines methods {@link #join()} and
76 * {@link #getNow} that instead throw the CompletionException directly
77 * in these cases.</li> </ul>
78 *
79 * @author Doug Lea
80 * @since 1.8
81 */
82 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
83
84 /*
85 * Overview:
86 *
87 * 1. Non-nullness of field result (set via CAS) indicates done.
88 * An AltResult is used to box null as a result, as well as to
89 * hold exceptions. Using a single field makes completion fast
90 * and simple to detect and trigger, at the expense of a lot of
91 * encoding and decoding that infiltrates many methods. One minor
92 * simplification relies on the (static) NIL (to box null results)
93 * being the only AltResult with a null exception field, so we
94 * don't usually need explicit comparisons with NIL. The CF
95 * exception propagation mechanics surrounding decoding rely on
96 * unchecked casts of decoded results really being unchecked,
97 * where user type errors are caught at point of use, as is
98 * currently the case in Java. These are highlighted by using
99 * SuppressWarnings-annotated temporaries.
100 *
101 * 2. Waiters are held in a Treiber stack similar to the one used
102 * in FutureTask, Phaser, and SynchronousQueue. See their
103 * internal documentation for algorithmic details.
104 *
105 * 3. Completions are also kept in a list/stack, and pulled off
106 * and run when completion of an observable CF is triggered. (We
107 * could even use the same stack as for waiters, but would give up
108 * the potential parallelism obtained because woken waiters help
109 * release/run others -- see method postComplete). Because
110 * post-processing may race with direct calls, class Completion
111 * opportunistically extends AtomicInteger so callers can claim
112 * the action via compareAndSet(0, 1). The Completion.tryComplete
113 * methods are all written a boringly similar uniform way (that
114 * sometimes includes unnecessary-looking checks, kept to maintain
115 * uniformity). There are enough dimensions upon which they
116 * differ that attempts to factor commonalities while maintaining
117 * efficiency require more lines of code than they would save.
118 *
119 * 4. The exported then/and/or methods do support a bit of
120 * factoring (see doThenApply etc). They must cope with the
121 * intrinsic races surrounding addition of a dependent action
122 * versus performing the action directly because the task is
123 * already complete. For example, a CF may not be complete upon
124 * entry, so a dependent completion is added, but by the time it
125 * is added, the target CF is complete, so must be directly
126 * executed. This is all done while avoiding unnecessary object
127 * construction in safe-bypass cases.
128 */
129
130 // preliminaries
131
132 static final class AltResult {
133 final Throwable ex; // null only for NIL
134 AltResult(Throwable ex) { this.ex = ex; }
135 }
136
137 static final AltResult NIL = new AltResult(null);
138
139 // Fields
140
141 volatile Object result; // Either the result or boxed AltResult
142 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
143 volatile CompletionNode completions; // list (Treiber stack) of completions
144
145 // Basic utilities for triggering and processing completions
146
147 /**
148 * Triggers completion with the encoding of the given arguments:
149 * if the exception is non-null, encodes it as a wrapped
150 * CompletionException unless it is one already. Otherwise uses
151 * the given result, boxed as NIL if null.
152 */
153 final void internalComplete(T v, Throwable ex) {
154 if (result == null)
155 UNSAFE.compareAndSwapObject
156 (this, RESULT, null,
157 (ex == null) ? (v == null) ? NIL : v :
158 new AltResult((ex instanceof CompletionException) ? ex :
159 new CompletionException(ex)));
160 }
161
162 /**
163 * Signals waiters and triggers all enabled dependent completions
164 * reachable from src.
165 *
166 * @param src if non-null a completed CompletableFuture
167 */
168 static final void postComplete(CompletableFuture<?> src) {
169 /*
170 * CF "src" is always the base of a possible chain of
171 * completions that may need further processing. To avoid
172 * potential StackOverflowErrors, we extend along only one
173 * path ("dep") at a time, holding others by pushing them on
174 * src's completion list, which advances tail-recursion style
175 * when possible. On each step, "f" is dep if non-null, else
176 * src.
177 */
178 for (CompletableFuture<?> f = src, dep = null; f != null;) {
179 WaitNode q; CompletionNode h; Thread w;
180 if ((q = f.waiters) != null) {
181 if (UNSAFE.compareAndSwapObject(f, WAITERS, q, q.next) &&
182 (w = q.thread) != null) {
183 q.thread = null;
184 LockSupport.unpark(w);
185 }
186 }
187 else if ((h = f.completions) == null) {
188 if (dep == null)
189 break;
190 dep = null;
191 f = src;
192 }
193 else if (UNSAFE.compareAndSwapObject(f, COMPLETIONS, h, h.next)) {
194 Completion c; CompletableFuture<?> d;
195 if (dep != null && dep.completions != null) { // push to src
196 do {} while (!UNSAFE.compareAndSwapObject(
197 src, COMPLETIONS,
198 h.next = src.completions, h));
199 }
200 else if ((c = h.completion) == null ||
201 (d = c.tryComplete()) == null ||
202 d.result == null) {
203 dep = null;
204 f = src;
205 }
206 else if (src.completions == null) {
207 dep = null;
208 f = src = d;
209 }
210 else
211 f = dep = d;
212 }
213 }
214 }
215
216 /* ------------- waiting for completions -------------- */
217
218 /** Number of processors, for spin control */
219 static final int NCPU = Runtime.getRuntime().availableProcessors();
220
221 /**
222 * Heuristic spin value for waitingGet() before blocking on
223 * multiprocessors
224 */
225 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
226
227 /**
228 * Linked nodes to record waiting threads in a Treiber stack. See
229 * other classes such as Phaser and SynchronousQueue for more
230 * detailed explanation. This class implements ManagedBlocker to
231 * avoid starvation when blocking actions pile up in
232 * ForkJoinPools.
233 */
234 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
235 long nanos; // wait time if timed
236 final long deadline; // non-zero if timed
237 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
238 volatile Thread thread;
239 volatile WaitNode next;
240 WaitNode(boolean interruptible, long nanos, long deadline) {
241 this.thread = Thread.currentThread();
242 this.interruptControl = interruptible ? 1 : 0;
243 this.nanos = nanos;
244 this.deadline = deadline;
245 }
246 public boolean isReleasable() {
247 if (thread == null)
248 return true;
249 if (Thread.interrupted()) {
250 int i = interruptControl;
251 interruptControl = -1;
252 if (i > 0)
253 return true;
254 }
255 if (deadline != 0L &&
256 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
257 thread = null;
258 return true;
259 }
260 return false;
261 }
262 public boolean block() {
263 if (isReleasable())
264 return true;
265 else if (deadline == 0L)
266 LockSupport.park(this);
267 else if (nanos > 0L)
268 LockSupport.parkNanos(this, nanos);
269 return isReleasable();
270 }
271 }
272
273 /**
274 * Returns raw result after waiting, or null if interruptible and
275 * interrupted.
276 */
277 private Object waitingGet(boolean interruptible) {
278 WaitNode q = null;
279 boolean queued = false;
280 int spins = SPINS;
281 for (Object r;;) {
282 if ((r = result) != null) {
283 if (q != null) { // suppress unpark
284 q.thread = null;
285 if (q.interruptControl < 0) {
286 if (interruptible) {
287 removeCancelledWaiters();
288 return null;
289 }
290 Thread.currentThread().interrupt();
291 }
292 }
293 postComplete(this); // help release others
294 return r;
295 }
296 else if (spins > 0) {
297 int rnd = ThreadLocalRandom.nextSecondarySeed();
298 if (rnd == 0)
299 rnd = ThreadLocalRandom.current().nextInt();
300 if (rnd >= 0)
301 --spins;
302 }
303 else if (q == null)
304 q = new WaitNode(interruptible, 0L, 0L);
305 else if (!queued)
306 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
307 q.next = waiters, q);
308 else if (interruptible && q.interruptControl < 0) {
309 q.thread = null;
310 removeCancelledWaiters();
311 return null;
312 }
313 else if (q.thread != null && result == null) {
314 try {
315 ForkJoinPool.managedBlock(q);
316 } catch (InterruptedException ex) {
317 q.interruptControl = -1;
318 }
319 }
320 }
321 }
322
323 /**
324 * Awaits completion or aborts on interrupt or timeout.
325 *
326 * @param nanos time to wait
327 * @return raw result
328 */
329 private Object timedAwaitDone(long nanos)
330 throws InterruptedException, TimeoutException {
331 WaitNode q = null;
332 boolean queued = false;
333 for (Object r;;) {
334 if ((r = result) != null) {
335 if (q != null) {
336 q.thread = null;
337 if (q.interruptControl < 0) {
338 removeCancelledWaiters();
339 throw new InterruptedException();
340 }
341 }
342 postComplete(this);
343 return r;
344 }
345 else if (q == null) {
346 if (nanos <= 0L)
347 throw new TimeoutException();
348 long d = System.nanoTime() + nanos;
349 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
350 }
351 else if (!queued)
352 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
353 q.next = waiters, q);
354 else if (q.interruptControl < 0) {
355 q.thread = null;
356 removeCancelledWaiters();
357 throw new InterruptedException();
358 }
359 else if (q.nanos <= 0L) {
360 if (result == null) {
361 q.thread = null;
362 removeCancelledWaiters();
363 throw new TimeoutException();
364 }
365 }
366 else if (q.thread != null && result == null) {
367 try {
368 ForkJoinPool.managedBlock(q);
369 } catch (InterruptedException ex) {
370 q.interruptControl = -1;
371 }
372 }
373 }
374 }
375
376 /**
377 * Tries to unlink timed-out or interrupted wait nodes to avoid
378 * accumulating garbage. Internal nodes are simply unspliced
379 * without CAS since it is harmless if they are traversed anyway
380 * by releasers. To avoid effects of unsplicing from already
381 * removed nodes, the list is retraversed in case of an apparent
382 * race. This is slow when there are a lot of nodes, but we don't
383 * expect lists to be long enough to outweigh higher-overhead
384 * schemes.
385 */
386 private void removeCancelledWaiters() {
387 retry: for (;;) { // restart on removeWaiter race
388 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
389 s = q.next;
390 if (q.thread != null)
391 pred = q;
392 else if (pred != null) {
393 pred.next = s;
394 if (pred.thread == null) // check for race
395 continue retry;
396 }
397 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
398 continue retry;
399 }
400 break;
401 }
402 }
403
404 /* ------------- Async tasks -------------- */
405
406 /**
407 * A marker interface identifying asynchronous tasks produced by
408 * {@code async} methods. This may be useful for monitoring,
409 * debugging, and tracking asynchronous activities.
410 *
411 * @since 1.8
412 */
413 public static interface AsynchronousCompletionTask {
414 }
415
416 /** Base class can act as either FJ or plain Runnable */
417 @SuppressWarnings("serial")
418 abstract static class Async extends ForkJoinTask<Void>
419 implements Runnable, AsynchronousCompletionTask {
420 public final Void getRawResult() { return null; }
421 public final void setRawResult(Void v) { }
422 public final void run() { exec(); }
423 }
424
425 /**
426 * Starts the given async task using the given executor, unless
427 * the executor is ForkJoinPool.commonPool and it has been
428 * disabled, in which case starts a new thread.
429 */
430 static void execAsync(Executor e, Async r) {
431 if (e == ForkJoinPool.commonPool() &&
432 ForkJoinPool.getCommonPoolParallelism() <= 1)
433 new Thread(r).start();
434 else
435 e.execute(r);
436 }
437
438 static final class AsyncRun extends Async {
439 final Runnable fn;
440 final CompletableFuture<Void> dst;
441 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
442 this.fn = fn; this.dst = dst;
443 }
444 public final boolean exec() {
445 CompletableFuture<Void> d; Throwable ex;
446 if ((d = this.dst) != null && d.result == null) {
447 try {
448 fn.run();
449 ex = null;
450 } catch (Throwable rex) {
451 ex = rex;
452 }
453 d.internalComplete(null, ex);
454 }
455 postComplete(d);
456 return true;
457 }
458 private static final long serialVersionUID = 5232453952276885070L;
459 }
460
461 static final class AsyncSupply<U> extends Async {
462 final Supplier<U> fn;
463 final CompletableFuture<U> dst;
464 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
465 this.fn = fn; this.dst = dst;
466 }
467 public final boolean exec() {
468 CompletableFuture<U> d; U u; Throwable ex;
469 if ((d = this.dst) != null && d.result == null) {
470 try {
471 u = fn.get();
472 ex = null;
473 } catch (Throwable rex) {
474 ex = rex;
475 u = null;
476 }
477 d.internalComplete(u, ex);
478 }
479 postComplete(d);
480 return true;
481 }
482 private static final long serialVersionUID = 5232453952276885070L;
483 }
484
485 static final class AsyncApply<T,U> extends Async {
486 final T arg;
487 final Function<? super T,? extends U> fn;
488 final CompletableFuture<U> dst;
489 AsyncApply(T arg, Function<? super T,? extends U> fn,
490 CompletableFuture<U> dst) {
491 this.arg = arg; this.fn = fn; this.dst = dst;
492 }
493 public final boolean exec() {
494 CompletableFuture<U> d; U u; Throwable ex;
495 if ((d = this.dst) != null && d.result == null) {
496 try {
497 u = fn.apply(arg);
498 ex = null;
499 } catch (Throwable rex) {
500 ex = rex;
501 u = null;
502 }
503 d.internalComplete(u, ex);
504 }
505 postComplete(d);
506 return true;
507 }
508 private static final long serialVersionUID = 5232453952276885070L;
509 }
510
511 static final class AsyncCombine<T,U,V> extends Async {
512 final T arg1;
513 final U arg2;
514 final BiFunction<? super T,? super U,? extends V> fn;
515 final CompletableFuture<V> dst;
516 AsyncCombine(T arg1, U arg2,
517 BiFunction<? super T,? super U,? extends V> fn,
518 CompletableFuture<V> dst) {
519 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
520 }
521 public final boolean exec() {
522 CompletableFuture<V> d; V v; Throwable ex;
523 if ((d = this.dst) != null && d.result == null) {
524 try {
525 v = fn.apply(arg1, arg2);
526 ex = null;
527 } catch (Throwable rex) {
528 ex = rex;
529 v = null;
530 }
531 d.internalComplete(v, ex);
532 }
533 postComplete(d);
534 return true;
535 }
536 private static final long serialVersionUID = 5232453952276885070L;
537 }
538
539 static final class AsyncAccept<T> extends Async {
540 final T arg;
541 final Consumer<? super T> fn;
542 final CompletableFuture<?> dst;
543 AsyncAccept(T arg, Consumer<? super T> fn,
544 CompletableFuture<?> dst) {
545 this.arg = arg; this.fn = fn; this.dst = dst;
546 }
547 public final boolean exec() {
548 CompletableFuture<?> d; Throwable ex;
549 if ((d = this.dst) != null && d.result == null) {
550 try {
551 fn.accept(arg);
552 ex = null;
553 } catch (Throwable rex) {
554 ex = rex;
555 }
556 d.internalComplete(null, ex);
557 }
558 postComplete(d);
559 return true;
560 }
561 private static final long serialVersionUID = 5232453952276885070L;
562 }
563
564 static final class AsyncAcceptBoth<T,U> extends Async {
565 final T arg1;
566 final U arg2;
567 final BiConsumer<? super T,? super U> fn;
568 final CompletableFuture<?> dst;
569 AsyncAcceptBoth(T arg1, U arg2,
570 BiConsumer<? super T,? super U> fn,
571 CompletableFuture<?> dst) {
572 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
573 }
574 public final boolean exec() {
575 CompletableFuture<?> d; Throwable ex;
576 if ((d = this.dst) != null && d.result == null) {
577 try {
578 fn.accept(arg1, arg2);
579 ex = null;
580 } catch (Throwable rex) {
581 ex = rex;
582 }
583 d.internalComplete(null, ex);
584 }
585 postComplete(d);
586 return true;
587 }
588 private static final long serialVersionUID = 5232453952276885070L;
589 }
590
591 static final class AsyncCompose<T,U> extends Async {
592 final T arg;
593 final Function<? super T, ? extends CompletionStage<U>> fn;
594 final CompletableFuture<U> dst;
595 AsyncCompose(T arg,
596 Function<? super T, ? extends CompletionStage<U>> fn,
597 CompletableFuture<U> dst) {
598 this.arg = arg; this.fn = fn; this.dst = dst;
599 }
600 public final boolean exec() {
601 CompletableFuture<U> d, fr; U u; Throwable ex;
602 if ((d = this.dst) != null && d.result == null) {
603 try {
604 CompletionStage<U> cs = fn.apply(arg);
605 fr = (cs == null) ? null : cs.toCompletableFuture();
606 ex = (fr == null) ? new NullPointerException() : null;
607 } catch (Throwable rex) {
608 ex = rex;
609 fr = null;
610 }
611 if (ex != null)
612 u = null;
613 else {
614 Object r = fr.result;
615 if (r == null)
616 r = fr.waitingGet(false);
617 if (r instanceof AltResult) {
618 ex = ((AltResult)r).ex;
619 u = null;
620 }
621 else {
622 @SuppressWarnings("unchecked") U ur = (U) r;
623 u = ur;
624 }
625 }
626 d.internalComplete(u, ex);
627 }
628 postComplete(d);
629 return true;
630 }
631 private static final long serialVersionUID = 5232453952276885070L;
632 }
633
634 static final class AsyncWhenComplete<T> extends Async {
635 final T arg1;
636 final Throwable arg2;
637 final BiConsumer<? super T,? super Throwable> fn;
638 final CompletableFuture<T> dst;
639 AsyncWhenComplete(T arg1, Throwable arg2,
640 BiConsumer<? super T,? super Throwable> fn,
641 CompletableFuture<T> dst) {
642 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
643 }
644 public final boolean exec() {
645 CompletableFuture<T> d;
646 if ((d = this.dst) != null && d.result == null) {
647 Throwable ex = arg2;
648 try {
649 fn.accept(arg1, ex);
650 } catch (Throwable rex) {
651 if (ex == null)
652 ex = rex;
653 }
654 d.internalComplete(arg1, ex);
655 }
656 postComplete(d);
657 return true;
658 }
659 private static final long serialVersionUID = 5232453952276885070L;
660 }
661
662 /* ------------- Completions -------------- */
663
664 /**
665 * Simple linked list nodes to record completions, used in
666 * basically the same way as WaitNodes. (We separate nodes from
667 * the Completions themselves mainly because for the And and Or
668 * methods, the same Completion object resides in two lists.)
669 */
670 static final class CompletionNode {
671 final Completion completion;
672 volatile CompletionNode next;
673 CompletionNode(Completion completion) { this.completion = completion; }
674 }
675
676 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
677 @SuppressWarnings("serial")
678 abstract static class Completion extends AtomicInteger {
679 /**
680 * Completes a dependent Completablefuture if enabled
681 * @return the dependent Completablefuture
682 */
683 public abstract CompletableFuture<?> tryComplete();
684 }
685
686 static final class ThenApply<T,U> extends Completion {
687 final CompletableFuture<? extends T> src;
688 final Function<? super T,? extends U> fn;
689 final CompletableFuture<U> dst;
690 final Executor executor;
691 ThenApply(CompletableFuture<? extends T> src,
692 Function<? super T,? extends U> fn,
693 CompletableFuture<U> dst,
694 Executor executor) {
695 this.src = src; this.fn = fn; this.dst = dst;
696 this.executor = executor;
697 }
698 public final CompletableFuture<?> tryComplete() {
699 final CompletableFuture<? extends T> a;
700 final Function<? super T,? extends U> fn;
701 final CompletableFuture<U> dst;
702 Object r; T t; Throwable ex;
703 if ((dst = this.dst) != null &&
704 (fn = this.fn) != null &&
705 (a = this.src) != null &&
706 (r = a.result) != null &&
707 get() == 0 && compareAndSet(0, 1)) {
708 if (r instanceof AltResult) {
709 ex = ((AltResult)r).ex;
710 t = null;
711 }
712 else {
713 ex = null;
714 @SuppressWarnings("unchecked") T tr = (T) r;
715 t = tr;
716 }
717 Executor e = executor;
718 U u = null;
719 if (ex == null) {
720 try {
721 if (e != null)
722 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
723 else
724 u = fn.apply(t);
725 } catch (Throwable rex) {
726 ex = rex;
727 }
728 }
729 if (e == null || ex != null)
730 dst.internalComplete(u, ex);
731 }
732 return dst;
733 }
734 private static final long serialVersionUID = 5232453952276885070L;
735 }
736
737 static final class ThenAccept<T> extends Completion {
738 final CompletableFuture<? extends T> src;
739 final Consumer<? super T> fn;
740 final CompletableFuture<?> dst;
741 final Executor executor;
742 ThenAccept(CompletableFuture<? extends T> src,
743 Consumer<? super T> fn,
744 CompletableFuture<?> dst,
745 Executor executor) {
746 this.src = src; this.fn = fn; this.dst = dst;
747 this.executor = executor;
748 }
749 public final CompletableFuture<?> tryComplete() {
750 final CompletableFuture<? extends T> a;
751 final Consumer<? super T> fn;
752 final CompletableFuture<?> dst;
753 Object r; T t; Throwable ex;
754 if ((dst = this.dst) != null &&
755 (fn = this.fn) != null &&
756 (a = this.src) != null &&
757 (r = a.result) != null &&
758 get() == 0 && compareAndSet(0, 1)) {
759 if (r instanceof AltResult) {
760 ex = ((AltResult)r).ex;
761 t = null;
762 }
763 else {
764 ex = null;
765 @SuppressWarnings("unchecked") T tr = (T) r;
766 t = tr;
767 }
768 Executor e = executor;
769 if (ex == null) {
770 try {
771 if (e != null)
772 execAsync(e, new AsyncAccept<T>(t, fn, dst));
773 else
774 fn.accept(t);
775 } catch (Throwable rex) {
776 ex = rex;
777 }
778 }
779 if (e == null || ex != null)
780 dst.internalComplete(null, ex);
781 }
782 return dst;
783 }
784 private static final long serialVersionUID = 5232453952276885070L;
785 }
786
787 static final class ThenRun extends Completion {
788 final CompletableFuture<?> src;
789 final Runnable fn;
790 final CompletableFuture<Void> dst;
791 final Executor executor;
792 ThenRun(CompletableFuture<?> src,
793 Runnable fn,
794 CompletableFuture<Void> dst,
795 Executor executor) {
796 this.src = src; this.fn = fn; this.dst = dst;
797 this.executor = executor;
798 }
799 public final CompletableFuture<?> tryComplete() {
800 final CompletableFuture<?> a;
801 final Runnable fn;
802 final CompletableFuture<Void> dst;
803 Object r; Throwable ex;
804 if ((dst = this.dst) != null &&
805 (fn = this.fn) != null &&
806 (a = this.src) != null &&
807 (r = a.result) != null &&
808 get() == 0 && compareAndSet(0, 1)) {
809 if (r instanceof AltResult)
810 ex = ((AltResult)r).ex;
811 else
812 ex = null;
813 Executor e = executor;
814 if (ex == null) {
815 try {
816 if (e != null)
817 execAsync(e, new AsyncRun(fn, dst));
818 else
819 fn.run();
820 } catch (Throwable rex) {
821 ex = rex;
822 }
823 }
824 if (e == null || ex != null)
825 dst.internalComplete(null, ex);
826 }
827 return dst;
828 }
829 private static final long serialVersionUID = 5232453952276885070L;
830 }
831
832 static final class ThenCombine<T,U,V> extends Completion {
833 final CompletableFuture<? extends T> src;
834 final CompletableFuture<? extends U> snd;
835 final BiFunction<? super T,? super U,? extends V> fn;
836 final CompletableFuture<V> dst;
837 final Executor executor;
838 ThenCombine(CompletableFuture<? extends T> src,
839 CompletableFuture<? extends U> snd,
840 BiFunction<? super T,? super U,? extends V> fn,
841 CompletableFuture<V> dst,
842 Executor executor) {
843 this.src = src; this.snd = snd;
844 this.fn = fn; this.dst = dst;
845 this.executor = executor;
846 }
847 public final CompletableFuture<?> tryComplete() {
848 final CompletableFuture<? extends T> a;
849 final CompletableFuture<? extends U> b;
850 final BiFunction<? super T,? super U,? extends V> fn;
851 final CompletableFuture<V> dst;
852 Object r, s; T t; U u; Throwable ex;
853 if ((dst = this.dst) != null &&
854 (fn = this.fn) != null &&
855 (a = this.src) != null &&
856 (r = a.result) != null &&
857 (b = this.snd) != null &&
858 (s = b.result) != null &&
859 get() == 0 && compareAndSet(0, 1)) {
860 if (r instanceof AltResult) {
861 ex = ((AltResult)r).ex;
862 t = null;
863 }
864 else {
865 ex = null;
866 @SuppressWarnings("unchecked") T tr = (T) r;
867 t = tr;
868 }
869 if (ex != null)
870 u = null;
871 else if (s instanceof AltResult) {
872 ex = ((AltResult)s).ex;
873 u = null;
874 }
875 else {
876 @SuppressWarnings("unchecked") U us = (U) s;
877 u = us;
878 }
879 Executor e = executor;
880 V v = null;
881 if (ex == null) {
882 try {
883 if (e != null)
884 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
885 else
886 v = fn.apply(t, u);
887 } catch (Throwable rex) {
888 ex = rex;
889 }
890 }
891 if (e == null || ex != null)
892 dst.internalComplete(v, ex);
893 }
894 return dst;
895 }
896 private static final long serialVersionUID = 5232453952276885070L;
897 }
898
899 static final class ThenAcceptBoth<T,U> extends Completion {
900 final CompletableFuture<? extends T> src;
901 final CompletableFuture<? extends U> snd;
902 final BiConsumer<? super T,? super U> fn;
903 final CompletableFuture<Void> dst;
904 final Executor executor;
905 ThenAcceptBoth(CompletableFuture<? extends T> src,
906 CompletableFuture<? extends U> snd,
907 BiConsumer<? super T,? super U> fn,
908 CompletableFuture<Void> dst,
909 Executor executor) {
910 this.src = src; this.snd = snd;
911 this.fn = fn; this.dst = dst;
912 this.executor = executor;
913 }
914 public final CompletableFuture<?> tryComplete() {
915 final CompletableFuture<? extends T> a;
916 final CompletableFuture<? extends U> b;
917 final BiConsumer<? super T,? super U> fn;
918 final CompletableFuture<Void> dst;
919 Object r, s; T t; U u; Throwable ex;
920 if ((dst = this.dst) != null &&
921 (fn = this.fn) != null &&
922 (a = this.src) != null &&
923 (r = a.result) != null &&
924 (b = this.snd) != null &&
925 (s = b.result) != null &&
926 get() == 0 && compareAndSet(0, 1)) {
927 if (r instanceof AltResult) {
928 ex = ((AltResult)r).ex;
929 t = null;
930 }
931 else {
932 ex = null;
933 @SuppressWarnings("unchecked") T tr = (T) r;
934 t = tr;
935 }
936 if (ex != null)
937 u = null;
938 else if (s instanceof AltResult) {
939 ex = ((AltResult)s).ex;
940 u = null;
941 }
942 else {
943 @SuppressWarnings("unchecked") U us = (U) s;
944 u = us;
945 }
946 Executor e = executor;
947 if (ex == null) {
948 try {
949 if (e != null)
950 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
951 else
952 fn.accept(t, u);
953 } catch (Throwable rex) {
954 ex = rex;
955 }
956 }
957 if (e == null || ex != null)
958 dst.internalComplete(null, ex);
959 }
960 return dst;
961 }
962 private static final long serialVersionUID = 5232453952276885070L;
963 }
964
965 static final class RunAfterBoth extends Completion {
966 final CompletableFuture<?> src;
967 final CompletableFuture<?> snd;
968 final Runnable fn;
969 final CompletableFuture<Void> dst;
970 final Executor executor;
971 RunAfterBoth(CompletableFuture<?> src,
972 CompletableFuture<?> snd,
973 Runnable fn,
974 CompletableFuture<Void> dst,
975 Executor executor) {
976 this.src = src; this.snd = snd;
977 this.fn = fn; this.dst = dst;
978 this.executor = executor;
979 }
980 public final CompletableFuture<?> tryComplete() {
981 final CompletableFuture<?> a;
982 final CompletableFuture<?> b;
983 final Runnable fn;
984 final CompletableFuture<Void> dst;
985 Object r, s; Throwable ex;
986 if ((dst = this.dst) != null &&
987 (fn = this.fn) != null &&
988 (a = this.src) != null &&
989 (r = a.result) != null &&
990 (b = this.snd) != null &&
991 (s = b.result) != null &&
992 get() == 0 && compareAndSet(0, 1)) {
993 if (r instanceof AltResult)
994 ex = ((AltResult)r).ex;
995 else
996 ex = null;
997 if (ex == null && (s instanceof AltResult))
998 ex = ((AltResult)s).ex;
999 Executor e = executor;
1000 if (ex == null) {
1001 try {
1002 if (e != null)
1003 execAsync(e, new AsyncRun(fn, dst));
1004 else
1005 fn.run();
1006 } catch (Throwable rex) {
1007 ex = rex;
1008 }
1009 }
1010 if (e == null || ex != null)
1011 dst.internalComplete(null, ex);
1012 }
1013 return dst;
1014 }
1015 private static final long serialVersionUID = 5232453952276885070L;
1016 }
1017
1018 static final class AndCompletion extends Completion {
1019 final CompletableFuture<?> src;
1020 final CompletableFuture<?> snd;
1021 final CompletableFuture<Void> dst;
1022 AndCompletion(CompletableFuture<?> src,
1023 CompletableFuture<?> snd,
1024 CompletableFuture<Void> dst) {
1025 this.src = src; this.snd = snd; this.dst = dst;
1026 }
1027 public final CompletableFuture<?> tryComplete() {
1028 final CompletableFuture<?> a;
1029 final CompletableFuture<?> b;
1030 final CompletableFuture<Void> dst;
1031 Object r, s; Throwable ex;
1032 if ((dst = this.dst) != null &&
1033 (a = this.src) != null &&
1034 (r = a.result) != null &&
1035 (b = this.snd) != null &&
1036 (s = b.result) != null &&
1037 get() == 0 && compareAndSet(0, 1)) {
1038 if (r instanceof AltResult)
1039 ex = ((AltResult)r).ex;
1040 else
1041 ex = null;
1042 if (ex == null && (s instanceof AltResult))
1043 ex = ((AltResult)s).ex;
1044 dst.internalComplete(null, ex);
1045 }
1046 return dst;
1047 }
1048 private static final long serialVersionUID = 5232453952276885070L;
1049 }
1050
1051 static final class ApplyToEither<T,U> extends Completion {
1052 final CompletableFuture<? extends T> src;
1053 final CompletableFuture<? extends T> snd;
1054 final Function<? super T,? extends U> fn;
1055 final CompletableFuture<U> dst;
1056 final Executor executor;
1057 ApplyToEither(CompletableFuture<? extends T> src,
1058 CompletableFuture<? extends T> snd,
1059 Function<? super T,? extends U> fn,
1060 CompletableFuture<U> dst,
1061 Executor executor) {
1062 this.src = src; this.snd = snd;
1063 this.fn = fn; this.dst = dst;
1064 this.executor = executor;
1065 }
1066 public final CompletableFuture<?> tryComplete() {
1067 final CompletableFuture<? extends T> a;
1068 final CompletableFuture<? extends T> b;
1069 final Function<? super T,? extends U> fn;
1070 final CompletableFuture<U> dst;
1071 Object r; T t; Throwable ex;
1072 if ((dst = this.dst) != null &&
1073 (fn = this.fn) != null &&
1074 (((a = this.src) != null && (r = a.result) != null) ||
1075 ((b = this.snd) != null && (r = b.result) != null)) &&
1076 get() == 0 && compareAndSet(0, 1)) {
1077 if (r instanceof AltResult) {
1078 ex = ((AltResult)r).ex;
1079 t = null;
1080 }
1081 else {
1082 ex = null;
1083 @SuppressWarnings("unchecked") T tr = (T) r;
1084 t = tr;
1085 }
1086 Executor e = executor;
1087 U u = null;
1088 if (ex == null) {
1089 try {
1090 if (e != null)
1091 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1092 else
1093 u = fn.apply(t);
1094 } catch (Throwable rex) {
1095 ex = rex;
1096 }
1097 }
1098 if (e == null || ex != null)
1099 dst.internalComplete(u, ex);
1100 }
1101 return dst;
1102 }
1103 private static final long serialVersionUID = 5232453952276885070L;
1104 }
1105
1106 static final class AcceptEither<T> extends Completion {
1107 final CompletableFuture<? extends T> src;
1108 final CompletableFuture<? extends T> snd;
1109 final Consumer<? super T> fn;
1110 final CompletableFuture<Void> dst;
1111 final Executor executor;
1112 AcceptEither(CompletableFuture<? extends T> src,
1113 CompletableFuture<? extends T> snd,
1114 Consumer<? super T> fn,
1115 CompletableFuture<Void> dst,
1116 Executor executor) {
1117 this.src = src; this.snd = snd;
1118 this.fn = fn; this.dst = dst;
1119 this.executor = executor;
1120 }
1121 public final CompletableFuture<?> tryComplete() {
1122 final CompletableFuture<? extends T> a;
1123 final CompletableFuture<? extends T> b;
1124 final Consumer<? super T> fn;
1125 final CompletableFuture<Void> dst;
1126 Object r; T t; Throwable ex;
1127 if ((dst = this.dst) != null &&
1128 (fn = this.fn) != null &&
1129 (((a = this.src) != null && (r = a.result) != null) ||
1130 ((b = this.snd) != null && (r = b.result) != null)) &&
1131 get() == 0 && compareAndSet(0, 1)) {
1132 if (r instanceof AltResult) {
1133 ex = ((AltResult)r).ex;
1134 t = null;
1135 }
1136 else {
1137 ex = null;
1138 @SuppressWarnings("unchecked") T tr = (T) r;
1139 t = tr;
1140 }
1141 Executor e = executor;
1142 if (ex == null) {
1143 try {
1144 if (e != null)
1145 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1146 else
1147 fn.accept(t);
1148 } catch (Throwable rex) {
1149 ex = rex;
1150 }
1151 }
1152 if (e == null || ex != null)
1153 dst.internalComplete(null, ex);
1154 }
1155 return dst;
1156 }
1157 private static final long serialVersionUID = 5232453952276885070L;
1158 }
1159
1160 static final class RunAfterEither extends Completion {
1161 final CompletableFuture<?> src;
1162 final CompletableFuture<?> snd;
1163 final Runnable fn;
1164 final CompletableFuture<Void> dst;
1165 final Executor executor;
1166 RunAfterEither(CompletableFuture<?> src,
1167 CompletableFuture<?> snd,
1168 Runnable fn,
1169 CompletableFuture<Void> dst,
1170 Executor executor) {
1171 this.src = src; this.snd = snd;
1172 this.fn = fn; this.dst = dst;
1173 this.executor = executor;
1174 }
1175 public final CompletableFuture<?> tryComplete() {
1176 final CompletableFuture<?> a;
1177 final CompletableFuture<?> b;
1178 final Runnable fn;
1179 final CompletableFuture<Void> dst;
1180 Object r; Throwable ex;
1181 if ((dst = this.dst) != null &&
1182 (fn = this.fn) != null &&
1183 (((a = this.src) != null && (r = a.result) != null) ||
1184 ((b = this.snd) != null && (r = b.result) != null)) &&
1185 get() == 0 && compareAndSet(0, 1)) {
1186 if (r instanceof AltResult)
1187 ex = ((AltResult)r).ex;
1188 else
1189 ex = null;
1190 Executor e = executor;
1191 if (ex == null) {
1192 try {
1193 if (e != null)
1194 execAsync(e, new AsyncRun(fn, dst));
1195 else
1196 fn.run();
1197 } catch (Throwable rex) {
1198 ex = rex;
1199 }
1200 }
1201 if (e == null || ex != null)
1202 dst.internalComplete(null, ex);
1203 }
1204 return dst;
1205 }
1206 private static final long serialVersionUID = 5232453952276885070L;
1207 }
1208
1209 static final class OrCompletion extends Completion {
1210 final CompletableFuture<?> src;
1211 final CompletableFuture<?> snd;
1212 final CompletableFuture<Object> dst;
1213 OrCompletion(CompletableFuture<?> src,
1214 CompletableFuture<?> snd,
1215 CompletableFuture<Object> dst) {
1216 this.src = src; this.snd = snd; this.dst = dst;
1217 }
1218 public final CompletableFuture<?> tryComplete() {
1219 final CompletableFuture<?> a;
1220 final CompletableFuture<?> b;
1221 final CompletableFuture<Object> dst;
1222 Object r, t; Throwable ex;
1223 if ((dst = this.dst) != null &&
1224 (((a = this.src) != null && (r = a.result) != null) ||
1225 ((b = this.snd) != null && (r = b.result) != null)) &&
1226 get() == 0 && compareAndSet(0, 1)) {
1227 if (r instanceof AltResult) {
1228 ex = ((AltResult)r).ex;
1229 t = null;
1230 }
1231 else {
1232 ex = null;
1233 t = r;
1234 }
1235 dst.internalComplete(t, ex);
1236 }
1237 return dst;
1238 }
1239 private static final long serialVersionUID = 5232453952276885070L;
1240 }
1241
1242 static final class ExceptionCompletion<T> extends Completion {
1243 final CompletableFuture<? extends T> src;
1244 final Function<? super Throwable, ? extends T> fn;
1245 final CompletableFuture<T> dst;
1246 ExceptionCompletion(CompletableFuture<? extends T> src,
1247 Function<? super Throwable, ? extends T> fn,
1248 CompletableFuture<T> dst) {
1249 this.src = src; this.fn = fn; this.dst = dst;
1250 }
1251 public final CompletableFuture<?> tryComplete() {
1252 final CompletableFuture<? extends T> a;
1253 final Function<? super Throwable, ? extends T> fn;
1254 final CompletableFuture<T> dst;
1255 Object r; T t = null; Throwable ex, dx = null;
1256 if ((dst = this.dst) != null &&
1257 (fn = this.fn) != null &&
1258 (a = this.src) != null &&
1259 (r = a.result) != null &&
1260 get() == 0 && compareAndSet(0, 1)) {
1261 if ((r instanceof AltResult) &&
1262 (ex = ((AltResult)r).ex) != null) {
1263 try {
1264 t = fn.apply(ex);
1265 } catch (Throwable rex) {
1266 dx = rex;
1267 }
1268 }
1269 else {
1270 @SuppressWarnings("unchecked") T tr = (T) r;
1271 t = tr;
1272 }
1273 dst.internalComplete(t, dx);
1274 }
1275 return dst;
1276 }
1277 private static final long serialVersionUID = 5232453952276885070L;
1278 }
1279
1280 static final class WhenCompleteCompletion<T> extends Completion {
1281 final CompletableFuture<? extends T> src;
1282 final BiConsumer<? super T, ? super Throwable> fn;
1283 final CompletableFuture<T> dst;
1284 final Executor executor;
1285 WhenCompleteCompletion(CompletableFuture<? extends T> src,
1286 BiConsumer<? super T, ? super Throwable> fn,
1287 CompletableFuture<T> dst,
1288 Executor executor) {
1289 this.src = src; this.fn = fn; this.dst = dst;
1290 this.executor = executor;
1291 }
1292 public final CompletableFuture<?> tryComplete() {
1293 final CompletableFuture<? extends T> a;
1294 final BiConsumer<? super T, ? super Throwable> fn;
1295 final CompletableFuture<T> dst;
1296 Object r; T t; Throwable ex;
1297 if ((dst = this.dst) != null &&
1298 (fn = this.fn) != null &&
1299 (a = this.src) != null &&
1300 (r = a.result) != null &&
1301 get() == 0 && compareAndSet(0, 1)) {
1302 if (r instanceof AltResult) {
1303 ex = ((AltResult)r).ex;
1304 t = null;
1305 }
1306 else {
1307 ex = null;
1308 @SuppressWarnings("unchecked") T tr = (T) r;
1309 t = tr;
1310 }
1311 Executor e = executor;
1312 Throwable dx = null;
1313 try {
1314 if (e != null)
1315 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1316 else
1317 fn.accept(t, ex);
1318 } catch (Throwable rex) {
1319 dx = rex;
1320 }
1321 if (e == null || dx != null)
1322 dst.internalComplete(t, ex != null ? ex : dx);
1323 }
1324 return dst;
1325 }
1326 private static final long serialVersionUID = 5232453952276885070L;
1327 }
1328
1329 static final class ThenCopy<T> extends Completion {
1330 final CompletableFuture<?> src;
1331 final CompletableFuture<T> dst;
1332 ThenCopy(CompletableFuture<?> src,
1333 CompletableFuture<T> dst) {
1334 this.src = src; this.dst = dst;
1335 }
1336 public final CompletableFuture<?> tryComplete() {
1337 final CompletableFuture<?> a;
1338 final CompletableFuture<T> dst;
1339 Object r; T t; Throwable ex;
1340 if ((dst = this.dst) != null &&
1341 (a = this.src) != null &&
1342 (r = a.result) != null &&
1343 get() == 0 && compareAndSet(0, 1)) {
1344 if (r instanceof AltResult) {
1345 ex = ((AltResult)r).ex;
1346 t = null;
1347 }
1348 else {
1349 ex = null;
1350 @SuppressWarnings("unchecked") T tr = (T) r;
1351 t = tr;
1352 }
1353 dst.internalComplete(t, ex);
1354 }
1355 return dst;
1356 }
1357 private static final long serialVersionUID = 5232453952276885070L;
1358 }
1359
1360 // version of ThenCopy for CompletableFuture<Void> dst
1361 static final class ThenPropagate extends Completion {
1362 final CompletableFuture<?> src;
1363 final CompletableFuture<Void> dst;
1364 ThenPropagate(CompletableFuture<?> src,
1365 CompletableFuture<Void> dst) {
1366 this.src = src; this.dst = dst;
1367 }
1368 public final CompletableFuture<?> tryComplete() {
1369 final CompletableFuture<?> a;
1370 final CompletableFuture<Void> dst;
1371 Object r; Throwable ex;
1372 if ((dst = this.dst) != null &&
1373 (a = this.src) != null &&
1374 (r = a.result) != null &&
1375 get() == 0 && compareAndSet(0, 1)) {
1376 if (r instanceof AltResult)
1377 ex = ((AltResult)r).ex;
1378 else
1379 ex = null;
1380 dst.internalComplete(null, ex);
1381 }
1382 return dst;
1383 }
1384 private static final long serialVersionUID = 5232453952276885070L;
1385 }
1386
1387 static final class HandleCompletion<T,U> extends Completion {
1388 final CompletableFuture<? extends T> src;
1389 final BiFunction<? super T, Throwable, ? extends U> fn;
1390 final CompletableFuture<U> dst;
1391 final Executor executor;
1392 HandleCompletion(CompletableFuture<? extends T> src,
1393 BiFunction<? super T, Throwable, ? extends U> fn,
1394 CompletableFuture<U> dst,
1395 Executor executor) {
1396 this.src = src; this.fn = fn; this.dst = dst;
1397 this.executor = executor;
1398 }
1399 public final CompletableFuture<?> tryComplete() {
1400 final CompletableFuture<? extends T> a;
1401 final BiFunction<? super T, Throwable, ? extends U> fn;
1402 final CompletableFuture<U> dst;
1403 Object r; T t; Throwable ex;
1404 if ((dst = this.dst) != null &&
1405 (fn = this.fn) != null &&
1406 (a = this.src) != null &&
1407 (r = a.result) != null &&
1408 get() == 0 && compareAndSet(0, 1)) {
1409 if (r instanceof AltResult) {
1410 ex = ((AltResult)r).ex;
1411 t = null;
1412 }
1413 else {
1414 ex = null;
1415 @SuppressWarnings("unchecked") T tr = (T) r;
1416 t = tr;
1417 }
1418 Executor e = executor;
1419 U u = null;
1420 Throwable dx = null;
1421 try {
1422 if (e != null)
1423 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1424 else
1425 u = fn.apply(t, ex);
1426 } catch (Throwable rex) {
1427 dx = rex;
1428 }
1429 if (e == null || dx != null)
1430 dst.internalComplete(u, dx);
1431 }
1432 return dst;
1433 }
1434 private static final long serialVersionUID = 5232453952276885070L;
1435 }
1436
1437 static final class ThenCompose<T,U> extends Completion {
1438 final CompletableFuture<? extends T> src;
1439 final Function<? super T, ? extends CompletionStage<U>> fn;
1440 final CompletableFuture<U> dst;
1441 final Executor executor;
1442 ThenCompose(CompletableFuture<? extends T> src,
1443 Function<? super T, ? extends CompletionStage<U>> fn,
1444 CompletableFuture<U> dst,
1445 Executor executor) {
1446 this.src = src; this.fn = fn; this.dst = dst;
1447 this.executor = executor;
1448 }
1449 public final CompletableFuture<?> tryComplete() {
1450 final CompletableFuture<? extends T> a;
1451 final Function<? super T, ? extends CompletionStage<U>> fn;
1452 final CompletableFuture<U> dst;
1453 Object r; T t; Throwable ex; Executor e;
1454 if ((dst = this.dst) != null &&
1455 (fn = this.fn) != null &&
1456 (a = this.src) != null &&
1457 (r = a.result) != null &&
1458 get() == 0 && compareAndSet(0, 1)) {
1459 if (r instanceof AltResult) {
1460 ex = ((AltResult)r).ex;
1461 t = null;
1462 }
1463 else {
1464 ex = null;
1465 @SuppressWarnings("unchecked") T tr = (T) r;
1466 t = tr;
1467 }
1468 CompletableFuture<U> c = null;
1469 U u = null;
1470 boolean complete = false;
1471 if (ex == null) {
1472 if ((e = executor) != null)
1473 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1474 else {
1475 try {
1476 CompletionStage<U> cs = fn.apply(t);
1477 c = (cs == null) ? null : cs.toCompletableFuture();
1478 if (c == null)
1479 ex = new NullPointerException();
1480 } catch (Throwable rex) {
1481 ex = rex;
1482 }
1483 }
1484 }
1485 if (c != null) {
1486 Object s;
1487 if ((s = c.result) == null) {
1488 ThenCopy<U> d = new ThenCopy<U>(c, dst);
1489 CompletionNode p = new CompletionNode(d);
1490 while ((s = c.result) == null) {
1491 if (UNSAFE.compareAndSwapObject
1492 (c, COMPLETIONS, p.next = c.completions, p))
1493 break;
1494 }
1495 d.tryComplete();
1496 }
1497 else {
1498 complete = true;
1499 if (s instanceof AltResult) {
1500 ex = ((AltResult)s).ex; // no rewrap
1501 u = null;
1502 }
1503 else {
1504 @SuppressWarnings("unchecked") U us = (U) s;
1505 u = us;
1506 }
1507 }
1508 }
1509 if (complete || ex != null)
1510 dst.internalComplete(u, ex);
1511 }
1512 return dst;
1513 }
1514 private static final long serialVersionUID = 5232453952276885070L;
1515 }
1516
1517 // Implementations of stage methods with (plain, async, Executor) forms
1518
1519 private <U> CompletableFuture<U> doThenApply
1520 (Function<? super T,? extends U> fn,
1521 Executor e) {
1522 if (fn == null) throw new NullPointerException();
1523 CompletableFuture<U> dst = new CompletableFuture<U>();
1524 Object r;
1525 if ((r = result) == null) {
1526 ThenApply<T,U> d = new ThenApply<T,U>(this, fn, dst, e);
1527 CompletionNode p = new CompletionNode(d);
1528 while (result == null) {
1529 if (UNSAFE.compareAndSwapObject
1530 (this, COMPLETIONS, p.next = completions, p))
1531 break;
1532 }
1533 d.tryComplete();
1534 }
1535 else {
1536 T t; Throwable ex;
1537 if (r instanceof AltResult) {
1538 ex = ((AltResult)r).ex;
1539 t = null;
1540 }
1541 else {
1542 ex = null;
1543 @SuppressWarnings("unchecked") T tr = (T) r;
1544 t = tr;
1545 }
1546 U u = null;
1547 if (ex == null) {
1548 try {
1549 if (e != null)
1550 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1551 else
1552 u = fn.apply(t);
1553 } catch (Throwable rex) {
1554 ex = rex;
1555 }
1556 }
1557 if (e == null || ex != null)
1558 dst.internalComplete(u, ex);
1559 }
1560 return dst;
1561 }
1562
1563 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1564 Executor e) {
1565 if (fn == null) throw new NullPointerException();
1566 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1567 Object r;
1568 if ((r = result) == null) {
1569 ThenAccept<T> d = new ThenAccept<T>(this, fn, dst, e);
1570 CompletionNode p = new CompletionNode(d);
1571 while (result == null) {
1572 if (UNSAFE.compareAndSwapObject
1573 (this, COMPLETIONS, p.next = completions, p))
1574 break;
1575 }
1576 d.tryComplete();
1577 }
1578 else {
1579 T t; Throwable ex;
1580 if (r instanceof AltResult) {
1581 ex = ((AltResult)r).ex;
1582 t = null;
1583 }
1584 else {
1585 ex = null;
1586 @SuppressWarnings("unchecked") T tr = (T) r;
1587 t = tr;
1588 }
1589 if (ex == null) {
1590 try {
1591 if (e != null)
1592 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1593 else
1594 fn.accept(t);
1595 } catch (Throwable rex) {
1596 ex = rex;
1597 }
1598 }
1599 if (e == null || ex != null)
1600 dst.internalComplete(null, ex);
1601 }
1602 return dst;
1603 }
1604
1605 private CompletableFuture<Void> doThenRun(Runnable action,
1606 Executor e) {
1607 if (action == null) throw new NullPointerException();
1608 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1609 Object r;
1610 if ((r = result) == null) {
1611 ThenRun d = new ThenRun(this, action, dst, e);
1612 CompletionNode p = new CompletionNode(d);
1613 while (result == null) {
1614 if (UNSAFE.compareAndSwapObject
1615 (this, COMPLETIONS, p.next = completions, p))
1616 break;
1617 }
1618 d.tryComplete();
1619 }
1620 else {
1621 Throwable ex;
1622 if (r instanceof AltResult)
1623 ex = ((AltResult)r).ex;
1624 else
1625 ex = null;
1626 if (ex == null) {
1627 try {
1628 if (e != null)
1629 execAsync(e, new AsyncRun(action, dst));
1630 else
1631 action.run();
1632 } catch (Throwable rex) {
1633 ex = rex;
1634 }
1635 }
1636 if (e == null || ex != null)
1637 dst.internalComplete(null, ex);
1638 }
1639 return dst;
1640 }
1641
1642 private <U,V> CompletableFuture<V> doThenCombine
1643 (CompletableFuture<? extends U> other,
1644 BiFunction<? super T,? super U,? extends V> fn,
1645 Executor e) {
1646 if (other == null || fn == null) throw new NullPointerException();
1647 CompletableFuture<V> dst = new CompletableFuture<V>();
1648 Object r = result, s = other.result;
1649 if (r == null || s == null) {
1650 ThenCombine<T,U,V> d =
1651 new ThenCombine<T,U,V>(this, other, fn, dst, e);
1652 CompletionNode q = null, p = new CompletionNode(d);
1653 while ((r == null && (r = result) == null) ||
1654 (s == null && (s = other.result) == null)) {
1655 if (q != null) {
1656 if (s != null ||
1657 UNSAFE.compareAndSwapObject
1658 (other, COMPLETIONS, q.next = other.completions, q))
1659 break;
1660 }
1661 else if (r != null ||
1662 UNSAFE.compareAndSwapObject
1663 (this, COMPLETIONS, p.next = completions, p)) {
1664 if (s != null)
1665 break;
1666 q = new CompletionNode(d);
1667 }
1668 }
1669 d.tryComplete();
1670 }
1671 else {
1672 T t; U u; Throwable ex;
1673 if (r instanceof AltResult) {
1674 ex = ((AltResult)r).ex;
1675 t = null;
1676 }
1677 else {
1678 ex = null;
1679 @SuppressWarnings("unchecked") T tr = (T) r;
1680 t = tr;
1681 }
1682 if (ex != null)
1683 u = null;
1684 else if (s instanceof AltResult) {
1685 ex = ((AltResult)s).ex;
1686 u = null;
1687 }
1688 else {
1689 @SuppressWarnings("unchecked") U us = (U) s;
1690 u = us;
1691 }
1692 V v = null;
1693 if (ex == null) {
1694 try {
1695 if (e != null)
1696 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1697 else
1698 v = fn.apply(t, u);
1699 } catch (Throwable rex) {
1700 ex = rex;
1701 }
1702 }
1703 if (e == null || ex != null)
1704 dst.internalComplete(v, ex);
1705 }
1706 return dst;
1707 }
1708
1709 private <U> CompletableFuture<Void> doThenAcceptBoth
1710 (CompletableFuture<? extends U> other,
1711 BiConsumer<? super T,? super U> fn,
1712 Executor e) {
1713 if (other == null || fn == null) throw new NullPointerException();
1714 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1715 Object r = result, s = other.result;
1716 if (r == null || s == null) {
1717 ThenAcceptBoth<T,U> d =
1718 new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1719 CompletionNode q = null, p = new CompletionNode(d);
1720 while ((r == null && (r = result) == null) ||
1721 (s == null && (s = other.result) == null)) {
1722 if (q != null) {
1723 if (s != null ||
1724 UNSAFE.compareAndSwapObject
1725 (other, COMPLETIONS, q.next = other.completions, q))
1726 break;
1727 }
1728 else if (r != null ||
1729 UNSAFE.compareAndSwapObject
1730 (this, COMPLETIONS, p.next = completions, p)) {
1731 if (s != null)
1732 break;
1733 q = new CompletionNode(d);
1734 }
1735 }
1736 d.tryComplete();
1737 }
1738 else {
1739 T t; U u; Throwable ex;
1740 if (r instanceof AltResult) {
1741 ex = ((AltResult)r).ex;
1742 t = null;
1743 }
1744 else {
1745 ex = null;
1746 @SuppressWarnings("unchecked") T tr = (T) r;
1747 t = tr;
1748 }
1749 if (ex != null)
1750 u = null;
1751 else if (s instanceof AltResult) {
1752 ex = ((AltResult)s).ex;
1753 u = null;
1754 }
1755 else {
1756 @SuppressWarnings("unchecked") U us = (U) s;
1757 u = us;
1758 }
1759 if (ex == null) {
1760 try {
1761 if (e != null)
1762 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1763 else
1764 fn.accept(t, u);
1765 } catch (Throwable rex) {
1766 ex = rex;
1767 }
1768 }
1769 if (e == null || ex != null)
1770 dst.internalComplete(null, ex);
1771 }
1772 return dst;
1773 }
1774
1775 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1776 Runnable action,
1777 Executor e) {
1778 if (other == null || action == null) throw new NullPointerException();
1779 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1780 Object r = result, s = other.result;
1781 if (r == null || s == null) {
1782 RunAfterBoth d = new RunAfterBoth(this, other, action, dst, e);
1783 CompletionNode q = null, p = new CompletionNode(d);
1784 while ((r == null && (r = result) == null) ||
1785 (s == null && (s = other.result) == null)) {
1786 if (q != null) {
1787 if (s != null ||
1788 UNSAFE.compareAndSwapObject
1789 (other, COMPLETIONS, q.next = other.completions, q))
1790 break;
1791 }
1792 else if (r != null ||
1793 UNSAFE.compareAndSwapObject
1794 (this, COMPLETIONS, p.next = completions, p)) {
1795 if (s != null)
1796 break;
1797 q = new CompletionNode(d);
1798 }
1799 }
1800 d.tryComplete();
1801 }
1802 else {
1803 Throwable ex;
1804 if (r instanceof AltResult)
1805 ex = ((AltResult)r).ex;
1806 else
1807 ex = null;
1808 if (ex == null && (s instanceof AltResult))
1809 ex = ((AltResult)s).ex;
1810 if (ex == null) {
1811 try {
1812 if (e != null)
1813 execAsync(e, new AsyncRun(action, dst));
1814 else
1815 action.run();
1816 } catch (Throwable rex) {
1817 ex = rex;
1818 }
1819 }
1820 if (e == null || ex != null)
1821 dst.internalComplete(null, ex);
1822 }
1823 return dst;
1824 }
1825
1826 private <U> CompletableFuture<U> doApplyToEither
1827 (CompletableFuture<? extends T> other,
1828 Function<? super T, U> fn,
1829 Executor e) {
1830 if (other == null || fn == null) throw new NullPointerException();
1831 CompletableFuture<U> dst = new CompletableFuture<U>();
1832 Object r;
1833 if ((r = result) == null && (r = other.result) == null) {
1834 ApplyToEither<T,U> d =
1835 new ApplyToEither<T,U>(this, other, fn, dst, e);
1836 CompletionNode q = null, p = new CompletionNode(d);
1837 while (result == null && other.result == null) {
1838 if (q != null) {
1839 if (UNSAFE.compareAndSwapObject
1840 (other, COMPLETIONS, q.next = other.completions, q))
1841 break;
1842 }
1843 else if (UNSAFE.compareAndSwapObject
1844 (this, COMPLETIONS, p.next = completions, p))
1845 q = new CompletionNode(d);
1846 }
1847 d.tryComplete();
1848 }
1849 else {
1850 T t; Throwable ex;
1851 if (r instanceof AltResult) {
1852 ex = ((AltResult)r).ex;
1853 t = null;
1854 }
1855 else {
1856 ex = null;
1857 @SuppressWarnings("unchecked") T tr = (T) r;
1858 t = tr;
1859 }
1860 U u = null;
1861 if (ex == null) {
1862 try {
1863 if (e != null)
1864 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1865 else
1866 u = fn.apply(t);
1867 } catch (Throwable rex) {
1868 ex = rex;
1869 }
1870 }
1871 if (e == null || ex != null)
1872 dst.internalComplete(u, ex);
1873 }
1874 return dst;
1875 }
1876
1877 private CompletableFuture<Void> doAcceptEither
1878 (CompletableFuture<? extends T> other,
1879 Consumer<? super T> fn,
1880 Executor e) {
1881 if (other == null || fn == null) throw new NullPointerException();
1882 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1883 Object r;
1884 if ((r = result) == null && (r = other.result) == null) {
1885 AcceptEither<T> d =
1886 new AcceptEither<T>(this, other, fn, dst, e);
1887 CompletionNode q = null, p = new CompletionNode(d);
1888 while (result == null && other.result == null) {
1889 if (q != null) {
1890 if (UNSAFE.compareAndSwapObject
1891 (other, COMPLETIONS, q.next = other.completions, q))
1892 break;
1893 }
1894 else if (UNSAFE.compareAndSwapObject
1895 (this, COMPLETIONS, p.next = completions, p))
1896 q = new CompletionNode(d);
1897 }
1898 d.tryComplete();
1899 }
1900 else {
1901 T t; Throwable ex;
1902 if (r instanceof AltResult) {
1903 ex = ((AltResult)r).ex;
1904 t = null;
1905 }
1906 else {
1907 ex = null;
1908 @SuppressWarnings("unchecked") T tr = (T) r;
1909 t = tr;
1910 }
1911 if (ex == null) {
1912 try {
1913 if (e != null)
1914 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1915 else
1916 fn.accept(t);
1917 } catch (Throwable rex) {
1918 ex = rex;
1919 }
1920 }
1921 if (e == null || ex != null)
1922 dst.internalComplete(null, ex);
1923 }
1924 return dst;
1925 }
1926
1927 private CompletableFuture<Void> doRunAfterEither
1928 (CompletableFuture<?> other,
1929 Runnable action,
1930 Executor e) {
1931 if (other == null || action == null) throw new NullPointerException();
1932 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1933 Object r;
1934 if ((r = result) == null && (r = other.result) == null) {
1935 RunAfterEither d =
1936 new RunAfterEither(this, other, action, dst, e);
1937 CompletionNode q = null, p = new CompletionNode(d);
1938 while (result == null && other.result == null) {
1939 if (q != null) {
1940 if (UNSAFE.compareAndSwapObject
1941 (other, COMPLETIONS, q.next = other.completions, q))
1942 break;
1943 }
1944 else if (UNSAFE.compareAndSwapObject
1945 (this, COMPLETIONS, p.next = completions, p))
1946 q = new CompletionNode(d);
1947 }
1948 d.tryComplete();
1949 }
1950 else {
1951 Throwable ex;
1952 if (r instanceof AltResult)
1953 ex = ((AltResult)r).ex;
1954 else
1955 ex = null;
1956 if (ex == null) {
1957 try {
1958 if (e != null)
1959 execAsync(e, new AsyncRun(action, dst));
1960 else
1961 action.run();
1962 } catch (Throwable rex) {
1963 ex = rex;
1964 }
1965 }
1966 if (e == null || ex != null)
1967 dst.internalComplete(null, ex);
1968 }
1969 return dst;
1970 }
1971
1972 private <U> CompletableFuture<U> doThenCompose
1973 (Function<? super T, ? extends CompletionStage<U>> fn,
1974 Executor e) {
1975 if (fn == null) throw new NullPointerException();
1976 CompletableFuture<U> dst = null;
1977 Object r;
1978 if ((r = result) == null) {
1979 dst = new CompletableFuture<U>();
1980 ThenCompose<T,U> d = new ThenCompose<T,U>(this, fn, dst, e);
1981 CompletionNode p = new CompletionNode(d);
1982 while (result == null) {
1983 if (UNSAFE.compareAndSwapObject
1984 (this, COMPLETIONS, p.next = completions, p))
1985 break;
1986 }
1987 d.tryComplete();
1988 }
1989 else {
1990 T t; Throwable ex;
1991 if (r instanceof AltResult) {
1992 ex = ((AltResult)r).ex;
1993 t = null;
1994 }
1995 else {
1996 ex = null;
1997 @SuppressWarnings("unchecked") T tr = (T) r;
1998 t = tr;
1999 }
2000 if (ex == null) {
2001 if (e != null) {
2002 if (dst == null)
2003 dst = new CompletableFuture<U>();
2004 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
2005 }
2006 else {
2007 try {
2008 CompletionStage<U> cs = fn.apply(t);
2009 if (cs == null ||
2010 (dst = cs.toCompletableFuture()) == null)
2011 ex = new NullPointerException();
2012 } catch (Throwable rex) {
2013 ex = rex;
2014 }
2015 }
2016 }
2017 if (dst == null)
2018 dst = new CompletableFuture<U>();
2019 if (ex != null)
2020 dst.internalComplete(null, ex);
2021 }
2022 return dst;
2023 }
2024
2025 private CompletableFuture<T> doWhenComplete
2026 (BiConsumer<? super T, ? super Throwable> fn,
2027 Executor e) {
2028 if (fn == null) throw new NullPointerException();
2029 CompletableFuture<T> dst = new CompletableFuture<T>();
2030 Object r;
2031 if ((r = result) == null) {
2032 WhenCompleteCompletion<T> d =
2033 new WhenCompleteCompletion<T>(this, fn, dst, e);
2034 CompletionNode p = new CompletionNode(d);
2035 while (result == null) {
2036 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2037 p.next = completions, p))
2038 break;
2039 }
2040 d.tryComplete();
2041 }
2042 else {
2043 T t; Throwable ex;
2044 if (r instanceof AltResult) {
2045 ex = ((AltResult)r).ex;
2046 t = null;
2047 }
2048 else {
2049 ex = null;
2050 @SuppressWarnings("unchecked") T tr = (T) r;
2051 t = tr;
2052 }
2053 Throwable dx = null;
2054 try {
2055 if (e != null)
2056 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2057 else
2058 fn.accept(t, ex);
2059 } catch (Throwable rex) {
2060 dx = rex;
2061 }
2062 if (e == null || dx != null)
2063 dst.internalComplete(t, ex != null ? ex : dx);
2064 }
2065 return dst;
2066 }
2067
2068 private <U> CompletableFuture<U> doHandle
2069 (BiFunction<? super T, Throwable, ? extends U> fn,
2070 Executor e) {
2071 if (fn == null) throw new NullPointerException();
2072 CompletableFuture<U> dst = new CompletableFuture<U>();
2073 Object r;
2074 if ((r = result) == null) {
2075 HandleCompletion<T,U> d =
2076 new HandleCompletion<T,U>(this, fn, dst, e);
2077 CompletionNode p = new CompletionNode(d);
2078 while (result == null) {
2079 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2080 p.next = completions, p))
2081 break;
2082 }
2083 d.tryComplete();
2084 }
2085 else {
2086 T t; Throwable ex;
2087 if (r instanceof AltResult) {
2088 ex = ((AltResult)r).ex;
2089 t = null;
2090 }
2091 else {
2092 ex = null;
2093 @SuppressWarnings("unchecked") T tr = (T) r;
2094 t = tr;
2095 }
2096 U u = null;
2097 Throwable dx = null;
2098 try {
2099 if (e != null)
2100 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2101 else {
2102 u = fn.apply(t, ex);
2103 dx = null;
2104 }
2105 } catch (Throwable rex) {
2106 dx = rex;
2107 u = null;
2108 }
2109 if (e == null || dx != null)
2110 dst.internalComplete(u, dx);
2111 }
2112 return dst;
2113 }
2114
2115 // public methods
2116
2117 /**
2118 * Creates a new incomplete CompletableFuture.
2119 */
2120 public CompletableFuture() {
2121 }
2122
2123 /**
2124 * Returns a new CompletableFuture that is asynchronously completed
2125 * by a task running in the {@link ForkJoinPool#commonPool()} with
2126 * the value obtained by calling the given Supplier.
2127 *
2128 * @param supplier a function returning the value to be used
2129 * to complete the returned CompletableFuture
2130 * @param <U> the function's return type
2131 * @return the new CompletableFuture
2132 */
2133 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2134 if (supplier == null) throw new NullPointerException();
2135 CompletableFuture<U> f = new CompletableFuture<U>();
2136 execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2137 return f;
2138 }
2139
2140 /**
2141 * Returns a new CompletableFuture that is asynchronously completed
2142 * by a task running in the given executor with the value obtained
2143 * by calling the given Supplier.
2144 *
2145 * @param supplier a function returning the value to be used
2146 * to complete the returned CompletableFuture
2147 * @param executor the executor to use for asynchronous execution
2148 * @param <U> the function's return type
2149 * @return the new CompletableFuture
2150 */
2151 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2152 Executor executor) {
2153 if (executor == null || supplier == null)
2154 throw new NullPointerException();
2155 CompletableFuture<U> f = new CompletableFuture<U>();
2156 execAsync(executor, new AsyncSupply<U>(supplier, f));
2157 return f;
2158 }
2159
2160 /**
2161 * Returns a new CompletableFuture that is asynchronously completed
2162 * by a task running in the {@link ForkJoinPool#commonPool()} after
2163 * it runs the given action.
2164 *
2165 * @param runnable the action to run before completing the
2166 * returned CompletableFuture
2167 * @return the new CompletableFuture
2168 */
2169 public static CompletableFuture<Void> runAsync(Runnable runnable) {
2170 if (runnable == null) throw new NullPointerException();
2171 CompletableFuture<Void> f = new CompletableFuture<Void>();
2172 execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2173 return f;
2174 }
2175
2176 /**
2177 * Returns a new CompletableFuture that is asynchronously completed
2178 * by a task running in the given executor after it runs the given
2179 * action.
2180 *
2181 * @param runnable the action to run before completing the
2182 * returned CompletableFuture
2183 * @param executor the executor to use for asynchronous execution
2184 * @return the new CompletableFuture
2185 */
2186 public static CompletableFuture<Void> runAsync(Runnable runnable,
2187 Executor executor) {
2188 if (executor == null || runnable == null)
2189 throw new NullPointerException();
2190 CompletableFuture<Void> f = new CompletableFuture<Void>();
2191 execAsync(executor, new AsyncRun(runnable, f));
2192 return f;
2193 }
2194
2195 /**
2196 * Returns a new CompletableFuture that is already completed with
2197 * the given value.
2198 *
2199 * @param value the value
2200 * @param <U> the type of the value
2201 * @return the completed CompletableFuture
2202 */
2203 public static <U> CompletableFuture<U> completedFuture(U value) {
2204 CompletableFuture<U> f = new CompletableFuture<U>();
2205 f.result = (value == null) ? NIL : value;
2206 return f;
2207 }
2208
2209 /**
2210 * Returns {@code true} if completed in any fashion: normally,
2211 * exceptionally, or via cancellation.
2212 *
2213 * @return {@code true} if completed
2214 */
2215 public boolean isDone() {
2216 return result != null;
2217 }
2218
2219 /**
2220 * Waits if necessary for this future to complete, and then
2221 * returns its result.
2222 *
2223 * @return the result value
2224 * @throws CancellationException if this future was cancelled
2225 * @throws ExecutionException if this future completed exceptionally
2226 * @throws InterruptedException if the current thread was interrupted
2227 * while waiting
2228 */
2229 public T get() throws InterruptedException, ExecutionException {
2230 Object r; Throwable ex, cause;
2231 if ((r = result) == null && (r = waitingGet(true)) == null)
2232 throw new InterruptedException();
2233 if (!(r instanceof AltResult)) {
2234 @SuppressWarnings("unchecked") T tr = (T) r;
2235 return tr;
2236 }
2237 if ((ex = ((AltResult)r).ex) == null)
2238 return null;
2239 if (ex instanceof CancellationException)
2240 throw (CancellationException)ex;
2241 if ((ex instanceof CompletionException) &&
2242 (cause = ex.getCause()) != null)
2243 ex = cause;
2244 throw new ExecutionException(ex);
2245 }
2246
2247 /**
2248 * Waits if necessary for at most the given time for this future
2249 * to complete, and then returns its result, if available.
2250 *
2251 * @param timeout the maximum time to wait
2252 * @param unit the time unit of the timeout argument
2253 * @return the result value
2254 * @throws CancellationException if this future was cancelled
2255 * @throws ExecutionException if this future completed exceptionally
2256 * @throws InterruptedException if the current thread was interrupted
2257 * while waiting
2258 * @throws TimeoutException if the wait timed out
2259 */
2260 public T get(long timeout, TimeUnit unit)
2261 throws InterruptedException, ExecutionException, TimeoutException {
2262 Object r; Throwable ex, cause;
2263 long nanos = unit.toNanos(timeout);
2264 if (Thread.interrupted())
2265 throw new InterruptedException();
2266 if ((r = result) == null)
2267 r = timedAwaitDone(nanos);
2268 if (!(r instanceof AltResult)) {
2269 @SuppressWarnings("unchecked") T tr = (T) r;
2270 return tr;
2271 }
2272 if ((ex = ((AltResult)r).ex) == null)
2273 return null;
2274 if (ex instanceof CancellationException)
2275 throw (CancellationException)ex;
2276 if ((ex instanceof CompletionException) &&
2277 (cause = ex.getCause()) != null)
2278 ex = cause;
2279 throw new ExecutionException(ex);
2280 }
2281
2282 /**
2283 * Returns the result value when complete, or throws an
2284 * (unchecked) exception if completed exceptionally. To better
2285 * conform with the use of common functional forms, if a
2286 * computation involved in the completion of this
2287 * CompletableFuture threw an exception, this method throws an
2288 * (unchecked) {@link CompletionException} with the underlying
2289 * exception as its cause.
2290 *
2291 * @return the result value
2292 * @throws CancellationException if the computation was cancelled
2293 * @throws CompletionException if this future completed
2294 * exceptionally or a completion computation threw an exception
2295 */
2296 public T join() {
2297 Object r; Throwable ex;
2298 if ((r = result) == null)
2299 r = waitingGet(false);
2300 if (!(r instanceof AltResult)) {
2301 @SuppressWarnings("unchecked") T tr = (T) r;
2302 return tr;
2303 }
2304 if ((ex = ((AltResult)r).ex) == null)
2305 return null;
2306 if (ex instanceof CancellationException)
2307 throw (CancellationException)ex;
2308 if (ex instanceof CompletionException)
2309 throw (CompletionException)ex;
2310 throw new CompletionException(ex);
2311 }
2312
2313 /**
2314 * Returns the result value (or throws any encountered exception)
2315 * if completed, else returns the given valueIfAbsent.
2316 *
2317 * @param valueIfAbsent the value to return if not completed
2318 * @return the result value, if completed, else the given valueIfAbsent
2319 * @throws CancellationException if the computation was cancelled
2320 * @throws CompletionException if this future completed
2321 * exceptionally or a completion computation threw an exception
2322 */
2323 public T getNow(T valueIfAbsent) {
2324 Object r; Throwable ex;
2325 if ((r = result) == null)
2326 return valueIfAbsent;
2327 if (!(r instanceof AltResult)) {
2328 @SuppressWarnings("unchecked") T tr = (T) r;
2329 return tr;
2330 }
2331 if ((ex = ((AltResult)r).ex) == null)
2332 return null;
2333 if (ex instanceof CancellationException)
2334 throw (CancellationException)ex;
2335 if (ex instanceof CompletionException)
2336 throw (CompletionException)ex;
2337 throw new CompletionException(ex);
2338 }
2339
2340 /**
2341 * If not already completed, sets the value returned by {@link
2342 * #get()} and related methods to the given value.
2343 *
2344 * @param value the result value
2345 * @return {@code true} if this invocation caused this CompletableFuture
2346 * to transition to a completed state, else {@code false}
2347 */
2348 public boolean complete(T value) {
2349 boolean triggered = result == null &&
2350 UNSAFE.compareAndSwapObject(this, RESULT, null,
2351 value == null ? NIL : value);
2352 postComplete(this);
2353 return triggered;
2354 }
2355
2356 /**
2357 * If not already completed, causes invocations of {@link #get()}
2358 * and related methods to throw the given exception.
2359 *
2360 * @param ex the exception
2361 * @return {@code true} if this invocation caused this CompletableFuture
2362 * to transition to a completed state, else {@code false}
2363 */
2364 public boolean completeExceptionally(Throwable ex) {
2365 if (ex == null) throw new NullPointerException();
2366 boolean triggered = result == null &&
2367 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2368 postComplete(this);
2369 return triggered;
2370 }
2371
2372 // CompletionStage methods
2373
2374 public <U> CompletableFuture<U> thenApply
2375 (Function<? super T,? extends U> fn) {
2376 return doThenApply(fn, null);
2377 }
2378
2379 public <U> CompletableFuture<U> thenApplyAsync
2380 (Function<? super T,? extends U> fn) {
2381 return doThenApply(fn, ForkJoinPool.commonPool());
2382 }
2383
2384 public <U> CompletableFuture<U> thenApplyAsync
2385 (Function<? super T,? extends U> fn,
2386 Executor executor) {
2387 if (executor == null) throw new NullPointerException();
2388 return doThenApply(fn, executor);
2389 }
2390
2391 public CompletableFuture<Void> thenAccept
2392 (Consumer<? super T> action) {
2393 return doThenAccept(action, null);
2394 }
2395
2396 public CompletableFuture<Void> thenAcceptAsync
2397 (Consumer<? super T> action) {
2398 return doThenAccept(action, ForkJoinPool.commonPool());
2399 }
2400
2401 public CompletableFuture<Void> thenAcceptAsync
2402 (Consumer<? super T> action,
2403 Executor executor) {
2404 if (executor == null) throw new NullPointerException();
2405 return doThenAccept(action, executor);
2406 }
2407
2408 public CompletableFuture<Void> thenRun
2409 (Runnable action) {
2410 return doThenRun(action, null);
2411 }
2412
2413 public CompletableFuture<Void> thenRunAsync
2414 (Runnable action) {
2415 return doThenRun(action, ForkJoinPool.commonPool());
2416 }
2417
2418 public CompletableFuture<Void> thenRunAsync
2419 (Runnable action,
2420 Executor executor) {
2421 if (executor == null) throw new NullPointerException();
2422 return doThenRun(action, executor);
2423 }
2424
2425 public <U,V> CompletableFuture<V> thenCombine
2426 (CompletionStage<? extends U> other,
2427 BiFunction<? super T,? super U,? extends V> fn) {
2428 return doThenCombine(other.toCompletableFuture(), fn, null);
2429 }
2430
2431 public <U,V> CompletableFuture<V> thenCombineAsync
2432 (CompletionStage<? extends U> other,
2433 BiFunction<? super T,? super U,? extends V> fn) {
2434 return doThenCombine(other.toCompletableFuture(), fn,
2435 ForkJoinPool.commonPool());
2436 }
2437
2438 public <U,V> CompletableFuture<V> thenCombineAsync
2439 (CompletionStage<? extends U> other,
2440 BiFunction<? super T,? super U,? extends V> fn,
2441 Executor executor) {
2442 if (executor == null) throw new NullPointerException();
2443 return doThenCombine(other.toCompletableFuture(), fn, executor);
2444 }
2445
2446 public <U> CompletableFuture<Void> thenAcceptBoth
2447 (CompletionStage<? extends U> other,
2448 BiConsumer<? super T, ? super U> action) {
2449 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2450 }
2451
2452 public <U> CompletableFuture<Void> thenAcceptBothAsync
2453 (CompletionStage<? extends U> other,
2454 BiConsumer<? super T, ? super U> action) {
2455 return doThenAcceptBoth(other.toCompletableFuture(), action,
2456 ForkJoinPool.commonPool());
2457 }
2458
2459 public <U> CompletableFuture<Void> thenAcceptBothAsync
2460 (CompletionStage<? extends U> other,
2461 BiConsumer<? super T, ? super U> action,
2462 Executor executor) {
2463 if (executor == null) throw new NullPointerException();
2464 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2465 }
2466
2467 public CompletableFuture<Void> runAfterBoth
2468 (CompletionStage<?> other,
2469 Runnable action) {
2470 return doRunAfterBoth(other.toCompletableFuture(), action, null);
2471 }
2472
2473 public CompletableFuture<Void> runAfterBothAsync
2474 (CompletionStage<?> other,
2475 Runnable action) {
2476 return doRunAfterBoth(other.toCompletableFuture(), action,
2477 ForkJoinPool.commonPool());
2478 }
2479
2480 public CompletableFuture<Void> runAfterBothAsync
2481 (CompletionStage<?> other,
2482 Runnable action,
2483 Executor executor) {
2484 if (executor == null) throw new NullPointerException();
2485 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2486 }
2487
2488
2489 public <U> CompletableFuture<U> applyToEither
2490 (CompletionStage<? extends T> other,
2491 Function<? super T, U> fn) {
2492 return doApplyToEither(other.toCompletableFuture(), fn, null);
2493 }
2494
2495 public <U> CompletableFuture<U> applyToEitherAsync
2496 (CompletionStage<? extends T> other,
2497 Function<? super T, U> fn) {
2498 return doApplyToEither(other.toCompletableFuture(), fn,
2499 ForkJoinPool.commonPool());
2500 }
2501
2502 public <U> CompletableFuture<U> applyToEitherAsync
2503 (CompletionStage<? extends T> other,
2504 Function<? super T, U> fn,
2505 Executor executor) {
2506 if (executor == null) throw new NullPointerException();
2507 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2508 }
2509
2510 public CompletableFuture<Void> acceptEither
2511 (CompletionStage<? extends T> other,
2512 Consumer<? super T> action) {
2513 return doAcceptEither(other.toCompletableFuture(), action, null);
2514 }
2515
2516 public CompletableFuture<Void> acceptEitherAsync
2517 (CompletionStage<? extends T> other,
2518 Consumer<? super T> action) {
2519 return doAcceptEither(other.toCompletableFuture(), action,
2520 ForkJoinPool.commonPool());
2521 }
2522
2523 public CompletableFuture<Void> acceptEitherAsync
2524 (CompletionStage<? extends T> other,
2525 Consumer<? super T> action,
2526 Executor executor) {
2527 if (executor == null) throw new NullPointerException();
2528 return doAcceptEither(other.toCompletableFuture(), action, executor);
2529 }
2530
2531 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2532 Runnable action) {
2533 return doRunAfterEither(other.toCompletableFuture(), action, null);
2534 }
2535
2536 public CompletableFuture<Void> runAfterEitherAsync
2537 (CompletionStage<?> other,
2538 Runnable action) {
2539 return doRunAfterEither(other.toCompletableFuture(), action,
2540 ForkJoinPool.commonPool());
2541 }
2542
2543 public CompletableFuture<Void> runAfterEitherAsync
2544 (CompletionStage<?> other,
2545 Runnable action,
2546 Executor executor) {
2547 if (executor == null) throw new NullPointerException();
2548 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2549 }
2550
2551 public <U> CompletableFuture<U> thenCompose
2552 (Function<? super T, ? extends CompletionStage<U>> fn) {
2553 return doThenCompose(fn, null);
2554 }
2555
2556 public <U> CompletableFuture<U> thenComposeAsync
2557 (Function<? super T, ? extends CompletionStage<U>> fn) {
2558 return doThenCompose(fn, ForkJoinPool.commonPool());
2559 }
2560
2561 public <U> CompletableFuture<U> thenComposeAsync
2562 (Function<? super T, ? extends CompletionStage<U>> fn,
2563 Executor executor) {
2564 if (executor == null) throw new NullPointerException();
2565 return doThenCompose(fn, executor);
2566 }
2567
2568 public CompletableFuture<T> whenComplete
2569 (BiConsumer<? super T, ? super Throwable> action) {
2570 return doWhenComplete(action, null);
2571 }
2572
2573 public CompletableFuture<T> whenCompleteAsync
2574 (BiConsumer<? super T, ? super Throwable> action) {
2575 return doWhenComplete(action, ForkJoinPool.commonPool());
2576 }
2577
2578 public CompletableFuture<T> whenCompleteAsync
2579 (BiConsumer<? super T, ? super Throwable> action,
2580 Executor executor) {
2581 if (executor == null) throw new NullPointerException();
2582 return doWhenComplete(action, executor);
2583 }
2584
2585 public <U> CompletableFuture<U> handle
2586 (BiFunction<? super T, Throwable, ? extends U> fn) {
2587 return doHandle(fn, null);
2588 }
2589
2590 public <U> CompletableFuture<U> handleAsync
2591 (BiFunction<? super T, Throwable, ? extends U> fn) {
2592 return doHandle(fn, ForkJoinPool.commonPool());
2593 }
2594
2595 public <U> CompletableFuture<U> handleAsync
2596 (BiFunction<? super T, Throwable, ? extends U> fn,
2597 Executor executor) {
2598 if (executor == null) throw new NullPointerException();
2599 return doHandle(fn, executor);
2600 }
2601
2602 /**
2603 * Returns this CompletableFuture
2604 *
2605 * @return this CompletableFuture
2606 */
2607 public CompletableFuture<T> toCompletableFuture() {
2608 return this;
2609 }
2610
2611 // not in interface CompletionStage
2612
2613 /**
2614 * Returns a new CompletableFuture that is completed when this
2615 * CompletableFuture completes, with the result of the given
2616 * function of the exception triggering this CompletableFuture's
2617 * completion when it completes exceptionally; otherwise, if this
2618 * CompletableFuture completes normally, then the returned
2619 * CompletableFuture also completes normally with the same value.
2620 * Note: More flexible versions of this functionality are
2621 * available using methods {@code whenComplete} and {@code handle}.
2622 *
2623 * @param fn the function to use to compute the value of the
2624 * returned CompletableFuture if this CompletableFuture completed
2625 * exceptionally
2626 * @return the new CompletableFuture
2627 */
2628 public CompletableFuture<T> exceptionally
2629 (Function<Throwable, ? extends T> fn) {
2630 if (fn == null) throw new NullPointerException();
2631 CompletableFuture<T> dst = new CompletableFuture<T>();
2632 Object r;
2633 if ((r = result) == null) {
2634 ExceptionCompletion<T> d =
2635 new ExceptionCompletion<T>(this, fn, dst);
2636 CompletionNode p = new CompletionNode(d);
2637 while (result == null) {
2638 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2639 p.next = completions, p))
2640 break;
2641 }
2642 d.tryComplete();
2643 }
2644 else {
2645 T t = null; Throwable ex, dx = null;
2646 if (r instanceof AltResult) {
2647 if ((ex = ((AltResult)r).ex) != null) {
2648 try {
2649 t = fn.apply(ex);
2650 } catch (Throwable rex) {
2651 dx = rex;
2652 }
2653 }
2654 }
2655 else {
2656 @SuppressWarnings("unchecked") T tr = (T) r;
2657 t = tr;
2658 }
2659 dst.internalComplete(t, dx);
2660 }
2661 return dst;
2662 }
2663
2664 /* ------------- Arbitrary-arity constructions -------------- */
2665
2666 /*
2667 * The basic plan of attack is to recursively form binary
2668 * completion trees of elements. This can be overkill for small
2669 * sets, but scales nicely. The And/All vs Or/Any forms use the
2670 * same idea, but details differ.
2671 */
2672
2673 /**
2674 * Returns a new CompletableFuture that is completed when all of
2675 * the given CompletableFutures complete. If any of the given
2676 * CompletableFutures complete exceptionally, then the returned
2677 * CompletableFuture also does so, with a CompletionException
2678 * holding this exception as its cause. Otherwise, the results,
2679 * if any, of the given CompletableFutures are not reflected in
2680 * the returned CompletableFuture, but may be obtained by
2681 * inspecting them individually. If no CompletableFutures are
2682 * provided, returns a CompletableFuture completed with the value
2683 * {@code null}.
2684 *
2685 * <p>Among the applications of this method is to await completion
2686 * of a set of independent CompletableFutures before continuing a
2687 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2688 * c3).join();}.
2689 *
2690 * @param cfs the CompletableFutures
2691 * @return a new CompletableFuture that is completed when all of the
2692 * given CompletableFutures complete
2693 * @throws NullPointerException if the array or any of its elements are
2694 * {@code null}
2695 */
2696 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2697 int len = cfs.length; // Directly handle empty and singleton cases
2698 if (len > 1)
2699 return allTree(cfs, 0, len - 1);
2700 else {
2701 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2702 CompletableFuture<?> f;
2703 if (len == 0)
2704 dst.result = NIL;
2705 else if ((f = cfs[0]) == null)
2706 throw new NullPointerException();
2707 else {
2708 ThenPropagate d = null;
2709 CompletionNode p = null;
2710 Object r;
2711 while ((r = f.result) == null) {
2712 if (d == null)
2713 d = new ThenPropagate(f, dst);
2714 else if (p == null)
2715 p = new CompletionNode(d);
2716 else if (UNSAFE.compareAndSwapObject
2717 (f, COMPLETIONS, p.next = f.completions, p))
2718 break;
2719 }
2720 if (d != null)
2721 d.tryComplete();
2722 else
2723 dst.internalComplete(null, (r instanceof AltResult) ?
2724 ((AltResult)r).ex : null);
2725 }
2726 return dst;
2727 }
2728 }
2729
2730 /**
2731 * Recursively constructs an And'ed tree of CompletableFutures.
2732 * Called only when array known to have at least two elements.
2733 */
2734 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2735 int lo, int hi) {
2736 CompletableFuture<?> fst, snd;
2737 int mid = (lo + hi) >>> 1;
2738 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2739 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2740 throw new NullPointerException();
2741 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2742 AndCompletion d = null;
2743 CompletionNode p = null, q = null;
2744 Object r = null, s = null;
2745 while ((r = fst.result) == null || (s = snd.result) == null) {
2746 if (d == null)
2747 d = new AndCompletion(fst, snd, dst);
2748 else if (p == null)
2749 p = new CompletionNode(d);
2750 else if (q == null) {
2751 if (UNSAFE.compareAndSwapObject
2752 (fst, COMPLETIONS, p.next = fst.completions, p))
2753 q = new CompletionNode(d);
2754 }
2755 else if (UNSAFE.compareAndSwapObject
2756 (snd, COMPLETIONS, q.next = snd.completions, q))
2757 break;
2758 }
2759 if (d != null)
2760 d.tryComplete();
2761 else {
2762 Throwable ex;
2763 if (r instanceof AltResult)
2764 ex = ((AltResult)r).ex;
2765 else
2766 ex = null;
2767 if (ex == null && (s instanceof AltResult))
2768 ex = ((AltResult)s).ex;
2769 dst.internalComplete(null, ex);
2770 }
2771 return dst;
2772 }
2773
2774 /**
2775 * Returns a new CompletableFuture that is completed when any of
2776 * the given CompletableFutures complete, with the same result.
2777 * Otherwise, if it completed exceptionally, the returned
2778 * CompletableFuture also does so, with a CompletionException
2779 * holding this exception as its cause. If no CompletableFutures
2780 * are provided, returns an incomplete CompletableFuture.
2781 *
2782 * @param cfs the CompletableFutures
2783 * @return a new CompletableFuture that is completed with the
2784 * result or exception of any of the given CompletableFutures when
2785 * one completes
2786 * @throws NullPointerException if the array or any of its elements are
2787 * {@code null}
2788 */
2789 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2790 int len = cfs.length; // Same idea as allOf
2791 if (len > 1)
2792 return anyTree(cfs, 0, len - 1);
2793 else {
2794 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2795 CompletableFuture<?> f;
2796 if (len == 0)
2797 ; // skip
2798 else if ((f = cfs[0]) == null)
2799 throw new NullPointerException();
2800 else {
2801 ThenCopy<Object> d = null;
2802 CompletionNode p = null;
2803 Object r;
2804 while ((r = f.result) == null) {
2805 if (d == null)
2806 d = new ThenCopy<Object>(f, dst);
2807 else if (p == null)
2808 p = new CompletionNode(d);
2809 else if (UNSAFE.compareAndSwapObject
2810 (f, COMPLETIONS, p.next = f.completions, p))
2811 break;
2812 }
2813 if (d != null)
2814 d.tryComplete();
2815 else {
2816 Throwable ex; Object t;
2817 if (r instanceof AltResult) {
2818 ex = ((AltResult)r).ex;
2819 t = null;
2820 }
2821 else {
2822 ex = null;
2823 t = r;
2824 }
2825 dst.internalComplete(t, ex);
2826 }
2827 }
2828 return dst;
2829 }
2830 }
2831
2832 /**
2833 * Recursively constructs an Or'ed tree of CompletableFutures.
2834 */
2835 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2836 int lo, int hi) {
2837 CompletableFuture<?> fst, snd;
2838 int mid = (lo + hi) >>> 1;
2839 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2840 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2841 throw new NullPointerException();
2842 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2843 OrCompletion d = null;
2844 CompletionNode p = null, q = null;
2845 Object r;
2846 while ((r = fst.result) == null && (r = snd.result) == null) {
2847 if (d == null)
2848 d = new OrCompletion(fst, snd, dst);
2849 else if (p == null)
2850 p = new CompletionNode(d);
2851 else if (q == null) {
2852 if (UNSAFE.compareAndSwapObject
2853 (fst, COMPLETIONS, p.next = fst.completions, p))
2854 q = new CompletionNode(d);
2855 }
2856 else if (UNSAFE.compareAndSwapObject
2857 (snd, COMPLETIONS, q.next = snd.completions, q))
2858 break;
2859 }
2860 if (d != null)
2861 d.tryComplete();
2862 else {
2863 Throwable ex; Object t;
2864 if (r instanceof AltResult) {
2865 ex = ((AltResult)r).ex;
2866 t = null;
2867 }
2868 else {
2869 ex = null;
2870 t = r;
2871 }
2872 dst.internalComplete(t, ex);
2873 }
2874 return dst;
2875 }
2876
2877 /* ------------- Control and status methods -------------- */
2878
2879 /**
2880 * If not already completed, completes this CompletableFuture with
2881 * a {@link CancellationException}. Dependent CompletableFutures
2882 * that have not already completed will also complete
2883 * exceptionally, with a {@link CompletionException} caused by
2884 * this {@code CancellationException}.
2885 *
2886 * @param mayInterruptIfRunning this value has no effect in this
2887 * implementation because interrupts are not used to control
2888 * processing.
2889 *
2890 * @return {@code true} if this task is now cancelled
2891 */
2892 public boolean cancel(boolean mayInterruptIfRunning) {
2893 boolean cancelled = (result == null) &&
2894 UNSAFE.compareAndSwapObject
2895 (this, RESULT, null, new AltResult(new CancellationException()));
2896 postComplete(this);
2897 return cancelled || isCancelled();
2898 }
2899
2900 /**
2901 * Returns {@code true} if this CompletableFuture was cancelled
2902 * before it completed normally.
2903 *
2904 * @return {@code true} if this CompletableFuture was cancelled
2905 * before it completed normally
2906 */
2907 public boolean isCancelled() {
2908 Object r;
2909 return ((r = result) instanceof AltResult) &&
2910 (((AltResult)r).ex instanceof CancellationException);
2911 }
2912
2913 /**
2914 * Returns {@code true} if this CompletableFuture completed
2915 * exceptionally, in any way. Possible causes include
2916 * cancellation, explicit invocation of {@code
2917 * completeExceptionally}, and abrupt termination of a
2918 * CompletionStage action.
2919 *
2920 * @return {@code true} if this CompletableFuture completed
2921 * exceptionally
2922 */
2923 public boolean isCompletedExceptionally() {
2924 Object r;
2925 return ((r = result) instanceof AltResult) && r != NIL;
2926 }
2927
2928 /**
2929 * Forcibly sets or resets the value subsequently returned by
2930 * method {@link #get()} and related methods, whether or not
2931 * already completed. This method is designed for use only in
2932 * error recovery actions, and even in such situations may result
2933 * in ongoing dependent completions using established versus
2934 * overwritten outcomes.
2935 *
2936 * @param value the completion value
2937 */
2938 public void obtrudeValue(T value) {
2939 result = (value == null) ? NIL : value;
2940 postComplete(this);
2941 }
2942
2943 /**
2944 * Forcibly causes subsequent invocations of method {@link #get()}
2945 * and related methods to throw the given exception, whether or
2946 * not already completed. This method is designed for use only in
2947 * recovery actions, and even in such situations may result in
2948 * ongoing dependent completions using established versus
2949 * overwritten outcomes.
2950 *
2951 * @param ex the exception
2952 */
2953 public void obtrudeException(Throwable ex) {
2954 if (ex == null) throw new NullPointerException();
2955 result = new AltResult(ex);
2956 postComplete(this);
2957 }
2958
2959 /**
2960 * Returns the estimated number of CompletableFutures whose
2961 * completions are awaiting completion of this CompletableFuture.
2962 * This method is designed for use in monitoring system state, not
2963 * for synchronization control.
2964 *
2965 * @return the number of dependent CompletableFutures
2966 */
2967 public int getNumberOfDependents() {
2968 int count = 0;
2969 for (CompletionNode p = completions; p != null; p = p.next)
2970 ++count;
2971 return count;
2972 }
2973
2974 /**
2975 * Returns a string identifying this CompletableFuture, as well as
2976 * its completion state. The state, in brackets, contains the
2977 * String {@code "Completed Normally"} or the String {@code
2978 * "Completed Exceptionally"}, or the String {@code "Not
2979 * completed"} followed by the number of CompletableFutures
2980 * dependent upon its completion, if any.
2981 *
2982 * @return a string identifying this CompletableFuture, as well as its state
2983 */
2984 public String toString() {
2985 Object r = result;
2986 int count;
2987 return super.toString() +
2988 ((r == null) ?
2989 (((count = getNumberOfDependents()) == 0) ?
2990 "[Not completed]" :
2991 "[Not completed, " + count + " dependents]") :
2992 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2993 "[Completed exceptionally]" :
2994 "[Completed normally]"));
2995 }
2996
2997 // Unsafe mechanics
2998 private static final sun.misc.Unsafe UNSAFE;
2999 private static final long RESULT;
3000 private static final long WAITERS;
3001 private static final long COMPLETIONS;
3002 static {
3003 try {
3004 UNSAFE = sun.misc.Unsafe.getUnsafe();
3005 Class<?> k = CompletableFuture.class;
3006 RESULT = UNSAFE.objectFieldOffset
3007 (k.getDeclaredField("result"));
3008 WAITERS = UNSAFE.objectFieldOffset
3009 (k.getDeclaredField("waiters"));
3010 COMPLETIONS = UNSAFE.objectFieldOffset
3011 (k.getDeclaredField("completions"));
3012 } catch (Exception e) {
3013 throw new Error(e);
3014 }
3015 }
3016 }