root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
Revision: 1.16
Committed: Thu May 27 16:47:21 2010 UTC by dl
Branch: MAIN
Changes since 1.15: +85 -59 lines
Log Message:
Sync with jsr166y

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;

/**
 * A thread managed by a {@link ForkJoinPool}. This class is
 * subclassable solely for the sake of adding functionality -- there
 * are no overridable methods dealing with scheduling or execution.
 * However, you can override initialization and termination methods
 * surrounding the main task processing loop. If you do create such a
 * subclass, you will also need to supply a custom {@link
 * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
 * ForkJoinPool}.
 *
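 * <p>As an illustrative sketch (the names {@code LoggingWorkerThread}
 * and {@code LoggingWorkerThreadFactory} are hypothetical, not part
 * of this API), such a subclass and factory might look like:
 *
 * <pre> {@code
 * class LoggingWorkerThread extends ForkJoinWorkerThread {
 *     LoggingWorkerThread(ForkJoinPool pool) { super(pool); }
 *     protected void onStart() {
 *         super.onStart();                  // must be called first
 *         System.out.println(getName() + " starting");
 *     }
 *     protected void onTermination(Throwable exception) {
 *         System.out.println(getName() + " terminating");
 *         super.onTermination(exception);   // must be called last
 *     }
 * }
 * class LoggingWorkerThreadFactory
 *         implements ForkJoinPool.ForkJoinWorkerThreadFactory {
 *     public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *         return new LoggingWorkerThread(pool);
 *     }
 * }}</pre>
 *
 * A pool could then be constructed with such a factory via the
 * corresponding {@code ForkJoinPool} constructor.
 *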
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Overview:
     *
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. This class includes bookkeeping in support of
     * worker activation, suspension, and lifecycle control described
     * in more detail in the internal documentation of class
     * ForkJoinPool. And as described further below, this class also
     * includes special-cased support for some ForkJoinTask
     * methods. But the main mechanics involve work-stealing:
     *
     * Work-stealing queues are special forms of Deques that support
     * only three of the four possible end-operations -- push, pop,
     * and deq (aka steal), under the further constraints that push
     * and pop are called only from the owning thread, while deq may
     * be called from other threads. (If you are unfamiliar with
     * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor Programming", chapter 16 describing these
     * in more detail before proceeding.) The main work-stealing
     * queue design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs deq (steal) from being on the indices
     * ("base" and "sp") to the slots themselves (mainly via method
     * "casSlotNull()"). So, both a successful pop and deq mainly
     * entail a CAS of a slot from non-null to null. Because we rely
     * on CASes of references, we do not need tag bits on base or sp.
     * They are simple ints, as used in any circular array-based queue
     * (see for example ArrayDeque). Updates to the indices must
     * still be ordered in a way that guarantees that sp == base means
     * the queue is empty, but otherwise may err on the side of
     * possibly making the queue appear nonempty when a push, pop, or
     * deq has not fully committed. Note that this means that the deq
     * operation, considered individually, is not wait-free. One thief
     * cannot successfully continue until another in-progress one (or,
     * if previously empty, a push) completes. However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete. One reason this works well here is
     * that apparently-nonempty often means soon-to-be-stealable,
     * which gives threads a chance to set activation status if
     * necessary before stealing.
     *
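     * In outline (an explanatory sketch added here; the actual code
     * below must also handle the memory-ordering details discussed
     * later), the three operations are:
     *
     *   push(t): s = sp++; store queue[s & mask] = t
     *   pop():   s = sp - 1; t = queue[s & mask];
     *            if (CAS slot from t to null) sp = s      // owner only
     *   deq():   b = base; t = queue[b & mask];
     *            if (CAS slot from t to null) base = b + 1 // any thief
     *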
     * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO, order, simply by using a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool). This allows use in message-passing
     * frameworks in which tasks are never joined.
     *
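     * (As an illustrative note: in this version of the API, the
     * pool-wide setting is changed via ForkJoinPool's setAsyncMode
     * method, whose value each worker shadows in its locallyFifo
     * field, as described below.)
     *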
     * Efficient implementation of this approach currently relies on
     * an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable base require
     * volatile ordering. Variable sp does not require volatile
     * writes but still needs store-ordering, which we accomplish by
     * pre-incrementing sp before filling the slot with an ordered
     * store. (Pre-incrementing also enables backouts used in
     * scanWhileJoining.) Because they are protected by volatile base
     * reads, reads of the queue array and its slots by other threads
     * do not need volatile load semantics, but writes (in push)
     * require store order and CASes (in pop and deq) require
     * (volatile) CAS semantics. (Michael, Saraswat, and Vechev's
     * algorithm has similar properties, but without support for
     * nulling slots.) Since these combinations aren't supported
     * using ordinary volatiles, the only way to accomplish these
     * efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.)
     *
     * Further, performance on most platforms is very sensitive to
     * placement and sizing of the (resizable) queue array. Even
     * though these queues don't usually become all that big, the
     * initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are initialized after starting. Altogether, these
     * low-level implementation choices produce as much as a factor of
     * 4 performance improvement compared to naive implementations,
     * and enable the processing of billions of tasks per second,
     * sometimes at the expense of ugliness.
     */

    /**
     * Generator for initial random seeds for random victim
     * selection. This is used only to create initial seeds. Random
     * steals use a cheaper xorshift generator per steal attempt. We
     * expect only rare contention on seedGenerator, so just use a
     * plain Random.
     */
    private static final Random seedGenerator = new Random();

    /**
     * The timeout value for suspending spares. Spare workers that
     * remain unsignalled for more than this time may be trimmed
     * (killed and removed from pool). Since our goal is to avoid
     * long-term thread buildup, the exact value of timeout does not
     * matter too much so long as it avoids most false-alarm timeouts
     * under GC stalls or momentarily high system load.
     */
    private static final long SPARE_KEEPALIVE_NANOS =
        5L * 1000L * 1000L * 1000L; // 5 secs

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 2, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum work-stealing queue array size. Must be less than or
     * equal to 1 << 28 to ensure lack of index wraparound. (This
     * is less than usual bounds, because we need left shift by 3
     * to be in int range).
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized in onStart, to improve memory locality.
     */
    private ForkJoinTask<?>[] queue;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    private volatile int base;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) base. Both sp and base
     * are allowed to wrap around on overflow, but (sp - base) still
     * estimates size.
     */
    private int sp;

    /**
     * Run state of this worker. In addition to the usual run levels,
     * tracks if this worker is suspended as a spare, and if it was
     * killed (trimmed) while suspended. However, "active" status is
     * maintained separately.
     */
    private volatile int runState;

    private static final int TERMINATING = 0x01;
    private static final int TERMINATED  = 0x02;
    private static final int SUSPENDED   = 0x04; // inactive spare
    private static final int TRIMMED     = 0x08; // killed while suspended

    /**
     * Number of LockSupport.park calls to block this thread for
     * suspension or event waits. Used for internal instrumentation;
     * currently not exported but included because volatile write upon
     * park also provides a workaround for a JVM bug.
     */
    private volatile int parkCount;

    /**
     * Number of steals, transferred to and reset by pool callbacks
     * when idle. Accessed directly by pool.
     */
    int stealCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
     */
    private int seed;

    /**
     * Activity status. When true, this worker is considered active.
     * Accessed directly by pool. Must be false upon construction.
     */
    boolean active;

    /**
     * True if using local FIFO, rather than the default LIFO, for
     * local polling. Shadows value from ForkJoinPool, which resets it
     * if changed pool-wide.
     */
    private boolean locallyFifo;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array.
     */
    int poolIndex;

    /**
     * The last pool event waited for. Accessed only by pool in
     * callback methods invoked within this thread.
     */
    int lastEventCount;

    /**
     * Encoded index and event count of next event waiter. Used only
     * by ForkJoinPool for managing event waiters.
     */
    volatile long nextWaiter;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        if (pool == null) throw new NullPointerException();
        this.pool = pool;
        // To avoid exposing construction details to subclasses,
        // remaining initialization is in start() and onStart()
    }

    /**
     * Performs additional initialization and starts this thread.
     */
    final void start(int poolIndex, boolean locallyFifo,
                     UncaughtExceptionHandler ueh) {
        this.poolIndex = poolIndex;
        this.locallyFifo = locallyFifo;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        setDaemon(true);
        start();
    }

    // Public/protected methods

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the index number of this thread in its pool. The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     *
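     * <p>For example, a minimal sketch (the array
     * {@code perWorkerTotals}, sized to the pool's maximum number of
     * workers, is hypothetical):
     *
     * <pre> {@code
     * // within code running in a ForkJoinPool
     * Thread t = Thread.currentThread();
     * if (t instanceof ForkJoinWorkerThread)
     *     perWorkerTotals[((ForkJoinWorkerThread) t).getPoolIndex()]++;
     * }</pre>
     *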
     * @return the index number
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke super.onStart() at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        int rs = seedGenerator.nextInt();
        seed = (rs == 0) ? 1 : rs; // seed must be nonzero

        // Allocate name string and queue array in this thread
        String pid = Integer.toString(pool.getPoolNumber());
        String wid = Integer.toString(poolIndex);
        setName("ForkJoinPool-" + pid + "-worker-" + wid);

        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread. If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
        try {
            cancelTasks();
            setTerminated();
            pool.workerTerminated(this);
        } catch (Throwable ex) {   // Shouldn't ever happen
            if (exception == null) // but if so, at least rethrow it
                exception = ex;
        } finally {
            if (exception != null)
                UNSAFE.throwException(exception);
        }
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * ForkJoinTasks.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            mainLoop();
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    // helpers for run()

    /**
     * Finds and executes tasks, and checks status, while running.
     */
    private void mainLoop() {
        boolean ran = false; // true if ran task on previous step
        ForkJoinPool p = pool;
        for (;;) {
            p.preStep(this, ran);
            if (runState != 0)
                return;
            ForkJoinTask<?> t; // try to get and run stolen or submitted task
            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
                t.tryExec();
                if (base != sp)
                    runLocalTasks();
            }
        }
    }

    /**
     * Runs local tasks until queue is empty or shut down. Call only
     * while active.
     */
    private void runLocalTasks() {
        while (runState == 0) {
            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
            if (t != null)
                t.tryExec();
            else if (base == sp)
                break;
        }
    }

    /**
     * If a submission exists, tries to activate and take it.
     *
     * @return a task, if available
     */
    private ForkJoinTask<?> pollSubmission() {
        ForkJoinPool p = pool;
        while (p.hasQueuedSubmissions()) {
            if (active || (active = p.tryIncrementActiveCount())) {
                ForkJoinTask<?> t = p.pollSubmission();
                return t != null ? t : scan(); // if missed, rescan
            }
        }
        return null;
    }

    /*
     * Intrinsics-based atomic writes for queue slots. These are
     * basically the same as methods in AtomicReferenceArray, but
     * specialized for (1) ForkJoinTask elements (2) requirement that
     * nullness and bounds checks have already been performed by
     * callers and (3) effective offsets are known not to overflow
     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
     * need a corresponding version for reads: plain array reads are
     * OK because they are protected by other volatile reads and are
     * confirmed by CASes.
     *
     * Most uses don't actually call these methods, but instead contain
     * inlined forms that enable more predictable optimization. We
     * don't define the version of write used in pushTask at all, but
     * instead inline there a store-fenced array slot write.
     */

    /**
     * CASes slot i of array q from t to null. Caller must ensure q is
     * non-null and index is in range.
     */
    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                             ForkJoinTask<?> t) {
        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
    }

    /**
     * Performs a volatile write of the given task at given slot of
     * array q. Caller must ensure q is non-null and index is in
     * range. This method is used only during resets and backouts.
     */
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                        ForkJoinTask<?> t) {
        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
    }

441 jsr166 1.1
442     /**
443 dl 1.14 * Pushes a task. Call only from this thread.
444 jsr166 1.1 *
445     * @param t the task. Caller must ensure non-null.
446     */
447     final void pushTask(ForkJoinTask<?> t) {
448 dl 1.14 int s;
449 jsr166 1.1 ForkJoinTask<?>[] q = queue;
450 dl 1.14 int mask = q.length - 1; // implicit assert q != null
451     UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
452     if ((s -= base) <= 0)
453 jsr166 1.1 pool.signalWork();
454 dl 1.14 else if (s + 1 >= mask)
455 jsr166 1.1 growQueue();
456     }
457    
458     /**
459     * Tries to take a task from the base of the queue, failing if
460 dl 1.14 * empty or contended. Note: Specializations of this code appear
461     * in scan and scanWhileJoining.
462 jsr166 1.1 *
463     * @return a task, or null if none or contended
464     */
465     final ForkJoinTask<?> deqTask() {
466     ForkJoinTask<?> t;
467     ForkJoinTask<?>[] q;
468 dl 1.14 int b, i;
469     if ((b = base) != sp &&
470 jsr166 1.1 (q = queue) != null && // must read q after b
471     (t = q[i = (q.length - 1) & b]) != null &&
472 dl 1.14 UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
473 jsr166 1.1 base = b + 1;
474     return t;
475     }
476     return null;
477     }
478    
479     /**
480 dl 1.14 * Tries to take a task from the base of own queue. Assumes active
481     * status. Called only by current thread.
482 jsr166 1.6 *
483     * @return a task, or null if none
484     */
485     final ForkJoinTask<?> locallyDeqTask() {
486 dl 1.14 ForkJoinTask<?>[] q = queue;
487     if (q != null) {
488     ForkJoinTask<?> t;
489     int b, i;
490     while (sp != (b = base)) {
491     if ((t = q[i = (q.length - 1) & b]) != null &&
492     UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
493     t, null)) {
494 jsr166 1.6 base = b + 1;
495     return t;
496     }
497     }
498     }
499     return null;
500     }
501    
502     /**
503 dl 1.14 * Returns a popped task, or null if empty. Assumes active status.
504     * Called only by current thread. (Note: a specialization of this
505 dl 1.16 * code appears in popWhileJoining.)
506 jsr166 1.1 */
507     final ForkJoinTask<?> popTask() {
508 dl 1.14 int s;
509 dl 1.16 ForkJoinTask<?>[] q;
510     if (base != (s = sp) && (q = queue) != null) {
511 dl 1.14 int i = (q.length - 1) & --s;
512     ForkJoinTask<?> t = q[i];
513     if (t != null && UNSAFE.compareAndSwapObject
514     (q, (i << qShift) + qBase, t, null)) {
515     sp = s;
516 jsr166 1.1 return t;
517     }
518     }
519     return null;
520     }
521    
    /**
     * Specialized version of popTask to pop only if topmost element
     * is the given task. Called only by current thread while
     * active.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        int s;
        ForkJoinTask<?>[] q;
        if (base != (s = sp) && (q = queue) != null &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
            sp = s;
            return true;
        }
        return false;
    }

    /**
     * Returns next task, or null if empty or contended.
     */
    final ForkJoinTask<?> peekTask() {
        ForkJoinTask<?>[] q = queue;
        if (q == null)
            return null;
        int mask = q.length - 1;
        int i = locallyFifo ? base : (sp - 1);
        return q[i & mask];
    }

    /**
     * Doubles queue array size. Transfers elements by emulating
     * steals (deqs) from old array and placing, oldest first, into
     * new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int oldSize = oldQ.length;
        int newSize = oldSize << 1;
        if (newSize > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];

        int b = base;
        int bf = b + oldSize;
        int oldMask = oldSize - 1;
        int newMask = newSize - 1;
        do {
            int oldIndex = b & oldMask;
            ForkJoinTask<?> t = oldQ[oldIndex];
            if (t != null && !casSlotNull(oldQ, oldIndex, t))
                t = null;
            writeSlot(newQ, b & newMask, t);
        } while (++b != bf);
        pool.signalWork();
    }

    /**
     * Computes next value for random victim probe in scan(). Scans
     * don't require a very high quality generator, but also not a
     * crummy one. Marsaglia xor-shift is cheap and works well enough.
     * Note: This is manually inlined in scan().
     */
    private static final int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Tries to steal a task from another worker. Starts at a random
     * index of workers array, and probes workers until finding one
     * with non-empty queue or finding that all are empty. It
     * randomly selects the first n probes. If these are empty, it
     * resorts to a circular sweep, which is necessary to accurately
     * set active status. (The circular sweep uses steps of
     * approximately half the array size plus 1, to avoid bias
     * stemming from leftmost packing of the array in ForkJoinPool.)
     *
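     * (Illustrative arithmetic, added as a note: with n = 8 workers,
     * the sweep step is (n >>> 1) | 1 = 5; because an odd step is
     * coprime to the power-of-two index space, successive k += 5
     * probes visit all 8 slots before repeating.)
     *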
     * This method must be both fast and quiet -- usually avoiding
     * memory accesses that could disrupt cache sharing etc other than
     * those needed to check for and take tasks (or to activate if not
     * already active). This accounts for, among other things,
     * updating random seed in place without storing it until exit.
     *
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan() {
        ForkJoinPool p = pool;
        ForkJoinWorkerThread[] ws;     // worker array
        int n;                         // upper bound of #workers
        if ((ws = p.workers) != null && (n = ws.length) > 1) {
            boolean canSteal = active; // shadow active status
            int r = seed;              // extract seed once
            int mask = n - 1;
            int j = -n;                // loop counter
            int k = r;                 // worker index, random if j < 0
            for (;;) {
                ForkJoinWorkerThread v = ws[k & mask];
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
                if (v != null && v.base != v.sp) {
                    int b, i;          // inline specialized deqTask
                    ForkJoinTask<?>[] q;
                    ForkJoinTask<?> t;
                    if ((canSteal ||   // ensure active status
                         (canSteal = active = p.tryIncrementActiveCount())) &&
                        (q = v.queue) != null &&
                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
                        UNSAFE.compareAndSwapObject
                        (q, (i << qShift) + qBase, t, null)) {
                        v.base = b + 1;
                        seed = r;
                        ++stealCount;
                        return t;
                    }
                    j = -n;
                    k = r;             // restart on contention
                }
                else if (++j <= 0)
                    k = r;
                else if (j <= n)
                    k += (n >>> 1) | 1;
                else
                    break;
            }
        }
        return null;
    }

    // Run State management

    // status check methods used mainly by ForkJoinPool
    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }

    /**
     * Sets state to TERMINATING, also resuming if suspended.
     */
    final void shutdown() {
        for (;;) {
            int s = runState;
            if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             (s & ~SUSPENDED) |
                                             (TRIMMED|TERMINATING))) {
                    LockSupport.unpark(this);
                    break;
                }
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | TERMINATING))
                break;
        }
    }

    /**
     * Sets state to TERMINATED. Called only by this thread.
     */
    private void setTerminated() {
        int s;
        do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
                                               s = runState,
                                               s | (TERMINATING|TERMINATED)));
    }

    /**
     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent.
     */
    final void doPark() {
        ++parkCount;
        LockSupport.park(this);
    }

    /**
     * If suspended, tries to set status to unsuspended.
     * Caller must unpark to actually resume.
     *
     * @return true if successful
     */
    final boolean tryUnsuspend() {
        int s;
        return (((s = runState) & SUSPENDED) != 0 &&
                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                         s & ~SUSPENDED));
    }

    /**
     * Sets suspended status and blocks as spare until resumed,
     * shutdown, or timed out.
     *
     * @return false if trimmed
     */
    final boolean suspendAsSpare() {
        for (;;) {               // set suspended unless terminating
            int s = runState;
            if ((s & TERMINATING) != 0) { // must kill
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             s | (TRIMMED | TERMINATING)))
                    return false;
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | SUSPENDED))
                break;
        }
        lastEventCount = 0;      // reset upon resume
        ForkJoinPool p = pool;
        p.releaseWaiters();      // help others progress
        p.accumulateStealCount(this);
        interrupted();           // clear/ignore interrupts
        if (poolIndex < p.getParallelism()) { // untimed wait
            while ((runState & SUSPENDED) != 0)
                doPark();
            return true;
        }
        return timedSuspend();   // timed wait if apparently non-core
    }

    /**
     * Blocks as spare until resumed or timed out.
     *
     * @return false if trimmed
     */
    private boolean timedSuspend() {
        long nanos = SPARE_KEEPALIVE_NANOS;
        long startTime = System.nanoTime();
        while ((runState & SUSPENDED) != 0) {
            ++parkCount;
            if ((nanos -= (System.nanoTime() - startTime)) > 0)
                LockSupport.parkNanos(this, nanos);
            else {               // try to trim on timeout
                int s = runState;
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             (s & ~SUSPENDED) |
                                             (TRIMMED|TERMINATING)))
                    return false;
            }
        }
        return true;
    }

    // Misc support methods for ForkJoinPool

    /**
     * Returns an estimate of the number of tasks in the queue. Also
     * used by ForkJoinTask.
     */
    final int getQueueSize() {
        return -base + sp;
    }

    /**
     * Sets locallyFifo mode. Called only by ForkJoinPool.
     */
    final void setAsyncMode(boolean async) {
        locallyFifo = async;
    }

    /**
     * Removes and cancels all tasks in queue. Can be called from any
     * thread.
     */
    final void cancelTasks() {
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoringExceptions();
        }
    }

    /**
     * Drains tasks to given collection c.
     *
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null) {
                c.add(t);
                ++n;
            }
        }
        return n;
    }

    // Support methods for ForkJoinTask

    /**
     * Returns an estimate of the number of tasks, offset by a
     * function of number of idle workers.
     *
     * This method provides a cheap heuristic guide for task
     * partitioning when programmers, frameworks, tools, or languages
     * have little or no idea about task granularity. In essence, by
     * offering this method, we ask users only about tradeoffs in
     * overhead vs expected throughput and its variance, rather than
     * how finely to partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger (but not much larger)
     * than 1, both to smooth over transient shortages and to hedge
     * against uneven progress, as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining, say, 2 surplus tasks, then so are others. So we
     * can just use estimated queue length (although note that
     * (sp - base) can be an overestimate because of stealers lagging
     * increments of base). However, this strategy alone leads to
     * serious mis-estimates in some non-steady-state conditions
     * (ramp-up, ramp-down, other stalls). We can detect many of these
     * by further considering the number of "idle" threads, which are
     * known to have zero queued tasks, so compensate by a factor of
     * (#idle/#active) threads.
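     *
     * (An illustrative sketch, not code from this file: a divide and
     * conquer computation using the recommended threshold of 3 might
     * guard forking with
     *
     *   if (w.getEstimatedSurplusTaskCount() <= 3)
     *       left.fork();     // keep work available to stealers
     *   else
     *       left.invoke();   // enough surplus; just run it directly
     *
     * where w is the current ForkJoinWorkerThread and left is a
     * hypothetical subtask.)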
     */
    final int getEstimatedSurplusTaskCount() {
        return sp - base - pool.idlePerActive();
    }

    /**
     * Gets and removes a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        while (base != sp) {
            if (active || (active = pool.tryIncrementActiveCount()))
                return locallyFifo ? locallyDeqTask() : popTask();
        }
        return null;
    }

    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinTask<?> t;
        return (t = pollLocalTask()) != null ? t : scan();
    }

    /**
     * Executes or processes other tasks while awaiting the given task.
     *
     * @return task completion status
     */
    final int execWhileJoining(ForkJoinTask<?> joinMe) {
        int s;
        while ((s = joinMe.status) >= 0) {
            ForkJoinTask<?> t = base != sp ?
                popWhileJoining(joinMe) :
                scanWhileJoining(joinMe);
            if (t != null)
                t.tryExec();
        }
        return s;
    }

    /**
     * Returns a stolen task, if available, unless joinMe is done.
     *
     * This method is intrinsically nonmodular. To maintain the
     * property that tasks are never stolen if the awaited task is
     * ready, we must interleave mechanics of scan with status
     * checks. We rely here on the commit points of deq that allow us
     * to cancel a steal even after CASing slot to null, but before
     * adjusting base index: If, after the CAS, we see that joinMe is
     * ready, we can back out by placing the task back into the slot,
     * without adjusting index. The loop is otherwise a variant of the
     * one in scan().
     */
    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
        int r = seed;
        ForkJoinPool p = pool;
        ForkJoinWorkerThread[] ws;
        int n;
        outer: while ((ws = p.workers) != null && (n = ws.length) > 1) {
            int mask = n - 1;
            int k = r;
            boolean contended = false; // to retry loop if deq contends
            for (int j = -n; j <= n; ++j) {
                if (joinMe.status < 0)
                    break outer;
                int b;
                ForkJoinTask<?>[] q;
                ForkJoinWorkerThread v = ws[k & mask];
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
                if (v != null && (b = v.base) != v.sp && (q = v.queue) != null) {
                    int i = (q.length - 1) & b;
                    ForkJoinTask<?> t = q[i];
                    if (t != null && UNSAFE.compareAndSwapObject
                        (q, (i << qShift) + qBase, t, null)) {
                        if (joinMe.status >= 0) {
                            v.base = b + 1;
                            seed = r;
                            ++stealCount;
                            return t;
                        }
                        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
                        break outer; // back out
                    }
                    contended = true;
                }
                k = j < 0 ? r : (k + ((n >>> 1) | 1));
            }
            if (!contended && p.tryAwaitBusyJoin(joinMe))
                break;
        }
        return null;
    }

    /**
     * Version of popTask with join checks surrounding extraction.
     * Uses the same backout strategy as scanWhileJoining. Note that
     * we ignore locallyFifo flag for local tasks here since helping
     * joins only make sense in LIFO mode.
     *
     * @return a popped task, if available, unless joinMe is done
     */
    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
        int s;
        ForkJoinTask<?>[] q;
        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
            int i = (q.length - 1) & --s;
            ForkJoinTask<?> t = q[i];
            if (t != null && UNSAFE.compareAndSwapObject
                (q, (i << qShift) + qBase, t, null)) {
                if (joinMe.status >= 0) {
                    sp = s;
                    return t;
                }
                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
                break; // back out
            }
        }
        return null;
    }

    /**
     * Runs tasks until {@code pool.isQuiescent()}.
     */
    final void helpQuiescePool() {
        for (;;) {
            ForkJoinTask<?> t = pollLocalTask();
            if (t != null || (t = scan()) != null)
                t.tryExec();
            else {
                ForkJoinPool p = pool;
                if (active) {
                    active = false; // inactivate
                    do {} while (!p.tryDecrementActiveCount());
                }
                if (p.isQuiescent()) {
                    active = true; // re-activate
                    do {} while (!p.tryIncrementActiveCount());
                    return;
                }
            }
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinWorkerThread.class);
    private static final long qBase =
        UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
    private static final int qShift;

    static {
        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
        if ((s & (s - 1)) != 0)
            throw new Error("data type scale not a power of two");
        qShift = 31 - Integer.numberOfLeadingZeros(s);
    }

    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}