ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.11
Committed: Wed Feb 6 07:51:57 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.10: +2 -3 lines
Log Message:
improve isCancelled

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.TimeUnit;
10 import java.util.concurrent.Executor;
11 import java.util.concurrent.ThreadLocalRandom;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeoutException;
14 import java.util.concurrent.CancellationException;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.locks.LockSupport;
17
18 /**
19 * A {@link Future} that may be explicitly completed (setting its
20 * value and status), and may include dependent functions and actions
21 * that trigger upon its completion. Methods are available for adding
22 * those based on Functions, Blocks, and Runnables, depending on
23 * whether they require arguments and/or produce results, as well as
24 * those triggered after either or both the current and another
25 * CompletableFuture complete. Functions and actions supplied for
26 * dependent completions (mainly using methods with prefix {@code
27 * then}) may be performed by the thread that completes the current
28 * CompletableFuture, or by any other caller of these methods. There
29 * are no guarantees about the order of processing completions unless
30 * constrained by these methods.
31 *
32 * <p>When two or more threads attempt to {@link #complete} or {@link
33 * #completeExceptionally} a CompletableFuture, only one of them
34 * succeeds.
35 *
36 * <p>Upon exceptional completion, or when a completion entails
37 * computation of a function or action, and it terminates abruptly
38 * with an (unchecked) exception or error, then further completions
39 * act as {@code completeExceptionally} with a {@link
40 * CompletionException} holding that exception as its cause. If a
41 * CompletableFuture completes exceptionally, and is not followed by a
42 * {@link #exceptionally} or {@link #handle} completion, then all of
43 * its dependents (and their dependents) also complete exceptionally
44 * with CompletionExceptions holding the ultimate cause. In case of a
45 * CompletionException, methods {@link #get()} and {@link #get(long,
46 * TimeUnit)} throw an {@link ExecutionException} with the same cause
47 * as would be held in the corresponding CompletionException. However,
48 * in these cases, methods {@link #join()} and {@link #getNow} throw
49 * the CompletionException, which simplifies usage especially within
50 * other completion functions.
51 *
52 * <p>CompletableFutures themselves do not execute asynchronously.
53 * However, the {@code async} methods provide commonly useful ways to
54 * commence asynchronous processing, using either a given {@link
55 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
56 * function or action that will result in the completion of a new
57 * CompletableFuture. To simplify monitoring, debugging, and tracking,
58 * all generated asynchronous tasks are instances of the tagging
59 * interface {@link AsynchronousCompletionTask}.
60 *
61 * <p><em>jsr166e note: During transition, this class
62 * uses nested functional interfaces with different names but the
63 * same forms as those expected for JDK8.</em>
64 *
65 * @author Doug Lea
66 * @since 1.8
67 */
68 public class CompletableFuture<T> implements Future<T> {
69 // jsr166e nested interfaces
70
71 /** Interface describing a void action of one argument */
72 public interface Action<A> { void accept(A a); }
73 /** Interface describing a void action of two arguments */
74 public interface BiAction<A,B> { void accept(A a, B b); }
75 /** Interface describing a function of one argument */
76 public interface Fun<A,T> { T apply(A a); }
77 /** Interface describing a function of two arguments */
78 public interface BiFun<A,B,T> { T apply(A a, B b); }
79 /** Interface describing a function of no arguments */
80 public interface Generator<T> { T get(); }
81
82
83 /*
84 * Overview:
85 *
86 * 1. Non-nullness of field result (set via CAS) indicates done.
87 * An AltResult is used to box null as a result, as well as to
88 * hold exceptions. Using a single field makes completion fast
89 * and simple to detect and trigger, at the expense of a lot of
90 * encoding and decoding that infiltrates many methods. One minor
91 * simplification relies on the (static) NIL (to box null results)
92 * being the only AltResult with a null exception field, so we
93 * don't usually need explicit comparisons with NIL. The CF
94 * exception propagation mechanics surrounding decoding rely on
95 * unchecked casts of decoded results really being unchecked,
96 * where user type errors are caught at point of use, as is
97 * currently the case in Java. These are highlighted by using
98 * SuppressWarnings-annotated temporaries.
99 *
100 * 2. Waiters are held in a Treiber stack similar to the one used
101 * in FutureTask, Phaser, and SynchronousQueue. See their
102 * internal documentation for algorithmic details.
103 *
104 * 3. Completions are also kept in a list/stack, and pulled off
105 * and run when completion is triggered. (We could even use the
106 * same stack as for waiters, but would give up the potential
107 * parallelism obtained because woken waiters help release/run
108 * others -- see method postComplete). Because post-processing
109 * may race with direct calls, class Completion opportunistically
110 * extends AtomicInteger so callers can claim the action via
111 * compareAndSet(0, 1). The Completion.run methods are all
112 * written a boringly similar uniform way (that sometimes includes
113 * unnecessary-looking checks, kept to maintain uniformity). There
114 * are enough dimensions upon which they differ that factoring to
115 * use common code isn't worthwhile.
116 *
117 * 4. The exported then/and/or methods do support a bit of
118 * factoring (see doThenApply etc). They must cope with the
119 * intrinsic races surrounding addition of a dependent action
120 * versus performing the action directly because the task is
121 * already complete. For example, a CF may not be complete upon
122 * entry, so a dependent completion is added, but by the time it
123 * is added, the target CF is complete, so must be directly
124 * executed. This is all done while avoiding unnecessary object
125 * construction in safe-bypass cases.
126 */
127
128 // preliminaries
129
130 static final class AltResult {
131 final Throwable ex; // null only for NIL
132 AltResult(Throwable ex) { this.ex = ex; }
133 }
134
135 static final AltResult NIL = new AltResult(null);
136
137 // Fields
138
139 volatile Object result; // Either the result or boxed AltResult
140 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
141 volatile CompletionNode completions; // list (Treiber stack) of completions
142
143 // Basic utilities for triggering and processing completions
144
145 /**
146 * Removes and signals all waiting threads and runs all completions.
147 */
148 final void postComplete() {
149 WaitNode q; Thread t;
150 while ((q = waiters) != null) {
151 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
152 (t = q.thread) != null) {
153 q.thread = null;
154 LockSupport.unpark(t);
155 }
156 }
157
158 CompletionNode h; Completion c;
159 while ((h = completions) != null) {
160 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
161 (c = h.completion) != null)
162 c.run();
163 }
164 }
165
166 /**
167 * Triggers completion with the encoding of the given arguments:
168 * if the exception is non-null, encodes it as a wrapped
169 * CompletionException unless it is one already. Otherwise uses
170 * the given result, boxed as NIL if null.
171 */
172 final void internalComplete(Object v, Throwable ex) {
173 if (result == null)
174 UNSAFE.compareAndSwapObject
175 (this, RESULT, null,
176 (ex == null) ? (v == null) ? NIL : v :
177 new AltResult((ex instanceof CompletionException) ? ex :
178 new CompletionException(ex)));
179 postComplete(); // help out even if not triggered
180 }
181
182 /**
183 * If triggered, helps release and/or process completions.
184 */
185 final void helpPostComplete() {
186 if (result != null)
187 postComplete();
188 }
189
190 /* ------------- waiting for completions -------------- */
191
192 /**
193 * Heuristic spin value for waitingGet() before blocking on
194 * multiprocessors
195 */
196 static final int WAITING_GET_SPINS = 256;
197
198 /**
199 * Linked nodes to record waiting threads in a Treiber stack. See
200 * other classes such as Phaser and SynchronousQueue for more
201 * detailed explanation. This class implements ManagedBlocker to
202 * avoid starvation when blocking actions pile up in
203 * ForkJoinPools.
204 */
205 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
206 long nanos; // wait time if timed
207 final long deadline; // non-zero if timed
208 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
209 volatile Thread thread;
210 volatile WaitNode next;
211 WaitNode(boolean interruptible, long nanos, long deadline) {
212 this.thread = Thread.currentThread();
213 this.interruptControl = interruptible ? 1 : 0;
214 this.nanos = nanos;
215 this.deadline = deadline;
216 }
217 public boolean isReleasable() {
218 if (thread == null)
219 return true;
220 if (Thread.interrupted()) {
221 int i = interruptControl;
222 interruptControl = -1;
223 if (i > 0)
224 return true;
225 }
226 if (deadline != 0L &&
227 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
228 thread = null;
229 return true;
230 }
231 return false;
232 }
233 public boolean block() {
234 if (isReleasable())
235 return true;
236 else if (deadline == 0L)
237 LockSupport.park(this);
238 else if (nanos > 0L)
239 LockSupport.parkNanos(this, nanos);
240 return isReleasable();
241 }
242 }
243
244 /**
245 * Returns raw result after waiting, or null if interruptible and
246 * interrupted.
247 */
248 private Object waitingGet(boolean interruptible) {
249 WaitNode q = null;
250 boolean queued = false;
251 int h = 0, spins = 0;
252 for (Object r;;) {
253 if ((r = result) != null) {
254 if (q != null) { // suppress unpark
255 q.thread = null;
256 if (q.interruptControl < 0) {
257 if (interruptible) {
258 removeWaiter(q);
259 return null;
260 }
261 Thread.currentThread().interrupt();
262 }
263 }
264 postComplete(); // help release others
265 return r;
266 }
267 else if (h == 0) {
268 h = ThreadLocalRandom.current().nextInt();
269 if (Runtime.getRuntime().availableProcessors() > 1)
270 spins = WAITING_GET_SPINS;
271 }
272 else if (spins > 0) {
273 h ^= h << 1; // xorshift
274 h ^= h >>> 3;
275 if ((h ^= h << 10) >= 0)
276 --spins;
277 }
278 else if (q == null)
279 q = new WaitNode(interruptible, 0L, 0L);
280 else if (!queued)
281 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
282 q.next = waiters, q);
283 else if (interruptible && q.interruptControl < 0) {
284 removeWaiter(q);
285 return null;
286 }
287 else if (q.thread != null && result == null) {
288 try {
289 ForkJoinPool.managedBlock(q);
290 } catch (InterruptedException ex) {
291 q.interruptControl = -1;
292 }
293 }
294 }
295 }
296
297 /**
298 * Awaits completion or aborts on interrupt or timeout.
299 *
300 * @param nanos time to wait
301 * @return raw result
302 */
303 private Object timedAwaitDone(long nanos)
304 throws InterruptedException, TimeoutException {
305 WaitNode q = null;
306 boolean queued = false;
307 for (Object r;;) {
308 if ((r = result) != null) {
309 if (q != null) {
310 q.thread = null;
311 if (q.interruptControl < 0) {
312 removeWaiter(q);
313 throw new InterruptedException();
314 }
315 }
316 postComplete();
317 return r;
318 }
319 else if (q == null) {
320 if (nanos <= 0L)
321 throw new TimeoutException();
322 long d = System.nanoTime() + nanos;
323 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
324 }
325 else if (!queued)
326 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
327 q.next = waiters, q);
328 else if (q.interruptControl < 0) {
329 removeWaiter(q);
330 throw new InterruptedException();
331 }
332 else if (q.nanos <= 0L) {
333 if (result == null) {
334 removeWaiter(q);
335 throw new TimeoutException();
336 }
337 }
338 else if (q.thread != null && result == null) {
339 try {
340 ForkJoinPool.managedBlock(q);
341 } catch (InterruptedException ex) {
342 q.interruptControl = -1;
343 }
344 }
345 }
346 }
347
348 /**
349 * Tries to unlink a timed-out or interrupted wait node to avoid
350 * accumulating garbage. Internal nodes are simply unspliced
351 * without CAS since it is harmless if they are traversed anyway
352 * by releasers. To avoid effects of unsplicing from already
353 * removed nodes, the list is retraversed in case of an apparent
354 * race. This is slow when there are a lot of nodes, but we don't
355 * expect lists to be long enough to outweigh higher-overhead
356 * schemes.
357 */
358 private void removeWaiter(WaitNode node) {
359 if (node != null) {
360 node.thread = null;
361 retry:
362 for (;;) { // restart on removeWaiter race
363 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
364 s = q.next;
365 if (q.thread != null)
366 pred = q;
367 else if (pred != null) {
368 pred.next = s;
369 if (pred.thread == null) // check for race
370 continue retry;
371 }
372 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
373 continue retry;
374 }
375 break;
376 }
377 }
378 }
379
380 /* ------------- Async tasks -------------- */
381
382 /**
383 * A tagging interface identifying asynchronous tasks produced by
384 * {@code async} methods. This may be useful for monitoring,
385 * debugging, and tracking asynchronous activities.
386 */
387 public static interface AsynchronousCompletionTask {
388 }
389
390 /** Base class can act as either FJ or plain Runnable */
391 abstract static class Async extends ForkJoinTask<Void>
392 implements Runnable, AsynchronousCompletionTask {
393 public final Void getRawResult() { return null; }
394 public final void setRawResult(Void v) { }
395 public final void run() { exec(); }
396 }
397
398 static final class AsyncRun extends Async {
399 final Runnable fn;
400 final CompletableFuture<Void> dst;
401 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
402 this.fn = fn; this.dst = dst;
403 }
404 public final boolean exec() {
405 CompletableFuture<Void> d; Throwable ex;
406 if ((d = this.dst) != null && d.result == null) {
407 try {
408 fn.run();
409 ex = null;
410 } catch (Throwable rex) {
411 ex = rex;
412 }
413 d.internalComplete(null, ex);
414 }
415 return true;
416 }
417 private static final long serialVersionUID = 5232453952276885070L;
418 }
419
420 static final class AsyncSupply<U> extends Async {
421 final Generator<U> fn;
422 final CompletableFuture<U> dst;
423 AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
424 this.fn = fn; this.dst = dst;
425 }
426 public final boolean exec() {
427 CompletableFuture<U> d; U u; Throwable ex;
428 if ((d = this.dst) != null && d.result == null) {
429 try {
430 u = fn.get();
431 ex = null;
432 } catch (Throwable rex) {
433 ex = rex;
434 u = null;
435 }
436 d.internalComplete(u, ex);
437 }
438 return true;
439 }
440 private static final long serialVersionUID = 5232453952276885070L;
441 }
442
443 static final class AsyncApply<T,U> extends Async {
444 final Fun<? super T,? extends U> fn;
445 final T arg;
446 final CompletableFuture<U> dst;
447 AsyncApply(T arg, Fun<? super T,? extends U> fn,
448 CompletableFuture<U> dst) {
449 this.arg = arg; this.fn = fn; this.dst = dst;
450 }
451 public final boolean exec() {
452 CompletableFuture<U> d; U u; Throwable ex;
453 if ((d = this.dst) != null && d.result == null) {
454 try {
455 u = fn.apply(arg);
456 ex = null;
457 } catch (Throwable rex) {
458 ex = rex;
459 u = null;
460 }
461 d.internalComplete(u, ex);
462 }
463 return true;
464 }
465 private static final long serialVersionUID = 5232453952276885070L;
466 }
467
468 static final class AsyncBiApply<T,U,V> extends Async {
469 final BiFun<? super T,? super U,? extends V> fn;
470 final T arg1;
471 final U arg2;
472 final CompletableFuture<V> dst;
473 AsyncBiApply(T arg1, U arg2,
474 BiFun<? super T,? super U,? extends V> fn,
475 CompletableFuture<V> dst) {
476 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
477 }
478 public final boolean exec() {
479 CompletableFuture<V> d; V v; Throwable ex;
480 if ((d = this.dst) != null && d.result == null) {
481 try {
482 v = fn.apply(arg1, arg2);
483 ex = null;
484 } catch (Throwable rex) {
485 ex = rex;
486 v = null;
487 }
488 d.internalComplete(v, ex);
489 }
490 return true;
491 }
492 private static final long serialVersionUID = 5232453952276885070L;
493 }
494
495 static final class AsyncAccept<T> extends Async {
496 final Action<? super T> fn;
497 final T arg;
498 final CompletableFuture<Void> dst;
499 AsyncAccept(T arg, Action<? super T> fn,
500 CompletableFuture<Void> dst) {
501 this.arg = arg; this.fn = fn; this.dst = dst;
502 }
503 public final boolean exec() {
504 CompletableFuture<Void> d; Throwable ex;
505 if ((d = this.dst) != null && d.result == null) {
506 try {
507 fn.accept(arg);
508 ex = null;
509 } catch (Throwable rex) {
510 ex = rex;
511 }
512 d.internalComplete(null, ex);
513 }
514 return true;
515 }
516 private static final long serialVersionUID = 5232453952276885070L;
517 }
518
519 static final class AsyncBiAccept<T,U> extends Async {
520 final BiAction<? super T,? super U> fn;
521 final T arg1;
522 final U arg2;
523 final CompletableFuture<Void> dst;
524 AsyncBiAccept(T arg1, U arg2,
525 BiAction<? super T,? super U> fn,
526 CompletableFuture<Void> dst) {
527 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
528 }
529 public final boolean exec() {
530 CompletableFuture<Void> d; Throwable ex;
531 if ((d = this.dst) != null && d.result == null) {
532 try {
533 fn.accept(arg1, arg2);
534 ex = null;
535 } catch (Throwable rex) {
536 ex = rex;
537 }
538 d.internalComplete(null, ex);
539 }
540 return true;
541 }
542 private static final long serialVersionUID = 5232453952276885070L;
543 }
544
545 /* ------------- Completions -------------- */
546
547 /**
548 * Simple linked list nodes to record completions, used in
549 * basically the same way as WaitNodes. (We separate nodes from
550 * the Completions themselves mainly because for the And and Or
551 * methods, the same Completion object resides in two lists.)
552 */
553 static final class CompletionNode {
554 final Completion completion;
555 volatile CompletionNode next;
556 CompletionNode(Completion completion) { this.completion = completion; }
557 }
558
559 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
560 abstract static class Completion extends AtomicInteger implements Runnable {
561 }
562
563 static final class ApplyCompletion<T,U> extends Completion {
564 final CompletableFuture<? extends T> src;
565 final Fun<? super T,? extends U> fn;
566 final CompletableFuture<U> dst;
567 final Executor executor;
568 ApplyCompletion(CompletableFuture<? extends T> src,
569 Fun<? super T,? extends U> fn,
570 CompletableFuture<U> dst, Executor executor) {
571 this.src = src; this.fn = fn; this.dst = dst;
572 this.executor = executor;
573 }
574 public final void run() {
575 final CompletableFuture<? extends T> a;
576 final Fun<? super T,? extends U> fn;
577 final CompletableFuture<U> dst;
578 Object r; T t; Throwable ex;
579 if ((dst = this.dst) != null &&
580 (fn = this.fn) != null &&
581 (a = this.src) != null &&
582 (r = a.result) != null &&
583 compareAndSet(0, 1)) {
584 if (r instanceof AltResult) {
585 ex = ((AltResult)r).ex;
586 t = null;
587 }
588 else {
589 ex = null;
590 @SuppressWarnings("unchecked") T tr = (T) r;
591 t = tr;
592 }
593 Executor e = executor;
594 U u = null;
595 if (ex == null) {
596 try {
597 if (e != null)
598 e.execute(new AsyncApply<T,U>(t, fn, dst));
599 else
600 u = fn.apply(t);
601 } catch (Throwable rex) {
602 ex = rex;
603 }
604 }
605 if (e == null || ex != null)
606 dst.internalComplete(u, ex);
607 }
608 }
609 private static final long serialVersionUID = 5232453952276885070L;
610 }
611
612 static final class AcceptCompletion<T> extends Completion {
613 final CompletableFuture<? extends T> src;
614 final Action<? super T> fn;
615 final CompletableFuture<Void> dst;
616 final Executor executor;
617 AcceptCompletion(CompletableFuture<? extends T> src,
618 Action<? super T> fn,
619 CompletableFuture<Void> dst, Executor executor) {
620 this.src = src; this.fn = fn; this.dst = dst;
621 this.executor = executor;
622 }
623 public final void run() {
624 final CompletableFuture<? extends T> a;
625 final Action<? super T> fn;
626 final CompletableFuture<Void> dst;
627 Object r; T t; Throwable ex;
628 if ((dst = this.dst) != null &&
629 (fn = this.fn) != null &&
630 (a = this.src) != null &&
631 (r = a.result) != null &&
632 compareAndSet(0, 1)) {
633 if (r instanceof AltResult) {
634 ex = ((AltResult)r).ex;
635 t = null;
636 }
637 else {
638 ex = null;
639 @SuppressWarnings("unchecked") T tr = (T) r;
640 t = tr;
641 }
642 Executor e = executor;
643 if (ex == null) {
644 try {
645 if (e != null)
646 e.execute(new AsyncAccept<T>(t, fn, dst));
647 else
648 fn.accept(t);
649 } catch (Throwable rex) {
650 ex = rex;
651 }
652 }
653 if (e == null || ex != null)
654 dst.internalComplete(null, ex);
655 }
656 }
657 private static final long serialVersionUID = 5232453952276885070L;
658 }
659
660 static final class RunCompletion<T> extends Completion {
661 final CompletableFuture<? extends T> src;
662 final Runnable fn;
663 final CompletableFuture<Void> dst;
664 final Executor executor;
665 RunCompletion(CompletableFuture<? extends T> src,
666 Runnable fn,
667 CompletableFuture<Void> dst,
668 Executor executor) {
669 this.src = src; this.fn = fn; this.dst = dst;
670 this.executor = executor;
671 }
672 public final void run() {
673 final CompletableFuture<? extends T> a;
674 final Runnable fn;
675 final CompletableFuture<Void> dst;
676 Object r; Throwable ex;
677 if ((dst = this.dst) != null &&
678 (fn = this.fn) != null &&
679 (a = this.src) != null &&
680 (r = a.result) != null &&
681 compareAndSet(0, 1)) {
682 if (r instanceof AltResult)
683 ex = ((AltResult)r).ex;
684 else
685 ex = null;
686 Executor e = executor;
687 if (ex == null) {
688 try {
689 if (e != null)
690 e.execute(new AsyncRun(fn, dst));
691 else
692 fn.run();
693 } catch (Throwable rex) {
694 ex = rex;
695 }
696 }
697 if (e == null || ex != null)
698 dst.internalComplete(null, ex);
699 }
700 }
701 private static final long serialVersionUID = 5232453952276885070L;
702 }
703
704 static final class BiApplyCompletion<T,U,V> extends Completion {
705 final CompletableFuture<? extends T> src;
706 final CompletableFuture<? extends U> snd;
707 final BiFun<? super T,? super U,? extends V> fn;
708 final CompletableFuture<V> dst;
709 final Executor executor;
710 BiApplyCompletion(CompletableFuture<? extends T> src,
711 CompletableFuture<? extends U> snd,
712 BiFun<? super T,? super U,? extends V> fn,
713 CompletableFuture<V> dst, Executor executor) {
714 this.src = src; this.snd = snd;
715 this.fn = fn; this.dst = dst;
716 this.executor = executor;
717 }
718 public final void run() {
719 final CompletableFuture<? extends T> a;
720 final CompletableFuture<? extends U> b;
721 final BiFun<? super T,? super U,? extends V> fn;
722 final CompletableFuture<V> dst;
723 Object r, s; T t; U u; Throwable ex;
724 if ((dst = this.dst) != null &&
725 (fn = this.fn) != null &&
726 (a = this.src) != null &&
727 (r = a.result) != null &&
728 (b = this.snd) != null &&
729 (s = b.result) != null &&
730 compareAndSet(0, 1)) {
731 if (r instanceof AltResult) {
732 ex = ((AltResult)r).ex;
733 t = null;
734 }
735 else {
736 ex = null;
737 @SuppressWarnings("unchecked") T tr = (T) r;
738 t = tr;
739 }
740 if (ex != null)
741 u = null;
742 else if (s instanceof AltResult) {
743 ex = ((AltResult)s).ex;
744 u = null;
745 }
746 else {
747 @SuppressWarnings("unchecked") U us = (U) s;
748 u = us;
749 }
750 Executor e = executor;
751 V v = null;
752 if (ex == null) {
753 try {
754 if (e != null)
755 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
756 else
757 v = fn.apply(t, u);
758 } catch (Throwable rex) {
759 ex = rex;
760 }
761 }
762 if (e == null || ex != null)
763 dst.internalComplete(v, ex);
764 }
765 }
766 private static final long serialVersionUID = 5232453952276885070L;
767 }
768
769 static final class BiAcceptCompletion<T,U> extends Completion {
770 final CompletableFuture<? extends T> src;
771 final CompletableFuture<? extends U> snd;
772 final BiAction<? super T,? super U> fn;
773 final CompletableFuture<Void> dst;
774 final Executor executor;
775 BiAcceptCompletion(CompletableFuture<? extends T> src,
776 CompletableFuture<? extends U> snd,
777 BiAction<? super T,? super U> fn,
778 CompletableFuture<Void> dst, Executor executor) {
779 this.src = src; this.snd = snd;
780 this.fn = fn; this.dst = dst;
781 this.executor = executor;
782 }
783 public final void run() {
784 final CompletableFuture<? extends T> a;
785 final CompletableFuture<? extends U> b;
786 final BiAction<? super T,? super U> fn;
787 final CompletableFuture<Void> dst;
788 Object r, s; T t; U u; Throwable ex;
789 if ((dst = this.dst) != null &&
790 (fn = this.fn) != null &&
791 (a = this.src) != null &&
792 (r = a.result) != null &&
793 (b = this.snd) != null &&
794 (s = b.result) != null &&
795 compareAndSet(0, 1)) {
796 if (r instanceof AltResult) {
797 ex = ((AltResult)r).ex;
798 t = null;
799 }
800 else {
801 ex = null;
802 @SuppressWarnings("unchecked") T tr = (T) r;
803 t = tr;
804 }
805 if (ex != null)
806 u = null;
807 else if (s instanceof AltResult) {
808 ex = ((AltResult)s).ex;
809 u = null;
810 }
811 else {
812 @SuppressWarnings("unchecked") U us = (U) s;
813 u = us;
814 }
815 Executor e = executor;
816 if (ex == null) {
817 try {
818 if (e != null)
819 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
820 else
821 fn.accept(t, u);
822 } catch (Throwable rex) {
823 ex = rex;
824 }
825 }
826 if (e == null || ex != null)
827 dst.internalComplete(null, ex);
828 }
829 }
830 private static final long serialVersionUID = 5232453952276885070L;
831 }
832
833 static final class BiRunCompletion<T> extends Completion {
834 final CompletableFuture<? extends T> src;
835 final CompletableFuture<?> snd;
836 final Runnable fn;
837 final CompletableFuture<Void> dst;
838 final Executor executor;
839 BiRunCompletion(CompletableFuture<? extends T> src,
840 CompletableFuture<?> snd,
841 Runnable fn,
842 CompletableFuture<Void> dst, Executor executor) {
843 this.src = src; this.snd = snd;
844 this.fn = fn; this.dst = dst;
845 this.executor = executor;
846 }
847 public final void run() {
848 final CompletableFuture<? extends T> a;
849 final CompletableFuture<?> b;
850 final Runnable fn;
851 final CompletableFuture<Void> dst;
852 Object r, s; Throwable ex;
853 if ((dst = this.dst) != null &&
854 (fn = this.fn) != null &&
855 (a = this.src) != null &&
856 (r = a.result) != null &&
857 (b = this.snd) != null &&
858 (s = b.result) != null &&
859 compareAndSet(0, 1)) {
860 if (r instanceof AltResult)
861 ex = ((AltResult)r).ex;
862 else
863 ex = null;
864 if (ex == null && (s instanceof AltResult))
865 ex = ((AltResult)s).ex;
866 Executor e = executor;
867 if (ex == null) {
868 try {
869 if (e != null)
870 e.execute(new AsyncRun(fn, dst));
871 else
872 fn.run();
873 } catch (Throwable rex) {
874 ex = rex;
875 }
876 }
877 if (e == null || ex != null)
878 dst.internalComplete(null, ex);
879 }
880 }
881 private static final long serialVersionUID = 5232453952276885070L;
882 }
883
884 static final class OrApplyCompletion<T,U> extends Completion {
885 final CompletableFuture<? extends T> src;
886 final CompletableFuture<? extends T> snd;
887 final Fun<? super T,? extends U> fn;
888 final CompletableFuture<U> dst;
889 final Executor executor;
890 OrApplyCompletion(CompletableFuture<? extends T> src,
891 CompletableFuture<? extends T> snd,
892 Fun<? super T,? extends U> fn,
893 CompletableFuture<U> dst, Executor executor) {
894 this.src = src; this.snd = snd;
895 this.fn = fn; this.dst = dst;
896 this.executor = executor;
897 }
898 public final void run() {
899 final CompletableFuture<? extends T> a;
900 final CompletableFuture<? extends T> b;
901 final Fun<? super T,? extends U> fn;
902 final CompletableFuture<U> dst;
903 Object r; T t; Throwable ex;
904 if ((dst = this.dst) != null &&
905 (fn = this.fn) != null &&
906 (((a = this.src) != null && (r = a.result) != null) ||
907 ((b = this.snd) != null && (r = b.result) != null)) &&
908 compareAndSet(0, 1)) {
909 if (r instanceof AltResult) {
910 ex = ((AltResult)r).ex;
911 t = null;
912 }
913 else {
914 ex = null;
915 @SuppressWarnings("unchecked") T tr = (T) r;
916 t = tr;
917 }
918 Executor e = executor;
919 U u = null;
920 if (ex == null) {
921 try {
922 if (e != null)
923 e.execute(new AsyncApply<T,U>(t, fn, dst));
924 else
925 u = fn.apply(t);
926 } catch (Throwable rex) {
927 ex = rex;
928 }
929 }
930 if (e == null || ex != null)
931 dst.internalComplete(u, ex);
932 }
933 }
934 private static final long serialVersionUID = 5232453952276885070L;
935 }
936
937 static final class OrAcceptCompletion<T> extends Completion {
938 final CompletableFuture<? extends T> src;
939 final CompletableFuture<? extends T> snd;
940 final Action<? super T> fn;
941 final CompletableFuture<Void> dst;
942 final Executor executor;
943 OrAcceptCompletion(CompletableFuture<? extends T> src,
944 CompletableFuture<? extends T> snd,
945 Action<? super T> fn,
946 CompletableFuture<Void> dst, Executor executor) {
947 this.src = src; this.snd = snd;
948 this.fn = fn; this.dst = dst;
949 this.executor = executor;
950 }
951 public final void run() {
952 final CompletableFuture<? extends T> a;
953 final CompletableFuture<? extends T> b;
954 final Action<? super T> fn;
955 final CompletableFuture<Void> dst;
956 Object r; T t; Throwable ex;
957 if ((dst = this.dst) != null &&
958 (fn = this.fn) != null &&
959 (((a = this.src) != null && (r = a.result) != null) ||
960 ((b = this.snd) != null && (r = b.result) != null)) &&
961 compareAndSet(0, 1)) {
962 if (r instanceof AltResult) {
963 ex = ((AltResult)r).ex;
964 t = null;
965 }
966 else {
967 ex = null;
968 @SuppressWarnings("unchecked") T tr = (T) r;
969 t = tr;
970 }
971 Executor e = executor;
972 if (ex == null) {
973 try {
974 if (e != null)
975 e.execute(new AsyncAccept<T>(t, fn, dst));
976 else
977 fn.accept(t);
978 } catch (Throwable rex) {
979 ex = rex;
980 }
981 }
982 if (e == null || ex != null)
983 dst.internalComplete(null, ex);
984 }
985 }
986 private static final long serialVersionUID = 5232453952276885070L;
987 }
988
989 static final class OrRunCompletion<T> extends Completion {
990 final CompletableFuture<? extends T> src;
991 final CompletableFuture<?> snd;
992 final Runnable fn;
993 final CompletableFuture<Void> dst;
994 final Executor executor;
995 OrRunCompletion(CompletableFuture<? extends T> src,
996 CompletableFuture<?> snd,
997 Runnable fn,
998 CompletableFuture<Void> dst, Executor executor) {
999 this.src = src; this.snd = snd;
1000 this.fn = fn; this.dst = dst;
1001 this.executor = executor;
1002 }
1003 public final void run() {
1004 final CompletableFuture<? extends T> a;
1005 final CompletableFuture<?> b;
1006 final Runnable fn;
1007 final CompletableFuture<Void> dst;
1008 Object r; Throwable ex;
1009 if ((dst = this.dst) != null &&
1010 (fn = this.fn) != null &&
1011 (((a = this.src) != null && (r = a.result) != null) ||
1012 ((b = this.snd) != null && (r = b.result) != null)) &&
1013 compareAndSet(0, 1)) {
1014 if (r instanceof AltResult)
1015 ex = ((AltResult)r).ex;
1016 else
1017 ex = null;
1018 Executor e = executor;
1019 if (ex == null) {
1020 try {
1021 if (e != null)
1022 e.execute(new AsyncRun(fn, dst));
1023 else
1024 fn.run();
1025 } catch (Throwable rex) {
1026 ex = rex;
1027 }
1028 }
1029 if (e == null || ex != null)
1030 dst.internalComplete(null, ex);
1031 }
1032 }
1033 private static final long serialVersionUID = 5232453952276885070L;
1034 }
1035
1036 static final class ExceptionCompletion<T> extends Completion {
1037 final CompletableFuture<? extends T> src;
1038 final Fun<? super Throwable, ? extends T> fn;
1039 final CompletableFuture<T> dst;
1040 ExceptionCompletion(CompletableFuture<? extends T> src,
1041 Fun<? super Throwable, ? extends T> fn,
1042 CompletableFuture<T> dst) {
1043 this.src = src; this.fn = fn; this.dst = dst;
1044 }
1045 public final void run() {
1046 final CompletableFuture<? extends T> a;
1047 final Fun<? super Throwable, ? extends T> fn;
1048 final CompletableFuture<T> dst;
1049 Object r; T t = null; Throwable ex, dx = null;
1050 if ((dst = this.dst) != null &&
1051 (fn = this.fn) != null &&
1052 (a = this.src) != null &&
1053 (r = a.result) != null &&
1054 compareAndSet(0, 1)) {
1055 if ((r instanceof AltResult) &&
1056 (ex = ((AltResult)r).ex) != null) {
1057 try {
1058 t = fn.apply(ex);
1059 } catch (Throwable rex) {
1060 dx = rex;
1061 }
1062 }
1063 else {
1064 @SuppressWarnings("unchecked") T tr = (T) r;
1065 t = tr;
1066 }
1067 dst.internalComplete(t, dx);
1068 }
1069 }
1070 private static final long serialVersionUID = 5232453952276885070L;
1071 }
1072
1073 static final class ThenCopy<T> extends Completion {
1074 final CompletableFuture<? extends T> src;
1075 final CompletableFuture<T> dst;
1076 ThenCopy(CompletableFuture<? extends T> src,
1077 CompletableFuture<T> dst) {
1078 this.src = src; this.dst = dst;
1079 }
1080 public final void run() {
1081 final CompletableFuture<? extends T> a;
1082 final CompletableFuture<T> dst;
1083 Object r; Object t; Throwable ex;
1084 if ((dst = this.dst) != null &&
1085 (a = this.src) != null &&
1086 (r = a.result) != null &&
1087 compareAndSet(0, 1)) {
1088 if (r instanceof AltResult) {
1089 ex = ((AltResult)r).ex;
1090 t = null;
1091 }
1092 else {
1093 ex = null;
1094 t = r;
1095 }
1096 dst.internalComplete(t, ex);
1097 }
1098 }
1099 private static final long serialVersionUID = 5232453952276885070L;
1100 }
1101
1102 static final class HandleCompletion<T,U> extends Completion {
1103 final CompletableFuture<? extends T> src;
1104 final BiFun<? super T, Throwable, ? extends U> fn;
1105 final CompletableFuture<U> dst;
1106 HandleCompletion(CompletableFuture<? extends T> src,
1107 BiFun<? super T, Throwable, ? extends U> fn,
1108 final CompletableFuture<U> dst) {
1109 this.src = src; this.fn = fn; this.dst = dst;
1110 }
1111 public final void run() {
1112 final CompletableFuture<? extends T> a;
1113 final BiFun<? super T, Throwable, ? extends U> fn;
1114 final CompletableFuture<U> dst;
1115 Object r; T t; Throwable ex;
1116 if ((dst = this.dst) != null &&
1117 (fn = this.fn) != null &&
1118 (a = this.src) != null &&
1119 (r = a.result) != null &&
1120 compareAndSet(0, 1)) {
1121 if (r instanceof AltResult) {
1122 ex = ((AltResult)r).ex;
1123 t = null;
1124 }
1125 else {
1126 ex = null;
1127 @SuppressWarnings("unchecked") T tr = (T) r;
1128 t = tr;
1129 }
1130 U u = null; Throwable dx = null;
1131 try {
1132 u = fn.apply(t, ex);
1133 } catch (Throwable rex) {
1134 dx = rex;
1135 }
1136 dst.internalComplete(u, dx);
1137 }
1138 }
1139 private static final long serialVersionUID = 5232453952276885070L;
1140 }
1141
1142 static final class ComposeCompletion<T,U> extends Completion {
1143 final CompletableFuture<? extends T> src;
1144 final Fun<? super T, CompletableFuture<U>> fn;
1145 final CompletableFuture<U> dst;
1146 ComposeCompletion(CompletableFuture<? extends T> src,
1147 Fun<? super T, CompletableFuture<U>> fn,
1148 final CompletableFuture<U> dst) {
1149 this.src = src; this.fn = fn; this.dst = dst;
1150 }
1151 public final void run() {
1152 final CompletableFuture<? extends T> a;
1153 final Fun<? super T, CompletableFuture<U>> fn;
1154 final CompletableFuture<U> dst;
1155 Object r; T t; Throwable ex;
1156 if ((dst = this.dst) != null &&
1157 (fn = this.fn) != null &&
1158 (a = this.src) != null &&
1159 (r = a.result) != null &&
1160 compareAndSet(0, 1)) {
1161 if (r instanceof AltResult) {
1162 ex = ((AltResult)r).ex;
1163 t = null;
1164 }
1165 else {
1166 ex = null;
1167 @SuppressWarnings("unchecked") T tr = (T) r;
1168 t = tr;
1169 }
1170 CompletableFuture<U> c = null;
1171 U u = null;
1172 boolean complete = false;
1173 if (ex == null) {
1174 try {
1175 c = fn.apply(t);
1176 } catch (Throwable rex) {
1177 ex = rex;
1178 }
1179 }
1180 if (ex != null || c == null) {
1181 if (ex == null)
1182 ex = new NullPointerException();
1183 }
1184 else {
1185 ThenCopy<U> d = null;
1186 Object s;
1187 if ((s = c.result) == null) {
1188 CompletionNode p = new CompletionNode
1189 (d = new ThenCopy<U>(c, dst));
1190 while ((s = c.result) == null) {
1191 if (UNSAFE.compareAndSwapObject
1192 (c, COMPLETIONS, p.next = c.completions, p))
1193 break;
1194 }
1195 }
1196 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1197 complete = true;
1198 if (s instanceof AltResult) {
1199 ex = ((AltResult)s).ex; // no rewrap
1200 u = null;
1201 }
1202 else {
1203 @SuppressWarnings("unchecked") U us = (U) s;
1204 u = us;
1205 }
1206 }
1207 }
1208 if (complete || ex != null)
1209 dst.internalComplete(u, ex);
1210 if (c != null)
1211 c.helpPostComplete();
1212 }
1213 }
1214 private static final long serialVersionUID = 5232453952276885070L;
1215 }
1216
1217 // public methods
1218
1219 /**
1220 * Creates a new incomplete CompletableFuture.
1221 */
1222 public CompletableFuture() {
1223 }
1224
1225 /**
1226 * Asynchronously executes in the {@link
1227 * ForkJoinPool#commonPool()}, a task that completes the returned
1228 * CompletableFuture with the result of the given Supplier.
1229 *
1230 * @param supplier a function returning the value to be used
1231 * to complete the returned CompletableFuture
1232 * @return the CompletableFuture
1233 */
1234 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1235 if (supplier == null) throw new NullPointerException();
1236 CompletableFuture<U> f = new CompletableFuture<U>();
1237 ForkJoinPool.commonPool().
1238 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1239 return f;
1240 }
1241
1242 /**
1243 * Asynchronously executes using the given executor, a task that
1244 * completes the returned CompletableFuture with the result of the
1245 * given Supplier.
1246 *
1247 * @param supplier a function returning the value to be used
1248 * to complete the returned CompletableFuture
1249 * @param executor the executor to use for asynchronous execution
1250 * @return the CompletableFuture
1251 */
1252 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1253 Executor executor) {
1254 if (executor == null || supplier == null)
1255 throw new NullPointerException();
1256 CompletableFuture<U> f = new CompletableFuture<U>();
1257 executor.execute(new AsyncSupply<U>(supplier, f));
1258 return f;
1259 }
1260
1261 /**
1262 * Asynchronously executes in the {@link
1263 * ForkJoinPool#commonPool()} a task that runs the given action,
1264 * and then completes the returned CompletableFuture.
1265 *
1266 * @param runnable the action to run before completing the
1267 * returned CompletableFuture
1268 * @return the CompletableFuture
1269 */
1270 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1271 if (runnable == null) throw new NullPointerException();
1272 CompletableFuture<Void> f = new CompletableFuture<Void>();
1273 ForkJoinPool.commonPool().
1274 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1275 return f;
1276 }
1277
1278 /**
1279 * Asynchronously executes using the given executor, a task that
1280 * runs the given action, and then completes the returned
1281 * CompletableFuture.
1282 *
1283 * @param runnable the action to run before completing the
1284 * returned CompletableFuture
1285 * @param executor the executor to use for asynchronous execution
1286 * @return the CompletableFuture
1287 */
1288 public static CompletableFuture<Void> runAsync(Runnable runnable,
1289 Executor executor) {
1290 if (executor == null || runnable == null)
1291 throw new NullPointerException();
1292 CompletableFuture<Void> f = new CompletableFuture<Void>();
1293 executor.execute(new AsyncRun(runnable, f));
1294 return f;
1295 }
1296
1297 /**
1298 * Returns {@code true} if completed in any fashion: normally,
1299 * exceptionally, or via cancellation.
1300 *
1301 * @return {@code true} if completed
1302 */
1303 public boolean isDone() {
1304 return result != null;
1305 }
1306
1307 /**
1308 * Waits if necessary for the computation to complete, and then
1309 * retrieves its result.
1310 *
1311 * @return the computed result
1312 * @throws CancellationException if the computation was cancelled
1313 * @throws ExecutionException if the computation threw an
1314 * exception
1315 * @throws InterruptedException if the current thread was interrupted
1316 * while waiting
1317 */
1318 public T get() throws InterruptedException, ExecutionException {
1319 Object r; Throwable ex, cause;
1320 if ((r = result) == null && (r = waitingGet(true)) == null)
1321 throw new InterruptedException();
1322 if (r instanceof AltResult) {
1323 if ((ex = ((AltResult)r).ex) != null) {
1324 if (ex instanceof CancellationException)
1325 throw (CancellationException)ex;
1326 if ((ex instanceof CompletionException) &&
1327 (cause = ex.getCause()) != null)
1328 ex = cause;
1329 throw new ExecutionException(ex);
1330 }
1331 return null;
1332 }
1333 @SuppressWarnings("unchecked") T tr = (T) r;
1334 return tr;
1335 }
1336
1337 /**
1338 * Waits if necessary for at most the given time for completion,
1339 * and then retrieves its result, if available.
1340 *
1341 * @param timeout the maximum time to wait
1342 * @param unit the time unit of the timeout argument
1343 * @return the computed result
1344 * @throws CancellationException if the computation was cancelled
1345 * @throws ExecutionException if the computation threw an
1346 * exception
1347 * @throws InterruptedException if the current thread was interrupted
1348 * while waiting
1349 * @throws TimeoutException if the wait timed out
1350 */
1351 public T get(long timeout, TimeUnit unit)
1352 throws InterruptedException, ExecutionException, TimeoutException {
1353 Object r; Throwable ex, cause;
1354 long nanos = unit.toNanos(timeout);
1355 if (Thread.interrupted())
1356 throw new InterruptedException();
1357 if ((r = result) == null)
1358 r = timedAwaitDone(nanos);
1359 if (r instanceof AltResult) {
1360 if ((ex = ((AltResult)r).ex) != null) {
1361 if (ex instanceof CancellationException)
1362 throw (CancellationException)ex;
1363 if ((ex instanceof CompletionException) &&
1364 (cause = ex.getCause()) != null)
1365 ex = cause;
1366 throw new ExecutionException(ex);
1367 }
1368 return null;
1369 }
1370 @SuppressWarnings("unchecked") T tr = (T) r;
1371 return tr;
1372 }
1373
1374 /**
1375 * Returns the result value when complete, or throws an
1376 * (unchecked) exception if completed exceptionally. To better
1377 * conform with the use of common functional forms, if a
1378 * computation involved in the completion of this
1379 * CompletableFuture threw an exception, this method throws an
1380 * (unchecked) {@link CompletionException} with the underlying
1381 * exception as its cause.
1382 *
1383 * @return the result value
1384 * @throws CancellationException if the computation was cancelled
1385 * @throws CompletionException if a completion computation threw
1386 * an exception
1387 */
1388 public T join() {
1389 Object r; Throwable ex;
1390 if ((r = result) == null)
1391 r = waitingGet(false);
1392 if (r instanceof AltResult) {
1393 if ((ex = ((AltResult)r).ex) != null) {
1394 if (ex instanceof CancellationException)
1395 throw (CancellationException)ex;
1396 if (ex instanceof CompletionException)
1397 throw (CompletionException)ex;
1398 throw new CompletionException(ex);
1399 }
1400 return null;
1401 }
1402 @SuppressWarnings("unchecked") T tr = (T) r;
1403 return tr;
1404 }
1405
1406 /**
1407 * Returns the result value (or throws any encountered exception)
1408 * if completed, else returns the given valueIfAbsent.
1409 *
1410 * @param valueIfAbsent the value to return if not completed
1411 * @return the result value, if completed, else the given valueIfAbsent
1412 * @throws CancellationException if the computation was cancelled
1413 * @throws CompletionException if a completion computation threw
1414 * an exception
1415 */
1416 public T getNow(T valueIfAbsent) {
1417 Object r; Throwable ex;
1418 if ((r = result) == null)
1419 return valueIfAbsent;
1420 if (r instanceof AltResult) {
1421 if ((ex = ((AltResult)r).ex) != null) {
1422 if (ex instanceof CancellationException)
1423 throw (CancellationException)ex;
1424 if (ex instanceof CompletionException)
1425 throw (CompletionException)ex;
1426 throw new CompletionException(ex);
1427 }
1428 return null;
1429 }
1430 @SuppressWarnings("unchecked") T tr = (T) r;
1431 return tr;
1432 }
1433
1434 /**
1435 * If not already completed, sets the value returned by {@link
1436 * #get()} and related methods to the given value.
1437 *
1438 * @param value the result value
1439 * @return {@code true} if this invocation caused this CompletableFuture
1440 * to transition to a completed state, else {@code false}
1441 */
1442 public boolean complete(T value) {
1443 boolean triggered = result == null &&
1444 UNSAFE.compareAndSwapObject(this, RESULT, null,
1445 value == null ? NIL : value);
1446 postComplete();
1447 return triggered;
1448 }
1449
1450 /**
1451 * If not already completed, causes invocations of {@link #get()}
1452 * and related methods to throw the given exception.
1453 *
1454 * @param ex the exception
1455 * @return {@code true} if this invocation caused this CompletableFuture
1456 * to transition to a completed state, else {@code false}
1457 */
1458 public boolean completeExceptionally(Throwable ex) {
1459 if (ex == null) throw new NullPointerException();
1460 boolean triggered = result == null &&
1461 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1462 postComplete();
1463 return triggered;
1464 }
1465
1466 /**
1467 * Creates and returns a CompletableFuture that is completed with
1468 * the result of the given function of this CompletableFuture.
1469 * If this CompletableFuture completes exceptionally,
1470 * then the returned CompletableFuture also does so,
1471 * with a CompletionException holding this exception as
1472 * its cause.
1473 *
1474 * @param fn the function to use to compute the value of
1475 * the returned CompletableFuture
1476 * @return the new CompletableFuture
1477 */
1478 public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1479 return doThenApply(fn, null);
1480 }
1481
1482 /**
1483 * Creates and returns a CompletableFuture that is asynchronously
1484 * completed using the {@link ForkJoinPool#commonPool()} with the
1485 * result of the given function of this CompletableFuture. If
1486 * this CompletableFuture completes exceptionally, then the
1487 * returned CompletableFuture also does so, with a
1488 * CompletionException holding this exception as its cause.
1489 *
1490 * @param fn the function to use to compute the value of
1491 * the returned CompletableFuture
1492 * @return the new CompletableFuture
1493 */
1494 public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn) {
1495 return doThenApply(fn, ForkJoinPool.commonPool());
1496 }
1497
1498 /**
1499 * Creates and returns a CompletableFuture that is asynchronously
1500 * completed using the given executor with the result of the given
1501 * function of this CompletableFuture. If this CompletableFuture
1502 * completes exceptionally, then the returned CompletableFuture
1503 * also does so, with a CompletionException holding this exception as
1504 * its cause.
1505 *
1506 * @param fn the function to use to compute the value of
1507 * the returned CompletableFuture
1508 * @param executor the executor to use for asynchronous execution
1509 * @return the new CompletableFuture
1510 */
1511 public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn,
1512 Executor executor) {
1513 if (executor == null) throw new NullPointerException();
1514 return doThenApply(fn, executor);
1515 }
1516
1517 private <U> CompletableFuture<U> doThenApply(Fun<? super T,? extends U> fn,
1518 Executor e) {
1519 if (fn == null) throw new NullPointerException();
1520 CompletableFuture<U> dst = new CompletableFuture<U>();
1521 ApplyCompletion<T,U> d = null;
1522 Object r;
1523 if ((r = result) == null) {
1524 CompletionNode p = new CompletionNode
1525 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1526 while ((r = result) == null) {
1527 if (UNSAFE.compareAndSwapObject
1528 (this, COMPLETIONS, p.next = completions, p))
1529 break;
1530 }
1531 }
1532 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1533 T t; Throwable ex;
1534 if (r instanceof AltResult) {
1535 ex = ((AltResult)r).ex;
1536 t = null;
1537 }
1538 else {
1539 ex = null;
1540 @SuppressWarnings("unchecked") T tr = (T) r;
1541 t = tr;
1542 }
1543 U u = null;
1544 if (ex == null) {
1545 try {
1546 if (e != null)
1547 e.execute(new AsyncApply<T,U>(t, fn, dst));
1548 else
1549 u = fn.apply(t);
1550 } catch (Throwable rex) {
1551 ex = rex;
1552 }
1553 }
1554 if (e == null || ex != null)
1555 dst.internalComplete(u, ex);
1556 }
1557 helpPostComplete();
1558 return dst;
1559 }
1560
1561 /**
1562 * Creates and returns a CompletableFuture that is completed after
1563 * performing the given action with this CompletableFuture's
1564 * result when it completes. If this CompletableFuture
1565 * completes exceptionally, then the returned CompletableFuture
1566 * also does so, with a CompletionException holding this exception as
1567 * its cause.
1568 *
1569 * @param block the action to perform before completing the
1570 * returned CompletableFuture
1571 * @return the new CompletableFuture
1572 */
1573 public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1574 return doThenAccept(block, null);
1575 }
1576
1577 /**
1578 * Creates and returns a CompletableFuture that is asynchronously
1579 * completed using the {@link ForkJoinPool#commonPool()} with this
1580 * CompletableFuture's result when it completes. If this
1581 * CompletableFuture completes exceptionally, then the returned
1582 * CompletableFuture also does so, with a CompletionException holding
1583 * this exception as its cause.
1584 *
1585 * @param block the action to perform before completing the
1586 * returned CompletableFuture
1587 * @return the new CompletableFuture
1588 */
1589 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1590 return doThenAccept(block, ForkJoinPool.commonPool());
1591 }
1592
1593 /**
1594 * Creates and returns a CompletableFuture that is asynchronously
1595 * completed using the given executor with this
1596 * CompletableFuture's result when it completes. If this
1597 * CompletableFuture completes exceptionally, then the returned
1598 * CompletableFuture also does so, with a CompletionException holding
1599 * this exception as its cause.
1600 *
1601 * @param block the action to perform before completing the
1602 * returned CompletableFuture
1603 * @param executor the executor to use for asynchronous execution
1604 * @return the new CompletableFuture
1605 */
1606 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1607 Executor executor) {
1608 if (executor == null) throw new NullPointerException();
1609 return doThenAccept(block, executor);
1610 }
1611
1612 private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1613 Executor e) {
1614 if (fn == null) throw new NullPointerException();
1615 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1616 AcceptCompletion<T> d = null;
1617 Object r;
1618 if ((r = result) == null) {
1619 CompletionNode p = new CompletionNode
1620 (d = new AcceptCompletion<T>(this, fn, dst, e));
1621 while ((r = result) == null) {
1622 if (UNSAFE.compareAndSwapObject
1623 (this, COMPLETIONS, p.next = completions, p))
1624 break;
1625 }
1626 }
1627 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1628 T t; Throwable ex;
1629 if (r instanceof AltResult) {
1630 ex = ((AltResult)r).ex;
1631 t = null;
1632 }
1633 else {
1634 ex = null;
1635 @SuppressWarnings("unchecked") T tr = (T) r;
1636 t = tr;
1637 }
1638 if (ex == null) {
1639 try {
1640 if (e != null)
1641 e.execute(new AsyncAccept<T>(t, fn, dst));
1642 else
1643 fn.accept(t);
1644 } catch (Throwable rex) {
1645 ex = rex;
1646 }
1647 }
1648 if (e == null || ex != null)
1649 dst.internalComplete(null, ex);
1650 }
1651 helpPostComplete();
1652 return dst;
1653 }
1654
1655 /**
1656 * Creates and returns a CompletableFuture that is completed after
1657 * performing the given action when this CompletableFuture
1658 * completes. If this CompletableFuture completes exceptionally,
1659 * then the returned CompletableFuture also does so, with a
1660 * CompletionException holding this exception as its cause.
1661 *
1662 * @param action the action to perform before completing the
1663 * returned CompletableFuture
1664 * @return the new CompletableFuture
1665 */
1666 public CompletableFuture<Void> thenRun(Runnable action) {
1667 return doThenRun(action, null);
1668 }
1669
1670 /**
1671 * Creates and returns a CompletableFuture that is asynchronously
1672 * completed using the {@link ForkJoinPool#commonPool()} after
1673 * performing the given action when this CompletableFuture
1674 * completes. If this CompletableFuture completes exceptionally,
1675 * then the returned CompletableFuture also does so, with a
1676 * CompletionException holding this exception as its cause.
1677 *
1678 * @param action the action to perform before completing the
1679 * returned CompletableFuture
1680 * @return the new CompletableFuture
1681 */
1682 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1683 return doThenRun(action, ForkJoinPool.commonPool());
1684 }
1685
1686 /**
1687 * Creates and returns a CompletableFuture that is asynchronously
1688 * completed using the given executor after performing the given
1689 * action when this CompletableFuture completes. If this
1690 * CompletableFuture completes exceptionally, then the returned
1691 * CompletableFuture also does so, with a CompletionException holding
1692 * this exception as its cause.
1693 *
1694 * @param action the action to perform before completing the
1695 * returned CompletableFuture
1696 * @param executor the executor to use for asynchronous execution
1697 * @return the new CompletableFuture
1698 */
1699 public CompletableFuture<Void> thenRunAsync(Runnable action,
1700 Executor executor) {
1701 if (executor == null) throw new NullPointerException();
1702 return doThenRun(action, executor);
1703 }
1704
1705 private CompletableFuture<Void> doThenRun(Runnable action,
1706 Executor e) {
1707 if (action == null) throw new NullPointerException();
1708 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1709 RunCompletion<T> d = null;
1710 Object r;
1711 if ((r = result) == null) {
1712 CompletionNode p = new CompletionNode
1713 (d = new RunCompletion<T>(this, action, dst, e));
1714 while ((r = result) == null) {
1715 if (UNSAFE.compareAndSwapObject
1716 (this, COMPLETIONS, p.next = completions, p))
1717 break;
1718 }
1719 }
1720 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1721 Throwable ex;
1722 if (r instanceof AltResult)
1723 ex = ((AltResult)r).ex;
1724 else
1725 ex = null;
1726 if (ex == null) {
1727 try {
1728 if (e != null)
1729 e.execute(new AsyncRun(action, dst));
1730 else
1731 action.run();
1732 } catch (Throwable rex) {
1733 ex = rex;
1734 }
1735 }
1736 if (e == null || ex != null)
1737 dst.internalComplete(null, ex);
1738 }
1739 helpPostComplete();
1740 return dst;
1741 }
1742
1743 /**
1744 * Creates and returns a CompletableFuture that is completed with
1745 * the result of the given function of this and the other given
1746 * CompletableFuture's results when both complete. If this or
1747 * the other CompletableFuture complete exceptionally, then the
1748 * returned CompletableFuture also does so, with a
1749 * CompletionException holding the exception as its cause.
1750 *
1751 * @param other the other CompletableFuture
1752 * @param fn the function to use to compute the value of
1753 * the returned CompletableFuture
1754 * @return the new CompletableFuture
1755 */
1756 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1757 BiFun<? super T,? super U,? extends V> fn) {
1758 return doThenBiApply(other, fn, null);
1759 }
1760
1761 /**
1762 * Creates and returns a CompletableFuture that is asynchronously
1763 * completed using the {@link ForkJoinPool#commonPool()} with
1764 * the result of the given function of this and the other given
1765 * CompletableFuture's results when both complete. If this or
1766 * the other CompletableFuture complete exceptionally, then the
1767 * returned CompletableFuture also does so, with a
1768 * CompletionException holding the exception as its cause.
1769 *
1770 * @param other the other CompletableFuture
1771 * @param fn the function to use to compute the value of
1772 * the returned CompletableFuture
1773 * @return the new CompletableFuture
1774 */
1775 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1776 BiFun<? super T,? super U,? extends V> fn) {
1777 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1778 }
1779
1780 /**
1781 * Creates and returns a CompletableFuture that is
1782 * asynchronously completed using the given executor with the
1783 * result of the given function of this and the other given
1784 * CompletableFuture's results when both complete. If this or
1785 * the other CompletableFuture complete exceptionally, then the
1786 * returned CompletableFuture also does so, with a
1787 * CompletionException holding the exception as its cause.
1788 *
1789 * @param other the other CompletableFuture
1790 * @param fn the function to use to compute the value of
1791 * the returned CompletableFuture
1792 * @param executor the executor to use for asynchronous execution
1793 * @return the new CompletableFuture
1794 */
1795
1796 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1797 BiFun<? super T,? super U,? extends V> fn,
1798 Executor executor) {
1799 if (executor == null) throw new NullPointerException();
1800 return doThenBiApply(other, fn, executor);
1801 }
1802
1803 private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1804 BiFun<? super T,? super U,? extends V> fn,
1805 Executor e) {
1806 if (other == null || fn == null) throw new NullPointerException();
1807 CompletableFuture<V> dst = new CompletableFuture<V>();
1808 BiApplyCompletion<T,U,V> d = null;
1809 Object r, s = null;
1810 if ((r = result) == null || (s = other.result) == null) {
1811 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1812 CompletionNode q = null, p = new CompletionNode(d);
1813 while ((r == null && (r = result) == null) ||
1814 (s == null && (s = other.result) == null)) {
1815 if (q != null) {
1816 if (s != null ||
1817 UNSAFE.compareAndSwapObject
1818 (other, COMPLETIONS, q.next = other.completions, q))
1819 break;
1820 }
1821 else if (r != null ||
1822 UNSAFE.compareAndSwapObject
1823 (this, COMPLETIONS, p.next = completions, p)) {
1824 if (s != null)
1825 break;
1826 q = new CompletionNode(d);
1827 }
1828 }
1829 }
1830 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1831 T t; U u; Throwable ex;
1832 if (r instanceof AltResult) {
1833 ex = ((AltResult)r).ex;
1834 t = null;
1835 }
1836 else {
1837 ex = null;
1838 @SuppressWarnings("unchecked") T tr = (T) r;
1839 t = tr;
1840 }
1841 if (ex != null)
1842 u = null;
1843 else if (s instanceof AltResult) {
1844 ex = ((AltResult)s).ex;
1845 u = null;
1846 }
1847 else {
1848 @SuppressWarnings("unchecked") U us = (U) s;
1849 u = us;
1850 }
1851 V v = null;
1852 if (ex == null) {
1853 try {
1854 if (e != null)
1855 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1856 else
1857 v = fn.apply(t, u);
1858 } catch (Throwable rex) {
1859 ex = rex;
1860 }
1861 }
1862 if (e == null || ex != null)
1863 dst.internalComplete(v, ex);
1864 }
1865 helpPostComplete();
1866 other.helpPostComplete();
1867 return dst;
1868 }
1869
1870 /**
1871 * Creates and returns a CompletableFuture that is completed with
1872 * the results of this and the other given CompletableFuture if
1873 * both complete. If this and/or the other CompletableFuture
1874 * complete exceptionally, then the returned CompletableFuture
1875 * also does so, with a CompletionException holding one of these
1876 * exceptions as its cause.
1877 *
1878 * @param other the other CompletableFuture
1879 * @param block the action to perform before completing the
1880 * returned CompletableFuture
1881 * @return the new CompletableFuture
1882 */
1883 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1884 BiAction<? super T, ? super U> block) {
1885 return doThenBiAccept(other, block, null);
1886 }
1887
1888 /**
1889 * Creates and returns a CompletableFuture that is completed
1890 * asynchronously using the {@link ForkJoinPool#commonPool()} with
1891 * the results of this and the other given CompletableFuture when
1892 * both complete. If this and/or the other CompletableFuture
1893 * complete exceptionally, then the returned CompletableFuture
1894 * also does so, with a CompletionException holding one of these
1895 * exceptions as its cause.
1896 *
1897 * @param other the other CompletableFuture
1898 * @param block the action to perform before completing the
1899 * returned CompletableFuture
1900 * @return the new CompletableFuture
1901 */
1902 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1903 BiAction<? super T, ? super U> block) {
1904 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1905 }
1906
1907 /**
1908 * Creates and returns a CompletableFuture that is completed
1909 * asynchronously using the given executor with the results of
1910 * this and the other given CompletableFuture when both complete.
1911 * If this and/or the other CompletableFuture complete
1912 * exceptionally, then the returned CompletableFuture also does
1913 * so, with a CompletionException holding one of these exceptions as
1914 * its cause.
1915 *
1916 * @param other the other CompletableFuture
1917 * @param block the action to perform before completing the
1918 * returned CompletableFuture
1919 * @param executor the executor to use for asynchronous execution
1920 * @return the new CompletableFuture
1921 */
1922 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1923 BiAction<? super T, ? super U> block,
1924 Executor executor) {
1925 if (executor == null) throw new NullPointerException();
1926 return doThenBiAccept(other, block, executor);
1927 }
1928
1929 private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
1930 BiAction<? super T,? super U> fn,
1931 Executor e) {
1932 if (other == null || fn == null) throw new NullPointerException();
1933 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1934 BiAcceptCompletion<T,U> d = null;
1935 Object r, s = null;
1936 if ((r = result) == null || (s = other.result) == null) {
1937 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1938 CompletionNode q = null, p = new CompletionNode(d);
1939 while ((r == null && (r = result) == null) ||
1940 (s == null && (s = other.result) == null)) {
1941 if (q != null) {
1942 if (s != null ||
1943 UNSAFE.compareAndSwapObject
1944 (other, COMPLETIONS, q.next = other.completions, q))
1945 break;
1946 }
1947 else if (r != null ||
1948 UNSAFE.compareAndSwapObject
1949 (this, COMPLETIONS, p.next = completions, p)) {
1950 if (s != null)
1951 break;
1952 q = new CompletionNode(d);
1953 }
1954 }
1955 }
1956 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1957 T t; U u; Throwable ex;
1958 if (r instanceof AltResult) {
1959 ex = ((AltResult)r).ex;
1960 t = null;
1961 }
1962 else {
1963 ex = null;
1964 @SuppressWarnings("unchecked") T tr = (T) r;
1965 t = tr;
1966 }
1967 if (ex != null)
1968 u = null;
1969 else if (s instanceof AltResult) {
1970 ex = ((AltResult)s).ex;
1971 u = null;
1972 }
1973 else {
1974 @SuppressWarnings("unchecked") U us = (U) s;
1975 u = us;
1976 }
1977 if (ex == null) {
1978 try {
1979 if (e != null)
1980 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
1981 else
1982 fn.accept(t, u);
1983 } catch (Throwable rex) {
1984 ex = rex;
1985 }
1986 }
1987 if (e == null || ex != null)
1988 dst.internalComplete(null, ex);
1989 }
1990 helpPostComplete();
1991 other.helpPostComplete();
1992 return dst;
1993 }
1994
1995 /**
1996 * Creates and returns a CompletableFuture that is completed
1997 * when this and the other given CompletableFuture both
1998 * complete. If this and/or the other CompletableFuture complete
1999 * exceptionally, then the returned CompletableFuture also does
2000 * so, with a CompletionException holding one of these exceptions as
2001 * its cause.
2002 *
2003 * @param other the other CompletableFuture
2004 * @param action the action to perform before completing the
2005 * returned CompletableFuture
2006 * @return the new CompletableFuture
2007 */
2008 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2009 Runnable action) {
2010 return doThenBiRun(other, action, null);
2011 }
2012
2013 /**
2014 * Creates and returns a CompletableFuture that is completed
2015 * asynchronously using the {@link ForkJoinPool#commonPool()}
2016 * when this and the other given CompletableFuture both
2017 * complete. If this and/or the other CompletableFuture complete
2018 * exceptionally, then the returned CompletableFuture also does
2019 * so, with a CompletionException holding one of these exceptions as
2020 * its cause.
2021 *
2022 * @param other the other CompletableFuture
2023 * @param action the action to perform before completing the
2024 * returned CompletableFuture
2025 * @return the new CompletableFuture
2026 */
2027 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2028 Runnable action) {
2029 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2030 }
2031
2032 /**
2033 * Creates and returns a CompletableFuture that is completed
2034 * asynchronously using the given executor
2035 * when this and the other given CompletableFuture both
2036 * complete. If this and/or the other CompletableFuture complete
2037 * exceptionally, then the returned CompletableFuture also does
2038 * so, with a CompletionException holding one of these exceptions as
2039 * its cause.
2040 *
2041 * @param other the other CompletableFuture
2042 * @param action the action to perform before completing the
2043 * returned CompletableFuture
2044 * @param executor the executor to use for asynchronous execution
2045 * @return the new CompletableFuture
2046 */
2047 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2048 Runnable action,
2049 Executor executor) {
2050 if (executor == null) throw new NullPointerException();
2051 return doThenBiRun(other, action, executor);
2052 }
2053
2054 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2055 Runnable action,
2056 Executor e) {
2057 if (other == null || action == null) throw new NullPointerException();
2058 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2059 BiRunCompletion<T> d = null;
2060 Object r, s = null;
2061 if ((r = result) == null || (s = other.result) == null) {
2062 d = new BiRunCompletion<T>(this, other, action, dst, e);
2063 CompletionNode q = null, p = new CompletionNode(d);
2064 while ((r == null && (r = result) == null) ||
2065 (s == null && (s = other.result) == null)) {
2066 if (q != null) {
2067 if (s != null ||
2068 UNSAFE.compareAndSwapObject
2069 (other, COMPLETIONS, q.next = other.completions, q))
2070 break;
2071 }
2072 else if (r != null ||
2073 UNSAFE.compareAndSwapObject
2074 (this, COMPLETIONS, p.next = completions, p)) {
2075 if (s != null)
2076 break;
2077 q = new CompletionNode(d);
2078 }
2079 }
2080 }
2081 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2082 Throwable ex;
2083 if (r instanceof AltResult)
2084 ex = ((AltResult)r).ex;
2085 else
2086 ex = null;
2087 if (ex == null && (s instanceof AltResult))
2088 ex = ((AltResult)s).ex;
2089 if (ex == null) {
2090 try {
2091 if (e != null)
2092 e.execute(new AsyncRun(action, dst));
2093 else
2094 action.run();
2095 } catch (Throwable rex) {
2096 ex = rex;
2097 }
2098 }
2099 if (e == null || ex != null)
2100 dst.internalComplete(null, ex);
2101 }
2102 helpPostComplete();
2103 other.helpPostComplete();
2104 return dst;
2105 }
2106
2107 /**
2108 * Creates and returns a CompletableFuture that is completed with
2109 * the result of the given function of either this or the other
2110 * given CompletableFuture's results when either complete. If
2111 * this and/or the other CompletableFuture complete exceptionally,
2112 * then the returned CompletableFuture may also do so, with a
2113 * CompletionException holding one of these exceptions as its cause.
2114 * No guarantees are made about which result or exception is used
2115 * in the returned CompletableFuture.
2116 *
2117 * @param other the other CompletableFuture
2118 * @param fn the function to use to compute the value of
2119 * the returned CompletableFuture
2120 * @return the new CompletableFuture
2121 */
2122 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2123 Fun<? super T, U> fn) {
2124 return doOrApply(other, fn, null);
2125 }
2126
2127 /**
2128 * Creates and returns a CompletableFuture that is completed
2129 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2130 * the result of the given function of either this or the other
2131 * given CompletableFuture's results when either complete. If
2132 * this and/or the other CompletableFuture complete exceptionally,
2133 * then the returned CompletableFuture may also do so, with a
2134 * CompletionException holding one of these exceptions as its cause.
2135 * No guarantees are made about which result or exception is used
2136 * in the returned CompletableFuture.
2137 *
2138 * @param other the other CompletableFuture
2139 * @param fn the function to use to compute the value of
2140 * the returned CompletableFuture
2141 * @return the new CompletableFuture
2142 */
2143 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2144 Fun<? super T, U> fn) {
2145 return doOrApply(other, fn, ForkJoinPool.commonPool());
2146 }
2147
2148 /**
2149 * Creates and returns a CompletableFuture that is completed
2150 * asynchronously using the given executor with the result of the
2151 * given function of either this or the other given
2152 * CompletableFuture's results when either complete. If this
2153 * and/or the other CompletableFuture complete exceptionally, then
2154 * the returned CompletableFuture may also do so, with a
2155 * CompletionException holding one of these exceptions as its cause.
2156 * No guarantees are made about which result or exception is used
2157 * in the returned CompletableFuture.
2158 *
2159 * @param other the other CompletableFuture
2160 * @param fn the function to use to compute the value of
2161 * the returned CompletableFuture
2162 * @param executor the executor to use for asynchronous execution
2163 * @return the new CompletableFuture
2164 */
2165 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2166 Fun<? super T, U> fn,
2167 Executor executor) {
2168 if (executor == null) throw new NullPointerException();
2169 return doOrApply(other, fn, executor);
2170 }
2171
2172 private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2173 Fun<? super T, U> fn,
2174 Executor e) {
2175 if (other == null || fn == null) throw new NullPointerException();
2176 CompletableFuture<U> dst = new CompletableFuture<U>();
2177 OrApplyCompletion<T,U> d = null;
2178 Object r;
2179 if ((r = result) == null && (r = other.result) == null) {
2180 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2181 CompletionNode q = null, p = new CompletionNode(d);
2182 while ((r = result) == null && (r = other.result) == null) {
2183 if (q != null) {
2184 if (UNSAFE.compareAndSwapObject
2185 (other, COMPLETIONS, q.next = other.completions, q))
2186 break;
2187 }
2188 else if (UNSAFE.compareAndSwapObject
2189 (this, COMPLETIONS, p.next = completions, p))
2190 q = new CompletionNode(d);
2191 }
2192 }
2193 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2194 T t; Throwable ex;
2195 if (r instanceof AltResult) {
2196 ex = ((AltResult)r).ex;
2197 t = null;
2198 }
2199 else {
2200 ex = null;
2201 @SuppressWarnings("unchecked") T tr = (T) r;
2202 t = tr;
2203 }
2204 U u = null;
2205 if (ex == null) {
2206 try {
2207 if (e != null)
2208 e.execute(new AsyncApply<T,U>(t, fn, dst));
2209 else
2210 u = fn.apply(t);
2211 } catch (Throwable rex) {
2212 ex = rex;
2213 }
2214 }
2215 if (e == null || ex != null)
2216 dst.internalComplete(u, ex);
2217 }
2218 helpPostComplete();
2219 other.helpPostComplete();
2220 return dst;
2221 }
2222
2223 /**
2224 * Creates and returns a CompletableFuture that is completed after
2225 * performing the given action with the result of either this or the
2226 * other given CompletableFuture's result, when either complete.
2227 * If this and/or the other CompletableFuture complete
2228 * exceptionally, then the returned CompletableFuture may also do
2229 * so, with a CompletionException holding one of these exceptions as
2230 * its cause. No guarantees are made about which exception is
2231 * used in the returned CompletableFuture.
2232 *
2233 * @param other the other CompletableFuture
2234 * @param block the action to perform before completing the
2235 * returned CompletableFuture
2236 * @return the new CompletableFuture
2237 */
2238 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2239 Action<? super T> block) {
2240 return doOrAccept(other, block, null);
2241 }
2242
2243 /**
2244 * Creates and returns a CompletableFuture that is completed
2245 * asynchronously using the {@link ForkJoinPool#commonPool()},
2246 * performing the given action with the result of either this or
2247 * the other given CompletableFuture's result, when either
2248 * complete. If this and/or the other CompletableFuture complete
2249 * exceptionally, then the returned CompletableFuture may also do
2250 * so, with a CompletionException holding one of these exceptions as
2251 * its cause. No guarantees are made about which exception is
2252 * used in the returned CompletableFuture.
2253 *
2254 * @param other the other CompletableFuture
2255 * @param block the action to perform before completing the
2256 * returned CompletableFuture
2257 * @return the new CompletableFuture
2258 */
2259 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2260 Action<? super T> block) {
2261 return doOrAccept(other, block, ForkJoinPool.commonPool());
2262 }
2263
2264 /**
2265 * Creates and returns a CompletableFuture that is completed
2266 * asynchronously using the given executor,
2267 * performing the given action with the result of either this or
2268 * the other given CompletableFuture's result, when either
2269 * complete. If this and/or the other CompletableFuture complete
2270 * exceptionally, then the returned CompletableFuture may also do
2271 * so, with a CompletionException holding one of these exceptions as
2272 * its cause. No guarantees are made about which exception is
2273 * used in the returned CompletableFuture.
2274 *
2275 * @param other the other CompletableFuture
2276 * @param block the action to perform before completing the
2277 * returned CompletableFuture
2278 * @param executor the executor to use for asynchronous execution
2279 * @return the new CompletableFuture
2280 */
2281 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2282 Action<? super T> block,
2283 Executor executor) {
2284 if (executor == null) throw new NullPointerException();
2285 return doOrAccept(other, block, executor);
2286 }
2287
2288 private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2289 Action<? super T> fn,
2290 Executor e) {
2291 if (other == null || fn == null) throw new NullPointerException();
2292 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2293 OrAcceptCompletion<T> d = null;
2294 Object r;
2295 if ((r = result) == null && (r = other.result) == null) {
2296 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2297 CompletionNode q = null, p = new CompletionNode(d);
2298 while ((r = result) == null && (r = other.result) == null) {
2299 if (q != null) {
2300 if (UNSAFE.compareAndSwapObject
2301 (other, COMPLETIONS, q.next = other.completions, q))
2302 break;
2303 }
2304 else if (UNSAFE.compareAndSwapObject
2305 (this, COMPLETIONS, p.next = completions, p))
2306 q = new CompletionNode(d);
2307 }
2308 }
2309 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2310 T t; Throwable ex;
2311 if (r instanceof AltResult) {
2312 ex = ((AltResult)r).ex;
2313 t = null;
2314 }
2315 else {
2316 ex = null;
2317 @SuppressWarnings("unchecked") T tr = (T) r;
2318 t = tr;
2319 }
2320 if (ex == null) {
2321 try {
2322 if (e != null)
2323 e.execute(new AsyncAccept<T>(t, fn, dst));
2324 else
2325 fn.accept(t);
2326 } catch (Throwable rex) {
2327 ex = rex;
2328 }
2329 }
2330 if (e == null || ex != null)
2331 dst.internalComplete(null, ex);
2332 }
2333 helpPostComplete();
2334 other.helpPostComplete();
2335 return dst;
2336 }
2337
2338 /**
2339 * Creates and returns a CompletableFuture that is completed
2340 * after this or the other given CompletableFuture complete. If
2341 * this and/or the other CompletableFuture complete exceptionally,
2342 * then the returned CompletableFuture may also do so, with a
2343 * CompletionException holding one of these exceptions as its cause.
2344 * No guarantees are made about which exception is used in the
2345 * returned CompletableFuture.
2346 *
2347 * @param other the other CompletableFuture
2348 * @param action the action to perform before completing the
2349 * returned CompletableFuture
2350 * @return the new CompletableFuture
2351 */
2352 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2353 Runnable action) {
2354 return doOrRun(other, action, null);
2355 }
2356
2357 /**
2358 * Creates and returns a CompletableFuture that is completed
2359 * asynchronously using the {@link ForkJoinPool#commonPool()}
2360 * after this or the other given CompletableFuture complete. If
2361 * this and/or the other CompletableFuture complete exceptionally,
2362 * then the returned CompletableFuture may also do so, with a
2363 * CompletionException holding one of these exceptions as its cause.
2364 * No guarantees are made about which exception is used in the
2365 * returned CompletableFuture.
2366 *
2367 * @param other the other CompletableFuture
2368 * @param action the action to perform before completing the
2369 * returned CompletableFuture
2370 * @return the new CompletableFuture
2371 */
2372 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2373 Runnable action) {
2374 return doOrRun(other, action, ForkJoinPool.commonPool());
2375 }
2376
2377 /**
2378 * Creates and returns a CompletableFuture that is completed
2379 * asynchronously using the given executor after this or the other
2380 * given CompletableFuture complete. If this and/or the other
2381 * CompletableFuture complete exceptionally, then the returned
2382 * CompletableFuture may also do so, with a CompletionException
2383 * holding one of these exceptions as its cause. No guarantees are
2384 * made about which exception is used in the returned
2385 * CompletableFuture.
2386 *
2387 * @param other the other CompletableFuture
2388 * @param action the action to perform before completing the
2389 * returned CompletableFuture
2390 * @param executor the executor to use for asynchronous execution
2391 * @return the new CompletableFuture
2392 */
2393 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2394 Runnable action,
2395 Executor executor) {
2396 if (executor == null) throw new NullPointerException();
2397 return doOrRun(other, action, executor);
2398 }
2399
2400 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2401 Runnable action,
2402 Executor e) {
2403 if (other == null || action == null) throw new NullPointerException();
2404 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2405 OrRunCompletion<T> d = null;
2406 Object r;
2407 if ((r = result) == null && (r = other.result) == null) {
2408 d = new OrRunCompletion<T>(this, other, action, dst, e);
2409 CompletionNode q = null, p = new CompletionNode(d);
2410 while ((r = result) == null && (r = other.result) == null) {
2411 if (q != null) {
2412 if (UNSAFE.compareAndSwapObject
2413 (other, COMPLETIONS, q.next = other.completions, q))
2414 break;
2415 }
2416 else if (UNSAFE.compareAndSwapObject
2417 (this, COMPLETIONS, p.next = completions, p))
2418 q = new CompletionNode(d);
2419 }
2420 }
2421 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2422 Throwable ex;
2423 if (r instanceof AltResult)
2424 ex = ((AltResult)r).ex;
2425 else
2426 ex = null;
2427 if (ex == null) {
2428 try {
2429 if (e != null)
2430 e.execute(new AsyncRun(action, dst));
2431 else
2432 action.run();
2433 } catch (Throwable rex) {
2434 ex = rex;
2435 }
2436 }
2437 if (e == null || ex != null)
2438 dst.internalComplete(null, ex);
2439 }
2440 helpPostComplete();
2441 other.helpPostComplete();
2442 return dst;
2443 }
2444
2445 /**
2446 * Returns a CompletableFuture (or an equivalent one) produced by
2447 * the given function of the result of this CompletableFuture when
2448 * completed. If this CompletableFuture completes exceptionally,
2449 * then the returned CompletableFuture also does so, with a
2450 * CompletionException holding this exception as its cause.
2451 *
2452 * @param fn the function returning a new CompletableFuture
2453 * @return the CompletableFuture, that {@code isDone()} upon
2454 * return if completed by the given function, or an exception
2455 * occurs
2456 */
2457 public <U> CompletableFuture<U> thenCompose(Fun<? super T,
2458 CompletableFuture<U>> fn) {
2459 if (fn == null) throw new NullPointerException();
2460 CompletableFuture<U> dst = null;
2461 ComposeCompletion<T,U> d = null;
2462 Object r;
2463 if ((r = result) == null) {
2464 dst = new CompletableFuture<U>();
2465 CompletionNode p = new CompletionNode
2466 (d = new ComposeCompletion<T,U>(this, fn, dst));
2467 while ((r = result) == null) {
2468 if (UNSAFE.compareAndSwapObject
2469 (this, COMPLETIONS, p.next = completions, p))
2470 break;
2471 }
2472 }
2473 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2474 T t; Throwable ex;
2475 if (r instanceof AltResult) {
2476 ex = ((AltResult)r).ex;
2477 t = null;
2478 }
2479 else {
2480 ex = null;
2481 @SuppressWarnings("unchecked") T tr = (T) r;
2482 t = tr;
2483 }
2484 if (ex == null) {
2485 try {
2486 dst = fn.apply(t);
2487 } catch (Throwable rex) {
2488 ex = rex;
2489 }
2490 }
2491 if (dst == null) {
2492 dst = new CompletableFuture<U>();
2493 if (ex == null)
2494 ex = new NullPointerException();
2495 }
2496 if (ex != null)
2497 dst.internalComplete(null, ex);
2498 }
2499 helpPostComplete();
2500 dst.helpPostComplete();
2501 return dst;
2502 }
2503
2504 /**
2505 * Creates and returns a CompletableFuture that is completed with
2506 * the result of the given function of the exception triggering
2507 * this CompletableFuture's completion when it completes
2508 * exceptionally; Otherwise, if this CompletableFuture completes
2509 * normally, then the returned CompletableFuture also completes
2510 * normally with the same value.
2511 *
2512 * @param fn the function to use to compute the value of the
2513 * returned CompletableFuture if this CompletableFuture completed
2514 * exceptionally
2515 * @return the new CompletableFuture
2516 */
2517 public CompletableFuture<T> exceptionally(Fun<Throwable, ? extends T> fn) {
2518 if (fn == null) throw new NullPointerException();
2519 CompletableFuture<T> dst = new CompletableFuture<T>();
2520 ExceptionCompletion<T> d = null;
2521 Object r;
2522 if ((r = result) == null) {
2523 CompletionNode p =
2524 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2525 while ((r = result) == null) {
2526 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2527 p.next = completions, p))
2528 break;
2529 }
2530 }
2531 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2532 T t = null; Throwable ex, dx = null;
2533 if (r instanceof AltResult) {
2534 if ((ex = ((AltResult)r).ex) != null) {
2535 try {
2536 t = fn.apply(ex);
2537 } catch (Throwable rex) {
2538 dx = rex;
2539 }
2540 }
2541 }
2542 else {
2543 @SuppressWarnings("unchecked") T tr = (T) r;
2544 t = tr;
2545 }
2546 dst.internalComplete(t, dx);
2547 }
2548 helpPostComplete();
2549 return dst;
2550 }
2551
2552 /**
2553 * Creates and returns a CompletableFuture that is completed with
2554 * the result of the given function of the result and exception of
2555 * this CompletableFuture's completion when it completes. The
2556 * given function is invoked with the result (or {@code null} if
2557 * none) and the exception (or {@code null} if none) of this
2558 * CompletableFuture when complete.
2559 *
2560 * @param fn the function to use to compute the value of the
2561 * returned CompletableFuture
2562
2563 * @return the new CompletableFuture
2564 */
2565 public <U> CompletableFuture<U> handle(BiFun<? super T, Throwable, ? extends U> fn) {
2566 if (fn == null) throw new NullPointerException();
2567 CompletableFuture<U> dst = new CompletableFuture<U>();
2568 HandleCompletion<T,U> d = null;
2569 Object r;
2570 if ((r = result) == null) {
2571 CompletionNode p =
2572 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2573 while ((r = result) == null) {
2574 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2575 p.next = completions, p))
2576 break;
2577 }
2578 }
2579 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2580 T t; Throwable ex;
2581 if (r instanceof AltResult) {
2582 ex = ((AltResult)r).ex;
2583 t = null;
2584 }
2585 else {
2586 ex = null;
2587 @SuppressWarnings("unchecked") T tr = (T) r;
2588 t = tr;
2589 }
2590 U u; Throwable dx;
2591 try {
2592 u = fn.apply(t, ex);
2593 dx = null;
2594 } catch (Throwable rex) {
2595 dx = rex;
2596 u = null;
2597 }
2598 dst.internalComplete(u, dx);
2599 }
2600 helpPostComplete();
2601 return dst;
2602 }
2603
2604 /**
2605 * Attempts to complete this CompletableFuture with
2606 * a {@link CancellationException}.
2607 *
2608 * @param mayInterruptIfRunning this value has no effect in this
2609 * implementation because interrupts are not used to control
2610 * processing.
2611 *
2612 * @return {@code true} if this task is now cancelled
2613 */
2614 public boolean cancel(boolean mayInterruptIfRunning) {
2615 Object r;
2616 while ((r = result) == null) {
2617 r = new AltResult(new CancellationException());
2618 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
2619 postComplete();
2620 return true;
2621 }
2622 }
2623 return ((r instanceof AltResult) &&
2624 (((AltResult)r).ex instanceof CancellationException));
2625 }
2626
2627 /**
2628 * Returns {@code true} if this CompletableFuture was cancelled
2629 * before it completed normally.
2630 *
2631 * @return {@code true} if this CompletableFuture was cancelled
2632 * before it completed normally
2633 */
2634 public boolean isCancelled() {
2635 Object r;
2636 return ((r = result) instanceof AltResult) &&
2637 (((AltResult)r).ex instanceof CancellationException);
2638 }
2639
2640 /**
2641 * Forcibly sets or resets the value subsequently returned by
2642 * method {@link #get()} and related methods, whether or not
2643 * already completed. This method is designed for use only in
2644 * error recovery actions, and even in such situations may result
2645 * in ongoing dependent completions using established versus
2646 * overwritten outcomes.
2647 *
2648 * @param value the completion value
2649 */
2650 public void obtrudeValue(T value) {
2651 result = (value == null) ? NIL : value;
2652 postComplete();
2653 }
2654
2655 /**
2656 * Forcibly causes subsequent invocations of method {@link #get()}
2657 * and related methods to throw the given exception, whether or
2658 * not already completed. This method is designed for use only in
2659 * recovery actions, and even in such situations may result in
2660 * ongoing dependent completions using established versus
2661 * overwritten outcomes.
2662 *
2663 * @param ex the exception
2664 */
2665 public void obtrudeException(Throwable ex) {
2666 if (ex == null) throw new NullPointerException();
2667 result = new AltResult(ex);
2668 postComplete();
2669 }
2670
2671 // Unsafe mechanics
2672 private static final sun.misc.Unsafe UNSAFE;
2673 private static final long RESULT;
2674 private static final long WAITERS;
2675 private static final long COMPLETIONS;
2676 static {
2677 try {
2678 UNSAFE = getUnsafe();
2679 Class<?> k = CompletableFuture.class;
2680 RESULT = UNSAFE.objectFieldOffset
2681 (k.getDeclaredField("result"));
2682 WAITERS = UNSAFE.objectFieldOffset
2683 (k.getDeclaredField("waiters"));
2684 COMPLETIONS = UNSAFE.objectFieldOffset
2685 (k.getDeclaredField("completions"));
2686 } catch (Exception e) {
2687 throw new Error(e);
2688 }
2689 }
2690
2691
2692 /**
2693 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
2694 * Replace with a simple call to Unsafe.getUnsafe when integrating
2695 * into a jdk.
2696 *
2697 * @return a sun.misc.Unsafe
2698 */
2699 private static sun.misc.Unsafe getUnsafe() {
2700 try {
2701 return sun.misc.Unsafe.getUnsafe();
2702 } catch (SecurityException tryReflectionInstead) {}
2703 try {
2704 return java.security.AccessController.doPrivileged
2705 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
2706 public sun.misc.Unsafe run() throws Exception {
2707 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
2708 for (java.lang.reflect.Field f : k.getDeclaredFields()) {
2709 f.setAccessible(true);
2710 Object x = f.get(null);
2711 if (k.isInstance(x))
2712 return k.cast(x);
2713 }
2714 throw new NoSuchFieldError("the Unsafe");
2715 }});
2716 } catch (java.security.PrivilegedActionException e) {
2717 throw new RuntimeException("Could not initialize intrinsics",
2718 e.getCause());
2719 }
2720 }
2721 }