root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.32 by dl, Sun Apr 18 12:51:18 2010 UTC vs.
Revision 1.61 by jsr166, Tue Nov 23 06:21:54 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.Random;
10   import java.util.Collection;
11   import java.util.concurrent.locks.LockSupport;
12 + import java.util.concurrent.RejectedExecutionException;
13  
14   /**
15 < * A thread managed by a {@link ForkJoinPool}.  This class is
16 < * subclassable solely for the sake of adding functionality -- there
17 < * are no overridable methods dealing with scheduling or execution.
18 < * However, you can override initialization and termination methods
19 < * surrounding the main task processing loop.  If you do create such a
20 < * subclass, you will also need to supply a custom {@link
21 < * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
22 < * ForkJoinPool}.
15 > * A thread managed by a {@link ForkJoinPool}, which executes
16 > * {@link ForkJoinTask}s.
17 > * This class is subclassable solely for the sake of adding
18 > * functionality -- there are no overridable methods dealing with
19 > * scheduling or execution.  However, you can override initialization
20 > * and termination methods surrounding the main task processing loop.
21 > * If you do create such a subclass, you will also need to supply a
22 > * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
23 > * in a {@code ForkJoinPool}.
24   *
25   * @since 1.7
26   * @author Doug Lea
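The revised class javadoc above describes the intended extension pattern: subclass only to add functionality, optionally override the lifecycle hooks around the main task-processing loop, and register a matching factory with the pool. Below is a minimal, non-authoritative sketch of that pattern, assuming only the API visible elsewhere in this diff (the protected constructor, onStart()/onTermination(), and ForkJoinPool.ForkJoinWorkerThreadFactory); the class name, output, and FACTORY field are hypothetical.

```java
import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinWorkerThread;

// Hypothetical subclass illustrating the documented extension points.
class TracingWorkerThread extends ForkJoinWorkerThread {
    protected TracingWorkerThread(ForkJoinPool pool) {
        super(pool);
    }

    @Override
    protected void onStart() {
        super.onStart();                    // per its javadoc, call first
        System.out.println(getName() + " started");
    }

    @Override
    protected void onTermination(Throwable exception) {
        System.out.println(getName() + " terminating: " + exception);
        super.onTermination(exception);     // per its javadoc, call last
    }

    // Factory the pool uses to create workers of this subclass.
    static final ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY =
        new ForkJoinPool.ForkJoinWorkerThreadFactory() {
            public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                return new TracingWorkerThread(pool);
            }
        };
}
```

A pool would then be constructed with this factory via whichever ForkJoinPool constructor accepts a ForkJoinWorkerThreadFactory in this jsr166y snapshot.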
# Line 83 | Line 83 | public class ForkJoinWorkerThread extend
83       * by the ForkJoinPool).  This allows use in message-passing
84       * frameworks in which tasks are never joined.
85       *
86 <     * Efficient implementation of this approach currently relies on
87 <     * an uncomfortable amount of "Unsafe" mechanics. To maintain
86 >     * When a worker would otherwise be blocked waiting to join a
87 >     * task, it first tries a form of linear helping: Each worker
88 >     * records (in field currentSteal) the most recent task it stole
89 >     * from some other worker. Plus, it records (in field currentJoin)
90 >     * the task it is currently actively joining. Method joinTask uses
91 >     * these markers to try to find a worker to help (i.e., steal back
92 >     * a task from and execute it) that could hasten completion of the
93 >     * actively joined task. In essence, the joiner executes a task
94 >     * that would be on its own local deque had the to-be-joined task
95 >     * not been stolen. This may be seen as a conservative variant of
96 >     * the approach in Wagner & Calder "Leapfrogging: a portable
97 >     * technique for implementing efficient futures" SIGPLAN Notices,
98 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99 >     * in that: (1) We only maintain dependency links across workers
100 >     * upon steals, rather than use per-task bookkeeping.  This may
101 >     * require a linear scan of workers array to locate stealers, but
102 >     * usually doesn't because stealers leave hints (that may become
103 >     * stale/wrong) of where to locate them. This isolates cost to
104 >     * when it is needed, rather than adding to per-task overhead.
105 >     * (2) It is "shallow", ignoring nesting and potentially cyclic
106 >     * mutual steals.  (3) It is intentionally racy: field currentJoin
107 >     * is updated only while actively joining, which means that we
108 >     * miss links in the chain during long-lived tasks, GC stalls etc
109 >     * (which is OK since blocking in such cases is usually a good
110 >     * idea).  (4) We bound the number of attempts to find work (see
111 >     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
112 >     * necessary replacing it with a spare (see
113 >     * ForkJoinPool.awaitJoin).
114 >     *
115 >     * Efficient implementation of these algorithms currently relies
116 >     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
117       * correct orderings, reads and writes of variable base require
118       * volatile ordering.  Variable sp does not require volatile
119       * writes but still needs store-ordering, which we accomplish by
120       * pre-incrementing sp before filling the slot with an ordered
121       * store.  (Pre-incrementing also enables backouts used in
122 <     * scanWhileJoining.)  Because they are protected by volatile base
123 <     * reads, reads of the queue array and its slots by other threads
124 <     * do not need volatile load semantics, but writes (in push)
125 <     * require store order and CASes (in pop and deq) require
126 <     * (volatile) CAS semantics.  (Michael, Saraswat, and Vechev's
127 <     * algorithm has similar properties, but without support for
128 <     * nulling slots.)  Since these combinations aren't supported
129 <     * using ordinary volatiles, the only way to accomplish these
130 <     * efficiently is to use direct Unsafe calls. (Using external
131 <     * AtomicIntegers and AtomicReferenceArrays for the indices and
132 <     * array is significantly slower because of memory locality and
133 <     * indirection effects.)
122 >     * joinTask.)  Because they are protected by volatile base reads,
123 >     * reads of the queue array and its slots by other threads do not
124 >     * need volatile load semantics, but writes (in push) require
125 >     * store order and CASes (in pop and deq) require (volatile) CAS
126 >     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
127 >     * similar properties, but without support for nulling slots.)
128 >     * Since these combinations aren't supported using ordinary
129 >     * volatiles, the only way to accomplish these efficiently is to
130 >     * use direct Unsafe calls. (Using external AtomicIntegers and
131 >     * AtomicReferenceArrays for the indices and array is
132 >     * significantly slower because of memory locality and indirection
133 >     * effects.)
134       *
135       * Further, performance on most platforms is very sensitive to
136       * placement and sizing of the (resizable) queue array.  Even
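The implementation comment above notes that the same slot protocol could be expressed with external AtomicIntegers and AtomicReferenceArrays, at a cost in locality and indirection. As a reading aid only, here is a heavily simplified sketch of the ordering rules it describes (owner pre-increments sp and publishes the slot with an ordered store; a thief does a volatile read of base, then a CAS on the slot, then a volatile write of base). The MiniDeque name and the omissions (no resizing, no pool signalling, no owner-side pop or backouts) are assumptions for illustration, not the file's actual mechanics.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Illustration only: the push/steal ordering described in the comment above,
// written with java.util.concurrent.atomic classes instead of Unsafe.
class MiniDeque<T> {
    final AtomicReferenceArray<T> slots = new AtomicReferenceArray<T>(1 << 13);
    final AtomicInteger base = new AtomicInteger(); // thieves advance this with volatile writes
    int sp;                                         // owner-only top index

    void push(T t) {                  // owner thread
        int s = sp++;                 // pre-increment sp, then publish the slot
        slots.lazySet(s & (slots.length() - 1), t);  // ordered store, like putOrderedObject
    }

    T steal() {                       // any thread; the "deq" of the comment above
        int b = base.get();           // volatile read of base
        int i = b & (slots.length() - 1);
        T t = slots.get(i);           // volatile here; the real code uses a plain array read
        if (t != null && base.get() == b && slots.compareAndSet(i, t, null)) {
            base.set(b + 1);          // volatile write of base
            return t;
        }
        return null;                  // empty slot or lost the race
    }
}
```

As the comment says, the Unsafe-based version exists because the extra volatile reads and the indirection of these atomic classes are measurably slower for this locality-sensitive code.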
# Line 126 | Line 155 | public class ForkJoinWorkerThread extend
155      private static final Random seedGenerator = new Random();
156  
157      /**
158 <     * The timeout value for suspending spares. Spare workers that
159 <     * remain unsignalled for more than this time may be trimmed
160 <     * (killed and removed from pool).  Since our goal is to avoid
161 <     * long-term thread buildup, the exact value of timeout does not
162 <     * matter too much so long as it avoids most false-alarm timeouts
134 <     * under GC stalls or momentarily high system load.
158 >     * The maximum stolen->joining link depth allowed in helpJoinTask.
159 >     * Depths for legitimate chains are unbounded, but we use a fixed
160 >     * constant to avoid (otherwise unchecked) cycles and bound
161 >     * staleness of traversal parameters at the expense of sometimes
162 >     * blocking when we could be helping.
163       */
164 <    private static final long SPARE_KEEPALIVE_NANOS =
137 <        5L * 1000L * 1000L * 1000L; // 5 secs
164 >    private static final int MAX_HELP_DEPTH = 8;
165  
166      /**
167       * Capacity of work-stealing queue array upon initialization.
168 <     * Must be a power of two. Initial size must be at least 2, but is
168 >     * Must be a power of two. Initial size must be at least 4, but is
169       * padded to minimize cache effects.
170       */
171      private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
172  
173      /**
174       * Maximum work-stealing queue array size.  Must be less than or
175 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
176 <     * is less than usual bounds, because we need leftshift by 3
177 <     * to be in int range).
175 >     * equal to 1 << (31 - width of array entry) to ensure lack of
176 >     * index wraparound. The value is set in the static block
177 >     * at the end of this file after obtaining width.
178       */
179 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
179 >    private static final int MAXIMUM_QUEUE_CAPACITY;
180  
181      /**
182       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 178 | Line 205 | public class ForkJoinWorkerThread extend
205      private int sp;
206  
207      /**
208 +     * The index of most recent stealer, used as a hint to avoid
209 +     * traversal in method helpJoinTask. This is only a hint because a
210 +     * worker might have had multiple steals and this only holds one
211 +     * of them (usually the most current). Declared non-volatile,
212 +     * relying on other prevailing sync to keep reasonably current.
213 +     */
214 +    private int stealHint;
215 +
216 +    /**
217       * Run state of this worker. In addition to the usual run levels,
218       * tracks if this worker is suspended as a spare, and if it was
219       * killed (trimmed) while suspended. However, "active" status is
220 <     * maintained separately.
220 >     * maintained separately and modified only in conjunction with
221 >     * CASes of the pool's runState (which are currently sadly
222 >     * manually inlined for performance.)  Accessed directly by pool
223 >     * to simplify checks for normal (zero) status.
224       */
225 <    private volatile int runState;
225 >    volatile int runState;
226  
227      private static final int TERMINATING = 0x01;
228      private static final int TERMINATED  = 0x02;
# Line 191 | Line 230 | public class ForkJoinWorkerThread extend
230      private static final int TRIMMED     = 0x08; // killed while suspended
231  
232      /**
233 <     * Number of LockSupport.park calls to block this thread for
234 <     * suspension or event waits. Used for internal instrumention;
196 <     * currently not exported but included because volatile write upon
197 <     * park also provides a workaround for a JVM bug.
198 <     */
199 <    private volatile int parkCount;
200 <
201 <    /**
202 <     * Number of steals, transferred and reset in pool callbacks pool
203 <     * when idle Accessed directly by pool.
233 >     * Number of steals. Directly accessed (and reset) by
234 >     * pool.tryAccumulateStealCount when idle.
235       */
236      int stealCount;
237  
# Line 218 | Line 249 | public class ForkJoinWorkerThread extend
249  
250      /**
251       * True if use local fifo, not default lifo, for local polling.
252 <     * Shadows value from ForkJoinPool, which resets it if changed
222 <     * pool-wide.
252 >     * Shadows value from ForkJoinPool.
253       */
254 <    private boolean locallyFifo;
254 >    private final boolean locallyFifo;
255  
256      /**
257       * Index of this worker in pool array. Set once by pool before
# Line 237 | Line 267 | public class ForkJoinWorkerThread extend
267      int lastEventCount;
268  
269      /**
270 <     * Encoded index and event count of next event waiter. Used only
271 <     * by ForkJoinPool for managing event waiters.
270 >     * Encoded index and event count of next event waiter. Accessed
271 >     * only by ForkJoinPool for managing event waiters.
272       */
273      volatile long nextWaiter;
274  
275      /**
276 +     * Number of times this thread suspended as spare. Accessed only
277 +     * by pool.
278 +     */
279 +    int spareCount;
280 +
281 +    /**
282 +     * Encoded index and count of next spare waiter. Accessed only
283 +     * by ForkJoinPool for managing spares.
284 +     */
285 +    volatile int nextSpare;
286 +
287 +    /**
288 +     * The task currently being joined, set only when actively trying
289 +     * to help other stealers in helpJoinTask. Written only by this
290 +     * thread, but read by others.
291 +     */
292 +    private volatile ForkJoinTask<?> currentJoin;
293 +
294 +    /**
295 +     * The task most recently stolen from another worker (or
296 +     * submission queue).  Written only by this thread, but read by
297 +     * others.
298 +     */
299 +    private volatile ForkJoinTask<?> currentSteal;
300 +
301 +    /**
302       * Creates a ForkJoinWorkerThread operating in the given pool.
303       *
304       * @param pool the pool this thread works in
305       * @throws NullPointerException if pool is null
306       */
307      protected ForkJoinWorkerThread(ForkJoinPool pool) {
252        if (pool == null) throw new NullPointerException();
308          this.pool = pool;
309 +        this.locallyFifo = pool.locallyFifo;
310 +        setDaemon(true);
311          // To avoid exposing construction details to subclasses,
312          // remaining initialization is in start() and onStart()
313      }
314  
315      /**
316 <     * Performs additional initialization and starts this thread
316 >     * Performs additional initialization and starts this thread.
317       */
318 <    final void start(int poolIndex, boolean locallyFifo,
262 <                     UncaughtExceptionHandler ueh) {
318 >    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
319          this.poolIndex = poolIndex;
264        this.locallyFifo = locallyFifo;
320          if (ueh != null)
321              setUncaughtExceptionHandler(ueh);
267        setDaemon(true);
322          start();
323      }
324  
# Line 295 | Line 349 | public class ForkJoinWorkerThread extend
349      /**
350       * Initializes internal state after construction but before
351       * processing any tasks. If you override this method, you must
352 <     * invoke super.onStart() at the beginning of the method.
352 >     * invoke {@code super.onStart()} at the beginning of the method.
353       * Initialization requires care: Most fields must have legal
354       * default values, to ensure that attempted accesses from other
355       * threads work correctly even before this thread starts
# Line 305 | Line 359 | public class ForkJoinWorkerThread extend
359          int rs = seedGenerator.nextInt();
360          seed = rs == 0? 1 : rs; // seed must be nonzero
361  
362 <        // Allocate name string and queue array in this thread
362 >        // Allocate name string and arrays in this thread
363          String pid = Integer.toString(pool.getPoolNumber());
364          String wid = Integer.toString(poolIndex);
365          setName("ForkJoinPool-" + pid + "-worker-" + wid);
# Line 323 | Line 377 | public class ForkJoinWorkerThread extend
377       */
378      protected void onTermination(Throwable exception) {
379          try {
380 +            ForkJoinPool p = pool;
381 +            if (active) {
382 +                int a; // inline p.tryDecrementActiveCount
383 +                active = false;
384 +                do {} while (!UNSAFE.compareAndSwapInt
385 +                             (p, poolRunStateOffset, a = p.runState, a - 1));
386 +            }
387              cancelTasks();
388              setTerminated();
389 <            pool.workerTerminated(this);
389 >            p.workerTerminated(this);
390          } catch (Throwable ex) {        // Shouldn't ever happen
391              if (exception == null)      // but if so, at least rethrown
392                  exception = ex;
# Line 338 | Line 399 | public class ForkJoinWorkerThread extend
399      /**
400       * This method is required to be public, but should never be
401       * called explicitly. It performs the main run loop to execute
402 <     * ForkJoinTasks.
402 >     * {@link ForkJoinTask}s.
403       */
404      public void run() {
405          Throwable exception = null;
# Line 355 | Line 416 | public class ForkJoinWorkerThread extend
416      // helpers for run()
417  
418      /**
419 <     * Find and execute tasks and check status while running
419 >     * Finds and executes tasks, and checks status while running.
420       */
421      private void mainLoop() {
422 <        boolean ran = false; // true if ran task on previous step
422 >        boolean ran = false; // true if ran a task on last step
423          ForkJoinPool p = pool;
424          for (;;) {
425              p.preStep(this, ran);
426              if (runState != 0)
427 <                return;
428 <            ForkJoinTask<?> t; // try to get and run stolen or submitted task
368 <            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
369 <                t.tryExec();
370 <                if (base != sp)
371 <                    runLocalTasks();
372 <            }
427 >                break;
428 >            ran = tryExecSteal() || tryExecSubmission();
429          }
430      }
431  
432      /**
433 <     * Runs local tasks until queue is empty or shut down.  Call only
434 <     * while active.
433 >     * Tries to steal a task and execute it.
434 >     *
435 >     * @return true if ran a task
436       */
437 <    private void runLocalTasks() {
438 <        while (runState == 0) {
439 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
440 <            if (t != null)
441 <                t.tryExec();
442 <            else if (base == sp)
443 <                break;
437 >    private boolean tryExecSteal() {
438 >        ForkJoinTask<?> t;
439 >        if ((t = scan()) != null) {
440 >            t.quietlyExec();
441 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
442 >            if (sp != base)
443 >                execLocalTasks();
444 >            return true;
445          }
446 +        return false;
447      }
448  
449      /**
450 <     * If a submission exists, try to activate and take it
450 >     * If a submission exists, try to activate and run it.
451       *
452 <     * @return a task, if available
452 >     * @return true if ran a task
453       */
454 <    private ForkJoinTask<?> pollSubmission() {
454 >    private boolean tryExecSubmission() {
455          ForkJoinPool p = pool;
456 +        // This loop is needed in case attempt to activate fails, in
457 +        // which case we only retry if there still appears to be a
458 +        // submission.
459          while (p.hasQueuedSubmissions()) {
460 <            if (active || (active = p.tryIncrementActiveCount())) {
461 <                ForkJoinTask<?> t = p.pollSubmission();
462 <                return t != null ? t : scan(); // if missed, rescan
460 >            ForkJoinTask<?> t; int a;
461 >            if (active || // inline p.tryIncrementActiveCount
462 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
463 >                                                   a = p.runState, a + 1))) {
464 >                if ((t = p.pollSubmission()) != null) {
465 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
466 >                    t.quietlyExec();
467 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
468 >                    if (sp != base)
469 >                        execLocalTasks();
470 >                    return true;
471 >                }
472              }
473          }
474 <        return null;
474 >        return false;
475 >    }
476 >
477 >    /**
478 >     * Runs local tasks until queue is empty or shut down.  Call only
479 >     * while active.
480 >     */
481 >    private void execLocalTasks() {
482 >        while (runState == 0) {
483 >            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
484 >            if (t != null)
485 >                t.quietlyExec();
486 >            else if (sp == base)
487 >                break;
488 >        }
489      }
490  
491      /*
492       * Intrinsics-based atomic writes for queue slots. These are
493 <     * basically the same as methods in AtomicObjectArray, but
493 >     * basically the same as methods in AtomicReferenceArray, but
494       * specialized for (1) ForkJoinTask elements (2) requirement that
495       * nullness and bounds checks have already been performed by
496       * callers and (3) effective offsets are known not to overflow
497       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
498       * need corresponding version for reads: plain array reads are OK
499 <     * because they protected by other volatile reads and are
499 >     * because they are protected by other volatile reads and are
500       * confirmed by CASes.
501       *
502       * Most uses don't actually call these methods, but instead contain
# Line 435 | Line 520 | public class ForkJoinWorkerThread extend
520       * range. This method is used only during resets and backouts.
521       */
522      private static final void writeSlot(ForkJoinTask<?>[] q, int i,
523 <                                              ForkJoinTask<?> t) {
523 >                                        ForkJoinTask<?> t) {
524          UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
525      }
526  
# Line 447 | Line 532 | public class ForkJoinWorkerThread extend
532       * @param t the task. Caller must ensure non-null.
533       */
534      final void pushTask(ForkJoinTask<?> t) {
450        int s;
535          ForkJoinTask<?>[] q = queue;
536          int mask = q.length - 1; // implicit assert q != null
537 <        UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
538 <        if ((s -= base) <= 0)
539 <            pool.signalWork();
540 <        else if (s + 1 >= mask)
541 <            growQueue();
537 >        int s = sp++;            // ok to increment sp before slot write
538 >        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
539 >        if ((s -= base) == 0)
540 >            pool.signalWork();   // was empty
541 >        else if (s == mask)
542 >            growQueue();         // is full
543      }
544  
545      /**
546       * Tries to take a task from the base of the queue, failing if
547       * empty or contended. Note: Specializations of this code appear
548 <     * in scan and scanWhileJoining.
548 >     * in locallyDeqTask and elsewhere.
549       *
550       * @return a task, or null if none or contended
551       */
# Line 468 | Line 553 | public class ForkJoinWorkerThread extend
553          ForkJoinTask<?> t;
554          ForkJoinTask<?>[] q;
555          int b, i;
556 <        if ((b = base) != sp &&
556 >        if (sp != (b = base) &&
557              (q = queue) != null && // must read q after b
558 <            (t = q[i = (q.length - 1) & b]) != null &&
558 >            (t = q[i = (q.length - 1) & b]) != null && base == b &&
559              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
560              base = b + 1;
561              return t;
# Line 480 | Line 565 | public class ForkJoinWorkerThread extend
565  
566      /**
567       * Tries to take a task from the base of own queue. Assumes active
568 <     * status.  Called only by current thread.
568 >     * status.  Called only by this thread.
569       *
570       * @return a task, or null if none
571       */
# Line 490 | Line 575 | public class ForkJoinWorkerThread extend
575              ForkJoinTask<?> t;
576              int b, i;
577              while (sp != (b = base)) {
578 <                if ((t = q[i = (q.length - 1) & b]) != null &&
578 >                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
579                      UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
580                                                  t, null)) {
581                      base = b + 1;
# Line 503 | Line 588 | public class ForkJoinWorkerThread extend
588  
589      /**
590       * Returns a popped task, or null if empty. Assumes active status.
591 <     * Called only by current thread. (Note: a specialization of this
507 <     * code appears in scanWhileJoining.)
591 >     * Called only by this thread.
592       */
593 <    final ForkJoinTask<?> popTask() {
510 <        int s;
593 >    private ForkJoinTask<?> popTask() {
594          ForkJoinTask<?>[] q = queue;
595 <        if (q != null && (s = sp) != base) {
596 <            int i = (q.length - 1) & --s;
597 <            ForkJoinTask<?> t = q[i];
598 <            if (t != null && UNSAFE.compareAndSwapObject
599 <                (q, (i << qShift) + qBase, t, null)) {
600 <                sp = s;
601 <                return t;
595 >        if (q != null) {
596 >            int s;
597 >            while ((s = sp) != base) {
598 >                int i = (q.length - 1) & --s;
599 >                long u = (i << qShift) + qBase; // raw offset
600 >                ForkJoinTask<?> t = q[i];
601 >                if (t == null)   // lost to stealer
602 >                    break;
603 >                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
604 >                    /*
605 >                     * Note: here and in related methods, as a
606 >                     * performance (not correctness) issue, we'd like
607 >                     * to encourage compiler not to arbitrarily
608 >                     * postpone setting sp after successful CAS.
609 >                     * Currently there is no intrinsic for arranging
610 >                     * this, but using Unsafe putOrderedInt may be a
611 >                     * preferable strategy on some compilers even
612 >                     * though its main effect is a pre-, not post-
613 >                     * fence. To simplify possible changes, the option
614 >                     * is left in comments next to the associated
615 >                     * assignments.
616 >                     */
617 >                    sp = s; // putOrderedInt may encourage more timely write
618 >                    // UNSAFE.putOrderedInt(this, spOffset, s);
619 >                    return t;
620 >                }
621              }
622          }
623          return null;
624      }
625  
626      /**
627 <     * Specialized version of popTask to pop only if
628 <     * topmost element is the given task. Called only
527 <     * by current thread while active.
627 >     * Specialized version of popTask to pop only if topmost element
628 >     * is the given task. Called only by this thread while active.
629       *
630       * @param t the task. Caller must ensure non-null.
631       */
632      final boolean unpushTask(ForkJoinTask<?> t) {
633          int s;
634          ForkJoinTask<?>[] q = queue;
635 <        if (q != null && UNSAFE.compareAndSwapObject
636 <            (q, (((q.length - 1) & (s = sp - 1)) << qShift) + qBase, t, null)){
637 <            sp = s;
635 >        if ((s = sp) != base && q != null &&
636 >            UNSAFE.compareAndSwapObject
637 >            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
638 >            sp = s; // putOrderedInt may encourage more timely write
639 >            // UNSAFE.putOrderedInt(this, spOffset, s);
640              return true;
641          }
642          return false;
643      }
644  
645      /**
646 <     * Returns next task or null if empty or contended
646 >     * Returns next task, or null if empty or contended.
647       */
648      final ForkJoinTask<?> peekTask() {
649          ForkJoinTask<?>[] q = queue;
# Line 582 | Line 685 | public class ForkJoinWorkerThread extend
685       * Computes next value for random victim probe in scan().  Scans
686       * don't require a very high quality generator, but also not a
687       * crummy one.  Marsaglia xor-shift is cheap and works well enough.
688 <     * Note: This is manually inlined in scan()
688 >     * Note: This is manually inlined in scan().
689       */
690      private static final int xorShift(int r) {
691          r ^= r << 13;
# Line 610 | Line 713 | public class ForkJoinWorkerThread extend
713       */
714      private ForkJoinTask<?> scan() {
715          ForkJoinPool p = pool;
716 <        ForkJoinWorkerThread[] ws = p.workers;
717 <        int n = ws.length;            // upper bound of #workers
718 <        boolean canSteal = active;    // shadow active status
719 <        int r = seed;                 // extract seed once
720 <        int k = r;                    // index: random if j<0 else step
721 <        for (int j = -n; j < n; ++j) {
722 <            ForkJoinWorkerThread v = ws[k & (n - 1)];
723 <            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
724 <            if (v != null && v.base != v.sp) {
725 <                if (canSteal ||       // ensure active status
726 <                    (canSteal = active = p.tryIncrementActiveCount())) {
727 <                    int b, i;         // inlined specialization of deqTask
728 <                    ForkJoinTask<?> t;
729 <                    ForkJoinTask<?>[] q;
730 <                    if ((b = v.base) != v.sp &&  // recheck
731 <                        (q = v.queue) != null &&
732 <                        (t = q[i = (q.length - 1) & b]) != null &&
733 <                        UNSAFE.compareAndSwapObject
734 <                        (q, (i << qShift) + qBase, t, null)) {
735 <                        v.base = b + 1;
736 <                        seed = r;
737 <                        ++stealCount;
738 <                        return t;
716 >        ForkJoinWorkerThread[] ws;        // worker array
717 >        int n;                            // upper bound of #workers
718 >        if ((ws = p.workers) != null && (n = ws.length) > 1) {
719 >            boolean canSteal = active;    // shadow active status
720 >            int r = seed;                 // extract seed once
721 >            int mask = n - 1;
722 >            int j = -n;                   // loop counter
723 >            int k = r;                    // worker index, random if j < 0
724 >            for (;;) {
725 >                ForkJoinWorkerThread v = ws[k & mask];
726 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
727 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
728 >                if (v != null && (b = v.base) != v.sp &&
729 >                    (q = v.queue) != null) {
730 >                    int i = (q.length - 1) & b;
731 >                    long u = (i << qShift) + qBase; // raw offset
732 >                    int pid = poolIndex;
733 >                    if ((t = q[i]) != null) {
734 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
735 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
736 >                                                     a = p.runState, a + 1))
737 >                            canSteal = active = true;
738 >                        if (canSteal && v.base == b++ &&
739 >                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
740 >                            v.base = b;
741 >                            v.stealHint = pid;
742 >                            UNSAFE.putOrderedObject(this,
743 >                                                    currentStealOffset, t);
744 >                            seed = r;
745 >                            ++stealCount;
746 >                            return t;
747 >                        }
748                      }
749 +                    j = -n;
750 +                    k = r;                // restart on contention
751                  }
752 <                j = -n;           // reset on contention
752 >                else if (++j <= 0)
753 >                    k = r;
754 >                else if (j <= n)
755 >                    k += (n >>> 1) | 1;
756 >                else
757 >                    break;
758              }
640            k = j >= 0? k + ((n >>> 1) | 1) : r;
759          }
760          return null;
761      }
# Line 645 | Line 763 | public class ForkJoinWorkerThread extend
763      // Run State management
764  
765      // status check methods used mainly by ForkJoinPool
766 <    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
767 <    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
768 <    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
769 <    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
766 >    final boolean isRunning()    { return runState == 0; }
767 >    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
768 >    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
769 >    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
770 >
771 >    final boolean isTerminating() {
772 >        if ((runState & TERMINATING) != 0)
773 >            return true;
774 >        if (pool.isAtLeastTerminating()) { // propagate pool state
775 >            shutdown();
776 >            return true;
777 >        }
778 >        return false;
779 >    }
780  
781      /**
782 <     * Sets state to TERMINATING, also resuming if suspended.
782 >     * Sets state to TERMINATING. Does NOT unpark or interrupt
783 >     * to wake up if currently blocked. Callers must do so if desired.
784       */
785      final void shutdown() {
786          for (;;) {
787              int s = runState;
788 +            if ((s & (TERMINATING|TERMINATED)) != 0)
789 +                break;
790              if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
791                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
792                                               (s & ~SUSPENDED) |
793 <                                             (TRIMMED|TERMINATING))) {
663 <                    LockSupport.unpark(this);
793 >                                             (TRIMMED|TERMINATING)))
794                      break;
665                }
795              }
796              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
797                                                s | TERMINATING))
# Line 671 | Line 800 | public class ForkJoinWorkerThread extend
800      }
801  
802      /**
803 <     * Sets state to TERMINATED. Called only by this thread.
803 >     * Sets state to TERMINATED. Called only by onTermination().
804       */
805      private void setTerminated() {
806          int s;
# Line 681 | Line 810 | public class ForkJoinWorkerThread extend
810      }
811  
812      /**
684     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent
685     */
686    final void doPark() {
687        ++parkCount;
688        LockSupport.park(this);
689    }
690
691    /**
813       * If suspended, tries to set status to unsuspended.
814 <     * Caller must unpark to actually resume
814 >     * Does NOT wake up if blocked.
815       *
816       * @return true if successful
817       */
818      final boolean tryUnsuspend() {
819          int s;
820 <        return (((s = runState) & SUSPENDED) != 0 &&
821 <                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
822 <                                         s & ~SUSPENDED));
820 >        while (((s = runState) & SUSPENDED) != 0) {
821 >            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
822 >                                         s & ~SUSPENDED))
823 >                return true;
824 >        }
825 >        return false;
826      }
827  
828      /**
829 <     * Sets suspended status and blocks as spare until resumed,
830 <     * shutdown, or timed out.
707 <     *
708 <     * @return false if trimmed
829 >     * Sets suspended status and blocks as spare until resumed
830 >     * or shutdown.
831       */
832 <    final boolean suspendAsSpare() {
833 <        for (;;) {               // set suspended unless terminating
832 >    final void suspendAsSpare() {
833 >        for (;;) {                  // set suspended unless terminating
834              int s = runState;
835              if ((s & TERMINATING) != 0) { // must kill
836                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
837                                               s | (TRIMMED | TERMINATING)))
838 <                    return false;
838 >                    return;
839              }
840              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
841                                                s | SUSPENDED))
842                  break;
843          }
722        lastEventCount = 0;      // reset upon resume
844          ForkJoinPool p = pool;
845 <        p.releaseWaiters();      // help others progress
725 <        p.accumulateStealCount(this);
726 <        interrupted();           // clear/ignore interrupts
727 <        if (poolIndex < p.getParallelism()) { // untimed wait
728 <            while ((runState & SUSPENDED) != 0)
729 <                doPark();
730 <            return true;
731 <        }
732 <        return timedSuspend();   // timed wait if apparently non-core
733 <    }
734 <
735 <    /**
736 <     * Blocks as spare until resumed or timed out
737 <     * @return false if trimmed
738 <     */
739 <    private boolean timedSuspend() {
740 <        long nanos = SPARE_KEEPALIVE_NANOS;
741 <        long startTime = System.nanoTime();
845 >        p.pushSpare(this);
846          while ((runState & SUSPENDED) != 0) {
847 <            ++parkCount;
848 <            if ((nanos -= (System.nanoTime() - startTime)) > 0)
849 <                LockSupport.parkNanos(this, nanos);
850 <            else { // try to trim on timeout
851 <                int s = runState;
748 <                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
749 <                                             (s & ~SUSPENDED) |
750 <                                             (TRIMMED|TERMINATING)))
751 <                    return false;
847 >            if (p.tryAccumulateStealCount(this)) {
848 >                interrupted();          // clear/ignore interrupts
849 >                if ((runState & SUSPENDED) == 0)
850 >                    break;
851 >                LockSupport.park(this);
852              }
853          }
754        return true;
854      }
855  
856      // Misc support methods for ForkJoinPool
# Line 761 | Line 860 | public class ForkJoinWorkerThread extend
860       * used by ForkJoinTask.
861       */
862      final int getQueueSize() {
863 <        return -base + sp;
864 <    }
766 <
767 <    /**
768 <     * Set locallyFifo mode. Called only by ForkJoinPool
769 <     */
770 <    final void setAsyncMode(boolean async) {
771 <        locallyFifo = async;
863 >        int n; // external calls must read base first
864 >        return (n = -base + sp) <= 0 ? 0 : n;
865      }
866  
867      /**
# Line 776 | Line 869 | public class ForkJoinWorkerThread extend
869       * thread.
870       */
871      final void cancelTasks() {
872 +        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
873 +        if (cj != null && cj.status >= 0) {
874 +            cj.cancelIgnoringExceptions();
875 +            try {
876 +                this.interrupt(); // awaken wait
877 +            } catch (SecurityException ignore) {
878 +            }
879 +        }
880 +        ForkJoinTask<?> cs = currentSteal;
881 +        if (cs != null && cs.status >= 0)
882 +            cs.cancelIgnoringExceptions();
883          while (base != sp) {
884              ForkJoinTask<?> t = deqTask();
885              if (t != null)
# Line 803 | Line 907 | public class ForkJoinWorkerThread extend
907      // Support methods for ForkJoinTask
908  
909      /**
910 +     * Gets and removes a local task.
911 +     *
912 +     * @return a task, if available
913 +     */
914 +    final ForkJoinTask<?> pollLocalTask() {
915 +        ForkJoinPool p = pool;
916 +        while (sp != base) {
917 +            int a; // inline p.tryIncrementActiveCount
918 +            if (active ||
919 +                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
920 +                                                   a = p.runState, a + 1)))
921 +                return locallyFifo ? locallyDeqTask() : popTask();
922 +        }
923 +        return null;
924 +    }
925 +
926 +    /**
927 +     * Gets and removes a local or stolen task.
928 +     *
929 +     * @return a task, if available
930 +     */
931 +    final ForkJoinTask<?> pollTask() {
932 +        ForkJoinTask<?> t = pollLocalTask();
933 +        if (t == null) {
934 +            t = scan();
935 +            // cannot retain/track/help steal
936 +            UNSAFE.putOrderedObject(this, currentStealOffset, null);
937 +        }
938 +        return t;
939 +    }
940 +
941 +    /**
942 +     * Possibly runs some tasks and/or blocks, until task is done.
943 +     *
944 +     * @param joinMe the task to join
945 +     * @param timed true if use timed wait
946 +     * @param nanos wait time if timed
947 +     */
948 +    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
949 +        // currentJoin only written by this thread; only need ordered store
950 +        ForkJoinTask<?> prevJoin = currentJoin;
951 +        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
952 +        pool.awaitJoin(joinMe, this, timed, nanos);
953 +        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
954 +    }
955 +
956 +    /**
957 +     * Tries to locate and help perform tasks for a stealer of the
958 +     * given task, or in turn one of its stealers.  Traces
959 +     * currentSteal->currentJoin links looking for a thread working on
960 +     * a descendant of the given task and with a non-empty queue to
961 +     * steal back and execute tasks from.
962 +     *
963 +     * The implementation is very branchy to cope with potential
964 +     * inconsistencies or loops encountering chains that are stale,
965 +     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
966 +     * of these cases are dealt with by just returning back to the
967 +     * caller, who is expected to retry if other join mechanisms also
968 +     * don't work out.
969 +     *
970 +     * @param joinMe the task to join
971 +     * @param running if false, then must update pool count upon
972 +     *  running a task
973 +     * @return value of running on exit
974 +     */
975 +    final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
976 +        /*
977 +         * Initial checks to (1) abort if terminating; (2) clean out
978 +         * old cancelled tasks from local queue; (3) if joinMe is next
979 +         * task, run it; (4) omit scan if local queue nonempty (since
980 +         * it may contain non-descendents of joinMe).
981 +         */
982 +        ForkJoinPool p = pool;
983 +        for (;;) {
984 +            ForkJoinTask<?>[] q;
985 +            int s;
986 +            if (joinMe.status < 0)
987 +                return running;
988 +            else if ((runState & TERMINATING) != 0) {
989 +                joinMe.cancelIgnoringExceptions();
990 +                return running;
991 +            }
992 +            else if ((s = sp) == base || (q = queue) == null)
993 +                break;                            // queue empty
994 +            else {
995 +                int i = (q.length - 1) & --s;
996 +                long u = (i << qShift) + qBase;   // raw offset
997 +                ForkJoinTask<?> t = q[i];
998 +                if (t == null)
999 +                    break;                        // lost to a stealer
1000 +                else if (t != joinMe && t.status >= 0)
1001 +                    return running;               // cannot safely help
1002 +                else if ((running ||
1003 +                          (running = p.tryIncrementRunningCount())) &&
1004 +                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
1005 +                    sp = s; // putOrderedInt may encourage more timely write
1006 +                    // UNSAFE.putOrderedInt(this, spOffset, s);
1007 +                    t.quietlyExec();
1008 +                }
1009 +            }
1010 +        }
1011 +
1012 +        int n;                                    // worker array size
1013 +        ForkJoinWorkerThread[] ws = p.workers;
1014 +        if (ws != null && (n = ws.length) > 1) {  // need at least 2 workers
1015 +            ForkJoinTask<?> task = joinMe;        // base of chain
1016 +            ForkJoinWorkerThread thread = this;   // thread with stolen task
1017 +
1018 +            outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1019 +                // Try to find v, the stealer of task, by first using hint
1020 +                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1021 +                if (v == null || v.currentSteal != task) {
1022 +                    for (int j = 0; ; ++j) {      // search array
1023 +                        if (j < n) {
1024 +                            ForkJoinTask<?> vs;
1025 +                            if ((v = ws[j]) != null &&
1026 +                                (vs = v.currentSteal) != null) {
1027 +                                if (joinMe.status < 0)
1028 +                                    break outer;
1029 +                                if (vs == task) {
1030 +                                    if (task.status < 0)
1031 +                                        break outer; // stale
1032 +                                    thread.stealHint = j;
1033 +                                    break;        // save hint for next time
1034 +                                }
1035 +                            }
1036 +                        }
1037 +                        else
1038 +                            break outer;          // no stealer
1039 +                    }
1040 +                }
1041 +
1042 +                // Try to help v, using specialized form of deqTask
1043 +                for (;;) {
1044 +                    if (joinMe.status < 0)
1045 +                        break outer;
1046 +                    int b = v.base;
1047 +                    ForkJoinTask<?>[] q = v.queue;
1048 +                    if (b == v.sp || q == null)
1049 +                        break;                    // empty
1050 +                    int i = (q.length - 1) & b;
1051 +                    long u = (i << qShift) + qBase;
1052 +                    ForkJoinTask<?> t = q[i];
1053 +                    if (task.status < 0)
1054 +                        break outer;              // stale
1055 +                    if (t != null &&
1056 +                        (running ||
1057 +                         (running = p.tryIncrementRunningCount())) &&
1058 +                        v.base == b++ &&
1059 +                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
1060 +                        if (t != joinMe && joinMe.status < 0) {
1061 +                            UNSAFE.putObjectVolatile(q, u, t);
1062 +                            break outer;          // joinMe cancelled; back out
1063 +                        }
1064 +                        v.base = b;
1065 +                        if (t.status >= 0) {
1066 +                            ForkJoinTask<?> ps = currentSteal;
1067 +                            int pid = poolIndex;
1068 +                            v.stealHint = pid;
1069 +                            UNSAFE.putOrderedObject(this,
1070 +                                                    currentStealOffset, t);
1071 +                            t.quietlyExec();
1072 +                            UNSAFE.putOrderedObject(this,
1073 +                                                    currentStealOffset, ps);
1074 +                        }
1075 +                    }
1076 +                    else if ((runState & TERMINATING) != 0) {
1077 +                        joinMe.cancelIgnoringExceptions();
1078 +                        break outer;
1079 +                    }
1080 +                }
1081 +
1082 +                // Try to descend to find v's stealer
1083 +                ForkJoinTask<?> next = v.currentJoin;
1084 +                if (task.status < 0 || next == null || next == task ||
1085 +                    joinMe.status < 0)
1086 +                    break;                 // done, stale, dead-end, or cyclic
1087 +                task = next;
1088 +                thread = v;
1089 +            }
1090 +        }
1091 +        return running;
1092 +    }
1093 +
1094 +    /**
1095 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1096       * Returns an estimate of the number of tasks, offset by a
1097       * function of number of idle workers.
1098       *
# Line 854 | Line 1144 | public class ForkJoinWorkerThread extend
1144      }
1145  
1146      /**
857     * Gets and removes a local task.
858     *
859     * @return a task, if available
860     */
861    final ForkJoinTask<?> pollLocalTask() {
862        while (base != sp) {
863            if (active || (active = pool.tryIncrementActiveCount()))
864                return locallyFifo? locallyDeqTask() : popTask();
865        }
866        return null;
867    }
868
869    /**
870     * Gets and removes a local or stolen task.
871     *
872     * @return a task, if available
873     */
874    final ForkJoinTask<?> pollTask() {
875        ForkJoinTask<?> t;
876        return (t = pollLocalTask()) != null ? t : scan();
877    }
878
879    /**
880     * Returns a popped or stolen task, if available, unless joinMe is done
881     *
882     * This method is intrinsically nonmodular. To maintain the
883     * property that tasks are never stolen if the awaited task is
884     * ready, we must interleave mechanics of scan with status
885     * checks. We rely here on the commit points of deq that allow us
886     * to cancel a steal even after CASing slot to null, but before
887     * adjusting base index: If, after the CAS, we see that joinMe is
888     * ready, we can back out by placing the task back into the slot,
889     * without adjusting index. The scan loop is otherwise the same as
890     * in scan.
891     *
892     */
893    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
894        ForkJoinTask<?> popped; // prefer local tasks
895        if (base != sp && (popped = popWhileJoining(joinMe)) != null)
896            return popped;
897        if (joinMe.status >= 0) {
898            ForkJoinPool p = pool;
899            ForkJoinWorkerThread[] ws = p.workers;
900            int n = ws.length;
901            int r = seed;
902            int k = r;
903            for (int j = -n; j < n && joinMe.status >= 0; ++j) {
904                ForkJoinWorkerThread v = ws[k & (n - 1)];
905                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
906                if (v != null) {
907                    int b = v.base;
908                    ForkJoinTask<?>[] q;
909                    if (b != v.sp && (q = v.queue) != null) {
910                        int i = (q.length - 1) & b;
911                        ForkJoinTask<?> t = q[i];
912                        if (t != null && UNSAFE.compareAndSwapObject
913                            (q, (i << qShift) + qBase, t, null)) {
914                            if (joinMe.status >= 0) {
915                                v.base = b + 1;
916                                seed = r;
917                                ++stealCount;
918                                return t;
919                            }
920                            UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
921                            break; // back out
922                        }
923                        j = -n;
924                    }
925                }
926                k = j >= 0? k + ((n >>> 1) | 1) : r;
927            }
928        }
929        return null;
930    }
931
932    /**
933     * Version of popTask with join checks surrounding extraction.
934     * Uses the same backout strategy as scanWhileJoining. Note that
935     * we ignore locallyFifo flag for local tasks here since helping
936     * joins only make sense in LIFO mode.
937     *
938     * @return a popped task, if available, unless joinMe is done
939     */
940    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
941        int s;
942        ForkJoinTask<?>[] q;
943        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
944            int i = (q.length - 1) & --s;
945            ForkJoinTask<?> t = q[i];
946            if (t != null && UNSAFE.compareAndSwapObject
947                (q, (i << qShift) + qBase, t, null)) {
948                if (joinMe.status >= 0) {
949                    sp = s;
950                    return t;
951                }
952                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
953                break;  // back out
954            }
955        }
956        return null;
957    }
958
959    /**
1147       * Runs tasks until {@code pool.isQuiescent()}.
1148       */
1149      final void helpQuiescePool() {
1150 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1151          for (;;) {
1152              ForkJoinTask<?> t = pollLocalTask();
1153              if (t != null || (t = scan()) != null)
1154 <                t.tryExec();
1154 >                t.quietlyExec();
1155              else {
1156                  ForkJoinPool p = pool;
1157 +                int a; // to inline CASes
1158                  if (active) {
1159 +                    if (!UNSAFE.compareAndSwapInt
1160 +                        (p, poolRunStateOffset, a = p.runState, a - 1))
1161 +                        continue;   // retry later
1162                      active = false; // inactivate
1163 <                    do {} while (!p.tryDecrementActiveCount());
1163 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1164                  }
1165                  if (p.isQuiescent()) {
1166                      active = true; // re-activate
1167 <                    do {} while (!p.tryIncrementActiveCount());
1167 >                    do {} while (!UNSAFE.compareAndSwapInt
1168 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1169                      return;
1170                  }
1171              }
# Line 982 | Line 1175 | public class ForkJoinWorkerThread extend
1175      // Unsafe mechanics
1176  
1177      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1178 +    private static final long spOffset =
1179 +        objectFieldOffset("sp", ForkJoinWorkerThread.class);
1180      private static final long runStateOffset =
1181          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1182 +    private static final long currentJoinOffset =
1183 +        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
1184 +    private static final long currentStealOffset =
1185 +        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1186      private static final long qBase =
1187          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1188 +    private static final long poolRunStateOffset = // to inline CAS
1189 +        objectFieldOffset("runState", ForkJoinPool.class);
1190 +
1191      private static final int qShift;
1192  
1193      static {
# Line 993 | Line 1195 | public class ForkJoinWorkerThread extend
1195          if ((s & (s-1)) != 0)
1196              throw new Error("data type scale not a power of two");
1197          qShift = 31 - Integer.numberOfLeadingZeros(s);
1198 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1199      }
1200  
1201      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines (no marker; shown with only their revision 1.32 line number)
+ Added lines (present only in revision 1.61)
< Changed lines, as they appear in revision 1.32
> Changed lines, as they appear in revision 1.61