ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.14
Committed: Sat Feb 16 20:50:29 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.13: +0 -1 lines
Log Message:
javadoc comment correctness

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.TimeUnit;
10 import java.util.concurrent.Executor;
11 import java.util.concurrent.ThreadLocalRandom;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeoutException;
14 import java.util.concurrent.CancellationException;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.locks.LockSupport;
17
18 /**
19 * A {@link Future} that may be explicitly completed (setting its
20 * value and status), and may include dependent functions and actions
21 * that trigger upon its completion. Methods are available for adding
22 * those based on Functions, Blocks, and Runnables, depending on
23 * whether they require arguments and/or produce results, as well as
24 * those triggered after either or both the current and another
25 * CompletableFuture complete. Functions and actions supplied for
26 * dependent completions (mainly using methods with prefix {@code
27 * then}) may be performed by the thread that completes the current
28 * CompletableFuture, or by any other caller of these methods. There
29 * are no guarantees about the order of processing completions unless
30 * constrained by these methods.
31 *
32 * <p>When two or more threads attempt to {@link #complete} or {@link
33 * #completeExceptionally} a CompletableFuture, only one of them
34 * succeeds.
35 *
36 * <p>Upon exceptional completion, or when a completion entails
37 * computation of a function or action, and it terminates abruptly
38 * with an (unchecked) exception or error, then further completions
39 * act as {@code completeExceptionally} with a {@link
40 * CompletionException} holding that exception as its cause. If a
41 * CompletableFuture completes exceptionally, and is not followed by a
42 * {@link #exceptionally} or {@link #handle} completion, then all of
43 * its dependents (and their dependents) also complete exceptionally
44 * with CompletionExceptions holding the ultimate cause. In case of a
45 * CompletionException, methods {@link #get()} and {@link #get(long,
46 * TimeUnit)} throw an {@link ExecutionException} with the same cause
47 * as would be held in the corresponding CompletionException. However,
48 * in these cases, methods {@link #join()} and {@link #getNow} throw
49 * the CompletionException, which simplifies usage especially within
50 * other completion functions.
51 *
52 * <p>CompletableFutures themselves do not execute asynchronously.
53 * However, the {@code async} methods provide commonly useful ways to
54 * commence asynchronous processing, using either a given {@link
55 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
56 * function or action that will result in the completion of a new
57 * CompletableFuture. To simplify monitoring, debugging, and tracking,
58 * all generated asynchronous tasks are instances of the tagging
59 * interface {@link AsynchronousCompletionTask}.
60 *
61 * <p><em>jsr166e note: During transition, this class
62 * uses nested functional interfaces with different names but the
63 * same forms as those expected for JDK8.</em>
64 *
65 * @author Doug Lea
66 * @since 1.8
67 */
68 public class CompletableFuture<T> implements Future<T> {
69 // jsr166e nested interfaces
70
/**
 * Interface describing a void action of one argument.
 *
 * @param <A> the type of the argument consumed by the action
 */
public interface Action<A> {
    /**
     * Performs this action on the given argument.
     *
     * @param a the argument to act upon
     */
    void accept(A a);
}
/**
 * Interface describing a void action of two arguments.
 *
 * @param <A> the type of the first argument
 * @param <B> the type of the second argument
 */
public interface BiAction<A,B> {
    /**
     * Performs this action on the given pair of arguments.
     *
     * @param a the first argument
     * @param b the second argument
     */
    void accept(A a, B b);
}
/**
 * Interface describing a function of one argument.
 *
 * @param <A> the type of the argument
 * @param <T> the type of the value produced
 */
public interface Fun<A,T> {
    /**
     * Applies this function to the given argument.
     *
     * @param a the argument
     * @return the function result
     */
    T apply(A a);
}
/**
 * Interface describing a function of two arguments.
 *
 * @param <A> the type of the first argument
 * @param <B> the type of the second argument
 * @param <T> the type of the value produced
 */
public interface BiFun<A,B,T> {
    /**
     * Applies this function to the given pair of arguments.
     *
     * @param a the first argument
     * @param b the second argument
     * @return the function result
     */
    T apply(A a, B b);
}
/**
 * Interface describing a function of no arguments.
 *
 * @param <T> the type of the value produced
 */
public interface Generator<T> {
    /**
     * Produces a value.
     *
     * @return the generated value
     */
    T get();
}
81
82
83 /*
84 * Overview:
85 *
86 * 1. Non-nullness of field result (set via CAS) indicates done.
87 * An AltResult is used to box null as a result, as well as to
88 * hold exceptions. Using a single field makes completion fast
89 * and simple to detect and trigger, at the expense of a lot of
90 * encoding and decoding that infiltrates many methods. One minor
91 * simplification relies on the (static) NIL (to box null results)
92 * being the only AltResult with a null exception field, so we
93 * don't usually need explicit comparisons with NIL. The CF
94 * exception propagation mechanics surrounding decoding rely on
95 * unchecked casts of decoded results really being unchecked,
96 * where user type errors are caught at point of use, as is
97 * currently the case in Java. These are highlighted by using
98 * SuppressWarnings-annotated temporaries.
99 *
100 * 2. Waiters are held in a Treiber stack similar to the one used
101 * in FutureTask, Phaser, and SynchronousQueue. See their
102 * internal documentation for algorithmic details.
103 *
104 * 3. Completions are also kept in a list/stack, and pulled off
105 * and run when completion is triggered. (We could even use the
106 * same stack as for waiters, but would give up the potential
107 * parallelism obtained because woken waiters help release/run
108 * others -- see method postComplete). Because post-processing
109 * may race with direct calls, class Completion opportunistically
110 * extends AtomicInteger so callers can claim the action via
111 * compareAndSet(0, 1). The Completion.run methods are all
112 * written in a boringly similar uniform way (that sometimes includes
113 * unnecessary-looking checks, kept to maintain uniformity). There
114 * are enough dimensions upon which they differ that factoring to
115 * use common code isn't worthwhile.
116 *
117 * 4. The exported then/and/or methods do support a bit of
118 * factoring (see doThenApply etc). They must cope with the
119 * intrinsic races surrounding addition of a dependent action
120 * versus performing the action directly because the task is
121 * already complete. For example, a CF may not be complete upon
122 * entry, so a dependent completion is added, but by the time it
123 * is added, the target CF is complete, so must be directly
124 * executed. This is all done while avoiding unnecessary object
125 * construction in safe-bypass cases.
126 */
127
128 // preliminaries
129
130 static final class AltResult {
131 final Throwable ex; // null only for NIL
132 AltResult(Throwable ex) { this.ex = ex; }
133 }
134
// Shared AltResult built with a null exception; internalComplete stores it
// as the result when the supplied value is null and no exception is given.
135 static final AltResult NIL = new AltResult(null);
136
137 // Fields
138
// All three fields are volatile and are updated elsewhere in this class via
// UNSAFE.compareAndSwapObject using the RESULT, WAITERS and COMPLETIONS offsets.
// A non-null result is what the code in this class treats as "completed".
139 volatile Object result; // Either the result or boxed AltResult
140 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
141 volatile CompletionNode completions; // list (Treiber stack) of completions
142
143 // Basic utilities for triggering and processing completions
144
145 /**
146 * Removes and signals all waiting threads and runs all completions.
147 */
148 final void postComplete() {
149 WaitNode q; Thread t;
150 while ((q = waiters) != null) {
151 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
152 (t = q.thread) != null) {
153 q.thread = null;
154 LockSupport.unpark(t);
155 }
156 }
157
158 CompletionNode h; Completion c;
159 while ((h = completions) != null) {
160 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
161 (c = h.completion) != null)
162 c.run();
163 }
164 }
165
166 /**
167 * Triggers completion with the encoding of the given arguments:
168 * if the exception is non-null, encodes it as a wrapped
169 * CompletionException unless it is one already. Otherwise uses
170 * the given result, boxed as NIL if null.
171 */
172 final void internalComplete(Object v, Throwable ex) {
173 if (result == null)
174 UNSAFE.compareAndSwapObject
175 (this, RESULT, null,
176 (ex == null) ? (v == null) ? NIL : v :
177 new AltResult((ex instanceof CompletionException) ? ex :
178 new CompletionException(ex)));
179 postComplete(); // help out even if not triggered
180 }
181
182 /**
183 * If triggered, helps release and/or process completions.
184 */
185 final void helpPostComplete() {
186 if (result != null)
187 postComplete();
188 }
189
190 /* ------------- waiting for completions -------------- */
191
192 /**
193 * Heuristic spin value for waitingGet() before blocking on
194 * multiprocessors
 *
 * <p>waitingGet() only adopts this spin budget when
 * Runtime.availableProcessors() reports more than one processor;
 * otherwise its spin count stays at zero.
195 */
196 static final int WAITING_GET_SPINS = 256;
197
198 /**
199 * Linked nodes to record waiting threads in a Treiber stack. See
200 * other classes such as Phaser and SynchronousQueue for more
201 * detailed explanation. This class implements ManagedBlocker to
202 * avoid starvation when blocking actions pile up in
203 * ForkJoinPools.
 *
 * <p>A node captures the thread that constructs it. A null
 * {@code thread} field is the signal, checked by isReleasable(), that
 * the node should no longer block.
204 */
205 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
206 long nanos; // wait time if timed
207 final long deadline; // non-zero if timed
208 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
209 volatile Thread thread;
210 volatile WaitNode next;
// Records the calling thread; interruptControl starts at 1 when
// interruptible and 0 otherwise.
211 WaitNode(boolean interruptible, long nanos, long deadline) {
212 this.thread = Thread.currentThread();
213 this.interruptControl = interruptible ? 1 : 0;
214 this.nanos = nanos;
215 this.deadline = deadline;
216 }
// Returns true when blocking should stop: the thread field was cleared,
// an interrupt arrived while the node was interruptible, or (for timed
// nodes) the remaining time has run out.
217 public boolean isReleasable() {
218 if (thread == null)
219 return true;
// Thread.interrupted() clears the interrupt status; the interrupt is
// remembered by setting interruptControl to -1. Only a node that was
// interruptible (previous value > 0) is released by it.
220 if (Thread.interrupted()) {
221 int i = interruptControl;
222 interruptControl = -1;
223 if (i > 0)
224 return true;
225 }
// Timed node: recompute the remaining nanos from the deadline unless it is
// already exhausted; on timeout clear thread so later checks return at once.
226 if (deadline != 0L &&
227 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
228 thread = null;
229 return true;
230 }
231 return false;
232 }
// Parks once (untimed when deadline is 0, otherwise for the remaining
// nanos) unless already releasable, then reports isReleasable() again.
233 public boolean block() {
234 if (isReleasable())
235 return true;
236 else if (deadline == 0L)
237 LockSupport.park(this);
238 else if (nanos > 0L)
239 LockSupport.parkNanos(this, nanos);
240 return isReleasable();
241 }
242 }
243
244 /**
245 * Returns raw result after waiting, or null if interruptible and
246 * interrupted.
 *
 * <p>Each pass of the loop performs exactly one step, in this order of
 * precedence: return if a result is present; seed the spin state;
 * spin; create a wait node; push the node onto the waiters stack; give
 * up on interrupt; block via ForkJoinPool.managedBlock.
 *
 * @param interruptible whether an interrupt should abort the wait
 * @return the raw (possibly AltResult-boxed) result, or null when the
 *         wait was interruptible and an interrupt was recorded
247 */
248 private Object waitingGet(boolean interruptible) {
249 WaitNode q = null;
250 boolean queued = false;
251 int h = 0, spins = 0;
252 for (Object r;;) {
253 if ((r = result) != null) {
254 if (q != null) { // suppress unpark
255 q.thread = null;
// An interrupt was recorded on the node: an interruptible wait unlinks the
// node and returns null even though a result is now present; otherwise the
// thread's interrupt status is restored before returning the result.
256 if (q.interruptControl < 0) {
257 if (interruptible) {
258 removeWaiter(q);
259 return null;
260 }
261 Thread.currentThread().interrupt();
262 }
263 }
264 postComplete(); // help release others
265 return r;
266 }
// First pass without a result: pick a random seed and, with more than one
// processor available, grant the spin budget.
267 else if (h == 0) {
268 h = ThreadLocalRandom.current().nextInt();
269 if (Runtime.getRuntime().availableProcessors() > 1)
270 spins = WAITING_GET_SPINS;
271 }
// Randomized spinning: the budget is decremented only on passes where the
// xorshift step leaves h non-negative.
272 else if (spins > 0) {
273 h ^= h << 1; // xorshift
274 h ^= h >>> 3;
275 if ((h ^= h << 10) >= 0)
276 --spins;
277 }
// Untimed node: nanos and deadline are both 0.
278 else if (q == null)
279 q = new WaitNode(interruptible, 0L, 0L);
// Push the node onto the waiters stack; retried on the next pass if the CAS fails.
280 else if (!queued)
281 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
282 q.next = waiters, q);
283 else if (interruptible && q.interruptControl < 0) {
284 removeWaiter(q);
285 return null;
286 }
// Block only while the node still holds its thread and no result is visible.
287 else if (q.thread != null && result == null) {
288 try {
289 ForkJoinPool.managedBlock(q);
290 } catch (InterruptedException ex) {
// Record the interrupt on the node; it is acted upon on a later pass.
291 q.interruptControl = -1;
292 }
293 }
294 }
295 }
296
297 /**
298 * Awaits completion or aborts on interrupt or timeout.
299 *
300 * @param nanos time to wait
301 * @return raw result
 * @throws InterruptedException if an interrupt was recorded on the
 *         wait node, including the case where a result is already
 *         present when the interrupt is noticed
 * @throws TimeoutException if nanos is not positive and no result is
 *         present, or if the wait time runs out before a result appears
302 */
303 private Object timedAwaitDone(long nanos)
304 throws InterruptedException, TimeoutException {
305 WaitNode q = null;
306 boolean queued = false;
307 for (Object r;;) {
308 if ((r = result) != null) {
309 if (q != null) {
// Clear the thread so a releaser does not unpark this thread needlessly.
310 q.thread = null;
311 if (q.interruptControl < 0) {
312 removeWaiter(q);
313 throw new InterruptedException();
314 }
315 }
316 postComplete();
317 return r;
318 }
319 else if (q == null) {
320 if (nanos <= 0L)
321 throw new TimeoutException();
322 long d = System.nanoTime() + nanos;
// WaitNode treats a zero deadline as "not timed", so a computed deadline of
// exactly 0 is nudged to 1.
323 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
324 }
// Push the node onto the waiters stack; retried on the next pass if the CAS fails.
325 else if (!queued)
326 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
327 q.next = waiters, q);
328 else if (q.interruptControl < 0) {
329 removeWaiter(q);
330 throw new InterruptedException();
331 }
// Time is up: time out only if there is still no result; otherwise the
// next pass returns the result.
332 else if (q.nanos <= 0L) {
333 if (result == null) {
334 removeWaiter(q);
335 throw new TimeoutException();
336 }
337 }
338 else if (q.thread != null && result == null) {
339 try {
340 ForkJoinPool.managedBlock(q);
341 } catch (InterruptedException ex) {
// Record the interrupt on the node; it is thrown on a later pass.
342 q.interruptControl = -1;
343 }
344 }
345 }
346 }
347
348 /**
349 * Tries to unlink a timed-out or interrupted wait node to avoid
350 * accumulating garbage. Internal nodes are simply unspliced
351 * without CAS since it is harmless if they are traversed anyway
352 * by releasers. To avoid effects of unsplicing from already
353 * removed nodes, the list is retraversed in case of an apparent
354 * race. This is slow when there are a lot of nodes, but we don't
355 * expect lists to be long enough to outweigh higher-overhead
356 * schemes.
 *
 * <p>The traversal removes every node whose thread field is null, not
 * only the given one; the given node is marked by clearing its thread
 * first.
 *
 * @param node the node to unlink; a null argument is ignored
357 */
358 private void removeWaiter(WaitNode node) {
359 if (node != null) {
360 node.thread = null;
361 retry:
362 for (;;) { // restart on removeWaiter race
// pred is the most recent node seen with a non-null thread; q is the node
// under inspection; s is its successor.
363 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
364 s = q.next;
365 if (q.thread != null)
366 pred = q;
// q has a null thread and a predecessor exists: splice q out with a plain write.
367 else if (pred != null) {
368 pred.next = s;
369 if (pred.thread == null) // check for race
370 continue retry;
371 }
// q has a null thread and is at the head: swing the stack head with a CAS,
// restarting the traversal if it fails.
372 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
373 continue retry;
374 }
375 break;
376 }
377 }
378 }
379
380 /* ------------- Async tasks -------------- */
381
/**
 * Marker interface, declaring no methods, that tags the asynchronous
 * tasks produced by {@code async} methods. This may be useful for
 * monitoring, debugging, and tracking asynchronous activities.
 */
