ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.351 by dl, Wed Dec 5 11:03:24 2018 UTC vs.
Revision 1.352 by jsr166, Thu Dec 13 03:39:10 2018 UTC

# Line 416 | Line 416 | public class ForkJoinPool extends Abstra
416       * if to its current value).  This would be extremely costly. So
417       * we relax it in several ways: (1) Producers only signal when
418       * their queue is possibly empty at some point during a push
419 <     * operation. (2) Other workers propagate this signal
419 >     * operation (which requires conservatively checking size zero or
420 >     * one to cover races). (2) Other workers propagate this signal
421       * when they find tasks in a queue with size greater than one. (3)
422       * Workers only enqueue after scanning (see below) and not finding
423       * any tasks.  (4) Rather than CASing ctl to its current value in
# Line 732 | Line 733 | public class ForkJoinPool extends Abstra
733  
734      /**
735       * The maximum number of top-level polls per worker before
736 <     * checking other queues, expressed as a bit shift.  See above for
737 <     * rationale.
736 >     * checking other queues, expressed as a bit shift to, in effect,
737 >     * multiply by pool size, and then use as random value mask, so
738 >     * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
739 >     * above for rationale.
740       */
741      static final int TOP_BOUND_SHIFT = 10;
742  
# Line 809 | Line 812 | public class ForkJoinPool extends Abstra
812           */
813          final void push(ForkJoinTask<?> task) {
814              ForkJoinTask<?>[] a;
815 <            int s = top, d = s - base, cap, m;
815 >            int s = top, d, cap, m;
816              ForkJoinPool p = pool;
817              if ((a = array) != null && (cap = a.length) > 0) {
818                  QA.setRelease(a, (m = cap - 1) & s, task);
819                  top = s + 1;
820 <                if (d == m)
821 <                    growArray(false);
822 <                else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) {
823 <                    VarHandle.fullFence();  // was empty
821 <                    p.signalWork(null);
820 >                if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
821 >                    p != null) {                 // size 0 or 1
822 >                    VarHandle.fullFence();
823 >                    p.signalWork();
824                  }
825 +                else if (d == m)
826 +                    growArray(false);
827              }
828          }
829  
# Line 830 | Line 834 | public class ForkJoinPool extends Abstra
834          final boolean lockedPush(ForkJoinTask<?> task) {
835              ForkJoinTask<?>[] a;
836              boolean signal = false;
837 <            int s = top, d = s - base, cap, m;
837 >            int s = top, b = base, cap, d;
838              if ((a = array) != null && (cap = a.length) > 0) {
839 <                a[(m = (cap - 1)) & s] = task;
839 >                a[(cap - 1) & s] = task;
840                  top = s + 1;
841 <                if (d == m)
841 >                if (b - s + cap - 1 == 0)
842                      growArray(true);
843                  else {
844                      phase = 0; // full volatile unlock
845 <                    if (a[m & (s - 1)] == null)
846 <                        signal = true;   // was empty
845 >                    if (((s - base) & ~1) == 0) // size 0 or 1
846 >                        signal = true;
847                  }
848              }
849              return signal;
# Line 981 | Line 985 | public class ForkJoinPool extends Abstra
985           * queue, up to bound n (to avoid infinite unfairness).
986           */
987          final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
988 <            int nstolen = 1;
989 <            for (int j = 0;;) {
990 <                if (t != null)
988 >            if (t != null && q != null) { // hoist checks
989 >                int nstolen = 1;
990 >                for (;;) {
991                      t.doExec();
992 <                if (j++ <= n)
989 <                    t = nextLocalTask();
990 <                else {
991 <                    j = 0;
992 <                    t = null;
993 <                }
994 <                if (t == null) {
995 <                    if (q != null && (t = q.poll()) != null) {
996 <                        ++nstolen;
997 <                        j = 0;
998 <                    }
999 <                    else if (j != 0)
992 >                    if (n-- < 0)
993                          break;
994 +                    else if ((t = nextLocalTask()) == null) {
995 +                        if ((t = q.poll()) == null)
996 +                            break;
997 +                        else
998 +                            ++nstolen;
999 +                    }
1000                  }
1001 +                ForkJoinWorkerThread thread = owner;
1002 +                nsteals += nstolen;
1003 +                source = 0;
1004 +                if (thread != null)
1005 +                    thread.afterTopLevelExec();
1006              }
1003            ForkJoinWorkerThread thread = owner;
1004            nsteals += nstolen;
1005            source = 0;
1006            if (thread != null)
1007                thread.afterTopLevelExec();
1007          }
1008  
1009          /**
# Line 1427 | Line 1426 | public class ForkJoinPool extends Abstra
1426  
1427          if (!tryTerminate(false, false) &&            // possibly replace worker
1428              w != null && w.array != null)             // avoid repeated failures
1429 <            signalWork(null);
1429 >            signalWork();
1430  
1431          if (ex == null)                               // help clean on way out
1432              ForkJoinTask.helpExpungeStaleExceptions();
# Line 1437 | Line 1436 | public class ForkJoinPool extends Abstra
1436  
1437      /**
1438       * Tries to create or release a worker if too few are running.
1440     * @param q if non-null recheck if empty on CAS failure
1439       */
1440 <    final void signalWork(WorkQueue q) {
1440 >    final void signalWork() {
1441          for (;;) {
1442              long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1443              if ((c = ctl) >= 0L)                      // enough workers
# Line 1466 | Line 1464 | public class ForkJoinPool extends Abstra
1464                          LockSupport.unpark(vt);
1465                      break;
1466                  }
1469                else if (q != null && q.isEmpty())     // no need to retry
1470                    break;
1467              }
1468          }
1469      }
# Line 1588 | Line 1584 | public class ForkJoinPool extends Abstra
1584                  else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1585                           tryTerminate(false, false))
1586                      break;                        // quiescent shutdown
1587 <                else if (w.phase < 0) {
1588 <                    if (rc <= 0 && pred != 0 && phase == (int)c) {
1589 <                        long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1590 <                        long d = keepAlive + System.currentTimeMillis();
1591 <                        LockSupport.parkUntil(this, d);
1592 <                        if (ctl == c &&           // drop on timeout if all idle
1593 <                            d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1594 <                            CTL.compareAndSet(this, c, nc)) {
1595 <                            w.phase = QUIET;
1600 <                            break;
1601 <                        }
1602 <                    }
1603 <                    else {
1604 <                        LockSupport.park(this);
1605 <                        if (w.phase < 0)          // one spurious wakeup check
1606 <                            LockSupport.park(this);
1587 >                else if (rc <= 0 && pred != 0 && phase == (int)c) {
1588 >                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1589 >                    long d = keepAlive + System.currentTimeMillis();
1590 >                    LockSupport.parkUntil(this, d);
1591 >                    if (ctl == c &&               // drop on timeout if all idle
1592 >                        d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1593 >                        CTL.compareAndSet(this, c, nc)) {
1594 >                        w.phase = QUIET;
1595 >                        break;
1596                      }
1597                  }
1598 +                else if (w.phase < 0)
1599 +                    LockSupport.park(this);       // OK if spuriously woken
1600                  w.source = 0;                     // disable signal
1601              }
1602          }
# Line 1621 | Line 1612 | public class ForkJoinPool extends Abstra
1612          WorkQueue[] ws; int n;
1613          if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
1614              for (int m = n - 1, j = r & m;;) {
1615 <                WorkQueue q; int b, s;
1616 <                if ((q = ws[j]) != null && (s = q.top) != (b = q.base)) {
1615 >                WorkQueue q; int b;
1616 >                if ((q = ws[j]) != null && q.top != (b = q.base)) {
1617                      int qid = q.id;
1618                      ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
1619                      if ((a = q.array) != null && (cap = a.length) > 0) {
# Line 1631 | Line 1622 | public class ForkJoinPool extends Abstra
1622                              QA.compareAndSet(a, k, t, null)) {
1623                              q.base = b;
1624                              w.source = qid;
1625 <                            if (s != b && a[(cap - 1) & b] != null)
1626 <                                signalWork(q);    // help signal if more tasks
1625 >                            if (q.top - b > 0)
1626 >                                signalWork();
1627                              w.topLevelExec(t, q,  // random fairness bound
1628 <                                           (r | (1 << TOP_BOUND_SHIFT)) & SMASK);
1628 >                                           r & ((n << TOP_BOUND_SHIFT) - 1));
1629                          }
1630                      }
1631                      return true;
# Line 1880 | Line 1871 | public class ForkJoinPool extends Abstra
1871                  r = ThreadLocalRandom.advanceProbe(r);
1872              else {
1873                  if (q.lockedPush(task))
1874 <                    signalWork(null);
1874 >                    signalWork();
1875                  return;
1876              }
1877          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines