ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.352 by jsr166, Thu Dec 13 03:39:10 2018 UTC vs.
Revision 1.353 by dl, Thu Dec 13 12:10:11 2018 UTC

# Line 416 | Line 416 | public class ForkJoinPool extends Abstra
416       * if to its current value).  This would be extremely costly. So
417       * we relax it in several ways: (1) Producers only signal when
418       * their queue is possibly empty at some point during a push
419 <     * operation (which requires conservatively checking size zero or
420 <     * one to cover races). (2) Other workers propagate this signal
419 >     * operation. (2) Other workers propagate this signal
420       * when they find tasks in a queue with size greater than one. (3)
421       * Workers only enqueue after scanning (see below) and not finding
422       * any tasks.  (4) Rather than CASing ctl to its current value in
# Line 733 | Line 732 | public class ForkJoinPool extends Abstra
732  
733      /**
734       * The maximum number of top-level polls per worker before
735 <     * checking other queues, expressed as a bit shift to, in effect,
736 <     * multiply by pool size, and then use as random value mask, so
738 <     * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
739 <     * above for rationale.
735 >     * checking other queues, expressed as a bit shift.  See above for
736 >     * rationale.
737       */
738      static final int TOP_BOUND_SHIFT = 10;
739  
# Line 812 | Line 809 | public class ForkJoinPool extends Abstra
809           */
810          final void push(ForkJoinTask<?> task) {
811              ForkJoinTask<?>[] a;
812 <            int s = top, d, cap, m;
812 >            int s = top, d = s - base, cap, m;
813              ForkJoinPool p = pool;
814              if ((a = array) != null && (cap = a.length) > 0) {
815                  QA.setRelease(a, (m = cap - 1) & s, task);
816                  top = s + 1;
817 <                if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
821 <                    p != null) {                 // size 0 or 1
822 <                    VarHandle.fullFence();
823 <                    p.signalWork();
824 <                }
825 <                else if (d == m)
817 >                if (d == m)
818                      growArray(false);
819 +                else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) {
820 +                    VarHandle.fullFence();  // was empty
821 +                    p.signalWork(null);
822 +                }
823              }
824          }
825  
# Line 834 | Line 830 | public class ForkJoinPool extends Abstra
830          final boolean lockedPush(ForkJoinTask<?> task) {
831              ForkJoinTask<?>[] a;
832              boolean signal = false;
833 <            int s = top, b = base, cap, d;
833 >            int s = top, d = s - base, cap, m;
834              if ((a = array) != null && (cap = a.length) > 0) {
835 <                a[(cap - 1) & s] = task;
835 >                a[(m = (cap - 1)) & s] = task;
836                  top = s + 1;
837 <                if (b - s + cap - 1 == 0)
837 >                if (d == m)
838                      growArray(true);
839                  else {
840                      phase = 0; // full volatile unlock
841 <                    if (((s - base) & ~1) == 0) // size 0 or 1
842 <                        signal = true;
841 >                    if (a[m & (s - 1)] == null)
842 >                        signal = true;   // was empty
843                  }
844              }
845              return signal;
# Line 985 | Line 981 | public class ForkJoinPool extends Abstra
981           * queue, up to bound n (to avoid infinite unfairness).
982           */
983          final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
984 <            if (t != null && q != null) { // hoist checks
985 <                int nstolen = 1;
986 <                for (;;) {
984 >            int nstolen = 1;
985 >            for (int j = 0;;) {
986 >                if (t != null)
987                      t.doExec();
988 <                    if (n-- < 0)
989 <                        break;
990 <                    else if ((t = nextLocalTask()) == null) {
991 <                        if ((t = q.poll()) == null)
992 <                            break;
993 <                        else
994 <                            ++nstolen;
988 >                if (j++ <= n)
989 >                    t = nextLocalTask();
990 >                else {
991 >                    j = 0;
992 >                    t = null;
993 >                }
994 >                if (t == null) {
995 >                    if (q != null && (t = q.poll()) != null) {
996 >                        ++nstolen;
997 >                        j = 0;
998                      }
999 +                    else if (j != 0)
1000 +                        break;
1001                  }
1001                ForkJoinWorkerThread thread = owner;
1002                nsteals += nstolen;
1003                source = 0;
1004                if (thread != null)
1005                    thread.afterTopLevelExec();
1002              }
1003 +            ForkJoinWorkerThread thread = owner;
1004 +            nsteals += nstolen;
1005 +            source = 0;
1006 +            if (thread != null)
1007 +                thread.afterTopLevelExec();
1008          }
1009  
1010          /**
# Line 1426 | Line 1427 | public class ForkJoinPool extends Abstra
1427  
1428          if (!tryTerminate(false, false) &&            // possibly replace worker
1429              w != null && w.array != null)             // avoid repeated failures
1430 <            signalWork();
1430 >            signalWork(null);
1431  
1432          if (ex == null)                               // help clean on way out
1433              ForkJoinTask.helpExpungeStaleExceptions();
# Line 1436 | Line 1437 | public class ForkJoinPool extends Abstra
1437  
1438      /**
1439       * Tries to create or release a worker if too few are running.
1440 +     * @param q if non-null recheck if empty on CAS failure
1441       */
1442 <    final void signalWork() {
1442 >    final void signalWork(WorkQueue q) {
1443          for (;;) {
1444              long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1445              if ((c = ctl) >= 0L)                      // enough workers
# Line 1464 | Line 1466 | public class ForkJoinPool extends Abstra
1466                          LockSupport.unpark(vt);
1467                      break;
1468                  }
1469 +                else if (q != null && q.isEmpty())     // no need to retry
1470 +                    break;
1471              }
1472          }
1473      }
# Line 1584 | Line 1588 | public class ForkJoinPool extends Abstra
1588                  else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1589                           tryTerminate(false, false))
1590                      break;                        // quiescent shutdown
1591 <                else if (rc <= 0 && pred != 0 && phase == (int)c) {
1592 <                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1593 <                    long d = keepAlive + System.currentTimeMillis();
1594 <                    LockSupport.parkUntil(this, d);
1595 <                    if (ctl == c &&               // drop on timeout if all idle
1596 <                        d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1597 <                        CTL.compareAndSet(this, c, nc)) {
1598 <                        w.phase = QUIET;
1599 <                        break;
1591 >                else if (w.phase < 0) {
1592 >                    if (rc <= 0 && pred != 0 && phase == (int)c) {
1593 >                        long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1594 >                        long d = keepAlive + System.currentTimeMillis();
1595 >                        LockSupport.parkUntil(this, d);
1596 >                        if (ctl == c &&           // drop on timeout if all idle
1597 >                            d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1598 >                            CTL.compareAndSet(this, c, nc)) {
1599 >                            w.phase = QUIET;
1600 >                            break;
1601 >                        }
1602 >                    }
1603 >                    else {
1604 >                        LockSupport.park(this);
1605 >                        if (w.phase < 0)          // one spurious wakeup check
1606 >                            LockSupport.park(this);
1607                      }
1608                  }
1598                else if (w.phase < 0)
1599                    LockSupport.park(this);       // OK if spuriously woken
1609                  w.source = 0;                     // disable signal
1610              }
1611          }
# Line 1622 | Line 1631 | public class ForkJoinPool extends Abstra
1631                              QA.compareAndSet(a, k, t, null)) {
1632                              q.base = b;
1633                              w.source = qid;
1634 <                            if (q.top - b > 0)
1635 <                                signalWork();
1634 >                            if (a[(cap - 1) & b] != null)
1635 >                                signalWork(q);    // help signal if more tasks
1636                              w.topLevelExec(t, q,  // random fairness bound
1637 <                                           r & ((n << TOP_BOUND_SHIFT) - 1));
1637 >                                           (r | (1 << TOP_BOUND_SHIFT)) & SMASK);
1638                          }
1639                      }
1640                      return true;
# Line 1871 | Line 1880 | public class ForkJoinPool extends Abstra
1880                  r = ThreadLocalRandom.advanceProbe(r);
1881              else {
1882                  if (q.lockedPush(task))
1883 <                    signalWork();
1883 >                    signalWork(null);
1884                  return;
1885              }
1886          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines