117 |
|
* * Class AsyncX class (for example AsyncApply) that calls nowX |
118 |
|
* from another task, |
119 |
|
* * Class DelayedX (for example DelayedApply) that holds |
120 |
< |
* arguments and calls Xnow when ready. |
120 |
> |
* arguments and calls nowX when ready. |
121 |
|
* |
122 |
|
* For each public CompletionStage method M* (for example |
123 |
|
* thenApply{Async}), there is a method doM (for example |
251 |
|
/* ------------- Async Tasks -------------- */ |
252 |
|
|
253 |
|
/** |
254 |
– |
* Default executor -- ForkJoinPool.commonPool() unless it cannot |
255 |
– |
* support parallelism. |
256 |
– |
*/ |
257 |
– |
static final Executor asyncPool = |
258 |
– |
(ForkJoinPool.getCommonPoolParallelism() > 1) ? |
259 |
– |
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); |
260 |
– |
|
261 |
– |
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ |
262 |
– |
static final class ThreadPerTaskExecutor implements Executor { |
263 |
– |
public void execute(Runnable r) { new Thread(r).start(); } |
264 |
– |
} |
265 |
– |
|
266 |
– |
/** |
254 |
|
* A marker interface identifying asynchronous tasks produced by |
255 |
|
* {@code async} methods. This may be useful for monitoring, |
256 |
|
* debugging, and tracking asynchronous activities. |
293 |
|
private static final long serialVersionUID = 5232453952276885070L; |
294 |
|
} |
295 |
|
|
296 |
+ |
/** |
297 |
+ |
* Default executor -- ForkJoinPool.commonPool() unless it cannot |
298 |
+ |
* support parallelism. |
299 |
+ |
*/ |
300 |
+ |
static final Executor asyncPool = |
301 |
+ |
(ForkJoinPool.getCommonPoolParallelism() > 1) ? |
302 |
+ |
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); |
303 |
+ |
|
304 |
+ |
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ |
305 |
+ |
static final class ThreadPerTaskExecutor implements Executor { |
306 |
+ |
public void execute(Runnable r) { new Thread(r).start(); } |
307 |
+ |
} |
308 |
+ |
|
309 |
+ |
/** |
310 |
+ |
* Null-checks user executor argument, and translates uses of |
311 |
+ |
* commonPool to asyncPool in case parallelism disabled. |
312 |
+ |
*/ |
313 |
+ |
static Executor screenExecutor(Executor e) { |
314 |
+ |
if (e == null) throw new NullPointerException(); |
315 |
+ |
return (e == ForkJoinPool.commonPool()) ? asyncPool : e; |
316 |
+ |
} |
317 |
+ |
|
318 |
|
/* ------------- Completions -------------- */ |
319 |
|
|
320 |
|
abstract static class Completion<T> { // See above |
1586 |
|
*/ |
1587 |
|
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, |
1588 |
|
Executor executor) { |
1589 |
< |
if (executor == null || supplier == null) |
1590 |
< |
throw new NullPointerException(); |
1589 |
> |
if (supplier == null) throw new NullPointerException(); |
1590 |
> |
Executor e = screenExecutor(executor); |
1591 |
|
CompletableFuture<U> d = new CompletableFuture<U>(); |
1592 |
< |
executor.execute(new AsyncSupply<U>(d, supplier)); |
1592 |
> |
e.execute(new AsyncSupply<U>(d, supplier)); |
1593 |
|
return d; |
1594 |
|
} |
1595 |
|
|
1621 |
|
*/ |
1622 |
|
public static CompletableFuture<Void> runAsync(Runnable runnable, |
1623 |
|
Executor executor) { |
1624 |
< |
if (executor == null || runnable == null) |
1625 |
< |
throw new NullPointerException(); |
1624 |
> |
if (runnable == null) throw new NullPointerException(); |
1625 |
> |
Executor e = screenExecutor(executor); |
1626 |
|
CompletableFuture<Void> d = new CompletableFuture<Void>(); |
1627 |
< |
executor.execute(new AsyncRun<Void>(d, runnable)); |
1627 |
> |
e.execute(new AsyncRun<Void>(d, runnable)); |
1628 |
|
return d; |
1629 |
|
} |
1630 |
|
|
1762 |
|
|
1763 |
|
public <U> CompletableFuture<U> thenApplyAsync( |
1764 |
|
Function<? super T,? extends U> fn, Executor executor) { |
1765 |
< |
if (executor == null) throw new NullPointerException(); |
1757 |
< |
return doThenApply(fn, executor); |
1765 |
> |
return doThenApply(fn, screenExecutor(executor)); |
1766 |
|
} |
1767 |
|
|
1768 |
|
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { |
1775 |
|
|
1776 |
|
public CompletableFuture<Void> thenAcceptAsync( |
1777 |
|
Consumer<? super T> action, Executor executor) { |
1778 |
< |
if (executor == null) throw new NullPointerException(); |
1771 |
< |
return doThenAccept(action, executor); |
1778 |
> |
return doThenAccept(action, screenExecutor(executor)); |
1779 |
|
} |
1780 |
|
|
1781 |
|
public CompletableFuture<Void> thenRun(Runnable action) { |
1788 |
|
|
1789 |
|
public CompletableFuture<Void> thenRunAsync( |
1790 |
|
Runnable action, Executor executor) { |
1791 |
< |
if (executor == null) throw new NullPointerException(); |
1785 |
< |
return doThenRun(action, executor); |
1791 |
> |
return doThenRun(action, screenExecutor(executor)); |
1792 |
|
} |
1793 |
|
|
1794 |
|
public <U,V> CompletableFuture<V> thenCombine( |
1807 |
|
CompletionStage<? extends U> other, |
1808 |
|
BiFunction<? super T,? super U,? extends V> fn, |
1809 |
|
Executor executor) { |
1810 |
< |
if (executor == null) throw new NullPointerException(); |
1811 |
< |
return doThenCombine(other.toCompletableFuture(), fn, executor); |
1810 |
> |
return doThenCombine(other.toCompletableFuture(), fn, |
1811 |
> |
screenExecutor(executor)); |
1812 |
|
} |
1813 |
|
|
1814 |
|
public <U> CompletableFuture<Void> thenAcceptBoth( |
1827 |
|
CompletionStage<? extends U> other, |
1828 |
|
BiConsumer<? super T, ? super U> action, |
1829 |
|
Executor executor) { |
1830 |
< |
if (executor == null) throw new NullPointerException(); |
1831 |
< |
return doThenAcceptBoth(other.toCompletableFuture(), action, executor); |
1830 |
> |
return doThenAcceptBoth(other.toCompletableFuture(), action, |
1831 |
> |
screenExecutor(executor)); |
1832 |
|
} |
1833 |
|
|
1834 |
|
public CompletableFuture<Void> runAfterBoth( |
1843 |
|
|
1844 |
|
public CompletableFuture<Void> runAfterBothAsync( |
1845 |
|
CompletionStage<?> other, Runnable action, Executor executor) { |
1846 |
< |
if (executor == null) throw new NullPointerException(); |
1847 |
< |
return doRunAfterBoth(other.toCompletableFuture(), action, executor); |
1846 |
> |
return doRunAfterBoth(other.toCompletableFuture(), action, |
1847 |
> |
screenExecutor(executor)); |
1848 |
|
} |
1849 |
|
|
1850 |
|
public <U> CompletableFuture<U> applyToEither( |
1860 |
|
public <U> CompletableFuture<U> applyToEitherAsync |
1861 |
|
(CompletionStage<? extends T> other, Function<? super T, U> fn, |
1862 |
|
Executor executor) { |
1863 |
< |
if (executor == null) throw new NullPointerException(); |
1864 |
< |
return doApplyToEither(other.toCompletableFuture(), fn, executor); |
1863 |
> |
return doApplyToEither(other.toCompletableFuture(), fn, |
1864 |
> |
screenExecutor(executor)); |
1865 |
|
} |
1866 |
|
|
1867 |
|
public CompletableFuture<Void> acceptEither( |
1877 |
|
public CompletableFuture<Void> acceptEitherAsync( |
1878 |
|
CompletionStage<? extends T> other, Consumer<? super T> action, |
1879 |
|
Executor executor) { |
1880 |
< |
if (executor == null) throw new NullPointerException(); |
1881 |
< |
return doAcceptEither(other.toCompletableFuture(), action, executor); |
1880 |
> |
return doAcceptEither(other.toCompletableFuture(), action, |
1881 |
> |
screenExecutor(executor)); |
1882 |
|
} |
1883 |
|
|
1884 |
|
public CompletableFuture<Void> runAfterEither( |
1893 |
|
|
1894 |
|
public CompletableFuture<Void> runAfterEitherAsync( |
1895 |
|
CompletionStage<?> other, Runnable action, Executor executor) { |
1896 |
< |
if (executor == null) throw new NullPointerException(); |
1897 |
< |
return doRunAfterEither(other.toCompletableFuture(), action, executor); |
1896 |
> |
return doRunAfterEither(other.toCompletableFuture(), action, |
1897 |
> |
screenExecutor(executor)); |
1898 |
|
} |
1899 |
|
|
1900 |
|
public <U> CompletableFuture<U> thenCompose |
1910 |
|
public <U> CompletableFuture<U> thenComposeAsync( |
1911 |
|
Function<? super T, ? extends CompletionStage<U>> fn, |
1912 |
|
Executor executor) { |
1913 |
< |
if (executor == null) throw new NullPointerException(); |
1908 |
< |
return doThenCompose(fn, executor); |
1913 |
> |
return doThenCompose(fn, screenExecutor(executor)); |
1914 |
|
} |
1915 |
|
|
1916 |
|
public CompletableFuture<T> whenComplete( |
1925 |
|
|
1926 |
|
public CompletableFuture<T> whenCompleteAsync( |
1927 |
|
BiConsumer<? super T, ? super Throwable> action, Executor executor) { |
1928 |
< |
if (executor == null) throw new NullPointerException(); |
1924 |
< |
return doWhenComplete(action, executor); |
1928 |
> |
return doWhenComplete(action, screenExecutor(executor)); |
1929 |
|
} |
1930 |
|
|
1931 |
|
public <U> CompletableFuture<U> handle( |
1940 |
|
|
1941 |
|
public <U> CompletableFuture<U> handleAsync( |
1942 |
|
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { |
1943 |
< |
if (executor == null) throw new NullPointerException(); |
1940 |
< |
return doHandle(fn, executor); |
1943 |
> |
return doHandle(fn, screenExecutor(executor)); |
1944 |
|
} |
1945 |
|
|
1946 |
|
/** |