ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.33
Committed: Fri Jan 18 04:23:28 2013 UTC (11 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.32: +2 -2 lines
Log Message:
use blessed modifier order

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Block;
10 import java.util.function.BiBlock;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion. Methods are available for adding
29 * those based on Functions, Blocks, and Runnables, depending on
30 * whether they require arguments and/or produce results, as well as
31 * those triggered after either or both the current and another
32 * CompletableFuture complete. Functions and actions supplied for
33 * dependent completions (mainly using methods with prefix {@code
34 * then}) may be performed by the thread that completes the current
35 * CompletableFuture, or by any other caller of these methods. There
36 * are no guarantees about the order of processing completions unless
37 * constrained by these methods.
38 *
39 * <p>When two or more threads attempt to {@link #complete} or {@link
40 * #completeExceptionally} a CompletableFuture, only one of them
41 * succeeds.
42 *
43 * <p>Upon exceptional completion, or when a completion entails
44 * computation of a function or action, and it terminates abruptly
45 * with an (unchecked) exception or error, then further completions
46 * act as {@code completeExceptionally} with a {@link
47 * CompletionException} holding that exception as its cause. If a
48 * CompletableFuture completes exceptionally, and is not followed by a
49 * {@link #exceptionally} or {@link #handle} completion, then all of
50 * its dependents (and their dependents) also complete exceptionally
51 * with CompletionExceptions holding the ultimate cause. In case of a
52 * CompletionException, methods {@link #get()} and {@link #get(long,
53 * TimeUnit)} throw an {@link ExecutionException} with the same cause
54 * as would be held in the corresponding CompletionException. However,
55 * in these cases, methods {@link #join()} and {@link #getNow} throw
56 * the CompletionException, which simplifies usage especially within
57 * other completion functions.
58 *
59 * <p>CompletableFutures themselves do not execute asynchronously.
60 * However, the {@code async} methods provide commonly useful ways to
61 * commence asynchronous processing, using either a given {@link
62 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
63 * function or action that will result in the completion of a new
64 * CompletableFuture. To simplify monitoring, debugging, and tracking,
65 * all generated asynchronous tasks are instances of the tagging
66 * interface {@link AsynchronousCompletionTask}.
67 *
68 * @author Doug Lea
69 * @since 1.8
70 */
71 public class CompletableFuture<T> implements Future<T> {
72
73 /*
74 * Overview:
75 *
76 * 1. Non-nullness of field result (set via CAS) indicates done.
77 * An AltResult is used to box null as a result, as well as to
78 * hold exceptions. Using a single field makes completion fast
79 * and simple to detect and trigger, at the expense of a lot of
80 * encoding and decoding that infiltrates many methods. One minor
81 * simplification relies on the (static) NIL (to box null results)
82 * being the only AltResult with a null exception field, so we
83 * don't usually need explicit comparisons with NIL. The CF
84 * exception propagation mechanics surrounding decoding rely on
85 * unchecked casts of decoded results really being unchecked,
86 * where user type errors are caught at point of use, as is
87 * currently the case in Java. These are highlighted by using
88 * SuppressWarnings-annotated temporaries.
89 *
90 * 2. Waiters are held in a Treiber stack similar to the one used
91 * in FutureTask, Phaser, and SynchronousQueue. See their
92 * internal documentation for algorithmic details.
93 *
94 * 3. Completions are also kept in a list/stack, and pulled off
95 * and run when completion is triggered. (We could even use the
96 * same stack as for waiters, but would give up the potential
97 * parallelism obtained because woken waiters help release/run
98 * others -- see method postComplete). Because post-processing
99 * may race with direct calls, class Completion opportunistically
100 * extends AtomicInteger so callers can claim the action via
101 * compareAndSet(0, 1). The Completion.run methods are all
102 * written a boringly similar uniform way (that sometimes includes
103 * unnecessary-looking checks, kept to maintain uniformity). There
104 * are enough dimensions upon which they differ that factoring to
105 * use common code isn't worthwhile.
106 *
107 * 4. The exported then/and/or methods do support a bit of
108 * factoring (see doThenApply etc). They must cope with the
109 * intrinsic races surrounding addition of a dependent action
110 * versus performing the action directly because the task is
111 * already complete. For example, a CF may not be complete upon
112 * entry, so a dependent completion is added, but by the time it
113 * is added, the target CF is complete, so must be directly
114 * executed. This is all done while avoiding unnecessary object
115 * construction in safe-bypass cases.
116 */
117
118 // preliminaries
119
120 static final class AltResult {
121 final Throwable ex; // null only for NIL
122 AltResult(Throwable ex) { this.ex = ex; }
123 }
124
125 static final AltResult NIL = new AltResult(null);
126
127 // Fields
128
129 volatile Object result; // Either the result or boxed AltResult
130 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
131 volatile CompletionNode completions; // list (Treiber stack) of completions
132
133 // Basic utilities for triggering and processing completions
134
135 /**
136 * Removes and signals all waiting threads and runs all completions.
137 */
138 final void postComplete() {
139 WaitNode q; Thread t;
140 while ((q = waiters) != null) {
141 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
142 (t = q.thread) != null) {
143 q.thread = null;
144 LockSupport.unpark(t);
145 }
146 }
147
148 CompletionNode h; Completion c;
149 while ((h = completions) != null) {
150 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
151 (c = h.completion) != null)
152 c.run();
153 }
154 }
155
156 /**
157 * Triggers completion with the encoding of the given arguments:
158 * if the exception is non-null, encodes it as a wrapped
159 * CompletionException unless it is one already. Otherwise uses
160 * the given result, boxed as NIL if null.
161 */
162 final void internalComplete(Object v, Throwable ex) {
163 if (result == null)
164 UNSAFE.compareAndSwapObject
165 (this, RESULT, null,
166 (ex == null) ? (v == null) ? NIL : v :
167 new AltResult((ex instanceof CompletionException) ? ex :
168 new CompletionException(ex)));
169 postComplete(); // help out even if not triggered
170 }
171
172 /**
173 * If triggered, helps release and/or process completions.
174 */
175 final void helpPostComplete() {
176 if (result != null)
177 postComplete();
178 }
179
180 /* ------------- waiting for completions -------------- */
181
182 /**
183 * Heuristic spin value for waitingGet() before blocking on
184 * multiprocessors
185 */
186 static final int WAITING_GET_SPINS = 256;
187
188 /**
189 * Linked nodes to record waiting threads in a Treiber stack. See
190 * other classes such as Phaser and SynchronousQueue for more
191 * detailed explanation. This class implements ManagedBlocker to
192 * avoid starvation when blocking actions pile up in
193 * ForkJoinPools.
194 */
195 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
196 long nanos; // wait time if timed
197 final long deadline; // non-zero if timed
198 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
199 volatile Thread thread;
200 volatile WaitNode next;
201 WaitNode(boolean interruptible, long nanos, long deadline) {
202 this.thread = Thread.currentThread();
203 this.interruptControl = interruptible ? 1 : 0;
204 this.nanos = nanos;
205 this.deadline = deadline;
206 }
207 public boolean isReleasable() {
208 if (thread == null)
209 return true;
210 if (Thread.interrupted()) {
211 int i = interruptControl;
212 interruptControl = -1;
213 if (i > 0)
214 return true;
215 }
216 if (deadline != 0L &&
217 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
218 thread = null;
219 return true;
220 }
221 return false;
222 }
223 public boolean block() {
224 if (isReleasable())
225 return true;
226 else if (deadline == 0L)
227 LockSupport.park(this);
228 else if (nanos > 0L)
229 LockSupport.parkNanos(this, nanos);
230 return isReleasable();
231 }
232 }
233
234 /**
235 * Returns raw result after waiting, or null if interruptible and
236 * interrupted.
237 */
238 private Object waitingGet(boolean interruptible) {
239 WaitNode q = null;
240 boolean queued = false;
241 int h = 0, spins = 0;
242 for (Object r;;) {
243 if ((r = result) != null) {
244 if (q != null) { // suppress unpark
245 q.thread = null;
246 if (q.interruptControl < 0) {
247 if (interruptible) {
248 removeWaiter(q);
249 return null;
250 }
251 Thread.currentThread().interrupt();
252 }
253 }
254 postComplete(); // help release others
255 return r;
256 }
257 else if (h == 0) {
258 h = ThreadLocalRandom.current().nextInt();
259 if (Runtime.getRuntime().availableProcessors() > 1)
260 spins = WAITING_GET_SPINS;
261 }
262 else if (spins > 0) {
263 h ^= h << 1; // xorshift
264 h ^= h >>> 3;
265 if ((h ^= h << 10) >= 0)
266 --spins;
267 }
268 else if (q == null)
269 q = new WaitNode(interruptible, 0L, 0L);
270 else if (!queued)
271 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
272 q.next = waiters, q);
273 else if (interruptible && q.interruptControl < 0) {
274 removeWaiter(q);
275 return null;
276 }
277 else if (q.thread != null && result == null) {
278 try {
279 ForkJoinPool.managedBlock(q);
280 } catch (InterruptedException ex) {
281 q.interruptControl = -1;
282 }
283 }
284 }
285 }
286
287 /**
288 * Awaits completion or aborts on interrupt or timeout.
289 *
290 * @param nanos time to wait
291 * @return raw result
292 */
293 private Object timedAwaitDone(long nanos)
294 throws InterruptedException, TimeoutException {
295 WaitNode q = null;
296 boolean queued = false;
297 for (Object r;;) {
298 if ((r = result) != null) {
299 if (q != null) {
300 q.thread = null;
301 if (q.interruptControl < 0) {
302 removeWaiter(q);
303 throw new InterruptedException();
304 }
305 }
306 postComplete();
307 return r;
308 }
309 else if (q == null) {
310 if (nanos <= 0L)
311 throw new TimeoutException();
312 long d = System.nanoTime() + nanos;
313 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
314 }
315 else if (!queued)
316 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
317 q.next = waiters, q);
318 else if (q.interruptControl < 0) {
319 removeWaiter(q);
320 throw new InterruptedException();
321 }
322 else if (q.nanos <= 0L) {
323 if (result == null) {
324 removeWaiter(q);
325 throw new TimeoutException();
326 }
327 }
328 else if (q.thread != null && result == null) {
329 try {
330 ForkJoinPool.managedBlock(q);
331 } catch (InterruptedException ex) {
332 q.interruptControl = -1;
333 }
334 }
335 }
336 }
337
338 /**
339 * Tries to unlink a timed-out or interrupted wait node to avoid
340 * accumulating garbage. Internal nodes are simply unspliced
341 * without CAS since it is harmless if they are traversed anyway
342 * by releasers. To avoid effects of unsplicing from already
343 * removed nodes, the list is retraversed in case of an apparent
344 * race. This is slow when there are a lot of nodes, but we don't
345 * expect lists to be long enough to outweigh higher-overhead
346 * schemes.
347 */
348 private void removeWaiter(WaitNode node) {
349 if (node != null) {
350 node.thread = null;
351 retry:
352 for (;;) { // restart on removeWaiter race
353 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
354 s = q.next;
355 if (q.thread != null)
356 pred = q;
357 else if (pred != null) {
358 pred.next = s;
359 if (pred.thread == null) // check for race
360 continue retry;
361 }
362 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
363 continue retry;
364 }
365 break;
366 }
367 }
368 }
369
370 /* ------------- Async tasks -------------- */
371
372 /**
373 * A tagging interface identifying asynchronous tasks produced by
374 * {@code async} methods. This may be useful for monitoring,
375 * debugging, and tracking asynchronous activities.
376 */
377 public static interface AsynchronousCompletionTask {
378 }
379
380 /** Base class can act as either FJ or plain Runnable */
381 abstract static class Async extends ForkJoinTask<Void>
382 implements Runnable, AsynchronousCompletionTask {
383 public final Void getRawResult() { return null; }
384 public final void setRawResult(Void v) { }
385 public final void run() { exec(); }
386 }
387
388 static final class AsyncRun extends Async {
389 final Runnable fn;
390 final CompletableFuture<Void> dst;
391 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
392 this.fn = fn; this.dst = dst;
393 }
394 public final boolean exec() {
395 CompletableFuture<Void> d; Throwable ex;
396 if ((d = this.dst) != null && d.result == null) {
397 try {
398 fn.run();
399 ex = null;
400 } catch (Throwable rex) {
401 ex = rex;
402 }
403 d.internalComplete(null, ex);
404 }
405 return true;
406 }
407 private static final long serialVersionUID = 5232453952276885070L;
408 }
409
410 static final class AsyncSupply<U> extends Async {
411 final Supplier<U> fn;
412 final CompletableFuture<U> dst;
413 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
414 this.fn = fn; this.dst = dst;
415 }
416 public final boolean exec() {
417 CompletableFuture<U> d; U u; Throwable ex;
418 if ((d = this.dst) != null && d.result == null) {
419 try {
420 u = fn.get();
421 ex = null;
422 } catch (Throwable rex) {
423 ex = rex;
424 u = null;
425 }
426 d.internalComplete(u, ex);
427 }
428 return true;
429 }
430 private static final long serialVersionUID = 5232453952276885070L;
431 }
432
433 static final class AsyncApply<T,U> extends Async {
434 final Function<? super T,? extends U> fn;
435 final T arg;
436 final CompletableFuture<U> dst;
437 AsyncApply(T arg, Function<? super T,? extends U> fn,
438 CompletableFuture<U> dst) {
439 this.arg = arg; this.fn = fn; this.dst = dst;
440 }
441 public final boolean exec() {
442 CompletableFuture<U> d; U u; Throwable ex;
443 if ((d = this.dst) != null && d.result == null) {
444 try {
445 u = fn.apply(arg);
446 ex = null;
447 } catch (Throwable rex) {
448 ex = rex;
449 u = null;
450 }
451 d.internalComplete(u, ex);
452 }
453 return true;
454 }
455 private static final long serialVersionUID = 5232453952276885070L;
456 }
457
458 static final class AsyncBiApply<T,U,V> extends Async {
459 final BiFunction<? super T,? super U,? extends V> fn;
460 final T arg1;
461 final U arg2;
462 final CompletableFuture<V> dst;
463 AsyncBiApply(T arg1, U arg2,
464 BiFunction<? super T,? super U,? extends V> fn,
465 CompletableFuture<V> dst) {
466 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
467 }
468 public final boolean exec() {
469 CompletableFuture<V> d; V v; Throwable ex;
470 if ((d = this.dst) != null && d.result == null) {
471 try {
472 v = fn.apply(arg1, arg2);
473 ex = null;
474 } catch (Throwable rex) {
475 ex = rex;
476 v = null;
477 }
478 d.internalComplete(v, ex);
479 }
480 return true;
481 }
482 private static final long serialVersionUID = 5232453952276885070L;
483 }
484
485 static final class AsyncAccept<T> extends Async {
486 final Block<? super T> fn;
487 final T arg;
488 final CompletableFuture<Void> dst;
489 AsyncAccept(T arg, Block<? super T> fn,
490 CompletableFuture<Void> dst) {
491 this.arg = arg; this.fn = fn; this.dst = dst;
492 }
493 public final boolean exec() {
494 CompletableFuture<Void> d; Throwable ex;
495 if ((d = this.dst) != null && d.result == null) {
496 try {
497 fn.accept(arg);
498 ex = null;
499 } catch (Throwable rex) {
500 ex = rex;
501 }
502 d.internalComplete(null, ex);
503 }
504 return true;
505 }
506 private static final long serialVersionUID = 5232453952276885070L;
507 }
508
509 static final class AsyncBiAccept<T,U> extends Async {
510 final BiBlock<? super T,? super U> fn;
511 final T arg1;
512 final U arg2;
513 final CompletableFuture<Void> dst;
514 AsyncBiAccept(T arg1, U arg2,
515 BiBlock<? super T,? super U> fn,
516 CompletableFuture<Void> dst) {
517 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
518 }
519 public final boolean exec() {
520 CompletableFuture<Void> d; Throwable ex;
521 if ((d = this.dst) != null && d.result == null) {
522 try {
523 fn.accept(arg1, arg2);
524 ex = null;
525 } catch (Throwable rex) {
526 ex = rex;
527 }
528 d.internalComplete(null, ex);
529 }
530 return true;
531 }
532 private static final long serialVersionUID = 5232453952276885070L;
533 }
534
535 /* ------------- Completions -------------- */
536
537 /**
538 * Simple linked list nodes to record completions, used in
539 * basically the same way as WaitNodes. (We separate nodes from
540 * the Completions themselves mainly because for the And and Or
541 * methods, the same Completion object resides in two lists.)
542 */
543 static final class CompletionNode {
544 final Completion completion;
545 volatile CompletionNode next;
546 CompletionNode(Completion completion) { this.completion = completion; }
547 }
548
549 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
550 abstract static class Completion extends AtomicInteger implements Runnable {
551 }
552
553 static final class ApplyCompletion<T,U> extends Completion {
554 final CompletableFuture<? extends T> src;
555 final Function<? super T,? extends U> fn;
556 final CompletableFuture<U> dst;
557 final Executor executor;
558 ApplyCompletion(CompletableFuture<? extends T> src,
559 Function<? super T,? extends U> fn,
560 CompletableFuture<U> dst, Executor executor) {
561 this.src = src; this.fn = fn; this.dst = dst;
562 this.executor = executor;
563 }
564 public final void run() {
565 final CompletableFuture<? extends T> a;
566 final Function<? super T,? extends U> fn;
567 final CompletableFuture<U> dst;
568 Object r; T t; Throwable ex;
569 if ((dst = this.dst) != null &&
570 (fn = this.fn) != null &&
571 (a = this.src) != null &&
572 (r = a.result) != null &&
573 compareAndSet(0, 1)) {
574 if (r instanceof AltResult) {
575 ex = ((AltResult)r).ex;
576 t = null;
577 }
578 else {
579 ex = null;
580 @SuppressWarnings("unchecked") T tr = (T) r;
581 t = tr;
582 }
583 Executor e = executor;
584 U u = null;
585 if (ex == null) {
586 try {
587 if (e != null)
588 e.execute(new AsyncApply<T,U>(t, fn, dst));
589 else
590 u = fn.apply(t);
591 } catch (Throwable rex) {
592 ex = rex;
593 }
594 }
595 if (e == null || ex != null)
596 dst.internalComplete(u, ex);
597 }
598 }
599 private static final long serialVersionUID = 5232453952276885070L;
600 }
601
602 static final class AcceptCompletion<T> extends Completion {
603 final CompletableFuture<? extends T> src;
604 final Block<? super T> fn;
605 final CompletableFuture<Void> dst;
606 final Executor executor;
607 AcceptCompletion(CompletableFuture<? extends T> src,
608 Block<? super T> fn,
609 CompletableFuture<Void> dst, Executor executor) {
610 this.src = src; this.fn = fn; this.dst = dst;
611 this.executor = executor;
612 }
613 public final void run() {
614 final CompletableFuture<? extends T> a;
615 final Block<? super T> fn;
616 final CompletableFuture<Void> dst;
617 Object r; T t; Throwable ex;
618 if ((dst = this.dst) != null &&
619 (fn = this.fn) != null &&
620 (a = this.src) != null &&
621 (r = a.result) != null &&
622 compareAndSet(0, 1)) {
623 if (r instanceof AltResult) {
624 ex = ((AltResult)r).ex;
625 t = null;
626 }
627 else {
628 ex = null;
629 @SuppressWarnings("unchecked") T tr = (T) r;
630 t = tr;
631 }
632 Executor e = executor;
633 if (ex == null) {
634 try {
635 if (e != null)
636 e.execute(new AsyncAccept<T>(t, fn, dst));
637 else
638 fn.accept(t);
639 } catch (Throwable rex) {
640 ex = rex;
641 }
642 }
643 if (e == null || ex != null)
644 dst.internalComplete(null, ex);
645 }
646 }
647 private static final long serialVersionUID = 5232453952276885070L;
648 }
649
650 static final class RunCompletion<T> extends Completion {
651 final CompletableFuture<? extends T> src;
652 final Runnable fn;
653 final CompletableFuture<Void> dst;
654 final Executor executor;
655 RunCompletion(CompletableFuture<? extends T> src,
656 Runnable fn,
657 CompletableFuture<Void> dst,
658 Executor executor) {
659 this.src = src; this.fn = fn; this.dst = dst;
660 this.executor = executor;
661 }
662 public final void run() {
663 final CompletableFuture<? extends T> a;
664 final Runnable fn;
665 final CompletableFuture<Void> dst;
666 Object r; Throwable ex;
667 if ((dst = this.dst) != null &&
668 (fn = this.fn) != null &&
669 (a = this.src) != null &&
670 (r = a.result) != null &&
671 compareAndSet(0, 1)) {
672 if (r instanceof AltResult)
673 ex = ((AltResult)r).ex;
674 else
675 ex = null;
676 Executor e = executor;
677 if (ex == null) {
678 try {
679 if (e != null)
680 e.execute(new AsyncRun(fn, dst));
681 else
682 fn.run();
683 } catch (Throwable rex) {
684 ex = rex;
685 }
686 }
687 if (e == null || ex != null)
688 dst.internalComplete(null, ex);
689 }
690 }
691 private static final long serialVersionUID = 5232453952276885070L;
692 }
693
694 static final class BiApplyCompletion<T,U,V> extends Completion {
695 final CompletableFuture<? extends T> src;
696 final CompletableFuture<? extends U> snd;
697 final BiFunction<? super T,? super U,? extends V> fn;
698 final CompletableFuture<V> dst;
699 final Executor executor;
700 BiApplyCompletion(CompletableFuture<? extends T> src,
701 CompletableFuture<? extends U> snd,
702 BiFunction<? super T,? super U,? extends V> fn,
703 CompletableFuture<V> dst, Executor executor) {
704 this.src = src; this.snd = snd;
705 this.fn = fn; this.dst = dst;
706 this.executor = executor;
707 }
708 public final void run() {
709 final CompletableFuture<? extends T> a;
710 final CompletableFuture<? extends U> b;
711 final BiFunction<? super T,? super U,? extends V> fn;
712 final CompletableFuture<V> dst;
713 Object r, s; T t; U u; Throwable ex;
714 if ((dst = this.dst) != null &&
715 (fn = this.fn) != null &&
716 (a = this.src) != null &&
717 (r = a.result) != null &&
718 (b = this.snd) != null &&
719 (s = b.result) != null &&
720 compareAndSet(0, 1)) {
721 if (r instanceof AltResult) {
722 ex = ((AltResult)r).ex;
723 t = null;
724 }
725 else {
726 ex = null;
727 @SuppressWarnings("unchecked") T tr = (T) r;
728 t = tr;
729 }
730 if (ex != null)
731 u = null;
732 else if (s instanceof AltResult) {
733 ex = ((AltResult)s).ex;
734 u = null;
735 }
736 else {
737 @SuppressWarnings("unchecked") U us = (U) s;
738 u = us;
739 }
740 Executor e = executor;
741 V v = null;
742 if (ex == null) {
743 try {
744 if (e != null)
745 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
746 else
747 v = fn.apply(t, u);
748 } catch (Throwable rex) {
749 ex = rex;
750 }
751 }
752 if (e == null || ex != null)
753 dst.internalComplete(v, ex);
754 }
755 }
756 private static final long serialVersionUID = 5232453952276885070L;
757 }
758
759 static final class BiAcceptCompletion<T,U> extends Completion {
760 final CompletableFuture<? extends T> src;
761 final CompletableFuture<? extends U> snd;
762 final BiBlock<? super T,? super U> fn;
763 final CompletableFuture<Void> dst;
764 final Executor executor;
765 BiAcceptCompletion(CompletableFuture<? extends T> src,
766 CompletableFuture<? extends U> snd,
767 BiBlock<? super T,? super U> fn,
768 CompletableFuture<Void> dst, Executor executor) {
769 this.src = src; this.snd = snd;
770 this.fn = fn; this.dst = dst;
771 this.executor = executor;
772 }
773 public final void run() {
774 final CompletableFuture<? extends T> a;
775 final CompletableFuture<? extends U> b;
776 final BiBlock<? super T,? super U> fn;
777 final CompletableFuture<Void> dst;
778 Object r, s; T t; U u; Throwable ex;
779 if ((dst = this.dst) != null &&
780 (fn = this.fn) != null &&
781 (a = this.src) != null &&
782 (r = a.result) != null &&
783 (b = this.snd) != null &&
784 (s = b.result) != null &&
785 compareAndSet(0, 1)) {
786 if (r instanceof AltResult) {
787 ex = ((AltResult)r).ex;
788 t = null;
789 }
790 else {
791 ex = null;
792 @SuppressWarnings("unchecked") T tr = (T) r;
793 t = tr;
794 }
795 if (ex != null)
796 u = null;
797 else if (s instanceof AltResult) {
798 ex = ((AltResult)s).ex;
799 u = null;
800 }
801 else {
802 @SuppressWarnings("unchecked") U us = (U) s;
803 u = us;
804 }
805 Executor e = executor;
806 if (ex == null) {
807 try {
808 if (e != null)
809 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
810 else
811 fn.accept(t, u);
812 } catch (Throwable rex) {
813 ex = rex;
814 }
815 }
816 if (e == null || ex != null)
817 dst.internalComplete(null, ex);
818 }
819 }
820 private static final long serialVersionUID = 5232453952276885070L;
821 }
822
823 static final class BiRunCompletion<T> extends Completion {
824 final CompletableFuture<? extends T> src;
825 final CompletableFuture<?> snd;
826 final Runnable fn;
827 final CompletableFuture<Void> dst;
828 final Executor executor;
829 BiRunCompletion(CompletableFuture<? extends T> src,
830 CompletableFuture<?> snd,
831 Runnable fn,
832 CompletableFuture<Void> dst, Executor executor) {
833 this.src = src; this.snd = snd;
834 this.fn = fn; this.dst = dst;
835 this.executor = executor;
836 }
837 public final void run() {
838 final CompletableFuture<? extends T> a;
839 final CompletableFuture<?> b;
840 final Runnable fn;
841 final CompletableFuture<Void> dst;
842 Object r, s; Throwable ex;
843 if ((dst = this.dst) != null &&
844 (fn = this.fn) != null &&
845 (a = this.src) != null &&
846 (r = a.result) != null &&
847 (b = this.snd) != null &&
848 (s = b.result) != null &&
849 compareAndSet(0, 1)) {
850 if (r instanceof AltResult)
851 ex = ((AltResult)r).ex;
852 else
853 ex = null;
854 if (ex == null && (s instanceof AltResult))
855 ex = ((AltResult)s).ex;
856 Executor e = executor;
857 if (ex == null) {
858 try {
859 if (e != null)
860 e.execute(new AsyncRun(fn, dst));
861 else
862 fn.run();
863 } catch (Throwable rex) {
864 ex = rex;
865 }
866 }
867 if (e == null || ex != null)
868 dst.internalComplete(null, ex);
869 }
870 }
871 private static final long serialVersionUID = 5232453952276885070L;
872 }
873
874 static final class OrApplyCompletion<T,U> extends Completion {
875 final CompletableFuture<? extends T> src;
876 final CompletableFuture<? extends T> snd;
877 final Function<? super T,? extends U> fn;
878 final CompletableFuture<U> dst;
879 final Executor executor;
880 OrApplyCompletion(CompletableFuture<? extends T> src,
881 CompletableFuture<? extends T> snd,
882 Function<? super T,? extends U> fn,
883 CompletableFuture<U> dst, Executor executor) {
884 this.src = src; this.snd = snd;
885 this.fn = fn; this.dst = dst;
886 this.executor = executor;
887 }
888 public final void run() {
889 final CompletableFuture<? extends T> a;
890 final CompletableFuture<? extends T> b;
891 final Function<? super T,? extends U> fn;
892 final CompletableFuture<U> dst;
893 Object r; T t; Throwable ex;
894 if ((dst = this.dst) != null &&
895 (fn = this.fn) != null &&
896 (((a = this.src) != null && (r = a.result) != null) ||
897 ((b = this.snd) != null && (r = b.result) != null)) &&
898 compareAndSet(0, 1)) {
899 if (r instanceof AltResult) {
900 ex = ((AltResult)r).ex;
901 t = null;
902 }
903 else {
904 ex = null;
905 @SuppressWarnings("unchecked") T tr = (T) r;
906 t = tr;
907 }
908 Executor e = executor;
909 U u = null;
910 if (ex == null) {
911 try {
912 if (e != null)
913 e.execute(new AsyncApply<T,U>(t, fn, dst));
914 else
915 u = fn.apply(t);
916 } catch (Throwable rex) {
917 ex = rex;
918 }
919 }
920 if (e == null || ex != null)
921 dst.internalComplete(u, ex);
922 }
923 }
924 private static final long serialVersionUID = 5232453952276885070L;
925 }
926
927 static final class OrAcceptCompletion<T> extends Completion {
928 final CompletableFuture<? extends T> src;
929 final CompletableFuture<? extends T> snd;
930 final Block<? super T> fn;
931 final CompletableFuture<Void> dst;
932 final Executor executor;
933 OrAcceptCompletion(CompletableFuture<? extends T> src,
934 CompletableFuture<? extends T> snd,
935 Block<? super T> fn,
936 CompletableFuture<Void> dst, Executor executor) {
937 this.src = src; this.snd = snd;
938 this.fn = fn; this.dst = dst;
939 this.executor = executor;
940 }
941 public final void run() {
942 final CompletableFuture<? extends T> a;
943 final CompletableFuture<? extends T> b;
944 final Block<? super T> fn;
945 final CompletableFuture<Void> dst;
946 Object r; T t; Throwable ex;
947 if ((dst = this.dst) != null &&
948 (fn = this.fn) != null &&
949 (((a = this.src) != null && (r = a.result) != null) ||
950 ((b = this.snd) != null && (r = b.result) != null)) &&
951 compareAndSet(0, 1)) {
952 if (r instanceof AltResult) {
953 ex = ((AltResult)r).ex;
954 t = null;
955 }
956 else {
957 ex = null;
958 @SuppressWarnings("unchecked") T tr = (T) r;
959 t = tr;
960 }
961 Executor e = executor;
962 if (ex == null) {
963 try {
964 if (e != null)
965 e.execute(new AsyncAccept<T>(t, fn, dst));
966 else
967 fn.accept(t);
968 } catch (Throwable rex) {
969 ex = rex;
970 }
971 }
972 if (e == null || ex != null)
973 dst.internalComplete(null, ex);
974 }
975 }
976 private static final long serialVersionUID = 5232453952276885070L;
977 }
978
979 static final class OrRunCompletion<T> extends Completion {
980 final CompletableFuture<? extends T> src;
981 final CompletableFuture<?> snd;
982 final Runnable fn;
983 final CompletableFuture<Void> dst;
984 final Executor executor;
985 OrRunCompletion(CompletableFuture<? extends T> src,
986 CompletableFuture<?> snd,
987 Runnable fn,
988 CompletableFuture<Void> dst, Executor executor) {
989 this.src = src; this.snd = snd;
990 this.fn = fn; this.dst = dst;
991 this.executor = executor;
992 }
993 public final void run() {
994 final CompletableFuture<? extends T> a;
995 final CompletableFuture<?> b;
996 final Runnable fn;
997 final CompletableFuture<Void> dst;
998 Object r; Throwable ex;
999 if ((dst = this.dst) != null &&
1000 (fn = this.fn) != null &&
1001 (((a = this.src) != null && (r = a.result) != null) ||
1002 ((b = this.snd) != null && (r = b.result) != null)) &&
1003 compareAndSet(0, 1)) {
1004 if (r instanceof AltResult)
1005 ex = ((AltResult)r).ex;
1006 else
1007 ex = null;
1008 Executor e = executor;
1009 if (ex == null) {
1010 try {
1011 if (e != null)
1012 e.execute(new AsyncRun(fn, dst));
1013 else
1014 fn.run();
1015 } catch (Throwable rex) {
1016 ex = rex;
1017 }
1018 }
1019 if (e == null || ex != null)
1020 dst.internalComplete(null, ex);
1021 }
1022 }
1023 private static final long serialVersionUID = 5232453952276885070L;
1024 }
1025
1026 static final class ExceptionCompletion<T> extends Completion {
1027 final CompletableFuture<? extends T> src;
1028 final Function<? super Throwable, ? extends T> fn;
1029 final CompletableFuture<T> dst;
1030 ExceptionCompletion(CompletableFuture<? extends T> src,
1031 Function<? super Throwable, ? extends T> fn,
1032 CompletableFuture<T> dst) {
1033 this.src = src; this.fn = fn; this.dst = dst;
1034 }
1035 public final void run() {
1036 final CompletableFuture<? extends T> a;
1037 final Function<? super Throwable, ? extends T> fn;
1038 final CompletableFuture<T> dst;
1039 Object r; T t = null; Throwable ex, dx = null;
1040 if ((dst = this.dst) != null &&
1041 (fn = this.fn) != null &&
1042 (a = this.src) != null &&
1043 (r = a.result) != null &&
1044 compareAndSet(0, 1)) {
1045 if ((r instanceof AltResult) &&
1046 (ex = ((AltResult)r).ex) != null) {
1047 try {
1048 t = fn.apply(ex);
1049 } catch (Throwable rex) {
1050 dx = rex;
1051 }
1052 }
1053 else {
1054 @SuppressWarnings("unchecked") T tr = (T) r;
1055 t = tr;
1056 }
1057 dst.internalComplete(t, dx);
1058 }
1059 }
1060 private static final long serialVersionUID = 5232453952276885070L;
1061 }
1062
1063 static final class ThenCopy<T> extends Completion {
1064 final CompletableFuture<? extends T> src;
1065 final CompletableFuture<T> dst;
1066 ThenCopy(CompletableFuture<? extends T> src,
1067 CompletableFuture<T> dst) {
1068 this.src = src; this.dst = dst;
1069 }
1070 public final void run() {
1071 final CompletableFuture<? extends T> a;
1072 final CompletableFuture<T> dst;
1073 Object r; Object t; Throwable ex;
1074 if ((dst = this.dst) != null &&
1075 (a = this.src) != null &&
1076 (r = a.result) != null &&
1077 compareAndSet(0, 1)) {
1078 if (r instanceof AltResult) {
1079 ex = ((AltResult)r).ex;
1080 t = null;
1081 }
1082 else {
1083 ex = null;
1084 t = r;
1085 }
1086 dst.internalComplete(t, ex);
1087 }
1088 }
1089 private static final long serialVersionUID = 5232453952276885070L;
1090 }
1091
1092 static final class HandleCompletion<T,U> extends Completion {
1093 final CompletableFuture<? extends T> src;
1094 final BiFunction<? super T, Throwable, ? extends U> fn;
1095 final CompletableFuture<U> dst;
1096 HandleCompletion(CompletableFuture<? extends T> src,
1097 BiFunction<? super T, Throwable, ? extends U> fn,
1098 final CompletableFuture<U> dst) {
1099 this.src = src; this.fn = fn; this.dst = dst;
1100 }
1101 public final void run() {
1102 final CompletableFuture<? extends T> a;
1103 final BiFunction<? super T, Throwable, ? extends U> fn;
1104 final CompletableFuture<U> dst;
1105 Object r; T t; Throwable ex;
1106 if ((dst = this.dst) != null &&
1107 (fn = this.fn) != null &&
1108 (a = this.src) != null &&
1109 (r = a.result) != null &&
1110 compareAndSet(0, 1)) {
1111 if (r instanceof AltResult) {
1112 ex = ((AltResult)r).ex;
1113 t = null;
1114 }
1115 else {
1116 ex = null;
1117 @SuppressWarnings("unchecked") T tr = (T) r;
1118 t = tr;
1119 }
1120 U u = null; Throwable dx = null;
1121 try {
1122 u = fn.apply(t, ex);
1123 } catch (Throwable rex) {
1124 dx = rex;
1125 }
1126 dst.internalComplete(u, dx);
1127 }
1128 }
1129 private static final long serialVersionUID = 5232453952276885070L;
1130 }
1131
1132 static final class ComposeCompletion<T,U> extends Completion {
1133 final CompletableFuture<? extends T> src;
1134 final Function<? super T, CompletableFuture<U>> fn;
1135 final CompletableFuture<U> dst;
1136 ComposeCompletion(CompletableFuture<? extends T> src,
1137 Function<? super T, CompletableFuture<U>> fn,
1138 final CompletableFuture<U> dst) {
1139 this.src = src; this.fn = fn; this.dst = dst;
1140 }
1141 public final void run() {
1142 final CompletableFuture<? extends T> a;
1143 final Function<? super T, CompletableFuture<U>> fn;
1144 final CompletableFuture<U> dst;
1145 Object r; T t; Throwable ex;
1146 if ((dst = this.dst) != null &&
1147 (fn = this.fn) != null &&
1148 (a = this.src) != null &&
1149 (r = a.result) != null &&
1150 compareAndSet(0, 1)) {
1151 if (r instanceof AltResult) {
1152 ex = ((AltResult)r).ex;
1153 t = null;
1154 }
1155 else {
1156 ex = null;
1157 @SuppressWarnings("unchecked") T tr = (T) r;
1158 t = tr;
1159 }
1160 CompletableFuture<U> c = null;
1161 U u = null;
1162 boolean complete = false;
1163 if (ex == null) {
1164 try {
1165 c = fn.apply(t);
1166 } catch (Throwable rex) {
1167 ex = rex;
1168 }
1169 }
1170 if (ex != null || c == null) {
1171 if (ex == null)
1172 ex = new NullPointerException();
1173 }
1174 else {
1175 ThenCopy<U> d = null;
1176 Object s;
1177 if ((s = c.result) == null) {
1178 CompletionNode p = new CompletionNode
1179 (d = new ThenCopy<U>(c, dst));
1180 while ((s = c.result) == null) {
1181 if (UNSAFE.compareAndSwapObject
1182 (c, COMPLETIONS, p.next = c.completions, p))
1183 break;
1184 }
1185 }
1186 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1187 complete = true;
1188 if (s instanceof AltResult) {
1189 ex = ((AltResult)s).ex; // no rewrap
1190 u = null;
1191 }
1192 else {
1193 @SuppressWarnings("unchecked") U us = (U) s;
1194 u = us;
1195 }
1196 }
1197 }
1198 if (complete || ex != null)
1199 dst.internalComplete(u, ex);
1200 if (c != null)
1201 c.helpPostComplete();
1202 }
1203 }
1204 private static final long serialVersionUID = 5232453952276885070L;
1205 }
1206
1207 // public methods
1208
1209 /**
1210 * Creates a new incomplete CompletableFuture.
1211 */
1212 public CompletableFuture() {
1213 }
1214
1215 /**
1216 * Asynchronously executes in the {@link
1217 * ForkJoinPool#commonPool()}, a task that completes the returned
1218 * CompletableFuture with the result of the given Supplier.
1219 *
1220 * @param supplier a function returning the value to be used
1221 * to complete the returned CompletableFuture
1222 * @return the CompletableFuture
1223 */
1224 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1225 if (supplier == null) throw new NullPointerException();
1226 CompletableFuture<U> f = new CompletableFuture<U>();
1227 ForkJoinPool.commonPool().
1228 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1229 return f;
1230 }
1231
1232 /**
1233 * Asynchronously executes using the given executor, a task that
1234 * completes the returned CompletableFuture with the result of the
1235 * given Supplier.
1236 *
1237 * @param supplier a function returning the value to be used
1238 * to complete the returned CompletableFuture
1239 * @param executor the executor to use for asynchronous execution
1240 * @return the CompletableFuture
1241 */
1242 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1243 Executor executor) {
1244 if (executor == null || supplier == null)
1245 throw new NullPointerException();
1246 CompletableFuture<U> f = new CompletableFuture<U>();
1247 executor.execute(new AsyncSupply<U>(supplier, f));
1248 return f;
1249 }
1250
1251 /**
1252 * Asynchronously executes in the {@link
1253 * ForkJoinPool#commonPool()} a task that runs the given action,
1254 * and then completes the returned CompletableFuture.
1255 *
1256 * @param runnable the action to run before completing the
1257 * returned CompletableFuture
1258 * @return the CompletableFuture
1259 */
1260 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1261 if (runnable == null) throw new NullPointerException();
1262 CompletableFuture<Void> f = new CompletableFuture<Void>();
1263 ForkJoinPool.commonPool().
1264 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1265 return f;
1266 }
1267
1268 /**
1269 * Asynchronously executes using the given executor, a task that
1270 * runs the given action, and then completes the returned
1271 * CompletableFuture.
1272 *
1273 * @param runnable the action to run before completing the
1274 * returned CompletableFuture
1275 * @param executor the executor to use for asynchronous execution
1276 * @return the CompletableFuture
1277 */
1278 public static CompletableFuture<Void> runAsync(Runnable runnable,
1279 Executor executor) {
1280 if (executor == null || runnable == null)
1281 throw new NullPointerException();
1282 CompletableFuture<Void> f = new CompletableFuture<Void>();
1283 executor.execute(new AsyncRun(runnable, f));
1284 return f;
1285 }
1286
1287 /**
1288 * Returns {@code true} if completed in any fashion: normally,
1289 * exceptionally, or via cancellation.
1290 *
1291 * @return {@code true} if completed
1292 */
1293 public boolean isDone() {
1294 return result != null;
1295 }
1296
1297 /**
1298 * Waits if necessary for the computation to complete, and then
1299 * retrieves its result.
1300 *
1301 * @return the computed result
1302 * @throws CancellationException if the computation was cancelled
1303 * @throws ExecutionException if the computation threw an
1304 * exception
1305 * @throws InterruptedException if the current thread was interrupted
1306 * while waiting
1307 */
1308 public T get() throws InterruptedException, ExecutionException {
1309 Object r; Throwable ex, cause;
1310 if ((r = result) == null && (r = waitingGet(true)) == null)
1311 throw new InterruptedException();
1312 if (r instanceof AltResult) {
1313 if ((ex = ((AltResult)r).ex) != null) {
1314 if (ex instanceof CancellationException)
1315 throw (CancellationException)ex;
1316 if ((ex instanceof CompletionException) &&
1317 (cause = ex.getCause()) != null)
1318 ex = cause;
1319 throw new ExecutionException(ex);
1320 }
1321 return null;
1322 }
1323 @SuppressWarnings("unchecked") T tr = (T) r;
1324 return tr;
1325 }
1326
1327 /**
1328 * Waits if necessary for at most the given time for completion,
1329 * and then retrieves its result, if available.
1330 *
1331 * @param timeout the maximum time to wait
1332 * @param unit the time unit of the timeout argument
1333 * @return the computed result
1334 * @throws CancellationException if the computation was cancelled
1335 * @throws ExecutionException if the computation threw an
1336 * exception
1337 * @throws InterruptedException if the current thread was interrupted
1338 * while waiting
1339 * @throws TimeoutException if the wait timed out
1340 */
1341 public T get(long timeout, TimeUnit unit)
1342 throws InterruptedException, ExecutionException, TimeoutException {
1343 Object r; Throwable ex, cause;
1344 long nanos = unit.toNanos(timeout);
1345 if (Thread.interrupted())
1346 throw new InterruptedException();
1347 if ((r = result) == null)
1348 r = timedAwaitDone(nanos);
1349 if (r instanceof AltResult) {
1350 if ((ex = ((AltResult)r).ex) != null) {
1351 if (ex instanceof CancellationException)
1352 throw (CancellationException)ex;
1353 if ((ex instanceof CompletionException) &&
1354 (cause = ex.getCause()) != null)
1355 ex = cause;
1356 throw new ExecutionException(ex);
1357 }
1358 return null;
1359 }
1360 @SuppressWarnings("unchecked") T tr = (T) r;
1361 return tr;
1362 }
1363
1364 /**
1365 * Returns the result value when complete, or throws an
1366 * (unchecked) exception if completed exceptionally. To better
1367 * conform with the use of common functional forms, if a
1368 * computation involved in the completion of this
1369 * CompletableFuture threw an exception, this method throws an
1370 * (unchecked) {@link CompletionException} with the underlying
1371 * exception as its cause.
1372 *
1373 * @return the result value
1374 * @throws CancellationException if the computation was cancelled
1375 * @throws CompletionException if a completion computation threw
1376 * an exception
1377 */
1378 public T join() {
1379 Object r; Throwable ex;
1380 if ((r = result) == null)
1381 r = waitingGet(false);
1382 if (r instanceof AltResult) {
1383 if ((ex = ((AltResult)r).ex) != null) {
1384 if (ex instanceof CancellationException)
1385 throw (CancellationException)ex;
1386 if (ex instanceof CompletionException)
1387 throw (CompletionException)ex;
1388 throw new CompletionException(ex);
1389 }
1390 return null;
1391 }
1392 @SuppressWarnings("unchecked") T tr = (T) r;
1393 return tr;
1394 }
1395
1396 /**
1397 * Returns the result value (or throws any encountered exception)
1398 * if completed, else returns the given valueIfAbsent.
1399 *
1400 * @param valueIfAbsent the value to return if not completed
1401 * @return the result value, if completed, else the given valueIfAbsent
1402 * @throws CancellationException if the computation was cancelled
1403 * @throws CompletionException if a completion computation threw
1404 * an exception
1405 */
1406 public T getNow(T valueIfAbsent) {
1407 Object r; Throwable ex;
1408 if ((r = result) == null)
1409 return valueIfAbsent;
1410 if (r instanceof AltResult) {
1411 if ((ex = ((AltResult)r).ex) != null) {
1412 if (ex instanceof CancellationException)
1413 throw (CancellationException)ex;
1414 if (ex instanceof CompletionException)
1415 throw (CompletionException)ex;
1416 throw new CompletionException(ex);
1417 }
1418 return null;
1419 }
1420 @SuppressWarnings("unchecked") T tr = (T) r;
1421 return tr;
1422 }
1423
1424 /**
1425 * If not already completed, sets the value returned by {@link
1426 * #get()} and related methods to the given value.
1427 *
1428 * @param value the result value
1429 * @return {@code true} if this invocation caused this CompletableFuture
1430 * to transition to a completed state, else {@code false}
1431 */
1432 public boolean complete(T value) {
1433 boolean triggered = result == null &&
1434 UNSAFE.compareAndSwapObject(this, RESULT, null,
1435 value == null ? NIL : value);
1436 postComplete();
1437 return triggered;
1438 }
1439
1440 /**
1441 * If not already completed, causes invocations of {@link #get()}
1442 * and related methods to throw the given exception.
1443 *
1444 * @param ex the exception
1445 * @return {@code true} if this invocation caused this CompletableFuture
1446 * to transition to a completed state, else {@code false}
1447 */
1448 public boolean completeExceptionally(Throwable ex) {
1449 if (ex == null) throw new NullPointerException();
1450 boolean triggered = result == null &&
1451 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1452 postComplete();
1453 return triggered;
1454 }
1455
1456 /**
1457 * Creates and returns a CompletableFuture that is completed with
1458 * the result of the given function of this CompletableFuture.
1459 * If this CompletableFuture completes exceptionally,
1460 * then the returned CompletableFuture also does so,
1461 * with a CompletionException holding this exception as
1462 * its cause.
1463 *
1464 * @param fn the function to use to compute the value of
1465 * the returned CompletableFuture
1466 * @return the new CompletableFuture
1467 */
1468 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1469 return doThenApply(fn, null);
1470 }
1471
1472 /**
1473 * Creates and returns a CompletableFuture that is asynchronously
1474 * completed using the {@link ForkJoinPool#commonPool()} with the
1475 * result of the given function of this CompletableFuture. If
1476 * this CompletableFuture completes exceptionally, then the
1477 * returned CompletableFuture also does so, with a
1478 * CompletionException holding this exception as its cause.
1479 *
1480 * @param fn the function to use to compute the value of
1481 * the returned CompletableFuture
1482 * @return the new CompletableFuture
1483 */
1484 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
1485 return doThenApply(fn, ForkJoinPool.commonPool());
1486 }
1487
1488 /**
1489 * Creates and returns a CompletableFuture that is asynchronously
1490 * completed using the given executor with the result of the given
1491 * function of this CompletableFuture. If this CompletableFuture
1492 * completes exceptionally, then the returned CompletableFuture
1493 * also does so, with a CompletionException holding this exception as
1494 * its cause.
1495 *
1496 * @param fn the function to use to compute the value of
1497 * the returned CompletableFuture
1498 * @param executor the executor to use for asynchronous execution
1499 * @return the new CompletableFuture
1500 */
1501 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
1502 Executor executor) {
1503 if (executor == null) throw new NullPointerException();
1504 return doThenApply(fn, executor);
1505 }
1506
1507 private <U> CompletableFuture<U> doThenApply(Function<? super T,? extends U> fn,
1508 Executor e) {
1509 if (fn == null) throw new NullPointerException();
1510 CompletableFuture<U> dst = new CompletableFuture<U>();
1511 ApplyCompletion<T,U> d = null;
1512 Object r;
1513 if ((r = result) == null) {
1514 CompletionNode p = new CompletionNode
1515 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1516 while ((r = result) == null) {
1517 if (UNSAFE.compareAndSwapObject
1518 (this, COMPLETIONS, p.next = completions, p))
1519 break;
1520 }
1521 }
1522 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1523 T t; Throwable ex;
1524 if (r instanceof AltResult) {
1525 ex = ((AltResult)r).ex;
1526 t = null;
1527 }
1528 else {
1529 ex = null;
1530 @SuppressWarnings("unchecked") T tr = (T) r;
1531 t = tr;
1532 }
1533 U u = null;
1534 if (ex == null) {
1535 try {
1536 if (e != null)
1537 e.execute(new AsyncApply<T,U>(t, fn, dst));
1538 else
1539 u = fn.apply(t);
1540 } catch (Throwable rex) {
1541 ex = rex;
1542 }
1543 }
1544 if (e == null || ex != null)
1545 dst.internalComplete(u, ex);
1546 }
1547 helpPostComplete();
1548 return dst;
1549 }
1550
1551 /**
1552 * Creates and returns a CompletableFuture that is completed after
1553 * performing the given action with this CompletableFuture's
1554 * result when it completes. If this CompletableFuture
1555 * completes exceptionally, then the returned CompletableFuture
1556 * also does so, with a CompletionException holding this exception as
1557 * its cause.
1558 *
1559 * @param block the action to perform before completing the
1560 * returned CompletableFuture
1561 * @return the new CompletableFuture
1562 */
1563 public CompletableFuture<Void> thenAccept(Block<? super T> block) {
1564 return doThenAccept(block, null);
1565 }
1566
1567 /**
1568 * Creates and returns a CompletableFuture that is asynchronously
1569 * completed using the {@link ForkJoinPool#commonPool()} with this
1570 * CompletableFuture's result when it completes. If this
1571 * CompletableFuture completes exceptionally, then the returned
1572 * CompletableFuture also does so, with a CompletionException holding
1573 * this exception as its cause.
1574 *
1575 * @param block the action to perform before completing the
1576 * returned CompletableFuture
1577 * @return the new CompletableFuture
1578 */
1579 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
1580 return doThenAccept(block, ForkJoinPool.commonPool());
1581 }
1582
1583 /**
1584 * Creates and returns a CompletableFuture that is asynchronously
1585 * completed using the given executor with this
1586 * CompletableFuture's result when it completes. If this
1587 * CompletableFuture completes exceptionally, then the returned
1588 * CompletableFuture also does so, with a CompletionException holding
1589 * this exception as its cause.
1590 *
1591 * @param block the action to perform before completing the
1592 * returned CompletableFuture
1593 * @param executor the executor to use for asynchronous execution
1594 * @return the new CompletableFuture
1595 */
1596 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
1597 Executor executor) {
1598 if (executor == null) throw new NullPointerException();
1599 return doThenAccept(block, executor);
1600 }
1601
1602 private CompletableFuture<Void> doThenAccept(Block<? super T> fn,
1603 Executor e) {
1604 if (fn == null) throw new NullPointerException();
1605 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1606 AcceptCompletion<T> d = null;
1607 Object r;
1608 if ((r = result) == null) {
1609 CompletionNode p = new CompletionNode
1610 (d = new AcceptCompletion<T>(this, fn, dst, e));
1611 while ((r = result) == null) {
1612 if (UNSAFE.compareAndSwapObject
1613 (this, COMPLETIONS, p.next = completions, p))
1614 break;
1615 }
1616 }
1617 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1618 T t; Throwable ex;
1619 if (r instanceof AltResult) {
1620 ex = ((AltResult)r).ex;
1621 t = null;
1622 }
1623 else {
1624 ex = null;
1625 @SuppressWarnings("unchecked") T tr = (T) r;
1626 t = tr;
1627 }
1628 if (ex == null) {
1629 try {
1630 if (e != null)
1631 e.execute(new AsyncAccept<T>(t, fn, dst));
1632 else
1633 fn.accept(t);
1634 } catch (Throwable rex) {
1635 ex = rex;
1636 }
1637 }
1638 if (e == null || ex != null)
1639 dst.internalComplete(null, ex);
1640 }
1641 helpPostComplete();
1642 return dst;
1643 }
1644
1645 /**
1646 * Creates and returns a CompletableFuture that is completed after
1647 * performing the given action when this CompletableFuture
1648 * completes. If this CompletableFuture completes exceptionally,
1649 * then the returned CompletableFuture also does so, with a
1650 * CompletionException holding this exception as its cause.
1651 *
1652 * @param action the action to perform before completing the
1653 * returned CompletableFuture
1654 * @return the new CompletableFuture
1655 */
1656 public CompletableFuture<Void> thenRun(Runnable action) {
1657 return doThenRun(action, null);
1658 }
1659
1660 /**
1661 * Creates and returns a CompletableFuture that is asynchronously
1662 * completed using the {@link ForkJoinPool#commonPool()} after
1663 * performing the given action when this CompletableFuture
1664 * completes. If this CompletableFuture completes exceptionally,
1665 * then the returned CompletableFuture also does so, with a
1666 * CompletionException holding this exception as its cause.
1667 *
1668 * @param action the action to perform before completing the
1669 * returned CompletableFuture
1670 * @return the new CompletableFuture
1671 */
1672 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1673 return doThenRun(action, ForkJoinPool.commonPool());
1674 }
1675
1676 /**
1677 * Creates and returns a CompletableFuture that is asynchronously
1678 * completed using the given executor after performing the given
1679 * action when this CompletableFuture completes. If this
1680 * CompletableFuture completes exceptionally, then the returned
1681 * CompletableFuture also does so, with a CompletionException holding
1682 * this exception as its cause.
1683 *
1684 * @param action the action to perform before completing the
1685 * returned CompletableFuture
1686 * @param executor the executor to use for asynchronous execution
1687 * @return the new CompletableFuture
1688 */
1689 public CompletableFuture<Void> thenRunAsync(Runnable action,
1690 Executor executor) {
1691 if (executor == null) throw new NullPointerException();
1692 return doThenRun(action, executor);
1693 }
1694
1695 private CompletableFuture<Void> doThenRun(Runnable action,
1696 Executor e) {
1697 if (action == null) throw new NullPointerException();
1698 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1699 RunCompletion<T> d = null;
1700 Object r;
1701 if ((r = result) == null) {
1702 CompletionNode p = new CompletionNode
1703 (d = new RunCompletion<T>(this, action, dst, e));
1704 while ((r = result) == null) {
1705 if (UNSAFE.compareAndSwapObject
1706 (this, COMPLETIONS, p.next = completions, p))
1707 break;
1708 }
1709 }
1710 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1711 Throwable ex;
1712 if (r instanceof AltResult)
1713 ex = ((AltResult)r).ex;
1714 else
1715 ex = null;
1716 if (ex == null) {
1717 try {
1718 if (e != null)
1719 e.execute(new AsyncRun(action, dst));
1720 else
1721 action.run();
1722 } catch (Throwable rex) {
1723 ex = rex;
1724 }
1725 }
1726 if (e == null || ex != null)
1727 dst.internalComplete(null, ex);
1728 }
1729 helpPostComplete();
1730 return dst;
1731 }
1732
1733 /**
1734 * Creates and returns a CompletableFuture that is completed with
1735 * the result of the given function of this and the other given
1736 * CompletableFuture's results when both complete. If this or
1737 * the other CompletableFuture complete exceptionally, then the
1738 * returned CompletableFuture also does so, with a
1739 * CompletionException holding the exception as its cause.
1740 *
1741 * @param other the other CompletableFuture
1742 * @param fn the function to use to compute the value of
1743 * the returned CompletableFuture
1744 * @return the new CompletableFuture
1745 */
1746 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1747 BiFunction<? super T,? super U,? extends V> fn) {
1748 return doThenBiApply(other, fn, null);
1749 }
1750
1751 /**
1752 * Creates and returns a CompletableFuture that is asynchronously
1753 * completed using the {@link ForkJoinPool#commonPool()} with
1754 * the result of the given function of this and the other given
1755 * CompletableFuture's results when both complete. If this or
1756 * the other CompletableFuture complete exceptionally, then the
1757 * returned CompletableFuture also does so, with a
1758 * CompletionException holding the exception as its cause.
1759 *
1760 * @param other the other CompletableFuture
1761 * @param fn the function to use to compute the value of
1762 * the returned CompletableFuture
1763 * @return the new CompletableFuture
1764 */
1765 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1766 BiFunction<? super T,? super U,? extends V> fn) {
1767 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1768 }
1769
1770 /**
1771 * Creates and returns a CompletableFuture that is
1772 * asynchronously completed using the given executor with the
1773 * result of the given function of this and the other given
1774 * CompletableFuture's results when both complete. If this or
1775 * the other CompletableFuture complete exceptionally, then the
1776 * returned CompletableFuture also does so, with a
1777 * CompletionException holding the exception as its cause.
1778 *
1779 * @param other the other CompletableFuture
1780 * @param fn the function to use to compute the value of
1781 * the returned CompletableFuture
1782 * @param executor the executor to use for asynchronous execution
1783 * @return the new CompletableFuture
1784 */
1785
1786 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1787 BiFunction<? super T,? super U,? extends V> fn,
1788 Executor executor) {
1789 if (executor == null) throw new NullPointerException();
1790 return doThenBiApply(other, fn, executor);
1791 }
1792
1793 private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1794 BiFunction<? super T,? super U,? extends V> fn,
1795 Executor e) {
1796 if (other == null || fn == null) throw new NullPointerException();
1797 CompletableFuture<V> dst = new CompletableFuture<V>();
1798 BiApplyCompletion<T,U,V> d = null;
1799 Object r, s = null;
1800 if ((r = result) == null || (s = other.result) == null) {
1801 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1802 CompletionNode q = null, p = new CompletionNode(d);
1803 while ((r == null && (r = result) == null) ||
1804 (s == null && (s = other.result) == null)) {
1805 if (q != null) {
1806 if (s != null ||
1807 UNSAFE.compareAndSwapObject
1808 (other, COMPLETIONS, q.next = other.completions, q))
1809 break;
1810 }
1811 else if (r != null ||
1812 UNSAFE.compareAndSwapObject
1813 (this, COMPLETIONS, p.next = completions, p)) {
1814 if (s != null)
1815 break;
1816 q = new CompletionNode(d);
1817 }
1818 }
1819 }
1820 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1821 T t; U u; Throwable ex;
1822 if (r instanceof AltResult) {
1823 ex = ((AltResult)r).ex;
1824 t = null;
1825 }
1826 else {
1827 ex = null;
1828 @SuppressWarnings("unchecked") T tr = (T) r;
1829 t = tr;
1830 }
1831 if (ex != null)
1832 u = null;
1833 else if (s instanceof AltResult) {
1834 ex = ((AltResult)s).ex;
1835 u = null;
1836 }
1837 else {
1838 @SuppressWarnings("unchecked") U us = (U) s;
1839 u = us;
1840 }
1841 V v = null;
1842 if (ex == null) {
1843 try {
1844 if (e != null)
1845 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1846 else
1847 v = fn.apply(t, u);
1848 } catch (Throwable rex) {
1849 ex = rex;
1850 }
1851 }
1852 if (e == null || ex != null)
1853 dst.internalComplete(v, ex);
1854 }
1855 helpPostComplete();
1856 other.helpPostComplete();
1857 return dst;
1858 }
1859
1860 /**
1861 * Creates and returns a CompletableFuture that is completed with
1862 * the results of this and the other given CompletableFuture if
1863 * both complete. If this and/or the other CompletableFuture
1864 * complete exceptionally, then the returned CompletableFuture
1865 * also does so, with a CompletionException holding one of these
1866 * exceptions as its cause.
1867 *
1868 * @param other the other CompletableFuture
1869 * @param block the action to perform before completing the
1870 * returned CompletableFuture
1871 * @return the new CompletableFuture
1872 */
1873 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1874 BiBlock<? super T, ? super U> block) {
1875 return doThenBiAccept(other, block, null);
1876 }
1877
1878 /**
1879 * Creates and returns a CompletableFuture that is completed
1880 * asynchronously using the {@link ForkJoinPool#commonPool()} with
1881 * the results of this and the other given CompletableFuture when
1882 * both complete. If this and/or the other CompletableFuture
1883 * complete exceptionally, then the returned CompletableFuture
1884 * also does so, with a CompletionException holding one of these
1885 * exceptions as its cause.
1886 *
1887 * @param other the other CompletableFuture
1888 * @param block the action to perform before completing the
1889 * returned CompletableFuture
1890 * @return the new CompletableFuture
1891 */
1892 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1893 BiBlock<? super T, ? super U> block) {
1894 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1895 }
1896
1897 /**
1898 * Creates and returns a CompletableFuture that is completed
1899 * asynchronously using the given executor with the results of
1900 * this and the other given CompletableFuture when both complete.
1901 * If this and/or the other CompletableFuture complete
1902 * exceptionally, then the returned CompletableFuture also does
1903 * so, with a CompletionException holding one of these exceptions as
1904 * its cause.
1905 *
1906 * @param other the other CompletableFuture
1907 * @param block the action to perform before completing the
1908 * returned CompletableFuture
1909 * @param executor the executor to use for asynchronous execution
1910 * @return the new CompletableFuture
1911 */
1912 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1913 BiBlock<? super T, ? super U> block,
1914 Executor executor) {
1915 if (executor == null) throw new NullPointerException();
1916 return doThenBiAccept(other, block, executor);
1917 }
1918
1919 private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
1920 BiBlock<? super T,? super U> fn,
1921 Executor e) {
1922 if (other == null || fn == null) throw new NullPointerException();
1923 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1924 BiAcceptCompletion<T,U> d = null;
1925 Object r, s = null;
1926 if ((r = result) == null || (s = other.result) == null) {
1927 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1928 CompletionNode q = null, p = new CompletionNode(d);
1929 while ((r == null && (r = result) == null) ||
1930 (s == null && (s = other.result) == null)) {
1931 if (q != null) {
1932 if (s != null ||
1933 UNSAFE.compareAndSwapObject
1934 (other, COMPLETIONS, q.next = other.completions, q))
1935 break;
1936 }
1937 else if (r != null ||
1938 UNSAFE.compareAndSwapObject
1939 (this, COMPLETIONS, p.next = completions, p)) {
1940 if (s != null)
1941 break;
1942 q = new CompletionNode(d);
1943 }
1944 }
1945 }
1946 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1947 T t; U u; Throwable ex;
1948 if (r instanceof AltResult) {
1949 ex = ((AltResult)r).ex;
1950 t = null;
1951 }
1952 else {
1953 ex = null;
1954 @SuppressWarnings("unchecked") T tr = (T) r;
1955 t = tr;
1956 }
1957 if (ex != null)
1958 u = null;
1959 else if (s instanceof AltResult) {
1960 ex = ((AltResult)s).ex;
1961 u = null;
1962 }
1963 else {
1964 @SuppressWarnings("unchecked") U us = (U) s;
1965 u = us;
1966 }
1967 if (ex == null) {
1968 try {
1969 if (e != null)
1970 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
1971 else
1972 fn.accept(t, u);
1973 } catch (Throwable rex) {
1974 ex = rex;
1975 }
1976 }
1977 if (e == null || ex != null)
1978 dst.internalComplete(null, ex);
1979 }
1980 helpPostComplete();
1981 other.helpPostComplete();
1982 return dst;
1983 }
1984
1985 /**
1986 * Creates and returns a CompletableFuture that is completed
1987 * when this and the other given CompletableFuture both
1988 * complete. If this and/or the other CompletableFuture complete
1989 * exceptionally, then the returned CompletableFuture also does
1990 * so, with a CompletionException holding one of these exceptions as
1991 * its cause.
1992 *
1993 * @param other the other CompletableFuture
1994 * @param action the action to perform before completing the
1995 * returned CompletableFuture
1996 * @return the new CompletableFuture
1997 */
1998 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
1999 Runnable action) {
2000 return doThenBiRun(other, action, null);
2001 }
2002
2003 /**
2004 * Creates and returns a CompletableFuture that is completed
2005 * asynchronously using the {@link ForkJoinPool#commonPool()}
2006 * when this and the other given CompletableFuture both
2007 * complete. If this and/or the other CompletableFuture complete
2008 * exceptionally, then the returned CompletableFuture also does
2009 * so, with a CompletionException holding one of these exceptions as
2010 * its cause.
2011 *
2012 * @param other the other CompletableFuture
2013 * @param action the action to perform before completing the
2014 * returned CompletableFuture
2015 * @return the new CompletableFuture
2016 */
2017 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2018 Runnable action) {
2019 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2020 }
2021
2022 /**
2023 * Creates and returns a CompletableFuture that is completed
2024 * asynchronously using the given executor
2025 * when this and the other given CompletableFuture both
2026 * complete. If this and/or the other CompletableFuture complete
2027 * exceptionally, then the returned CompletableFuture also does
2028 * so, with a CompletionException holding one of these exceptions as
2029 * its cause.
2030 *
2031 * @param other the other CompletableFuture
2032 * @param action the action to perform before completing the
2033 * returned CompletableFuture
2034 * @param executor the executor to use for asynchronous execution
2035 * @return the new CompletableFuture
2036 */
2037 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2038 Runnable action,
2039 Executor executor) {
2040 if (executor == null) throw new NullPointerException();
2041 return doThenBiRun(other, action, executor);
2042 }
2043
2044 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2045 Runnable action,
2046 Executor e) {
2047 if (other == null || action == null) throw new NullPointerException();
2048 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2049 BiRunCompletion<T> d = null;
2050 Object r, s = null;
2051 if ((r = result) == null || (s = other.result) == null) {
2052 d = new BiRunCompletion<T>(this, other, action, dst, e);
2053 CompletionNode q = null, p = new CompletionNode(d);
2054 while ((r == null && (r = result) == null) ||
2055 (s == null && (s = other.result) == null)) {
2056 if (q != null) {
2057 if (s != null ||
2058 UNSAFE.compareAndSwapObject
2059 (other, COMPLETIONS, q.next = other.completions, q))
2060 break;
2061 }
2062 else if (r != null ||
2063 UNSAFE.compareAndSwapObject
2064 (this, COMPLETIONS, p.next = completions, p)) {
2065 if (s != null)
2066 break;
2067 q = new CompletionNode(d);
2068 }
2069 }
2070 }
2071 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2072 Throwable ex;
2073 if (r instanceof AltResult)
2074 ex = ((AltResult)r).ex;
2075 else
2076 ex = null;
2077 if (ex == null && (s instanceof AltResult))
2078 ex = ((AltResult)s).ex;
2079 if (ex == null) {
2080 try {
2081 if (e != null)
2082 e.execute(new AsyncRun(action, dst));
2083 else
2084 action.run();
2085 } catch (Throwable rex) {
2086 ex = rex;
2087 }
2088 }
2089 if (e == null || ex != null)
2090 dst.internalComplete(null, ex);
2091 }
2092 helpPostComplete();
2093 other.helpPostComplete();
2094 return dst;
2095 }
2096
2097 /**
2098 * Creates and returns a CompletableFuture that is completed with
2099 * the result of the given function of either this or the other
2100 * given CompletableFuture's results when either complete. If
2101 * this and/or the other CompletableFuture complete exceptionally,
2102 * then the returned CompletableFuture may also do so, with a
2103 * CompletionException holding one of these exceptions as its cause.
2104 * No guarantees are made about which result or exception is used
2105 * in the returned CompletableFuture.
2106 *
2107 * @param other the other CompletableFuture
2108 * @param fn the function to use to compute the value of
2109 * the returned CompletableFuture
2110 * @return the new CompletableFuture
2111 */
2112 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2113 Function<? super T, U> fn) {
2114 return doOrApply(other, fn, null);
2115 }
2116
2117 /**
2118 * Creates and returns a CompletableFuture that is completed
2119 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2120 * the result of the given function of either this or the other
2121 * given CompletableFuture's results when either complete. If
2122 * this and/or the other CompletableFuture complete exceptionally,
2123 * then the returned CompletableFuture may also do so, with a
2124 * CompletionException holding one of these exceptions as its cause.
2125 * No guarantees are made about which result or exception is used
2126 * in the returned CompletableFuture.
2127 *
2128 * @param other the other CompletableFuture
2129 * @param fn the function to use to compute the value of
2130 * the returned CompletableFuture
2131 * @return the new CompletableFuture
2132 */
2133 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2134 Function<? super T, U> fn) {
2135 return doOrApply(other, fn, ForkJoinPool.commonPool());
2136 }
2137
2138 /**
2139 * Creates and returns a CompletableFuture that is completed
2140 * asynchronously using the given executor with the result of the
2141 * given function of either this or the other given
2142 * CompletableFuture's results when either complete. If this
2143 * and/or the other CompletableFuture complete exceptionally, then
2144 * the returned CompletableFuture may also do so, with a
2145 * CompletionException holding one of these exceptions as its cause.
2146 * No guarantees are made about which result or exception is used
2147 * in the returned CompletableFuture.
2148 *
2149 * @param other the other CompletableFuture
2150 * @param fn the function to use to compute the value of
2151 * the returned CompletableFuture
2152 * @param executor the executor to use for asynchronous execution
2153 * @return the new CompletableFuture
2154 */
2155 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2156 Function<? super T, U> fn,
2157 Executor executor) {
2158 if (executor == null) throw new NullPointerException();
2159 return doOrApply(other, fn, executor);
2160 }
2161
2162 private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2163 Function<? super T, U> fn,
2164 Executor e) {
2165 if (other == null || fn == null) throw new NullPointerException();
2166 CompletableFuture<U> dst = new CompletableFuture<U>();
2167 OrApplyCompletion<T,U> d = null;
2168 Object r;
2169 if ((r = result) == null && (r = other.result) == null) {
2170 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2171 CompletionNode q = null, p = new CompletionNode(d);
2172 while ((r = result) == null && (r = other.result) == null) {
2173 if (q != null) {
2174 if (UNSAFE.compareAndSwapObject
2175 (other, COMPLETIONS, q.next = other.completions, q))
2176 break;
2177 }
2178 else if (UNSAFE.compareAndSwapObject
2179 (this, COMPLETIONS, p.next = completions, p))
2180 q = new CompletionNode(d);
2181 }
2182 }
2183 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2184 T t; Throwable ex;
2185 if (r instanceof AltResult) {
2186 ex = ((AltResult)r).ex;
2187 t = null;
2188 }
2189 else {
2190 ex = null;
2191 @SuppressWarnings("unchecked") T tr = (T) r;
2192 t = tr;
2193 }
2194 U u = null;
2195 if (ex == null) {
2196 try {
2197 if (e != null)
2198 e.execute(new AsyncApply<T,U>(t, fn, dst));
2199 else
2200 u = fn.apply(t);
2201 } catch (Throwable rex) {
2202 ex = rex;
2203 }
2204 }
2205 if (e == null || ex != null)
2206 dst.internalComplete(u, ex);
2207 }
2208 helpPostComplete();
2209 other.helpPostComplete();
2210 return dst;
2211 }
2212
2213 /**
2214 * Creates and returns a CompletableFuture that is completed after
2215 * performing the given action with the result of either this or the
2216 * other given CompletableFuture's result, when either complete.
2217 * If this and/or the other CompletableFuture complete
2218 * exceptionally, then the returned CompletableFuture may also do
2219 * so, with a CompletionException holding one of these exceptions as
2220 * its cause. No guarantees are made about which exception is
2221 * used in the returned CompletableFuture.
2222 *
2223 * @param other the other CompletableFuture
2224 * @param block the action to perform before completing the
2225 * returned CompletableFuture
2226 * @return the new CompletableFuture
2227 */
2228 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2229 Block<? super T> block) {
2230 return doOrAccept(other, block, null);
2231 }
2232
2233 /**
2234 * Creates and returns a CompletableFuture that is completed
2235 * asynchronously using the {@link ForkJoinPool#commonPool()},
2236 * performing the given action with the result of either this or
2237 * the other given CompletableFuture's result, when either
2238 * complete. If this and/or the other CompletableFuture complete
2239 * exceptionally, then the returned CompletableFuture may also do
2240 * so, with a CompletionException holding one of these exceptions as
2241 * its cause. No guarantees are made about which exception is
2242 * used in the returned CompletableFuture.
2243 *
2244 * @param other the other CompletableFuture
2245 * @param block the action to perform before completing the
2246 * returned CompletableFuture
2247 * @return the new CompletableFuture
2248 */
2249 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2250 Block<? super T> block) {
2251 return doOrAccept(other, block, ForkJoinPool.commonPool());
2252 }
2253
2254 /**
2255 * Creates and returns a CompletableFuture that is completed
2256 * asynchronously using the given executor,
2257 * performing the given action with the result of either this or
2258 * the other given CompletableFuture's result, when either
2259 * complete. If this and/or the other CompletableFuture complete
2260 * exceptionally, then the returned CompletableFuture may also do
2261 * so, with a CompletionException holding one of these exceptions as
2262 * its cause. No guarantees are made about which exception is
2263 * used in the returned CompletableFuture.
2264 *
2265 * @param other the other CompletableFuture
2266 * @param block the action to perform before completing the
2267 * returned CompletableFuture
2268 * @param executor the executor to use for asynchronous execution
2269 * @return the new CompletableFuture
2270 */
2271 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2272 Block<? super T> block,
2273 Executor executor) {
2274 if (executor == null) throw new NullPointerException();
2275 return doOrAccept(other, block, executor);
2276 }
2277
2278 private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2279 Block<? super T> fn,
2280 Executor e) {
2281 if (other == null || fn == null) throw new NullPointerException();
2282 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2283 OrAcceptCompletion<T> d = null;
2284 Object r;
2285 if ((r = result) == null && (r = other.result) == null) {
2286 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2287 CompletionNode q = null, p = new CompletionNode(d);
2288 while ((r = result) == null && (r = other.result) == null) {
2289 if (q != null) {
2290 if (UNSAFE.compareAndSwapObject
2291 (other, COMPLETIONS, q.next = other.completions, q))
2292 break;
2293 }
2294 else if (UNSAFE.compareAndSwapObject
2295 (this, COMPLETIONS, p.next = completions, p))
2296 q = new CompletionNode(d);
2297 }
2298 }
2299 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2300 T t; Throwable ex;
2301 if (r instanceof AltResult) {
2302 ex = ((AltResult)r).ex;
2303 t = null;
2304 }
2305 else {
2306 ex = null;
2307 @SuppressWarnings("unchecked") T tr = (T) r;
2308 t = tr;
2309 }
2310 if (ex == null) {
2311 try {
2312 if (e != null)
2313 e.execute(new AsyncAccept<T>(t, fn, dst));
2314 else
2315 fn.accept(t);
2316 } catch (Throwable rex) {
2317 ex = rex;
2318 }
2319 }
2320 if (e == null || ex != null)
2321 dst.internalComplete(null, ex);
2322 }
2323 helpPostComplete();
2324 other.helpPostComplete();
2325 return dst;
2326 }
2327
2328 /**
2329 * Creates and returns a CompletableFuture that is completed
2330 * after this or the other given CompletableFuture complete. If
2331 * this and/or the other CompletableFuture complete exceptionally,
2332 * then the returned CompletableFuture may also do so, with a
2333 * CompletionException holding one of these exceptions as its cause.
2334 * No guarantees are made about which exception is used in the
2335 * returned CompletableFuture.
2336 *
2337 * @param other the other CompletableFuture
2338 * @param action the action to perform before completing the
2339 * returned CompletableFuture
2340 * @return the new CompletableFuture
2341 */
2342 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2343 Runnable action) {
2344 return doOrRun(other, action, null);
2345 }
2346
2347 /**
2348 * Creates and returns a CompletableFuture that is completed
2349 * asynchronously using the {@link ForkJoinPool#commonPool()}
2350 * after this or the other given CompletableFuture complete. If
2351 * this and/or the other CompletableFuture complete exceptionally,
2352 * then the returned CompletableFuture may also do so, with a
2353 * CompletionException holding one of these exceptions as its cause.
2354 * No guarantees are made about which exception is used in the
2355 * returned CompletableFuture.
2356 *
2357 * @param other the other CompletableFuture
2358 * @param action the action to perform before completing the
2359 * returned CompletableFuture
2360 * @return the new CompletableFuture
2361 */
2362 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2363 Runnable action) {
2364 return doOrRun(other, action, ForkJoinPool.commonPool());
2365 }
2366
2367 /**
2368 * Creates and returns a CompletableFuture that is completed
2369 * asynchronously using the given executor after this or the other
2370 * given CompletableFuture complete. If this and/or the other
2371 * CompletableFuture complete exceptionally, then the returned
2372 * CompletableFuture may also do so, with a CompletionException
2373 * holding one of these exceptions as its cause. No guarantees are
2374 * made about which exception is used in the returned
2375 * CompletableFuture.
2376 *
2377 * @param other the other CompletableFuture
2378 * @param action the action to perform before completing the
2379 * returned CompletableFuture
2380 * @param executor the executor to use for asynchronous execution
2381 * @return the new CompletableFuture
2382 */
2383 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2384 Runnable action,
2385 Executor executor) {
2386 if (executor == null) throw new NullPointerException();
2387 return doOrRun(other, action, executor);
2388 }
2389
2390 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2391 Runnable action,
2392 Executor e) {
2393 if (other == null || action == null) throw new NullPointerException();
2394 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2395 OrRunCompletion<T> d = null;
2396 Object r;
2397 if ((r = result) == null && (r = other.result) == null) {
2398 d = new OrRunCompletion<T>(this, other, action, dst, e);
2399 CompletionNode q = null, p = new CompletionNode(d);
2400 while ((r = result) == null && (r = other.result) == null) {
2401 if (q != null) {
2402 if (UNSAFE.compareAndSwapObject
2403 (other, COMPLETIONS, q.next = other.completions, q))
2404 break;
2405 }
2406 else if (UNSAFE.compareAndSwapObject
2407 (this, COMPLETIONS, p.next = completions, p))
2408 q = new CompletionNode(d);
2409 }
2410 }
2411 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2412 Throwable ex;
2413 if (r instanceof AltResult)
2414 ex = ((AltResult)r).ex;
2415 else
2416 ex = null;
2417 if (ex == null) {
2418 try {
2419 if (e != null)
2420 e.execute(new AsyncRun(action, dst));
2421 else
2422 action.run();
2423 } catch (Throwable rex) {
2424 ex = rex;
2425 }
2426 }
2427 if (e == null || ex != null)
2428 dst.internalComplete(null, ex);
2429 }
2430 helpPostComplete();
2431 other.helpPostComplete();
2432 return dst;
2433 }
2434
2435 /**
2436 * Returns a CompletableFuture (or an equivalent one) produced by
2437 * the given function of the result of this CompletableFuture when
2438 * completed. If this CompletableFuture completes exceptionally,
2439 * then the returned CompletableFuture also does so, with a
2440 * CompletionException holding this exception as its cause.
2441 *
2442 * @param fn the function returning a new CompletableFuture.
2443 * @return the CompletableFuture, that {@code isDone()} upon
2444 * return if completed by the given function, or an exception
2445 * occurs.
2446 */
2447 public <U> CompletableFuture<U> thenCompose(Function<? super T,
2448 CompletableFuture<U>> fn) {
2449 if (fn == null) throw new NullPointerException();
2450 CompletableFuture<U> dst = null;
2451 ComposeCompletion<T,U> d = null;
2452 Object r;
2453 if ((r = result) == null) {
2454 dst = new CompletableFuture<U>();
2455 CompletionNode p = new CompletionNode
2456 (d = new ComposeCompletion<T,U>(this, fn, dst));
2457 while ((r = result) == null) {
2458 if (UNSAFE.compareAndSwapObject
2459 (this, COMPLETIONS, p.next = completions, p))
2460 break;
2461 }
2462 }
2463 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2464 T t; Throwable ex;
2465 if (r instanceof AltResult) {
2466 ex = ((AltResult)r).ex;
2467 t = null;
2468 }
2469 else {
2470 ex = null;
2471 @SuppressWarnings("unchecked") T tr = (T) r;
2472 t = tr;
2473 }
2474 if (ex == null) {
2475 try {
2476 dst = fn.apply(t);
2477 } catch (Throwable rex) {
2478 ex = rex;
2479 }
2480 }
2481 if (dst == null) {
2482 dst = new CompletableFuture<U>();
2483 if (ex == null)
2484 ex = new NullPointerException();
2485 }
2486 if (ex != null)
2487 dst.internalComplete(null, ex);
2488 }
2489 helpPostComplete();
2490 dst.helpPostComplete();
2491 return dst;
2492 }
2493
2494 /**
2495 * Creates and returns a CompletableFuture that is completed with
2496 * the result of the given function of the exception triggering
2497 * this CompletableFuture's completion when it completes
2498 * exceptionally; Otherwise, if this CompletableFuture completes
2499 * normally, then the returned CompletableFuture also completes
2500 * normally with the same value.
2501 *
2502 * @param fn the function to use to compute the value of the
2503 * returned CompletableFuture if this CompletableFuture completed
2504 * exceptionally
2505 * @return the new CompletableFuture
2506 */
2507 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
2508 if (fn == null) throw new NullPointerException();
2509 CompletableFuture<T> dst = new CompletableFuture<T>();
2510 ExceptionCompletion<T> d = null;
2511 Object r;
2512 if ((r = result) == null) {
2513 CompletionNode p =
2514 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2515 while ((r = result) == null) {
2516 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2517 p.next = completions, p))
2518 break;
2519 }
2520 }
2521 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2522 T t = null; Throwable ex, dx = null;
2523 if (r instanceof AltResult) {
2524 if ((ex = ((AltResult)r).ex) != null) {
2525 try {
2526 t = fn.apply(ex);
2527 } catch (Throwable rex) {
2528 dx = rex;
2529 }
2530 }
2531 }
2532 else {
2533 @SuppressWarnings("unchecked") T tr = (T) r;
2534 t = tr;
2535 }
2536 dst.internalComplete(t, dx);
2537 }
2538 helpPostComplete();
2539 return dst;
2540 }
2541
2542 /**
2543 * Creates and returns a CompletableFuture that is completed with
2544 * the result of the given function of the result and exception of
2545 * this CompletableFuture's completion when it completes. The
2546 * given function is invoked with the result (or {@code null} if
2547 * none) and the exception (or {@code null} if none) of this
2548 * CompletableFuture when complete.
2549 *
2550 * @param fn the function to use to compute the value of the
2551 * returned CompletableFuture
2552
2553 * @return the new CompletableFuture
2554 */
2555 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
2556 if (fn == null) throw new NullPointerException();
2557 CompletableFuture<U> dst = new CompletableFuture<U>();
2558 HandleCompletion<T,U> d = null;
2559 Object r;
2560 if ((r = result) == null) {
2561 CompletionNode p =
2562 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2563 while ((r = result) == null) {
2564 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2565 p.next = completions, p))
2566 break;
2567 }
2568 }
2569 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2570 T t; Throwable ex;
2571 if (r instanceof AltResult) {
2572 ex = ((AltResult)r).ex;
2573 t = null;
2574 }
2575 else {
2576 ex = null;
2577 @SuppressWarnings("unchecked") T tr = (T) r;
2578 t = tr;
2579 }
2580 U u; Throwable dx;
2581 try {
2582 u = fn.apply(t, ex);
2583 dx = null;
2584 } catch (Throwable rex) {
2585 dx = rex;
2586 u = null;
2587 }
2588 dst.internalComplete(u, dx);
2589 }
2590 helpPostComplete();
2591 return dst;
2592 }
2593
2594 /**
2595 * Attempts to complete this CompletableFuture with
2596 * a {@link CancellationException}.
2597 *
2598 * @param mayInterruptIfRunning this value has no effect in this
2599 * implementation because interrupts are not used to control
2600 * processing.
2601 *
2602 * @return {@code true} if this task is now cancelled
2603 */
2604 public boolean cancel(boolean mayInterruptIfRunning) {
2605 Object r;
2606 while ((r = result) == null) {
2607 r = new AltResult(new CancellationException());
2608 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
2609 postComplete();
2610 return true;
2611 }
2612 }
2613 return ((r instanceof AltResult) &&
2614 (((AltResult)r).ex instanceof CancellationException));
2615 }
2616
2617 /**
2618 * Returns {@code true} if this CompletableFuture was cancelled
2619 * before it completed normally.
2620 *
2621 * @return {@code true} if this CompletableFuture was cancelled
2622 * before it completed normally
2623 */
2624 public boolean isCancelled() {
2625 Object r;
2626 return ((r = result) != null &&
2627 (r instanceof AltResult) &&
2628 (((AltResult)r).ex instanceof CancellationException));
2629 }
2630
2631 /**
2632 * Forcibly sets or resets the value subsequently returned by
2633 * method get() and related methods, whether or not already
2634 * completed. This method is designed for use only in error
2635 * recovery actions, and even in such situations may result in
2636 * ongoing dependent completions using established versus
2637 * overwritten outcomes.
2638 *
2639 * @param value the completion value
2640 */
2641 public void obtrudeValue(T value) {
2642 result = (value == null) ? NIL : value;
2643 postComplete();
2644 }
2645
2646 /**
2647 * Forcibly causes subsequent invocations of method get() and
2648 * related methods to throw the given exception, whether or not
2649 * already completed. This method is designed for use only in
2650 * recovery actions, and even in such situations may result in
2651 * ongoing dependent completions using established versus
2652 * overwritten outcomes.
2653 *
2654 * @param ex the exception
2655 */
2656 public void obtrudeException(Throwable ex) {
2657 if (ex == null) throw new NullPointerException();
2658 result = new AltResult(ex);
2659 postComplete();
2660 }
2661
2662 // Unsafe mechanics
2663 private static final sun.misc.Unsafe UNSAFE;
2664 private static final long RESULT;
2665 private static final long WAITERS;
2666 private static final long COMPLETIONS;
2667 static {
2668 try {
2669 UNSAFE = sun.misc.Unsafe.getUnsafe();
2670 Class<?> k = CompletableFuture.class;
2671 RESULT = UNSAFE.objectFieldOffset
2672 (k.getDeclaredField("result"));
2673 WAITERS = UNSAFE.objectFieldOffset
2674 (k.getDeclaredField("waiters"));
2675 COMPLETIONS = UNSAFE.objectFieldOffset
2676 (k.getDeclaredField("completions"));
2677 } catch (Exception e) {
2678 throw new Error(e);
2679 }
2680 }
2681 }