536 |
|
private static final long serialVersionUID = 5232453952276885070L; |
537 |
|
} |
538 |
|
|
539 |
+ |
static final class AsyncCompose<T,U> extends Async { |
540 |
+ |
final Function<? super T, CompletableFuture<U>> fn; |
541 |
+ |
final T arg; |
542 |
+ |
final CompletableFuture<U> dst; |
543 |
+ |
AsyncCompose(T arg, |
544 |
+ |
Function<? super T, CompletableFuture<U>> fn, |
545 |
+ |
CompletableFuture<U> dst) { |
546 |
+ |
this.arg = arg; this.fn = fn; this.dst = dst; |
547 |
+ |
} |
548 |
+ |
public final boolean exec() { |
549 |
+ |
CompletableFuture<U> d, fr; U u; Throwable ex; |
550 |
+ |
if ((d = this.dst) != null && d.result == null) { |
551 |
+ |
try { |
552 |
+ |
fr = fn.apply(arg); |
553 |
+ |
ex = null; |
554 |
+ |
} catch (Throwable rex) { |
555 |
+ |
ex = rex; |
556 |
+ |
fr = null; |
557 |
+ |
} |
558 |
+ |
if (ex != null) |
559 |
+ |
u = null; |
560 |
+ |
else if (fr == null) { |
561 |
+ |
ex = new NullPointerException(); |
562 |
+ |
u = null; |
563 |
+ |
} |
564 |
+ |
else { |
565 |
+ |
Object r = fr.result; |
566 |
+ |
if (r instanceof AltResult) { |
567 |
+ |
ex = ((AltResult)r).ex; |
568 |
+ |
u = null; |
569 |
+ |
} |
570 |
+ |
else { |
571 |
+ |
@SuppressWarnings("unchecked") U ur = (U) r; |
572 |
+ |
u = ur; |
573 |
+ |
} |
574 |
+ |
} |
575 |
+ |
d.internalComplete(u, ex); |
576 |
+ |
} |
577 |
+ |
return true; |
578 |
+ |
} |
579 |
+ |
private static final long serialVersionUID = 5232453952276885070L; |
580 |
+ |
} |
581 |
+ |
|
582 |
|
/* ------------- Completions -------------- */ |
583 |
|
|
584 |
|
/** |
1244 |
|
final CompletableFuture<? extends T> src; |
1245 |
|
final Function<? super T, CompletableFuture<U>> fn; |
1246 |
|
final CompletableFuture<U> dst; |
1247 |
+ |
final Executor executor; |
1248 |
|
ComposeCompletion(CompletableFuture<? extends T> src, |
1249 |
|
Function<? super T, CompletableFuture<U>> fn, |
1250 |
< |
final CompletableFuture<U> dst) { |
1250 |
> |
final CompletableFuture<U> dst, Executor executor) { |
1251 |
|
this.src = src; this.fn = fn; this.dst = dst; |
1252 |
+ |
this.executor = executor; |
1253 |
|
} |
1254 |
|
public final void run() { |
1255 |
|
final CompletableFuture<? extends T> a; |
1256 |
|
final Function<? super T, CompletableFuture<U>> fn; |
1257 |
|
final CompletableFuture<U> dst; |
1258 |
< |
Object r; T t; Throwable ex; |
1258 |
> |
Object r; T t; Throwable ex; Executor e; |
1259 |
|
if ((dst = this.dst) != null && |
1260 |
|
(fn = this.fn) != null && |
1261 |
|
(a = this.src) != null && |
1274 |
|
U u = null; |
1275 |
|
boolean complete = false; |
1276 |
|
if (ex == null) { |
1277 |
< |
try { |
1278 |
< |
c = fn.apply(t); |
1279 |
< |
} catch (Throwable rex) { |
1280 |
< |
ex = rex; |
1277 |
> |
if ((e = executor) != null) |
1278 |
> |
e.execute(new AsyncCompose<T,U>(t, fn, dst)); |
1279 |
> |
else { |
1280 |
> |
try { |
1281 |
> |
if ((c = fn.apply(t)) == null) |
1282 |
> |
ex = new NullPointerException(); |
1283 |
> |
} catch (Throwable rex) { |
1284 |
> |
ex = rex; |
1285 |
> |
} |
1286 |
|
} |
1287 |
|
} |
1288 |
< |
if (ex != null || c == null) { |
1239 |
< |
if (ex == null) |
1240 |
< |
ex = new NullPointerException(); |
1241 |
< |
} |
1242 |
< |
else { |
1288 |
> |
if (c != null) { |
1289 |
|
ThenCopy d = null; |
1290 |
|
Object s; |
1291 |
|
if ((s = c.result) == null) { |
2560 |
|
*/ |
2561 |
|
public <U> CompletableFuture<U> thenCompose(Function<? super T, |
2562 |
|
CompletableFuture<U>> fn) { |
2563 |
+ |
return doCompose(fn, null); |
2564 |
+ |
} |
2565 |
+ |
|
2566 |
+ |
/** |
2567 |
+ |
* Returns a CompletableFuture (or an equivalent one) produced |
2568 |
+ |
* asynchronously using the {@link ForkJoinPool#commonPool()} by |
2569 |
+ |
* the given function of the result of this CompletableFuture when |
2570 |
+ |
* completed. If this CompletableFuture completes exceptionally, |
2571 |
+ |
* then the returned CompletableFuture also does so, with a |
2572 |
+ |
* CompletionException holding this exception as its cause. |
2573 |
+ |
* |
2574 |
+ |
* @param fn the function returning a new CompletableFuture. |
2575 |
+ |
* @return the CompletableFuture, that {@code isDone()} upon |
2576 |
+ |
* return if completed by the given function, or an exception |
2577 |
+ |
* occurs. |
2578 |
+ |
*/ |
2579 |
+ |
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, |
2580 |
+ |
CompletableFuture<U>> fn) { |
2581 |
+ |
return doCompose(fn, ForkJoinPool.commonPool()); |
2582 |
+ |
} |
2583 |
+ |
|
2584 |
+ |
/** |
2585 |
+ |
* Returns a CompletableFuture (or an equivalent one) produced |
2586 |
+ |
* asynchronously using the the given executor by the given |
2587 |
+ |
* function of the result of this CompletableFuture when |
2588 |
+ |
* completed. If this CompletableFuture completes exceptionally, |
2589 |
+ |
* then the returned CompletableFuture also does so, with a |
2590 |
+ |
* CompletionException holding this exception as its cause. |
2591 |
+ |
* |
2592 |
+ |
* @param fn the function returning a new CompletableFuture. |
2593 |
+ |
* @param executor the executor to use for asynchronous execution |
2594 |
+ |
* @return the CompletableFuture, that {@code isDone()} upon |
2595 |
+ |
* return if completed by the given function, or an exception |
2596 |
+ |
* occurs. |
2597 |
+ |
*/ |
2598 |
+ |
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, |
2599 |
+ |
CompletableFuture<U>> fn, |
2600 |
+ |
Executor executor) { |
2601 |
+ |
if (executor == null) throw new NullPointerException(); |
2602 |
+ |
return doCompose(fn, executor); |
2603 |
+ |
} |
2604 |
+ |
|
2605 |
+ |
private <U> CompletableFuture<U> doCompose(Function<? super T, |
2606 |
+ |
CompletableFuture<U>> fn, |
2607 |
+ |
Executor e) { |
2608 |
|
if (fn == null) throw new NullPointerException(); |
2609 |
|
CompletableFuture<U> dst = null; |
2610 |
|
ComposeCompletion<T,U> d = null; |
2612 |
|
if ((r = result) == null) { |
2613 |
|
dst = new CompletableFuture<U>(); |
2614 |
|
CompletionNode p = new CompletionNode |
2615 |
< |
(d = new ComposeCompletion<T,U>(this, fn, dst)); |
2615 |
> |
(d = new ComposeCompletion<T,U>(this, fn, dst, e)); |
2616 |
|
while ((r = result) == null) { |
2617 |
|
if (UNSAFE.compareAndSwapObject |
2618 |
|
(this, COMPLETIONS, p.next = completions, p)) |
2631 |
|
t = tr; |
2632 |
|
} |
2633 |
|
if (ex == null) { |
2634 |
< |
try { |
2635 |
< |
dst = fn.apply(t); |
2636 |
< |
} catch (Throwable rex) { |
2637 |
< |
ex = rex; |
2634 |
> |
if (e != null) { |
2635 |
> |
if (dst == null) |
2636 |
> |
dst = new CompletableFuture<U>(); |
2637 |
> |
e.execute(new AsyncCompose<T,U>(t, fn, dst)); |
2638 |
> |
} |
2639 |
> |
else { |
2640 |
> |
try { |
2641 |
> |
dst = fn.apply(t); |
2642 |
> |
} catch (Throwable rex) { |
2643 |
> |
ex = rex; |
2644 |
> |
} |
2645 |
> |
if (dst == null) { |
2646 |
> |
dst = new CompletableFuture<U>(); |
2647 |
> |
if (ex == null) |
2648 |
> |
ex = new NullPointerException(); |
2649 |
> |
} |
2650 |
|
} |
2651 |
|
} |
2652 |
< |
if (dst == null) { |
2550 |
< |
dst = new CompletableFuture<U>(); |
2551 |
< |
if (ex == null) |
2552 |
< |
ex = new NullPointerException(); |
2553 |
< |
} |
2554 |
< |
if (ex != null) |
2652 |
> |
if (e == null && ex != null) |
2653 |
|
dst.internalComplete(null, ex); |
2654 |
|
} |
2655 |
|
helpPostComplete(); |
2972 |
|
/* ------------- Control and status methods -------------- */ |
2973 |
|
|
2974 |
|
/** |
2975 |
< |
* Attempts to complete this CompletableFuture with |
2976 |
< |
* a {@link CancellationException}. |
2975 |
> |
* If not already completed, completes this CompletableFuture with |
2976 |
> |
* a {@link CancellationException}. Dependent CompletableFutures |
2977 |
> |
* that have not already completed will also complete |
2978 |
> |
* exceptionally, with a {@link CompletionException} caused by |
2979 |
> |
* this {@code CancellationException}. |
2980 |
|
* |
2981 |
|
* @param mayInterruptIfRunning this value has no effect in this |
2982 |
|
* implementation because interrupts are not used to control |