ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/CompletableFuture.java (file contents):
Revision 1.95 by jsr166, Thu Jul 18 17:13:42 2013 UTC vs.
Revision 1.96 by dl, Wed Jul 24 15:25:27 2013 UTC

# Line 43 | Line 43 | import java.util.concurrent.locks.LockSu
43   * <li>Actions supplied for dependent completions of
44   * <em>non-async</em> methods may be performed by the thread that
45   * completes the current CompletableFuture, or by any other caller of
46 < * these methods.</li>
46 > * a completion method.</li>
47   *
48   * <li>All <em>async</em> methods without an explicit Executor
49 < * argument are performed using the {@link
50 < * ForkJoinPool#commonPool()}. To simplify monitoring, debugging, and
51 < * tracking, all generated asynchronous tasks are instances of the
52 < * marker interface {@link AsynchronousCompletionTask}. </li>
49 > * argument are performed using the {@link ForkJoinPool#commonPool()}
50 > * (unless it does not support a parallelism level of at least two, in
51 > * which case, a new Thread is used). To simplify monitoring,
52 > * debugging, and tracking, all generated asynchronous tasks are
53 > * instances of the marker interface {@link
54 > * AsynchronousCompletionTask}. </li>
55   *
56   * <li>All CompletionStage methods are implemented independently of
57   * other public methods, so the behavior of one method is not impacted
# Line 396 | Line 398 | public class CompletableFuture<T> implem
398          public final void run() { exec(); }
399      }
400  
401 +    /**
402 +     * Starts the given async task using the given executor, unless
403 +     * the executor is ForkJoinPool.commonPool and it has been
404 +     * disabled, in which case starts a new thread.
405 +     */
406 +    static void execAsync(Executor e, Async r) {
407 +        if (e == ForkJoinPool.commonPool() &&
408 +            ForkJoinPool.getCommonPoolParallelism() <= 1)
409 +            new Thread(r).start();
410 +        else
411 +            e.execute(r);
412 +    }
413 +
414      static final class AsyncRun extends Async {
415          final Runnable fn;
416          final CompletableFuture<Void> dst;
# Line 666 | Line 681 | public class CompletableFuture<T> implem
681                  if (ex == null) {
682                      try {
683                          if (e != null)
684 <                            e.execute(new AsyncApply<T,U>(t, fn, dst));
684 >                            execAsync(e, new AsyncApply<T,U>(t, fn, dst));
685                          else
686                              u = fn.apply(t);
687                      } catch (Throwable rex) {
# Line 715 | Line 730 | public class CompletableFuture<T> implem
730                  if (ex == null) {
731                      try {
732                          if (e != null)
733 <                            e.execute(new AsyncAccept<T>(t, fn, dst));
733 >                            execAsync(e, new AsyncAccept<T>(t, fn, dst));
734                          else
735                              fn.accept(t);
736                      } catch (Throwable rex) {
# Line 759 | Line 774 | public class CompletableFuture<T> implem
774                  if (ex == null) {
775                      try {
776                          if (e != null)
777 <                            e.execute(new AsyncRun(fn, dst));
777 >                            execAsync(e, new AsyncRun(fn, dst));
778                          else
779                              fn.run();
780                      } catch (Throwable rex) {
# Line 825 | Line 840 | public class CompletableFuture<T> implem
840                  if (ex == null) {
841                      try {
842                          if (e != null)
843 <                            e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
843 >                            execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
844                          else
845                              v = fn.apply(t, u);
846                      } catch (Throwable rex) {
# Line 890 | Line 905 | public class CompletableFuture<T> implem
905                  if (ex == null) {
906                      try {
907                          if (e != null)
908 <                            e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
908 >                            execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
909                          else
910                              fn.accept(t, u);
911                      } catch (Throwable rex) {
# Line 942 | Line 957 | public class CompletableFuture<T> implem
957                  if (ex == null) {
958                      try {
959                          if (e != null)
960 <                            e.execute(new AsyncRun(fn, dst));
960 >                            execAsync(e, new AsyncRun(fn, dst));
961                          else
962                              fn.run();
963                      } catch (Throwable rex) {
# Line 1028 | Line 1043 | public class CompletableFuture<T> implem
1043                  if (ex == null) {
1044                      try {
1045                          if (e != null)
1046 <                            e.execute(new AsyncApply<T,U>(t, fn, dst));
1046 >                            execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1047                          else
1048                              u = fn.apply(t);
1049                      } catch (Throwable rex) {
# Line 1081 | Line 1096 | public class CompletableFuture<T> implem
1096                  if (ex == null) {
1097                      try {
1098                          if (e != null)
1099 <                            e.execute(new AsyncAccept<T>(t, fn, dst));
1099 >                            execAsync(e, new AsyncAccept<T>(t, fn, dst));
1100                          else
1101                              fn.accept(t);
1102                      } catch (Throwable rex) {
# Line 1129 | Line 1144 | public class CompletableFuture<T> implem
1144                  if (ex == null) {
1145                      try {
1146                          if (e != null)
1147 <                            e.execute(new AsyncRun(fn, dst));
1147 >                            execAsync(e, new AsyncRun(fn, dst));
1148                          else
1149                              fn.run();
1150                      } catch (Throwable rex) {
# Line 1247 | Line 1262 | public class CompletableFuture<T> implem
1262                  Throwable dx = null;
1263                  try {
1264                      if (e != null)
1265 <                        e.execute(new AsyncWhenComplete<T>(t, ex, fn, dst));
1265 >                        execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1266                      else
1267                          fn.accept(t, ex);
1268                  } catch (Throwable rex) {
# Line 1352 | Line 1367 | public class CompletableFuture<T> implem
1367                  Throwable dx = null;
1368                  try {
1369                      if (e != null)
1370 <                        e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1370 >                        execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1371                      else
1372                          u = fn.apply(t, ex);
1373                  } catch (Throwable rex) {
# Line 1401 | Line 1416 | public class CompletableFuture<T> implem
1416                  boolean complete = false;
1417                  if (ex == null) {
1418                      if ((e = executor) != null)
1419 <                        e.execute(new AsyncCompose<T,U>(t, fn, dst));
1419 >                        execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1420                      else {
1421                          try {
1422                              CompletionStage<U> cs = fn.apply(t);
# Line 1479 | Line 1494 | public class CompletableFuture<T> implem
1494              if (ex == null) {
1495                  try {
1496                      if (e != null)
1497 <                        e.execute(new AsyncApply<T,U>(t, fn, dst));
1497 >                        execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1498                      else
1499                          u = fn.apply(t);
1500                  } catch (Throwable rex) {
# Line 1522 | Line 1537 | public class CompletableFuture<T> implem
1537              if (ex == null) {
1538                  try {
1539                      if (e != null)
1540 <                        e.execute(new AsyncAccept<T>(t, fn, dst));
1540 >                        execAsync(e, new AsyncAccept<T>(t, fn, dst));
1541                      else
1542                          fn.accept(t);
1543                  } catch (Throwable rex) {
# Line 1560 | Line 1575 | public class CompletableFuture<T> implem
1575              if (ex == null) {
1576                  try {
1577                      if (e != null)
1578 <                        e.execute(new AsyncRun(action, dst));
1578 >                        execAsync(e, new AsyncRun(action, dst));
1579                      else
1580                          action.run();
1581                  } catch (Throwable rex) {
# Line 1627 | Line 1642 | public class CompletableFuture<T> implem
1642              if (ex == null) {
1643                  try {
1644                      if (e != null)
1645 <                        e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
1645 >                        execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1646                      else
1647                          v = fn.apply(t, u);
1648                  } catch (Throwable rex) {
# Line 1694 | Line 1709 | public class CompletableFuture<T> implem
1709              if (ex == null) {
1710                  try {
1711                      if (e != null)
1712 <                        e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1712 >                        execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1713                      else
1714                          fn.accept(t, u);
1715                  } catch (Throwable rex) {
# Line 1747 | Line 1762 | public class CompletableFuture<T> implem
1762              if (ex == null) {
1763                  try {
1764                      if (e != null)
1765 <                        e.execute(new AsyncRun(action, dst));
1765 >                        execAsync(e, new AsyncRun(action, dst));
1766                      else
1767                          action.run();
1768                  } catch (Throwable rex) {
# Line 1799 | Line 1814 | public class CompletableFuture<T> implem
1814              if (ex == null) {
1815                  try {
1816                      if (e != null)
1817 <                        e.execute(new AsyncApply<T,U>(t, fn, dst));
1817 >                        execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1818                      else
1819                          u = fn.apply(t);
1820                  } catch (Throwable rex) {
# Line 1850 | Line 1865 | public class CompletableFuture<T> implem
1865              if (ex == null) {
1866                  try {
1867                      if (e != null)
1868 <                        e.execute(new AsyncAccept<T>(t, fn, dst));
1868 >                        execAsync(e, new AsyncAccept<T>(t, fn, dst));
1869                      else
1870                          fn.accept(t);
1871                  } catch (Throwable rex) {
# Line 1896 | Line 1911 | public class CompletableFuture<T> implem
1911              if (ex == null) {
1912                  try {
1913                      if (e != null)
1914 <                        e.execute(new AsyncRun(action, dst));
1914 >                        execAsync(e, new AsyncRun(action, dst));
1915                      else
1916                          action.run();
1917                  } catch (Throwable rex) {
# Line 1943 | Line 1958 | public class CompletableFuture<T> implem
1958                  if (e != null) {
1959                      if (dst == null)
1960                          dst = new CompletableFuture<U>();
1961 <                    e.execute(new AsyncCompose<T,U>(t, fn, dst));
1961 >                    execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1962                  }
1963                  else {
1964                      try {
# Line 1997 | Line 2012 | public class CompletableFuture<T> implem
2012              Throwable dx = null;
2013              try {
2014                  if (e != null)
2015 <                    e.execute(new AsyncWhenComplete<T>(t, ex, fn, dst));
2015 >                    execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2016                  else
2017                      fn.accept(t, ex);
2018              } catch (Throwable rex) {
# Line 2042 | Line 2057 | public class CompletableFuture<T> implem
2057              Throwable dx = null;
2058              try {
2059                  if (e != null)
2060 <                    e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2060 >                    execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2061                  else {
2062                      u = fn.apply(t, ex);
2063                      dx = null;
# Line 2080 | Line 2095 | public class CompletableFuture<T> implem
2095      public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2096          if (supplier == null) throw new NullPointerException();
2097          CompletableFuture<U> f = new CompletableFuture<U>();
2098 <        ForkJoinPool.commonPool().
2084 <            execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
2098 >        execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2099          return f;
2100      }
2101  
# Line 2101 | Line 2115 | public class CompletableFuture<T> implem
2115          if (executor == null || supplier == null)
2116              throw new NullPointerException();
2117          CompletableFuture<U> f = new CompletableFuture<U>();
2118 <        executor.execute(new AsyncSupply<U>(supplier, f));
2118 >        execAsync(executor, new AsyncSupply<U>(supplier, f));
2119          return f;
2120      }
2121  
# Line 2117 | Line 2131 | public class CompletableFuture<T> implem
2131      public static CompletableFuture<Void> runAsync(Runnable runnable) {
2132          if (runnable == null) throw new NullPointerException();
2133          CompletableFuture<Void> f = new CompletableFuture<Void>();
2134 <        ForkJoinPool.commonPool().
2121 <            execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
2134 >        execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2135          return f;
2136      }
2137  
# Line 2137 | Line 2150 | public class CompletableFuture<T> implem
2150          if (executor == null || runnable == null)
2151              throw new NullPointerException();
2152          CompletableFuture<Void> f = new CompletableFuture<Void>();
2153 <        executor.execute(new AsyncRun(runnable, f));
2153 >        execAsync(executor, new AsyncRun(runnable, f));
2154          return f;
2155      }
2156  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines