ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.115 by jsr166, Thu Jan 26 19:10:27 2012 UTC vs.
Revision 1.116 by dl, Fri Jan 27 17:27:28 2012 UTC

# Line 195 | Line 195 | public class ForkJoinPool extends Abstra
195       * WorkQueues are also used in a similar way for tasks submitted
196       * to the pool. We cannot mix these tasks in the same queues used
197       * for work-stealing (this would contaminate lifo/fifo
198 <     * processing). Instead, we loosely associate (via hashing)
199 <     * submission queues with submitting threads, and randomly scan
200 <     * these queues as well when looking for work. In essence,
201 <     * submitters act like workers except that they never take tasks,
202 <     * and they are multiplexed on to a finite number of shared work
203 <     * queues. However, classes are set up so that future extensions
204 <     * could allow submitters to optionally help perform tasks as
205 <     * well. Pool submissions from internal workers are also allowed,
206 <     * but use randomized rather than thread-hashed queue indices to
207 <     * avoid imbalance.  Insertion of tasks in shared mode requires a
208 <     * lock (mainly to protect in the case of resizing) but we use
209 <     * only a simple spinlock (using bits in field runState), because
210 <     * submitters encountering a busy queue try or create others so
211 <     * never block.
198 >     * processing). Instead, we loosely associate submission queues
199 >     * with submitting threads, using a form of hashing.  The
200 >     * ThreadLocal Submitter class contains a value initially used as
201 >     * a hash code for choosing existing queues, but may be randomly
202 >     * repositioned upon contention with other submitters.  In
203 >     * essence, submitters act like workers except that they never
204 >     * take tasks, and they are multiplexed on to a finite number of
205 >     * shared work queues. However, classes are set up so that future
206 >     * extensions could allow submitters to optionally help perform
207 >     * tasks as well. Pool submissions from internal workers are also
208 >     * allowed, but use randomized rather than thread-hashed queue
209 >     * indices to avoid imbalance.  Insertion of tasks in shared mode
210 >     * requires a lock (mainly to protect in the case of resizing) but
211 >     * we use only a simple spinlock (using bits in field runState),
212 >     * because submitters encountering a busy queue try or create
213 >     * others so never block.
214       *
215       * Management.
216       * ==========
# Line 1085 | Line 1087 | public class ForkJoinPool extends Abstra
1087      }
1088  
1089      /**
1090 <     * Computes a hash code for the given thread. This method is
1091 <     * expected to provide higher-quality hash codes than those using
1092 <     * method hashCode().
1093 <     */
1094 <    static final int hashThread(Thread t) {
1095 <        long id = (t == null) ? 0L : t.getId(); // Use MurmurHash of thread id
1096 <        int h = (int)id ^ (int)(id >>> 32);
1097 <        h ^= h >>> 16;
1098 <        h *= 0x85ebca6b;
1099 <        h ^= h >>> 13;
1100 <        h *= 0xc2b2ae35;
1101 <        return h ^ (h >>> 16);
1090 > <<<<<<< ForkJoinPool.java
1091 >     * Per-thread records for (typically non-FJ) threads that submit
1092 >     * to pools. Cureently holds only psuedo-random seed / index that
1093 >     * is used to chose submission queues in method doSubmit. In the
1094 >     * future, this may incorporate a means to implement different
1095 >     * task rejection and resubmission policies.
1096 >     */
1097 >    static final class Submitter {
1098 >        int seed; // seed for random submission queue selection
1099 >
1100 >        // Heuristic padding to ameliorate unfortunate memory placements
1101 >        int p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
1102 >
1103 >        Submitter() {
1104 >            // Use identityHashCode, forced negative, for seed
1105 >            seed = System.identityHashCode(Thread.currentThread()) | (1 << 31);
1106 >        }
1107 >
1108 >        /**
1109 >         * Computes next value for random probes.  Like method
1110 >         * WorkQueue.nextSeed, this is manually inlined in several
1111 >         * usages to avoid writes inside busy loops.
1112 >         */
1113 >        final int nextSeed() {
1114 >            int r = seed;
1115 >            r ^= r << 13;
1116 >            r ^= r >>> 17;
1117 >            return seed = r ^= r << 5;
1118 >        }
1119 >    }
1120 >
1121 >    /** ThreadLocal class for Submitters */
1122 >    static final class ThreadSubmitter extends ThreadLocal<Submitter> {
1123 >        public Submitter initialValue() { return new Submitter(); }
1124      }
1125  
1126      /**
1127 +     * Per-thread submission bookeeping. Shared across all pools
1128 +     * to reduce ThreadLocal pollution and because random motion
1129 +     * to avoid contention in one pool is likely to hold for others.
1130 +     */
1131 +    static final ThreadSubmitter submitters = new ThreadSubmitter();
1132 +
1133 +    /**
1134       * Top-level runloop for workers
1135       */
1136      final void runWorker(ForkJoinWorkerThread wt) {
1137 +        // Initialize queue array and seed in this thread
1138          WorkQueue w = wt.workQueue;
1139 <        w.growArray(false);     // Initialize queue array and seed in this thread
1140 <        w.seed = hashThread(Thread.currentThread()) | (1 << 31); // force < 0
1139 >        w.growArray(false);
1140 >        // Same initial hash as Submitters
1141 >        w.seed = System.identityHashCode(Thread.currentThread()) | (1 << 31);
1142  
1143          do {} while (w.runTask(scan(w)));
1144      }
# Line 1218 | Line 1251 | public class ForkJoinPool extends Abstra
1251              U.throwException(ex);
1252      }
1253  
1254 +    /**
1255 +     * Tries to add and register a new queue at the given index.
1256 +     *
1257 +     * @param idx the workQueues array index to register the queue
1258 +     * @return the queue, or null if could not add because could
1259 +     * not acquire lock or idx is unusable
1260 +     */
1261 +    private WorkQueue tryAddSharedQueue(int idx) {
1262 +        WorkQueue q = null;
1263 +        ReentrantLock lock = this.lock;
1264 +        if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) {
1265 +            // create queue outside of lock but only if apparently free
1266 +            WorkQueue nq = new WorkQueue(null, SHARED_QUEUE);
1267 +            if (lock.tryLock()) {
1268 +                try {
1269 +                    WorkQueue[] ws = workQueues;
1270 +                    if (ws != null && idx < ws.length) {
1271 +                        if ((q = ws[idx]) == null) {
1272 +                            int rs;         // update runState seq
1273 +                            ws[idx] = q = nq;
1274 +                            runState = (((rs = runState) & SHUTDOWN) |
1275 +                                        ((rs + RS_SEQ) & ~SHUTDOWN));
1276 +                        }
1277 +                    }
1278 +                } finally {
1279 +                    lock.unlock();
1280 +                }
1281 +            }
1282 +        }
1283 +        return q;
1284 +    }
1285  
1286      // Maintaining ctl counts
1287  
# Line 1320 | Line 1384 | public class ForkJoinPool extends Abstra
1384      // Submissions
1385  
1386      /**
1387 <     * Unless shutting down, adds the given task to some submission
1388 <     * queue; using a randomly chosen queue index if the caller is a
1389 <     * ForkJoinWorkerThread, else one based on caller thread's hash
1390 <     * code. If no queue exists at the index, one is created.  If the
1327 <     * queue is busy, another is chosen by sweeping through the queues
1328 <     * array.
1387 >     * Unless shutting down, adds the given task to a submission queue
1388 >     * at submitter's current queue index. If no queue exists at the
1389 >     * index, one is created unless pool lock is busy.  If the queue
1390 >     * and/or lock are busy, another index is randomly chosen.
1391       */
1392      private void doSubmit(ForkJoinTask<?> task) {
1393          if (task == null)
1394              throw new NullPointerException();
1395 <        Thread t = Thread.currentThread();
1396 <        int r = ((t instanceof ForkJoinWorkerThread) ?
1397 <                 ((ForkJoinWorkerThread)t).workQueue.nextSeed() : hashThread(t));
1336 <        for (;;) {
1395 >        Submitter s = submitters.get();
1396 >        for (int r = s.seed;;) {
1397 >            WorkQueue q; int k;
1398              int rs = runState, m = rs & SMASK;
1338            int j = r &= (m & ~1);                      // even numbered queues
1399              WorkQueue[] ws = workQueues;
1400 <            if (rs < 0 || ws == null)
1401 <                throw new RejectedExecutionException(); // shutting down
1402 <            if (ws.length > m) {                        // consistency check
1403 <                for (WorkQueue q;;) {                   // circular sweep
1404 <                    if (((q = ws[j]) != null ||
1405 <                         (q = tryAddSharedQueue(j)) != null) &&
1406 <                        q.trySharedPush(task)) {
1407 <                        signalWork();
1348 <                        return;
1349 <                    }
1350 <                    if ((j = (j + 2) & m) == r) {
1351 <                        Thread.yield();                 // all queues busy
1352 <                        break;
1353 <                    }
1354 <                }
1400 >            if (rs < 0 || ws == null)   // shutting down
1401 >                throw new RejectedExecutionException();
1402 >            if (ws.length > m &&        // k must be at index
1403 >                ((q = ws[k = (r << 1) & m]) != null ||
1404 >                 (q = tryAddSharedQueue(k)) != null) &&
1405 >                q.trySharedPush(task)) {
1406 >                signalWork();
1407 >                return;
1408              }
1409 +            r ^= r << 13;               // xorshift seed to new position
1410 +            r ^= r >>> 17;
1411 +            if (((s.seed = r ^= r << 5) & m) == 0)
1412 +                Thread.yield();         // occasionally yield if busy
1413          }
1414      }
1415  
1359    /**
1360     * Tries to add and register a new queue at the given index.
1361     *
1362     * @param idx the workQueues array index to register the queue
1363     * @return the queue, or null if could not add because could
1364     * not acquire lock or idx is unusable
1365     */
1366    private WorkQueue tryAddSharedQueue(int idx) {
1367        WorkQueue q = null;
1368        ReentrantLock lock = this.lock;
1369        if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) {
1370            // create queue outside of lock but only if apparently free
1371            WorkQueue nq = new WorkQueue(null, SHARED_QUEUE);
1372            if (lock.tryLock()) {
1373                try {
1374                    WorkQueue[] ws = workQueues;
1375                    if (ws != null && idx < ws.length) {
1376                        if ((q = ws[idx]) == null) {
1377                            int rs;         // update runState seq
1378                            ws[idx] = q = nq;
1379                            runState = (((rs = runState) & SHUTDOWN) |
1380                                        ((rs + RS_SEQ) & ~SHUTDOWN));
1381                        }
1382                    }
1383                } finally {
1384                    lock.unlock();
1385                }
1386            }
1387        }
1388        return q;
1389    }
1416  
1417      // Scanning for tasks
1418  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines