ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.255 by dl, Wed Aug 5 13:31:36 2015 UTC vs.
Revision 1.256 by dl, Thu Aug 6 20:40:42 2015 UTC

# Line 393 | Line 393 | public class ForkJoinPool extends Abstra
393       * where no action is required, we reduce write contention by
394       * equivalently prefacing signalWork when called by an external
395       * task producer using a memory access with full-volatile
396 <     * semantics (alternatively, a "fullFence" could be used). (4) For
397 <     * internal task producers we rely on the fact that even if no
398 <     * other workers awaken, the producer itself will eventually see
399 <     * the task and execute it.
396 >     * semantics or a "fullFence". (4) For internal task producers we
397 >     * rely on the fact that even if no other workers awaken, the
398 >     * producer itself will eventually see the task and execute it.
399       *
400       * Almost always, too many signals are issued. A task producer
401       * cannot in general tell if some existing worker is in the midst
# Line 812 | Line 811 | public class ForkJoinPool extends Abstra
811          }
812  
813          /**
814 <         * Pushes a task. Call only by owner in unshared queues.  (The
816 <         * shared-queue version is embedded in method externalPush.)
814 >         * Pushes a task. Call only by owner in unshared queues.
815           *
816           * @param task the task. Caller must ensure non-null.
817           * @throws RejectedExecutionException if array cannot be resized
# Line 848 | Line 846 | public class ForkJoinPool extends Abstra
846                  (t = top) - (b = base) > 0) {
847                  int mask = size - 1;
848                  do { // emulate poll from old array, push to new array
849 <                    long offset = ((b & oldMask) << ASHIFT) + ABASE;
849 >                    int index = b & oldMask;
850 >                    long offset = ((long)index << ASHIFT) + ABASE;
851                      ForkJoinTask<?> x = (ForkJoinTask<?>)
852                          U.getObjectVolatile(oldA, offset);
853                      if (x != null &&
# Line 866 | Line 865 | public class ForkJoinPool extends Abstra
865           */
866          final ForkJoinTask<?> pop() {
867              int b = base, s = top, al, i; ForkJoinTask<?>[] a;
868 <            if ((a = array) != null && s != b && (al = a.length) > 0) {
869 <                long offset = (((al - 1) & (s - 1)) << ASHIFT) + ABASE;
868 >            if ((a = array) != null && b != s && (al = a.length) > 0) {
869 >                int index = (al - 1) & --s;
870 >                long offset = ((long)index << ASHIFT) + ABASE;
871                  ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, offset);
872                  if (t != null && U.compareAndSwapObject(a, offset, t, null)) {
873 <                    top = s - 1;
873 >                    top = s;
874                      U.storeFence();
875                      return t;
876                  }
# Line 886 | Line 886 | public class ForkJoinPool extends Abstra
886          final ForkJoinTask<?> pollAt(int b) {
887              ForkJoinTask<?>[] a; int al;
888              if ((a = array) != null && (al = a.length) > 0) {
889 <                long offset = (((al - 1) & b) << ASHIFT) + ABASE;
889 >                int index = (al - 1) & b;
890 >                long offset = ((long)index << ASHIFT) + ABASE;
891                  ForkJoinTask<?> t = (ForkJoinTask<?>)
892                      U.getObjectVolatile(a, offset);
893                  if (t != null && b++ == base &&
# Line 906 | Line 907 | public class ForkJoinPool extends Abstra
907                  int b = base, s = top, d, al; ForkJoinTask<?>[] a;
908                  if ((a = array) != null && (d = b - s) < 0 &&
909                      (al = a.length) > 0) {
910 <                    long offset = (((al - 1) & b) << ASHIFT) + ABASE;
910 >                    int index = (al - 1) & b;
911 >                    long offset = ((long)index << ASHIFT) + ABASE;
912                      ForkJoinTask<?> t = (ForkJoinTask<?>)
913                          U.getObjectVolatile(a, offset);
914                      if (b++ == base) {
# Line 946 | Line 948 | public class ForkJoinPool extends Abstra
948           * Pops the given task only if it is at the current top.
949           */
950          final boolean tryUnpush(ForkJoinTask<?> task) {
951 <            int s = top - 1, al; ForkJoinTask<?>[] a;
952 <            if ((a = array) != null && (al = a.length) > 0) {
953 <                long offset = (((al - 1) & s) << ASHIFT) + ABASE;
954 <                if (U.getObject(a, offset) == task &&
955 <                    U.compareAndSwapObject(a, offset, task, null)) {
951 >            int b = base, s = top, al; ForkJoinTask<?>[] a;
952 >            if ((a = array) != null && b != s && (al = a.length) > 0) {
953 >                int index = (al - 1) & --s;
954 >                long offset = ((long)index << ASHIFT) + ABASE;
955 >                if (U.compareAndSwapObject(a, offset, task, null)) {
956                      top = s;
957                      U.storeFence();
958                      return true;
# Line 960 | Line 962 | public class ForkJoinPool extends Abstra
962          }
963  
964          /**
965 +         * Shared version of push. Fails if already locked.
966 +         *
967 +         * @return status: > 0 locked, 0 was empty, < 0 was nonempty
968 +         */
969 +        final int sharedPush(ForkJoinTask<?> task) {
970 +            int stat;
971 +            if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
972 +                int b = base, s = top, al, d; ForkJoinTask<?>[] a;
973 +                if ((a = array) != null && (al = a.length) > 0 &&
974 +                    al - 1 + (d = b - s) > 0) {
975 +                    a[(al - 1) & s] = task;
976 +                    top = s + 1;                 // relaxed writes OK here
977 +                    U.putOrderedInt(this, QLOCK, 0);
978 +                    if (d < 0)
979 +                        stat = d;
980 +                    else {
981 +                        U.fullFence();           // sync with signallees
982 +                        stat = 0;
983 +                    }
984 +                }
985 +                else {
986 +                    growAndSharedPush(task);
987 +                    stat = 0;
988 +                }
989 +            }
990 +            else
991 +                stat = 1;
992 +            return stat;
993 +        }
994 +
995 +        /*
996 +         * Helper for sharedPush; called only when locked and resize
997 +         * needed.
998 +         */
999 +        private void growAndSharedPush(ForkJoinTask<?> task) {
1000 +            try {
1001 +                growArray();
1002 +                int s = top, al; ForkJoinTask<?>[] a;
1003 +                if ((a = array) != null && (al = a.length) > 0) {
1004 +                    a[(al - 1) & s] = task;
1005 +                    top = s + 1;
1006 +                }
1007 +            } finally {
1008 +                qlock = 0;
1009 +            }
1010 +        }
1011 +
1012 +        /**
1013           * Shared version of pop
1014           */
1015          final boolean trySharedUnpush(ForkJoinTask<?> task) {
966            ForkJoinTask<?> t;
1016              boolean popped = false;
1017              int s = top - 1, al;  ForkJoinTask<?>[] a;
1018              if ((a = array) != null && (al = a.length) > 0) {
1019 <                long offset = (((al - 1) & s) << ASHIFT) + ABASE;
1020 <                if (U.getObject(a, offset) == task &&
1019 >                int index = (al - 1) & s;
1020 >                long offset = ((long)index << ASHIFT) + ABASE;
1021 >                ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, offset);
1022 >                if (t == task &&
1023                      U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1024                      if (U.compareAndSwapObject(a, offset, task, null)) {
1025                          popped = true;
# Line 1006 | Line 1057 | public class ForkJoinPool extends Abstra
1057              for (int nexec = 0;;) {
1058                  int b = base, s = top, al; ForkJoinTask<?>[] a;
1059                  if ((a = array) != null && b != s && (al = a.length) > 0) {
1060 <                    long offset = (((al - 1) & --s) << ASHIFT) + ABASE;
1060 >                    int index = (al - 1) & --s;
1061 >                    long offset = ((long)index << ASHIFT) + ABASE;
1062                      ForkJoinTask<?> t = (ForkJoinTask<?>)
1063                          U.getAndSetObject(a, offset, null);
1064                      if (t != null) {
# Line 1030 | Line 1082 | public class ForkJoinPool extends Abstra
1082              for (int nexec = 0;;) {
1083                  int b = base, s = top, al; ForkJoinTask<?>[] a;
1084                  if ((a = array) != null && b != s && (al = a.length) > 0) {
1085 <                    long offset = (((al - 1) & b++) << ASHIFT) + ABASE;
1085 >                    int index = (al - 1) & b++;
1086 >                    long offset = ((long)index << ASHIFT) + ABASE;
1087                      ForkJoinTask<?> t = (ForkJoinTask<?>)
1088                          U.getAndSetObject(a, offset, null);
1089                      if (t != null) {
# Line 1089 | Line 1142 | public class ForkJoinPool extends Abstra
1142                  while ((d = (b = base) - (s = top)) < 0 &&
1143                         (a = array) != null && (al = a.length) > 0) {
1144                      for (;;) {      // traverse from s to b
1145 <                        int i = --s & (al - 1);
1146 <                        long offset = (i << ASHIFT) + ABASE;
1145 >                        int index = --s & (al - 1);
1146 >                        long offset = (index << ASHIFT) + ABASE;
1147                          ForkJoinTask<?> t = (ForkJoinTask<?>)
1148                              U.getObjectVolatile(a, offset);
1149                          if (t == null)
# Line 1142 | Line 1195 | public class ForkJoinPool extends Abstra
1195          final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
1196              int b = base, s = top, al; ForkJoinTask<?>[] a;
1197              if ((a = array) != null && b != s && (al = a.length) > 0) {
1198 <                long offset = (((al - 1) & (s - 1)) << ASHIFT) + ABASE;
1199 <                Object o = U.getObjectVolatile(a, offset);
1198 >                int index = (al - 1) & (s - 1);
1199 >                long offset = ((long)index << ASHIFT) + ABASE;
1200 >                ForkJoinTask<?> o = (ForkJoinTask<?>)
1201 >                    U.getObjectVolatile(a, offset);
1202                  if (o instanceof CountedCompleter) {
1203                      CountedCompleter<?> t = (CountedCompleter<?>)o;
1204                      for (CountedCompleter<?> r = t;;) {
# Line 1190 | Line 1245 | public class ForkJoinPool extends Abstra
1245              ForkJoinTask<?>[] a;
1246              int b = base, s = top, al, h;
1247              if ((a = array) != null && b != s && (al = a.length) > 0) {
1248 <                long offset = (((al - 1) & b) << ASHIFT) + ABASE;
1249 <                Object o = U.getObjectVolatile(a, offset);
1248 >                int index = (al - 1) & b;
1249 >                long offset = ((long)index << ASHIFT) + ABASE;
1250 >                ForkJoinTask<?> o = (ForkJoinTask<?>)
1251 >                    U.getObjectVolatile(a, offset);
1252                  if (o == null)
1253                      h = 2;                      // retryable
1254                  else if (!(o instanceof CountedCompleter))
# Line 1265 | Line 1322 | public class ForkJoinPool extends Abstra
1322  
1323      /**
1324       * Permission required for callers of methods that may start or
1325 <     * kill threads.
1325 >     * kill threads. Also used as a private static lock in tryInitialize.
1326       */
1327      private static final RuntimePermission modifyThreadPermission;
1328  
# Line 1386 | Line 1443 | public class ForkJoinPool extends Abstra
1443      volatile int runState;
1444      final int config;                    // parallelism, mode
1445      int indexSeed;                       // to generate worker index
1446 +    volatile AtomicLong stealCounter;    // also used as sync monitor
1447      volatile WorkQueue[] workQueues;     // main registry
1448 +    final String workerNamePrefix;       // to create worker name string
1449      final ForkJoinWorkerThreadFactory factory;
1450      final UncaughtExceptionHandler ueh;  // per-worker UEH
1392    final String workerNamePrefix;       // to create worker name string
1393    volatile AtomicLong stealCounter;    // also used as sync monitor
1451  
1452      /**
1453       * Instantiates fields upon first submission, and/or throws
1454       * exception if terminating. Called only by externalPush.
1455       */
1456      private void tryInitialize() {
1457 <        int rs;
1458 <        while (((rs = runState) & STARTED) == 0) {
1459 <            AtomicLong sc = new AtomicLong();
1460 <            if (U.compareAndSwapObject(this, STEALCOUNTER, null, sc)) {
1461 <                // create workQueues array with size a power of two
1462 <                int p = config & SMASK; // ensure at least 2 slots
1463 <                int n = (p > 1) ? p - 1 : 1;
1464 <                n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
1465 <                n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
1466 <                workQueues = new WorkQueue[n];
1467 <                while (!U.compareAndSwapInt(this, RUNSTATE, rs, rs | STARTED))
1468 <                    rs = runState;
1469 <                synchronized (sc) { sc.notifyAll(); }
1413 <                break;
1414 <            }
1415 <            else if ((sc = stealCounter) != null) { // wait for initialization
1416 <                synchronized (sc) {
1417 <                    try {
1418 <                        if ((runState & STARTED) == 0)
1419 <                            sc.wait();
1420 <                    } catch (InterruptedException ie) {
1421 <                        Thread.currentThread().interrupt();
1422 <                    }
1457 >        if ((runState & STARTED) == 0) { // bootstrap by locking static field
1458 >            int rs; // create workQueues array with size a power of two
1459 >            int p = config & SMASK; // ensure at least 2 slots
1460 >            int n = (p > 1) ? p - 1 : 1;
1461 >            n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
1462 >            n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
1463 >            synchronized(modifyThreadPermission) {
1464 >                if ((runState & STARTED) == 0) {
1465 >                    stealCounter = new AtomicLong();
1466 >                    workQueues = new WorkQueue[n];
1467 >                    do {} while (!U.compareAndSwapInt(this, RUNSTATE,
1468 >                                                      rs = runState,
1469 >                                                      rs | STARTED));
1470                  }
1471              }
1472          }
1473 <        if ((rs = runState) < 0) {
1473 >        if (runState < 0) {
1474              tryTerminate(false, false); // help terminate
1475              throw new RejectedExecutionException();
1476          }
# Line 1860 | Line 1907 | public class ForkJoinPool extends Abstra
1907                  WorkQueue q; ForkJoinTask<?>[] a; int b, d, al;
1908                  if ((q = ws[idx]) != null && (d = (b = q.base) - q.top) < 0 &&
1909                      (a = q.array) != null && (al = a.length) > 0) {
1910 <                    long offset = (((al - 1) & b) << ASHIFT) + ABASE;
1910 >                    int index = (al - 1) & b;
1911 >                    long offset = ((long)index << ASHIFT) + ABASE;
1912                      ForkJoinTask<?> t = (ForkJoinTask<?>)
1913                          U.getObjectVolatile(a, offset);
1914                      if (t == null || b++ != q.base)
# Line 2010 | Line 2058 | public class ForkJoinPool extends Abstra
2058                          ForkJoinTask<?> next = v.currentJoin;
2059                          ForkJoinTask<?> t = null;
2060                          if ((a = v.array) != null && (al = a.length) > 0) {
2061 <                            long offset = (((al - 1) & b) << ASHIFT) + ABASE;
2061 >                            int index = (al - 1) & b;
2062 >                            long offset = ((long)index << ASHIFT) + ABASE;
2063                              t = (ForkJoinTask<?>)U.getObjectVolatile(a, offset);
2064                              if (t != null && b++ == v.base) {
2065                                  if (j.currentJoin != subtask ||
# Line 2403 | Line 2452 | public class ForkJoinPool extends Abstra
2452              WorkQueue q = new WorkQueue(this, null);
2453              q.config = index;
2454              q.scanState = ~UNSIGNALLED;
2455 <            synchronized (lock) {          // lock to install
2455 >            q.qlock = 1;                   // lock queue
2456 >            boolean installed = false;
2457 >            synchronized (lock) {          // lock pool to install
2458                  WorkQueue[] ws;
2459                  if ((ws = workQueues) != null && index < ws.length &&
2460                      ws[index] == null) {
2461 <                    ws[index] = q;             // else throw away
2461 >                    ws[index] = q;         // else throw away
2462 >                    installed = true;
2463 >                }
2464 >            }
2465 >            if (installed) {
2466 >                try {
2467 >                    q.growArray();
2468 >                } finally {
2469 >                    q.qlock = 0;
2470                  }
2471              }
2472          }
# Line 2423 | Line 2482 | public class ForkJoinPool extends Abstra
2482       * @param task the task. Caller must ensure non-null.
2483       */
2484      final void externalPush(ForkJoinTask<?> task) {
2485 <        int r;                                    // initialize caller's probe
2485 >        int r;                            // initialize caller's probe
2486          if ((r = ThreadLocalRandom.getProbe()) == 0) {
2487              ThreadLocalRandom.localInit();
2488              r = ThreadLocalRandom.getProbe();
2489          }
2490          for (;;) {
2491 <            WorkQueue q; int wl, k;
2491 >            WorkQueue q; int wl, k, stat;
2492              int rs = runState;
2493              WorkQueue[] ws = workQueues;
2494              if (rs <= 0 || ws == null || (wl = ws.length) <= 0)
2495                  tryInitialize();
2496              else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null)
2497                  tryCreateExternalQueue(k);
2498 <            else if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2499 <                int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
2500 <                if ((a = q.array) != null && (al = a.length) > 0 &&
2501 <                    al - 1 + (d = b - s) > 0) {
2502 <                    a[(al - 1) & s] = task;        // push
2444 <                    q.top = s + 1;                 // relaxed writes OK here
2445 <                    if (d != 0)
2446 <                        U.putOrderedInt(q, QLOCK, 0);
2447 <                    else {                         // sync with signallees
2448 <                        U.putIntVolatile(q, QLOCK, 0);
2449 <                        signalWork();
2450 <                    }
2451 <                    break;
2452 <                }
2453 <                else {                             // grow then retry
2454 <                    try {
2455 <                        q.growArray();
2456 <                    } finally {
2457 <                        q.qlock = 0;
2458 <                    }
2459 <                }
2498 >            else if ((stat = q.sharedPush(task)) < 0)
2499 >                break;
2500 >            else if (stat == 0) {
2501 >                signalWork();
2502 >                break;
2503              }
2504 <            else                                   // move if cannot lock
2504 >            else                          // move if busy
2505                  r = ThreadLocalRandom.advanceProbe(r);
2506          }
2507      }
# Line 2503 | Line 2546 | public class ForkJoinPool extends Abstra
2546                  w.trySharedUnpush(task));
2547      }
2548  
2549 +
2550      /**
2551       * Performs helpComplete for an external submitter.
2552       */
# Line 3375 | Line 3419 | public class ForkJoinPool extends Abstra
3419      private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
3420      private static final long CTL;
3421      private static final long RUNSTATE;
3378    private static final long STEALCOUNTER;
3379    private static final long QLOCK;
3422      private static final int  ABASE;
3423      private static final int  ASHIFT;
3424  
# Line 3386 | Line 3428 | public class ForkJoinPool extends Abstra
3428                  (ForkJoinPool.class.getDeclaredField("ctl"));
3429              RUNSTATE = U.objectFieldOffset
3430                  (ForkJoinPool.class.getDeclaredField("runState"));
3389            STEALCOUNTER = U.objectFieldOffset
3390                (ForkJoinPool.class.getDeclaredField("stealCounter"));
3391
3392            QLOCK = U.objectFieldOffset
3393                (WorkQueue.class.getDeclaredField("qlock"));
3431  
3432              ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
3433              int scale = U.arrayIndexScale(ForkJoinTask[].class);
3434              if ((scale & (scale - 1)) != 0)
3435                  throw new Error("array index scale not a power of two");
3436              ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
3400
3437          } catch (ReflectiveOperationException e) {
3438              throw new Error(e);
3439          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines