ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.345 by dl, Mon Feb 12 20:01:40 2018 UTC vs.
Revision 1.346 by dl, Sun Feb 18 21:31:55 2018 UTC

# Line 416 | Line 416 | public class ForkJoinPool extends Abstra
416       * if to its current value).  This would be extremely costly. So
417       * we relax it in several ways: (1) Producers only signal when
418       * their queue is possibly empty at some point during a push
419 <     * operation. (2) Other workers propagate this signal when they
420 <     * find tasks. (3) Workers only enqueue after scanning (see below)
421 <     * and not finding any tasks.  (4) Rather than CASing ctl to its
422 <     * current value in the common case where no action is required,
423 <     * we reduce write contention by equivalently prefacing signalWork
424 <     * when called by an external task producer using a memory access
425 <     * with full-volatile semantics or a "fullFence".
426 <     *
427 <     * Almost always, too many signals are issued. A task producer
428 <     * cannot in general tell if some existing worker is in the midst
429 <     * of finishing one task (or already scanning) and ready to take
430 <     * another without being signalled. So the producer might instead
431 <     * activate a different worker that does not find any work, and
432 <     * then inactivates. This scarcely matters in steady-state
433 <     * computations involving all workers, but can create contention
434 <     * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
435 <     * computations involving only a few workers.
419 >     * operation (which requires conservatively checking size zero or
420 >     * one to cover races). (2) Other workers propagate this signal
421 >     * when they find tasks in a queue with size greater than one. (3)
422 >     * Workers only enqueue after scanning (see below) and not finding
423 >     * any tasks.  (4) Rather than CASing ctl to its current value in
424 >     * the common case where no action is required, we reduce write
425 >     * contention by equivalently prefacing signalWork when called by
426 >     * an external task producer using a memory access with
427 >     * full-volatile semantics or a "fullFence".
428 >     *
429 >     * Almost always, too many signals are issued, in part because a
430 >     * task producer cannot tell if some existing worker is in the
431 >     * midst of finishing one task (or already scanning) and ready to
432 >     * take another without being signalled. So the producer might
433 >     * instead activate a different worker that does not find any
434 >     * work, and then inactivates. This scarcely matters in
435 >     * steady-state computations involving all workers, but can create
436 >     * contention and bookkeeping bottlenecks during ramp-up,
437 >     * ramp-down, and small computations involving only a few workers.
438       *
439       * Scanning. Method scan (from runWorker) performs top-level
440       * scanning for tasks. (Similar scans appear in helpQuiesce and
# Line 810 | Line 812 | public class ForkJoinPool extends Abstra
812           */
813          final void push(ForkJoinTask<?> task) {
814              ForkJoinTask<?>[] a;
815 <            int s = top, d, cap;
815 >            int s = top, d, cap, m;
816              ForkJoinPool p = pool;
817              if ((a = array) != null && (cap = a.length) > 0) {
818 <                QA.setRelease(a, (cap - 1) & s, task);
818 >                QA.setRelease(a, (m = cap - 1) & s, task);
819                  top = s + 1;
820 <                if ((d = (int)BASE.getAcquire(this) - s) == 0 && p != null) {
820 >                if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
821 >                    p != null) {                 // size 0 or 1
822                      VarHandle.fullFence();
823                      p.signalWork();
824                  }
825 <                else if (d + cap - 1 == 0)
825 >                else if (d == m)
826                      growArray(false);
827              }
828          }
# Line 831 | Line 834 | public class ForkJoinPool extends Abstra
834          final boolean lockedPush(ForkJoinTask<?> task) {
835              ForkJoinTask<?>[] a;
836              boolean signal = false;
837 <            int s = top, b = base, cap;
837 >            int s = top, b = base, cap, d;
838              if ((a = array) != null && (cap = a.length) > 0) {
839                  a[(cap - 1) & s] = task;
840                  top = s + 1;
# Line 839 | Line 842 | public class ForkJoinPool extends Abstra
842                      growArray(true);
843                  else {
844                      phase = 0; // full volatile unlock
845 <                    if (base == s)
845 >                    if (((s - base) & ~1) == 0) // size 0 or 1
846                          signal = true;
847                  }
848              }
# Line 890 | Line 893 | public class ForkJoinPool extends Abstra
893          final ForkJoinTask<?> poll() {
894              int b, k, cap; ForkJoinTask<?>[] a;
895              while ((a = array) != null && (cap = a.length) > 0 &&
896 <                   (b = base) != top) {
896 >                   top - (b = base) > 0) {
897                  ForkJoinTask<?> t = (ForkJoinTask<?>)
898                      QA.getAcquire(a, k = (cap - 1) & b);
899                  if (base == b++) {
# Line 982 | Line 985 | public class ForkJoinPool extends Abstra
985           * queue, up to bound n (to avoid infinite unfairness).
986           */
987          final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
988 <            if (t != null) {
988 >            if (t != null && q != null) { // hoist checks
989                  int nstolen = 1;
990                  for (;;) {
991                      t.doExec();
992                      if (n-- < 0)
993                          break;
994                      else if ((t = nextLocalTask()) == null) {
995 <                        if (q != null && (t = q.poll()) != null)
993 <                            ++nstolen;
994 <                        else
995 >                        if ((t = q.poll()) == null)
996                              break;
997 +                        else
998 +                            ++nstolen;
999                      }
1000                  }
1001                  ForkJoinWorkerThread thread = owner;
# Line 1009 | Line 1012 | public class ForkJoinPool extends Abstra
1012          final void tryRemoveAndExec(ForkJoinTask<?> task) {
1013              ForkJoinTask<?>[] a; int s, cap;
1014              if ((a = array) != null && (cap = a.length) > 0 &&
1015 <                base - (s = top) < 0) { // traverse from top
1015 >                (s = top) - base > 0) { // traverse from top
1016                  for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
1017                      int index = i & m;
1018                      ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
# Line 1049 | Line 1052 | public class ForkJoinPool extends Abstra
1052              if (task != null && (status = task.status) >= 0) {
1053                  int s, k, cap; ForkJoinTask<?>[] a;
1054                  while ((a = array) != null && (cap = a.length) > 0 &&
1055 <                       (s = top) != base) {
1055 >                       (s = top) - base > 0) {
1056                      CountedCompleter<?> v = null;
1057                      ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
1058                      if (o instanceof CountedCompleter) {
# Line 1099 | Line 1102 | public class ForkJoinPool extends Abstra
1102              if (blocker != null) {
1103                  int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1104                  while ((a = array) != null && (cap = a.length) > 0 &&
1105 <                       (b = base) != top) {
1105 >                       top - (b = base) > 0) {
1106                      t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1107                      if (blocker.isReleasable())
1108                          break;
# Line 1609 | Line 1612 | public class ForkJoinPool extends Abstra
1612          WorkQueue[] ws; int n;
1613          if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
1614              for (int m = n - 1, j = r & m;;) {
1615 <                WorkQueue q; int b, d;
1616 <                if ((q = ws[j]) != null && (d = (b = q.base) - q.top) != 0) {
1615 >                WorkQueue q; int b;
1616 >                if ((q = ws[j]) != null && q.top != (b = q.base)) {
1617                      int qid = q.id;
1618                      ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
1619                      if ((a = q.array) != null && (cap = a.length) > 0) {
# Line 1619 | Line 1622 | public class ForkJoinPool extends Abstra
1622                              QA.compareAndSet(a, k, t, null)) {
1623                              q.base = b;
1624                              w.source = qid;
1625 <                            if (d != -1 || b != q.top)
1625 >                            if (q.top - b > 0)
1626                                  signalWork();
1627                              w.topLevelExec(t, q,  // random fairness bound
1628                                             r & ((n << TOP_BOUND_SHIFT) - 1));
# Line 1663 | Line 1666 | public class ForkJoinPool extends Abstra
1666                  while (n > 0) {
1667                      WorkQueue q; int b;
1668                      if ((q = ws[r & m]) != null && q.source == id &&
1669 <                        (b = q.base) != q.top) {
1669 >                        q.top != (b = q.base)) {
1670                          ForkJoinTask<?>[] a; int cap, k;
1671                          int qid = q.id;
1672                          if ((a = q.array) != null && (cap = a.length) > 0) {
# Line 1726 | Line 1729 | public class ForkJoinPool extends Abstra
1729                  WorkQueue q; int b;
1730                  if ((q = ws[r & m]) != null) {
1731                      int qs = q.source;
1732 <                    if ((b = q.base) != q.top) {
1732 >                    if (q.top != (b = q.base)) {
1733                          quiet = empty = false;
1734                          ForkJoinTask<?>[] a; int cap, k;
1735                          int qid = q.id;
# Line 1795 | Line 1798 | public class ForkJoinPool extends Abstra
1798                  WorkQueue q;
1799                  if ((q = ws[i]) != null) {
1800                      int b; ForkJoinTask<?> t;
1801 <                    if ((b = q.base) != q.top) {
1801 >                    if (q.top - (b = q.base) > 0) {
1802                          nonempty = true;
1803                          if ((t = q.poll()) != null)
1804                              return t;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines