ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/CompletableFuture.java (file contents):
Revision 1.36 by jsr166, Sat Feb 2 20:39:34 2013 UTC vs.
Revision 1.37 by dl, Tue Feb 5 12:49:13 2013 UTC

# Line 536 | Line 536 | public class CompletableFuture<T> implem
536          private static final long serialVersionUID = 5232453952276885070L;
537      }
538  
539 +    static final class AsyncCompose<T,U> extends Async {
540 +        final Function<? super T, CompletableFuture<U>> fn;
541 +        final T arg;
542 +        final CompletableFuture<U> dst;
543 +        AsyncCompose(T arg,
544 +                     Function<? super T, CompletableFuture<U>> fn,
545 +                     CompletableFuture<U> dst) {
546 +            this.arg = arg; this.fn = fn; this.dst = dst;
547 +        }
548 +        public final boolean exec() {
549 +            CompletableFuture<U> d, fr; U u; Throwable ex;
550 +            if ((d = this.dst) != null && d.result == null) {
551 +                try {
552 +                    fr = fn.apply(arg);
553 +                    ex = null;
554 +                } catch (Throwable rex) {
555 +                    ex = rex;
556 +                    fr = null;
557 +                }
558 +                if (ex != null)
559 +                    u = null;
560 +                else if (fr == null) {
561 +                    ex = new NullPointerException();
562 +                    u = null;
563 +                }
564 +                else {
565 +                    Object r = fr.result;
566 +                    if (r instanceof AltResult) {
567 +                        ex = ((AltResult)r).ex;
568 +                        u = null;
569 +                    }
570 +                    else {
571 +                        @SuppressWarnings("unchecked") U ur = (U) r;
572 +                        u = ur;
573 +                    }
574 +                }
575 +                d.internalComplete(u, ex);
576 +            }
577 +            return true;
578 +        }
579 +        private static final long serialVersionUID = 5232453952276885070L;
580 +    }
581 +
582      /* ------------- Completions -------------- */
583  
584      /**
# Line 1201 | Line 1244 | public class CompletableFuture<T> implem
1244          final CompletableFuture<? extends T> src;
1245          final Function<? super T, CompletableFuture<U>> fn;
1246          final CompletableFuture<U> dst;
1247 +        final Executor executor;
1248          ComposeCompletion(CompletableFuture<? extends T> src,
1249                            Function<? super T, CompletableFuture<U>> fn,
1250 <                          final CompletableFuture<U> dst) {
1250 >                          final CompletableFuture<U> dst, Executor executor) {
1251              this.src = src; this.fn = fn; this.dst = dst;
1252 +            this.executor = executor;
1253          }
1254          public final void run() {
1255              final CompletableFuture<? extends T> a;
1256              final Function<? super T, CompletableFuture<U>> fn;
1257              final CompletableFuture<U> dst;
1258 <            Object r; T t; Throwable ex;
1258 >            Object r; T t; Throwable ex; Executor e;
1259              if ((dst = this.dst) != null &&
1260                  (fn = this.fn) != null &&
1261                  (a = this.src) != null &&
# Line 1229 | Line 1274 | public class CompletableFuture<T> implem
1274                  U u = null;
1275                  boolean complete = false;
1276                  if (ex == null) {
1277 <                    try {
1278 <                        c = fn.apply(t);
1279 <                    } catch (Throwable rex) {
1280 <                        ex = rex;
1277 >                    if ((e = executor) != null)
1278 >                        e.execute(new AsyncCompose<T,U>(t, fn, dst));
1279 >                    else {
1280 >                        try {
1281 >                            if ((c = fn.apply(t)) == null)
1282 >                                ex = new NullPointerException();
1283 >                        } catch (Throwable rex) {
1284 >                            ex = rex;
1285 >                        }
1286                      }
1287                  }
1288 <                if (ex != null || c == null) {
1239 <                    if (ex == null)
1240 <                        ex = new NullPointerException();
1241 <                }
1242 <                else {
1288 >                if (c != null) {
1289                      ThenCopy d = null;
1290                      Object s;
1291                      if ((s = c.result) == null) {
# Line 2514 | Line 2560 | public class CompletableFuture<T> implem
2560       */
2561      public <U> CompletableFuture<U> thenCompose(Function<? super T,
2562                                                  CompletableFuture<U>> fn) {
2563 +        return doCompose(fn, null);
2564 +    }
2565 +
2566 +    /**
2567 +     * Returns a CompletableFuture (or an equivalent one) produced
2568 +     * asynchronously using the {@link ForkJoinPool#commonPool()} by
2569 +     * the given function of the result of this CompletableFuture when
2570 +     * completed.  If this CompletableFuture completes exceptionally,
2571 +     * then the returned CompletableFuture also does so, with a
2572 +     * CompletionException holding this exception as its cause.
2573 +     *
2574 +     * @param fn the function returning a new CompletableFuture.
2575 +     * @return the CompletableFuture, that {@code isDone()} upon
2576 +     * return if completed by the given function, or an exception
2577 +     * occurs.
2578 +     */
2579 +    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,
2580 +                                                     CompletableFuture<U>> fn) {
2581 +        return doCompose(fn, ForkJoinPool.commonPool());
2582 +    }
2583 +
2584 +    /**
2585 +     * Returns a CompletableFuture (or an equivalent one) produced
2586 +     * asynchronously using the the given executor by the given
2587 +     * function of the result of this CompletableFuture when
2588 +     * completed.  If this CompletableFuture completes exceptionally,
2589 +     * then the returned CompletableFuture also does so, with a
2590 +     * CompletionException holding this exception as its cause.
2591 +     *
2592 +     * @param fn the function returning a new CompletableFuture.
2593 +     * @param executor the executor to use for asynchronous execution
2594 +     * @return the CompletableFuture, that {@code isDone()} upon
2595 +     * return if completed by the given function, or an exception
2596 +     * occurs.
2597 +     */
2598 +    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,
2599 +                                                     CompletableFuture<U>> fn,
2600 +                                                     Executor executor) {
2601 +        if (executor == null) throw new NullPointerException();
2602 +        return doCompose(fn, executor);
2603 +    }
2604 +
2605 +    private <U> CompletableFuture<U> doCompose(Function<? super T,
2606 +                                               CompletableFuture<U>> fn,
2607 +                                               Executor e) {
2608          if (fn == null) throw new NullPointerException();
2609          CompletableFuture<U> dst = null;
2610          ComposeCompletion<T,U> d = null;
# Line 2521 | Line 2612 | public class CompletableFuture<T> implem
2612          if ((r = result) == null) {
2613              dst = new CompletableFuture<U>();
2614              CompletionNode p = new CompletionNode
2615 <                (d = new ComposeCompletion<T,U>(this, fn, dst));
2615 >                (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2616              while ((r = result) == null) {
2617                  if (UNSAFE.compareAndSwapObject
2618                      (this, COMPLETIONS, p.next = completions, p))
# Line 2540 | Line 2631 | public class CompletableFuture<T> implem
2631                  t = tr;
2632              }
2633              if (ex == null) {
2634 <                try {
2635 <                    dst = fn.apply(t);
2636 <                } catch (Throwable rex) {
2637 <                    ex = rex;
2634 >                if (e != null) {
2635 >                    if (dst == null)
2636 >                        dst = new CompletableFuture<U>();
2637 >                    e.execute(new AsyncCompose<T,U>(t, fn, dst));
2638 >                }
2639 >                else {
2640 >                    try {
2641 >                        dst = fn.apply(t);
2642 >                    } catch (Throwable rex) {
2643 >                        ex = rex;
2644 >                    }
2645 >                    if (dst == null) {
2646 >                        dst = new CompletableFuture<U>();
2647 >                        if (ex == null)
2648 >                            ex = new NullPointerException();
2649 >                    }
2650                  }
2651              }
2652 <            if (dst == null) {
2550 <                dst = new CompletableFuture<U>();
2551 <                if (ex == null)
2552 <                    ex = new NullPointerException();
2553 <            }
2554 <            if (ex != null)
2652 >            if (e == null && ex != null)
2653                  dst.internalComplete(null, ex);
2654          }
2655          helpPostComplete();
# Line 2874 | Line 2972 | public class CompletableFuture<T> implem
2972      /* ------------- Control and status methods -------------- */
2973  
2974      /**
2975 <     * Attempts to complete this CompletableFuture with
2976 <     * a {@link CancellationException}.
2975 >     * If not already completed, completes this CompletableFuture with
2976 >     * a {@link CancellationException}. Dependent CompletableFutures
2977 >     * that have not already completed will also complete
2978 >     * exceptionally, with a {@link CompletionException} caused by
2979 >     * this {@code CancellationException}.
2980       *
2981       * @param mayInterruptIfRunning this value has no effect in this
2982       * implementation because interrupts are not used to control

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines