ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/CompletableFuture.java (file contents):
Revision 1.142 by jsr166, Wed Jan 7 17:19:00 2015 UTC vs.
Revision 1.143 by dl, Wed Jan 14 17:06:00 2015 UTC

# Line 37 | Line 37 | import java.util.function.Supplier;
37   * <li>All <em>async</em> methods without an explicit Executor
38   * argument are performed using the {@link ForkJoinPool#commonPool()}
39   * (unless it does not support a parallelism level of at least two, in
40 < * which case, a new Thread is created to run each task).  To simplify
41 < * monitoring, debugging, and tracking, all generated asynchronous
42 < * tasks are instances of the marker interface {@link
43 < * AsynchronousCompletionTask}. </li>
40 > * which case, a new Thread is created to run each task).  This may be
41 > * overridden for non-static methods in subclasses by defining method
42 > * {@link #defaultExecutor()}. To simplify monitoring, debugging,
43 > * and tracking, all generated asynchronous tasks are instances of the
44 > * marker interface {@link AsynchronousCompletionTask}.  Operations
45 > * with time-delays can use adaptor methods defined in this class, for
46 > * example: {@code supplyAsync(supplier, delayedExecutor(timeout,
47 > * timeUnit))}.  To support methods with delays and timeouts, this
48 > * class maintains at most one daemon thread for triggering and
49 > * cancelling actions, not for running them.</li>
50   *
51   * <li>All CompletionStage methods are implemented independently of
52   * other public methods, so the behavior of one method is not impacted
53 < * by overrides of others in subclasses.  </li> </ul>
53 > * by overrides of others in subclasses.  </li>
54 > *
55 > * <li>All CompletionStage methods return CompletableFutures.  To
56 > * restrict usages to only those methods defined in interface
57 > * CompletionStage, use the new CompletionStage from method {@link
58 > * minimalCompletionStage}. Or to ensure only that clients do not
59 > * themselves modify a future, use method {@link copy}. </li> </ul>
60   *
61   * <p>CompletableFuture also implements {@link Future} with the following
62   * policies: <ul>
# Line 65 | Line 77 | import java.util.function.Supplier;
77   * {@link #getNow} that instead throw the CompletionException directly
78   * in these cases.</li> </ul>
79   *
80 + * <p>Subclasses of this class should normally override the "virtual
81 + * constructor" method {@link #newIncompleteFuture}, which establishes
82 + * the concrete type returned by CompletionStage methods. For example,
83 + * here is a class that substitutes a different default Executor and
84 + * disables the {@code obtrude} methods:
85 + *
86 + * <pre> {@code
87 + * class MyCompletableFuture<T> extends CompletableFuture<T> {
88 + *   static final Executor myExecutor = ...;
89 + *   public MyCompletableFuture() { }
90 + *   public <U> CompletableFuture<U> newIncompleteFuture() {
91 + *     return new MyCompletableFuture<U>(); }
92 + *   public Executor defaultExecutor() {
93 + *     return myExecutor; }
94 + *   public void obtrudeValue(T value) {
95 + *     throw new UnsupportedOperationException(); }
96 + *    public void obtrudeException(Throwable ex) {
97 + *     throw new UnsupportedOperationException(); }
98 + * }}</pre>
99 + *
100   * @author Doug Lea
101   * @since 1.8
102   */
# Line 111 | Line 143 | public class CompletableFuture<T> implem
143       *   fields for source(s), actions, and dependent. They are
144       *   boringly similar, differing from others only with respect to
145       *   underlying functional forms. We do this so that users don't
146 <     *   encounter layers of adaptors in common usages. We also
115 <     *   include "Relay" classes/methods that don't correspond to user
116 <     *   methods; they copy results from one stage to another.
146 >     *   encounter layers of adaptors in common usages.
147       *
148       * * Boolean CompletableFuture method x(...) (for example
149       *   uniApply) takes all of the arguments needed to check that an
# Line 571 | Line 601 | public class CompletableFuture<T> implem
601      private <V> CompletableFuture<V> uniApplyStage(
602          Executor e, Function<? super T,? extends V> f) {
603          if (f == null) throw new NullPointerException();
604 <        CompletableFuture<V> d = new CompletableFuture<V>();
604 >        CompletableFuture<V> d = newIncompleteFuture();
605          if (e != null || !d.uniApply(this, f, null)) {
606              UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
607              push(c);
# Line 626 | Line 656 | public class CompletableFuture<T> implem
656      private CompletableFuture<Void> uniAcceptStage(Executor e,
657                                                     Consumer<? super T> f) {
658          if (f == null) throw new NullPointerException();
659 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
659 >        CompletableFuture<Void> d = newIncompleteFuture();
660          if (e != null || !d.uniAccept(this, f, null)) {
661              UniAccept<T> c = new UniAccept<T>(e, d, this, f);
662              push(c);
# Line 674 | Line 704 | public class CompletableFuture<T> implem
704  
705      private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
706          if (f == null) throw new NullPointerException();
707 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
707 >        CompletableFuture<Void> d = newIncompleteFuture();
708          if (e != null || !d.uniRun(this, f, null)) {
709              UniRun<T> c = new UniRun<T>(e, d, this, f);
710              push(c);
# Line 735 | Line 765 | public class CompletableFuture<T> implem
765      private CompletableFuture<T> uniWhenCompleteStage(
766          Executor e, BiConsumer<? super T, ? super Throwable> f) {
767          if (f == null) throw new NullPointerException();
768 <        CompletableFuture<T> d = new CompletableFuture<T>();
768 >        CompletableFuture<T> d = newIncompleteFuture();
769          if (e != null || !d.uniWhenComplete(this, f, null)) {
770              UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
771              push(c);
# Line 791 | Line 821 | public class CompletableFuture<T> implem
821      private <V> CompletableFuture<V> uniHandleStage(
822          Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
823          if (f == null) throw new NullPointerException();
824 <        CompletableFuture<V> d = new CompletableFuture<V>();
824 >        CompletableFuture<V> d = newIncompleteFuture();
825          if (e != null || !d.uniHandle(this, f, null)) {
826              UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
827              push(c);
# Line 841 | Line 871 | public class CompletableFuture<T> implem
871      private CompletableFuture<T> uniExceptionallyStage(
872          Function<Throwable, ? extends T> f) {
873          if (f == null) throw new NullPointerException();
874 <        CompletableFuture<T> d = new CompletableFuture<T>();
874 >        CompletableFuture<T> d = newIncompleteFuture();
875          if (!d.uniExceptionally(this, f, null)) {
876              UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
877              push(c);
# Line 873 | Line 903 | public class CompletableFuture<T> implem
903          return true;
904      }
905  
906 +    private CompletableFuture<T> uniCopyStage() {
907 +        Object r;
908 +        CompletableFuture<T> d = newIncompleteFuture();
909 +        if ((r = result) != null)
910 +            d.completeRelay(r);
911 +        else {
912 +            UniRelay<T> c = new UniRelay<T>(d, this);
913 +            push(c);
914 +            c.tryFire(SYNC);
915 +        }
916 +        return d;
917 +    }
918 +
919 +    private MinimalStage<T> uniAsMinimalStage() {
920 +        Object r;
921 +        if ((r = result) != null)
922 +            return new MinimalStage<T>(encodeRelay(r));
923 +        MinimalStage<T> d = new MinimalStage<T>();
924 +        UniRelay<T> c = new UniRelay<T>(d, this);
925 +        push(c);
926 +        c.tryFire(SYNC);
927 +        return d;
928 +    }
929 +
930      @SuppressWarnings("serial")
931      static final class UniCompose<T,V> extends UniCompletion<T,V> {
932          Function<? super T, ? extends CompletionStage<V>> fn;
# Line 928 | Line 982 | public class CompletableFuture<T> implem
982      private <V> CompletableFuture<V> uniComposeStage(
983          Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
984          if (f == null) throw new NullPointerException();
985 <        Object r; Throwable x;
985 >        Object r, s; Throwable x;
986 >        CompletableFuture<V> d = newIncompleteFuture();
987          if (e == null && (r = result) != null) {
933            // try to return function result directly
988              if (r instanceof AltResult) {
989                  if ((x = ((AltResult)r).ex) != null) {
990 <                    return new CompletableFuture<V>(encodeThrowable(x, r));
990 >                    d.result = encodeThrowable(x, r);
991 >                    return d;
992                  }
993                  r = null;
994              }
995              try {
996                  @SuppressWarnings("unchecked") T t = (T) r;
997                  CompletableFuture<V> g = f.apply(t).toCompletableFuture();
998 <                Object s = g.result;
999 <                if (s != null)
1000 <                    return new CompletableFuture<V>(encodeRelay(s));
1001 <                CompletableFuture<V> d = new CompletableFuture<V>();
1002 <                UniRelay<V> copy = new UniRelay<V>(d, g);
1003 <                g.push(copy);
1004 <                copy.tryFire(SYNC);
998 >                if ((s = g.result) != null)
999 >                    d.completeRelay(s);
1000 >                else {
1001 >                    UniRelay<V> c = new UniRelay<V>(d, g);
1002 >                    push(c);
1003 >                    c.tryFire(SYNC);
1004 >                }
1005                  return d;
1006              } catch (Throwable ex) {
1007 <                return new CompletableFuture<V>(encodeThrowable(ex));
1007 >                d.result = encodeThrowable(ex);
1008 >                return d;
1009              }
1010          }
955        CompletableFuture<V> d = new CompletableFuture<V>();
1011          UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
1012          push(c);
1013          c.tryFire(SYNC);
# Line 1077 | Line 1132 | public class CompletableFuture<T> implem
1132          CompletableFuture<U> b;
1133          if (f == null || (b = o.toCompletableFuture()) == null)
1134              throw new NullPointerException();
1135 <        CompletableFuture<V> d = new CompletableFuture<V>();
1135 >        CompletableFuture<V> d = newIncompleteFuture();
1136          if (e != null || !d.biApply(this, b, f, null)) {
1137              BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
1138              bipush(b, c);
# Line 1149 | Line 1204 | public class CompletableFuture<T> implem
1204          CompletableFuture<U> b;
1205          if (f == null || (b = o.toCompletableFuture()) == null)
1206              throw new NullPointerException();
1207 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
1207 >        CompletableFuture<Void> d = newIncompleteFuture();
1208          if (e != null || !d.biAccept(this, b, f, null)) {
1209              BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
1210              bipush(b, c);
# Line 1208 | Line 1263 | public class CompletableFuture<T> implem
1263          CompletableFuture<?> b;
1264          if (f == null || (b = o.toCompletableFuture()) == null)
1265              throw new NullPointerException();
1266 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
1266 >        CompletableFuture<Void> d = newIncompleteFuture();
1267          if (e != null || !d.biRun(this, b, f, null)) {
1268              BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
1269              bipush(b, c);
# Line 1349 | Line 1404 | public class CompletableFuture<T> implem
1404          CompletableFuture<U> b;
1405          if (f == null || (b = o.toCompletableFuture()) == null)
1406              throw new NullPointerException();
1407 <        CompletableFuture<V> d = new CompletableFuture<V>();
1407 >        CompletableFuture<V> d = newIncompleteFuture();
1408          if (e != null || !d.orApply(this, b, f, null)) {
1409              OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
1410              orpush(b, c);
# Line 1413 | Line 1468 | public class CompletableFuture<T> implem
1468          CompletableFuture<U> b;
1469          if (f == null || (b = o.toCompletableFuture()) == null)
1470              throw new NullPointerException();
1471 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
1471 >        CompletableFuture<Void> d = newIncompleteFuture();
1472          if (e != null || !d.orAccept(this, b, f, null)) {
1473              OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
1474              orpush(b, c);
# Line 1471 | Line 1526 | public class CompletableFuture<T> implem
1526          CompletableFuture<?> b;
1527          if (f == null || (b = o.toCompletableFuture()) == null)
1528              throw new NullPointerException();
1529 <        CompletableFuture<Void> d = new CompletableFuture<Void>();
1529 >        CompletableFuture<Void> d = newIncompleteFuture();
1530          if (e != null || !d.orRun(this, b, f, null)) {
1531              OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
1532              orpush(b, c);
# Line 1507 | Line 1562 | public class CompletableFuture<T> implem
1562          return true;
1563      }
1564  
1565 +
1566      /** Recursively constructs a tree of completions. */
1567      static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
1568                                              int lo, int hi) {
# Line 1532 | Line 1588 | public class CompletableFuture<T> implem
1588  
1589      @SuppressWarnings("serial")
1590      static final class AsyncSupply<T> extends ForkJoinTask<Void>
1591 <            implements Runnable, AsynchronousCompletionTask {
1591 >        implements Runnable, AsynchronousCompletionTask {
1592          CompletableFuture<T> dep; Supplier<T> fn;
1593          AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
1594              this.dep = dep; this.fn = fn;
# Line 1568 | Line 1624 | public class CompletableFuture<T> implem
1624  
1625      @SuppressWarnings("serial")
1626      static final class AsyncRun extends ForkJoinTask<Void>
1627 <            implements Runnable, AsynchronousCompletionTask {
1627 >        implements Runnable, AsynchronousCompletionTask {
1628          CompletableFuture<Void> dep; Runnable fn;
1629          AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
1630              this.dep = dep; this.fn = fn;
# Line 1757 | Line 1813 | public class CompletableFuture<T> implem
1813      /**
1814       * Creates a new complete CompletableFuture with given encoded result.
1815       */
1816 <    private CompletableFuture(Object r) {
1816 >    CompletableFuture(Object r) {
1817          this.result = r;
1818      }
1819  
# Line 1946 | Line 2002 | public class CompletableFuture<T> implem
2002  
2003      public <U> CompletableFuture<U> thenApplyAsync(
2004          Function<? super T,? extends U> fn) {
2005 <        return uniApplyStage(asyncPool, fn);
2005 >        return uniApplyStage(defaultExecutor(), fn);
2006      }
2007  
2008      public <U> CompletableFuture<U> thenApplyAsync(
# Line 1959 | Line 2015 | public class CompletableFuture<T> implem
2015      }
2016  
2017      public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
2018 <        return uniAcceptStage(asyncPool, action);
2018 >        return uniAcceptStage(defaultExecutor(), action);
2019      }
2020  
2021      public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
# Line 1972 | Line 2028 | public class CompletableFuture<T> implem
2028      }
2029  
2030      public CompletableFuture<Void> thenRunAsync(Runnable action) {
2031 <        return uniRunStage(asyncPool, action);
2031 >        return uniRunStage(defaultExecutor(), action);
2032      }
2033  
2034      public CompletableFuture<Void> thenRunAsync(Runnable action,
# Line 1989 | Line 2045 | public class CompletableFuture<T> implem
2045      public <U,V> CompletableFuture<V> thenCombineAsync(
2046          CompletionStage<? extends U> other,
2047          BiFunction<? super T,? super U,? extends V> fn) {
2048 <        return biApplyStage(asyncPool, other, fn);
2048 >        return biApplyStage(defaultExecutor(), other, fn);
2049      }
2050  
2051      public <U,V> CompletableFuture<V> thenCombineAsync(
# Line 2007 | Line 2063 | public class CompletableFuture<T> implem
2063      public <U> CompletableFuture<Void> thenAcceptBothAsync(
2064          CompletionStage<? extends U> other,
2065          BiConsumer<? super T, ? super U> action) {
2066 <        return biAcceptStage(asyncPool, other, action);
2066 >        return biAcceptStage(defaultExecutor(), other, action);
2067      }
2068  
2069      public <U> CompletableFuture<Void> thenAcceptBothAsync(
# Line 2023 | Line 2079 | public class CompletableFuture<T> implem
2079  
2080      public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2081                                                       Runnable action) {
2082 <        return biRunStage(asyncPool, other, action);
2082 >        return biRunStage(defaultExecutor(), other, action);
2083      }
2084  
2085      public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
# Line 2039 | Line 2095 | public class CompletableFuture<T> implem
2095  
2096      public <U> CompletableFuture<U> applyToEitherAsync(
2097          CompletionStage<? extends T> other, Function<? super T, U> fn) {
2098 <        return orApplyStage(asyncPool, other, fn);
2098 >        return orApplyStage(defaultExecutor(), other, fn);
2099      }
2100  
2101      public <U> CompletableFuture<U> applyToEitherAsync(
# Line 2055 | Line 2111 | public class CompletableFuture<T> implem
2111  
2112      public CompletableFuture<Void> acceptEitherAsync(
2113          CompletionStage<? extends T> other, Consumer<? super T> action) {
2114 <        return orAcceptStage(asyncPool, other, action);
2114 >        return orAcceptStage(defaultExecutor(), other, action);
2115      }
2116  
2117      public CompletableFuture<Void> acceptEitherAsync(
# Line 2071 | Line 2127 | public class CompletableFuture<T> implem
2127  
2128      public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2129                                                         Runnable action) {
2130 <        return orRunStage(asyncPool, other, action);
2130 >        return orRunStage(defaultExecutor(), other, action);
2131      }
2132  
2133      public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
# Line 2087 | Line 2143 | public class CompletableFuture<T> implem
2143  
2144      public <U> CompletableFuture<U> thenComposeAsync(
2145          Function<? super T, ? extends CompletionStage<U>> fn) {
2146 <        return uniComposeStage(asyncPool, fn);
2146 >        return uniComposeStage(defaultExecutor(), fn);
2147      }
2148  
2149      public <U> CompletableFuture<U> thenComposeAsync(
# Line 2103 | Line 2159 | public class CompletableFuture<T> implem
2159  
2160      public CompletableFuture<T> whenCompleteAsync(
2161          BiConsumer<? super T, ? super Throwable> action) {
2162 <        return uniWhenCompleteStage(asyncPool, action);
2162 >        return uniWhenCompleteStage(defaultExecutor(), action);
2163      }
2164  
2165      public CompletableFuture<T> whenCompleteAsync(
# Line 2118 | Line 2174 | public class CompletableFuture<T> implem
2174  
2175      public <U> CompletableFuture<U> handleAsync(
2176          BiFunction<? super T, Throwable, ? extends U> fn) {
2177 <        return uniHandleStage(asyncPool, fn);
2177 >        return uniHandleStage(defaultExecutor(), fn);
2178      }
2179  
2180      public <U> CompletableFuture<U> handleAsync(
# Line 2157 | Line 2213 | public class CompletableFuture<T> implem
2213          return uniExceptionallyStage(fn);
2214      }
2215  
2216 +
2217      /* ------------- Arbitrary-arity constructions -------------- */
2218  
2219      /**
# Line 2314 | Line 2371 | public class CompletableFuture<T> implem
2371       */
2372      public String toString() {
2373          Object r = result;
2374 <        int count;
2374 >        int count = 0; // avoid call to getNumberOfDependents in case disabled
2375 >        for (Completion p = stack; p != null; p = p.next)
2376 >            ++count;
2377          return super.toString() +
2378              ((r == null) ?
2379 <             (((count = getNumberOfDependents()) == 0) ?
2379 >             ((count == 0) ?
2380                "[Not completed]" :
2381                "[Not completed, " + count + " dependents]") :
2382               (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
# Line 2325 | Line 2384 | public class CompletableFuture<T> implem
2384                "[Completed normally]"));
2385      }
2386  
2387 +    // jdk9 additions
2388 +
2389 +    /**
2390 +     * Creates a new incomplete CompletableFuture of the type to be
2391 +     * returned by a CompletionStage method. Subclasses should
2392 +     * normally override this method to return an instance of the same
2393 +     * class as this CompletableFuture. The default implementation
2394 +     * returns an instance of class CompletableFuture.
2395 +     *
2396 +     * @return a new CompletableFuture
2397 +     * @param <U> the type of the value
2398 +     * @since 1.9
2399 +     */
2400 +    public <U> CompletableFuture<U> newIncompleteFuture() {
2401 +        return new CompletableFuture<U>();
2402 +    }
2403 +    /**
2404 +     * Returns the default Executor used for async methods that do not
2405 +     * specify an Executor. This class uses the {@link
2406 +     * ForkJoinPool#commonPool()}, but may be overridden in subclasses
2407 +     * with an Executor that provides at least one independent thread.
2408 +     *
2409 +     * @return the executor
2410 +     * @since 1.9
2411 +     */
2412 +    public Executor defaultExecutor() {
2413 +        return asyncPool;
2414 +    }
2415 +
2416 +    /**
2417 +     * Returns a new CompletableFuture that is completed normally with
2418 +     * the same value as this Completablefuture when it completes
2419 +     * normally. If this CompletableFuture completes exceptionally,
2420 +     * then the returned CompletableFuture completes exceptionally
2421 +     * with a CompletionException with this exception as cause. The
2422 +     * behavior equivalent is to {@code thenApply(x -> x)}. This
2423 +     * method may be useful as a form of "defensive copying", to
2424 +     * prevent clients from completing, while still being able to
2425 +     * arrange dependent actions.
2426 +     *
2427 +     * @return the new CompletableFuture
2428 +     * @since 1.9
2429 +     */
2430 +    public CompletableFuture<T> copy() {
2431 +        return uniCopyStage();
2432 +    }
2433 +
2434 +    /**
2435 +     * Returns a new CompletionStage that is completed normally with
2436 +     * the same value as this Completablefuture when it completes
2437 +     * normally, and cannot be independently completed or otherwise
2438 +     * used in ways not defined by the methods of interface {@link
2439 +     * CompletionStage}.  If this CompletableFuture completes
2440 +     * exceptionally, then the returned CompletionStage completes
2441 +     * exceptionally with a CompletionException with this exception as
2442 +     * cause.
2443 +     *
2444 +     * @return the new CompletionStage
2445 +     * @since 1.9
2446 +     */
2447 +    public CompletionStage<T> minimalCompletionStage() {
2448 +        return uniAsMinimalStage();
2449 +    }
2450 +
2451 +    /**
2452 +     * Completes this CompletableFuture with the result of
2453 +     * the given Supplier function invoked from an asynchronous
2454 +     * task using the given executor.
2455 +     *
2456 +     * @param supplier a function returning the value to be used
2457 +     * to complete this CompletableFuture
2458 +     * @param executor the executor to use for asynchronous execution
2459 +     * @return this CompletableFuture
2460 +     * @since 1.9
2461 +     */
2462 +    public CompletableFuture<T> completeAsync(Supplier<T> supplier,
2463 +                                              Executor executor) {
2464 +        if (supplier == null || executor == null)
2465 +            throw new NullPointerException();
2466 +        executor.execute(new AsyncSupply<T>(this, supplier));
2467 +        return this;
2468 +    }
2469 +
2470 +    /**
2471 +     * Completes this CompletableFuture with the result of the given
2472 +     * Supplier function invoked from an asynchronous task using the
2473 +     * the default executor.
2474 +     *
2475 +     * @param supplier a function returning the value to be used
2476 +     * to complete this CompletableFuture
2477 +     * @return this CompletableFuture
2478 +     * @since 1.9
2479 +     */
2480 +    public CompletableFuture<T> completeAsync(Supplier<T> supplier) {
2481 +        return completeAsync(supplier, defaultExecutor());
2482 +    }
2483 +
2484 +    /**
2485 +     * Exceptionally completes this CompletableFuture with
2486 +     * a {@link TimeoutException} if not otherwise completed
2487 +     * before the given timeout.
2488 +     *
2489 +     * @param timeout how long to wait before completing exceptionally
2490 +     *        with a TimeoutException, in units of {@code unit}
2491 +     * @param unit a {@code TimeUnit} determining how to interpret the
2492 +     *        {@code timeout} parameter
2493 +     * @return this CompletableFuture
2494 +     * @since 1.9
2495 +     */
2496 +    public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
2497 +        if (result == null)
2498 +            whenComplete(new Canceller(Delayer.delay(new Timeout(this),
2499 +                                                     timeout, unit)));
2500 +        return this;
2501 +    }
2502 +
2503 +    /**
2504 +     * Returns a new Executor that submits a task to the given base
2505 +     * executor after the given delay.
2506 +     *
2507 +     * @param delay how long to delay, in units of {@code unit}
2508 +     * @param unit a {@code TimeUnit} determining how to interpret the
2509 +     *        {@code delay} parameter
2510 +     * @param executor the base executor
2511 +     * @return the new delayed executor
2512 +     * @since 1.9
2513 +     */
2514 +    public static Executor delayedExecutor(long delay, TimeUnit unit,
2515 +                                           Executor executor) {
2516 +        if (unit == null || executor == null)
2517 +            throw new NullPointerException();
2518 +        return new DelayedExecutor(delay, unit, executor);
2519 +    }
2520 +
2521 +    /**
2522 +     * Returns a new Executor that submits a task to the default
2523 +     * executor after the given delay.
2524 +     *
2525 +     * @param delay how long to delay, in units of {@code unit}
2526 +     * @param unit a {@code TimeUnit} determining how to interpret the
2527 +     *        {@code delay} parameter
2528 +     * @return the new delayed executor
2529 +     * @since 1.9
2530 +     */
2531 +    public static Executor delayedExecutor(long delay, TimeUnit unit) {
2532 +        return new DelayedExecutor(delay, unit, asyncPool);
2533 +    }
2534 +
2535 +    /**
2536 +     * Returns a new CompletionStage that is already completed with
2537 +     * the given value and supports only those methods in
2538 +     * interface {@link CompletionStage}.
2539 +     *
2540 +     * @param value the value
2541 +     * @param <U> the type of the value
2542 +     * @return the completed CompletionStage
2543 +     * @since 1.9
2544 +     */
2545 +    public static <U> CompletionStage<U> completedStage(U value) {
2546 +        return new MinimalStage<U>((value == null) ? NIL : value);
2547 +    }
2548 +
2549 +    /**
2550 +     * Returns a new CompletableFuture that is already completed
2551 +     * exceptionally with the given exception.
2552 +     *
2553 +     * @param ex the ex
2554 +     * @param <U> the type of the value
2555 +     * @return the exceptionally completed CompletableFuture
2556 +     * @since 1.9
2557 +     */
2558 +    public static <U> CompletableFuture<U> failedFuture(Throwable ex) {
2559 +        if (ex == null) throw new NullPointerException();
2560 +        return new CompletableFuture<U>(encodeThrowable(ex));
2561 +    }
2562 +
2563 +    /**
2564 +     * Returns a new CompletionStage that is already completed
2565 +     * exceptionally with the given exception and supports only those
2566 +     * methods in interface {@link CompletionStage}.
2567 +     *
2568 +     * @param ex the ex
2569 +     * @param <U> the type of the value
2570 +     * @return the exceptionally completed CompletionStage
2571 +     * @since 1.9
2572 +     */
2573 +    public static <U> CompletableFuture<U> failedStage(Throwable ex) {
2574 +        if (ex == null) throw new NullPointerException();
2575 +        return new MinimalStage<U>(encodeThrowable(ex));
2576 +    }
2577 +
2578 +    /**
2579 +     * Singleton delay scheduler, used only for starting and
2580 +     * cancelling tasks.
2581 +     */
2582 +    static final class Delayer extends ScheduledThreadPoolExecutor {
2583 +        static final class DaemonThreadFactory implements ThreadFactory {
2584 +            public Thread newThread(Runnable r) {
2585 +                Thread t = new Thread(r);
2586 +                t.setDaemon(true);
2587 +                t.setName("CompletableFutureDelayScheduler");
2588 +                return t;
2589 +            }
2590 +        }
2591 +        Delayer() {
2592 +            super(1, new DaemonThreadFactory());
2593 +            setRemoveOnCancelPolicy(true);
2594 +        }
2595 +        static final Delayer instance = new Delayer();
2596 +
2597 +        public static ScheduledFuture<?> delay(Runnable command,
2598 +                                               long delay,
2599 +                                               TimeUnit unit) {
2600 +            return instance.schedule(command, delay, unit);
2601 +        }
2602 +    }
2603 +
2604 +    // Little class-ified lambdas to better support monitoring
2605 +
2606 +    static final class DelayedExecutor implements Executor {
2607 +        final long delay;
2608 +        final TimeUnit unit;
2609 +        final Executor executor;
2610 +        DelayedExecutor(long delay, TimeUnit unit, Executor executor) {
2611 +            this.delay = delay; this.unit = unit; this.executor = executor;
2612 +        }
2613 +        public void execute(Runnable r) {
2614 +            Delayer.delay(new Submitter(executor, r), delay, unit);
2615 +        }
2616 +    }
2617 +
2618 +    /** Delay action to asynchronously start user task */
2619 +    static final class Submitter implements Runnable {
2620 +        final Executor executor;
2621 +        final Runnable action;
2622 +        Submitter(Executor executor, Runnable action) {
2623 +            this.executor = executor;
2624 +            this.action = action;
2625 +        }
2626 +        public void run() { executor.execute(action); }
2627 +    }
2628 +
2629 +    /** Action to completeExceptionally on timeout */
2630 +    static final class Timeout implements Runnable {
2631 +        final CompletableFuture<?> f;
2632 +        Timeout(CompletableFuture<?> f) { this.f = f; }
2633 +        public void run() {
2634 +            if (f != null && !f.isDone())
2635 +                f.completeExceptionally(new TimeoutException());
2636 +        }
2637 +    }
2638 +
2639 +    /** Action to cancel unneeded timeouts */
2640 +    static final class Canceller implements BiConsumer<Object, Throwable>  {
2641 +        final Future<?> f;
2642 +        Canceller(Future<?> f) { this.f = f; }
2643 +        public void accept(Object ignore, Throwable ex) {
2644 +            if (ex == null && f != null && !f.isDone())
2645 +                f.cancel(false);
2646 +        }
2647 +    }
2648 +
2649 +    // MinimalStage subclass just throws UOE for non-CompletionStage methods
2650 +    static final class MinimalStage<T> extends CompletableFuture<T> {
2651 +        MinimalStage() { }
2652 +        MinimalStage(Object r) { super(r); }
2653 +        public <U> CompletableFuture<U> newIncompleteFuture() {
2654 +            return new MinimalStage<U>(); }
2655 +        public T get() {
2656 +            throw new UnsupportedOperationException(); }
2657 +        public T get(long timeout, TimeUnit unit) {
2658 +            throw new UnsupportedOperationException(); }
2659 +        public T getNow(T valueIfAbsent) {
2660 +            throw new UnsupportedOperationException(); }
2661 +        public T join() {
2662 +            throw new UnsupportedOperationException(); }
2663 +        public boolean complete(T value) {
2664 +            throw new UnsupportedOperationException(); }
2665 +        public boolean completeExceptionally(Throwable ex) {
2666 +            throw new UnsupportedOperationException(); }
2667 +        public boolean cancel(boolean mayInterruptIfRunning) {
2668 +            throw new UnsupportedOperationException(); }
2669 +        public void obtrudeValue(T value) {
2670 +            throw new UnsupportedOperationException(); }
2671 +        public void obtrudeException(Throwable ex) {
2672 +            throw new UnsupportedOperationException(); }
2673 +        public boolean isDone() {
2674 +            throw new UnsupportedOperationException(); }
2675 +        public boolean isCancelled() {
2676 +            throw new UnsupportedOperationException(); }
2677 +        public boolean isCompletedExceptionally() {
2678 +            throw new UnsupportedOperationException(); }
2679 +        public int getNumberOfDependents() {
2680 +            throw new UnsupportedOperationException(); }
2681 +    }
2682 +
2683      // Unsafe mechanics
2684      private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
2685      private static final long RESULT;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines