root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.26 by dl, Sun Aug 2 11:54:31 2009 UTC vs.
Revision 1.35 by dl, Wed Jul 7 19:52:32 2010 UTC

# Line 8 | Line 8 | package jsr166y;
8  
9   import java.util.concurrent.*;
10  
11 + import java.util.Random;
12   import java.util.Collection;
13 + import java.util.concurrent.locks.LockSupport;
14  
15   /**
16   * A thread managed by a {@link ForkJoinPool}.  This class is
# Line 25 | Line 27 | import java.util.Collection;
27   */
28   public class ForkJoinWorkerThread extends Thread {
29      /*
30 <     * Algorithm overview:
30 >     * Overview:
31       *
32 <     * 1. Work-Stealing: Work-stealing queues are special forms of
33 <     * Deques that support only three of the four possible
34 <     * end-operations -- push, pop, and deq (aka steal), and only do
35 <     * so under the constraints that push and pop are called only from
36 <     * the owning thread, while deq may be called from other threads.
37 <     * (If you are unfamiliar with them, you probably want to read
38 <     * Herlihy and Shavit's book "The Art of Multiprocessor
39 <     * programming", chapter 16 describing these in more detail before
40 <     * proceeding.)  The main work-stealing queue design is roughly
41 <     * similar to "Dynamic Circular Work-Stealing Deque" by David
42 <     * Chase and Yossi Lev, SPAA 2005
43 <     * (http://research.sun.com/scalable/pubs/index.html).  The main
44 <     * difference ultimately stems from gc requirements that we null
45 <     * out taken slots as soon as we can, to maintain as small a
46 <     * footprint as possible even in programs generating huge numbers
47 <     * of tasks. To accomplish this, we shift the CAS arbitrating pop
48 <     * vs deq (steal) from being on the indices ("base" and "sp") to
49 <     * the slots themselves (mainly via method "casSlotNull()"). So,
50 <     * both a successful pop and deq mainly entail CAS'ing a non-null
51 <     * slot to null.  Because we rely on CASes of references, we do
52 <     * not need tag bits on base or sp.  They are simple ints as used
53 <     * in any circular array-based queue (see for example ArrayDeque).
54 <     * Updates to the indices must still be ordered in a way that
55 <     * guarantees that (sp - base) > 0 means the queue is empty, but
56 <     * otherwise may err on the side of possibly making the queue
57 <     * appear nonempty when a push, pop, or deq have not fully
58 <     * committed. Note that this means that the deq operation,
59 <     * considered individually, is not wait-free. One thief cannot
60 <     * successfully continue until another in-progress one (or, if
61 <     * previously empty, a push) completes.  However, in the
62 <     * aggregate, we ensure at least probabilistic non-blockingness. If
63 <     * an attempted steal fails, a thief always chooses a different
32 >     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
33 >     * ForkJoinTasks. This class includes bookkeeping in support of
34 >     * worker activation, suspension, and lifecycle control described
35 >     * in more detail in the internal documentation of class
36 >     * ForkJoinPool. And as described further below, this class also
37 >     * includes special-cased support for some ForkJoinTask
38 >     * methods. But the main mechanics involve work-stealing:
39 >     *
40 >     * Work-stealing queues are special forms of Deques that support
41 >     * only three of the four possible end-operations -- push, pop,
42 >     * and deq (aka steal), under the further constraints that push
43 >     * and pop are called only from the owning thread, while deq may
44 >     * be called from other threads.  (If you are unfamiliar with
45 >     * them, you probably want to read Herlihy and Shavit's book "The
46 >     * Art of Multiprocessor programming", chapter 16 describing these
47 >     * in more detail before proceeding.)  The main work-stealing
48 >     * queue design is roughly similar to those in the papers "Dynamic
49 >     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
50 >     * (http://research.sun.com/scalable/pubs/index.html) and
51 >     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
52 >     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
53 >     * The main differences ultimately stem from gc requirements that
54 >     * we null out taken slots as soon as we can, to maintain as small
55 >     * a footprint as possible even in programs generating huge
56 >     * numbers of tasks. To accomplish this, we shift the CAS
57 >     * arbitrating pop vs deq (steal) from being on the indices
58 >     * ("base" and "sp") to the slots themselves (mainly via method
59 >     * "casSlotNull()"). So, both a successful pop and deq mainly
60 >     * entail a CAS of a slot from non-null to null.  Because we rely
61 >     * on CASes of references, we do not need tag bits on base or sp.
62 >     * They are simple ints as used in any circular array-based queue
63 >     * (see for example ArrayDeque).  Updates to the indices must
64 >     * still be ordered in a way that guarantees that sp == base means
65 >     * the queue is empty, but otherwise may err on the side of
66 >     * possibly making the queue appear nonempty when a push, pop, or
67 >     * deq have not fully committed. Note that this means that the deq
68 >     * operation, considered individually, is not wait-free. One thief
69 >     * cannot successfully continue until another in-progress one (or,
70 >     * if previously empty, a push) completes.  However, in the
71 >     * aggregate, we ensure at least probabilistic non-blockingness.
72 >     * If an attempted steal fails, a thief always chooses a different
73       * random victim target to try next. So, in order for one thief to
74       * progress, it suffices for any in-progress deq or new push on
75       * any empty queue to complete. One reason this works well here is
76       * that apparently-nonempty often means soon-to-be-stealable,
77 <     * which gives threads a chance to activate if necessary before
78 <     * stealing (see below).
77 >     * which gives threads a chance to set activation status if
78 >     * necessary before stealing.
79       *
80       * This approach also enables support for "async mode" where local
81       * task processing is in FIFO, not LIFO order; simply by using a
# Line 72 | Line 83 | public class ForkJoinWorkerThread extend
83       * by the ForkJoinPool).  This allows use in message-passing
84       * frameworks in which tasks are never joined.
85       *
86 <     * Efficient implementation of this approach currently relies on
86 >     * When a worker would otherwise be blocked waiting to join a
87 >     * task, it first tries a form of linear helping: Each worker
88 >     * records (in field stolen) the most recent task it stole
89 >     * from some other worker. Plus, it records (in field joining) the
90 >     * task it is currently actively joining. Method joinTask uses
91 >     * these markers to try to find a worker to help (i.e., steal back
92 >     * a task from and execute it) that could hasten completion of the
93 >     * actively joined task. In essence, the joiner executes a task
94 >     * that would be on its own local deque had the to-be-joined task
95 >     * not been stolen. This may be seen as a conservative variant of
96 >     * the approach in Wagner & Calder "Leapfrogging: a portable
97 >     * technique for implementing efficient futures" SIGPLAN Notices,
98 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99 >     * in that: (1) We only maintain dependency links across workers
100 >     * upon steals, rather than maintain per-task bookkeeping.  This
101 >     * requires a linear scan of workers array to locate stealers,
102 >     * which isolates cost to when it is needed, rather than adding to
103 >     * per-task overhead.  (2) It is "shallow", ignoring nesting and
104 >     * potentially cyclic mutual steals.  (3) It is intentionally
105 >     * racy: field joining is updated only while actively joining,
106 >     * which means that we could miss links in the chain during
107 >     * long-lived tasks, GC stalls etc.  (4) We fall back to
108 >     * suspending the worker and if necessary replacing it with a
109 >     * spare (see ForkJoinPool.tryAwaitJoin).
110 >     *
111 >     * Efficient implementation of these algorithms currently relies on
112       * an uncomfortable amount of "Unsafe" mechanics. To maintain
113       * correct orderings, reads and writes of variable base require
114 <     * volatile ordering.  Variable sp does not require volatile write
115 <     * but needs cheaper store-ordering on writes.  Because they are
116 <     * protected by volatile base reads, reads of the queue array and
117 <     * its slots do not need volatile load semantics, but writes (in
118 <     * push) require store order and CASes (in pop and deq) require
119 <     * (volatile) CAS semantics. Since these combinations aren't
120 <     * supported using ordinary volatiles, the only way to accomplish
121 <     * these efficiently is to use direct Unsafe calls. (Using external
114 >     * volatile ordering.  Variable sp does not require volatile
115 >     * writes but still needs store-ordering, which we accomplish by
116 >     * pre-incrementing sp before filling the slot with an ordered
117 >     * store.  (Pre-incrementing also enables backouts used in
118 >     * scanWhileJoining.)  Because they are protected by volatile base
119 >     * reads, reads of the queue array and its slots by other threads
120 >     * do not need volatile load semantics, but writes (in push)
121 >     * require store order and CASes (in pop and deq) require
122 >     * (volatile) CAS semantics.  (Michael, Saraswat, and Vechev's
123 >     * algorithm has similar properties, but without support for
124 >     * nulling slots.)  Since these combinations aren't supported
125 >     * using ordinary volatiles, the only way to accomplish these
126 >     * efficiently is to use direct Unsafe calls. (Using external
127       * AtomicIntegers and AtomicReferenceArrays for the indices and
128       * array is significantly slower because of memory locality and
129 <     * indirection effects.) Further, performance on most platforms is
130 <     * very sensitive to placement and sizing of the (resizable) queue
131 <     * array.  Even though these queues don't usually become all that
132 <     * big, the initial size must be large enough to counteract cache
129 >     * indirection effects.)
130 >     *
131 >     * Further, performance on most platforms is very sensitive to
132 >     * placement and sizing of the (resizable) queue array.  Even
133 >     * though these queues don't usually become all that big, the
134 >     * initial size must be large enough to counteract cache
135       * contention effects across multiple queues (especially in the
136       * presence of GC cardmarking). Also, to improve thread-locality,
137 <     * queues are currently initialized immediately after the thread
138 <     * gets the initial signal to start processing tasks.  However,
139 <     * all queue-related methods except pushTask are written in a way
140 <     * that allows them to instead be lazily allocated and/or disposed
141 <     * of when empty. All together, these low-level implementation
142 <     * choices produce as much as a factor of 4 performance
143 <     * improvement compared to naive implementations, and enable the
144 <     * processing of billions of tasks per second, sometimes at the
145 <     * expense of ugliness.
146 <     *
147 <     * 2. Run control: The primary run control is based on a global
148 <     * counter (activeCount) held by the pool. It uses an algorithm
149 <     * similar to that in Herlihy and Shavit section 17.6 to cause
150 <     * threads to eventually block when all threads declare they are
151 <     * inactive. (See variable "scans".)  For this to work, threads
152 <     * must be declared active when executing tasks, and before
153 <     * stealing a task. They must be inactive before blocking on the
154 <     * Pool Barrier (awaiting a new submission or other Pool
155 <     * event). In between, there is some free play which we take
156 <     * advantage of to avoid contention and rapid flickering of the
157 <     * global activeCount: If inactive, we activate only if a victim
158 <     * queue appears to be nonempty (see above).  Similarly, a thread
159 <     * tries to inactivate only after a full scan of other threads.
117 <     * The net effect is that contention on activeCount is rarely a
118 <     * measurable performance issue. (There are also a few other cases
119 <     * where we scan for work rather than retry/block upon
120 <     * contention.)
121 <     *
122 <     * 3. Selection control. We maintain policy of always choosing to
123 <     * run local tasks rather than stealing, and always trying to
124 <     * steal tasks before trying to run a new submission. All steals
125 <     * are currently performed in randomly-chosen deq-order. It may be
126 <     * worthwhile to bias these with locality / anti-locality
127 <     * information, but doing this well probably requires more
128 <     * lower-level information from JVMs than currently provided.
137 >     * queues are initialized after starting.  All together, these
138 >     * low-level implementation choices produce as much as a factor of
139 >     * 4 performance improvement compared to naive implementations,
140 >     * and enable the processing of billions of tasks per second,
141 >     * sometimes at the expense of ugliness.
142 >     */
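
To make the slot-CAS protocol described in this overview concrete, the following is a minimal sketch of such a deque, assuming a fixed, unchecked capacity and using AtomicReferenceArray/AtomicInteger in place of the Unsafe intrinsics, resizing, and pool signalling that the real class relies on; all names are illustrative. Push and pop belong to the owner, steal may run on any thread, and pop vs steal arbitrate by CASing the slot itself to null; an owner that polls from the base end instead of popping gives the FIFO "async mode" noted above.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    class SlotCasDeque<T> {
        static final int CAPACITY = 1 << 13;            // echoes INITIAL_QUEUE_CAPACITY; no grow/overflow check here
        final AtomicReferenceArray<T> slots = new AtomicReferenceArray<T>(CAPACITY);
        final AtomicInteger base = new AtomicInteger();  // index of least valid slot (steal end)
        volatile int sp;                                 // next push index; written only by the owner

        void push(T t) {                                 // owner only
            int s = sp;
            slots.set((CAPACITY - 1) & s, t);            // volatile slot write publishes the task
            sp = s + 1;                                  // then publish the new top
        }

        T pop() {                                        // owner only (LIFO end)
            int s = sp;
            if (s != base.get()) {
                int i = (CAPACITY - 1) & (s - 1);
                T t = slots.get(i);
                if (t != null && slots.compareAndSet(i, t, null)) {
                    sp = s - 1;                          // commit only after winning the slot CAS
                    return t;
                }
            }
            return null;                                 // empty, or lost the slot to a thief
        }

        T steal() {                                      // any thread (FIFO end; "deq" above)
            int b = base.get();
            if (b != sp) {
                int i = (CAPACITY - 1) & b;
                T t = slots.get(i);
                if (t != null && b == base.get() && slots.compareAndSet(i, t, null)) {
                    base.set(b + 1);                     // only the CAS winner advances base
                    return t;
                }
            }
            return null;                                 // empty or contended; a thief would try another victim
        }
    }

Because the losing side of the slot CAS simply gives up (pop returns null to its caller, a thief moves on to another victim), neither end ever spins on a lock, which is the probabilistic non-blocking property claimed above.
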
143 >
144 >    /**
145 >     * Generator for initial random seeds for random victim
146 >     * selection. This is used only to create initial seeds. Random
147 >     * steals use a cheaper xorshift generator per steal attempt. We
148 >     * expect only rare contention on seedGenerator, so just use a
149 >     * plain Random.
150 >     */
151 >    private static final Random seedGenerator = new Random();
152 >
153 >    /**
154 >     * The timeout value for suspending spares. Spare workers that
155 >     * remain unsignalled for more than this time may be trimmed
156 >     * (killed and removed from pool).  Since our goal is to avoid
157 >     * long-term thread buildup, the exact value of timeout does not
158 >     * matter too much so long as it avoids most false-alarm timeouts
159 >     * under GC stalls or momentarily high system load.
160       */
161 +    private static final long SPARE_KEEPALIVE_NANOS =
162 +        5L * 1000L * 1000L * 1000L; // 5 secs
163  
164      /**
165       * Capacity of work-stealing queue array upon initialization.
166 <     * Must be a power of two. Initial size must be at least 2, but is
166 >     * Must be a power of two. Initial size must be at least 4, but is
167       * padded to minimize cache effects.
168       */
169      private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
# Line 149 | Line 182 | public class ForkJoinWorkerThread extend
182      final ForkJoinPool pool;
183  
184      /**
185 <     * The work-stealing queue array. Size must be a power of two.
153 <     * Initialized when thread starts, to improve memory locality.
185 >     * The task most recently stolen from another worker
186       */
187 <    private ForkJoinTask<?>[] queue;
187 >    private volatile ForkJoinTask<?> stolen;
188  
189      /**
190 <     * Index (mod queue.length) of next queue slot to push to or pop
191 <     * from. It is written only by owner thread, via ordered store.
160 <     * Both sp and base are allowed to wrap around on overflow, but
161 <     * (sp - base) still estimates size.
190 >     * The task currently being joined, set only when actively
191 >     * trying to helpStealer.
192       */
193 <    private volatile int sp;
193 >    private volatile ForkJoinTask<?> joining;
194 >
195 >    /**
196 >     * The work-stealing queue array. Size must be a power of two.
197 >     * Initialized in onStart, to improve memory locality.
198 >     */
199 >    private ForkJoinTask<?>[] queue;
200  
201      /**
202       * Index (mod queue.length) of least valid queue slot, which is
# Line 169 | Line 205 | public class ForkJoinWorkerThread extend
205      private volatile int base;
206  
207      /**
208 <     * Activity status. When true, this worker is considered active.
209 <     * Must be false upon construction. It must be true when executing
210 <     * tasks, and BEFORE stealing a task. It must be false before
211 <     * calling pool.sync.
212 <     */
213 <    private boolean active;
208 >     * Index (mod queue.length) of next queue slot to push to or pop
209 >     * from. It is written only by owner thread, and accessed by other
210 >     * threads only after reading (volatile) base.  Both sp and base
211 >     * are allowed to wrap around on overflow, but (sp - base) still
212 >     * estimates size.
213 >     */
214 >    private int sp;
215  
216      /**
217 <     * Run state of this worker. Supports simple versions of the usual
218 <     * shutdown/shutdownNow control.
217 >     * Run state of this worker. In addition to the usual run levels,
218 >     * tracks if this worker is suspended as a spare, and if it was
219 >     * killed (trimmed) while suspended. However, "active" status is
220 >     * maintained separately.
221       */
222      private volatile int runState;
223  
224 +    private static final int TERMINATING = 0x01;
225 +    private static final int TERMINATED  = 0x02;
226 +    private static final int SUSPENDED   = 0x04; // inactive spare
227 +    private static final int TRIMMED     = 0x08; // killed while suspended
228 +
229 +    /**
230 +     * Number of LockSupport.park calls to block this thread for
231 +     * suspension or event waits. Used for internal instrumentation;
232 +     * currently not exported but included because volatile write upon
233 +     * park also provides a workaround for a JVM bug.
234 +     */
235 +    volatile int parkCount;
236 +
237 +    /**
238 +     * Number of steals, transferred and reset in pool callbacks
239 +     * when idle. Accessed directly by pool.
240 +     */
241 +    int stealCount;
242 +
243      /**
244       * Seed for random number generator for choosing steal victims.
245 <     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
245 >     * Uses Marsaglia xorshift. Must be initialized as nonzero.
246       */
247      private int seed;
248  
249      /**
250 <     * Number of steals, transferred to pool when idle
250 >     * Activity status. When true, this worker is considered active.
251 >     * Accessed directly by pool.  Must be false upon construction.
252       */
253 <    private int stealCount;
253 >    boolean active;
254  
255      /**
256 +     * True if use local fifo, not default lifo, for local polling.
257 +     * Shadows value from ForkJoinPool, which resets it if changed
258 +     * pool-wide.
259 +     */
260 +    private final boolean locallyFifo;
261 +    
262 +    /**
263       * Index of this worker in pool array. Set once by pool before
264 <     * running, and accessed directly by pool during cleanup etc.
264 >     * running, and accessed directly by pool to locate this worker in
265 >     * its workers array.
266       */
267      int poolIndex;
268  
269      /**
270 <     * The last barrier event waited for. Accessed in pool callback
271 <     * methods, but only by current thread.
270 >     * The last pool event waited for. Accessed only by pool in
271 >     * callback methods invoked within this thread.
272       */
273 <    long lastEventCount;
273 >    int lastEventCount;
274  
275      /**
276 <     * True if use local fifo, not default lifo, for local polling
276 >     * Encoded index and event count of next event waiter. Used only
277 >     * by ForkJoinPool for managing event waiters.
278       */
279 <    private boolean locallyFifo;
279 >    volatile long nextWaiter;
280  
281      /**
282       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 217 | Line 285 | public class ForkJoinWorkerThread extend
285       * @throws NullPointerException if pool is null
286       */
287      protected ForkJoinWorkerThread(ForkJoinPool pool) {
220        if (pool == null) throw new NullPointerException();
288          this.pool = pool;
289 <        // Note: poolIndex is set by pool during construction
290 <        // Remaining initialization is deferred to onStart
289 >        this.locallyFifo = pool.locallyFifo;
290 >        // To avoid exposing construction details to subclasses,
291 >        // remaining initialization is in start() and onStart()
292      }
293  
294 <    // Public access methods
294 >    /**
295 >     * Performs additional initialization and starts this thread
296 >     */
297 >    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
298 >        this.poolIndex = poolIndex;
299 >        if (ueh != null)
300 >            setUncaughtExceptionHandler(ueh);
301 >        setDaemon(true);
302 >        start();
303 >    }
304 >
305 >    // Public/protected methods
306  
307      /**
308       * Returns the pool hosting this thread.
# Line 248 | Line 327 | public class ForkJoinWorkerThread extend
327      }
328  
329      /**
330 <     * Establishes local first-in-first-out scheduling mode for forked
331 <     * tasks that are never joined.
332 <     *
333 <     * @param async if true, use locally FIFO scheduling
330 >     * Initializes internal state after construction but before
331 >     * processing any tasks. If you override this method, you must
332 >     * invoke super.onStart() at the beginning of the method.
333 >     * Initialization requires care: Most fields must have legal
334 >     * default values, to ensure that attempted accesses from other
335 >     * threads work correctly even before this thread starts
336 >     * processing tasks.
337       */
338 <    void setAsyncMode(boolean async) {
339 <        locallyFifo = async;
340 <    }
259 <
260 <    // Runstate management
261 <
262 <    // Runstate values. Order matters
263 <    private static final int RUNNING     = 0;
264 <    private static final int SHUTDOWN    = 1;
265 <    private static final int TERMINATING = 2;
266 <    private static final int TERMINATED  = 3;
267 <
268 <    final boolean isShutdown()    { return runState >= SHUTDOWN;  }
269 <    final boolean isTerminating() { return runState >= TERMINATING;  }
270 <    final boolean isTerminated()  { return runState == TERMINATED; }
271 <    final boolean shutdown()      { return transitionRunStateTo(SHUTDOWN); }
272 <    final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }
338 >    protected void onStart() {
339 >        int rs = seedGenerator.nextInt();
340 >        seed = rs == 0? 1 : rs; // seed must be nonzero
341  
342 <    /**
343 <     * Transitions to at least the given state.
344 <     *
345 <     * @return {@code true} if not already at least at given state
278 <     */
279 <    private boolean transitionRunStateTo(int state) {
280 <        for (;;) {
281 <            int s = runState;
282 <            if (s >= state)
283 <                return false;
284 <            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
285 <                return true;
286 <        }
287 <    }
342 >        // Allocate name string and arrays in this thread
343 >        String pid = Integer.toString(pool.getPoolNumber());
344 >        String wid = Integer.toString(poolIndex);
345 >        setName("ForkJoinPool-" + pid + "-worker-" + wid);
346  
347 <    /**
290 <     * Tries to set status to active; fails on contention.
291 <     */
292 <    private boolean tryActivate() {
293 <        if (!active) {
294 <            if (!pool.tryIncrementActiveCount())
295 <                return false;
296 <            active = true;
297 <        }
298 <        return true;
347 >        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
348      }
349  
350      /**
351 <     * Tries to set status to inactive; fails on contention.
351 >     * Performs cleanup associated with termination of this worker
352 >     * thread.  If you override this method, you must invoke
353 >     * {@code super.onTermination} at the end of the overridden method.
354 >     *
355 >     * @param exception the exception causing this thread to abort due
356 >     * to an unrecoverable error, or {@code null} if completed normally
357       */
358 <    private boolean tryInactivate() {
359 <        if (active) {
360 <            if (!pool.tryDecrementActiveCount())
361 <                return false;
362 <            active = false;
358 >    protected void onTermination(Throwable exception) {
359 >        try {
360 >            stolen = null;
361 >            joining = null;
362 >            cancelTasks();
363 >            setTerminated();
364 >            pool.workerTerminated(this);
365 >        } catch (Throwable ex) {        // Shouldn't ever happen
366 >            if (exception == null)      // but if so, at least rethrown
367 >                exception = ex;
368 >        } finally {
369 >            if (exception != null)
370 >                UNSAFE.throwException(exception);
371          }
310        return true;
311    }
312
313    /**
314     * Computes next value for random victim probe.  Scans don't
315     * require a very high quality generator, but also not a crummy
316     * one.  Marsaglia xor-shift is cheap and works well.
317     */
318    private static int xorShift(int r) {
319        r ^= (r << 13);
320        r ^= (r >>> 17);
321        return r ^ (r << 5);
372      }
373  
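
The two protected hooks above (onStart, onTermination) are the supported places for subclasses to attach per-worker state. Below is a brief sketch of a subclass and the ForkJoinWorkerThreadFactory that installs it, following the override protocol in the javadocs (call super.onStart() first, super.onTermination(exception) last); the timing field and class names are illustrative only.

    import jsr166y.ForkJoinPool;
    import jsr166y.ForkJoinWorkerThread;

    class TimedWorkerThread extends ForkJoinWorkerThread {
        long startNanos;                            // illustrative per-worker state

        TimedWorkerThread(ForkJoinPool pool) {
            super(pool);                            // protected constructor shown above
        }

        @Override
        protected void onStart() {
            super.onStart();                        // required first, per its javadoc
            startNanos = System.nanoTime();
        }

        @Override
        protected void onTermination(Throwable exception) {
            try {
                System.out.println(getName() + " lived " +
                                   (System.nanoTime() - startNanos) + "ns" +
                                   (exception != null ? " (aborted)" : ""));
            } finally {
                super.onTermination(exception);     // required last, per its javadoc
            }
        }
    }

    class TimedWorkerFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new TimedWorkerThread(pool);
        }
    }

Such a factory is then passed to a ForkJoinPool constructor that accepts a ForkJoinWorkerThreadFactory, so every worker the pool creates runs through these hooks.
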
324    // Lifecycle methods
325
374      /**
375       * This method is required to be public, but should never be
376       * called explicitly. It performs the main run loop to execute
# Line 332 | Line 380 | public class ForkJoinWorkerThread extend
380          Throwable exception = null;
381          try {
382              onStart();
335            pool.sync(this); // await first pool event
383              mainLoop();
384          } catch (Throwable ex) {
385              exception = ex;
# Line 341 | Line 388 | public class ForkJoinWorkerThread extend
388          }
389      }
390  
391 +    // helpers for run()
392 +
393      /**
394 <     * Executes tasks until shut down.
394 >     * Finds and executes tasks and checks status while running.
395       */
396      private void mainLoop() {
397 <        while (!isShutdown()) {
398 <            ForkJoinTask<?> t = pollTask();
399 <            if (t != null || (t = pollSubmission()) != null)
400 <                t.quietlyExec();
401 <            else if (tryInactivate())
402 <                pool.sync(this);
397 >        boolean ran = false;      // true if ran task in last loop iter
398 >        boolean prevRan = false;  // true if ran on last or previous step
399 >        ForkJoinPool p = pool;
400 >        for (;;) {
401 >            p.preStep(this, prevRan);
402 >            if (runState != 0)
403 >                return;
404 >            ForkJoinTask<?> t; // try to get and run stolen or submitted task
405 >            if ((t = scan()) != null || (t = pollSubmission()) != null) {
406 >                t.tryExec();
407 >                if (base != sp)
408 >                    runLocalTasks();
409 >                stolen = null;
410 >                prevRan = ran = true;
411 >            }
412 >            else {
413 >                prevRan = ran;
414 >                ran = false;
415 >            }
416          }
417      }
418  
419      /**
420 <     * Initializes internal state after construction but before
421 <     * processing any tasks. If you override this method, you must
360 <     * invoke super.onStart() at the beginning of the method.
361 <     * Initialization requires care: Most fields must have legal
362 <     * default values, to ensure that attempted accesses from other
363 <     * threads work correctly even before this thread starts
364 <     * processing tasks.
420 >     * Runs local tasks until queue is empty or shut down.  Call only
421 >     * while active.
422       */
423 <    protected void onStart() {
424 <        // Allocate while starting to improve chances of thread-local
425 <        // isolation
426 <        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
427 <        // Initial value of seed need not be especially random but
428 <        // should differ across workers and must be nonzero
429 <        int p = poolIndex + 1;
430 <        seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
423 >    private void runLocalTasks() {
424 >        while (runState == 0) {
425 >            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
426 >            if (t != null)
427 >                t.tryExec();
428 >            else if (base == sp)
429 >                break;
430 >        }
431      }
432  
433      /**
434 <     * Performs cleanup associated with termination of this worker
378 <     * thread.  If you override this method, you must invoke
379 <     * {@code super.onTermination} at the end of the overridden method.
434 >     * If a submission exists, try to activate and take it
435       *
436 <     * @param exception the exception causing this thread to abort due
382 <     * to an unrecoverable error, or {@code null} if completed normally
436 >     * @return a task, if available
437       */
438 <    protected void onTermination(Throwable exception) {
439 <        // Execute remaining local tasks unless aborting or terminating
440 <        while (exception == null &&  !pool.isTerminating() && base != sp) {
441 <            try {
442 <                ForkJoinTask<?> t = popTask();
443 <                if (t != null)
390 <                    t.quietlyExec();
391 <            } catch (Throwable ex) {
392 <                exception = ex;
438 >    private ForkJoinTask<?> pollSubmission() {
439 >        ForkJoinPool p = pool;
440 >        while (p.hasQueuedSubmissions()) {
441 >            if (active || (active = p.tryIncrementActiveCount())) {
442 >                ForkJoinTask<?> t = p.pollSubmission();
443 >                return t != null ? t : scan(); // if missed, rescan
444              }
445          }
446 <        // Cancel other tasks, transition status, notify pool, and
396 <        // propagate exception to uncaught exception handler
397 <        try {
398 <            do {} while (!tryInactivate()); // ensure inactive
399 <            cancelTasks();
400 <            runState = TERMINATED;
401 <            pool.workerTerminated(this);
402 <        } catch (Throwable ex) {        // Shouldn't ever happen
403 <            if (exception == null)      // but if so, at least rethrown
404 <                exception = ex;
405 <        } finally {
406 <            if (exception != null)
407 <                ForkJoinTask.rethrowException(exception);
408 <        }
446 >        return null;
447      }
448  
449 <    // Intrinsics-based support for queue operations.
450 <
451 <    /**
452 <     * Adds in store-order the given task at given slot of q to null.
453 <     * Caller must ensure q is non-null and index is in range.
449 >    /*
450 >     * Intrinsics-based atomic writes for queue slots. These are
451 >     * basically the same as methods in AtomicReferenceArray, but
452 >     * specialized for (1) ForkJoinTask elements (2) requirement that
453 >     * nullness and bounds checks have already been performed by
454 >     * callers and (3) effective offsets are known not to overflow
455 >     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
456 >     * need a corresponding version for reads: plain array reads are OK
457 >     * because they are protected by other volatile reads and are
458 >     * confirmed by CASes.
459 >     *
460 >     * Most uses don't actually call these methods, but instead contain
461 >     * inlined forms that enable more predictable optimization.  We
462 >     * don't define the version of write used in pushTask at all, but
463 >     * instead inline there a store-fenced array slot write.
464       */
417    private static void setSlot(ForkJoinTask<?>[] q, int i,
418                                ForkJoinTask<?> t) {
419        UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t);
420    }
465  
466      /**
467 <     * CAS given slot of q to null. Caller must ensure q is non-null
468 <     * and index is in range.
467 >     * CASes slot i of array q from t to null. Caller must ensure q is
468 >     * non-null and index is in range.
469       */
470 <    private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
471 <                                       ForkJoinTask<?> t) {
470 >    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
471 >                                             ForkJoinTask<?> t) {
472          return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
473      }
474  
475      /**
476 <     * Sets sp in store-order.
476 >     * Performs a volatile write of the given task at given slot of
477 >     * array q.  Caller must ensure q is non-null and index is in
478 >     * range. This method is used only during resets and backouts.
479       */
480 <    private void storeSp(int s) {
481 <        UNSAFE.putOrderedInt(this, spOffset, s);
480 >    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
481 >                                              ForkJoinTask<?> t) {
482 >        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
483      }
484  
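
For readers of this hunk: qBase and qShift are not defined here; they are the usual Unsafe array constants, presumably initialized elsewhere in the file along these conventional lines (a sketch only, assuming the class's UNSAFE field), so that slot i of a queue array lives at byte offset (i << qShift) + qBase.

    // Assumed conventional setup for the raw offsets used above; the actual
    // static initializer is outside this hunk. qBase is the byte offset of
    // element 0 of a ForkJoinTask<?>[] and qShift turns an index into a byte
    // displacement, so slot i is addressed as (i << qShift) + qBase.
    private static final long qBase;
    private static final int qShift;
    static {
        qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
        int scale = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("array index scale not a power of two");
        qShift = 31 - Integer.numberOfLeadingZeros(scale);
    }
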
485 <    // Main queue methods
485 >    // queue methods
486  
487      /**
488 <     * Pushes a task. Called only by current thread.
488 >     * Pushes a task. Call only from this thread.
489       *
490       * @param t the task. Caller must ensure non-null.
491       */
492      final void pushTask(ForkJoinTask<?> t) {
493          ForkJoinTask<?>[] q = queue;
494 <        int mask = q.length - 1;
495 <        int s = sp;
496 <        setSlot(q, s & mask, t);
497 <        storeSp(++s);
498 <        if ((s -= base) == 1)
499 <            pool.signalWork();
500 <        else if (s >= mask)
454 <            growQueue();
494 >        int mask = q.length - 1; // implicit assert q != null
495 >        int s = sp++;            // ok to increment sp before slot write
496 >        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
497 >        if ((s -= base) == 0)
498 >            pool.signalWork();   // was empty
499 >        else if (s == mask)
500 >            growQueue();         // is full
501      }
502  
503      /**
504       * Tries to take a task from the base of the queue, failing if
505 <     * either empty or contended.
505 >     * empty or contended. Note: Specializations of this code appear
506 >     * in locallyDeqTask and elsewhere.
507       *
508       * @return a task, or null if none or contended
509       */
510      final ForkJoinTask<?> deqTask() {
511          ForkJoinTask<?> t;
512          ForkJoinTask<?>[] q;
513 <        int i;
514 <        int b;
468 <        if (sp != (b = base) &&
513 >        int b, i;
514 >        if ((b = base) != sp &&
515              (q = queue) != null && // must read q after b
516 <            (t = q[i = (q.length - 1) & b]) != null &&
517 <            casSlotNull(q, i, t)) {
516 >            (t = q[i = (q.length - 1) & b]) != null && base == b &&
517 >            UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
518              base = b + 1;
519              return t;
520          }
# Line 476 | Line 522 | public class ForkJoinWorkerThread extend
522      }
523  
524      /**
525 <     * Tries to take a task from the base of own queue, activating if
526 <     * necessary, failing only if empty. Called only by current thread.
525 >     * Tries to take a task from the base of own queue. Assumes active
526 >     * status.  Called only by current thread.
527       *
528       * @return a task, or null if none
529       */
530      final ForkJoinTask<?> locallyDeqTask() {
531 <        int b;
532 <        while (sp != (b = base)) {
533 <            if (tryActivate()) {
534 <                ForkJoinTask<?>[] q = queue;
535 <                int i = (q.length - 1) & b;
536 <                ForkJoinTask<?> t = q[i];
537 <                if (t != null && casSlotNull(q, i, t)) {
531 >        ForkJoinTask<?>[] q = queue;
532 >        if (q != null) {
533 >            ForkJoinTask<?> t;
534 >            int b, i;
535 >            while (sp != (b = base)) {
536 >                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
537 >                    UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
538 >                                                t, null)) {
539                      base = b + 1;
540                      return t;
541                  }
# Line 498 | Line 545 | public class ForkJoinWorkerThread extend
545      }
546  
547      /**
548 <     * Returns a popped task, or null if empty. Ensures active status
549 <     * if non-null. Called only by current thread.
548 >     * Returns a popped task, or null if empty. Assumes active status.
549 >     * Called only by current thread. (Note: a specialization of this
550 >     * code appears in popWhileJoining.)
551       */
552      final ForkJoinTask<?> popTask() {
553 <        int s = sp;
554 <        while (s != base) {
555 <            if (tryActivate()) {
556 <                ForkJoinTask<?>[] q = queue;
557 <                int mask = q.length - 1;
558 <                int i = (s - 1) & mask;
559 <                ForkJoinTask<?> t = q[i];
560 <                if (t == null || !casSlotNull(q, i, t))
513 <                    break;
514 <                storeSp(s - 1);
553 >        int s;
554 >        ForkJoinTask<?>[] q;
555 >        if (base != (s = sp) && (q = queue) != null) {
556 >            int i = (q.length - 1) & --s;
557 >            ForkJoinTask<?> t = q[i];
558 >            if (t != null && UNSAFE.compareAndSwapObject
559 >                (q, (i << qShift) + qBase, t, null)) {
560 >                sp = s;
561                  return t;
562              }
563          }
# Line 519 | Line 565 | public class ForkJoinWorkerThread extend
565      }
566  
567      /**
568 <     * Specialized version of popTask to pop only if
569 <     * topmost element is the given task. Called only
570 <     * by current thread while active.
568 >     * Specialized version of popTask to pop only if topmost element
569 >     * is the given task. Called only by current thread while
570 >     * active.
571       *
572       * @param t the task. Caller must ensure non-null.
573       */
574      final boolean unpushTask(ForkJoinTask<?> t) {
575 <        ForkJoinTask<?>[] q = queue;
576 <        int mask = q.length - 1;
577 <        int s = sp - 1;
578 <        if (casSlotNull(q, s & mask, t)) {
579 <            storeSp(s);
575 >        int s;
576 >        ForkJoinTask<?>[] q;
577 >        if (base != (s = sp) && (q = queue) != null &&
578 >            UNSAFE.compareAndSwapObject
579 >            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
580 >            sp = s;
581              return true;
582          }
583          return false;
# Line 570 | Line 617 | public class ForkJoinWorkerThread extend
617              ForkJoinTask<?> t = oldQ[oldIndex];
618              if (t != null && !casSlotNull(oldQ, oldIndex, t))
619                  t = null;
620 <            setSlot(newQ, b & newMask, t);
620 >            writeSlot(newQ, b & newMask, t);
621          } while (++b != bf);
622          pool.signalWork();
623      }
624  
625      /**
626 +     * Computes next value for random victim probe in scan().  Scans
627 +     * don't require a very high quality generator, but also not a
628 +     * crummy one.  Marsaglia xor-shift is cheap and works well enough.
629 +     * Note: This is manually inlined in scan()
630 +     */
631 +    private static final int xorShift(int r) {
632 +        r ^= r << 13;
633 +        r ^= r >>> 17;
634 +        return r ^ (r << 5);
635 +    }
636 +
637 +    /**
638       * Tries to steal a task from another worker. Starts at a random
639       * index of workers array, and probes workers until finding one
640       * with non-empty queue or finding that all are empty.  It
641       * randomly selects the first n probes. If these are empty, it
642 <     * resorts to a full circular traversal, which is necessary to
643 <     * accurately set active status by caller. Also restarts if pool
644 <     * events occurred since last scan, which forces refresh of
645 <     * workers array, in case barrier was associated with resize.
642 >     * resorts to a circular sweep, which is necessary to accurately
643 >     * set active status. (The circular sweep uses steps of
644 >     * approximately half the array size plus 1, to avoid bias
645 >     * stemming from leftmost packing of the array in ForkJoinPool.)
646       *
647       * This method must be both fast and quiet -- usually avoiding
648       * memory accesses that could disrupt cache sharing etc other than
649 <     * those needed to check for and take tasks. This accounts for,
650 <     * among other things, updating random seed in place without
651 <     * storing it until exit.
649 >     * those needed to check for and take tasks (or to activate if not
650 >     * already active). This accounts for, among other things,
651 >     * updating random seed in place without storing it until exit.
652       *
653       * @return a task, or null if none found
654       */
655      private ForkJoinTask<?> scan() {
656 <        ForkJoinTask<?> t = null;
657 <        int r = seed;                    // extract once to keep scan quiet
658 <        ForkJoinWorkerThread[] ws;       // refreshed on outer loop
659 <        int mask;                        // must be power 2 minus 1 and > 0
660 <        outer:do {
661 <            if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
662 <                int idx = r;
663 <                int probes = ~mask;      // use random index while negative
664 <                for (;;) {
665 <                    r = xorShift(r);     // update random seed
666 <                    ForkJoinWorkerThread v = ws[mask & idx];
667 <                    if (v == null || v.sp == v.base) {
668 <                        if (probes <= mask)
669 <                            idx = (probes++ < 0) ? r : (idx + 1);
670 <                        else
671 <                            break;
656 >        ForkJoinPool p = pool;
657 >        ForkJoinWorkerThread[] ws;        // worker array
658 >        int n;                            // upper bound of #workers
659 >        if ((ws = p.workers) != null && (n = ws.length) > 1) {
660 >            boolean canSteal = active;    // shadow active status
661 >            int r = seed;                 // extract seed once
662 >            int mask = n - 1;
663 >            int j = -n;                   // loop counter
664 >            int k = r;                    // worker index, random if j < 0
665 >            for (;;) {
666 >                ForkJoinWorkerThread v = ws[k & mask];
667 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
668 >                if (v != null && v.base != v.sp) {
669 >                    if (canSteal ||       // ensure active status
670 >                        (canSteal = active = p.tryIncrementActiveCount())) {
671 >                        int b = v.base;   // inline specialized deqTask
672 >                        ForkJoinTask<?>[] q;
673 >                        if (b != v.sp && (q = v.queue) != null) {
674 >                            ForkJoinTask<?> t;
675 >                            int i = (q.length - 1) & b;
676 >                            long u = (i << qShift) + qBase; // raw offset
677 >                            if ((t = q[i]) != null && v.base == b &&
678 >                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
679 >                                stolen = t;
680 >                                v.base = b + 1;
681 >                                seed = r;
682 >                                ++stealCount;
683 >                                return t;
684 >                            }
685 >                        }
686                      }
687 <                    else if (!tryActivate() || (t = v.deqTask()) == null)
688 <                        continue outer;  // restart on contention
616 <                    else
617 <                        break outer;
687 >                    j = -n;
688 >                    k = r;                // restart on contention
689                  }
690 +                else if (++j <= 0)
691 +                    k = r;
692 +                else if (j <= n)
693 +                    k += (n >>> 1) | 1;
694 +                else
695 +                    break;
696              }
697 <        } while (pool.hasNewSyncEvent(this)); // retry on pool events
698 <        seed = r;
622 <        return t;
697 >        }
698 >        return null;
699      }
700  
701 +    // Run State management
702 +
703 +    // status check methods used mainly by ForkJoinPool
704 +    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
705 +    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
706 +    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
707 +    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
708 +
709      /**
710 <     * Gets and removes a local or stolen task.
627 <     *
628 <     * @return a task, if available
710 >     * Sets state to TERMINATING, also resuming if suspended.
711       */
712 <    final ForkJoinTask<?> pollTask() {
713 <        ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
714 <        if (t == null && (t = scan()) != null)
715 <            ++stealCount;
716 <        return t;
712 >    final void shutdown() {
713 >        for (;;) {
714 >            int s = runState;
715 >            if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
716 >                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
717 >                                             (s & ~SUSPENDED) |
718 >                                             (TRIMMED|TERMINATING))) {
719 >                    LockSupport.unpark(this);
720 >                    break;
721 >                }
722 >            }
723 >            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
724 >                                              s | TERMINATING))
725 >                break;
726 >        }
727 >    }
728 >
729 >    /**
730 >     * Sets state to TERMINATED. Called only by this thread.
731 >     */
732 >    private void setTerminated() {
733 >        int s;
734 >        do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
735 >                                               s = runState,
736 >                                               s | (TERMINATING|TERMINATED)));
737 >    }
738 >
739 >    /**
740 >     * Instrumented version of park used by ForkJoinPool.awaitEvent
741 >     */
742 >    final void doPark() {
743 >        ++parkCount;
744 >        LockSupport.park(this);
745      }
746  
747      /**
748 <     * Gets a local task.
748 >     * If suspended, tries to set status to unsuspended.
749 >     * Caller must unpark to actually resume
750       *
751 <     * @return a task, if available
751 >     * @return true if successful
752       */
753 <    final ForkJoinTask<?> pollLocalTask() {
754 <        return locallyFifo ? locallyDeqTask() : popTask();
753 >    final boolean tryUnsuspend() {
754 >        int s = runState;
755 >        if ((s & SUSPENDED) != 0)
756 >            return UNSAFE.compareAndSwapInt(this, runStateOffset, s,
757 >                                            s & ~SUSPENDED);
758 >        return false;
759      }
760  
761      /**
762 <     * Returns a pool submission, if one exists, activating first.
762 >     * Sets suspended status and blocks as spare until resumed,
763 >     * shutdown, or timed out.
764       *
765 <     * @return a submission, if available
765 >     * @return false if trimmed
766       */
767 <    private ForkJoinTask<?> pollSubmission() {
768 <        ForkJoinPool p = pool;
769 <        while (p.hasQueuedSubmissions()) {
770 <            ForkJoinTask<?> t;
771 <            if (tryActivate() && (t = p.pollSubmission()) != null)
772 <                return t;
767 >    final boolean suspendAsSpare() {
768 >        for (;;) {               // set suspended unless terminating
769 >            int s = runState;
770 >            if ((s & TERMINATING) != 0) { // must kill
771 >                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
772 >                                             s | (TRIMMED | TERMINATING)))
773 >                    return false;
774 >            }
775 >            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
776 >                                              s | SUSPENDED))
777 >                break;
778          }
779 <        return null;
779 >        boolean timed;
780 >        long nanos;
781 >        long startTime;
782 >        if (poolIndex < pool.parallelism) {
783 >            timed = false;
784 >            nanos = 0L;
785 >            startTime = 0L;
786 >        }
787 >        else {
788 >            timed = true;
789 >            nanos = SPARE_KEEPALIVE_NANOS;
790 >            startTime = System.nanoTime();
791 >        }
792 >        pool.accumulateStealCount(this);
793 >        lastEventCount = 0;      // reset upon resume
794 >        interrupted();           // clear/ignore interrupts
795 >        while ((runState & SUSPENDED) != 0) {
796 >            ++parkCount;
797 >            if (!timed)
798 >                LockSupport.park(this);
799 >            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
800 >                LockSupport.parkNanos(this, nanos);
801 >            else { // try to trim on timeout
802 >                int s = runState;
803 >                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
804 >                                             (s & ~SUSPENDED) |
805 >                                             (TRIMMED|TERMINATING)))
806 >                    return false;
807 >            }
808 >        }
809 >        return true;
810      }
811  
812 <    // Methods accessed only by Pool
812 >    // Misc support methods for ForkJoinPool
813 >
814 >    /**
815 >     * Returns an estimate of the number of tasks in the queue.  Also
816 >     * used by ForkJoinTask.
817 >     */
818 >    final int getQueueSize() {
819 >        return -base + sp;
820 >    }
821  
822      /**
823       * Removes and cancels all tasks in queue.  Can be called from any
824       * thread.
825       */
826      final void cancelTasks() {
827 <        ForkJoinTask<?> t;
828 <        while (base != sp && (t = deqTask()) != null)
829 <            t.cancelIgnoringExceptions();
827 >        while (base != sp) {
828 >            ForkJoinTask<?> t = deqTask();
829 >            if (t != null)
830 >                t.cancelIgnoringExceptions();
831 >        }
832      }
833  
834      /**
# Line 677 | Line 838 | public class ForkJoinWorkerThread extend
838       */
839      final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
840          int n = 0;
841 <        ForkJoinTask<?> t;
842 <        while (base != sp && (t = deqTask()) != null) {
843 <            c.add(t);
844 <            ++n;
841 >        while (base != sp) {
842 >            ForkJoinTask<?> t = deqTask();
843 >            if (t != null) {
844 >                c.add(t);
845 >                ++n;
846 >            }
847          }
848          return n;
849      }
850  
851 <    /**
689 <     * Gets and clears steal count for accumulation by pool.  Called
690 <     * only when known to be idle (in pool.sync and termination).
691 <     */
692 <    final int getAndClearStealCount() {
693 <        int sc = stealCount;
694 <        stealCount = 0;
695 <        return sc;
696 <    }
851 >    // Support methods for ForkJoinTask
852  
853      /**
854 <     * Returns {@code true} if at least one worker in the given array
700 <     * appears to have at least one queued task.
854 >     * Possibly runs some tasks and/or blocks, until task is done.
855       *
856 <     * @param ws array of workers
856 >     * @param joinMe the task to join
857       */
858 <    static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
859 <        if (ws != null) {
860 <            int len = ws.length;
861 <            for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
862 <                for (int i = 0; i < len; ++i) {
863 <                    ForkJoinWorkerThread w = ws[i];
864 <                    if (w != null && w.sp != w.base)
865 <                        return true;
858 >    final void joinTask(ForkJoinTask<?> joinMe) {
859 >        ForkJoinTask<?> prevJoining = joining;
860 >        joining = joinMe;
861 >        while (joinMe.status >= 0) {
862 >            int s = sp;
863 >            if (s == base) {
864 >                nonlocalJoinTask(joinMe);
865 >                break;
866 >            }
867 >            // process local task
868 >            ForkJoinTask<?> t;
869 >            ForkJoinTask<?>[] q = queue;
870 >            int i = (q.length - 1) & --s;
871 >            long u = (i << qShift) + qBase; // raw offset
872 >            if ((t = q[i]) != null &&
873 >                UNSAFE.compareAndSwapObject(q, u, t, null)) {
874 >                /*
875 >                 * This recheck (and similarly in nonlocalJoinTask)
876 >                 * handles cases where joinMe is independently
877 >                 * cancelled or forced even though there is other work
878 >                 * available. Back out of the pop by putting t back
879 >                 * into slot before we commit by setting sp.
880 >                 */
881 >                if (joinMe.status < 0) {
882 >                    UNSAFE.putObjectVolatile(q, u, t);
883 >                    break;
884                  }
885 +                sp = s;
886 +                t.tryExec();
887              }
888          }
889 <        return false;
889 >        joining = prevJoining;
890      }
891  
718    // Support methods for ForkJoinTask
719
892      /**
893 <     * Returns an estimate of the number of tasks in the queue.
893 >     * Tries to locate and help perform tasks for a stealer of the
894 >     * given task (or in turn one of its stealers), blocking (via
895 >     * pool.tryAwaitJoin) upon failure to find work.  Traces
896 >     * stolen->joining links looking for a thread working on
897 >     * a descendant of the given task and with a non-empty queue to
898 >     * steal back and execute tasks from. Inhibits mutual steal chains
899 >     * and scans on outer joins upon nesting to avoid unbounded
900 >     * growth.  Restarts search upon encountering inconsistencies.
901 >     * Tries to block if two passes agree that there are no remaining
902 >     * targets.
903 >     *
904 >     * @param joinMe the task to join
905       */
906 <    final int getQueueSize() {
907 <        // suppress momentarily negative values
908 <        return Math.max(0, sp - base);
906 >    private void nonlocalJoinTask(ForkJoinTask<?> joinMe) {
907 >        ForkJoinPool p = pool;
908 >        int scans = p.parallelism;       // give up if too many retries
909 >        ForkJoinTask<?> bottom = null;   // target seen when can't descend
910 >        restart: while (joinMe.status >= 0) {
911 >            ForkJoinTask<?> target = null;
912 >            ForkJoinTask<?> next = joinMe;
913 >            while (scans >= 0 && next != null) {
914 >                --scans;
915 >                target = next;
916 >                next = null;
917 >                ForkJoinWorkerThread v = null;
918 >                ForkJoinWorkerThread[] ws = p.workers;
919 >                int n = ws.length;
920 >                for (int j = 0; j < n; ++j) {
921 >                    ForkJoinWorkerThread w = ws[j];
922 >                    if (w != null && w.stolen == target) {
923 >                        v = w;
924 >                        break;
925 >                    }
926 >                }
927 >                if (v != null && v != this) {
928 >                    ForkJoinTask<?> prevStolen = stolen;
929 >                    int b;
930 >                    ForkJoinTask<?>[] q;
931 >                    while ((b = v.base) != v.sp && (q = v.queue) != null) {
932 >                        int i = (q.length - 1) & b;
933 >                        long u = (i << qShift) + qBase;
934 >                        ForkJoinTask<?> t = q[i];
935 >                        if (target.status < 0)
936 >                            continue restart;
937 >                        if (t != null && v.base == b &&
938 >                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
939 >                            if (joinMe.status < 0) {
940 >                                UNSAFE.putObjectVolatile(q, u, t);
941 >                                return; // back out
942 >                            }
943 >                            stolen = t;
944 >                            v.base = b + 1;
945 >                            t.tryExec();
946 >                            stolen = prevStolen;
947 >                        }
948 >                        if (joinMe.status < 0)
949 >                            return;
950 >                    }
951 >                    next = v.joining;
952 >                }
953 >                if (target.status < 0)
954 >                    continue restart;  // inconsistent
955 >                if (joinMe.status < 0)
956 >                    return;
957 >            }
958 >
959 >            if (bottom != target)
960 >                bottom = target;    // recheck landing spot
961 >            else if (p.tryAwaitJoin(joinMe) < 0)
962 >                return;             // successfully blocked
963 >            Thread.yield();         // tame spin in case too many active
964 >        }
965      }
966  
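    /*
     * Illustrative sketch, not part of this file: a simplified model of the
     * chain-following idea implemented by nonlocalJoinTask above, written as
     * if it were a method of this class. The helper methods named here are
     * hypothetical stand-ins; the real code also handles CAS races,
     * cancelled targets, bounded scans, and blocking via pool.tryAwaitJoin.
     */
    void helpJoinSketch(ForkJoinTask<?> joinMe) {
        ForkJoinTask<?> target = joinMe;
        while (target != null && joinMe.status >= 0) {
            // find a worker w with w.stolen == target (hypothetical helper)
            ForkJoinWorkerThread v = findStealerOf(target);
            if (v == null || v == this)
                break;                        // no known stealer: caller may block
            runTasksFromQueueOf(v);           // hypothetical: steal back and execute
            target = v.joining;               // descend to what the stealer awaits
        }
    }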
967      /**
968       * Returns an estimate of the number of tasks, offset by a
969       * function of number of idle workers.
970 +     *
971 +     * This method provides a cheap heuristic guide for task
972 +     * partitioning when programmers, frameworks, tools, or languages
 973 +     * have little or no idea about task granularity.  In essence, by
974 +     * offering this method, we ask users only about tradeoffs in
975 +     * overhead vs expected throughput and its variance, rather than
976 +     * how finely to partition tasks.
977 +     *
978 +     * In a steady state strict (tree-structured) computation, each
979 +     * thread makes available for stealing enough tasks for other
980 +     * threads to remain active. Inductively, if all threads play by
981 +     * the same rules, each thread should make available only a
982 +     * constant number of tasks.
983 +     *
984 +     * The minimum useful constant is just 1. But using a value of 1
985 +     * would require immediate replenishment upon each steal to
986 +     * maintain enough tasks, which is infeasible.  Further,
987 +     * partitionings/granularities of offered tasks should minimize
988 +     * steal rates, which in general means that threads nearer the top
 989 +     * of the computation tree should generate more than those nearer
 990 +     * the bottom. In a perfect steady state, each thread is at
 991 +     * approximately the same level of the computation tree. However,
992 +     * producing extra tasks amortizes the uncertainty of progress and
993 +     * diffusion assumptions.
994 +     *
 995 +     * So, users will want to use values larger than 1, but not much
 996 +     * larger, both to smooth over transient shortages and to hedge
 997 +     * against uneven progress, trading this off against the cost of
 998 +     * extra task overhead. We leave the user to pick a threshold
 999 +     * value to compare with the results of this call to guide
1000 +     * decisions, but recommend a value such as 3.
1001 +     *
1002 +     * When all threads are active, it is on average OK to estimate
1003 +     * surplus strictly locally. In steady-state, if one thread is
1004 +     * maintaining say 2 surplus tasks, then so are others. So we can
1005 +     * just use estimated queue length (although note that (sp - base)
1006 +     * can be an overestimate because of stealers lagging increments
1007 +     * of base).  However, this strategy alone leads to serious
1008 +     * mis-estimates in some non-steady-state conditions (ramp-up,
1009 +     * ramp-down, other stalls). We can detect many of these by
1010 +     * further considering the number of "idle" threads, which are
1011 +     * known to have zero queued tasks, and compensating by a factor
1012 +     * of (#idle/#active) threads.
1013       */
1014      final int getEstimatedSurplusTaskCount() {
1015 <        // The halving approximates weighting idle vs non-idle workers
734 <        return (sp - base) - (pool.getIdleThreadCount() >>> 1);
1015 >        return sp - base - pool.idlePerActive();
1016      }
1017  
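    /*
     * Illustrative sketch, not part of this file: one way the threshold
     * recommendation above is typically applied. This assumes the public
     * accessor ForkJoinTask.getSurplusQueuedTaskCount(), which exposes the
     * estimate to task code; class and constant names are hypothetical.
     * (A separate compilation unit; assumes imports of jsr166y.ForkJoinTask
     * and jsr166y.RecursiveAction.)
     */
    class ApplySketch extends RecursiveAction {
        static final int THRESHOLD = 1024;   // sequential cutoff (tuning knob)
        final double[] a; final int lo, hi;
        ApplySketch(double[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }
        protected void compute() {
            if (hi - lo > THRESHOLD &&
                ForkJoinTask.getSurplusQueuedTaskCount() <= 3) {
                int mid = (lo + hi) >>> 1;
                ApplySketch right = new ApplySketch(a, mid, hi);
                right.fork();                          // publish work for idle thieves
                new ApplySketch(a, lo, mid).compute(); // keep working locally
                right.join();
            }
            else {
                for (int i = lo; i < hi; ++i)
                    a[i] = Math.sqrt(a[i]);            // leaf work
            }
        }
    }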
1018      /**
1019 <     * Scans, returning early if joinMe done.
1019 >     * Gets and removes a local task.
1020 >     *
1021 >     * @return a task, if available
1022       */
1023 <    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
1024 <        ForkJoinTask<?> t = pollTask();
1025 <        if (t != null && joinMe.status < 0 && sp == base) {
1026 <            pushTask(t); // unsteal if done and this task would be stealable
744 <            t = null;
1023 >    final ForkJoinTask<?> pollLocalTask() {
1024 >        while (sp != base) {
1025 >            if (active || (active = pool.tryIncrementActiveCount()))
1026 >                return locallyFifo? locallyDeqTask() : popTask();
1027          }
1028 <        return t;
1028 >        return null;
1029 >    }
1030 >
1031 >    /**
1032 >     * Gets and removes a local or stolen task.
1033 >     *
1034 >     * @return a task, if available
1035 >     */
1036 >    final ForkJoinTask<?> pollTask() {
1037 >        ForkJoinTask<?> t;
1038 >        return (t = pollLocalTask()) != null ? t : scan();
1039      }
1040  
1041      /**
# Line 751 | Line 1043 | public class ForkJoinWorkerThread extends Thread {
1043       */
1044      final void helpQuiescePool() {
1045          for (;;) {
1046 <            ForkJoinTask<?> t = pollTask();
1047 <            if (t != null)
1048 <                t.quietlyExec();
1049 <            else if (tryInactivate() && pool.isQuiescent())
1050 <                break;
1046 >            ForkJoinTask<?> t = pollLocalTask();
1047 >            if (t != null || (t = scan()) != null) {
1048 >                t.tryExec();
1049 >                stolen = null;
1050 >            }
1051 >            else {
1052 >                ForkJoinPool p = pool;
1053 >                if (active) {
1054 >                    active = false; // inactivate
1055 >                    do {} while (!p.tryDecrementActiveCount());
1056 >                }
1057 >                if (p.isQuiescent()) {
1058 >                    active = true; // re-activate
1059 >                    do {} while (!p.tryIncrementActiveCount());
1060 >                    return;
1061 >                }
1062 >            }
1063          }
760        do {} while (!tryActivate()); // re-activate on exit
1064      }
1065  
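    /*
     * Illustrative sketch, not part of this file: helpQuiescePool above is
     * normally reached via the public ForkJoinTask.helpQuiesce() entry
     * point. A task that forks many subtasks without joining each one can
     * use it to pitch in on queued and stolen work until the pool becomes
     * quiescent. Names are hypothetical. (A separate compilation unit;
     * assumes imports of jsr166y.ForkJoinTask and jsr166y.RecursiveAction.)
     */
    class FanOutSketch extends RecursiveAction {
        final Runnable[] jobs;
        FanOutSketch(Runnable[] jobs) { this.jobs = jobs; }
        protected void compute() {
            for (final Runnable job : jobs) {
                new RecursiveAction() {
                    protected void compute() { job.run(); }
                }.fork();                      // fire-and-forget subtasks
            }
            ForkJoinTask.helpQuiesce();        // run available work until quiescent
        }
    }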
1066      // Unsafe mechanics
1067  
1068      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
766    private static final long spOffset =
767        objectFieldOffset("sp", ForkJoinWorkerThread.class);
1069      private static final long runStateOffset =
1070          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1071 <    private static final long qBase;
1071 >    private static final long qBase =
1072 >        UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1073      private static final int qShift;
1074  
1075      static {
774        qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1076          int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
1077          if ((s & (s-1)) != 0)
1078              throw new Error("data type scale not a power of two");