public interface AsynchronousCompletionTask {}
389
390 /** Base class can act as either FJ or plain Runnable */
391 abstract static class Async extends ForkJoinTask<Void>
392 implements Runnable, AsynchronousCompletionTask {
393 public final Void getRawResult() { return null; }
394 public final void setRawResult(Void v) { }
395 public final void run() { exec(); }
396 }
397
398 static final class AsyncRun extends Async {
399 final Runnable fn;
400 final CompletableFuture<Void> dst;
401 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
402 this.fn = fn; this.dst = dst;
403 }
404 public final boolean exec() {
405 CompletableFuture<Void> d; Throwable ex;
406 if ((d = this.dst) != null && d.result == null) {
407 try {
408 fn.run();
409 ex = null;
410 } catch (Throwable rex) {
411 ex = rex;
412 }
413 d.internalComplete(null, ex);
414 }
415 return true;
416 }
417 private static final long serialVersionUID = 5232453952276885070L;
418 }
419
420 static final class AsyncSupply<U> extends Async {
421 final Generator<U> fn;
422 final CompletableFuture<U> dst;
423 AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
424 this.fn = fn; this.dst = dst;
425 }
426 public final boolean exec() {
427 CompletableFuture<U> d; U u; Throwable ex;
428 if ((d = this.dst) != null && d.result == null) {
429 try {
430 u = fn.get();
431 ex = null;
432 } catch (Throwable rex) {
433 ex = rex;
434 u = null;
435 }
436 d.internalComplete(u, ex);
437 }
438 return true;
439 }
440 private static final long serialVersionUID = 5232453952276885070L;
441 }
442
443 static final class AsyncApply<T,U> extends Async {
444 final Fun<? super T,? extends U> fn;
445 final T arg;
446 final CompletableFuture<U> dst;
447 AsyncApply(T arg, Fun<? super T,? extends U> fn,
448 CompletableFuture<U> dst) {
449 this.arg = arg; this.fn = fn; this.dst = dst;
450 }
451 public final boolean exec() {
452 CompletableFuture<U> d; U u; Throwable ex;
453 if ((d = this.dst) != null && d.result == null) {
454 try {
455 u = fn.apply(arg);
456 ex = null;
457 } catch (Throwable rex) {
458 ex = rex;
459 u = null;
460 }
461 d.internalComplete(u, ex);
462 }
463 return true;
464 }
465 private static final long serialVersionUID = 5232453952276885070L;
466 }
467
468 static final class AsyncBiApply<T,U,V> extends Async {
469 final BiFun<? super T,? super U,? extends V> fn;
470 final T arg1;
471 final U arg2;
472 final CompletableFuture<V> dst;
473 AsyncBiApply(T arg1, U arg2,
474 BiFun<? super T,? super U,? extends V> fn,
475 CompletableFuture<V> dst) {
476 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
477 }
478 public final boolean exec() {
479 CompletableFuture<V> d; V v; Throwable ex;
480 if ((d = this.dst) != null && d.result == null) {
481 try {
482 v = fn.apply(arg1, arg2);
483 ex = null;
484 } catch (Throwable rex) {
485 ex = rex;
486 v = null;
487 }
488 d.internalComplete(v, ex);
489 }
490 return true;
491 }
492 private static final long serialVersionUID = 5232453952276885070L;
493 }
494
495 static final class AsyncAccept<T> extends Async {
496 final Action<? super T> fn;
497 final T arg;
498 final CompletableFuture<Void> dst;
499 AsyncAccept(T arg, Action<? super T> fn,
500 CompletableFuture<Void> dst) {
501 this.arg = arg; this.fn = fn; this.dst = dst;
502 }
503 public final boolean exec() {
504 CompletableFuture<Void> d; Throwable ex;
505 if ((d = this.dst) != null && d.result == null) {
506 try {
507 fn.accept(arg);
508 ex = null;
509 } catch (Throwable rex) {
510 ex = rex;
511 }
512 d.internalComplete(null, ex);
513 }
514 return true;
515 }
516 private static final long serialVersionUID = 5232453952276885070L;
517 }
518
519 static final class AsyncBiAccept<T,U> extends Async {
520 final BiAction<? super T,? super U> fn;
521 final T arg1;
522 final U arg2;
523 final CompletableFuture<Void> dst;
524 AsyncBiAccept(T arg1, U arg2,
525 BiAction<? super T,? super U> fn,
526 CompletableFuture<Void> dst) {
527 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
528 }
529 public final boolean exec() {
530 CompletableFuture<Void> d; Throwable ex;
531 if ((d = this.dst) != null && d.result == null) {
532 try {
533 fn.accept(arg1, arg2);
534 ex = null;
535 } catch (Throwable rex) {
536 ex = rex;
537 }
538 d.internalComplete(null, ex);
539 }
540 return true;
541 }
542 private static final long serialVersionUID = 5232453952276885070L;
543 }
544
545 /* ------------- Completions -------------- */
546
547 /**
548 * Simple linked list nodes to record completions, used in
549 * basically the same way as WaitNodes. (We separate nodes from
550 * the Completions themselves mainly because for the And and Or
551 * methods, the same Completion object resides in two lists.)
552 */
553 static final class CompletionNode {
554 final Completion completion;
555 volatile CompletionNode next;
556 CompletionNode(Completion completion) { this.completion = completion; }
557 }
558
559 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
560 abstract static class Completion extends AtomicInteger implements Runnable {
561 }
562
563 static final class ApplyCompletion<T,U> extends Completion {
564 final CompletableFuture<? extends T> src;
565 final Fun<? super T,? extends U> fn;
566 final CompletableFuture<U> dst;
567 final Executor executor;
568 ApplyCompletion(CompletableFuture<? extends T> src,
569 Fun<? super T,? extends U> fn,
570 CompletableFuture<U> dst, Executor executor) {
571 this.src = src; this.fn = fn; this.dst = dst;
572 this.executor = executor;
573 }
574 public final void run() {
575 final CompletableFuture<? extends T> a;
576 final Fun<? super T,? extends U> fn;
577 final CompletableFuture<U> dst;
578 Object r; T t; Throwable ex;
579 if ((dst = this.dst) != null &&
580 (fn = this.fn) != null &&
581 (a = this.src) != null &&
582 (r = a.result) != null &&
583 compareAndSet(0, 1)) {
584 if (r instanceof AltResult) {
585 ex = ((AltResult)r).ex;
586 t = null;
587 }
588 else {
589 ex = null;
590 @SuppressWarnings("unchecked") T tr = (T) r;
591 t = tr;
592 }
593 Executor e = executor;
594 U u = null;
595 if (ex == null) {
596 try {
597 if (e != null)
598 e.execute(new AsyncApply<T,U>(t, fn, dst));
599 else
600 u = fn.apply(t);
601 } catch (Throwable rex) {
602 ex = rex;
603 }
604 }
605 if (e == null || ex != null)
606 dst.internalComplete(u, ex);
607 }
608 }
609 private static final long serialVersionUID = 5232453952276885070L;
610 }
611
612 static final class AcceptCompletion<T> extends Completion {
613 final CompletableFuture<? extends T> src;
614 final Action<? super T> fn;
615 final CompletableFuture<Void> dst;
616 final Executor executor;
617 AcceptCompletion(CompletableFuture<? extends T> src,
618 Action<? super T> fn,
619 CompletableFuture<Void> dst, Executor executor) {
620 this.src = src; this.fn = fn; this.dst = dst;
621 this.executor = executor;
622 }
623 public final void run() {
624 final CompletableFuture<? extends T> a;
625 final Action<? super T> fn;
626 final CompletableFuture<Void> dst;
627 Object r; T t; Throwable ex;
628 if ((dst = this.dst) != null &&
629 (fn = this.fn) != null &&
630 (a = this.src) != null &&
631 (r = a.result) != null &&
632 compareAndSet(0, 1)) {
633 if (r instanceof AltResult) {
634 ex = ((AltResult)r).ex;
635 t = null;
636 }
637 else {
638 ex = null;
639 @SuppressWarnings("unchecked") T tr = (T) r;
640 t = tr;
641 }
642 Executor e = executor;
643 if (ex == null) {
644 try {
645 if (e != null)
646 e.execute(new AsyncAccept<T>(t, fn, dst));
647 else
648 fn.accept(t);
649 } catch (Throwable rex) {
650 ex = rex;
651 }
652 }
653 if (e == null || ex != null)
654 dst.internalComplete(null, ex);
655 }
656 }
657 private static final long serialVersionUID = 5232453952276885070L;
658 }
659
660 static final class RunCompletion<T> extends Completion {
661 final CompletableFuture<? extends T> src;
662 final Runnable fn;
663 final CompletableFuture<Void> dst;
664 final Executor executor;
665 RunCompletion(CompletableFuture<? extends T> src,
666 Runnable fn,
667 CompletableFuture<Void> dst,
668 Executor executor) {
669 this.src = src; this.fn = fn; this.dst = dst;
670 this.executor = executor;
671 }
672 public final void run() {
673 final CompletableFuture<? extends T> a;
674 final Runnable fn;
675 final CompletableFuture<Void> dst;
676 Object r; Throwable ex;
677 if ((dst = this.dst) != null &&
678 (fn = this.fn) != null &&
679 (a = this.src) != null &&
680 (r = a.result) != null &&
681 compareAndSet(0, 1)) {
682 if (r instanceof AltResult)
683 ex = ((AltResult)r).ex;
684 else
685 ex = null;
686 Executor e = executor;
687 if (ex == null) {
688 try {
689 if (e != null)
690 e.execute(new AsyncRun(fn, dst));
691 else
692 fn.run();
693 } catch (Throwable rex) {
694 ex = rex;
695 }
696 }
697 if (e == null || ex != null)
698 dst.internalComplete(null, ex);
699 }
700 }
701 private static final long serialVersionUID = 5232453952276885070L;
702 }
703
704 static final class BiApplyCompletion<T,U,V> extends Completion {
705 final CompletableFuture<? extends T> src;
706 final CompletableFuture<? extends U> snd;
707 final BiFun<? super T,? super U,? extends V> fn;
708 final CompletableFuture<V> dst;
709 final Executor executor;
710 BiApplyCompletion(CompletableFuture<? extends T> src,
711 CompletableFuture<? extends U> snd,
712 BiFun<? super T,? super U,? extends V> fn,
713 CompletableFuture<V> dst, Executor executor) {
714 this.src = src; this.snd = snd;
715 this.fn = fn; this.dst = dst;
716 this.executor = executor;
717 }
718 public final void run() {
719 final CompletableFuture<? extends T> a;
720 final CompletableFuture<? extends U> b;
721 final BiFun<? super T,? super U,? extends V> fn;
722 final CompletableFuture<V> dst;
723 Object r, s; T t; U u; Throwable ex;
724 if ((dst = this.dst) != null &&
725 (fn = this.fn) != null &&
726 (a = this.src) != null &&
727 (r = a.result) != null &&
728 (b = this.snd) != null &&
729 (s = b.result) != null &&
730 compareAndSet(0, 1)) {
731 if (r instanceof AltResult) {
732 ex = ((AltResult)r).ex;
733 t = null;
734 }
735 else {
736 ex = null;
737 @SuppressWarnings("unchecked") T tr = (T) r;
738 t = tr;
739 }
740 if (ex != null)
741 u = null;
742 else if (s instanceof AltResult) {
743 ex = ((AltResult)s).ex;
744 u = null;
745 }
746 else {
747 @SuppressWarnings("unchecked") U us = (U) s;
748 u = us;
749 }
750 Executor e = executor;
751 V v = null;
752 if (ex == null) {
753 try {
754 if (e != null)
755 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
756 else
757 v = fn.apply(t, u);
758 } catch (Throwable rex) {
759 ex = rex;
760 }
761 }
762 if (e == null || ex != null)
763 dst.internalComplete(v, ex);
764 }
765 }
766 private static final long serialVersionUID = 5232453952276885070L;
767 }
768
769 static final class BiAcceptCompletion<T,U> extends Completion {
770 final CompletableFuture<? extends T> src;
771 final CompletableFuture<? extends U> snd;
772 final BiAction<? super T,? super U> fn;
773 final CompletableFuture<Void> dst;
774 final Executor executor;
775 BiAcceptCompletion(CompletableFuture<? extends T> src,
776 CompletableFuture<? extends U> snd,
777 BiAction<? super T,? super U> fn,
778 CompletableFuture<Void> dst, Executor executor) {
779 this.src = src; this.snd = snd;
780 this.fn = fn; this.dst = dst;
781 this.executor = executor;
782 }
783 public final void run() {
784 final CompletableFuture<? extends T> a;
785 final CompletableFuture<? extends U> b;
786 final BiAction<? super T,? super U> fn;
787 final CompletableFuture<Void> dst;
788 Object r, s; T t; U u; Throwable ex;
789 if ((dst = this.dst) != null &&
790 (fn = this.fn) != null &&
791 (a = this.src) != null &&
792 (r = a.result) != null &&
793 (b = this.snd) != null &&
794 (s = b.result) != null &&
795 compareAndSet(0, 1)) {
796 if (r instanceof AltResult) {
797 ex = ((AltResult)r).ex;
798 t = null;
799 }
800 else {
801 ex = null;
802 @SuppressWarnings("unchecked") T tr = (T) r;
803 t = tr;
804 }
805 if (ex != null)
806 u = null;
807 else if (s instanceof AltResult) {
808 ex = ((AltResult)s).ex;
809 u = null;
810 }
811 else {
812 @SuppressWarnings("unchecked") U us = (U) s;
813 u = us;
814 }
815 Executor e = executor;
816 if (ex == null) {
817 try {
818 if (e != null)
819 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
820 else
821 fn.accept(t, u);
822 } catch (Throwable rex) {
823 ex = rex;
824 }
825 }
826 if (e == null || ex != null)
827 dst.internalComplete(null, ex);
828 }
829 }
830 private static final long serialVersionUID = 5232453952276885070L;
831 }
832
833 static final class BiRunCompletion<T> extends Completion {
834 final CompletableFuture<? extends T> src;
835 final CompletableFuture<?> snd;
836 final Runnable fn;
837 final CompletableFuture<Void> dst;
838 final Executor executor;
839 BiRunCompletion(CompletableFuture<? extends T> src,
840 CompletableFuture<?> snd,
841 Runnable fn,
842 CompletableFuture<Void> dst, Executor executor) {
843 this.src = src; this.snd = snd;
844 this.fn = fn; this.dst = dst;
845 this.executor = executor;
846 }
847 public final void run() {
848 final CompletableFuture<? extends T> a;
849 final CompletableFuture<?> b;
850 final Runnable fn;
851 final CompletableFuture<Void> dst;
852 Object r, s; Throwable ex;
853 if ((dst = this.dst) != null &&
854 (fn = this.fn) != null &&
855 (a = this.src) != null &&
856 (r = a.result) != null &&
857 (b = this.snd) != null &&
858 (s = b.result) != null &&
859 compareAndSet(0, 1)) {
860 if (r instanceof AltResult)
861 ex = ((AltResult)r).ex;
862 else
863 ex = null;
864 if (ex == null && (s instanceof AltResult))
865 ex = ((AltResult)s).ex;
866 Executor e = executor;
867 if (ex == null) {
868 try {
869 if (e != null)
870 e.execute(new AsyncRun(fn, dst));
871 else
872 fn.run();
873 } catch (Throwable rex) {
874 ex = rex;
875 }
876 }
877 if (e == null || ex != null)
878 dst.internalComplete(null, ex);
879 }
880 }
881 private static final long serialVersionUID = 5232453952276885070L;
882 }
883
884 static final class OrApplyCompletion<T,U> extends Completion {
885 final CompletableFuture<? extends T> src;
886 final CompletableFuture<? extends T> snd;
887 final Fun<? super T,? extends U> fn;
888 final CompletableFuture<U> dst;
889 final Executor executor;
890 OrApplyCompletion(CompletableFuture<? extends T> src,
891 CompletableFuture<? extends T> snd,
892 Fun<? super T,? extends U> fn,
893 CompletableFuture<U> dst, Executor executor) {
894 this.src = src; this.snd = snd;
895 this.fn = fn; this.dst = dst;
896 this.executor = executor;
897 }
898 public final void run() {
899 final CompletableFuture<? extends T> a;
900 final CompletableFuture<? extends T> b;
901 final Fun<? super T,? extends U> fn;
902 final CompletableFuture<U> dst;
903 Object r; T t; Throwable ex;
904 if ((dst = this.dst) != null &&
905 (fn = this.fn) != null &&
906 (((a = this.src) != null && (r = a.result) != null) ||
907 ((b = this.snd) != null && (r = b.result) != null)) &&
908 compareAndSet(0, 1)) {
909 if (r instanceof AltResult) {
910 ex = ((AltResult)r).ex;
911 t = null;
912 }
913 else {
914 ex = null;
915 @SuppressWarnings("unchecked") T tr = (T) r;
916 t = tr;
917 }
918 Executor e = executor;
919 U u = null;
920 if (ex == null) {
921 try {
922 if (e != null)
923 e.execute(new AsyncApply<T,U>(t, fn, dst));
924 else
925 u = fn.apply(t);
926 } catch (Throwable rex) {
927 ex = rex;
928 }
929 }
930 if (e == null || ex != null)
931 dst.internalComplete(u, ex);
932 }
933 }
934 private static final long serialVersionUID = 5232453952276885070L;
935 }
936
937 static final class OrAcceptCompletion<T> extends Completion {
938 final CompletableFuture<? extends T> src;
939 final CompletableFuture<? extends T> snd;
940 final Action<? super T> fn;
941 final CompletableFuture<Void> dst;
942 final Executor executor;
943 OrAcceptCompletion(CompletableFuture<? extends T> src,
944 CompletableFuture<? extends T> snd,
945 Action<? super T> fn,
946 CompletableFuture<Void> dst, Executor executor) {
947 this.src = src; this.snd = snd;
948 this.fn = fn; this.dst = dst;
949 this.executor = executor;
950 }
951 public final void run() {
952 final CompletableFuture<? extends T> a;
953 final CompletableFuture<? extends T> b;
954 final Action<? super T> fn;
955 final CompletableFuture<Void> dst;
956 Object r; T t; Throwable ex;
957 if ((dst = this.dst) != null &&
958 (fn = this.fn) != null &&
959 (((a = this.src) != null && (r = a.result) != null) ||
960 ((b = this.snd) != null && (r = b.result) != null)) &&
961 compareAndSet(0, 1)) {
962 if (r instanceof AltResult) {
963 ex = ((AltResult)r).ex;
964 t = null;
965 }
966 else {
967 ex = null;
968 @SuppressWarnings("unchecked") T tr = (T) r;
969 t = tr;
970 }
971 Executor e = executor;
972 if (ex == null) {
973 try {
974 if (e != null)
975 e.execute(new AsyncAccept<T>(t, fn, dst));
976 else
977 fn.accept(t);
978 } catch (Throwable rex) {
979 ex = rex;
980 }
981 }
982 if (e == null || ex != null)
983 dst.internalComplete(null, ex);
984 }
985 }
986 private static final long serialVersionUID = 5232453952276885070L;
987 }
988
989 static final class OrRunCompletion<T> extends Completion {
990 final CompletableFuture<? extends T> src;
991 final CompletableFuture<?> snd;
992 final Runnable fn;
993 final CompletableFuture<Void> dst;
994 final Executor executor;
995 OrRunCompletion(CompletableFuture<? extends T> src,
996 CompletableFuture<?> snd,
997 Runnable fn,
998 CompletableFuture<Void> dst, Executor executor) {
999 this.src = src; this.snd = snd;
1000 this.fn = fn; this.dst = dst;
1001 this.executor = executor;
1002 }
1003 public final void run() {
1004 final CompletableFuture<? extends T> a;
1005 final CompletableFuture<?> b;
1006 final Runnable fn;
1007 final CompletableFuture<Void> dst;
1008 Object r; Throwable ex;
1009 if ((dst = this.dst) != null &&
1010 (fn = this.fn) != null &&
1011 (((a = this.src) != null && (r = a.result) != null) ||
1012 ((b = this.snd) != null && (r = b.result) != null)) &&
1013 compareAndSet(0, 1)) {
1014 if (r instanceof AltResult)
1015 ex = ((AltResult)r).ex;
1016 else
1017 ex = null;
1018 Executor e = executor;
1019 if (ex == null) {
1020 try {
1021 if (e != null)
1022 e.execute(new AsyncRun(fn, dst));
1023 else
1024 fn.run();
1025 } catch (Throwable rex) {
1026 ex = rex;
1027 }
1028 }
1029 if (e == null || ex != null)
1030 dst.internalComplete(null, ex);
1031 }
1032 }
1033 private static final long serialVersionUID = 5232453952276885070L;
1034 }
1035
1036 static final class ExceptionCompletion<T> extends Completion {
1037 final CompletableFuture<? extends T> src;
1038 final Fun<? super Throwable, ? extends T> fn;
1039 final CompletableFuture<T> dst;
1040 ExceptionCompletion(CompletableFuture<? extends T> src,
1041 Fun<? super Throwable, ? extends T> fn,
1042 CompletableFuture<T> dst) {
1043 this.src = src; this.fn = fn; this.dst = dst;
1044 }
1045 public final void run() {
1046 final CompletableFuture<? extends T> a;
1047 final Fun<? super Throwable, ? extends T> fn;
1048 final CompletableFuture<T> dst;
1049 Object r; T t = null; Throwable ex, dx = null;
1050 if ((dst = this.dst) != null &&
1051 (fn = this.fn) != null &&
1052 (a = this.src) != null &&
1053 (r = a.result) != null &&
1054 compareAndSet(0, 1)) {
1055 if ((r instanceof AltResult) &&
1056 (ex = ((AltResult)r).ex) != null) {
1057 try {
1058 t = fn.apply(ex);
1059 } catch (Throwable rex) {
1060 dx = rex;
1061 }
1062 }
1063 else {
1064 @SuppressWarnings("unchecked") T tr = (T) r;
1065 t = tr;
1066 }
1067 dst.internalComplete(t, dx);
1068 }
1069 }
1070 private static final long serialVersionUID = 5232453952276885070L;
1071 }
1072
1073 static final class ThenCopy<T> extends Completion {
1074 final CompletableFuture<? extends T> src;
1075 final CompletableFuture<T> dst;
1076 ThenCopy(CompletableFuture<? extends T> src,
1077 CompletableFuture<T> dst) {
1078 this.src = src; this.dst = dst;
1079 }
1080 public final void run() {
1081 final CompletableFuture<? extends T> a;
1082 final CompletableFuture<T> dst;
1083 Object r; Object t; Throwable ex;
1084 if ((dst = this.dst) != null &&
1085 (a = this.src) != null &&
1086 (r = a.result) != null &&
1087 compareAndSet(0, 1)) {
1088 if (r instanceof AltResult) {
1089 ex = ((AltResult)r).ex;
1090 t = null;
1091 }
1092 else {
1093 ex = null;
1094 t = r;
1095 }
1096 dst.internalComplete(t, ex);
1097 }
1098 }
1099 private static final long serialVersionUID = 5232453952276885070L;
1100 }
1101
1102 static final class HandleCompletion<T,U> extends Completion {
1103 final CompletableFuture<? extends T> src;
1104 final BiFun<? super T, Throwable, ? extends U> fn;
1105 final CompletableFuture<U> dst;
1106 HandleCompletion(CompletableFuture<? extends T> src,
1107 BiFun<? super T, Throwable, ? extends U> fn,
1108 final CompletableFuture<U> dst) {
1109 this.src = src; this.fn = fn; this.dst = dst;
1110 }
1111 public final void run() {
1112 final CompletableFuture<? extends T> a;
1113 final BiFun<? super T, Throwable, ? extends U> fn;
1114 final CompletableFuture<U> dst;
1115 Object r; T t; Throwable ex;
1116 if ((dst = this.dst) != null &&
1117 (fn = this.fn) != null &&
1118 (a = this.src) != null &&
1119 (r = a.result) != null &&
1120 compareAndSet(0, 1)) {
1121 if (r instanceof AltResult) {
1122 ex = ((AltResult)r).ex;
1123 t = null;
1124 }
1125 else {
1126 ex = null;
1127 @SuppressWarnings("unchecked") T tr = (T) r;
1128 t = tr;
1129 }
1130 U u = null; Throwable dx = null;
1131 try {
1132 u = fn.apply(t, ex);
1133 } catch (Throwable rex) {
1134 dx = rex;
1135 }
1136 dst.internalComplete(u, dx);
1137 }
1138 }
1139 private static final long serialVersionUID = 5232453952276885070L;
1140 }
1141
1142 static final class ComposeCompletion<T,U> extends Completion {
1143 final CompletableFuture<? extends T> src;
1144 final Fun<? super T, CompletableFuture<U>> fn;
1145 final CompletableFuture<U> dst;
1146 ComposeCompletion(CompletableFuture<? extends T> src,
1147 Fun<? super T, CompletableFuture<U>> fn,
1148 final CompletableFuture<U> dst) {
1149 this.src = src; this.fn = fn; this.dst = dst;
1150 }
1151 public final void run() {
1152 final CompletableFuture<? extends T> a;
1153 final Fun<? super T, CompletableFuture<U>> fn;
1154 final CompletableFuture<U> dst;
1155 Object r; T t; Throwable ex;
1156 if ((dst = this.dst) != null &&
1157 (fn = this.fn) != null &&
1158 (a = this.src) != null &&
1159 (r = a.result) != null &&
1160 compareAndSet(0, 1)) {
1161 if (r instanceof AltResult) {
1162 ex = ((AltResult)r).ex;
1163 t = null;
1164 }
1165 else {
1166 ex = null;
1167 @SuppressWarnings("unchecked") T tr = (T) r;
1168 t = tr;
1169 }
1170 CompletableFuture<U> c = null;
1171 U u = null;
1172 boolean complete = false;
1173 if (ex == null) {
1174 try {
1175 c = fn.apply(t);
1176 } catch (Throwable rex) {
1177 ex = rex;
1178 }
1179 }
1180 if (ex != null || c == null) {
1181 if (ex == null)
1182 ex = new NullPointerException();
1183 }
1184 else {
1185 ThenCopy<U> d = null;
1186 Object s;
1187 if ((s = c.result) == null) {
1188 CompletionNode p = new CompletionNode
1189 (d = new ThenCopy<U>(c, dst));
1190 while ((s = c.result) == null) {
1191 if (UNSAFE.compareAndSwapObject
1192 (c, COMPLETIONS, p.next = c.completions, p))
1193 break;
1194 }
1195 }
1196 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1197 complete = true;
1198 if (s instanceof AltResult) {
1199 ex = ((AltResult)s).ex; // no rewrap
1200 u = null;
1201 }
1202 else {
1203 @SuppressWarnings("unchecked") U us = (U) s;
1204 u = us;
1205 }
1206 }
1207 }
1208 if (complete || ex != null)
1209 dst.internalComplete(u, ex);
1210 if (c != null)
1211 c.helpPostComplete();
1212 }
1213 }
1214 private static final long serialVersionUID = 5232453952276885070L;
1215 }
1216
1217 // public methods
1218
/**
 * Constructs a CompletableFuture that starts out incomplete.
 */
