ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/CompletableFuture.java
(Generate patch)

Comparing jsr166/src/jdk8/java/util/concurrent/CompletableFuture.java (file contents):
Revision 1.1 by jsr166, Sat Mar 26 06:22:49 2016 UTC vs.
Revision 1.4 by dl, Tue Apr 5 22:53:24 2016 UTC

# Line 207 | Line 207 | public class CompletableFuture<T> implem
207       * pointing back to its sources. So we null out fields as soon as
208       * possible.  The screening checks needed anyway harmlessly ignore
209       * null arguments that may have been obtained during races with
210 <     * threads nulling out fields.  We also try to unlink fired
211 <     * Completions from stacks that might never be popped (see method
212 <     * postFire).  Completion fields need not be declared as final or
213 <     * volatile because they are only visible to other threads upon
214 <     * safe publication.
210 >     * threads nulling out fields.  We also try to unlink non-isLive
211 >     * (fired or cancelled) Completions from stacks that might
212 >     * otherwise never be popped: Method cleanStack always unlinks non
213 >     * isLive completions from the head of stack; others may
214 >     * occasionally remain if racing with other cancellations or
215 >     * removals.
216 >     *
217 >     * Completion fields need not be declared as final or volatile
218 >     * because they are only visible to other threads upon safe
219 >     * publication.
220       */
221  
222      volatile Object result;       // Either the result or boxed AltResult
# Line 420 | Line 425 | public class CompletableFuture<T> implem
425      static final int ASYNC  =  1;
426      static final int NESTED = -1;
427  
423    /**
424     * Spins before blocking in waitingGet
425     */
426    static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
427                              1 << 8 : 0);
428
428      /* ------------- Base Completion classes and operations -------------- */
429  
430      @SuppressWarnings("serial")
# Line 454 | Line 453 | public class CompletableFuture<T> implem
453          U.putOrderedObject(c, NEXT, next);
454      }
455  
456 +    static boolean casNext(Completion c, Completion cmp, Completion val) {
457 +        return U.compareAndSwapObject(c, NEXT, cmp, val);
458 +    }
459 +
460      /**
461       * Pops and tries to trigger all reachable dependents.  Call only
462       * when known to be done.
# Line 474 | Line 477 | public class CompletableFuture<T> implem
477                          pushStack(h);
478                          continue;
479                      }
480 <                    h.next = null;    // detach
480 >                    casNext(h, t, null);    // try to detach
481                  }
482                  f = (d = h.tryFire(NESTED)) == null ? this : d;
483              }
484          }
485      }
486  
487 <    /** Traverses stack and unlinks dead Completions. */
487 >    /** Traverses stack and unlinks one or more dead Completions, if found. */
488      final void cleanStack() {
489 <        for (Completion p = null, q = stack; q != null;) {
490 <            Completion s = q.next;
491 <            if (q.isLive()) {
492 <                p = q;
493 <                q = s;
494 <            }
495 <            else if (p == null) {
496 <                casStack(q, s);
497 <                q = stack;
495 <            }
496 <            else {
497 <                p.next = s;
498 <                if (p.isLive())
489 >        boolean unlinked = false;
490 >        Completion p;
491 >        while ((p = stack) != null && !p.isLive()) // ensure head of stack live
492 >            unlinked = casStack(p, p.next);
493 >        if (p != null && !unlinked) {              // try to unlink first nonlive
494 >            for (Completion q = p.next; q != null;) {
495 >                Completion s = q.next;
496 >                if (q.isLive()) {
497 >                    p = q;
498                      q = s;
499 +                }
500                  else {
501 <                    p = null;  // restart
502 <                    q = stack;
501 >                    casNext(p, q, s);
502 >                    break;
503                  }
504              }
505          }
# Line 554 | Line 554 | public class CompletableFuture<T> implem
554       */
555      final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
556          if (a != null && a.stack != null) {
557 <            if (a.result == null)
557 >            Object r;
558 >            if ((r = a.result) == null)
559                  a.cleanStack();
560 <            else if (mode >= 0)
560 >            if (mode >= 0 && (r != null || a.result != null))
561                  a.postComplete();
562          }
563          if (result != null && stack != null) {
# Line 618 | Line 619 | public class CompletableFuture<T> implem
619          CompletableFuture<V> d = newIncompleteFuture();
620          if (e != null || !d.uniApply(this, f, null)) {
621              UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
622 <            push(c);
623 <            c.tryFire(SYNC);
622 >            if (e != null && result != null) {
623 >                try {
624 >                    e.execute(c);
625 >                } catch (Throwable ex) {
626 >                    d.completeThrowable(ex);
627 >                }
628 >            }
629 >            else {
630 >                push(c);
631 >                c.tryFire(SYNC);
632 >            }
633          }
634          return d;
635      }
# Line 673 | Line 683 | public class CompletableFuture<T> implem
683          CompletableFuture<Void> d = newIncompleteFuture();
684          if (e != null || !d.uniAccept(this, f, null)) {
685              UniAccept<T> c = new UniAccept<T>(e, d, this, f);
686 <            push(c);
687 <            c.tryFire(SYNC);
686 >            if (e != null && result != null) {
687 >                try {
688 >                    e.execute(c);
689 >                } catch (Throwable ex) {
690 >                    d.completeThrowable(ex);
691 >                }
692 >            }
693 >            else {
694 >                push(c);
695 >                c.tryFire(SYNC);
696 >            }
697          }
698          return d;
699      }
# Line 721 | Line 740 | public class CompletableFuture<T> implem
740          CompletableFuture<Void> d = newIncompleteFuture();
741          if (e != null || !d.uniRun(this, f, null)) {
742              UniRun<T> c = new UniRun<T>(e, d, this, f);
743 <            push(c);
744 <            c.tryFire(SYNC);
743 >            if (e != null && result != null) {
744 >                try {
745 >                    e.execute(c);
746 >                } catch (Throwable ex) {
747 >                    d.completeThrowable(ex);
748 >                }
749 >            }
750 >            else {
751 >                push(c);
752 >                c.tryFire(SYNC);
753 >            }
754          }
755          return d;
756      }
# Line 784 | Line 812 | public class CompletableFuture<T> implem
812          CompletableFuture<T> d = newIncompleteFuture();
813          if (e != null || !d.uniWhenComplete(this, f, null)) {
814              UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
815 <            push(c);
816 <            c.tryFire(SYNC);
815 >            if (e != null && result != null) {
816 >                try {
817 >                    e.execute(c);
818 >                } catch (Throwable ex) {
819 >                    d.completeThrowable(ex);
820 >                }
821 >            }
822 >            else {
823 >                push(c);
824 >                c.tryFire(SYNC);
825 >            }
826          }
827          return d;
828      }
# Line 840 | Line 877 | public class CompletableFuture<T> implem
877          CompletableFuture<V> d = newIncompleteFuture();
878          if (e != null || !d.uniHandle(this, f, null)) {
879              UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
880 <            push(c);
881 <            c.tryFire(SYNC);
880 >            if (e != null && result != null) {
881 >                try {
882 >                    e.execute(c);
883 >                } catch (Throwable ex) {
884 >                    d.completeThrowable(ex);
885 >                }
886 >            }
887 >            else {
888 >                push(c);
889 >                c.tryFire(SYNC);
890 >            }
891          }
892          return d;
893      }
# Line 1000 | Line 1046 | public class CompletableFuture<T> implem
1046          if (f == null) throw new NullPointerException();
1047          Object r, s; Throwable x;
1048          CompletableFuture<V> d = newIncompleteFuture();
1049 <        if (e == null && (r = result) != null) {
1049 >        if ((r = result) != null && e == null) {
1050              if (r instanceof AltResult) {
1051                  if ((x = ((AltResult)r).ex) != null) {
1052                      d.result = encodeThrowable(x, r);
# Line 1025 | Line 1071 | public class CompletableFuture<T> implem
1071              }
1072          }
1073          UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
1074 <        push(c);
1075 <        c.tryFire(SYNC);
1074 >        if (r != null && e != null) {
1075 >            try {
1076 >                e.execute(new UniCompose<T,V>(null, d, this, f));
1077 >            } catch (Throwable ex) {
1078 >                d.completeThrowable(ex);
1079 >            }
1080 >        }
1081 >        else {
1082 >            push(c);
1083 >            c.tryFire(SYNC);
1084 >        }
1085          return d;
1086      }
1087  
# Line 1078 | Line 1133 | public class CompletableFuture<T> implem
1133      final CompletableFuture<T> postFire(CompletableFuture<?> a,
1134                                          CompletableFuture<?> b, int mode) {
1135          if (b != null && b.stack != null) { // clean second source
1136 <            if (b.result == null)
1136 >            Object r;
1137 >            if ((r = b.result) == null)
1138                  b.cleanStack();
1139 <            else if (mode >= 0)
1139 >            if (mode >= 0 && (r != null || b.result != null))
1140                  b.postComplete();
1141          }
1142          return postFire(a, mode);
# Line 1151 | Line 1207 | public class CompletableFuture<T> implem
1207          CompletableFuture<V> d = newIncompleteFuture();
1208          if (e != null || !d.biApply(this, b, f, null)) {
1209              BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
1210 <            bipush(b, c);
1211 <            c.tryFire(SYNC);
1210 >            if (e != null && result != null && b.result != null) {
1211 >                try {
1212 >                    e.execute(c);
1213 >                } catch (Throwable ex) {
1214 >                    d.completeThrowable(ex);
1215 >                }
1216 >            }
1217 >            else {
1218 >                bipush(b, c);
1219 >                c.tryFire(SYNC);
1220 >            }
1221          }
1222          return d;
1223      }
# Line 1223 | Line 1288 | public class CompletableFuture<T> implem
1288          CompletableFuture<Void> d = newIncompleteFuture();
1289          if (e != null || !d.biAccept(this, b, f, null)) {
1290              BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
1291 <            bipush(b, c);
1292 <            c.tryFire(SYNC);
1291 >            if (e != null && result != null && b.result != null) {
1292 >                try {
1293 >                    e.execute(c);
1294 >                } catch (Throwable ex) {
1295 >                    d.completeThrowable(ex);
1296 >                }
1297 >            }
1298 >            else {
1299 >                bipush(b, c);
1300 >                c.tryFire(SYNC);
1301 >            }
1302          }
1303          return d;
1304      }
# Line 1282 | Line 1356 | public class CompletableFuture<T> implem
1356          CompletableFuture<Void> d = newIncompleteFuture();
1357          if (e != null || !d.biRun(this, b, f, null)) {
1358              BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
1359 <            bipush(b, c);
1360 <            c.tryFire(SYNC);
1359 >            if (e != null && result != null && b.result != null) {
1360 >                try {
1361 >                    e.execute(c);
1362 >                } catch (Throwable ex) {
1363 >                    d.completeThrowable(ex);
1364 >                }
1365 >            }
1366 >            else {
1367 >                bipush(b, c);
1368 >                c.tryFire(SYNC);
1369 >            }
1370          }
1371          return d;
1372      }
# Line 1423 | Line 1506 | public class CompletableFuture<T> implem
1506          CompletableFuture<V> d = newIncompleteFuture();
1507          if (e != null || !d.orApply(this, b, f, null)) {
1508              OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
1509 <            orpush(b, c);
1510 <            c.tryFire(SYNC);
1509 >            if (e != null && (result != null || b.result != null)) {
1510 >                try {
1511 >                    e.execute(c);
1512 >                } catch (Throwable ex) {
1513 >                    d.completeThrowable(ex);
1514 >                }
1515 >            }
1516 >            else {
1517 >                orpush(b, c);
1518 >                c.tryFire(SYNC);
1519 >            }
1520          }
1521          return d;
1522      }
# Line 1487 | Line 1579 | public class CompletableFuture<T> implem
1579          CompletableFuture<Void> d = newIncompleteFuture();
1580          if (e != null || !d.orAccept(this, b, f, null)) {
1581              OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
1582 <            orpush(b, c);
1583 <            c.tryFire(SYNC);
1582 >            if (e != null && (result != null || b.result != null)) {
1583 >                try {
1584 >                    e.execute(c);
1585 >                } catch (Throwable ex) {
1586 >                    d.completeThrowable(ex);
1587 >                }
1588 >            }
1589 >            else {
1590 >                orpush(b, c);
1591 >                c.tryFire(SYNC);
1592 >            }
1593          }
1594          return d;
1595      }
# Line 1545 | Line 1646 | public class CompletableFuture<T> implem
1646          CompletableFuture<Void> d = newIncompleteFuture();
1647          if (e != null || !d.orRun(this, b, f, null)) {
1648              OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
1649 <            orpush(b, c);
1650 <            c.tryFire(SYNC);
1649 >            if (e != null && (result != null || b.result != null)) {
1650 >                try {
1651 >                    e.execute(c);
1652 >                } catch (Throwable ex) {
1653 >                    d.completeThrowable(ex);
1654 >                }
1655 >            }
1656 >            else {
1657 >                orpush(b, c);
1658 >                c.tryFire(SYNC);
1659 >            }
1660          }
1661          return d;
1662      }
# Line 1731 | Line 1841 | public class CompletableFuture<T> implem
1841      private Object waitingGet(boolean interruptible) {
1842          Signaller q = null;
1843          boolean queued = false;
1734        int spins = SPINS;
1844          Object r;
1845          while ((r = result) == null) {
1846 <            if (spins > 0) {
1738 <                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1739 <                    --spins;
1740 <            }
1741 <            else if (q == null)
1846 >            if (q == null) {
1847                  q = new Signaller(interruptible, 0L, 0L);
1848 +                if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1849 +                    ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1850 +            }
1851              else if (!queued)
1852                  queued = tryPushStack(q);
1853              else {
# Line 1752 | Line 1860 | public class CompletableFuture<T> implem
1860                      break;
1861              }
1862          }
1863 <        if (q != null) {
1863 >        if (q != null && queued) {
1864              q.thread = null;
1865 <            if (q.interrupted) {
1866 <                if (interruptible)
1867 <                    cleanStack();
1868 <                else
1761 <                    Thread.currentThread().interrupt();
1762 <            }
1865 >            if (!interruptible && q.interrupted)
1866 >                Thread.currentThread().interrupt();
1867 >            if (r == null)
1868 >                cleanStack();
1869          }
1870 <        if (r != null)
1870 >        if (r != null || (r = result) != null)
1871              postComplete();
1872          return r;
1873      }
# Line 1779 | Line 1885 | public class CompletableFuture<T> implem
1885              Signaller q = null;
1886              boolean queued = false;
1887              Object r;
1888 <            while ((r = result) == null) { // similar to untimed, without spins
1889 <                if (q == null)
1888 >            while ((r = result) == null) { // similar to untimed
1889 >                if (q == null) {
1890                      q = new Signaller(true, nanos, deadline);
1891 +                    if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1892 +                        ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1893 +                }
1894                  else if (!queued)
1895                      queued = tryPushStack(q);
1896                  else if (q.nanos <= 0L)
# Line 1796 | Line 1905 | public class CompletableFuture<T> implem
1905                          break;
1906                  }
1907              }
1908 <            if (q != null)
1908 >            if (q != null && queued) {
1909                  q.thread = null;
1910 <            if (r != null)
1910 >                if (r == null)
1911 >                    cleanStack();
1912 >            }
1913 >            if (r != null || (r = result) != null)
1914                  postComplete();
1803            else
1804                cleanStack();
1915              if (r != null || (q != null && q.interrupted))
1916                  return r;
1917          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines