ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.138 by jsr166, Fri Jan 4 18:52:18 2013 UTC vs.
Revision 1.139 by dl, Thu Jan 10 15:03:25 2013 UTC

# Line 196 | Line 196 | public class ForkJoinPool extends Abstra
196       * for work-stealing (this would contaminate lifo/fifo
197       * processing). Instead, we randomly associate submission queues
198       * with submitting threads, using a form of hashing.  The
199 <     * ThreadLocal Submitter class contains a value initially used as
200 <     * a hash code for choosing existing queues, but may be randomly
201 <     * repositioned upon contention with other submitters.  In
202 <     * essence, submitters act like workers except that they are
203 <     * restricted to executing local tasks that they submitted (or in
204 <     * the case of CountedCompleters, others with the same root task).
205 <     * However, because most shared/external queue operations are more
206 <     * expensive than internal, and because, at steady state, external
207 <     * submitters will compete for CPU with workers, ForkJoinTask.join
208 <     * and related methods disable them from repeatedly helping to
209 <     * process tasks if all workers are active.  Insertion of tasks in
210 <     * shared mode requires a lock (mainly to protect in the case of
199 >     * ThreadLocalRandom probe value serves as a hash code for
200 >     * choosing existing queues, and may be randomly repositioned upon
201 >     * contention with other submitters.  In essence, submitters act
202 >     * like workers except that they are restricted to executing local
203 >     * tasks that they submitted (or in the case of CountedCompleters,
204 >     * others with the same root task).  However, because most
205 >     * shared/external queue operations are more expensive than
206 >     * internal, and because, at steady state, external submitters
207 >     * will compete for CPU with workers, ForkJoinTask.join and
208 >     * related methods disable them from repeatedly helping to process
209 >     * tasks if all workers are active.  Insertion of tasks in shared
210 >     * mode requires a lock (mainly to protect in the case of
211       * resizing) but we use only a simple spinlock (using bits in
212       * field qlock), because submitters encountering a busy queue move
213       * on to try or create other queues -- they block only when
# Line 535 | Line 535 | public class ForkJoinPool extends Abstra
535      }
536  
537      /**
538     * Per-thread records for threads that submit to pools. Currently
539     * holds only pseudo-random seed / index that is used to choose
540     * submission queues in method externalPush. In the future, this may
541     * also incorporate a means to implement different task rejection
542     * and resubmission policies.
543     *
544     * Seeds for submitters and workers/workQueues work in basically
545     * the same way but are initialized and updated using slightly
546     * different mechanics. Both are initialized using the same
547     * approach as in class ThreadLocal, where successive values are
548     * unlikely to collide with previous values. Seeds are then
549     * randomly modified upon collisions using xorshifts, which
550     * requires a non-zero seed.
551     */
552    static final class Submitter {
553        int seed;
554        Submitter(int s) { seed = s; }
555    }
556
557    /**
538       * Class for artificial tasks that are used to replace the target
539       * of local joins if they are removed from an interior queue slot
540       * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
# Line 1054 | Line 1034 | public class ForkJoinPool extends Abstra
1034          defaultForkJoinWorkerThreadFactory;
1035  
1036      /**
1057     * Per-thread submission bookkeeping. Shared across all pools
1058     * to reduce ThreadLocal pollution and because random motion
1059     * to avoid contention in one pool is likely to hold for others.
1060     * Lazily initialized on first submission (but null-checked
1061     * in other contexts to avoid unnecessary initialization).
1062     */
1063    static final ThreadLocal<Submitter> submitters;
1064
1065    /**
1037       * Permission required for callers of methods that may start or
1038       * kill threads.
1039       */
# Line 1254 | Line 1225 | public class ForkJoinPool extends Abstra
1225       * a more conservative alternative to a pure spinlock.
1226       */
1227      private int acquirePlock() {
1228 <        int spins = PL_SPINS, r = 0, ps, nps;
1228 >        int spins = PL_SPINS, ps, nps;
1229          for (;;) {
1230              if (((ps = plock) & PL_LOCK) == 0 &&
1231                  U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1232                  return nps;
1262            else if (r == 0) { // randomize spins if possible
1263                Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
1264                if ((t instanceof ForkJoinWorkerThread) &&
1265                    (w = ((ForkJoinWorkerThread)t).workQueue) != null)
1266                    r = w.seed;
1267                else if ((z = submitters.get()) != null)
1268                    r = z.seed;
1269                else
1270                    r = 1;
1271            }
1233              else if (spins >= 0) {
1234 <                r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
1274 <                if (r >= 0)
1234 >                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1235                      --spins;
1236              }
1237              else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
# Line 1464 | Line 1424 | public class ForkJoinPool extends Abstra
1424       * @param task the task. Caller must ensure non-null.
1425       */
1426      final void externalPush(ForkJoinTask<?> task) {
1427 <        WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a;
1428 <        if ((z = submitters.get()) != null && plock > 0 &&
1427 >        WorkQueue[] ws; WorkQueue q; int z, m; ForkJoinTask<?>[] a;
1428 >        if ((z = ThreadLocalRandom.getProbe()) != 0 && plock > 0 &&
1429              (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1430 <            (q = ws[m & z.seed & SQMASK]) != null &&
1430 >            (q = ws[m & z & SQMASK]) != null &&
1431              U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1432              int b = q.base, s = q.top, n, an;
1433              if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
# Line 1502 | Line 1462 | public class ForkJoinPool extends Abstra
1462       * reinitialize if workQueues exists, while still advancing plock.
1463       */
1464      private void fullExternalPush(ForkJoinTask<?> task) {
1465 <        int r = 0; // random index seed
1466 <        for (Submitter z = submitters.get();;) {
1465 >        int r;
1466 >        if ((r = ThreadLocalRandom.getProbe()) == 0) {
1467 >            ThreadLocalRandom.localInit();
1468 >            r = ThreadLocalRandom.getProbe();
1469 >        }
1470 >        for (;;) {
1471              WorkQueue[] ws; WorkQueue q; int ps, m, k;
1472 <            if (z == null) {
1473 <                if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
1510 <                                        r += SEED_INCREMENT) && r != 0)
1511 <                    submitters.set(z = new Submitter(r));
1512 <            }
1513 <            else if (r == 0) {                  // move to a different index
1514 <                r = z.seed;
1515 <                r ^= r << 13;                   // same xorshift as WorkQueues
1516 <                r ^= r >>> 17;
1517 <                z.seed = r ^ (r << 5);
1518 <            }
1519 <            else if ((ps = plock) < 0)
1472 >            boolean move = false;
1473 >            if ((ps = plock) < 0)
1474                  throw new RejectedExecutionException();
1475              else if (ps == 0 || (ws = workQueues) == null ||
1476                       (m = ws.length - 1) < 0) { // initialize workQueues
# Line 1556 | Line 1510 | public class ForkJoinPool extends Abstra
1510                          return;
1511                      }
1512                  }
1513 <                r = 0; // move on failure
1513 >                move = true; // move on failure
1514              }
1515              else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1516                  q = new WorkQueue(this, null, SHARED_QUEUE, r);
# Line 1570 | Line 1524 | public class ForkJoinPool extends Abstra
1524                      releasePlock(nps);
1525              }
1526              else
1527 <                r = 0; // try elsewhere while lock held
1527 >                move = true; // move if busy
1528 >            if (move)
1529 >                r = ThreadLocalRandom.advanceProbe(r);
1530          }
1531      }
1532  
# Line 2329 | Line 2285 | public class ForkJoinPool extends Abstra
2285       * least one task.
2286       */
2287      static WorkQueue commonSubmitterQueue() {
2288 <        ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
2289 <        return ((z = submitters.get()) != null &&
2288 >        ForkJoinPool p; WorkQueue[] ws; int m, z;
2289 >        return ((z = ThreadLocalRandom.getProbe()) != 0 &&
2290                  (p = common) != null &&
2291                  (ws = p.workQueues) != null &&
2292                  (m = ws.length - 1) >= 0) ?
2293 <            ws[m & z.seed & SQMASK] : null;
2293 >            ws[m & z & SQMASK] : null;
2294      }
2295  
2296      /**
2297       * Tries to pop the given task from submitter's queue in common pool.
2298       */
2299      static boolean tryExternalUnpush(ForkJoinTask<?> t) {
2300 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z;
2301 <        ForkJoinTask<?>[] a;  int m, s;
2300 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2301 >        ForkJoinTask<?>[] a;  int m, s, z;
2302          if (t != null &&
2303 <            (z = submitters.get()) != null &&
2303 >            (z = ThreadLocalRandom.getProbe()) != 0 &&
2304              (p = common) != null &&
2305              (ws = p.workQueues) != null &&
2306              (m = ws.length - 1) >= 0 &&
2307 <            (q = ws[m & z.seed & SQMASK]) != null &&
2307 >            (q = ws[m & z & SQMASK]) != null &&
2308              (s = q.top) != q.base &&
2309              (a = q.array) != null) {
2310              long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
# Line 2418 | Line 2374 | public class ForkJoinPool extends Abstra
2374       */
2375      static void externalHelpJoin(ForkJoinTask<?> t) {
2376          // Some hard-to-avoid overlap with tryExternalUnpush
2377 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
2378 <        ForkJoinTask<?>[] a;  int m, s, n;
2377 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w;
2378 >        ForkJoinTask<?>[] a;  int m, s, n, z;
2379          if (t != null &&
2380 <            (z = submitters.get()) != null &&
2380 >            (z = ThreadLocalRandom.getProbe()) != 0 &&
2381              (p = common) != null &&
2382              (ws = p.workQueues) != null &&
2383              (m = ws.length - 1) >= 0 &&
2384 <            (q = ws[m & z.seed & SQMASK]) != null &&
2384 >            (q = ws[m & z & SQMASK]) != null &&
2385              (a = q.array) != null) {
2386              int am = a.length - 1;
2387              if ((s = q.top) != q.base) {
# Line 3358 | Line 3314 | public class ForkJoinPool extends Abstra
3314          if ((s & (s-1)) != 0)
3315              throw new Error("data type scale not a power of two");
3316  
3361        submitters = new ThreadLocal<Submitter>();
3317          ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
3318              new DefaultForkJoinWorkerThreadFactory();
3319          modifyThreadPermission = new RuntimePermission("modifyThread");

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines