ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.13
Committed: Sat Feb 16 16:53:34 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.12: +0 -1 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.TimeUnit;
10 import java.util.concurrent.Executor;
11 import java.util.concurrent.ThreadLocalRandom;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeoutException;
14 import java.util.concurrent.CancellationException;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.locks.LockSupport;
17
18 /**
19 * A {@link Future} that may be explicitly completed (setting its
20 * value and status), and may include dependent functions and actions
21 * that trigger upon its completion. Methods are available for adding
22 * those based on Functions, Blocks, and Runnables, depending on
23 * whether they require arguments and/or produce results, as well as
24 * those triggered after either or both the current and another
25 * CompletableFuture complete. Functions and actions supplied for
26 * dependent completions (mainly using methods with prefix {@code
27 * then}) may be performed by the thread that completes the current
28 * CompletableFuture, or by any other caller of these methods. There
29 * are no guarantees about the order of processing completions unless
30 * constrained by these methods.
31 *
32 * <p>When two or more threads attempt to {@link #complete} or {@link
33 * #completeExceptionally} a CompletableFuture, only one of them
34 * succeeds.
35 *
36 * <p>Upon exceptional completion, or when a completion entails
37 * computation of a function or action, and it terminates abruptly
38 * with an (unchecked) exception or error, then further completions
39 * act as {@code completeExceptionally} with a {@link
40 * CompletionException} holding that exception as its cause. If a
41 * CompletableFuture completes exceptionally, and is not followed by a
42 * {@link #exceptionally} or {@link #handle} completion, then all of
43 * its dependents (and their dependents) also complete exceptionally
44 * with CompletionExceptions holding the ultimate cause. In case of a
45 * CompletionException, methods {@link #get()} and {@link #get(long,
46 * TimeUnit)} throw an {@link ExecutionException} with the same cause
47 * as would be held in the corresponding CompletionException. However,
48 * in these cases, methods {@link #join()} and {@link #getNow} throw
49 * the CompletionException, which simplifies usage especially within
50 * other completion functions.
51 *
52 * <p>CompletableFutures themselves do not execute asynchronously.
53 * However, the {@code async} methods provide commonly useful ways to
54 * commence asynchronous processing, using either a given {@link
55 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
56 * function or action that will result in the completion of a new
57 * CompletableFuture. To simplify monitoring, debugging, and tracking,
58 * all generated asynchronous tasks are instances of the tagging
59 * interface {@link AsynchronousCompletionTask}.
60 *
61 * <p><em>jsr166e note: During transition, this class
62 * uses nested functional interfaces with different names but the
63 * same forms as those expected for JDK8.</em>
64 *
65 * @author Doug Lea
66 * @since 1.8
67 */
68 public class CompletableFuture<T> implements Future<T> {
69 // jsr166e nested interfaces
70
71 /** Interface describing a void action of one argument */
72 public interface Action<A> { void accept(A a); }
73 /** Interface describing a void action of two arguments */
74 public interface BiAction<A,B> { void accept(A a, B b); }
75 /** Interface describing a function of one argument */
76 public interface Fun<A,T> { T apply(A a); }
77 /** Interface describing a function of two arguments */
78 public interface BiFun<A,B,T> { T apply(A a, B b); }
79 /** Interface describing a function of no arguments */
80 public interface Generator<T> { T get(); }
81
82
83 /*
84 * Overview:
85 *
86 * 1. Non-nullness of field result (set via CAS) indicates done.
87 * An AltResult is used to box null as a result, as well as to
88 * hold exceptions. Using a single field makes completion fast
89 * and simple to detect and trigger, at the expense of a lot of
90 * encoding and decoding that infiltrates many methods. One minor
91 * simplification relies on the (static) NIL (to box null results)
92 * being the only AltResult with a null exception field, so we
93 * don't usually need explicit comparisons with NIL. The CF
94 * exception propagation mechanics surrounding decoding rely on
95 * unchecked casts of decoded results really being unchecked,
96 * where user type errors are caught at point of use, as is
97 * currently the case in Java. These are highlighted by using
98 * SuppressWarnings-annotated temporaries.
99 *
100 * 2. Waiters are held in a Treiber stack similar to the one used
101 * in FutureTask, Phaser, and SynchronousQueue. See their
102 * internal documentation for algorithmic details.
103 *
104 * 3. Completions are also kept in a list/stack, and pulled off
105 * and run when completion is triggered. (We could even use the
106 * same stack as for waiters, but would give up the potential
107 * parallelism obtained because woken waiters help release/run
108 * others -- see method postComplete). Because post-processing
109 * may race with direct calls, class Completion opportunistically
110 * extends AtomicInteger so callers can claim the action via
111 * compareAndSet(0, 1). The Completion.run methods are all
112 * written a boringly similar uniform way (that sometimes includes
113 * unnecessary-looking checks, kept to maintain uniformity). There
114 * are enough dimensions upon which they differ that factoring to
115 * use common code isn't worthwhile.
116 *
117 * 4. The exported then/and/or methods do support a bit of
118 * factoring (see doThenApply etc). They must cope with the
119 * intrinsic races surrounding addition of a dependent action
120 * versus performing the action directly because the task is
121 * already complete. For example, a CF may not be complete upon
122 * entry, so a dependent completion is added, but by the time it
123 * is added, the target CF is complete, so must be directly
124 * executed. This is all done while avoiding unnecessary object
125 * construction in safe-bypass cases.
126 */
127
128 // preliminaries
129
130 static final class AltResult {
131 final Throwable ex; // null only for NIL
132 AltResult(Throwable ex) { this.ex = ex; }
133 }
134
135 static final AltResult NIL = new AltResult(null);
136
137 // Fields
138
139 volatile Object result; // Either the result or boxed AltResult
140 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
141 volatile CompletionNode completions; // list (Treiber stack) of completions
142
143 // Basic utilities for triggering and processing completions
144
145 /**
146 * Removes and signals all waiting threads and runs all completions.
147 */
148 final void postComplete() {
149 WaitNode q; Thread t;
150 while ((q = waiters) != null) {
151 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
152 (t = q.thread) != null) {
153 q.thread = null;
154 LockSupport.unpark(t);
155 }
156 }
157
158 CompletionNode h; Completion c;
159 while ((h = completions) != null) {
160 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
161 (c = h.completion) != null)
162 c.run();
163 }
164 }
165
166 /**
167 * Triggers completion with the encoding of the given arguments:
168 * if the exception is non-null, encodes it as a wrapped
169 * CompletionException unless it is one already. Otherwise uses
170 * the given result, boxed as NIL if null.
171 */
172 final void internalComplete(Object v, Throwable ex) {
173 if (result == null)
174 UNSAFE.compareAndSwapObject
175 (this, RESULT, null,
176 (ex == null) ? (v == null) ? NIL : v :
177 new AltResult((ex instanceof CompletionException) ? ex :
178 new CompletionException(ex)));
179 postComplete(); // help out even if not triggered
180 }
181
182 /**
183 * If triggered, helps release and/or process completions.
184 */
185 final void helpPostComplete() {
186 if (result != null)
187 postComplete();
188 }
189
190 /* ------------- waiting for completions -------------- */
191
192 /**
193 * Heuristic spin value for waitingGet() before blocking on
194 * multiprocessors
195 */
196 static final int WAITING_GET_SPINS = 256;
197
198 /**
199 * Linked nodes to record waiting threads in a Treiber stack. See
200 * other classes such as Phaser and SynchronousQueue for more
201 * detailed explanation. This class implements ManagedBlocker to
202 * avoid starvation when blocking actions pile up in
203 * ForkJoinPools.
204 */
205 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
206 long nanos; // wait time if timed
207 final long deadline; // non-zero if timed
208 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
209 volatile Thread thread;
210 volatile WaitNode next;
211 WaitNode(boolean interruptible, long nanos, long deadline) {
212 this.thread = Thread.currentThread();
213 this.interruptControl = interruptible ? 1 : 0;
214 this.nanos = nanos;
215 this.deadline = deadline;
216 }
217 public boolean isReleasable() {
218 if (thread == null)
219 return true;
220 if (Thread.interrupted()) {
221 int i = interruptControl;
222 interruptControl = -1;
223 if (i > 0)
224 return true;
225 }
226 if (deadline != 0L &&
227 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
228 thread = null;
229 return true;
230 }
231 return false;
232 }
233 public boolean block() {
234 if (isReleasable())
235 return true;
236 else if (deadline == 0L)
237 LockSupport.park(this);
238 else if (nanos > 0L)
239 LockSupport.parkNanos(this, nanos);
240 return isReleasable();
241 }
242 }
243
244 /**
245 * Returns raw result after waiting, or null if interruptible and
246 * interrupted.
247 */
248 private Object waitingGet(boolean interruptible) {
249 WaitNode q = null;
250 boolean queued = false;
251 int h = 0, spins = 0;
252 for (Object r;;) {
253 if ((r = result) != null) {
254 if (q != null) { // suppress unpark
255 q.thread = null;
256 if (q.interruptControl < 0) {
257 if (interruptible) {
258 removeWaiter(q);
259 return null;
260 }
261 Thread.currentThread().interrupt();
262 }
263 }
264 postComplete(); // help release others
265 return r;
266 }
267 else if (h == 0) {
268 h = ThreadLocalRandom.current().nextInt();
269 if (Runtime.getRuntime().availableProcessors() > 1)
270 spins = WAITING_GET_SPINS;
271 }
272 else if (spins > 0) {
273 h ^= h << 1; // xorshift
274 h ^= h >>> 3;
275 if ((h ^= h << 10) >= 0)
276 --spins;
277 }
278 else if (q == null)
279 q = new WaitNode(interruptible, 0L, 0L);
280 else if (!queued)
281 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
282 q.next = waiters, q);
283 else if (interruptible && q.interruptControl < 0) {
284 removeWaiter(q);
285 return null;
286 }
287 else if (q.thread != null && result == null) {
288 try {
289 ForkJoinPool.managedBlock(q);
290 } catch (InterruptedException ex) {
291 q.interruptControl = -1;
292 }
293 }
294 }
295 }
296
297 /**
298 * Awaits completion or aborts on interrupt or timeout.
299 *
300 * @param nanos time to wait
301 * @return raw result
302 */
303 private Object timedAwaitDone(long nanos)
304 throws InterruptedException, TimeoutException {
305 WaitNode q = null;
306 boolean queued = false;
307 for (Object r;;) {
308 if ((r = result) != null) {
309 if (q != null) {
310 q.thread = null;
311 if (q.interruptControl < 0) {
312 removeWaiter(q);
313 throw new InterruptedException();
314 }
315 }
316 postComplete();
317 return r;
318 }
319 else if (q == null) {
320 if (nanos <= 0L)
321 throw new TimeoutException();
322 long d = System.nanoTime() + nanos;
323 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
324 }
325 else if (!queued)
326 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
327 q.next = waiters, q);
328 else if (q.interruptControl < 0) {
329 removeWaiter(q);
330 throw new InterruptedException();
331 }
332 else if (q.nanos <= 0L) {
333 if (result == null) {
334 removeWaiter(q);
335 throw new TimeoutException();
336 }
337 }
338 else if (q.thread != null && result == null) {
339 try {
340 ForkJoinPool.managedBlock(q);
341 } catch (InterruptedException ex) {
342 q.interruptControl = -1;
343 }
344 }
345 }
346 }
347
348 /**
349 * Tries to unlink a timed-out or interrupted wait node to avoid
350 * accumulating garbage. Internal nodes are simply unspliced
351 * without CAS since it is harmless if they are traversed anyway
352 * by releasers. To avoid effects of unsplicing from already
353 * removed nodes, the list is retraversed in case of an apparent
354 * race. This is slow when there are a lot of nodes, but we don't
355 * expect lists to be long enough to outweigh higher-overhead
356 * schemes.
357 */
358 private void removeWaiter(WaitNode node) {
359 if (node != null) {
360 node.thread = null;
361 retry:
362 for (;;) { // restart on removeWaiter race
363 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
364 s = q.next;
365 if (q.thread != null)
366 pred = q;
367 else if (pred != null) {
368 pred.next = s;
369 if (pred.thread == null) // check for race
370 continue retry;
371 }
372 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
373 continue retry;
374 }
375 break;
376 }
377 }
378 }
379
380 /* ------------- Async tasks -------------- */
381
382 /**
383 * A tagging interface identifying asynchronous tasks produced by
384 * {@code async} methods. This may be useful for monitoring,
385 * debugging, and tracking asynchronous activities.
386 */
387 public static interface AsynchronousCompletionTask {
388 }
389
390 /** Base class can act as either FJ or plain Runnable */
391 abstract static class Async extends ForkJoinTask<Void>
392 implements Runnable, AsynchronousCompletionTask {
393 public final Void getRawResult() { return null; }
394 public final void setRawResult(Void v) { }
395 public final void run() { exec(); }
396 }
397
398 static final class AsyncRun extends Async {
399 final Runnable fn;
400 final CompletableFuture<Void> dst;
401 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
402 this.fn = fn; this.dst = dst;
403 }
404 public final boolean exec() {
405 CompletableFuture<Void> d; Throwable ex;
406 if ((d = this.dst) != null && d.result == null) {
407 try {
408 fn.run();
409 ex = null;
410 } catch (Throwable rex) {
411 ex = rex;
412 }
413 d.internalComplete(null, ex);
414 }
415 return true;
416 }
417 private static final long serialVersionUID = 5232453952276885070L;
418 }
419
420 static final class AsyncSupply<U> extends Async {
421 final Generator<U> fn;
422 final CompletableFuture<U> dst;
423 AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
424 this.fn = fn; this.dst = dst;
425 }
426 public final boolean exec() {
427 CompletableFuture<U> d; U u; Throwable ex;
428 if ((d = this.dst) != null && d.result == null) {
429 try {
430 u = fn.get();
431 ex = null;
432 } catch (Throwable rex) {
433 ex = rex;
434 u = null;
435 }
436 d.internalComplete(u, ex);
437 }
438 return true;
439 }
440 private static final long serialVersionUID = 5232453952276885070L;
441 }
442
443 static final class AsyncApply<T,U> extends Async {
444 final Fun<? super T,? extends U> fn;
445 final T arg;
446 final CompletableFuture<U> dst;
447 AsyncApply(T arg, Fun<? super T,? extends U> fn,
448 CompletableFuture<U> dst) {
449 this.arg = arg; this.fn = fn; this.dst = dst;
450 }
451 public final boolean exec() {
452 CompletableFuture<U> d; U u; Throwable ex;
453 if ((d = this.dst) != null && d.result == null) {
454 try {
455 u = fn.apply(arg);
456 ex = null;
457 } catch (Throwable rex) {
458 ex = rex;
459 u = null;
460 }
461 d.internalComplete(u, ex);
462 }
463 return true;
464 }
465 private static final long serialVersionUID = 5232453952276885070L;
466 }
467
468 static final class AsyncBiApply<T,U,V> extends Async {
469 final BiFun<? super T,? super U,? extends V> fn;
470 final T arg1;
471 final U arg2;
472 final CompletableFuture<V> dst;
473 AsyncBiApply(T arg1, U arg2,
474 BiFun<? super T,? super U,? extends V> fn,
475 CompletableFuture<V> dst) {
476 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
477 }
478 public final boolean exec() {
479 CompletableFuture<V> d; V v; Throwable ex;
480 if ((d = this.dst) != null && d.result == null) {
481 try {
482 v = fn.apply(arg1, arg2);
483 ex = null;
484 } catch (Throwable rex) {
485 ex = rex;
486 v = null;
487 }
488 d.internalComplete(v, ex);
489 }
490 return true;
491 }
492 private static final long serialVersionUID = 5232453952276885070L;
493 }
494
495 static final class AsyncAccept<T> extends Async {
496 final Action<? super T> fn;
497 final T arg;
498 final CompletableFuture<Void> dst;
499 AsyncAccept(T arg, Action<? super T> fn,
500 CompletableFuture<Void> dst) {
501 this.arg = arg; this.fn = fn; this.dst = dst;
502 }
503 public final boolean exec() {
504 CompletableFuture<Void> d; Throwable ex;
505 if ((d = this.dst) != null && d.result == null) {
506 try {
507 fn.accept(arg);
508 ex = null;
509 } catch (Throwable rex) {
510 ex = rex;
511 }
512 d.internalComplete(null, ex);
513 }
514 return true;
515 }
516 private static final long serialVersionUID = 5232453952276885070L;
517 }
518
519 static final class AsyncBiAccept<T,U> extends Async {
520 final BiAction<? super T,? super U> fn;
521 final T arg1;
522 final U arg2;
523 final CompletableFuture<Void> dst;
524 AsyncBiAccept(T arg1, U arg2,
525 BiAction<? super T,? super U> fn,
526 CompletableFuture<Void> dst) {
527 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
528 }
529 public final boolean exec() {
530 CompletableFuture<Void> d; Throwable ex;
531 if ((d = this.dst) != null && d.result == null) {
532 try {
533 fn.accept(arg1, arg2);
534 ex = null;
535 } catch (Throwable rex) {
536 ex = rex;
537 }
538 d.internalComplete(null, ex);
539 }
540 return true;
541 }
542 private static final long serialVersionUID = 5232453952276885070L;
543 }
544
545 /* ------------- Completions -------------- */
546
547 /**
548 * Simple linked list nodes to record completions, used in
549 * basically the same way as WaitNodes. (We separate nodes from
550 * the Completions themselves mainly because for the And and Or
551 * methods, the same Completion object resides in two lists.)
552 */
553 static final class CompletionNode {
554 final Completion completion;
555 volatile CompletionNode next;
556 CompletionNode(Completion completion) { this.completion = completion; }
557 }
558
559 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
560 abstract static class Completion extends AtomicInteger implements Runnable {
561 }
562
563 static final class ApplyCompletion<T,U> extends Completion {
564 final CompletableFuture<? extends T> src;
565 final Fun<? super T,? extends U> fn;
566 final CompletableFuture<U> dst;
567 final Executor executor;
568 ApplyCompletion(CompletableFuture<? extends T> src,
569 Fun<? super T,? extends U> fn,
570 CompletableFuture<U> dst, Executor executor) {
571 this.src = src; this.fn = fn; this.dst = dst;
572 this.executor = executor;
573 }
574 public final void run() {
575 final CompletableFuture<? extends T> a;
576 final Fun<? super T,? extends U> fn;
577 final CompletableFuture<U> dst;
578 Object r; T t; Throwable ex;
579 if ((dst = this.dst) != null &&
580 (fn = this.fn) != null &&
581 (a = this.src) != null &&
582 (r = a.result) != null &&
583 compareAndSet(0, 1)) {
584 if (r instanceof AltResult) {
585 ex = ((AltResult)r).ex;
586 t = null;
587 }
588 else {
589 ex = null;
590 @SuppressWarnings("unchecked") T tr = (T) r;
591 t = tr;
592 }
593 Executor e = executor;
594 U u = null;
595 if (ex == null) {
596 try {
597 if (e != null)
598 e.execute(new AsyncApply<T,U>(t, fn, dst));
599 else
600 u = fn.apply(t);
601 } catch (Throwable rex) {
602 ex = rex;
603 }
604 }
605 if (e == null || ex != null)
606 dst.internalComplete(u, ex);
607 }
608 }
609 private static final long serialVersionUID = 5232453952276885070L;
610 }
611
612 static final class AcceptCompletion<T> extends Completion {
613 final CompletableFuture<? extends T> src;
614 final Action<? super T> fn;
615 final CompletableFuture<Void> dst;
616 final Executor executor;
617 AcceptCompletion(CompletableFuture<? extends T> src,
618 Action<? super T> fn,
619 CompletableFuture<Void> dst, Executor executor) {
620 this.src = src; this.fn = fn; this.dst = dst;
621 this.executor = executor;
622 }
623 public final void run() {
624 final CompletableFuture<? extends T> a;
625 final Action<? super T> fn;
626 final CompletableFuture<Void> dst;
627 Object r; T t; Throwable ex;
628 if ((dst = this.dst) != null &&
629 (fn = this.fn) != null &&
630 (a = this.src) != null &&
631 (r = a.result) != null &&
632 compareAndSet(0, 1)) {
633 if (r instanceof AltResult) {
634 ex = ((AltResult)r).ex;
635 t = null;
636 }
637 else {
638 ex = null;
639 @SuppressWarnings("unchecked") T tr = (T) r;
640 t = tr;
641 }
642 Executor e = executor;
643 if (ex == null) {
644 try {
645 if (e != null)
646 e.execute(new AsyncAccept<T>(t, fn, dst));
647 else
648 fn.accept(t);
649 } catch (Throwable rex) {
650 ex = rex;
651 }
652 }
653 if (e == null || ex != null)
654 dst.internalComplete(null, ex);
655 }
656 }
657 private static final long serialVersionUID = 5232453952276885070L;
658 }
659
660 static final class RunCompletion<T> extends Completion {
661 final CompletableFuture<? extends T> src;
662 final Runnable fn;
663 final CompletableFuture<Void> dst;
664 final Executor executor;
665 RunCompletion(CompletableFuture<? extends T> src,
666 Runnable fn,
667 CompletableFuture<Void> dst,
668 Executor executor) {
669 this.src = src; this.fn = fn; this.dst = dst;
670 this.executor = executor;
671 }
672 public final void run() {
673 final CompletableFuture<? extends T> a;
674 final Runnable fn;
675 final CompletableFuture<Void> dst;
676 Object r; Throwable ex;
677 if ((dst = this.dst) != null &&
678 (fn = this.fn) != null &&
679 (a = this.src) != null &&
680 (r = a.result) != null &&
681 compareAndSet(0, 1)) {
682 if (r instanceof AltResult)
683 ex = ((AltResult)r).ex;
684 else
685 ex = null;
686 Executor e = executor;
687 if (ex == null) {
688 try {
689 if (e != null)
690 e.execute(new AsyncRun(fn, dst));
691 else
692 fn.run();
693 } catch (Throwable rex) {
694 ex = rex;
695 }
696 }
697 if (e == null || ex != null)
698 dst.internalComplete(null, ex);
699 }
700 }
701 private static final long serialVersionUID = 5232453952276885070L;
702 }
703
704 static final class BiApplyCompletion<T,U,V> extends Completion {
705 final CompletableFuture<? extends T> src;
706 final CompletableFuture<? extends U> snd;
707 final BiFun<? super T,? super U,? extends V> fn;
708 final CompletableFuture<V> dst;
709 final Executor executor;
710 BiApplyCompletion(CompletableFuture<? extends T> src,
711 CompletableFuture<? extends U> snd,
712 BiFun<? super T,? super U,? extends V> fn,
713 CompletableFuture<V> dst, Executor executor) {
714 this.src = src; this.snd = snd;
715 this.fn = fn; this.dst = dst;
716 this.executor = executor;
717 }
718 public final void run() {
719 final CompletableFuture<? extends T> a;
720 final CompletableFuture<? extends U> b;
721 final BiFun<? super T,? super U,? extends V> fn;
722 final CompletableFuture<V> dst;
723 Object r, s; T t; U u; Throwable ex;
724 if ((dst = this.dst) != null &&
725 (fn = this.fn) != null &&
726 (a = this.src) != null &&
727 (r = a.result) != null &&
728 (b = this.snd) != null &&
729 (s = b.result) != null &&
730 compareAndSet(0, 1)) {
731 if (r instanceof AltResult) {
732 ex = ((AltResult)r).ex;
733 t = null;
734 }
735 else {
736 ex = null;
737 @SuppressWarnings("unchecked") T tr = (T) r;
738 t = tr;
739 }
740 if (ex != null)
741 u = null;
742 else if (s instanceof AltResult) {
743 ex = ((AltResult)s).ex;
744 u = null;
745 }
746 else {
747 @SuppressWarnings("unchecked") U us = (U) s;
748 u = us;
749 }
750 Executor e = executor;
751 V v = null;
752 if (ex == null) {
753 try {
754 if (e != null)
755 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
756 else
757 v = fn.apply(t, u);
758 } catch (Throwable rex) {
759 ex = rex;
760 }
761 }
762 if (e == null || ex != null)
763 dst.internalComplete(v, ex);
764 }
765 }
766 private static final long serialVersionUID = 5232453952276885070L;
767 }
768
769 static final class BiAcceptCompletion<T,U> extends Completion {
770 final CompletableFuture<? extends T> src;
771 final CompletableFuture<? extends U> snd;
772 final BiAction<? super T,? super U> fn;
773 final CompletableFuture<Void> dst;
774 final Executor executor;
775 BiAcceptCompletion(CompletableFuture<? extends T> src,
776 CompletableFuture<? extends U> snd,
777 BiAction<? super T,? super U> fn,
778 CompletableFuture<Void> dst, Executor executor) {
779 this.src = src; this.snd = snd;
780 this.fn = fn; this.dst = dst;
781 this.executor = executor;
782 }
783 public final void run() {
784 final CompletableFuture<? extends T> a;
785 final CompletableFuture<? extends U> b;
786 final BiAction<? super T,? super U> fn;
787 final CompletableFuture<Void> dst;
788 Object r, s; T t; U u; Throwable ex;
789 if ((dst = this.dst) != null &&
790 (fn = this.fn) != null &&
791 (a = this.src) != null &&
792 (r = a.result) != null &&
793 (b = this.snd) != null &&
794 (s = b.result) != null &&
795 compareAndSet(0, 1)) {
796 if (r instanceof AltResult) {
797 ex = ((AltResult)r).ex;
798 t = null;
799 }
800 else {
801 ex = null;
802 @SuppressWarnings("unchecked") T tr = (T) r;
803 t = tr;
804 }
805 if (ex != null)
806 u = null;
807 else if (s instanceof AltResult) {
808 ex = ((AltResult)s).ex;
809 u = null;
810 }
811 else {
812 @SuppressWarnings("unchecked") U us = (U) s;
813 u = us;
814 }
815 Executor e = executor;
816 if (ex == null) {
817 try {
818 if (e != null)
819 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
820 else
821 fn.accept(t, u);
822 } catch (Throwable rex) {
823 ex = rex;
824 }
825 }
826 if (e == null || ex != null)
827 dst.internalComplete(null, ex);
828 }
829 }
830 private static final long serialVersionUID = 5232453952276885070L;
831 }
832
833 static final class BiRunCompletion<T> extends Completion {
834 final CompletableFuture<? extends T> src;
835 final CompletableFuture<?> snd;
836 final Runnable fn;
837 final CompletableFuture<Void> dst;
838 final Executor executor;
839 BiRunCompletion(CompletableFuture<? extends T> src,
840 CompletableFuture<?> snd,
841 Runnable fn,
842 CompletableFuture<Void> dst, Executor executor) {
843 this.src = src; this.snd = snd;
844 this.fn = fn; this.dst = dst;
845 this.executor = executor;
846 }
847 public final void run() {
848 final CompletableFuture<? extends T> a;
849 final CompletableFuture<?> b;
850 final Runnable fn;
851 final CompletableFuture<Void> dst;
852 Object r, s; Throwable ex;
853 if ((dst = this.dst) != null &&
854 (fn = this.fn) != null &&
855 (a = this.src) != null &&
856 (r = a.result) != null &&
857 (b = this.snd) != null &&
858 (s = b.result) != null &&
859 compareAndSet(0, 1)) {
860 if (r instanceof AltResult)
861 ex = ((AltResult)r).ex;
862 else
863 ex = null;
864 if (ex == null && (s instanceof AltResult))
865 ex = ((AltResult)s).ex;
866 Executor e = executor;
867 if (ex == null) {
868 try {
869 if (e != null)
870 e.execute(new AsyncRun(fn, dst));
871 else
872 fn.run();
873 } catch (Throwable rex) {
874 ex = rex;
875 }
876 }
877 if (e == null || ex != null)
878 dst.internalComplete(null, ex);
879 }
880 }
881 private static final long serialVersionUID = 5232453952276885070L;
882 }
883
884 static final class OrApplyCompletion<T,U> extends Completion {
885 final CompletableFuture<? extends T> src;
886 final CompletableFuture<? extends T> snd;
887 final Fun<? super T,? extends U> fn;
888 final CompletableFuture<U> dst;
889 final Executor executor;
890 OrApplyCompletion(CompletableFuture<? extends T> src,
891 CompletableFuture<? extends T> snd,
892 Fun<? super T,? extends U> fn,
893 CompletableFuture<U> dst, Executor executor) {
894 this.src = src; this.snd = snd;
895 this.fn = fn; this.dst = dst;
896 this.executor = executor;
897 }
898 public final void run() {
899 final CompletableFuture<? extends T> a;
900 final CompletableFuture<? extends T> b;
901 final Fun<? super T,? extends U> fn;
902 final CompletableFuture<U> dst;
903 Object r; T t; Throwable ex;
904 if ((dst = this.dst) != null &&
905 (fn = this.fn) != null &&
906 (((a = this.src) != null && (r = a.result) != null) ||
907 ((b = this.snd) != null && (r = b.result) != null)) &&
908 compareAndSet(0, 1)) {
909 if (r instanceof AltResult) {
910 ex = ((AltResult)r).ex;
911 t = null;
912 }
913 else {
914 ex = null;
915 @SuppressWarnings("unchecked") T tr = (T) r;
916 t = tr;
917 }
918 Executor e = executor;
919 U u = null;
920 if (ex == null) {
921 try {
922 if (e != null)
923 e.execute(new AsyncApply<T,U>(t, fn, dst));
924 else
925 u = fn.apply(t);
926 } catch (Throwable rex) {
927 ex = rex;
928 }
929 }
930 if (e == null || ex != null)
931 dst.internalComplete(u, ex);
932 }
933 }
934 private static final long serialVersionUID = 5232453952276885070L;
935 }
936
937 static final class OrAcceptCompletion<T> extends Completion {
938 final CompletableFuture<? extends T> src;
939 final CompletableFuture<? extends T> snd;
940 final Action<? super T> fn;
941 final CompletableFuture<Void> dst;
942 final Executor executor;
943 OrAcceptCompletion(CompletableFuture<? extends T> src,
944 CompletableFuture<? extends T> snd,
945 Action<? super T> fn,
946 CompletableFuture<Void> dst, Executor executor) {
947 this.src = src; this.snd = snd;
948 this.fn = fn; this.dst = dst;
949 this.executor = executor;
950 }
951 public final void run() {
952 final CompletableFuture<? extends T> a;
953 final CompletableFuture<? extends T> b;
954 final Action<? super T> fn;
955 final CompletableFuture<Void> dst;
956 Object r; T t; Throwable ex;
957 if ((dst = this.dst) != null &&
958 (fn = this.fn) != null &&
959 (((a = this.src) != null && (r = a.result) != null) ||
960 ((b = this.snd) != null && (r = b.result) != null)) &&
961 compareAndSet(0, 1)) {
962 if (r instanceof AltResult) {
963 ex = ((AltResult)r).ex;
964 t = null;
965 }
966 else {
967 ex = null;
968 @SuppressWarnings("unchecked") T tr = (T) r;
969 t = tr;
970 }
971 Executor e = executor;
972 if (ex == null) {
973 try {
974 if (e != null)
975 e.execute(new AsyncAccept<T>(t, fn, dst));
976 else
977 fn.accept(t);
978 } catch (Throwable rex) {
979 ex = rex;
980 }
981 }
982 if (e == null || ex != null)
983 dst.internalComplete(null, ex);
984 }
985 }
986 private static final long serialVersionUID = 5232453952276885070L;
987 }
988
989 static final class OrRunCompletion<T> extends Completion {
990 final CompletableFuture<? extends T> src;
991 final CompletableFuture<?> snd;
992 final Runnable fn;
993 final CompletableFuture<Void> dst;
994 final Executor executor;
995 OrRunCompletion(CompletableFuture<? extends T> src,
996 CompletableFuture<?> snd,
997 Runnable fn,
998 CompletableFuture<Void> dst, Executor executor) {
999 this.src = src; this.snd = snd;
1000 this.fn = fn; this.dst = dst;
1001 this.executor = executor;
1002 }
1003 public final void run() {
1004 final CompletableFuture<? extends T> a;
1005 final CompletableFuture<?> b;
1006 final Runnable fn;
1007 final CompletableFuture<Void> dst;
1008 Object r; Throwable ex;
1009 if ((dst = this.dst) != null &&
1010 (fn = this.fn) != null &&
1011 (((a = this.src) != null && (r = a.result) != null) ||
1012 ((b = this.snd) != null && (r = b.result) != null)) &&
1013 compareAndSet(0, 1)) {
1014 if (r instanceof AltResult)
1015 ex = ((AltResult)r).ex;
1016 else
1017 ex = null;
1018 Executor e = executor;
1019 if (ex == null) {
1020 try {
1021 if (e != null)
1022 e.execute(new AsyncRun(fn, dst));
1023 else
1024 fn.run();
1025 } catch (Throwable rex) {
1026 ex = rex;
1027 }
1028 }
1029 if (e == null || ex != null)
1030 dst.internalComplete(null, ex);
1031 }
1032 }
1033 private static final long serialVersionUID = 5232453952276885070L;
1034 }
1035
1036 static final class ExceptionCompletion<T> extends Completion {
1037 final CompletableFuture<? extends T> src;
1038 final Fun<? super Throwable, ? extends T> fn;
1039 final CompletableFuture<T> dst;
1040 ExceptionCompletion(CompletableFuture<? extends T> src,
1041 Fun<? super Throwable, ? extends T> fn,
1042 CompletableFuture<T> dst) {
1043 this.src = src; this.fn = fn; this.dst = dst;
1044 }
1045 public final void run() {
1046 final CompletableFuture<? extends T> a;
1047 final Fun<? super Throwable, ? extends T> fn;
1048 final CompletableFuture<T> dst;
1049 Object r; T t = null; Throwable ex, dx = null;
1050 if ((dst = this.dst) != null &&
1051 (fn = this.fn) != null &&
1052 (a = this.src) != null &&
1053 (r = a.result) != null &&
1054 compareAndSet(0, 1)) {
1055 if ((r instanceof AltResult) &&
1056 (ex = ((AltResult)r).ex) != null) {
1057 try {
1058 t = fn.apply(ex);
1059 } catch (Throwable rex) {
1060 dx = rex;
1061 }
1062 }
1063 else {
1064 @SuppressWarnings("unchecked") T tr = (T) r;
1065 t = tr;
1066 }
1067 dst.internalComplete(t, dx);
1068 }
1069 }
1070 private static final long serialVersionUID = 5232453952276885070L;
1071 }
1072
1073 static final class ThenCopy<T> extends Completion {
1074 final CompletableFuture<? extends T> src;
1075 final CompletableFuture<T> dst;
1076 ThenCopy(CompletableFuture<? extends T> src,
1077 CompletableFuture<T> dst) {
1078 this.src = src; this.dst = dst;
1079 }
1080 public final void run() {
1081 final CompletableFuture<? extends T> a;
1082 final CompletableFuture<T> dst;
1083 Object r; Object t; Throwable ex;
1084 if ((dst = this.dst) != null &&
1085 (a = this.src) != null &&
1086 (r = a.result) != null &&
1087 compareAndSet(0, 1)) {
1088 if (r instanceof AltResult) {
1089 ex = ((AltResult)r).ex;
1090 t = null;
1091 }
1092 else {
1093 ex = null;
1094 t = r;
1095 }
1096 dst.internalComplete(t, ex);
1097 }
1098 }
1099 private static final long serialVersionUID = 5232453952276885070L;
1100 }
1101
1102 static final class HandleCompletion<T,U> extends Completion {
1103 final CompletableFuture<? extends T> src;
1104 final BiFun<? super T, Throwable, ? extends U> fn;
1105 final CompletableFuture<U> dst;
1106 HandleCompletion(CompletableFuture<? extends T> src,
1107 BiFun<? super T, Throwable, ? extends U> fn,
1108 final CompletableFuture<U> dst) {
1109 this.src = src; this.fn = fn; this.dst = dst;
1110 }
1111 public final void run() {
1112 final CompletableFuture<? extends T> a;
1113 final BiFun<? super T, Throwable, ? extends U> fn;
1114 final CompletableFuture<U> dst;
1115 Object r; T t; Throwable ex;
1116 if ((dst = this.dst) != null &&
1117 (fn = this.fn) != null &&
1118 (a = this.src) != null &&
1119 (r = a.result) != null &&
1120 compareAndSet(0, 1)) {
1121 if (r instanceof AltResult) {
1122 ex = ((AltResult)r).ex;
1123 t = null;
1124 }
1125 else {
1126 ex = null;
1127 @SuppressWarnings("unchecked") T tr = (T) r;
1128 t = tr;
1129 }
1130 U u = null; Throwable dx = null;
1131 try {
1132 u = fn.apply(t, ex);
1133 } catch (Throwable rex) {
1134 dx = rex;
1135 }
1136 dst.internalComplete(u, dx);
1137 }
1138 }
1139 private static final long serialVersionUID = 5232453952276885070L;
1140 }
1141
1142 static final class ComposeCompletion<T,U> extends Completion {
1143 final CompletableFuture<? extends T> src;
1144 final Fun<? super T, CompletableFuture<U>> fn;
1145 final CompletableFuture<U> dst;
1146 ComposeCompletion(CompletableFuture<? extends T> src,
1147 Fun<? super T, CompletableFuture<U>> fn,
1148 final CompletableFuture<U> dst) {
1149 this.src = src; this.fn = fn; this.dst = dst;
1150 }
1151 public final void run() {
1152 final CompletableFuture<? extends T> a;
1153 final Fun<? super T, CompletableFuture<U>> fn;
1154 final CompletableFuture<U> dst;
1155 Object r; T t; Throwable ex;
1156 if ((dst = this.dst) != null &&
1157 (fn = this.fn) != null &&
1158 (a = this.src) != null &&
1159 (r = a.result) != null &&
1160 compareAndSet(0, 1)) {
1161 if (r instanceof AltResult) {
1162 ex = ((AltResult)r).ex;
1163 t = null;
1164 }
1165 else {
1166 ex = null;
1167 @SuppressWarnings("unchecked") T tr = (T) r;
1168 t = tr;
1169 }
1170 CompletableFuture<U> c = null;
1171 U u = null;
1172 boolean complete = false;
1173 if (ex == null) {
1174 try {
1175 c = fn.apply(t);
1176 } catch (Throwable rex) {
1177 ex = rex;
1178 }
1179 }
1180 if (ex != null || c == null) {
1181 if (ex == null)
1182 ex = new NullPointerException();
1183 }
1184 else {
1185 ThenCopy<U> d = null;
1186 Object s;
1187 if ((s = c.result) == null) {
1188 CompletionNode p = new CompletionNode
1189 (d = new ThenCopy<U>(c, dst));
1190 while ((s = c.result) == null) {
1191 if (UNSAFE.compareAndSwapObject
1192 (c, COMPLETIONS, p.next = c.completions, p))
1193 break;
1194 }
1195 }
1196 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1197 complete = true;
1198 if (s instanceof AltResult) {
1199 ex = ((AltResult)s).ex; // no rewrap
1200 u = null;
1201 }
1202 else {
1203 @SuppressWarnings("unchecked") U us = (U) s;
1204 u = us;
1205 }
1206 }
1207 }
1208 if (complete || ex != null)
1209 dst.internalComplete(u, ex);
1210 if (c != null)
1211 c.helpPostComplete();
1212 }
1213 }
1214 private static final long serialVersionUID = 5232453952276885070L;
1215 }
1216
1217 // public methods
1218
1219 /**
1220 * Creates a new incomplete CompletableFuture.
1221 */
1222 public CompletableFuture() {
1223 }
1224
1225 /**
1226 * Asynchronously executes in the {@link
1227 * ForkJoinPool#commonPool()}, a task that completes the returned
1228 * CompletableFuture with the result of the given Supplier.
1229 *
1230 * @param supplier a function returning the value to be used
1231 * to complete the returned CompletableFuture
1232 * @return the CompletableFuture
1233 */
1234 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1235 if (supplier == null) throw new NullPointerException();
1236 CompletableFuture<U> f = new CompletableFuture<U>();
1237 ForkJoinPool.commonPool().
1238 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1239 return f;
1240 }
1241
1242 /**
1243 * Asynchronously executes using the given executor, a task that
1244 * completes the returned CompletableFuture with the result of the
1245 * given Supplier.
1246 *
1247 * @param supplier a function returning the value to be used
1248 * to complete the returned CompletableFuture
1249 * @param executor the executor to use for asynchronous execution
1250 * @return the CompletableFuture
1251 */
1252 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1253 Executor executor) {
1254 if (executor == null || supplier == null)
1255 throw new NullPointerException();
1256 CompletableFuture<U> f = new CompletableFuture<U>();
1257 executor.execute(new AsyncSupply<U>(supplier, f));
1258 return f;
1259 }
1260
1261 /**
1262 * Asynchronously executes in the {@link
1263 * ForkJoinPool#commonPool()} a task that runs the given action,
1264 * and then completes the returned CompletableFuture.
1265 *
1266 * @param runnable the action to run before completing the
1267 * returned CompletableFuture
1268 * @return the CompletableFuture
1269 */
1270 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1271 if (runnable == null) throw new NullPointerException();
1272 CompletableFuture<Void> f = new CompletableFuture<Void>();
1273 ForkJoinPool.commonPool().
1274 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1275 return f;
1276 }
1277
1278 /**
1279 * Asynchronously executes using the given executor, a task that
1280 * runs the given action, and then completes the returned
1281 * CompletableFuture.
1282 *
1283 * @param runnable the action to run before completing the
1284 * returned CompletableFuture
1285 * @param executor the executor to use for asynchronous execution
1286 * @return the CompletableFuture
1287 */
1288 public static CompletableFuture<Void> runAsync(Runnable runnable,
1289 Executor executor) {
1290 if (executor == null || runnable == null)
1291 throw new NullPointerException();
1292 CompletableFuture<Void> f = new CompletableFuture<Void>();
1293 executor.execute(new AsyncRun(runnable, f));
1294 return f;
1295 }
1296
1297 /**
1298 * Returns {@code true} if completed in any fashion: normally,
1299 * exceptionally, or via cancellation.
1300 *
1301 * @return {@code true} if completed
1302 */
1303 public boolean isDone() {
1304 return result != null;
1305 }
1306
1307 /**
1308 * Waits if necessary for the computation to complete, and then
1309 * retrieves its result.
1310 *
1311 * @return the computed result
1312 * @throws CancellationException if the computation was cancelled
1313 * @throws ExecutionException if the computation threw an
1314 * exception
1315 * @throws InterruptedException if the current thread was interrupted
1316 * while waiting
1317 */
1318 public T get() throws InterruptedException, ExecutionException {
1319 Object r; Throwable ex, cause;
1320 if ((r = result) == null && (r = waitingGet(true)) == null)
1321 throw new InterruptedException();
1322 if (!(r instanceof AltResult)) {
1323 @SuppressWarnings("unchecked") T tr = (T) r;
1324 return tr;
1325 }
1326 if ((ex = ((AltResult)r).ex) == null)
1327 return null;
1328 if (ex instanceof CancellationException)
1329 throw (CancellationException)ex;
1330 if ((ex instanceof CompletionException) &&
1331 (cause = ex.getCause()) != null)
1332 ex = cause;
1333 throw new ExecutionException(ex);
1334 }
1335
1336 /**
1337 * Waits if necessary for at most the given time for completion,
1338 * and then retrieves its result, if available.
1339 *
1340 * @param timeout the maximum time to wait
1341 * @param unit the time unit of the timeout argument
1342 * @return the computed result
1343 * @throws CancellationException if the computation was cancelled
1344 * @throws ExecutionException if the computation threw an
1345 * exception
1346 * @throws InterruptedException if the current thread was interrupted
1347 * while waiting
1348 * @throws TimeoutException if the wait timed out
1349 */
1350 public T get(long timeout, TimeUnit unit)
1351 throws InterruptedException, ExecutionException, TimeoutException {
1352 Object r; Throwable ex, cause;
1353 long nanos = unit.toNanos(timeout);
1354 if (Thread.interrupted())
1355 throw new InterruptedException();
1356 if ((r = result) == null)
1357 r = timedAwaitDone(nanos);
1358 if (!(r instanceof AltResult)) {
1359 @SuppressWarnings("unchecked") T tr = (T) r;
1360 return tr;
1361 }
1362 if ((ex = ((AltResult)r).ex) == null)
1363 return null;
1364 if (ex instanceof CancellationException)
1365 throw (CancellationException)ex;
1366 if ((ex instanceof CompletionException) &&
1367 (cause = ex.getCause()) != null)
1368 ex = cause;
1369 throw new ExecutionException(ex);
1370 }
1371
1372 /**
1373 * Returns the result value when complete, or throws an
1374 * (unchecked) exception if completed exceptionally. To better
1375 * conform with the use of common functional forms, if a
1376 * computation involved in the completion of this
1377 * CompletableFuture threw an exception, this method throws an
1378 * (unchecked) {@link CompletionException} with the underlying
1379 * exception as its cause.
1380 *
1381 * @return the result value
1382 * @throws CancellationException if the computation was cancelled
1383 * @throws CompletionException if a completion computation threw
1384 * an exception
1385 */
1386 public T join() {
1387 Object r; Throwable ex;
1388 if ((r = result) == null)
1389 r = waitingGet(false);
1390 if (!(r instanceof AltResult)) {
1391 @SuppressWarnings("unchecked") T tr = (T) r;
1392 return tr;
1393 }
1394 if ((ex = ((AltResult)r).ex) == null)
1395 return null;
1396 if (ex instanceof CancellationException)
1397 throw (CancellationException)ex;
1398 if (ex instanceof CompletionException)
1399 throw (CompletionException)ex;
1400 throw new CompletionException(ex);
1401 }
1402
1403 /**
1404 * Returns the result value (or throws any encountered exception)
1405 * if completed, else returns the given valueIfAbsent.
1406 *
1407 * @param valueIfAbsent the value to return if not completed
1408 * @return the result value, if completed, else the given valueIfAbsent
1409 * @throws CancellationException if the computation was cancelled
1410 * @throws CompletionException if a completion computation threw
1411 * an exception
1412 */
1413 public T getNow(T valueIfAbsent) {
1414 Object r; Throwable ex;
1415 if ((r = result) == null)
1416 return valueIfAbsent;
1417 if (!(r instanceof AltResult)) {
1418 @SuppressWarnings("unchecked") T tr = (T) r;
1419 return tr;
1420 }
1421 if ((ex = ((AltResult)r).ex) == null)
1422 return null;
1423 if (ex instanceof CancellationException)
1424 throw (CancellationException)ex;
1425 if (ex instanceof CompletionException)
1426 throw (CompletionException)ex;
1427 throw new CompletionException(ex);
1428 }
1429
1430 /**
1431 * If not already completed, sets the value returned by {@link
1432 * #get()} and related methods to the given value.
1433 *
1434 * @param value the result value
1435 * @return {@code true} if this invocation caused this CompletableFuture
1436 * to transition to a completed state, else {@code false}
1437 */
1438 public boolean complete(T value) {
1439 boolean triggered = result == null &&
1440 UNSAFE.compareAndSwapObject(this, RESULT, null,
1441 value == null ? NIL : value);
1442 postComplete();
1443 return triggered;
1444 }
1445
1446 /**
1447 * If not already completed, causes invocations of {@link #get()}
1448 * and related methods to throw the given exception.
1449 *
1450 * @param ex the exception
1451 * @return {@code true} if this invocation caused this CompletableFuture
1452 * to transition to a completed state, else {@code false}
1453 */
1454 public boolean completeExceptionally(Throwable ex) {
1455 if (ex == null) throw new NullPointerException();
1456 boolean triggered = result == null &&
1457 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1458 postComplete();
1459 return triggered;
1460 }
1461
1462 /**
1463 * Creates and returns a CompletableFuture that is completed with
1464 * the result of the given function of this CompletableFuture.
1465 * If this CompletableFuture completes exceptionally,
1466 * then the returned CompletableFuture also does so,
1467 * with a CompletionException holding this exception as
1468 * its cause.
1469 *
1470 * @param fn the function to use to compute the value of
1471 * the returned CompletableFuture
1472 * @return the new CompletableFuture
1473 */
1474 public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1475 return doThenApply(fn, null);
1476 }
1477
1478 /**
1479 * Creates and returns a CompletableFuture that is asynchronously
1480 * completed using the {@link ForkJoinPool#commonPool()} with the
1481 * result of the given function of this CompletableFuture. If
1482 * this CompletableFuture completes exceptionally, then the
1483 * returned CompletableFuture also does so, with a
1484 * CompletionException holding this exception as its cause.
1485 *
1486 * @param fn the function to use to compute the value of
1487 * the returned CompletableFuture
1488 * @return the new CompletableFuture
1489 */
1490 public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn) {
1491 return doThenApply(fn, ForkJoinPool.commonPool());
1492 }
1493
1494 /**
1495 * Creates and returns a CompletableFuture that is asynchronously
1496 * completed using the given executor with the result of the given
1497 * function of this CompletableFuture. If this CompletableFuture
1498 * completes exceptionally, then the returned CompletableFuture
1499 * also does so, with a CompletionException holding this exception as
1500 * its cause.
1501 *
1502 * @param fn the function to use to compute the value of
1503 * the returned CompletableFuture
1504 * @param executor the executor to use for asynchronous execution
1505 * @return the new CompletableFuture
1506 */
1507 public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn,
1508 Executor executor) {
1509 if (executor == null) throw new NullPointerException();
1510 return doThenApply(fn, executor);
1511 }
1512
1513 private <U> CompletableFuture<U> doThenApply(Fun<? super T,? extends U> fn,
1514 Executor e) {
1515 if (fn == null) throw new NullPointerException();
1516 CompletableFuture<U> dst = new CompletableFuture<U>();
1517 ApplyCompletion<T,U> d = null;
1518 Object r;
1519 if ((r = result) == null) {
1520 CompletionNode p = new CompletionNode
1521 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1522 while ((r = result) == null) {
1523 if (UNSAFE.compareAndSwapObject
1524 (this, COMPLETIONS, p.next = completions, p))
1525 break;
1526 }
1527 }
1528 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1529 T t; Throwable ex;
1530 if (r instanceof AltResult) {
1531 ex = ((AltResult)r).ex;
1532 t = null;
1533 }
1534 else {
1535 ex = null;
1536 @SuppressWarnings("unchecked") T tr = (T) r;
1537 t = tr;
1538 }
1539 U u = null;
1540 if (ex == null) {
1541 try {
1542 if (e != null)
1543 e.execute(new AsyncApply<T,U>(t, fn, dst));
1544 else
1545 u = fn.apply(t);
1546 } catch (Throwable rex) {
1547 ex = rex;
1548 }
1549 }
1550 if (e == null || ex != null)
1551 dst.internalComplete(u, ex);
1552 }
1553 helpPostComplete();
1554 return dst;
1555 }
1556
1557 /**
1558 * Creates and returns a CompletableFuture that is completed after
1559 * performing the given action with this CompletableFuture's
1560 * result when it completes. If this CompletableFuture
1561 * completes exceptionally, then the returned CompletableFuture
1562 * also does so, with a CompletionException holding this exception as
1563 * its cause.
1564 *
1565 * @param block the action to perform before completing the
1566 * returned CompletableFuture
1567 * @return the new CompletableFuture
1568 */
1569 public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1570 return doThenAccept(block, null);
1571 }
1572
1573 /**
1574 * Creates and returns a CompletableFuture that is asynchronously
1575 * completed using the {@link ForkJoinPool#commonPool()} with this
1576 * CompletableFuture's result when it completes. If this
1577 * CompletableFuture completes exceptionally, then the returned
1578 * CompletableFuture also does so, with a CompletionException holding
1579 * this exception as its cause.
1580 *
1581 * @param block the action to perform before completing the
1582 * returned CompletableFuture
1583 * @return the new CompletableFuture
1584 */
1585 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1586 return doThenAccept(block, ForkJoinPool.commonPool());
1587 }
1588
1589 /**
1590 * Creates and returns a CompletableFuture that is asynchronously
1591 * completed using the given executor with this
1592 * CompletableFuture's result when it completes. If this
1593 * CompletableFuture completes exceptionally, then the returned
1594 * CompletableFuture also does so, with a CompletionException holding
1595 * this exception as its cause.
1596 *
1597 * @param block the action to perform before completing the
1598 * returned CompletableFuture
1599 * @param executor the executor to use for asynchronous execution
1600 * @return the new CompletableFuture
1601 */
1602 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1603 Executor executor) {
1604 if (executor == null) throw new NullPointerException();
1605 return doThenAccept(block, executor);
1606 }
1607
1608 private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1609 Executor e) {
1610 if (fn == null) throw new NullPointerException();
1611 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1612 AcceptCompletion<T> d = null;
1613 Object r;
1614 if ((r = result) == null) {
1615 CompletionNode p = new CompletionNode
1616 (d = new AcceptCompletion<T>(this, fn, dst, e));
1617 while ((r = result) == null) {
1618 if (UNSAFE.compareAndSwapObject
1619 (this, COMPLETIONS, p.next = completions, p))
1620 break;
1621 }
1622 }
1623 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1624 T t; Throwable ex;
1625 if (r instanceof AltResult) {
1626 ex = ((AltResult)r).ex;
1627 t = null;
1628 }
1629 else {
1630 ex = null;
1631 @SuppressWarnings("unchecked") T tr = (T) r;
1632 t = tr;
1633 }
1634 if (ex == null) {
1635 try {
1636 if (e != null)
1637 e.execute(new AsyncAccept<T>(t, fn, dst));
1638 else
1639 fn.accept(t);
1640 } catch (Throwable rex) {
1641 ex = rex;
1642 }
1643 }
1644 if (e == null || ex != null)
1645 dst.internalComplete(null, ex);
1646 }
1647 helpPostComplete();
1648 return dst;
1649 }
1650
1651 /**
1652 * Creates and returns a CompletableFuture that is completed after
1653 * performing the given action when this CompletableFuture
1654 * completes. If this CompletableFuture completes exceptionally,
1655 * then the returned CompletableFuture also does so, with a
1656 * CompletionException holding this exception as its cause.
1657 *
1658 * @param action the action to perform before completing the
1659 * returned CompletableFuture
1660 * @return the new CompletableFuture
1661 */
1662 public CompletableFuture<Void> thenRun(Runnable action) {
1663 return doThenRun(action, null);
1664 }
1665
1666 /**
1667 * Creates and returns a CompletableFuture that is asynchronously
1668 * completed using the {@link ForkJoinPool#commonPool()} after
1669 * performing the given action when this CompletableFuture
1670 * completes. If this CompletableFuture completes exceptionally,
1671 * then the returned CompletableFuture also does so, with a
1672 * CompletionException holding this exception as its cause.
1673 *
1674 * @param action the action to perform before completing the
1675 * returned CompletableFuture
1676 * @return the new CompletableFuture
1677 */
1678 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1679 return doThenRun(action, ForkJoinPool.commonPool());
1680 }
1681
1682 /**
1683 * Creates and returns a CompletableFuture that is asynchronously
1684 * completed using the given executor after performing the given
1685 * action when this CompletableFuture completes. If this
1686 * CompletableFuture completes exceptionally, then the returned
1687 * CompletableFuture also does so, with a CompletionException holding
1688 * this exception as its cause.
1689 *
1690 * @param action the action to perform before completing the
1691 * returned CompletableFuture
1692 * @param executor the executor to use for asynchronous execution
1693 * @return the new CompletableFuture
1694 */
1695 public CompletableFuture<Void> thenRunAsync(Runnable action,
1696 Executor executor) {
1697 if (executor == null) throw new NullPointerException();
1698 return doThenRun(action, executor);
1699 }
1700
1701 private CompletableFuture<Void> doThenRun(Runnable action,
1702 Executor e) {
1703 if (action == null) throw new NullPointerException();
1704 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1705 RunCompletion<T> d = null;
1706 Object r;
1707 if ((r = result) == null) {
1708 CompletionNode p = new CompletionNode
1709 (d = new RunCompletion<T>(this, action, dst, e));
1710 while ((r = result) == null) {
1711 if (UNSAFE.compareAndSwapObject
1712 (this, COMPLETIONS, p.next = completions, p))
1713 break;
1714 }
1715 }
1716 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1717 Throwable ex;
1718 if (r instanceof AltResult)
1719 ex = ((AltResult)r).ex;
1720 else
1721 ex = null;
1722 if (ex == null) {
1723 try {
1724 if (e != null)
1725 e.execute(new AsyncRun(action, dst));
1726 else
1727 action.run();
1728 } catch (Throwable rex) {
1729 ex = rex;
1730 }
1731 }
1732 if (e == null || ex != null)
1733 dst.internalComplete(null, ex);
1734 }
1735 helpPostComplete();
1736 return dst;
1737 }
1738
1739 /**
1740 * Creates and returns a CompletableFuture that is completed with
1741 * the result of the given function of this and the other given
1742 * CompletableFuture's results when both complete. If this or
1743 * the other CompletableFuture complete exceptionally, then the
1744 * returned CompletableFuture also does so, with a
1745 * CompletionException holding the exception as its cause.
1746 *
1747 * @param other the other CompletableFuture
1748 * @param fn the function to use to compute the value of
1749 * the returned CompletableFuture
1750 * @return the new CompletableFuture
1751 */
1752 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1753 BiFun<? super T,? super U,? extends V> fn) {
1754 return doThenBiApply(other, fn, null);
1755 }
1756
1757 /**
1758 * Creates and returns a CompletableFuture that is asynchronously
1759 * completed using the {@link ForkJoinPool#commonPool()} with
1760 * the result of the given function of this and the other given
1761 * CompletableFuture's results when both complete. If this or
1762 * the other CompletableFuture complete exceptionally, then the
1763 * returned CompletableFuture also does so, with a
1764 * CompletionException holding the exception as its cause.
1765 *
1766 * @param other the other CompletableFuture
1767 * @param fn the function to use to compute the value of
1768 * the returned CompletableFuture
1769 * @return the new CompletableFuture
1770 */
1771 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1772 BiFun<? super T,? super U,? extends V> fn) {
1773 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1774 }
1775
1776 /**
1777 * Creates and returns a CompletableFuture that is
1778 * asynchronously completed using the given executor with the
1779 * result of the given function of this and the other given
1780 * CompletableFuture's results when both complete. If this or
1781 * the other CompletableFuture complete exceptionally, then the
1782 * returned CompletableFuture also does so, with a
1783 * CompletionException holding the exception as its cause.
1784 *
1785 * @param other the other CompletableFuture
1786 * @param fn the function to use to compute the value of
1787 * the returned CompletableFuture
1788 * @param executor the executor to use for asynchronous execution
1789 * @return the new CompletableFuture
1790 */
1791
1792 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1793 BiFun<? super T,? super U,? extends V> fn,
1794 Executor executor) {
1795 if (executor == null) throw new NullPointerException();
1796 return doThenBiApply(other, fn, executor);
1797 }
1798
1799 private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1800 BiFun<? super T,? super U,? extends V> fn,
1801 Executor e) {
1802 if (other == null || fn == null) throw new NullPointerException();
1803 CompletableFuture<V> dst = new CompletableFuture<V>();
1804 BiApplyCompletion<T,U,V> d = null;
1805 Object r, s = null;
1806 if ((r = result) == null || (s = other.result) == null) {
1807 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1808 CompletionNode q = null, p = new CompletionNode(d);
1809 while ((r == null && (r = result) == null) ||
1810 (s == null && (s = other.result) == null)) {
1811 if (q != null) {
1812 if (s != null ||
1813 UNSAFE.compareAndSwapObject
1814 (other, COMPLETIONS, q.next = other.completions, q))
1815 break;
1816 }
1817 else if (r != null ||
1818 UNSAFE.compareAndSwapObject
1819 (this, COMPLETIONS, p.next = completions, p)) {
1820 if (s != null)
1821 break;
1822 q = new CompletionNode(d);
1823 }
1824 }
1825 }
1826 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1827 T t; U u; Throwable ex;
1828 if (r instanceof AltResult) {
1829 ex = ((AltResult)r).ex;
1830 t = null;
1831 }
1832 else {
1833 ex = null;
1834 @SuppressWarnings("unchecked") T tr = (T) r;
1835 t = tr;
1836 }
1837 if (ex != null)
1838 u = null;
1839 else if (s instanceof AltResult) {
1840 ex = ((AltResult)s).ex;
1841 u = null;
1842 }
1843 else {
1844 @SuppressWarnings("unchecked") U us = (U) s;
1845 u = us;
1846 }
1847 V v = null;
1848 if (ex == null) {
1849 try {
1850 if (e != null)
1851 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1852 else
1853 v = fn.apply(t, u);
1854 } catch (Throwable rex) {
1855 ex = rex;
1856 }
1857 }
1858 if (e == null || ex != null)
1859 dst.internalComplete(v, ex);
1860 }
1861 helpPostComplete();
1862 other.helpPostComplete();
1863 return dst;
1864 }
1865
1866 /**
1867 * Creates and returns a CompletableFuture that is completed with
1868 * the results of this and the other given CompletableFuture if
1869 * both complete. If this and/or the other CompletableFuture
1870 * complete exceptionally, then the returned CompletableFuture
1871 * also does so, with a CompletionException holding one of these
1872 * exceptions as its cause.
1873 *
1874 * @param other the other CompletableFuture
1875 * @param block the action to perform before completing the
1876 * returned CompletableFuture
1877 * @return the new CompletableFuture
1878 */
1879 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1880 BiAction<? super T, ? super U> block) {
1881 return doThenBiAccept(other, block, null);
1882 }
1883
1884 /**
1885 * Creates and returns a CompletableFuture that is completed
1886 * asynchronously using the {@link ForkJoinPool#commonPool()} with
1887 * the results of this and the other given CompletableFuture when
1888 * both complete. If this and/or the other CompletableFuture
1889 * complete exceptionally, then the returned CompletableFuture
1890 * also does so, with a CompletionException holding one of these
1891 * exceptions as its cause.
1892 *
1893 * @param other the other CompletableFuture
1894 * @param block the action to perform before completing the
1895 * returned CompletableFuture
1896 * @return the new CompletableFuture
1897 */
1898 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1899 BiAction<? super T, ? super U> block) {
1900 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1901 }
1902
1903 /**
1904 * Creates and returns a CompletableFuture that is completed
1905 * asynchronously using the given executor with the results of
1906 * this and the other given CompletableFuture when both complete.
1907 * If this and/or the other CompletableFuture complete
1908 * exceptionally, then the returned CompletableFuture also does
1909 * so, with a CompletionException holding one of these exceptions as
1910 * its cause.
1911 *
1912 * @param other the other CompletableFuture
1913 * @param block the action to perform before completing the
1914 * returned CompletableFuture
1915 * @param executor the executor to use for asynchronous execution
1916 * @return the new CompletableFuture
1917 */
1918 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1919 BiAction<? super T, ? super U> block,
1920 Executor executor) {
1921 if (executor == null) throw new NullPointerException();
1922 return doThenBiAccept(other, block, executor);
1923 }
1924
1925 private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
1926 BiAction<? super T,? super U> fn,
1927 Executor e) {
1928 if (other == null || fn == null) throw new NullPointerException();
1929 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1930 BiAcceptCompletion<T,U> d = null;
1931 Object r, s = null;
1932 if ((r = result) == null || (s = other.result) == null) {
1933 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1934 CompletionNode q = null, p = new CompletionNode(d);
1935 while ((r == null && (r = result) == null) ||
1936 (s == null && (s = other.result) == null)) {
1937 if (q != null) {
1938 if (s != null ||
1939 UNSAFE.compareAndSwapObject
1940 (other, COMPLETIONS, q.next = other.completions, q))
1941 break;
1942 }
1943 else if (r != null ||
1944 UNSAFE.compareAndSwapObject
1945 (this, COMPLETIONS, p.next = completions, p)) {
1946 if (s != null)
1947 break;
1948 q = new CompletionNode(d);
1949 }
1950 }
1951 }
1952 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1953 T t; U u; Throwable ex;
1954 if (r instanceof AltResult) {
1955 ex = ((AltResult)r).ex;
1956 t = null;
1957 }
1958 else {
1959 ex = null;
1960 @SuppressWarnings("unchecked") T tr = (T) r;
1961 t = tr;
1962 }
1963 if (ex != null)
1964 u = null;
1965 else if (s instanceof AltResult) {
1966 ex = ((AltResult)s).ex;
1967 u = null;
1968 }
1969 else {
1970 @SuppressWarnings("unchecked") U us = (U) s;
1971 u = us;
1972 }
1973 if (ex == null) {
1974 try {
1975 if (e != null)
1976 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
1977 else
1978 fn.accept(t, u);
1979 } catch (Throwable rex) {
1980 ex = rex;
1981 }
1982 }
1983 if (e == null || ex != null)
1984 dst.internalComplete(null, ex);
1985 }
1986 helpPostComplete();
1987 other.helpPostComplete();
1988 return dst;
1989 }
1990
1991 /**
1992 * Creates and returns a CompletableFuture that is completed
1993 * when this and the other given CompletableFuture both
1994 * complete. If this and/or the other CompletableFuture complete
1995 * exceptionally, then the returned CompletableFuture also does
1996 * so, with a CompletionException holding one of these exceptions as
1997 * its cause.
1998 *
1999 * @param other the other CompletableFuture
2000 * @param action the action to perform before completing the
2001 * returned CompletableFuture
2002 * @return the new CompletableFuture
2003 */
2004 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2005 Runnable action) {
2006 return doThenBiRun(other, action, null);
2007 }
2008
2009 /**
2010 * Creates and returns a CompletableFuture that is completed
2011 * asynchronously using the {@link ForkJoinPool#commonPool()}
2012 * when this and the other given CompletableFuture both
2013 * complete. If this and/or the other CompletableFuture complete
2014 * exceptionally, then the returned CompletableFuture also does
2015 * so, with a CompletionException holding one of these exceptions as
2016 * its cause.
2017 *
2018 * @param other the other CompletableFuture
2019 * @param action the action to perform before completing the
2020 * returned CompletableFuture
2021 * @return the new CompletableFuture
2022 */
2023 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2024 Runnable action) {
2025 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2026 }
2027
2028 /**
2029 * Creates and returns a CompletableFuture that is completed
2030 * asynchronously using the given executor
2031 * when this and the other given CompletableFuture both
2032 * complete. If this and/or the other CompletableFuture complete
2033 * exceptionally, then the returned CompletableFuture also does
2034 * so, with a CompletionException holding one of these exceptions as
2035 * its cause.
2036 *
2037 * @param other the other CompletableFuture
2038 * @param action the action to perform before completing the
2039 * returned CompletableFuture
2040 * @param executor the executor to use for asynchronous execution
2041 * @return the new CompletableFuture
2042 */
2043 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2044 Runnable action,
2045 Executor executor) {
2046 if (executor == null) throw new NullPointerException();
2047 return doThenBiRun(other, action, executor);
2048 }
2049
2050 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2051 Runnable action,
2052 Executor e) {
2053 if (other == null || action == null) throw new NullPointerException();
2054 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2055 BiRunCompletion<T> d = null;
2056 Object r, s = null;
2057 if ((r = result) == null || (s = other.result) == null) {
2058 d = new BiRunCompletion<T>(this, other, action, dst, e);
2059 CompletionNode q = null, p = new CompletionNode(d);
2060 while ((r == null && (r = result) == null) ||
2061 (s == null && (s = other.result) == null)) {
2062 if (q != null) {
2063 if (s != null ||
2064 UNSAFE.compareAndSwapObject
2065 (other, COMPLETIONS, q.next = other.completions, q))
2066 break;
2067 }
2068 else if (r != null ||
2069 UNSAFE.compareAndSwapObject
2070 (this, COMPLETIONS, p.next = completions, p)) {
2071 if (s != null)
2072 break;
2073 q = new CompletionNode(d);
2074 }
2075 }
2076 }
2077 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2078 Throwable ex;
2079 if (r instanceof AltResult)
2080 ex = ((AltResult)r).ex;
2081 else
2082 ex = null;
2083 if (ex == null && (s instanceof AltResult))
2084 ex = ((AltResult)s).ex;
2085 if (ex == null) {
2086 try {
2087 if (e != null)
2088 e.execute(new AsyncRun(action, dst));
2089 else
2090 action.run();
2091 } catch (Throwable rex) {
2092 ex = rex;
2093 }
2094 }
2095 if (e == null || ex != null)
2096 dst.internalComplete(null, ex);
2097 }
2098 helpPostComplete();
2099 other.helpPostComplete();
2100 return dst;
2101 }
2102
2103 /**
2104 * Creates and returns a CompletableFuture that is completed with
2105 * the result of the given function of either this or the other
2106 * given CompletableFuture's results when either complete. If
2107 * this and/or the other CompletableFuture complete exceptionally,
2108 * then the returned CompletableFuture may also do so, with a
2109 * CompletionException holding one of these exceptions as its cause.
2110 * No guarantees are made about which result or exception is used
2111 * in the returned CompletableFuture.
2112 *
2113 * @param other the other CompletableFuture
2114 * @param fn the function to use to compute the value of
2115 * the returned CompletableFuture
2116 * @return the new CompletableFuture
2117 */
2118 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2119 Fun<? super T, U> fn) {
2120 return doOrApply(other, fn, null);
2121 }
2122
2123 /**
2124 * Creates and returns a CompletableFuture that is completed
2125 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2126 * the result of the given function of either this or the other
2127 * given CompletableFuture's results when either complete. If
2128 * this and/or the other CompletableFuture complete exceptionally,
2129 * then the returned CompletableFuture may also do so, with a
2130 * CompletionException holding one of these exceptions as its cause.
2131 * No guarantees are made about which result or exception is used
2132 * in the returned CompletableFuture.
2133 *
2134 * @param other the other CompletableFuture
2135 * @param fn the function to use to compute the value of
2136 * the returned CompletableFuture
2137 * @return the new CompletableFuture
2138 */
2139 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2140 Fun<? super T, U> fn) {
2141 return doOrApply(other, fn, ForkJoinPool.commonPool());
2142 }
2143
2144 /**
2145 * Creates and returns a CompletableFuture that is completed
2146 * asynchronously using the given executor with the result of the
2147 * given function of either this or the other given
2148 * CompletableFuture's results when either complete. If this
2149 * and/or the other CompletableFuture complete exceptionally, then
2150 * the returned CompletableFuture may also do so, with a
2151 * CompletionException holding one of these exceptions as its cause.
2152 * No guarantees are made about which result or exception is used
2153 * in the returned CompletableFuture.
2154 *
2155 * @param other the other CompletableFuture
2156 * @param fn the function to use to compute the value of
2157 * the returned CompletableFuture
2158 * @param executor the executor to use for asynchronous execution
2159 * @return the new CompletableFuture
2160 */
2161 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2162 Fun<? super T, U> fn,
2163 Executor executor) {
2164 if (executor == null) throw new NullPointerException();
2165 return doOrApply(other, fn, executor);
2166 }
2167
2168 private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2169 Fun<? super T, U> fn,
2170 Executor e) {
2171 if (other == null || fn == null) throw new NullPointerException();
2172 CompletableFuture<U> dst = new CompletableFuture<U>();
2173 OrApplyCompletion<T,U> d = null;
2174 Object r;
2175 if ((r = result) == null && (r = other.result) == null) {
2176 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2177 CompletionNode q = null, p = new CompletionNode(d);
2178 while ((r = result) == null && (r = other.result) == null) {
2179 if (q != null) {
2180 if (UNSAFE.compareAndSwapObject
2181 (other, COMPLETIONS, q.next = other.completions, q))
2182 break;
2183 }
2184 else if (UNSAFE.compareAndSwapObject
2185 (this, COMPLETIONS, p.next = completions, p))
2186 q = new CompletionNode(d);
2187 }
2188 }
2189 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2190 T t; Throwable ex;
2191 if (r instanceof AltResult) {
2192 ex = ((AltResult)r).ex;
2193 t = null;
2194 }
2195 else {
2196 ex = null;
2197 @SuppressWarnings("unchecked") T tr = (T) r;
2198 t = tr;
2199 }
2200 U u = null;
2201 if (ex == null) {
2202 try {
2203 if (e != null)
2204 e.execute(new AsyncApply<T,U>(t, fn, dst));
2205 else
2206 u = fn.apply(t);
2207 } catch (Throwable rex) {
2208 ex = rex;
2209 }
2210 }
2211 if (e == null || ex != null)
2212 dst.internalComplete(u, ex);
2213 }
2214 helpPostComplete();
2215 other.helpPostComplete();
2216 return dst;
2217 }
2218
2219 /**
2220 * Creates and returns a CompletableFuture that is completed after
2221 * performing the given action with the result of either this or the
2222 * other given CompletableFuture's result, when either complete.
2223 * If this and/or the other CompletableFuture complete
2224 * exceptionally, then the returned CompletableFuture may also do
2225 * so, with a CompletionException holding one of these exceptions as
2226 * its cause. No guarantees are made about which exception is
2227 * used in the returned CompletableFuture.
2228 *
2229 * @param other the other CompletableFuture
2230 * @param block the action to perform before completing the
2231 * returned CompletableFuture
2232 * @return the new CompletableFuture
2233 */
2234 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2235 Action<? super T> block) {
2236 return doOrAccept(other, block, null);
2237 }
2238
2239 /**
2240 * Creates and returns a CompletableFuture that is completed
2241 * asynchronously using the {@link ForkJoinPool#commonPool()},
2242 * performing the given action with the result of either this or
2243 * the other given CompletableFuture's result, when either
2244 * complete. If this and/or the other CompletableFuture complete
2245 * exceptionally, then the returned CompletableFuture may also do
2246 * so, with a CompletionException holding one of these exceptions as
2247 * its cause. No guarantees are made about which exception is
2248 * used in the returned CompletableFuture.
2249 *
2250 * @param other the other CompletableFuture
2251 * @param block the action to perform before completing the
2252 * returned CompletableFuture
2253 * @return the new CompletableFuture
2254 */
2255 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2256 Action<? super T> block) {
2257 return doOrAccept(other, block, ForkJoinPool.commonPool());
2258 }
2259
2260 /**
2261 * Creates and returns a CompletableFuture that is completed
2262 * asynchronously using the given executor,
2263 * performing the given action with the result of either this or
2264 * the other given CompletableFuture's result, when either
2265 * complete. If this and/or the other CompletableFuture complete
2266 * exceptionally, then the returned CompletableFuture may also do
2267 * so, with a CompletionException holding one of these exceptions as
2268 * its cause. No guarantees are made about which exception is
2269 * used in the returned CompletableFuture.
2270 *
2271 * @param other the other CompletableFuture
2272 * @param block the action to perform before completing the
2273 * returned CompletableFuture
2274 * @param executor the executor to use for asynchronous execution
2275 * @return the new CompletableFuture
2276 */
2277 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2278 Action<? super T> block,
2279 Executor executor) {
2280 if (executor == null) throw new NullPointerException();
2281 return doOrAccept(other, block, executor);
2282 }
2283
2284 private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2285 Action<? super T> fn,
2286 Executor e) {
2287 if (other == null || fn == null) throw new NullPointerException();
2288 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2289 OrAcceptCompletion<T> d = null;
2290 Object r;
2291 if ((r = result) == null && (r = other.result) == null) {
2292 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2293 CompletionNode q = null, p = new CompletionNode(d);
2294 while ((r = result) == null && (r = other.result) == null) {
2295 if (q != null) {
2296 if (UNSAFE.compareAndSwapObject
2297 (other, COMPLETIONS, q.next = other.completions, q))
2298 break;
2299 }
2300 else if (UNSAFE.compareAndSwapObject
2301 (this, COMPLETIONS, p.next = completions, p))
2302 q = new CompletionNode(d);
2303 }
2304 }
2305 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2306 T t; Throwable ex;
2307 if (r instanceof AltResult) {
2308 ex = ((AltResult)r).ex;
2309 t = null;
2310 }
2311 else {
2312 ex = null;
2313 @SuppressWarnings("unchecked") T tr = (T) r;
2314 t = tr;
2315 }
2316 if (ex == null) {
2317 try {
2318 if (e != null)
2319 e.execute(new AsyncAccept<T>(t, fn, dst));
2320 else
2321 fn.accept(t);
2322 } catch (Throwable rex) {
2323 ex = rex;
2324 }
2325 }
2326 if (e == null || ex != null)
2327 dst.internalComplete(null, ex);
2328 }
2329 helpPostComplete();
2330 other.helpPostComplete();
2331 return dst;
2332 }
2333
2334 /**
2335 * Creates and returns a CompletableFuture that is completed
2336 * after this or the other given CompletableFuture complete. If
2337 * this and/or the other CompletableFuture complete exceptionally,
2338 * then the returned CompletableFuture may also do so, with a
2339 * CompletionException holding one of these exceptions as its cause.
2340 * No guarantees are made about which exception is used in the
2341 * returned CompletableFuture.
2342 *
2343 * @param other the other CompletableFuture
2344 * @param action the action to perform before completing the
2345 * returned CompletableFuture
2346 * @return the new CompletableFuture
2347 */
2348 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2349 Runnable action) {
2350 return doOrRun(other, action, null);
2351 }
2352
2353 /**
2354 * Creates and returns a CompletableFuture that is completed
2355 * asynchronously using the {@link ForkJoinPool#commonPool()}
2356 * after this or the other given CompletableFuture complete. If
2357 * this and/or the other CompletableFuture complete exceptionally,
2358 * then the returned CompletableFuture may also do so, with a
2359 * CompletionException holding one of these exceptions as its cause.
2360 * No guarantees are made about which exception is used in the
2361 * returned CompletableFuture.
2362 *
2363 * @param other the other CompletableFuture
2364 * @param action the action to perform before completing the
2365 * returned CompletableFuture
2366 * @return the new CompletableFuture
2367 */
2368 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2369 Runnable action) {
2370 return doOrRun(other, action, ForkJoinPool.commonPool());
2371 }
2372
2373 /**
2374 * Creates and returns a CompletableFuture that is completed
2375 * asynchronously using the given executor after this or the other
2376 * given CompletableFuture complete. If this and/or the other
2377 * CompletableFuture complete exceptionally, then the returned
2378 * CompletableFuture may also do so, with a CompletionException
2379 * holding one of these exceptions as its cause. No guarantees are
2380 * made about which exception is used in the returned
2381 * CompletableFuture.
2382 *
2383 * @param other the other CompletableFuture
2384 * @param action the action to perform before completing the
2385 * returned CompletableFuture
2386 * @param executor the executor to use for asynchronous execution
2387 * @return the new CompletableFuture
2388 */
2389 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2390 Runnable action,
2391 Executor executor) {
2392 if (executor == null) throw new NullPointerException();
2393 return doOrRun(other, action, executor);
2394 }
2395
2396 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2397 Runnable action,
2398 Executor e) {
2399 if (other == null || action == null) throw new NullPointerException();
2400 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2401 OrRunCompletion<T> d = null;
2402 Object r;
2403 if ((r = result) == null && (r = other.result) == null) {
2404 d = new OrRunCompletion<T>(this, other, action, dst, e);
2405 CompletionNode q = null, p = new CompletionNode(d);
2406 while ((r = result) == null && (r = other.result) == null) {
2407 if (q != null) {
2408 if (UNSAFE.compareAndSwapObject
2409 (other, COMPLETIONS, q.next = other.completions, q))
2410 break;
2411 }
2412 else if (UNSAFE.compareAndSwapObject
2413 (this, COMPLETIONS, p.next = completions, p))
2414 q = new CompletionNode(d);
2415 }
2416 }
2417 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2418 Throwable ex;
2419 if (r instanceof AltResult)
2420 ex = ((AltResult)r).ex;
2421 else
2422 ex = null;
2423 if (ex == null) {
2424 try {
2425 if (e != null)
2426 e.execute(new AsyncRun(action, dst));
2427 else
2428 action.run();
2429 } catch (Throwable rex) {
2430 ex = rex;
2431 }
2432 }
2433 if (e == null || ex != null)
2434 dst.internalComplete(null, ex);
2435 }
2436 helpPostComplete();
2437 other.helpPostComplete();
2438 return dst;
2439 }
2440
2441 /**
2442 * Returns a CompletableFuture (or an equivalent one) produced by
2443 * the given function of the result of this CompletableFuture when
2444 * completed. If this CompletableFuture completes exceptionally,
2445 * then the returned CompletableFuture also does so, with a
2446 * CompletionException holding this exception as its cause.
2447 *
2448 * @param fn the function returning a new CompletableFuture
2449 * @return the CompletableFuture, that {@code isDone()} upon
2450 * return if completed by the given function, or an exception
2451 * occurs
2452 */
2453 public <U> CompletableFuture<U> thenCompose(Fun<? super T,
2454 CompletableFuture<U>> fn) {
2455 if (fn == null) throw new NullPointerException();
2456 CompletableFuture<U> dst = null;
2457 ComposeCompletion<T,U> d = null;
2458 Object r;
2459 if ((r = result) == null) {
2460 dst = new CompletableFuture<U>();
2461 CompletionNode p = new CompletionNode
2462 (d = new ComposeCompletion<T,U>(this, fn, dst));
2463 while ((r = result) == null) {
2464 if (UNSAFE.compareAndSwapObject
2465 (this, COMPLETIONS, p.next = completions, p))
2466 break;
2467 }
2468 }
2469 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2470 T t; Throwable ex;
2471 if (r instanceof AltResult) {
2472 ex = ((AltResult)r).ex;
2473 t = null;
2474 }
2475 else {
2476 ex = null;
2477 @SuppressWarnings("unchecked") T tr = (T) r;
2478 t = tr;
2479 }
2480 if (ex == null) {
2481 try {
2482 dst = fn.apply(t);
2483 } catch (Throwable rex) {
2484 ex = rex;
2485 }
2486 }
2487 if (dst == null) {
2488 dst = new CompletableFuture<U>();
2489 if (ex == null)
2490 ex = new NullPointerException();
2491 }
2492 if (ex != null)
2493 dst.internalComplete(null, ex);
2494 }
2495 helpPostComplete();
2496 dst.helpPostComplete();
2497 return dst;
2498 }
2499
2500 /**
2501 * Creates and returns a CompletableFuture that is completed with
2502 * the result of the given function of the exception triggering
2503 * this CompletableFuture's completion when it completes
2504 * exceptionally; Otherwise, if this CompletableFuture completes
2505 * normally, then the returned CompletableFuture also completes
2506 * normally with the same value.
2507 *
2508 * @param fn the function to use to compute the value of the
2509 * returned CompletableFuture if this CompletableFuture completed
2510 * exceptionally
2511 * @return the new CompletableFuture
2512 */
2513 public CompletableFuture<T> exceptionally(Fun<Throwable, ? extends T> fn) {
2514 if (fn == null) throw new NullPointerException();
2515 CompletableFuture<T> dst = new CompletableFuture<T>();
2516 ExceptionCompletion<T> d = null;
2517 Object r;
2518 if ((r = result) == null) {
2519 CompletionNode p =
2520 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2521 while ((r = result) == null) {
2522 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2523 p.next = completions, p))
2524 break;
2525 }
2526 }
2527 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2528 T t = null; Throwable ex, dx = null;
2529 if (r instanceof AltResult) {
2530 if ((ex = ((AltResult)r).ex) != null) {
2531 try {
2532 t = fn.apply(ex);
2533 } catch (Throwable rex) {
2534 dx = rex;
2535 }
2536 }
2537 }
2538 else {
2539 @SuppressWarnings("unchecked") T tr = (T) r;
2540 t = tr;
2541 }
2542 dst.internalComplete(t, dx);
2543 }
2544 helpPostComplete();
2545 return dst;
2546 }
2547
2548 /**
2549 * Creates and returns a CompletableFuture that is completed with
2550 * the result of the given function of the result and exception of
2551 * this CompletableFuture's completion when it completes. The
2552 * given function is invoked with the result (or {@code null} if
2553 * none) and the exception (or {@code null} if none) of this
2554 * CompletableFuture when complete.
2555 *
2556 * @param fn the function to use to compute the value of the
2557 * returned CompletableFuture
2558 * @return the new CompletableFuture
2559 */
2560 public <U> CompletableFuture<U> handle(BiFun<? super T, Throwable, ? extends U> fn) {
2561 if (fn == null) throw new NullPointerException();
2562 CompletableFuture<U> dst = new CompletableFuture<U>();
2563 HandleCompletion<T,U> d = null;
2564 Object r;
2565 if ((r = result) == null) {
2566 CompletionNode p =
2567 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2568 while ((r = result) == null) {
2569 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2570 p.next = completions, p))
2571 break;
2572 }
2573 }
2574 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2575 T t; Throwable ex;
2576 if (r instanceof AltResult) {
2577 ex = ((AltResult)r).ex;
2578 t = null;
2579 }
2580 else {
2581 ex = null;
2582 @SuppressWarnings("unchecked") T tr = (T) r;
2583 t = tr;
2584 }
2585 U u; Throwable dx;
2586 try {
2587 u = fn.apply(t, ex);
2588 dx = null;
2589 } catch (Throwable rex) {
2590 dx = rex;
2591 u = null;
2592 }
2593 dst.internalComplete(u, dx);
2594 }
2595 helpPostComplete();
2596 return dst;
2597 }
2598
2599 /**
2600 * Attempts to complete this CompletableFuture with
2601 * a {@link CancellationException}.
2602 *
2603 * @param mayInterruptIfRunning this value has no effect in this
2604 * implementation because interrupts are not used to control
2605 * processing.
2606 *
2607 * @return {@code true} if this task is now cancelled
2608 */
2609 public boolean cancel(boolean mayInterruptIfRunning) {
2610 Object r;
2611 while ((r = result) == null) {
2612 r = new AltResult(new CancellationException());
2613 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
2614 postComplete();
2615 return true;
2616 }
2617 }
2618 return ((r instanceof AltResult) &&
2619 (((AltResult)r).ex instanceof CancellationException));
2620 }
2621
2622 /**
2623 * Returns {@code true} if this CompletableFuture was cancelled
2624 * before it completed normally.
2625 *
2626 * @return {@code true} if this CompletableFuture was cancelled
2627 * before it completed normally
2628 */
2629 public boolean isCancelled() {
2630 Object r;
2631 return ((r = result) instanceof AltResult) &&
2632 (((AltResult)r).ex instanceof CancellationException);
2633 }
2634
2635 /**
2636 * Forcibly sets or resets the value subsequently returned by
2637 * method {@link #get()} and related methods, whether or not
2638 * already completed. This method is designed for use only in
2639 * error recovery actions, and even in such situations may result
2640 * in ongoing dependent completions using established versus
2641 * overwritten outcomes.
2642 *
2643 * @param value the completion value
2644 */
2645 public void obtrudeValue(T value) {
2646 result = (value == null) ? NIL : value;
2647 postComplete();
2648 }
2649
2650 /**
2651 * Forcibly causes subsequent invocations of method {@link #get()}
2652 * and related methods to throw the given exception, whether or
2653 * not already completed. This method is designed for use only in
2654 * recovery actions, and even in such situations may result in
2655 * ongoing dependent completions using established versus
2656 * overwritten outcomes.
2657 *
2658 * @param ex the exception
2659 */
2660 public void obtrudeException(Throwable ex) {
2661 if (ex == null) throw new NullPointerException();
2662 result = new AltResult(ex);
2663 postComplete();
2664 }
2665
2666 // Unsafe mechanics
2667 private static final sun.misc.Unsafe UNSAFE;
2668 private static final long RESULT;
2669 private static final long WAITERS;
2670 private static final long COMPLETIONS;
2671 static {
2672 try {
2673 UNSAFE = getUnsafe();
2674 Class<?> k = CompletableFuture.class;
2675 RESULT = UNSAFE.objectFieldOffset
2676 (k.getDeclaredField("result"));
2677 WAITERS = UNSAFE.objectFieldOffset
2678 (k.getDeclaredField("waiters"));
2679 COMPLETIONS = UNSAFE.objectFieldOffset
2680 (k.getDeclaredField("completions"));
2681 } catch (Exception e) {
2682 throw new Error(e);
2683 }
2684 }
2685
2686
2687 /**
2688 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
2689 * Replace with a simple call to Unsafe.getUnsafe when integrating
2690 * into a jdk.
2691 *
2692 * @return a sun.misc.Unsafe
2693 */
2694 private static sun.misc.Unsafe getUnsafe() {
2695 try {
2696 return sun.misc.Unsafe.getUnsafe();
2697 } catch (SecurityException tryReflectionInstead) {}
2698 try {
2699 return java.security.AccessController.doPrivileged
2700 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
2701 public sun.misc.Unsafe run() throws Exception {
2702 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
2703 for (java.lang.reflect.Field f : k.getDeclaredFields()) {
2704 f.setAccessible(true);
2705 Object x = f.get(null);
2706 if (k.isInstance(x))
2707 return k.cast(x);
2708 }
2709 throw new NoSuchFieldError("the Unsafe");
2710 }});
2711 } catch (java.security.PrivilegedActionException e) {
2712 throw new RuntimeException("Could not initialize intrinsics",
2713 e.getCause());
2714 }
2715 }
2716 }