ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/CompletableFuture.java
(Generate patch)

Comparing jsr166/src/jdk8/java/util/concurrent/CompletableFuture.java (file contents):
Revision 1.1 by jsr166, Sat Mar 26 06:22:49 2016 UTC vs.
Revision 1.6 by jsr166, Thu May 5 16:26:20 2016 UTC

# Line 207 | Line 207 | public class CompletableFuture<T> implem
207       * pointing back to its sources. So we null out fields as soon as
208       * possible.  The screening checks needed anyway harmlessly ignore
209       * null arguments that may have been obtained during races with
210 <     * threads nulling out fields.  We also try to unlink fired
211 <     * Completions from stacks that might never be popped (see method
212 <     * postFire).  Completion fields need not be declared as final or
213 <     * volatile because they are only visible to other threads upon
214 <     * safe publication.
210 >     * threads nulling out fields.  We also try to unlink non-isLive
211 >     * (fired or cancelled) Completions from stacks that might
212 >     * otherwise never be popped: Method cleanStack always unlinks non
213 >     * isLive completions from the head of stack; others may
214 >     * occasionally remain if racing with other cancellations or
215 >     * removals.
216 >     *
217 >     * Completion fields need not be declared as final or volatile
218 >     * because they are only visible to other threads upon safe
219 >     * publication.
220       */
221  
222      volatile Object result;       // Either the result or boxed AltResult
# Line 322 | Line 327 | public class CompletableFuture<T> implem
327       */
328      static Object encodeRelay(Object r) {
329          Throwable x;
330 <        return (((r instanceof AltResult) &&
331 <                 (x = ((AltResult)r).ex) != null &&
332 <                 !(x instanceof CompletionException)) ?
333 <                new AltResult(new CompletionException(x)) : r);
330 >        if (r instanceof AltResult
331 >            && (x = ((AltResult)r).ex) != null
332 >            && !(x instanceof CompletionException))
333 >            r = new AltResult(new CompletionException(x));
334 >        return r;
335      }
336  
337      /**
# Line 340 | Line 346 | public class CompletableFuture<T> implem
346      /**
347       * Reports result using Future.get conventions.
348       */
349 <    private static <T> T reportGet(Object r)
349 >    private static Object reportGet(Object r)
350          throws InterruptedException, ExecutionException {
351          if (r == null) // by convention below, null means interrupted
352              throw new InterruptedException();
# Line 355 | Line 361 | public class CompletableFuture<T> implem
361                  x = cause;
362              throw new ExecutionException(x);
363          }
364 <        @SuppressWarnings("unchecked") T t = (T) r;
359 <        return t;
364 >        return r;
365      }
366  
367      /**
368       * Decodes outcome to return result or throw unchecked exception.
369       */
370 <    private static <T> T reportJoin(Object r) {
370 >    private static Object reportJoin(Object r) {
371          if (r instanceof AltResult) {
372              Throwable x;
373              if ((x = ((AltResult)r).ex) == null)
# Line 373 | Line 378 | public class CompletableFuture<T> implem
378                  throw (CompletionException)x;
379              throw new CompletionException(x);
380          }
381 <        @SuppressWarnings("unchecked") T t = (T) r;
377 <        return t;
381 >        return r;
382      }
383  
384      /* ------------- Async task preliminaries -------------- */
# Line 420 | Line 424 | public class CompletableFuture<T> implem
424      static final int ASYNC  =  1;
425      static final int NESTED = -1;
426  
423    /**
424     * Spins before blocking in waitingGet
425     */
426    static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
427                              1 << 8 : 0);
428
427      /* ------------- Base Completion classes and operations -------------- */
428  
429      @SuppressWarnings("serial")
# Line 454 | Line 452 | public class CompletableFuture<T> implem
452          U.putOrderedObject(c, NEXT, next);
453      }
454  
455 +    static boolean casNext(Completion c, Completion cmp, Completion val) {
456 +        return U.compareAndSwapObject(c, NEXT, cmp, val);
457 +    }
458 +
459      /**
460       * Pops and tries to trigger all reachable dependents.  Call only
461       * when known to be done.
# Line 474 | Line 476 | public class CompletableFuture<T> implem
476                          pushStack(h);
477                          continue;
478                      }
479 <                    h.next = null;    // detach
479 >                    casNext(h, t, null);    // try to detach
480                  }
481                  f = (d = h.tryFire(NESTED)) == null ? this : d;
482              }
483          }
484      }
485  
486 <    /** Traverses stack and unlinks dead Completions. */
486 >    /** Traverses stack and unlinks one or more dead Completions, if found. */
487      final void cleanStack() {
488 <        for (Completion p = null, q = stack; q != null;) {
489 <            Completion s = q.next;
490 <            if (q.isLive()) {
491 <                p = q;
492 <                q = s;
493 <            }
494 <            else if (p == null) {
495 <                casStack(q, s);
496 <                q = stack;
495 <            }
496 <            else {
497 <                p.next = s;
498 <                if (p.isLive())
488 >        boolean unlinked = false;
489 >        Completion p;
490 >        while ((p = stack) != null && !p.isLive()) // ensure head of stack live
491 >            unlinked = casStack(p, p.next);
492 >        if (p != null && !unlinked) {              // try to unlink first nonlive
493 >            for (Completion q = p.next; q != null;) {
494 >                Completion s = q.next;
495 >                if (q.isLive()) {
496 >                    p = q;
497                      q = s;
498 +                }
499                  else {
500 <                    p = null;  // restart
501 <                    q = stack;
500 >                    casNext(p, q, s);
501 >                    break;
502                  }
503              }
504          }
# Line 539 | Line 538 | public class CompletableFuture<T> implem
538          final boolean isLive() { return dep != null; }
539      }
540  
541 <    /** Pushes the given completion (if it exists) unless done. */
542 <    final void push(UniCompletion<?,?> c) {
541 >    /**
542 >     * Pushes the given completion unless it completes while trying.
543 >     * Caller should have first checked that result is null.
544 >     */
545 >    final void unipush(UniCompletion<?,?> c) {
546          if (c != null) {
547 <            while (result == null && !tryPushStack(c))
548 <                lazySetNext(c, null); // clear on failure
547 >            while (!tryPushStack(c)) {
548 >                if (result != null) {
549 >                    lazySetNext(c, null);
550 >                    break;
551 >                }
552 >            }
553 >            if (result != null)
554 >                c.tryFire(SYNC);
555          }
556      }
557  
# Line 554 | Line 562 | public class CompletableFuture<T> implem
562       */
563      final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
564          if (a != null && a.stack != null) {
565 <            if (a.result == null)
565 >            Object r;
566 >            if ((r = a.result) == null)
567                  a.cleanStack();
568 <            else if (mode >= 0)
568 >            if (mode >= 0 && (r != null || a.result != null))
569                  a.postComplete();
570          }
571          if (result != null && stack != null) {
# Line 618 | Line 627 | public class CompletableFuture<T> implem
627          CompletableFuture<V> d = newIncompleteFuture();
628          if (e != null || !d.uniApply(this, f, null)) {
629              UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
630 <            push(c);
631 <            c.tryFire(SYNC);
630 >            if (e != null && result != null) {
631 >                try {
632 >                    e.execute(c);
633 >                } catch (Throwable ex) {
634 >                    d.completeThrowable(ex);
635 >                }
636 >            }
637 >            else {
638 >                unipush(c);
639 >            }
640          }
641          return d;
642      }
# Line 673 | Line 690 | public class CompletableFuture<T> implem
690          CompletableFuture<Void> d = newIncompleteFuture();
691          if (e != null || !d.uniAccept(this, f, null)) {
692              UniAccept<T> c = new UniAccept<T>(e, d, this, f);
693 <            push(c);
694 <            c.tryFire(SYNC);
693 >            if (e != null && result != null) {
694 >                try {
695 >                    e.execute(c);
696 >                } catch (Throwable ex) {
697 >                    d.completeThrowable(ex);
698 >                }
699 >            }
700 >            else {
701 >                unipush(c);
702 >            }
703          }
704          return d;
705      }
# Line 721 | Line 746 | public class CompletableFuture<T> implem
746          CompletableFuture<Void> d = newIncompleteFuture();
747          if (e != null || !d.uniRun(this, f, null)) {
748              UniRun<T> c = new UniRun<T>(e, d, this, f);
749 <            push(c);
750 <            c.tryFire(SYNC);
749 >            if (e != null && result != null) {
750 >                try {
751 >                    e.execute(c);
752 >                } catch (Throwable ex) {
753 >                    d.completeThrowable(ex);
754 >                }
755 >            }
756 >            else {
757 >                unipush(c);
758 >            }
759          }
760          return d;
761      }
# Line 784 | Line 817 | public class CompletableFuture<T> implem
817          CompletableFuture<T> d = newIncompleteFuture();
818          if (e != null || !d.uniWhenComplete(this, f, null)) {
819              UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
820 <            push(c);
821 <            c.tryFire(SYNC);
820 >            if (e != null && result != null) {
821 >                try {
822 >                    e.execute(c);
823 >                } catch (Throwable ex) {
824 >                    d.completeThrowable(ex);
825 >                }
826 >            }
827 >            else {
828 >                unipush(c);
829 >            }
830          }
831          return d;
832      }
# Line 840 | Line 881 | public class CompletableFuture<T> implem
881          CompletableFuture<V> d = newIncompleteFuture();
882          if (e != null || !d.uniHandle(this, f, null)) {
883              UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
884 <            push(c);
885 <            c.tryFire(SYNC);
884 >            if (e != null && result != null) {
885 >                try {
886 >                    e.execute(c);
887 >                } catch (Throwable ex) {
888 >                    d.completeThrowable(ex);
889 >                }
890 >            }
891 >            else {
892 >                unipush(c);
893 >            }
894          }
895          return d;
896      }
# Line 888 | Line 937 | public class CompletableFuture<T> implem
937          Function<Throwable, ? extends T> f) {
938          if (f == null) throw new NullPointerException();
939          CompletableFuture<T> d = newIncompleteFuture();
940 <        if (!d.uniExceptionally(this, f, null)) {
941 <            UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
893 <            push(c);
894 <            c.tryFire(SYNC);
895 <        }
940 >        if (!d.uniExceptionally(this, f, null))
941 >            unipush(new UniExceptionally<T>(d, this, f));
942          return d;
943      }
944  
# Line 925 | Line 971 | public class CompletableFuture<T> implem
971          if ((r = result) != null)
972              d.completeRelay(r);
973          else {
974 <            UniRelay<T> c = new UniRelay<T>(d, this);
929 <            push(c);
930 <            c.tryFire(SYNC);
974 >            unipush(new UniRelay<T>(d, this));
975          }
976          return d;
977      }
# Line 937 | Line 981 | public class CompletableFuture<T> implem
981          if ((r = result) != null)
982              return new MinimalStage<T>(encodeRelay(r));
983          MinimalStage<T> d = new MinimalStage<T>();
984 <        UniRelay<T> c = new UniRelay<T>(d, this);
941 <        push(c);
942 <        c.tryFire(SYNC);
984 >        unipush(new UniRelay<T>(d, this));
985          return d;
986      }
987  
# Line 982 | Line 1024 | public class CompletableFuture<T> implem
1024                  @SuppressWarnings("unchecked") S s = (S) r;
1025                  CompletableFuture<T> g = f.apply(s).toCompletableFuture();
1026                  if (g.result == null || !uniRelay(g)) {
1027 <                    UniRelay<T> copy = new UniRelay<T>(this, g);
986 <                    g.push(copy);
987 <                    copy.tryFire(SYNC);
1027 >                    g.unipush(new UniRelay<T>(this, g));
1028                      if (result == null)
1029                          return false;
1030                  }
# Line 1000 | Line 1040 | public class CompletableFuture<T> implem
1040          if (f == null) throw new NullPointerException();
1041          Object r, s; Throwable x;
1042          CompletableFuture<V> d = newIncompleteFuture();
1043 <        if (e == null && (r = result) != null) {
1043 >        if ((r = result) != null && e == null) {
1044              if (r instanceof AltResult) {
1045                  if ((x = ((AltResult)r).ex) != null) {
1046                      d.result = encodeThrowable(x, r);
# Line 1014 | Line 1054 | public class CompletableFuture<T> implem
1054                  if ((s = g.result) != null)
1055                      d.completeRelay(s);
1056                  else {
1057 <                    UniRelay<V> c = new UniRelay<V>(d, g);
1018 <                    g.push(c);
1019 <                    c.tryFire(SYNC);
1057 >                    g.unipush(new UniRelay<V>(d, g));
1058                  }
1059                  return d;
1060              } catch (Throwable ex) {
# Line 1024 | Line 1062 | public class CompletableFuture<T> implem
1062                  return d;
1063              }
1064          }
1065 <        UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
1066 <        push(c);
1067 <        c.tryFire(SYNC);
1065 >        if (r != null && e != null) {
1066 >            try {
1067 >                e.execute(new UniCompose<T,V>(null, d, this, f));
1068 >            } catch (Throwable ex) {
1069 >                d.completeThrowable(ex);
1070 >            }
1071 >        }
1072 >        else {
1073 >            unipush(new UniCompose<T,V>(e, d, this, f));
1074 >        }
1075          return d;
1076      }
1077  
# Line 1078 | Line 1123 | public class CompletableFuture<T> implem
1123      final CompletableFuture<T> postFire(CompletableFuture<?> a,
1124                                          CompletableFuture<?> b, int mode) {
1125          if (b != null && b.stack != null) { // clean second source
1126 <            if (b.result == null)
1126 >            Object r;
1127 >            if ((r = b.result) == null)
1128                  b.cleanStack();
1129 <            else if (mode >= 0)
1129 >            if (mode >= 0 && (r != null || b.result != null))
1130                  b.postComplete();
1131          }
1132          return postFire(a, mode);
# Line 1151 | Line 1197 | public class CompletableFuture<T> implem
1197          CompletableFuture<V> d = newIncompleteFuture();
1198          if (e != null || !d.biApply(this, b, f, null)) {
1199              BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
1200 <            bipush(b, c);
1201 <            c.tryFire(SYNC);
1200 >            if (e != null && result != null && b.result != null) {
1201 >                try {
1202 >                    e.execute(c);
1203 >                } catch (Throwable ex) {
1204 >                    d.completeThrowable(ex);
1205 >                }
1206 >            }
1207 >            else {
1208 >                bipush(b, c);
1209 >                c.tryFire(SYNC);
1210 >            }
1211          }
1212          return d;
1213      }
# Line 1223 | Line 1278 | public class CompletableFuture<T> implem
1278          CompletableFuture<Void> d = newIncompleteFuture();
1279          if (e != null || !d.biAccept(this, b, f, null)) {
1280              BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
1281 <            bipush(b, c);
1282 <            c.tryFire(SYNC);
1281 >            if (e != null && result != null && b.result != null) {
1282 >                try {
1283 >                    e.execute(c);
1284 >                } catch (Throwable ex) {
1285 >                    d.completeThrowable(ex);
1286 >                }
1287 >            }
1288 >            else {
1289 >                bipush(b, c);
1290 >                c.tryFire(SYNC);
1291 >            }
1292          }
1293          return d;
1294      }
# Line 1282 | Line 1346 | public class CompletableFuture<T> implem
1346          CompletableFuture<Void> d = newIncompleteFuture();
1347          if (e != null || !d.biRun(this, b, f, null)) {
1348              BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
1349 <            bipush(b, c);
1350 <            c.tryFire(SYNC);
1349 >            if (e != null && result != null && b.result != null) {
1350 >                try {
1351 >                    e.execute(c);
1352 >                } catch (Throwable ex) {
1353 >                    d.completeThrowable(ex);
1354 >                }
1355 >            }
1356 >            else {
1357 >                bipush(b, c);
1358 >                c.tryFire(SYNC);
1359 >            }
1360          }
1361          return d;
1362      }
# Line 1423 | Line 1496 | public class CompletableFuture<T> implem
1496          CompletableFuture<V> d = newIncompleteFuture();
1497          if (e != null || !d.orApply(this, b, f, null)) {
1498              OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
1499 <            orpush(b, c);
1500 <            c.tryFire(SYNC);
1499 >            if (e != null && (result != null || b.result != null)) {
1500 >                try {
1501 >                    e.execute(c);
1502 >                } catch (Throwable ex) {
1503 >                    d.completeThrowable(ex);
1504 >                }
1505 >            }
1506 >            else {
1507 >                orpush(b, c);
1508 >                c.tryFire(SYNC);
1509 >            }
1510          }
1511          return d;
1512      }
# Line 1487 | Line 1569 | public class CompletableFuture<T> implem
1569          CompletableFuture<Void> d = newIncompleteFuture();
1570          if (e != null || !d.orAccept(this, b, f, null)) {
1571              OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
1572 <            orpush(b, c);
1573 <            c.tryFire(SYNC);
1572 >            if (e != null && (result != null || b.result != null)) {
1573 >                try {
1574 >                    e.execute(c);
1575 >                } catch (Throwable ex) {
1576 >                    d.completeThrowable(ex);
1577 >                }
1578 >            }
1579 >            else {
1580 >                orpush(b, c);
1581 >                c.tryFire(SYNC);
1582 >            }
1583          }
1584          return d;
1585      }
# Line 1545 | Line 1636 | public class CompletableFuture<T> implem
1636          CompletableFuture<Void> d = newIncompleteFuture();
1637          if (e != null || !d.orRun(this, b, f, null)) {
1638              OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
1639 <            orpush(b, c);
1640 <            c.tryFire(SYNC);
1639 >            if (e != null && (result != null || b.result != null)) {
1640 >                try {
1641 >                    e.execute(c);
1642 >                } catch (Throwable ex) {
1643 >                    d.completeThrowable(ex);
1644 >                }
1645 >            }
1646 >            else {
1647 >                orpush(b, c);
1648 >                c.tryFire(SYNC);
1649 >            }
1650          }
1651          return d;
1652      }
# Line 1611 | Line 1711 | public class CompletableFuture<T> implem
1711  
1712          public final Void getRawResult() { return null; }
1713          public final void setRawResult(Void v) {}
1714 <        public final boolean exec() { run(); return true; }
1714 >        public final boolean exec() { run(); return false; }
1715  
1716          public void run() {
1717              CompletableFuture<T> d; Supplier<? extends T> f;
# Line 1647 | Line 1747 | public class CompletableFuture<T> implem
1747  
1748          public final Void getRawResult() { return null; }
1749          public final void setRawResult(Void v) {}
1750 <        public final boolean exec() { run(); return true; }
1750 >        public final boolean exec() { run(); return false; }
1751  
1752          public void run() {
1753              CompletableFuture<Void> d; Runnable f;
# Line 1731 | Line 1831 | public class CompletableFuture<T> implem
1831      private Object waitingGet(boolean interruptible) {
1832          Signaller q = null;
1833          boolean queued = false;
1734        int spins = SPINS;
1834          Object r;
1835          while ((r = result) == null) {
1836 <            if (spins > 0) {
1738 <                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1739 <                    --spins;
1740 <            }
1741 <            else if (q == null)
1836 >            if (q == null) {
1837                  q = new Signaller(interruptible, 0L, 0L);
1838 +                if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1839 +                    ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1840 +            }
1841              else if (!queued)
1842                  queued = tryPushStack(q);
1843              else {
# Line 1752 | Line 1850 | public class CompletableFuture<T> implem
1850                      break;
1851              }
1852          }
1853 <        if (q != null) {
1853 >        if (q != null && queued) {
1854              q.thread = null;
1855 <            if (q.interrupted) {
1856 <                if (interruptible)
1857 <                    cleanStack();
1858 <                else
1761 <                    Thread.currentThread().interrupt();
1762 <            }
1855 >            if (!interruptible && q.interrupted)
1856 >                Thread.currentThread().interrupt();
1857 >            if (r == null)
1858 >                cleanStack();
1859          }
1860 <        if (r != null)
1860 >        if (r != null || (r = result) != null)
1861              postComplete();
1862          return r;
1863      }
# Line 1779 | Line 1875 | public class CompletableFuture<T> implem
1875              Signaller q = null;
1876              boolean queued = false;
1877              Object r;
1878 <            while ((r = result) == null) { // similar to untimed, without spins
1879 <                if (q == null)
1878 >            while ((r = result) == null) { // similar to untimed
1879 >                if (q == null) {
1880                      q = new Signaller(true, nanos, deadline);
1881 +                    if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1882 +                        ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1883 +                }
1884                  else if (!queued)
1885                      queued = tryPushStack(q);
1886                  else if (q.nanos <= 0L)
# Line 1796 | Line 1895 | public class CompletableFuture<T> implem
1895                          break;
1896                  }
1897              }
1898 <            if (q != null)
1898 >            if (q != null && queued) {
1899                  q.thread = null;
1900 <            if (r != null)
1900 >                if (r == null)
1901 >                    cleanStack();
1902 >            }
1903 >            if (r != null || (r = result) != null)
1904                  postComplete();
1803            else
1804                cleanStack();
1905              if (r != null || (q != null && q.interrupted))
1906                  return r;
1907          }
# Line 1913 | Line 2013 | public class CompletableFuture<T> implem
2013       * @throws InterruptedException if the current thread was interrupted
2014       * while waiting
2015       */
2016 +    @SuppressWarnings("unchecked")
2017      public T get() throws InterruptedException, ExecutionException {
2018          Object r;
2019 <        return reportGet((r = result) == null ? waitingGet(true) : r);
2019 >        if ((r = result) == null)
2020 >            r = waitingGet(true);
2021 >        return (T) reportGet(r);
2022      }
2023  
2024      /**
# Line 1931 | Line 2034 | public class CompletableFuture<T> implem
2034       * while waiting
2035       * @throws TimeoutException if the wait timed out
2036       */
2037 +    @SuppressWarnings("unchecked")
2038      public T get(long timeout, TimeUnit unit)
2039          throws InterruptedException, ExecutionException, TimeoutException {
1936        Object r;
2040          long nanos = unit.toNanos(timeout);
2041 <        return reportGet((r = result) == null ? timedGet(nanos) : r);
2041 >        Object r;
2042 >        if ((r = result) == null)
2043 >            r = timedGet(nanos);
2044 >        return (T) reportGet(r);
2045      }
2046  
2047      /**
# Line 1952 | Line 2058 | public class CompletableFuture<T> implem
2058       * @throws CompletionException if this future completed
2059       * exceptionally or a completion computation threw an exception
2060       */
2061 +    @SuppressWarnings("unchecked")
2062      public T join() {
2063          Object r;
2064 <        return reportJoin((r = result) == null ? waitingGet(false) : r);
2064 >        if ((r = result) == null)
2065 >            r = waitingGet(false);
2066 >        return (T) reportJoin(r);
2067      }
2068  
2069      /**
# Line 1967 | Line 2076 | public class CompletableFuture<T> implem
2076       * @throws CompletionException if this future completed
2077       * exceptionally or a completion computation threw an exception
2078       */
2079 +    @SuppressWarnings("unchecked")
2080      public T getNow(T valueIfAbsent) {
2081          Object r;
2082 <        return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
2082 >        return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r);
2083      }
2084  
2085      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines