37 |
|
* <li>All <em>async</em> methods without an explicit Executor |
38 |
|
* argument are performed using the {@link ForkJoinPool#commonPool()} |
39 |
|
* (unless it does not support a parallelism level of at least two, in |
40 |
< |
* which case, a new Thread is created to run each task). To simplify |
41 |
< |
* monitoring, debugging, and tracking, all generated asynchronous |
42 |
< |
* tasks are instances of the marker interface {@link |
43 |
< |
* AsynchronousCompletionTask}. </li> |
40 |
> |
* which case, a new Thread is created to run each task). This may be |
41 |
> |
* overridden for non-static methods in subclasses by defining method |
42 |
> |
* {@link #defaultExecutor()}. To simplify monitoring, debugging, |
43 |
> |
* and tracking, all generated asynchronous tasks are instances of the |
44 |
> |
* marker interface {@link AsynchronousCompletionTask}. Operations |
45 |
> |
* with time-delays can use adaptor methods defined in this class, for |
46 |
> |
* example: {@code supplyAsync(supplier, delayedExecutor(timeout, |
47 |
> |
* timeUnit))}. To support methods with delays and timeouts, this |
48 |
> |
* class maintains at most one daemon thread for triggering and |
49 |
> |
* cancelling actions, not for running them.</li> |
50 |
|
* |
51 |
|
* <li>All CompletionStage methods are implemented independently of |
52 |
|
* other public methods, so the behavior of one method is not impacted |
53 |
< |
* by overrides of others in subclasses. </li> </ul> |
53 |
> |
* by overrides of others in subclasses. </li> |
54 |
> |
* |
55 |
> |
* <li>All CompletionStage methods return CompletableFutures. To |
56 |
> |
* restrict usages to only those methods defined in interface |
57 |
> |
* CompletionStage, use the new CompletionStage from method {@link |
58 |
> |
* minimalCompletionStage}. Or to ensure only that clients do not |
59 |
> |
* themselves modify a future, use method {@link copy}. </li> </ul> |
60 |
|
* |
61 |
|
* <p>CompletableFuture also implements {@link Future} with the following |
62 |
|
* policies: <ul> |
77 |
|
* {@link #getNow} that instead throw the CompletionException directly |
78 |
|
* in these cases.</li> </ul> |
79 |
|
* |
80 |
+ |
* <p>Subclasses of this class should normally override the "virtual |
81 |
+ |
* constructor" method {@link #newIncompleteFuture}, which establishes |
82 |
+ |
* the concrete type returned by CompletionStage methods. For example, |
83 |
+ |
* here is a class that substitutes a different default Executor and |
84 |
+ |
* disables the {@code obtrude} methods: |
85 |
+ |
* |
86 |
+ |
* <pre> {@code |
87 |
+ |
* class MyCompletableFuture<T> extends CompletableFuture<T> { |
88 |
+ |
* static final Executor myExecutor = ...; |
89 |
+ |
* public MyCompletableFuture() { } |
90 |
+ |
* public <U> CompletableFuture<U> newIncompleteFuture() { |
91 |
+ |
* return new MyCompletableFuture<U>(); } |
92 |
+ |
* public Executor defaultExecutor() { |
93 |
+ |
* return myExecutor; } |
94 |
+ |
* public void obtrudeValue(T value) { |
95 |
+ |
* throw new UnsupportedOperationException(); } |
96 |
+ |
* public void obtrudeException(Throwable ex) { |
97 |
+ |
* throw new UnsupportedOperationException(); } |
98 |
+ |
* }}</pre> |
99 |
+ |
* |
100 |
|
* @author Doug Lea |
101 |
|
* @since 1.8 |
102 |
|
*/ |
143 |
|
* fields for source(s), actions, and dependent. They are |
144 |
|
* boringly similar, differing from others only with respect to |
145 |
|
* underlying functional forms. We do this so that users don't |
146 |
< |
* encounter layers of adaptors in common usages. We also |
115 |
< |
* include "Relay" classes/methods that don't correspond to user |
116 |
< |
* methods; they copy results from one stage to another. |
146 |
> |
* encounter layers of adaptors in common usages. |
147 |
|
* |
148 |
|
* * Boolean CompletableFuture method x(...) (for example |
149 |
|
* uniApply) takes all of the arguments needed to check that an |
601 |
|
private <V> CompletableFuture<V> uniApplyStage( |
602 |
|
Executor e, Function<? super T,? extends V> f) { |
603 |
|
if (f == null) throw new NullPointerException(); |
604 |
< |
CompletableFuture<V> d = new CompletableFuture<V>(); |
604 |
> |
CompletableFuture<V> d = newIncompleteFuture(); |
605 |
|
if (e != null || !d.uniApply(this, f, null)) { |
606 |
|
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); |
607 |
|
push(c); |
656 |
|
private CompletableFuture<Void> uniAcceptStage(Executor e, |
657 |
|
Consumer<? super T> f) { |
658 |
|
if (f == null) throw new NullPointerException(); |
659 |
< |
CompletableFuture<Void> d = new CompletableFuture<Void>(); |
659 |
> |
CompletableFuture<Void> d = newIncompleteFuture(); |
660 |
|
if (e != null || !d.uniAccept(this, f, null)) { |
661 |
|
UniAccept<T> c = new UniAccept<T>(e, d, this, f); |
662 |
|
push(c); |
704 |
|
|
705 |
|
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { |
706 |
|
if (f == null) throw new NullPointerException(); |
707 |
< |
CompletableFuture<Void> d = new CompletableFuture<Void>(); |
707 |
> |
CompletableFuture<Void> d = newIncompleteFuture(); |
708 |
|
if (e != null || !d.uniRun(this, f, null)) { |
709 |
|
UniRun<T> c = new UniRun<T>(e, d, this, f); |
710 |
|
push(c); |
765 |
|
private CompletableFuture<T> uniWhenCompleteStage( |
766 |
|
Executor e, BiConsumer<? super T, ? super Throwable> f) { |
767 |
|
if (f == null) throw new NullPointerException(); |
768 |
< |
CompletableFuture<T> d = new CompletableFuture<T>(); |
768 |
> |
CompletableFuture<T> d = newIncompleteFuture(); |
769 |
|
if (e != null || !d.uniWhenComplete(this, f, null)) { |
770 |
|
UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); |
771 |
|
push(c); |
821 |
|
private <V> CompletableFuture<V> uniHandleStage( |
822 |
|
Executor e, BiFunction<? super T, Throwable, ? extends V> f) { |
823 |
|
if (f == null) throw new NullPointerException(); |
824 |
< |
CompletableFuture<V> d = new CompletableFuture<V>(); |
824 |
> |
CompletableFuture<V> d = newIncompleteFuture(); |
825 |
|
if (e != null || !d.uniHandle(this, f, null)) { |
826 |
|
UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f); |
827 |
|
push(c); |
871 |
|
private CompletableFuture<T> uniExceptionallyStage( |
872 |
|
Function<Throwable, ? extends T> f) { |
873 |
|
if (f == null) throw new NullPointerException(); |
874 |
< |
CompletableFuture<T> d = new CompletableFuture<T>(); |
874 |
> |
CompletableFuture<T> d = newIncompleteFuture(); |
875 |
|
if (!d.uniExceptionally(this, f, null)) { |
876 |
|
UniExceptionally<T> c = new UniExceptionally<T>(d, this, f); |
877 |
|
push(c); |
903 |
|
return true; |
904 |
|
} |
905 |
|
|
906 |
+ |
private CompletableFuture<T> uniCopyStage() { |
907 |
+ |
Object r; |
908 |
+ |
CompletableFuture<T> d = newIncompleteFuture(); |
909 |
+ |
if ((r = result) != null) |
910 |
+ |
d.completeRelay(r); |
911 |
+ |
else { |
912 |
+ |
UniRelay<T> c = new UniRelay<T>(d, this); |
913 |
+ |
push(c); |
914 |
+ |
c.tryFire(SYNC); |
915 |
+ |
} |
916 |
+ |
return d; |
917 |
+ |
} |
918 |
+ |
|
919 |
+ |
private MinimalStage<T> uniAsMinimalStage() { |
920 |
+ |
Object r; |
921 |
+ |
if ((r = result) != null) |
922 |
+ |
return new MinimalStage<T>(encodeRelay(r)); |
923 |
+ |
MinimalStage<T> d = new MinimalStage<T>(); |
924 |
+ |
UniRelay<T> c = new UniRelay<T>(d, this); |
925 |
+ |
push(c); |
926 |
+ |
c.tryFire(SYNC); |
927 |
+ |
return d; |
928 |
+ |
} |
929 |
+ |
|
930 |
|
@SuppressWarnings("serial") |
931 |
|
static final class UniCompose<T,V> extends UniCompletion<T,V> { |
932 |
|
Function<? super T, ? extends CompletionStage<V>> fn; |
982 |
|
private <V> CompletableFuture<V> uniComposeStage( |
983 |
|
Executor e, Function<? super T, ? extends CompletionStage<V>> f) { |
984 |
|
if (f == null) throw new NullPointerException(); |
985 |
< |
Object r; Throwable x; |
985 |
> |
Object r, s; Throwable x; |
986 |
> |
CompletableFuture<V> d = newIncompleteFuture(); |
987 |
|
if (e == null && (r = result) != null) { |
933 |
– |
// try to return function result directly |
988 |
|
if (r instanceof AltResult) { |
989 |
|
if ((x = ((AltResult)r).ex) != null) { |
990 |
< |
return new CompletableFuture<V>(encodeThrowable(x, r)); |
990 |
> |
d.result = encodeThrowable(x, r); |
991 |
> |
return d; |
992 |
|
} |
993 |
|
r = null; |
994 |
|
} |
995 |
|
try { |
996 |
|
@SuppressWarnings("unchecked") T t = (T) r; |
997 |
|
CompletableFuture<V> g = f.apply(t).toCompletableFuture(); |
998 |
< |
Object s = g.result; |
999 |
< |
if (s != null) |
1000 |
< |
return new CompletableFuture<V>(encodeRelay(s)); |
1001 |
< |
CompletableFuture<V> d = new CompletableFuture<V>(); |
1002 |
< |
UniRelay<V> copy = new UniRelay<V>(d, g); |
1003 |
< |
g.push(copy); |
1004 |
< |
copy.tryFire(SYNC); |
998 |
> |
if ((s = g.result) != null) |
999 |
> |
d.completeRelay(s); |
1000 |
> |
else { |
1001 |
> |
UniRelay<V> c = new UniRelay<V>(d, g); |
1002 |
> |
push(c); |
1003 |
> |
c.tryFire(SYNC); |
1004 |
> |
} |
1005 |
|
return d; |
1006 |
|
} catch (Throwable ex) { |
1007 |
< |
return new CompletableFuture<V>(encodeThrowable(ex)); |
1007 |
> |
d.result = encodeThrowable(ex); |
1008 |
> |
return d; |
1009 |
|
} |
1010 |
|
} |
955 |
– |
CompletableFuture<V> d = new CompletableFuture<V>(); |
1011 |
|
UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f); |
1012 |
|
push(c); |
1013 |
|
c.tryFire(SYNC); |
1132 |
|
CompletableFuture<U> b; |
1133 |
|
if (f == null || (b = o.toCompletableFuture()) == null) |
1134 |
|
throw new NullPointerException(); |
1135 |
< |
CompletableFuture<V> d = new CompletableFuture<V>(); |
1135 |
> |
CompletableFuture<V> d = newIncompleteFuture(); |
1136 |
|
if (e != null || !d.biApply(this, b, f, null)) { |
1137 |
|
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); |
1138 |
|
bipush(b, c); |
1204 |
|
CompletableFuture<U> b; |
1205 |
|
if (f == null || (b = o.toCompletableFuture()) == null) |
1206 |
|
throw new NullPointerException(); |
1207 |
< |
CompletableFuture<Void> d = new CompletableFuture<Void>(); |
1207 |
> |
CompletableFuture<Void> d = newIncompleteFuture(); |
1208 |
|
if (e != null || !d.biAccept(this, b, f, null)) { |
1209 |
|
BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f); |
1210 |
|
bipush(b, c); |
1263 |
|
CompletableFuture<?> b; |
1264 |
|
if (f == null || (b = o.toCompletableFuture()) == null) |
1265 |
|
throw new NullPointerException(); |
1266 |
< |
CompletableFuture<Void> d = new CompletableFuture<Void>(); |
1266 |
> |
CompletableFuture<Void> d = newIncompleteFuture(); |
1267 |
|
if (e != null || !d.biRun(this, b, f, null)) { |
1268 |
|
BiRun<T,?> c = new BiRun<>(e, d, this, b, f); |
1269 |
|
bipush(b, c); |
1404 |
|
CompletableFuture<U> b; |
1405 |
|
if (f == null || (b = o.toCompletableFuture()) == null) |
1406 |
|
throw new NullPointerException(); |
1407 |
< |
CompletableFuture<V> d = new CompletableFuture<V>(); |
1407 |
> |
CompletableFuture<V> d = newIncompleteFuture(); |
1408 |
|
if (e != null || !d.orApply(this, b, f, null)) { |
1409 |
|
OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f); |
1410 |
|
orpush(b, c); |
1468 |
|
CompletableFuture<U> b; |
1469 |
|
if (f == null || (b = o.toCompletableFuture()) == null) |
1470 |
|
throw new NullPointerException(); |
1471 |
< |
CompletableFuture<Void> d = new CompletableFuture<Void>(); |
1471 |
> |
CompletableFuture<Void> d = newIncompleteFuture(); |
1472 |
|
if (e != null || !d.orAccept(this, b, f, null)) { |
1473 |
|
OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f); |
1474 |
|
orpush(b, c); |
1526 |
|
CompletableFuture<?> b; |
1527 |
|
if (f == null || (b = o.toCompletableFuture()) == null) |
1528 |
|
throw new NullPointerException(); |
1529 |
< |
CompletableFuture<Void> d = new CompletableFuture<Void>(); |
1529 |
> |
CompletableFuture<Void> d = newIncompleteFuture(); |
1530 |
|
if (e != null || !d.orRun(this, b, f, null)) { |
1531 |
|
OrRun<T,?> c = new OrRun<>(e, d, this, b, f); |
1532 |
|
orpush(b, c); |
1562 |
|
return true; |
1563 |
|
} |
1564 |
|
|
1565 |
+ |
|
1566 |
|
/** Recursively constructs a tree of completions. */ |
1567 |
|
static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, |
1568 |
|
int lo, int hi) { |
1588 |
|
|
1589 |
|
@SuppressWarnings("serial") |
1590 |
|
static final class AsyncSupply<T> extends ForkJoinTask<Void> |
1591 |
< |
implements Runnable, AsynchronousCompletionTask { |
1591 |
> |
implements Runnable, AsynchronousCompletionTask { |
1592 |
|
CompletableFuture<T> dep; Supplier<T> fn; |
1593 |
|
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { |
1594 |
|
this.dep = dep; this.fn = fn; |
1624 |
|
|
1625 |
|
@SuppressWarnings("serial") |
1626 |
|
static final class AsyncRun extends ForkJoinTask<Void> |
1627 |
< |
implements Runnable, AsynchronousCompletionTask { |
1627 |
> |
implements Runnable, AsynchronousCompletionTask { |
1628 |
|
CompletableFuture<Void> dep; Runnable fn; |
1629 |
|
AsyncRun(CompletableFuture<Void> dep, Runnable fn) { |
1630 |
|
this.dep = dep; this.fn = fn; |
1813 |
|
/** |
1814 |
|
* Creates a new complete CompletableFuture with given encoded result. |
1815 |
|
*/ |
1816 |
< |
private CompletableFuture(Object r) { |
1816 |
> |
CompletableFuture(Object r) { |
1817 |
|
this.result = r; |
1818 |
|
} |
1819 |
|
|
2002 |
|
|
2003 |
|
public <U> CompletableFuture<U> thenApplyAsync( |
2004 |
|
Function<? super T,? extends U> fn) { |
2005 |
< |
return uniApplyStage(asyncPool, fn); |
2005 |
> |
return uniApplyStage(defaultExecutor(), fn); |
2006 |
|
} |
2007 |
|
|
2008 |
|
public <U> CompletableFuture<U> thenApplyAsync( |
2015 |
|
} |
2016 |
|
|
2017 |
|
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { |
2018 |
< |
return uniAcceptStage(asyncPool, action); |
2018 |
> |
return uniAcceptStage(defaultExecutor(), action); |
2019 |
|
} |
2020 |
|
|
2021 |
|
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, |
2028 |
|
} |
2029 |
|
|
2030 |
|
public CompletableFuture<Void> thenRunAsync(Runnable action) { |
2031 |
< |
return uniRunStage(asyncPool, action); |
2031 |
> |
return uniRunStage(defaultExecutor(), action); |
2032 |
|
} |
2033 |
|
|
2034 |
|
public CompletableFuture<Void> thenRunAsync(Runnable action, |
2045 |
|
public <U,V> CompletableFuture<V> thenCombineAsync( |
2046 |
|
CompletionStage<? extends U> other, |
2047 |
|
BiFunction<? super T,? super U,? extends V> fn) { |
2048 |
< |
return biApplyStage(asyncPool, other, fn); |
2048 |
> |
return biApplyStage(defaultExecutor(), other, fn); |
2049 |
|
} |
2050 |
|
|
2051 |
|
public <U,V> CompletableFuture<V> thenCombineAsync( |
2063 |
|
public <U> CompletableFuture<Void> thenAcceptBothAsync( |
2064 |
|
CompletionStage<? extends U> other, |
2065 |
|
BiConsumer<? super T, ? super U> action) { |
2066 |
< |
return biAcceptStage(asyncPool, other, action); |
2066 |
> |
return biAcceptStage(defaultExecutor(), other, action); |
2067 |
|
} |
2068 |
|
|
2069 |
|
public <U> CompletableFuture<Void> thenAcceptBothAsync( |
2079 |
|
|
2080 |
|
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, |
2081 |
|
Runnable action) { |
2082 |
< |
return biRunStage(asyncPool, other, action); |
2082 |
> |
return biRunStage(defaultExecutor(), other, action); |
2083 |
|
} |
2084 |
|
|
2085 |
|
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, |
2095 |
|
|
2096 |
|
public <U> CompletableFuture<U> applyToEitherAsync( |
2097 |
|
CompletionStage<? extends T> other, Function<? super T, U> fn) { |
2098 |
< |
return orApplyStage(asyncPool, other, fn); |
2098 |
> |
return orApplyStage(defaultExecutor(), other, fn); |
2099 |
|
} |
2100 |
|
|
2101 |
|
public <U> CompletableFuture<U> applyToEitherAsync( |
2111 |
|
|
2112 |
|
public CompletableFuture<Void> acceptEitherAsync( |
2113 |
|
CompletionStage<? extends T> other, Consumer<? super T> action) { |
2114 |
< |
return orAcceptStage(asyncPool, other, action); |
2114 |
> |
return orAcceptStage(defaultExecutor(), other, action); |
2115 |
|
} |
2116 |
|
|
2117 |
|
public CompletableFuture<Void> acceptEitherAsync( |
2127 |
|
|
2128 |
|
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, |
2129 |
|
Runnable action) { |
2130 |
< |
return orRunStage(asyncPool, other, action); |
2130 |
> |
return orRunStage(defaultExecutor(), other, action); |
2131 |
|
} |
2132 |
|
|
2133 |
|
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, |
2143 |
|
|
2144 |
|
public <U> CompletableFuture<U> thenComposeAsync( |
2145 |
|
Function<? super T, ? extends CompletionStage<U>> fn) { |
2146 |
< |
return uniComposeStage(asyncPool, fn); |
2146 |
> |
return uniComposeStage(defaultExecutor(), fn); |
2147 |
|
} |
2148 |
|
|
2149 |
|
public <U> CompletableFuture<U> thenComposeAsync( |
2159 |
|
|
2160 |
|
public CompletableFuture<T> whenCompleteAsync( |
2161 |
|
BiConsumer<? super T, ? super Throwable> action) { |
2162 |
< |
return uniWhenCompleteStage(asyncPool, action); |
2162 |
> |
return uniWhenCompleteStage(defaultExecutor(), action); |
2163 |
|
} |
2164 |
|
|
2165 |
|
public CompletableFuture<T> whenCompleteAsync( |
2174 |
|
|
2175 |
|
public <U> CompletableFuture<U> handleAsync( |
2176 |
|
BiFunction<? super T, Throwable, ? extends U> fn) { |
2177 |
< |
return uniHandleStage(asyncPool, fn); |
2177 |
> |
return uniHandleStage(defaultExecutor(), fn); |
2178 |
|
} |
2179 |
|
|
2180 |
|
public <U> CompletableFuture<U> handleAsync( |
2213 |
|
return uniExceptionallyStage(fn); |
2214 |
|
} |
2215 |
|
|
2216 |
+ |
|
2217 |
|
/* ------------- Arbitrary-arity constructions -------------- */ |
2218 |
|
|
2219 |
|
/** |
2371 |
|
*/ |
2372 |
|
public String toString() { |
2373 |
|
Object r = result; |
2374 |
< |
int count; |
2374 |
> |
int count = 0; // avoid call to getNumberOfDependents in case disabled |
2375 |
> |
for (Completion p = stack; p != null; p = p.next) |
2376 |
> |
++count; |
2377 |
|
return super.toString() + |
2378 |
|
((r == null) ? |
2379 |
< |
(((count = getNumberOfDependents()) == 0) ? |
2379 |
> |
((count == 0) ? |
2380 |
|
"[Not completed]" : |
2381 |
|
"[Not completed, " + count + " dependents]") : |
2382 |
|
(((r instanceof AltResult) && ((AltResult)r).ex != null) ? |
2384 |
|
"[Completed normally]")); |
2385 |
|
} |
2386 |
|
|
2387 |
+ |
// jdk9 additions |
2388 |
+ |
|
2389 |
+ |
/** |
2390 |
+ |
* Creates a new incomplete CompletableFuture of the type to be |
2391 |
+ |
* returned by a CompletionStage method. Subclasses should |
2392 |
+ |
* normally override this method to return an instance of the same |
2393 |
+ |
* class as this CompletableFuture. The default implementation |
2394 |
+ |
* returns an instance of class CompletableFuture. |
2395 |
+ |
* |
2396 |
+ |
* @return a new CompletableFuture |
2397 |
+ |
* @param <U> the type of the value |
2398 |
+ |
* @since 1.9 |
2399 |
+ |
*/ |
2400 |
+ |
public <U> CompletableFuture<U> newIncompleteFuture() { |
2401 |
+ |
return new CompletableFuture<U>(); |
2402 |
+ |
} |
2403 |
+ |
/** |
2404 |
+ |
* Returns the default Executor used for async methods that do not |
2405 |
+ |
* specify an Executor. This class uses the {@link |
2406 |
+ |
* ForkJoinPool#commonPool()}, but may be overridden in subclasses |
2407 |
+ |
* with an Executor that provides at least one independent thread. |
2408 |
+ |
* |
2409 |
+ |
* @return the executor |
2410 |
+ |
* @since 1.9 |
2411 |
+ |
*/ |
2412 |
+ |
public Executor defaultExecutor() { |
2413 |
+ |
return asyncPool; |
2414 |
+ |
} |
2415 |
+ |
|
2416 |
+ |
/** |
2417 |
+ |
* Returns a new CompletableFuture that is completed normally with |
2418 |
+ |
* the same value as this Completablefuture when it completes |
2419 |
+ |
* normally. If this CompletableFuture completes exceptionally, |
2420 |
+ |
* then the returned CompletableFuture completes exceptionally |
2421 |
+ |
* with a CompletionException with this exception as cause. The |
2422 |
+ |
* behavior equivalent is to {@code thenApply(x -> x)}. This |
2423 |
+ |
* method may be useful as a form of "defensive copying", to |
2424 |
+ |
* prevent clients from completing, while still being able to |
2425 |
+ |
* arrange dependent actions. |
2426 |
+ |
* |
2427 |
+ |
* @return the new CompletableFuture |
2428 |
+ |
* @since 1.9 |
2429 |
+ |
*/ |
2430 |
+ |
public CompletableFuture<T> copy() { |
2431 |
+ |
return uniCopyStage(); |
2432 |
+ |
} |
2433 |
+ |
|
2434 |
+ |
/** |
2435 |
+ |
* Returns a new CompletionStage that is completed normally with |
2436 |
+ |
* the same value as this Completablefuture when it completes |
2437 |
+ |
* normally, and cannot be independently completed or otherwise |
2438 |
+ |
* used in ways not defined by the methods of interface {@link |
2439 |
+ |
* CompletionStage}. If this CompletableFuture completes |
2440 |
+ |
* exceptionally, then the returned CompletionStage completes |
2441 |
+ |
* exceptionally with a CompletionException with this exception as |
2442 |
+ |
* cause. |
2443 |
+ |
* |
2444 |
+ |
* @return the new CompletionStage |
2445 |
+ |
* @since 1.9 |
2446 |
+ |
*/ |
2447 |
+ |
public CompletionStage<T> minimalCompletionStage() { |
2448 |
+ |
return uniAsMinimalStage(); |
2449 |
+ |
} |
2450 |
+ |
|
2451 |
+ |
/** |
2452 |
+ |
* Completes this CompletableFuture with the result of |
2453 |
+ |
* the given Supplier function invoked from an asynchronous |
2454 |
+ |
* task using the given executor. |
2455 |
+ |
* |
2456 |
+ |
* @param supplier a function returning the value to be used |
2457 |
+ |
* to complete this CompletableFuture |
2458 |
+ |
* @param executor the executor to use for asynchronous execution |
2459 |
+ |
* @return this CompletableFuture |
2460 |
+ |
* @since 1.9 |
2461 |
+ |
*/ |
2462 |
+ |
public CompletableFuture<T> completeAsync(Supplier<T> supplier, |
2463 |
+ |
Executor executor) { |
2464 |
+ |
if (supplier == null || executor == null) |
2465 |
+ |
throw new NullPointerException(); |
2466 |
+ |
executor.execute(new AsyncSupply<T>(this, supplier)); |
2467 |
+ |
return this; |
2468 |
+ |
} |
2469 |
+ |
|
2470 |
+ |
/** |
2471 |
+ |
* Completes this CompletableFuture with the result of the given |
2472 |
+ |
* Supplier function invoked from an asynchronous task using the |
2473 |
+ |
* the default executor. |
2474 |
+ |
* |
2475 |
+ |
* @param supplier a function returning the value to be used |
2476 |
+ |
* to complete this CompletableFuture |
2477 |
+ |
* @return this CompletableFuture |
2478 |
+ |
* @since 1.9 |
2479 |
+ |
*/ |
2480 |
+ |
public CompletableFuture<T> completeAsync(Supplier<T> supplier) { |
2481 |
+ |
return completeAsync(supplier, defaultExecutor()); |
2482 |
+ |
} |
2483 |
+ |
|
2484 |
+ |
/** |
2485 |
+ |
* Exceptionally completes this CompletableFuture with |
2486 |
+ |
* a {@link TimeoutException} if not otherwise completed |
2487 |
+ |
* before the given timeout. |
2488 |
+ |
* |
2489 |
+ |
* @param timeout how long to wait before completing exceptionally |
2490 |
+ |
* with a TimeoutException, in units of {@code unit} |
2491 |
+ |
* @param unit a {@code TimeUnit} determining how to interpret the |
2492 |
+ |
* {@code timeout} parameter |
2493 |
+ |
* @return this CompletableFuture |
2494 |
+ |
* @since 1.9 |
2495 |
+ |
*/ |
2496 |
+ |
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { |
2497 |
+ |
if (result == null) |
2498 |
+ |
whenComplete(new Canceller(Delayer.delay(new Timeout(this), |
2499 |
+ |
timeout, unit))); |
2500 |
+ |
return this; |
2501 |
+ |
} |
2502 |
+ |
|
2503 |
+ |
/** |
2504 |
+ |
* Returns a new Executor that submits a task to the given base |
2505 |
+ |
* executor after the given delay. |
2506 |
+ |
* |
2507 |
+ |
* @param delay how long to delay, in units of {@code unit} |
2508 |
+ |
* @param unit a {@code TimeUnit} determining how to interpret the |
2509 |
+ |
* {@code delay} parameter |
2510 |
+ |
* @param executor the base executor |
2511 |
+ |
* @return the new delayed executor |
2512 |
+ |
* @since 1.9 |
2513 |
+ |
*/ |
2514 |
+ |
public static Executor delayedExecutor(long delay, TimeUnit unit, |
2515 |
+ |
Executor executor) { |
2516 |
+ |
if (unit == null || executor == null) |
2517 |
+ |
throw new NullPointerException(); |
2518 |
+ |
return new DelayedExecutor(delay, unit, executor); |
2519 |
+ |
} |
2520 |
+ |
|
2521 |
+ |
/** |
2522 |
+ |
* Returns a new Executor that submits a task to the default |
2523 |
+ |
* executor after the given delay. |
2524 |
+ |
* |
2525 |
+ |
* @param delay how long to delay, in units of {@code unit} |
2526 |
+ |
* @param unit a {@code TimeUnit} determining how to interpret the |
2527 |
+ |
* {@code delay} parameter |
2528 |
+ |
* @return the new delayed executor |
2529 |
+ |
* @since 1.9 |
2530 |
+ |
*/ |
2531 |
+ |
public static Executor delayedExecutor(long delay, TimeUnit unit) { |
2532 |
+ |
return new DelayedExecutor(delay, unit, asyncPool); |
2533 |
+ |
} |
2534 |
+ |
|
2535 |
+ |
/** |
2536 |
+ |
* Returns a new CompletionStage that is already completed with |
2537 |
+ |
* the given value and supports only those methods in |
2538 |
+ |
* interface {@link CompletionStage}. |
2539 |
+ |
* |
2540 |
+ |
* @param value the value |
2541 |
+ |
* @param <U> the type of the value |
2542 |
+ |
* @return the completed CompletionStage |
2543 |
+ |
* @since 1.9 |
2544 |
+ |
*/ |
2545 |
+ |
public static <U> CompletionStage<U> completedStage(U value) { |
2546 |
+ |
return new MinimalStage<U>((value == null) ? NIL : value); |
2547 |
+ |
} |
2548 |
+ |
|
2549 |
+ |
/** |
2550 |
+ |
* Returns a new CompletableFuture that is already completed |
2551 |
+ |
* exceptionally with the given exception. |
2552 |
+ |
* |
2553 |
+ |
* @param ex the ex |
2554 |
+ |
* @param <U> the type of the value |
2555 |
+ |
* @return the exceptionally completed CompletableFuture |
2556 |
+ |
* @since 1.9 |
2557 |
+ |
*/ |
2558 |
+ |
public static <U> CompletableFuture<U> failedFuture(Throwable ex) { |
2559 |
+ |
if (ex == null) throw new NullPointerException(); |
2560 |
+ |
return new CompletableFuture<U>(encodeThrowable(ex)); |
2561 |
+ |
} |
2562 |
+ |
|
2563 |
+ |
/** |
2564 |
+ |
* Returns a new CompletionStage that is already completed |
2565 |
+ |
* exceptionally with the given exception and supports only those |
2566 |
+ |
* methods in interface {@link CompletionStage}. |
2567 |
+ |
* |
2568 |
+ |
* @param ex the ex |
2569 |
+ |
* @param <U> the type of the value |
2570 |
+ |
* @return the exceptionally completed CompletionStage |
2571 |
+ |
* @since 1.9 |
2572 |
+ |
*/ |
2573 |
+ |
public static <U> CompletableFuture<U> failedStage(Throwable ex) { |
2574 |
+ |
if (ex == null) throw new NullPointerException(); |
2575 |
+ |
return new MinimalStage<U>(encodeThrowable(ex)); |
2576 |
+ |
} |
2577 |
+ |
|
2578 |
+ |
/** |
2579 |
+ |
* Singleton delay scheduler, used only for starting and |
2580 |
+ |
* cancelling tasks. |
2581 |
+ |
*/ |
2582 |
+ |
static final class Delayer extends ScheduledThreadPoolExecutor { |
2583 |
+ |
static final class DaemonThreadFactory implements ThreadFactory { |
2584 |
+ |
public Thread newThread(Runnable r) { |
2585 |
+ |
Thread t = new Thread(r); |
2586 |
+ |
t.setDaemon(true); |
2587 |
+ |
t.setName("CompletableFutureDelayScheduler"); |
2588 |
+ |
return t; |
2589 |
+ |
} |
2590 |
+ |
} |
2591 |
+ |
Delayer() { |
2592 |
+ |
super(1, new DaemonThreadFactory()); |
2593 |
+ |
setRemoveOnCancelPolicy(true); |
2594 |
+ |
} |
2595 |
+ |
static final Delayer instance = new Delayer(); |
2596 |
+ |
|
2597 |
+ |
public static ScheduledFuture<?> delay(Runnable command, |
2598 |
+ |
long delay, |
2599 |
+ |
TimeUnit unit) { |
2600 |
+ |
return instance.schedule(command, delay, unit); |
2601 |
+ |
} |
2602 |
+ |
} |
2603 |
+ |
|
2604 |
+ |
// Little class-ified lambdas to better support monitoring |
2605 |
+ |
|
2606 |
+ |
static final class DelayedExecutor implements Executor { |
2607 |
+ |
final long delay; |
2608 |
+ |
final TimeUnit unit; |
2609 |
+ |
final Executor executor; |
2610 |
+ |
DelayedExecutor(long delay, TimeUnit unit, Executor executor) { |
2611 |
+ |
this.delay = delay; this.unit = unit; this.executor = executor; |
2612 |
+ |
} |
2613 |
+ |
public void execute(Runnable r) { |
2614 |
+ |
Delayer.delay(new Submitter(executor, r), delay, unit); |
2615 |
+ |
} |
2616 |
+ |
} |
2617 |
+ |
|
2618 |
+ |
/** Delay action to asynchronously start user task */ |
2619 |
+ |
static final class Submitter implements Runnable { |
2620 |
+ |
final Executor executor; |
2621 |
+ |
final Runnable action; |
2622 |
+ |
Submitter(Executor executor, Runnable action) { |
2623 |
+ |
this.executor = executor; |
2624 |
+ |
this.action = action; |
2625 |
+ |
} |
2626 |
+ |
public void run() { executor.execute(action); } |
2627 |
+ |
} |
2628 |
+ |
|
2629 |
+ |
/** Action to completeExceptionally on timeout */ |
2630 |
+ |
static final class Timeout implements Runnable { |
2631 |
+ |
final CompletableFuture<?> f; |
2632 |
+ |
Timeout(CompletableFuture<?> f) { this.f = f; } |
2633 |
+ |
public void run() { |
2634 |
+ |
if (f != null && !f.isDone()) |
2635 |
+ |
f.completeExceptionally(new TimeoutException()); |
2636 |
+ |
} |
2637 |
+ |
} |
2638 |
+ |
|
2639 |
+ |
/** Action to cancel unneeded timeouts */ |
2640 |
+ |
static final class Canceller implements BiConsumer<Object, Throwable> { |
2641 |
+ |
final Future<?> f; |
2642 |
+ |
Canceller(Future<?> f) { this.f = f; } |
2643 |
+ |
public void accept(Object ignore, Throwable ex) { |
2644 |
+ |
if (ex == null && f != null && !f.isDone()) |
2645 |
+ |
f.cancel(false); |
2646 |
+ |
} |
2647 |
+ |
} |
2648 |
+ |
|
2649 |
+ |
// MinimalStage subclass just throws UOE for non-CompletionStage methods |
2650 |
+ |
static final class MinimalStage<T> extends CompletableFuture<T> { |
2651 |
+ |
MinimalStage() { } |
2652 |
+ |
MinimalStage(Object r) { super(r); } |
2653 |
+ |
public <U> CompletableFuture<U> newIncompleteFuture() { |
2654 |
+ |
return new MinimalStage<U>(); } |
2655 |
+ |
public T get() { |
2656 |
+ |
throw new UnsupportedOperationException(); } |
2657 |
+ |
public T get(long timeout, TimeUnit unit) { |
2658 |
+ |
throw new UnsupportedOperationException(); } |
2659 |
+ |
public T getNow(T valueIfAbsent) { |
2660 |
+ |
throw new UnsupportedOperationException(); } |
2661 |
+ |
public T join() { |
2662 |
+ |
throw new UnsupportedOperationException(); } |
2663 |
+ |
public boolean complete(T value) { |
2664 |
+ |
throw new UnsupportedOperationException(); } |
2665 |
+ |
public boolean completeExceptionally(Throwable ex) { |
2666 |
+ |
throw new UnsupportedOperationException(); } |
2667 |
+ |
public boolean cancel(boolean mayInterruptIfRunning) { |
2668 |
+ |
throw new UnsupportedOperationException(); } |
2669 |
+ |
public void obtrudeValue(T value) { |
2670 |
+ |
throw new UnsupportedOperationException(); } |
2671 |
+ |
public void obtrudeException(Throwable ex) { |
2672 |
+ |
throw new UnsupportedOperationException(); } |
2673 |
+ |
public boolean isDone() { |
2674 |
+ |
throw new UnsupportedOperationException(); } |
2675 |
+ |
public boolean isCancelled() { |
2676 |
+ |
throw new UnsupportedOperationException(); } |
2677 |
+ |
public boolean isCompletedExceptionally() { |
2678 |
+ |
throw new UnsupportedOperationException(); } |
2679 |
+ |
public int getNumberOfDependents() { |
2680 |
+ |
throw new UnsupportedOperationException(); } |
2681 |
+ |
} |
2682 |
+ |
|
2683 |
|
// Unsafe mechanics |
2684 |
|
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
2685 |
|
private static final long RESULT; |