public CompletableFuture() {
    super();
}
1224
1225 /**
1226 * Asynchronously executes in the {@link
1227 * ForkJoinPool#commonPool()}, a task that completes the returned
1228 * CompletableFuture with the result of the given Supplier.
1229 *
1230 * @param supplier a function returning the value to be used
1231 * to complete the returned CompletableFuture
1232 * @return the CompletableFuture
1233 */
1234 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1235 if (supplier == null) throw new NullPointerException();
1236 CompletableFuture<U> f = new CompletableFuture<U>();
1237 ForkJoinPool.commonPool().
1238 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1239 return f;
1240 }
1241
1242 /**
1243 * Asynchronously executes using the given executor, a task that
1244 * completes the returned CompletableFuture with the result of the
1245 * given Supplier.
1246 *
1247 * @param supplier a function returning the value to be used
1248 * to complete the returned CompletableFuture
1249 * @param executor the executor to use for asynchronous execution
1250 * @return the CompletableFuture
1251 */
1252 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1253 Executor executor) {
1254 if (executor == null || supplier == null)
1255 throw new NullPointerException();
1256 CompletableFuture<U> f = new CompletableFuture<U>();
1257 executor.execute(new AsyncSupply<U>(supplier, f));
1258 return f;
1259 }
1260
1261 /**
1262 * Asynchronously executes in the {@link
1263 * ForkJoinPool#commonPool()} a task that runs the given action,
1264 * and then completes the returned CompletableFuture.
1265 *
1266 * @param runnable the action to run before completing the
1267 * returned CompletableFuture
1268 * @return the CompletableFuture
1269 */
1270 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1271 if (runnable == null) throw new NullPointerException();
1272 CompletableFuture<Void> f = new CompletableFuture<Void>();
1273 ForkJoinPool.commonPool().
1274 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1275 return f;
1276 }
1277
1278 /**
1279 * Asynchronously executes using the given executor, a task that
1280 * runs the given action, and then completes the returned
1281 * CompletableFuture.
1282 *
1283 * @param runnable the action to run before completing the
1284 * returned CompletableFuture
1285 * @param executor the executor to use for asynchronous execution
1286 * @return the CompletableFuture
1287 */
1288 public static CompletableFuture<Void> runAsync(Runnable runnable,
1289 Executor executor) {
1290 if (executor == null || runnable == null)
1291 throw new NullPointerException();
1292 CompletableFuture<Void> f = new CompletableFuture<Void>();
1293 executor.execute(new AsyncRun(runnable, f));
1294 return f;
1295 }
1296
1297 /**
1298 * Returns {@code true} if completed in any fashion: normally,
1299 * exceptionally, or via cancellation.
1300 *
1301 * @return {@code true} if completed
1302 */
1303 public boolean isDone() {
1304 return result != null;
1305 }
1306
1307 /**
1308 * Waits if necessary for the computation to complete, and then
1309 * retrieves its result.
1310 *
1311 * @return the computed result
1312 * @throws CancellationException if the computation was cancelled
1313 * @throws ExecutionException if the computation threw an
1314 * exception
1315 * @throws InterruptedException if the current thread was interrupted
1316 * while waiting
1317 */
1318 public T get() throws InterruptedException, ExecutionException {
1319 Object r; Throwable ex, cause;
1320 if ((r = result) == null && (r = waitingGet(true)) == null)
1321 throw new InterruptedException();
1322 if (!(r instanceof AltResult)) {
1323 @SuppressWarnings("unchecked") T tr = (T) r;
1324 return tr;
1325 }
1326 if ((ex = ((AltResult)r).ex) == null)
1327 return null;
1328 if (ex instanceof CancellationException)
1329 throw (CancellationException)ex;
1330 if ((ex instanceof CompletionException) &&
1331 (cause = ex.getCause()) != null)
1332 ex = cause;
1333 throw new ExecutionException(ex);
1334 }
1335
1336 /**
1337 * Waits if necessary for at most the given time for completion,
1338 * and then retrieves its result, if available.
1339 *
1340 * @param timeout the maximum time to wait
1341 * @param unit the time unit of the timeout argument
1342 * @return the computed result
1343 * @throws CancellationException if the computation was cancelled
1344 * @throws ExecutionException if the computation threw an
1345 * exception
1346 * @throws InterruptedException if the current thread was interrupted
1347 * while waiting
1348 * @throws TimeoutException if the wait timed out
1349 */
1350 public T get(long timeout, TimeUnit unit)
1351 throws InterruptedException, ExecutionException, TimeoutException {
1352 Object r; Throwable ex, cause;
1353 long nanos = unit.toNanos(timeout);
1354 if (Thread.interrupted())
1355 throw new InterruptedException();
1356 if ((r = result) == null)
1357 r = timedAwaitDone(nanos);
1358 if (!(r instanceof AltResult)) {
1359 @SuppressWarnings("unchecked") T tr = (T) r;
1360 return tr;
1361 }
1362 if ((ex = ((AltResult)r).ex) == null)
1363 return null;
1364 if (ex instanceof CancellationException)
1365 throw (CancellationException)ex;
1366 if ((ex instanceof CompletionException) &&
1367 (cause = ex.getCause()) != null)
1368 ex = cause;
1369 throw new ExecutionException(ex);
1370 }
1371
1372 /**
1373 * Returns the result value when complete, or throws an
1374 * (unchecked) exception if completed exceptionally. To better
1375 * conform with the use of common functional forms, if a
1376 * computation involved in the completion of this
1377 * CompletableFuture threw an exception, this method throws an
1378 * (unchecked) {@link CompletionException} with the underlying
1379 * exception as its cause.
1380 *
1381 * @return the result value
1382 * @throws CancellationException if the computation was cancelled
1383 * @throws CompletionException if a completion computation threw
1384 * an exception
1385 */
1386 public T join() {
1387 Object r; Throwable ex;
1388 if ((r = result) == null)
1389 r = waitingGet(false);
1390 if (!(r instanceof AltResult)) {
1391 @SuppressWarnings("unchecked") T tr = (T) r;
1392 return tr;
1393 }
1394 if ((ex = ((AltResult)r).ex) == null)
1395 return null;
1396 if (ex instanceof CancellationException)
1397 throw (CancellationException)ex;
1398 if (ex instanceof CompletionException)
1399 throw (CompletionException)ex;
1400 throw new CompletionException(ex);
1401 }
1402
1403 /**
1404 * Returns the result value (or throws any encountered exception)
1405 * if completed, else returns the given valueIfAbsent.
1406 *
1407 * @param valueIfAbsent the value to return if not completed
1408 * @return the result value, if completed, else the given valueIfAbsent
1409 * @throws CancellationException if the computation was cancelled
1410 * @throws CompletionException if a completion computation threw
1411 * an exception
1412 */
1413 public T getNow(T valueIfAbsent) {
1414 Object r; Throwable ex;
1415 if ((r = result) == null)
1416 return valueIfAbsent;
1417 if (!(r instanceof AltResult)) {
1418 @SuppressWarnings("unchecked") T tr = (T) r;
1419 return tr;
1420 }
1421 if ((ex = ((AltResult)r).ex) == null)
1422 return null;
1423 if (ex instanceof CancellationException)
1424 throw (CancellationException)ex;
1425 if (ex instanceof CompletionException)
1426 throw (CompletionException)ex;
1427 throw new CompletionException(ex);
1428 }
1429
1430 /**
1431 * If not already completed, sets the value returned by {@link
1432 * #get()} and related methods to the given value.
1433 *
1434 * @param value the result value
1435 * @return {@code true} if this invocation caused this CompletableFuture
1436 * to transition to a completed state, else {@code false}
1437 */
1438 public boolean complete(T value) {
1439 boolean triggered = result == null &&
1440 UNSAFE.compareAndSwapObject(this, RESULT, null,
1441 value == null ? NIL : value);
1442 postComplete();
1443 return triggered;
1444 }
1445
1446 /**
1447 * If not already completed, causes invocations of {@link #get()}
1448 * and related methods to throw the given exception.
1449 *
1450 * @param ex the exception
1451 * @return {@code true} if this invocation caused this CompletableFuture
1452 * to transition to a completed state, else {@code false}
1453 */
1454 public boolean completeExceptionally(Throwable ex) {
1455 if (ex == null) throw new NullPointerException();
1456 boolean triggered = result == null &&
1457 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1458 postComplete();
1459 return triggered;
1460 }
1461
1462 /**
1463 * Creates and returns a CompletableFuture that is completed with
1464 * the result of the given function of this CompletableFuture.
1465 * If this CompletableFuture completes exceptionally,
1466 * then the returned CompletableFuture also does so,
1467 * with a CompletionException holding this exception as
1468 * its cause.
1469 *
1470 * @param fn the function to use to compute the value of
1471 * the returned CompletableFuture
1472 * @return the new CompletableFuture
1473 */
1474 public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1475 return doThenApply(fn, null);
1476 }
1477
1478 /**
1479 * Creates and returns a CompletableFuture that is asynchronously
1480 * completed using the {@link ForkJoinPool#commonPool()} with the
1481 * result of the given function of this CompletableFuture. If
1482 * this CompletableFuture completes exceptionally, then the
1483 * returned CompletableFuture also does so, with a
1484 * CompletionException holding this exception as its cause.
1485 *
1486 * @param fn the function to use to compute the value of
1487 * the returned CompletableFuture
1488 * @return the new CompletableFuture
1489 */
1490 public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn) {
1491 return doThenApply(fn, ForkJoinPool.commonPool());
1492 }
1493
1494 /**
1495 * Creates and returns a CompletableFuture that is asynchronously
1496 * completed using the given executor with the result of the given
1497 * function of this CompletableFuture. If this CompletableFuture
1498 * completes exceptionally, then the returned CompletableFuture
1499 * also does so, with a CompletionException holding this exception as
1500 * its cause.
1501 *
1502 * @param fn the function to use to compute the value of
1503 * the returned CompletableFuture
1504 * @param executor the executor to use for asynchronous execution
1505 * @return the new CompletableFuture
1506 */
1507 public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn,
1508 Executor executor) {
1509 if (executor == null) throw new NullPointerException();
1510 return doThenApply(fn, executor);
1511 }
1512
1513 private <U> CompletableFuture<U> doThenApply(Fun<? super T,? extends U> fn,
1514 Executor e) {
1515 if (fn == null) throw new NullPointerException();
1516 CompletableFuture<U> dst = new CompletableFuture<U>();
1517 ApplyCompletion<T,U> d = null;
1518 Object r;
1519 if ((r = result) == null) {
1520 CompletionNode p = new CompletionNode
1521 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1522 while ((r = result) == null) {
1523 if (UNSAFE.compareAndSwapObject
1524 (this, COMPLETIONS, p.next = completions, p))
1525 break;
1526 }
1527 }
1528 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1529 T t; Throwable ex;
1530 if (r instanceof AltResult) {
1531 ex = ((AltResult)r).ex;
1532 t = null;
1533 }
1534 else {
1535 ex = null;
1536 @SuppressWarnings("unchecked") T tr = (T) r;
1537 t = tr;
1538 }
1539 U u = null;
1540 if (ex == null) {
1541 try {
1542 if (e != null)
1543 e.execute(new AsyncApply<T,U>(t, fn, dst));
1544 else
1545 u = fn.apply(t);
1546 } catch (Throwable rex) {
1547 ex = rex;
1548 }
1549 }
1550 if (e == null || ex != null)
1551 dst.internalComplete(u, ex);
1552 }
1553 helpPostComplete();
1554 return dst;
1555 }
1556
1557 /**
1558 * Creates and returns a CompletableFuture that is completed after
1559 * performing the given action with this CompletableFuture's
1560 * result when it completes. If this CompletableFuture
1561 * completes exceptionally, then the returned CompletableFuture
1562 * also does so, with a CompletionException holding this exception as
1563 * its cause.
1564 *
1565 * @param block the action to perform before completing the
1566 * returned CompletableFuture
1567 * @return the new CompletableFuture
1568 */
1569 public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1570 return doThenAccept(block, null);
1571 }
1572
1573 /**
1574 * Creates and returns a CompletableFuture that is asynchronously
1575 * completed using the {@link ForkJoinPool#commonPool()} with this
1576 * CompletableFuture's result when it completes. If this
1577 * CompletableFuture completes exceptionally, then the returned
1578 * CompletableFuture also does so, with a CompletionException holding
1579 * this exception as its cause.
1580 *
1581 * @param block the action to perform before completing the
1582 * returned CompletableFuture
1583 * @return the new CompletableFuture
1584 */
1585 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1586 return doThenAccept(block, ForkJoinPool.commonPool());
1587 }
1588
1589 /**
1590 * Creates and returns a CompletableFuture that is asynchronously
1591 * completed using the given executor with this
1592 * CompletableFuture's result when it completes. If this
1593 * CompletableFuture completes exceptionally, then the returned
1594 * CompletableFuture also does so, with a CompletionException holding
1595 * this exception as its cause.
1596 *
1597 * @param block the action to perform before completing the
1598 * returned CompletableFuture
1599 * @param executor the executor to use for asynchronous execution
1600 * @return the new CompletableFuture
1601 */
1602 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1603 Executor executor) {
1604 if (executor == null) throw new NullPointerException();
1605 return doThenAccept(block, executor);
1606 }
1607
1608 private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1609 Executor e) {
1610 if (fn == null) throw new NullPointerException();
1611 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1612 AcceptCompletion<T> d = null;
1613 Object r;
1614 if ((r = result) == null) {
1615 CompletionNode p = new CompletionNode
1616 (d = new AcceptCompletion<T>(this, fn, dst, e));
1617 while ((r = result) == null) {
1618 if (UNSAFE.compareAndSwapObject
1619 (this, COMPLETIONS, p.next = completions, p))
1620 break;
1621 }
1622 }
1623 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1624 T t; Throwable ex;
1625 if (r instanceof AltResult) {
1626 ex = ((AltResult)r).ex;
1627 t = null;
1628 }
1629 else {
1630 ex = null;
1631 @SuppressWarnings("unchecked") T tr = (T) r;
1632 t = tr;
1633 }
1634 if (ex == null) {
1635 try {
1636 if (e != null)
1637 e.execute(new AsyncAccept<T>(t, fn, dst));
1638 else
1639 fn.accept(t);
1640 } catch (Throwable rex) {
1641 ex = rex;
1642 }
1643 }
1644 if (e == null || ex != null)
1645 dst.internalComplete(null, ex);
1646 }
1647 helpPostComplete();
1648 return dst;
1649 }
1650
1651 /**
1652 * Creates and returns a CompletableFuture that is completed after
1653 * performing the given action when this CompletableFuture
1654 * completes. If this CompletableFuture completes exceptionally,
1655 * then the returned CompletableFuture also does so, with a
1656 * CompletionException holding this exception as its cause.
1657 *
1658 * @param action the action to perform before completing the
1659 * returned CompletableFuture
1660 * @return the new CompletableFuture
1661 */
1662 public CompletableFuture<Void> thenRun(Runnable action) {
1663 return doThenRun(action, null);
1664 }
1665
1666 /**
1667 * Creates and returns a CompletableFuture that is asynchronously
1668 * completed using the {@link ForkJoinPool#commonPool()} after
1669 * performing the given action when this CompletableFuture
1670 * completes. If this CompletableFuture completes exceptionally,
1671 * then the returned CompletableFuture also does so, with a
1672 * CompletionException holding this exception as its cause.
1673 *
1674 * @param action the action to perform before completing the
1675 * returned CompletableFuture
1676 * @return the new CompletableFuture
1677 */
1678 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1679 return doThenRun(action, ForkJoinPool.commonPool());
1680 }
1681
1682 /**
1683 * Creates and returns a CompletableFuture that is asynchronously
1684 * completed using the given executor after performing the given
1685 * action when this CompletableFuture completes. If this
1686 * CompletableFuture completes exceptionally, then the returned
1687 * CompletableFuture also does so, with a CompletionException holding
1688 * this exception as its cause.
1689 *
1690 * @param action the action to perform before completing the
1691 * returned CompletableFuture
1692 * @param executor the executor to use for asynchronous execution
1693 * @return the new CompletableFuture
1694 */
1695 public CompletableFuture<Void> thenRunAsync(Runnable action,
1696 Executor executor) {
1697 if (executor == null) throw new NullPointerException();
1698 return doThenRun(action, executor);
1699 }
1700
1701 private CompletableFuture<Void> doThenRun(Runnable action,
1702 Executor e) {
1703 if (action == null) throw new NullPointerException();
1704 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1705 RunCompletion<T> d = null;
1706 Object r;
1707 if ((r = result) == null) {
1708 CompletionNode p = new CompletionNode
1709 (d = new RunCompletion<T>(this, action, dst, e));
1710 while ((r = result) == null) {
1711 if (UNSAFE.compareAndSwapObject
1712 (this, COMPLETIONS, p.next = completions, p))
1713 break;
1714 }
1715 }
1716 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1717 Throwable ex;
1718 if (r instanceof AltResult)
1719 ex = ((AltResult)r).ex;
1720 else
1721 ex = null;
1722 if (ex == null) {
1723 try {
1724 if (e != null)
1725 e.execute(new AsyncRun(action, dst));
1726 else
1727 action.run();
1728 } catch (Throwable rex) {
1729 ex = rex;
1730 }
1731 }
1732 if (e == null || ex != null)
1733 dst.internalComplete(null, ex);
1734 }
1735 helpPostComplete();
1736 return dst;
1737 }
1738
1739 /**
1740 * Creates and returns a CompletableFuture that is completed with
1741 * the result of the given function of this and the other given
1742 * CompletableFuture's results when both complete. If this or
1743 * the other CompletableFuture complete exceptionally, then the
1744 * returned CompletableFuture also does so, with a
1745 * CompletionException holding the exception as its cause.
1746 *
1747 * @param other the other CompletableFuture
1748 * @param fn the function to use to compute the value of
1749 * the returned CompletableFuture
1750 * @return the new CompletableFuture
1751 */
1752 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1753 BiFun<? super T,? super U,? extends V> fn) {
1754 return doThenBiApply(other, fn, null);
1755 }
1756
1757 /**
1758 * Creates and returns a CompletableFuture that is asynchronously
1759 * completed using the {@link ForkJoinPool#commonPool()} with
1760 * the result of the given function of this and the other given
1761 * CompletableFuture's results when both complete. If this or
1762 * the other CompletableFuture complete exceptionally, then the
1763 * returned CompletableFuture also does so, with a
1764 * CompletionException holding the exception as its cause.
1765 *
1766 * @param other the other CompletableFuture
1767 * @param fn the function to use to compute the value of
1768 * the returned CompletableFuture
1769 * @return the new CompletableFuture
1770 */
1771 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1772 BiFun<? super T,? super U,? extends V> fn) {
1773 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1774 }
1775
1776 /**
1777 * Creates and returns a CompletableFuture that is
1778 * asynchronously completed using the given executor with the
1779 * result of the given function of this and the other given
1780 * CompletableFuture's results when both complete. If this or
1781 * the other CompletableFuture complete exceptionally, then the
1782 * returned CompletableFuture also does so, with a
1783 * CompletionException holding the exception as its cause.
1784 *
1785 * @param other the other CompletableFuture
1786 * @param fn the function to use to compute the value of
1787 * the returned CompletableFuture
1788 * @param executor the executor to use for asynchronous execution
1789 * @return the new CompletableFuture
1790 */
1791 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1792 BiFun<? super T,? super U,? extends V> fn,
1793 Executor executor) {
1794 if (executor == null) throw new NullPointerException();
1795 return doThenBiApply(other, fn, executor);
1796 }
1797
1798 private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1799 BiFun<? super T,? super U,? extends V> fn,
1800 Executor e) {
1801 if (other == null || fn == null) throw new NullPointerException();
1802 CompletableFuture<V> dst = new CompletableFuture<V>();
1803 BiApplyCompletion<T,U,V> d = null;
1804 Object r, s = null;
1805 if ((r = result) == null || (s = other.result) == null) {
1806 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1807 CompletionNode q = null, p = new CompletionNode(d);
1808 while ((r == null && (r = result) == null) ||
1809 (s == null && (s = other.result) == null)) {
1810 if (q != null) {
1811 if (s != null ||
1812 UNSAFE.compareAndSwapObject
1813 (other, COMPLETIONS, q.next = other.completions, q))
1814 break;
1815 }
1816 else if (r != null ||
1817 UNSAFE.compareAndSwapObject
1818 (this, COMPLETIONS, p.next = completions, p)) {
1819 if (s != null)
1820 break;
1821 q = new CompletionNode(d);
1822 }
1823 }
1824 }
1825 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1826 T t; U u; Throwable ex;
1827 if (r instanceof AltResult) {
1828 ex = ((AltResult)r).ex;
1829 t = null;
1830 }
1831 else {
1832 ex = null;
1833 @SuppressWarnings("unchecked") T tr = (T) r;
1834 t = tr;
1835 }
1836 if (ex != null)
1837 u = null;
1838 else if (s instanceof AltResult) {
1839 ex = ((AltResult)s).ex;
1840 u = null;
1841 }
1842 else {
1843 @SuppressWarnings("unchecked") U us = (U) s;
1844 u = us;
1845 }
1846 V v = null;
1847 if (ex == null) {
1848 try {
1849 if (e != null)
1850 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1851 else
1852 v = fn.apply(t, u);
1853 } catch (Throwable rex) {
1854 ex = rex;
1855 }
1856 }
1857 if (e == null || ex != null)
1858 dst.internalComplete(v, ex);
1859 }
1860 helpPostComplete();
1861 other.helpPostComplete();
1862 return dst;
1863 }
1864
1865 /**
1866 * Creates and returns a CompletableFuture that is completed with
1867 * the results of this and the other given CompletableFuture if
1868 * both complete. If this and/or the other CompletableFuture
1869 * complete exceptionally, then the returned CompletableFuture
1870 * also does so, with a CompletionException holding one of these
1871 * exceptions as its cause.
1872 *
1873 * @param other the other CompletableFuture
1874 * @param block the action to perform before completing the
1875 * returned CompletableFuture
1876 * @return the new CompletableFuture
1877 */
1878 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1879 BiAction<? super T, ? super U> block) {
1880 return doThenBiAccept(other, block, null);
1881 }
1882
1883 /**
1884 * Creates and returns a CompletableFuture that is completed
1885 * asynchronously using the {@link ForkJoinPool#commonPool()} with
1886 * the results of this and the other given CompletableFuture when
1887 * both complete. If this and/or the other CompletableFuture
1888 * complete exceptionally, then the returned CompletableFuture
1889 * also does so, with a CompletionException holding one of these
1890 * exceptions as its cause.
1891 *
1892 * @param other the other CompletableFuture
1893 * @param block the action to perform before completing the
1894 * returned CompletableFuture
1895 * @return the new CompletableFuture
1896 */
1897 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1898 BiAction<? super T, ? super U> block) {
1899 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1900 }
1901
1902 /**
1903 * Creates and returns a CompletableFuture that is completed
1904 * asynchronously using the given executor with the results of
1905 * this and the other given CompletableFuture when both complete.
1906 * If this and/or the other CompletableFuture complete
1907 * exceptionally, then the returned CompletableFuture also does
1908 * so, with a CompletionException holding one of these exceptions as
1909 * its cause.
1910 *
1911 * @param other the other CompletableFuture
1912 * @param block the action to perform before completing the
1913 * returned CompletableFuture
1914 * @param executor the executor to use for asynchronous execution
1915 * @return the new CompletableFuture
1916 */
1917 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1918 BiAction<? super T, ? super U> block,
1919 Executor executor) {
1920 if (executor == null) throw new NullPointerException();
1921 return doThenBiAccept(other, block, executor);
1922 }
1923
1924 private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other, // shared implementation for the thenAcceptBothAsync overloads above
1925 BiAction<? super T,? super U> fn,
1926 Executor e) { // e == null: fn is invoked directly (see below); otherwise it is handed to e
1927 if (other == null || fn == null) throw new NullPointerException();
1928 CompletableFuture<Void> dst = new CompletableFuture<Void>(); // the future returned to the caller
1929 BiAcceptCompletion<T,U> d = null; // stays null when both sources are already complete on entry
1930 Object r, s = null; // r, s: snapshots of this.result and other.result; null means "not completed yet"
1931 if ((r = result) == null || (s = other.result) == null) { // at least one source still pending: register a deferred completion
1932 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1933 CompletionNode q = null, p = new CompletionNode(d); // p goes on this.completions; q (created later) on other.completions
1934 while ((r == null && (r = result) == null) || // re-read only the results not yet seen; exit once both are known
1935 (s == null && (s = other.result) == null)) {
1936 if (q != null) { // phase 2: this side is handled, now deal with other's stack
1937 if (s != null ||
1938 UNSAFE.compareAndSwapObject
1939 (other, COMPLETIONS, q.next = other.completions, q)) // push q as the new head of other.completions
1940 break;
1941 }
1942 else if (r != null || // phase 1: no push needed on this if it has already completed
1943 UNSAFE.compareAndSwapObject
1944 (this, COMPLETIONS, p.next = completions, p)) { // push p as the new head of this.completions
1945 if (s != null) // other already complete: nothing to register there
1946 break;
1947 q = new CompletionNode(d); // same completion d, separate node for other's stack
1948 }
1949 }
1950 }
1951 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { // both done, and either nothing was registered or this thread won the 0->1 claim on d (presumably so the action runs once — confirm in BiAcceptCompletion)
1952 T t; U u; Throwable ex;
1953 if (r instanceof AltResult) { // AltResult wraps a non-plain outcome; its ex field may hold the failure
1954 ex = ((AltResult)r).ex;
1955 t = null;
1956 }
1957 else {
1958 ex = null;
1959 @SuppressWarnings("unchecked") T tr = (T) r;
1960 t = tr;
1961 }
1962 if (ex != null) // this source failed: other's outcome is not inspected
1963 u = null;
1964 else if (s instanceof AltResult) {
1965 ex = ((AltResult)s).ex;
1966 u = null;
1967 }
1968 else {
1969 @SuppressWarnings("unchecked") U us = (U) s;
1970 u = us;
1971 }
1972 if (ex == null) { // run the action only when neither source supplied an exception
1973 try {
1974 if (e != null)
1975 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst)); // async: dst is left to the submitted task — presumably AsyncBiAccept completes it, TODO confirm
1976 else
1977 fn.accept(t, u); // sync: run in the current thread
1978 } catch (Throwable rex) { // covers both a throwing fn and a throwing e.execute
1979 ex = rex;
1980 }
1981 }
1982 if (e == null || ex != null) // complete dst here unless the work was successfully handed to the executor
1983 dst.internalComplete(null, ex);
1984 }
1985 helpPostComplete(); // NOTE(review): presumably helps run completions still queued on an already-completed source — confirm
1986 other.helpPostComplete();
1987 return dst;
1988 }
1989
1990 /**
1991 * Creates and returns a CompletableFuture that is completed
1992 * when this and the other given CompletableFuture both
1993 * complete. If this and/or the other CompletableFuture complete
1994 * exceptionally, then the returned CompletableFuture also does
1995 * so, with a CompletionException holding one of these exceptions as
1996 * its cause.
1997 *
1998 * @param other the other CompletableFuture
1999 * @param action the action to perform before completing the
2000 * returned CompletableFuture
2001 * @return the new CompletableFuture
2002 */
2003 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2004 Runnable action) {
2005 return doThenBiRun(other, action, null); // null executor: doThenBiRun calls action.run() directly instead of submitting it
2006 }
2007
2008 /**
2009 * Creates and returns a CompletableFuture that is completed
2010 * asynchronously using the {@link ForkJoinPool#commonPool()}
2011 * when this and the other given CompletableFuture both
2012 * complete. If this and/or the other CompletableFuture complete
2013 * exceptionally, then the returned CompletableFuture also does
2014 * so, with a CompletionException holding one of these exceptions as
2015 * its cause.
2016 *
2017 * @param other the other CompletableFuture
2018 * @param action the action to perform before completing the
2019 * returned CompletableFuture
2020 * @return the new CompletableFuture
2021 */
2022 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2023 Runnable action) {
2024 return doThenBiRun(other, action, ForkJoinPool.commonPool()); // no executor supplied: use the common pool
2025 }
2026
2027 /**
2028 * Creates and returns a CompletableFuture that is completed
2029 * asynchronously using the given executor
2030 * when this and the other given CompletableFuture both
2031 * complete. If this and/or the other CompletableFuture complete
2032 * exceptionally, then the returned CompletableFuture also does
2033 * so, with a CompletionException holding one of these exceptions as
2034 * its cause.
2035 *
2036 * @param other the other CompletableFuture
2037 * @param action the action to perform before completing the
2038 * returned CompletableFuture
2039 * @param executor the executor to use for asynchronous execution; a {@code null} executor is rejected with a NullPointerException
2040 * @return the new CompletableFuture
2041 */
2042 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2043 Runnable action,
2044 Executor executor) {
2045 if (executor == null) throw new NullPointerException(); // must reject here: doThenBiRun reads a null executor as "run synchronously"
2046 return doThenBiRun(other, action, executor);
2047 }
2048
2049 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other, // shared implementation for runAfterBoth and both runAfterBothAsync overloads
2050 Runnable action,
2051 Executor e) { // e == null: action.run() is called directly; otherwise the action is handed to e
2052 if (other == null || action == null) throw new NullPointerException();
2053 CompletableFuture<Void> dst = new CompletableFuture<Void>(); // the future returned to the caller
2054 BiRunCompletion<T> d = null; // stays null when both sources are already complete on entry
2055 Object r, s = null; // r, s: snapshots of this.result and other.result; null means "not completed yet"
2056 if ((r = result) == null || (s = other.result) == null) { // at least one source still pending: register a deferred completion
2057 d = new BiRunCompletion<T>(this, other, action, dst, e);
2058 CompletionNode q = null, p = new CompletionNode(d); // p goes on this.completions; q (created later) on other.completions
2059 while ((r == null && (r = result) == null) || // re-read only the results not yet seen; exit once both are known
2060 (s == null && (s = other.result) == null)) {
2061 if (q != null) { // phase 2: this side is handled, now deal with other's stack
2062 if (s != null ||
2063 UNSAFE.compareAndSwapObject
2064 (other, COMPLETIONS, q.next = other.completions, q)) // push q as the new head of other.completions
2065 break;
2066 }
2067 else if (r != null || // phase 1: no push needed on this if it has already completed
2068 UNSAFE.compareAndSwapObject
2069 (this, COMPLETIONS, p.next = completions, p)) { // push p as the new head of this.completions
2070 if (s != null) // other already complete: nothing to register there
2071 break;
2072 q = new CompletionNode(d); // same completion d, separate node for other's stack
2073 }
2074 }
2075 }
2076 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { // both done, and either nothing was registered or this thread won the 0->1 claim on d (presumably so the action runs once — confirm in BiRunCompletion)
2077 Throwable ex; // only the exceptions matter here; the sources' values are never passed to a Runnable
2078 if (r instanceof AltResult)
2079 ex = ((AltResult)r).ex;
2080 else
2081 ex = null;
2082 if (ex == null && (s instanceof AltResult)) // this source's exception takes precedence over other's
2083 ex = ((AltResult)s).ex;
2084 if (ex == null) {
2085 try {
2086 if (e != null)
2087 e.execute(new AsyncRun(action, dst)); // async: dst is left to the submitted task — presumably AsyncRun completes it, TODO confirm
2088 else
2089 action.run(); // sync: run in the current thread
2090 } catch (Throwable rex) { // covers both a throwing action and a throwing e.execute
2091 ex = rex;
2092 }
2093 }
2094 if (e == null || ex != null) // complete dst here unless the work was successfully handed to the executor
2095 dst.internalComplete(null, ex);
2096 }
2097 helpPostComplete(); // NOTE(review): presumably helps run completions still queued on an already-completed source — confirm
2098 other.helpPostComplete();
2099 return dst;
2100 }
2101
2102 /**
2103 * Creates and returns a CompletableFuture that is completed with
2104 * the result of the given function of either this or the other
2105 * given CompletableFuture's results when either completes. If
2106 * this and/or the other CompletableFuture complete exceptionally,
2107 * then the returned CompletableFuture may also do so, with a
2108 * CompletionException holding one of these exceptions as its cause.
2109 * No guarantees are made about which result or exception is used
2110 * in the returned CompletableFuture.
2111 *
2112 * @param other the other CompletableFuture
2113 * @param fn the function to use to compute the value of
2114 * the returned CompletableFuture
2115 * @return the new CompletableFuture
2116 */
2117 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2118 Fun<? super T, U> fn) {
2119 return doOrApply(other, fn, null); // null executor: doOrApply calls fn.apply directly instead of submitting it
2120 }
2121
2122 /**
2123 * Creates and returns a CompletableFuture that is completed
2124 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2125 * the result of the given function of either this or the other
2126 * given CompletableFuture's results when either completes. If
2127 * this and/or the other CompletableFuture complete exceptionally,
2128 * then the returned CompletableFuture may also do so, with a
2129 * CompletionException holding one of these exceptions as its cause.
2130 * No guarantees are made about which result or exception is used
2131 * in the returned CompletableFuture.
2132 *
2133 * @param other the other CompletableFuture
2134 * @param fn the function to use to compute the value of
2135 * the returned CompletableFuture
2136 * @return the new CompletableFuture
2137 */
2138 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2139 Fun<? super T, U> fn) {
2140 return doOrApply(other, fn, ForkJoinPool.commonPool()); // no executor supplied: use the common pool
2141 }
2142
2143 /**
2144 * Creates and returns a CompletableFuture that is completed
2145 * asynchronously using the given executor with the result of the
2146 * given function of either this or the other given
2147 * CompletableFuture's results when either completes. If this
2148 * and/or the other CompletableFuture complete exceptionally, then
2149 * the returned CompletableFuture may also do so, with a
2150 * CompletionException holding one of these exceptions as its cause.
2151 * No guarantees are made about which result or exception is used
2152 * in the returned CompletableFuture.
2153 *
2154 * @param other the other CompletableFuture
2155 * @param fn the function to use to compute the value of
2156 * the returned CompletableFuture
2157 * @param executor the executor to use for asynchronous execution; a {@code null} executor is rejected with a NullPointerException
2158 * @return the new CompletableFuture
2159 */
2160 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2161 Fun<? super T, U> fn,
2162 Executor executor) {
2163 if (executor == null) throw new NullPointerException(); // must reject here: doOrApply reads a null executor as "run synchronously"
2164 return doOrApply(other, fn, executor);
2165 }
2166
2167 private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other, // shared implementation for applyToEither and both applyToEitherAsync overloads
2168 Fun<? super T, U> fn,
2169 Executor e) { // e == null: fn.apply is called directly; otherwise the function is handed to e
2170 if (other == null || fn == null) throw new NullPointerException();
2171 CompletableFuture<U> dst = new CompletableFuture<U>(); // the future returned to the caller
2172 OrApplyCompletion<T,U> d = null; // stays null when a result is already available on entry
2173 Object r; // first available result, checking this before other
2174 if ((r = result) == null && (r = other.result) == null) { // neither source complete yet: register on both
2175 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2176 CompletionNode q = null, p = new CompletionNode(d); // p goes on this.completions; q (created later) on other.completions
2177 while ((r = result) == null && (r = other.result) == null) { // stop early as soon as either source completes
2178 if (q != null) { // phase 2: p is linked, now push q on other
2179 if (UNSAFE.compareAndSwapObject
2180 (other, COMPLETIONS, q.next = other.completions, q))
2181 break; // registered on both sides; r is still null here
2182 }
2183 else if (UNSAFE.compareAndSwapObject // phase 1: push p as the new head of this.completions
2184 (this, COMPLETIONS, p.next = completions, p))
2185 q = new CompletionNode(d); // same completion d, separate node for other's stack
2186 }
2187 }
2188 if (r != null && (d == null || d.compareAndSet(0, 1))) { // a result is available, and either nothing was registered or this thread won the 0->1 claim on d (presumably so fn runs once — confirm in OrApplyCompletion)
2189 T t; Throwable ex;
2190 if (r instanceof AltResult) { // AltResult wraps a non-plain outcome; its ex field may hold the failure
2191 ex = ((AltResult)r).ex;
2192 t = null;
2193 }
2194 else {
2195 ex = null;
2196 @SuppressWarnings("unchecked") T tr = (T) r;
2197 t = tr;
2198 }
2199 U u = null; // stays null on the async path and on failure
2200 if (ex == null) {
2201 try {
2202 if (e != null)
2203 e.execute(new AsyncApply<T,U>(t, fn, dst)); // async: dst is left to the submitted task — presumably AsyncApply completes it, TODO confirm
2204 else
2205 u = fn.apply(t); // sync: compute the value in the current thread
2206 } catch (Throwable rex) { // covers both a throwing fn and a throwing e.execute
2207 ex = rex;
2208 }
2209 }
2210 if (e == null || ex != null) // complete dst here unless the work was successfully handed to the executor
2211 dst.internalComplete(u, ex);
2212 }
2213 helpPostComplete(); // NOTE(review): presumably helps run completions still queued on an already-completed source — confirm
2214 other.helpPostComplete();
2215 return dst;
2216 }
2217
2218 /**
2219 * Creates and returns a CompletableFuture that is completed after
2220 * performing the given action with the result of either this or the
2221 * other given CompletableFuture's result, when either completes.
2222 * If this and/or the other CompletableFuture complete
2223 * exceptionally, then the returned CompletableFuture may also do
2224 * so, with a CompletionException holding one of these exceptions as
2225 * its cause. No guarantees are made about which exception is
2226 * used in the returned CompletableFuture.
2227 *
2228 * @param other the other CompletableFuture
2229 * @param block the action to perform before completing the
2230 * returned CompletableFuture
2231 * @return the new CompletableFuture
2232 */
2233 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2234 Action<? super T> block) {
2235 return doOrAccept(other, block, null); // null executor: doOrAccept calls block.accept directly instead of submitting it
2236 }
2237
2238 /**
2239 * Creates and returns a CompletableFuture that is completed
2240 * asynchronously using the {@link ForkJoinPool#commonPool()},
2241 * performing the given action with the result of either this or
2242 * the other given CompletableFuture's result, when either
2243 * completes. If this and/or the other CompletableFuture complete
2244 * exceptionally, then the returned CompletableFuture may also do
2245 * so, with a CompletionException holding one of these exceptions as
2246 * its cause. No guarantees are made about which exception is
2247 * used in the returned CompletableFuture.
2248 *
2249 * @param other the other CompletableFuture
2250 * @param block the action to perform before completing the
2251 * returned CompletableFuture
2252 * @return the new CompletableFuture
2253 */
2254 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2255 Action<? super T> block) {
2256 return doOrAccept(other, block, ForkJoinPool.commonPool()); // no executor supplied: use the common pool
2257 }
2258
2259 /**
2260 * Creates and returns a CompletableFuture that is completed
2261 * asynchronously using the given executor,
2262 * performing the given action with the result of either this or
2263 * the other given CompletableFuture's result, when either
2264 * completes. If this and/or the other CompletableFuture complete
2265 * exceptionally, then the returned CompletableFuture may also do
2266 * so, with a CompletionException holding one of these exceptions as
2267 * its cause. No guarantees are made about which exception is
2268 * used in the returned CompletableFuture.
2269 *
2270 * @param other the other CompletableFuture
2271 * @param block the action to perform before completing the
2272 * returned CompletableFuture
2273 * @param executor the executor to use for asynchronous execution; a {@code null} executor is rejected with a NullPointerException
2274 * @return the new CompletableFuture
2275 */
2276 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2277 Action<? super T> block,
2278 Executor executor) {
2279 if (executor == null) throw new NullPointerException(); // must reject here: doOrAccept reads a null executor as "run synchronously"
2280 return doOrAccept(other, block, executor);
2281 }
2282
2283 private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other, // shared implementation for acceptEither and both acceptEitherAsync overloads
2284 Action<? super T> fn,
2285 Executor e) { // e == null: fn.accept is called directly; otherwise the action is handed to e
2286 if (other == null || fn == null) throw new NullPointerException();
2287 CompletableFuture<Void> dst = new CompletableFuture<Void>(); // the future returned to the caller
2288 OrAcceptCompletion<T> d = null; // stays null when a result is already available on entry
2289 Object r; // first available result, checking this before other
2290 if ((r = result) == null && (r = other.result) == null) { // neither source complete yet: register on both
2291 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2292 CompletionNode q = null, p = new CompletionNode(d); // p goes on this.completions; q (created later) on other.completions
2293 while ((r = result) == null && (r = other.result) == null) { // stop early as soon as either source completes
2294 if (q != null) { // phase 2: p is linked, now push q on other
2295 if (UNSAFE.compareAndSwapObject
2296 (other, COMPLETIONS, q.next = other.completions, q))
2297 break; // registered on both sides; r is still null here
2298 }
2299 else if (UNSAFE.compareAndSwapObject // phase 1: push p as the new head of this.completions
2300 (this, COMPLETIONS, p.next = completions, p))
2301 q = new CompletionNode(d); // same completion d, separate node for other's stack
2302 }
2303 }
2304 if (r != null && (d == null || d.compareAndSet(0, 1))) { // a result is available, and either nothing was registered or this thread won the 0->1 claim on d (presumably so fn runs once — confirm in OrAcceptCompletion)
2305 T t; Throwable ex;
2306 if (r instanceof AltResult) { // AltResult wraps a non-plain outcome; its ex field may hold the failure
2307 ex = ((AltResult)r).ex;
2308 t = null;
2309 }
2310 else {
2311 ex = null;
2312 @SuppressWarnings("unchecked") T tr = (T) r;
2313 t = tr;
2314 }
2315 if (ex == null) {
2316 try {
2317 if (e != null)
2318 e.execute(new AsyncAccept<T>(t, fn, dst)); // async: dst is left to the submitted task — presumably AsyncAccept completes it, TODO confirm
2319 else
2320 fn.accept(t); // sync: run in the current thread
2321 } catch (Throwable rex) { // covers both a throwing fn and a throwing e.execute
2322 ex = rex;
2323 }
2324 }
2325 if (e == null || ex != null) // complete dst here unless the work was successfully handed to the executor
2326 dst.internalComplete(null, ex);
2327 }
2328 helpPostComplete(); // NOTE(review): presumably helps run completions still queued on an already-completed source — confirm
2329 other.helpPostComplete();
2330 return dst;
2331 }
2332
2333 /**
2334 * Creates and returns a CompletableFuture that is completed
2335 * after this or the other given CompletableFuture completes. If
2336 * this and/or the other CompletableFuture complete exceptionally,
2337 * then the returned CompletableFuture may also do so, with a
2338 * CompletionException holding one of these exceptions as its cause.
2339 * No guarantees are made about which exception is used in the
2340 * returned CompletableFuture.
2341 *
2342 * @param other the other CompletableFuture
2343 * @param action the action to perform before completing the
2344 * returned CompletableFuture
2345 * @return the new CompletableFuture
2346 */
2347 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2348 Runnable action) {
2349 return doOrRun(other, action, null); // null executor: doOrRun calls action.run() directly instead of submitting it
2350 }
2351
2352 /**
2353 * Creates and returns a CompletableFuture that is completed
2354 * asynchronously using the {@link ForkJoinPool#commonPool()}
2355 * after this or the other given CompletableFuture completes. If
2356 * this and/or the other CompletableFuture complete exceptionally,
2357 * then the returned CompletableFuture may also do so, with a
2358 * CompletionException holding one of these exceptions as its cause.
2359 * No guarantees are made about which exception is used in the
2360 * returned CompletableFuture.
2361 *
2362 * @param other the other CompletableFuture
2363 * @param action the action to perform before completing the
2364 * returned CompletableFuture
2365 * @return the new CompletableFuture
2366 */
2367 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2368 Runnable action) {
2369 return doOrRun(other, action, ForkJoinPool.commonPool()); // no executor supplied: use the common pool
2370 }
2371
2372 /**
2373 * Creates and returns a CompletableFuture that is completed
2374 * asynchronously using the given executor after this or the other
2375 * given CompletableFuture completes. If this and/or the other
2376 * CompletableFuture complete exceptionally, then the returned
2377 * CompletableFuture may also do so, with a CompletionException
2378 * holding one of these exceptions as its cause. No guarantees are
2379 * made about which exception is used in the returned
2380 * CompletableFuture.
2381 *
2382 * @param other the other CompletableFuture
2383 * @param action the action to perform before completing the
2384 * returned CompletableFuture
2385 * @param executor the executor to use for asynchronous execution; a {@code null} executor is rejected with a NullPointerException
2386 * @return the new CompletableFuture
2387 */
2388 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2389 Runnable action,
2390 Executor executor) {
2391 if (executor == null) throw new NullPointerException(); // must reject here: doOrRun reads a null executor as "run synchronously"
2392 return doOrRun(other, action, executor);
2393 }
2394
2395 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other, // shared implementation for runAfterEither and both runAfterEitherAsync overloads
2396 Runnable action,
2397 Executor e) { // e == null: action.run() is called directly; otherwise the action is handed to e
2398 if (other == null || action == null) throw new NullPointerException();
2399 CompletableFuture<Void> dst = new CompletableFuture<Void>(); // the future returned to the caller
2400 OrRunCompletion<T> d = null; // stays null when a result is already available on entry
2401 Object r; // first available result, checking this before other
2402 if ((r = result) == null && (r = other.result) == null) { // neither source complete yet: register on both
2403 d = new OrRunCompletion<T>(this, other, action, dst, e);
2404 CompletionNode q = null, p = new CompletionNode(d); // p goes on this.completions; q (created later) on other.completions
2405 while ((r = result) == null && (r = other.result) == null) { // stop early as soon as either source completes
2406 if (q != null) { // phase 2: p is linked, now push q on other
2407 if (UNSAFE.compareAndSwapObject
2408 (other, COMPLETIONS, q.next = other.completions, q))
2409 break; // registered on both sides; r is still null here
2410 }
2411 else if (UNSAFE.compareAndSwapObject // phase 1: push p as the new head of this.completions
2412 (this, COMPLETIONS, p.next = completions, p))
2413 q = new CompletionNode(d); // same completion d, separate node for other's stack
2414 }
2415 }
2416 if (r != null && (d == null || d.compareAndSet(0, 1))) { // a result is available, and either nothing was registered or this thread won the 0->1 claim on d (presumably so the action runs once — confirm in OrRunCompletion)
2417 Throwable ex; // only the exception matters; the winning value is never passed to a Runnable
2418 if (r instanceof AltResult)
2419 ex = ((AltResult)r).ex;
2420 else
2421 ex = null;
2422 if (ex == null) {
2423 try {
2424 if (e != null)
2425 e.execute(new AsyncRun(action, dst)); // async: dst is left to the submitted task — presumably AsyncRun completes it, TODO confirm
2426 else
2427 action.run(); // sync: run in the current thread
2428 } catch (Throwable rex) { // covers both a throwing action and a throwing e.execute
2429 ex = rex;
2430 }
2431 }
2432 if (e == null || ex != null) // complete dst here unless the work was successfully handed to the executor
2433 dst.internalComplete(null, ex);
2434 }
2435 helpPostComplete(); // NOTE(review): presumably helps run completions still queued on an already-completed source — confirm
2436 other.helpPostComplete();
2437 return dst;
2438 }
2439
2440 /**
2441 * Returns a CompletableFuture (or an equivalent one) produced by
2442 * the given function of the result of this CompletableFuture when
2443 * completed. If this CompletableFuture completes exceptionally,
2444 * then the returned CompletableFuture also does so, with a
2445 * CompletionException holding this exception as its cause.
2446 *
2447 * @param fn the function returning a new CompletableFuture; must not be {@code null}
2448 * @return the CompletableFuture, that {@code isDone()} upon
2449 * return if completed by the given function, or an exception
2450 * occurs
2451 */
2452 public <U> CompletableFuture<U> thenCompose(Fun<? super T,
2453 CompletableFuture<U>> fn) {
2454 if (fn == null) throw new NullPointerException();
2455 CompletableFuture<U> dst = null; // created lazily: a placeholder is only needed while this future is pending
2456 ComposeCompletion<T,U> d = null; // stays null when this future is already complete on entry
2457 Object r;
2458 if ((r = result) == null) { // still pending: hand out a placeholder and register a completion for it
2459 dst = new CompletableFuture<U>();
2460 CompletionNode p = new CompletionNode
2461 (d = new ComposeCompletion<T,U>(this, fn, dst));
2462 while ((r = result) == null) { // retry the push until it succeeds or this future completes meanwhile
2463 if (UNSAFE.compareAndSwapObject
2464 (this, COMPLETIONS, p.next = completions, p))
2465 break;
2466 }
2467 }
2468 if (r != null && (d == null || d.compareAndSet(0, 1))) { // result available, and either nothing was registered or this thread won the 0->1 claim on d
2469 T t; Throwable ex;
2470 if (r instanceof AltResult) { // AltResult wraps a non-plain outcome; its ex field may hold the failure
2471 ex = ((AltResult)r).ex;
2472 t = null;
2473 }
2474 else {
2475 ex = null;
2476 @SuppressWarnings("unchecked") T tr = (T) r;
2477 t = tr;
2478 }
2479 if (ex == null) {
2480 try {
2481 dst = fn.apply(t); // the future produced by fn replaces any placeholder and is returned as-is
2482 } catch (Throwable rex) {
2483 ex = rex; // dst keeps its previous value (placeholder or null)
2484 }
2485 }
2486 if (dst == null) { // fn returned null, or no placeholder existed and fn was skipped or threw
2487 dst = new CompletableFuture<U>();
2488 if (ex == null)
2489 ex = new NullPointerException(); // a null future from fn is reported as a NullPointerException
2490 }
2491 if (ex != null) // on success dst is fn's own future and is not completed here
2492 dst.internalComplete(null, ex);
2493 }
2494 helpPostComplete(); // NOTE(review): presumably helps run completions still queued on an already-completed source — confirm
2495 dst.helpPostComplete(); // dst is non-null on every path that reaches this line
2496 return dst;
2497 }
2498
2499 /**
2500 * Creates and returns a CompletableFuture that is completed with
2501 * the result of the given function of the exception triggering
2502 * this CompletableFuture's completion when it completes
2503 * exceptionally; otherwise, if this CompletableFuture completes
2504 * normally, then the returned CompletableFuture also completes
2505 * normally with the same value.
2506 *
2507 * @param fn the function to use to compute the value of the
2508 * returned CompletableFuture if this CompletableFuture completed
2509 * exceptionally; must not be {@code null}
2510 * @return the new CompletableFuture
2511 */
2512 public CompletableFuture<T> exceptionally(Fun<Throwable, ? extends T> fn) {
2513 if (fn == null) throw new NullPointerException();
2514 CompletableFuture<T> dst = new CompletableFuture<T>(); // the future returned to the caller
2515 ExceptionCompletion<T> d = null; // stays null when this future is already complete on entry
2516 Object r;
2517 if ((r = result) == null) { // still pending: register a deferred completion
2518 CompletionNode p =
2519 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2520 while ((r = result) == null) { // retry the push until it succeeds or this future completes meanwhile
2521 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2522 p.next = completions, p))
2523 break;
2524 }
2525 }
2526 if (r != null && (d == null || d.compareAndSet(0, 1))) { // result available, and either nothing was registered or this thread won the 0->1 claim on d
2527 T t = null; Throwable ex, dx = null; // ex: the source's exception; dx: an exception thrown by fn itself
2528 if (r instanceof AltResult) {
2529 if ((ex = ((AltResult)r).ex) != null) { // fn runs only for a real exception; an AltResult without one leaves t == null
2530 try {
2531 t = fn.apply(ex); // recovery value computed from the failure
2532 } catch (Throwable rex) {
2533 dx = rex;
2534 }
2535 }
2536 }
2537 else { // normal completion: pass the value through unchanged, fn is not called
2538 @SuppressWarnings("unchecked") T tr = (T) r;
2539 t = tr;
2540 }
2541 dst.internalComplete(t, dx); // dx is non-null only when fn threw
2542 }
2543 helpPostComplete(); // NOTE(review): presumably helps run completions still queued on an already-completed source — confirm
2544 return dst;
2545 }
2546
2547 /**
2548 * Creates and returns a CompletableFuture that is completed with
2549 * the result of the given function of the result and exception of
2550 * this CompletableFuture's completion when it completes. The
2551 * given function is invoked with the result (or {@code null} if
2552 * none) and the exception (or {@code null} if none) of this
2553 * CompletableFuture when complete.
2554 *
2555 * @param fn the function to use to compute the value of the
2556 * returned CompletableFuture; must not be {@code null}
2557 * @return the new CompletableFuture
2558 */
2559 public <U> CompletableFuture<U> handle(BiFun<? super T, Throwable, ? extends U> fn) {
2560 if (fn == null) throw new NullPointerException();
2561 CompletableFuture<U> dst = new CompletableFuture<U>(); // the future returned to the caller
2562 HandleCompletion<T,U> d = null; // stays null when this future is already complete on entry
2563 Object r;
2564 if ((r = result) == null) { // still pending: register a deferred completion
2565 CompletionNode p =
2566 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2567 while ((r = result) == null) { // retry the push until it succeeds or this future completes meanwhile
2568 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2569 p.next = completions, p))
2570 break;
2571 }
2572 }
2573 if (r != null && (d == null || d.compareAndSet(0, 1))) { // result available, and either nothing was registered or this thread won the 0->1 claim on d
2574 T t; Throwable ex;
2575 if (r instanceof AltResult) { // AltResult wraps a non-plain outcome; its ex field may hold the failure
2576 ex = ((AltResult)r).ex;
2577 t = null;
2578 }
2579 else {
2580 ex = null;
2581 @SuppressWarnings("unchecked") T tr = (T) r;
2582 t = tr;
2583 }
2584 U u; Throwable dx; // u: value produced by fn; dx: exception thrown by fn itself
2585 try {
2586 u = fn.apply(t, ex); // unlike the other methods, fn is always invoked, whether or not ex is null
2587 dx = null;
2588 } catch (Throwable rex) {
2589 dx = rex;
2590 u = null;
2591 }
2592 dst.internalComplete(u, dx);
2593 }
2594 helpPostComplete(); // NOTE(review): presumably helps run completions still queued on an already-completed source — confirm
2595 return dst;
2596 }
2597
2598 /**
2599 * Attempts to complete this CompletableFuture with
2600 * a {@link CancellationException}.
2601 *
2602 * @param mayInterruptIfRunning this value has no effect in this
2603 * implementation because interrupts are not used to control
2604 * processing.
2605 *
2606 * @return {@code true} if this task is now cancelled
2607 */
2608 public boolean cancel(boolean mayInterruptIfRunning) {
2609 Object r;
2610 while ((r = result) == null) { // loop until some outcome is installed, by this call or by another thread
2611 r = new AltResult(new CancellationException());
2612 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) { // install the cancellation only if still incomplete
2613 postComplete(); // NOTE(review): presumably wakes waiters and runs dependent completions — confirm
2614 return true;
2615 }
2616 }
2617 return ((r instanceof AltResult) && // already complete: true only if the existing outcome is itself a cancellation
2618 (((AltResult)r).ex instanceof CancellationException));
2619 }
2620
2621 /**
2622 * Returns {@code true} if this CompletableFuture was cancelled
2623 * before it completed normally.
2624 *
2625 * @return {@code true} if this CompletableFuture was cancelled
2626 * before it completed normally
2627 */
2628 public boolean isCancelled() {
2629 Object r; // single read of result so both checks see the same snapshot
2630 return ((r = result) instanceof AltResult) && // instanceof is false for null, so a pending future reports false
2631 (((AltResult)r).ex instanceof CancellationException);
2632 }
2633
2634 /**
2635 * Forcibly sets or resets the value subsequently returned by
2636 * method {@link #get()} and related methods, whether or not
2637 * already completed. This method is designed for use only in
2638 * error recovery actions, and even in such situations may result
2639 * in ongoing dependent completions using established versus
2640 * overwritten outcomes.
2641 *
2642 * @param value the completion value
2643 */
2644 public void obtrudeValue(T value) {
2645 result = (value == null) ? NIL : value; // plain overwrite, no compare-and-swap: replaces any existing outcome; null is stored as the NIL marker
2646 postComplete(); // NOTE(review): presumably wakes waiters and runs dependent completions — confirm
2647 }
2648
2649 /**
2650 * Forcibly causes subsequent invocations of method {@link #get()}
2651 * and related methods to throw the given exception, whether or
2652 * not already completed. This method is designed for use only in
2653 * error recovery actions, and even in such situations may result in
2654 * ongoing dependent completions using established versus
2655 * overwritten outcomes.
2656 *
2657 * @param ex the exception; a {@code null} argument is rejected with a NullPointerException
2658 */
2659 public void obtrudeException(Throwable ex) {
2660 if (ex == null) throw new NullPointerException();
2661 result = new AltResult(ex); // plain overwrite, no compare-and-swap: replaces any existing outcome
2662 postComplete(); // NOTE(review): presumably wakes waiters and runs dependent completions — confirm
2663 }
2664
2665 // Unsafe mechanics: field offsets used by the compareAndSwapObject calls in this class
2666 private static final sun.misc.Unsafe UNSAFE;
2667 private static final long RESULT; // offset of the "result" field
2668 private static final long WAITERS; // offset of the "waiters" field
2669 private static final long COMPLETIONS; // offset of the "completions" field
2670 static {
2671 try {
2672 UNSAFE = getUnsafe();
2673 Class<?> k = CompletableFuture.class;
2674 RESULT = UNSAFE.objectFieldOffset
2675 (k.getDeclaredField("result")); // field names are looked up by string, so they must match the declarations
2676 WAITERS = UNSAFE.objectFieldOffset
2677 (k.getDeclaredField("waiters"));
2678 COMPLETIONS = UNSAFE.objectFieldOffset
2679 (k.getDeclaredField("completions"));
2680 } catch (Exception e) {
2681 throw new Error(e); // any lookup failure aborts class initialization, keeping the original exception as cause
2682 }
2683 }
2684
2685
2686 /**
2687 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
2688 * Replace with a simple call to Unsafe.getUnsafe when integrating
2689 * into a jdk.
2690 *
2691 * @return a sun.misc.Unsafe
2692 */
2693 private static sun.misc.Unsafe getUnsafe() {
2694 try {
2695 return sun.misc.Unsafe.getUnsafe(); // first attempt: the direct accessor
2696 } catch (SecurityException tryReflectionInstead) {} // intentionally ignored: fall through to the reflective lookup below
2697 try {
2698 return java.security.AccessController.doPrivileged
2699 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
2700 public sun.misc.Unsafe run() throws Exception {
2701 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
2702 for (java.lang.reflect.Field f : k.getDeclaredFields()) { // scan Unsafe's own fields rather than relying on a specific field name
2703 f.setAccessible(true);
2704 Object x = f.get(null); // null receiver: the field is read as a static
2705 if (k.isInstance(x)) // the first field holding an Unsafe instance is the one we want
2706 return k.cast(x);
2707 }
2708 throw new NoSuchFieldError("the Unsafe"); // no field held an Unsafe instance
2709 }});
2710 } catch (java.security.PrivilegedActionException e) {
2711 throw new RuntimeException("Could not initialize intrinsics",
2712 e.getCause()); // unwrap so the real failure is reported as the cause
2713 }
2714 }
2715 }