root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
Revision: 1.6
Committed: Fri Jul 31 20:41:13 2009 UTC by jsr166
Branch: MAIN
Changes since 1.5: +34 -7 lines
Log Message:
sync with jsr166 package

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.Collection;

/**
 * A thread managed by a {@link ForkJoinPool}. This class is
 * subclassable solely for the sake of adding functionality -- there
 * are no overridable methods dealing with scheduling or
 * execution. However, you can override initialization and termination
 * methods surrounding the main task processing loop. If you do
 * create such a subclass, you will also need to supply a custom
 * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Algorithm overview:
     *
     * 1. Work-Stealing: Work-stealing queues are special forms of
     * Deques that support only three of the four possible
     * end-operations -- push, pop, and deq (aka steal), and only do
     * so under the constraints that push and pop are called only from
     * the owning thread, while deq may be called from other threads.
     * (If you are unfamiliar with them, you probably want to read
     * Herlihy and Shavit's book "The Art of Multiprocessor
     * Programming", chapter 16, which describes these in more detail,
     * before proceeding.) The main work-stealing queue design is
     * roughly similar to "Dynamic Circular Work-Stealing Deque" by
     * David Chase and Yossi Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html). The main
     * difference ultimately stems from GC requirements that we null
     * out taken slots as soon as we can, to maintain as small a
     * footprint as possible even in programs generating huge numbers
     * of tasks. To accomplish this, we shift the CAS arbitrating pop
     * vs deq (steal) from being on the indices ("base" and "sp") to
     * the slots themselves (mainly via method "casSlotNull()"). So,
     * both a successful pop and deq mainly entail CASing a non-null
     * slot to null. Because we rely on CASes of references, we do
     * not need tag bits on base or sp. They are simple ints, as used
     * in any circular array-based queue (see for example ArrayDeque).
     * Updates to the indices must still be ordered in a way that
     * guarantees that sp == base means the queue is empty, but
     * otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or deq has not fully
     * committed. Note that this means that the deq operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes. However, in the
     * aggregate, we ensure at least probabilistic non-blockingness. If
     * an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete. One reason this works well here is
     * that apparently-nonempty often means soon-to-be-stealable,
     * which gives threads a chance to activate if necessary before
     * stealing (see below).
     *
     * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO, order; simply by using a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool). This allows use in message-passing
     * frameworks in which tasks are never joined.
     *
     * Efficient implementation of this approach currently relies on
     * an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable base require
     * volatile ordering. Variable sp does not require volatile write
     * but needs cheaper store-ordering on writes. Because they are
     * protected by volatile base reads, reads of the queue array and
     * its slots do not need volatile load semantics, but writes (in
     * push) require store order and CASes (in pop and deq) require
     * (volatile) CAS semantics. Since these combinations aren't
     * supported using ordinary volatiles, the only way to accomplish
     * these efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.) Further, performance on most platforms is
     * very sensitive to placement and sizing of the (resizable) queue
     * array. Even though these queues don't usually become all that
     * big, the initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are currently initialized immediately after the thread
     * gets the initial signal to start processing tasks. However,
     * all queue-related methods except pushTask are written in a way
     * that allows them to instead be lazily allocated and/or disposed
     * of when empty. All together, these low-level implementation
     * choices produce as much as a factor of 4 performance
     * improvement compared to naive implementations, and enable the
     * processing of billions of tasks per second, sometimes at the
     * expense of ugliness.
     *
     * 2. Run control: The primary run control is based on a global
     * counter (activeCount) held by the pool. It uses an algorithm
     * similar to that in Herlihy and Shavit section 17.6 to cause
     * threads to eventually block when all threads declare they are
     * inactive. (See variable "scans".) For this to work, threads
     * must be declared active when executing tasks, and before
     * stealing a task. They must be inactive before blocking on the
     * Pool Barrier (awaiting a new submission or other Pool
     * event). In between, there is some free play which we take
     * advantage of to avoid contention and rapid flickering of the
     * global activeCount: If inactive, we activate only if a victim
     * queue appears to be nonempty (see above). Similarly, a thread
     * tries to inactivate only after a full scan of other threads.
     * The net effect is that contention on activeCount is rarely a
     * measurable performance issue. (There are also a few other cases
     * where we scan for work rather than retry/block upon
     * contention.)
     *
     * 3. Selection control: We maintain a policy of always choosing
     * to run local tasks rather than stealing, and always trying to
     * steal tasks before trying to run a new submission. All steals
     * are currently performed in randomly-chosen deq-order. It may be
     * worthwhile to bias these with locality / anti-locality
     * information, but doing this well probably requires more
     * lower-level information from JVMs than currently provided.
     */

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 2, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum work-stealing queue array size. Must be less than or
     * equal to 1 << 28 to ensure lack of index wraparound. (This
     * is less than the usual bounds, because we need a left-shift
     * by 3 to be in int range.)
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when the thread starts, to improve memory locality.
     */
    private ForkJoinTask<?>[] queue;

    /**
     * Index (mod queue.length) of the next queue slot to push to or
     * pop from. It is written only by the owner thread, via ordered
     * store. Both sp and base are allowed to wrap around on overflow,
     * but (sp - base) still estimates size.
     */
    private volatile int sp;

    /**
     * Index (mod queue.length) of the least valid queue slot, which
     * is always the next position to steal from if nonempty.
     */
    private volatile int base;

    /**
     * Activity status. When true, this worker is considered active.
     * Must be false upon construction. It must be true when executing
     * tasks, and BEFORE stealing a task. It must be false before
     * calling pool.sync.
     */
    private boolean active;

    /**
     * Run state of this worker. Supports simple versions of the usual
     * shutdown/shutdownNow control.
     */
    private volatile int runState;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
     */
    private int seed;

    /**
     * Number of steals, transferred to pool when idle.
     */
    private int stealCount;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool during cleanup etc.
     */
    int poolIndex;

    /**
     * The last barrier event waited for. Accessed in pool callback
     * methods, but only by the current thread.
     */
    long lastEventCount;

    /**
     * True if using local FIFO, rather than the default LIFO, for
     * local polling.
     */
    private boolean locallyFifo;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        if (pool == null) throw new NullPointerException();
        this.pool = pool;
        // Note: poolIndex is set by pool during construction
        // Remaining initialization is deferred to onStart
    }

    // Public access methods

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the index number of this thread in its pool. The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     *
     * @return the index number
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    /**
     * Establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined.
     *
     * @param async if true, use locally FIFO scheduling
     */
    void setAsyncMode(boolean async) {
        locallyFifo = async;
    }

    // Runstate management

    // Runstate values. Order matters.
    private static final int RUNNING     = 0;
    private static final int SHUTDOWN    = 1;
    private static final int TERMINATING = 2;
    private static final int TERMINATED  = 3;

    final boolean isShutdown()    { return runState >= SHUTDOWN; }
    final boolean isTerminating() { return runState >= TERMINATING; }
    final boolean isTerminated()  { return runState == TERMINATED; }
    final boolean shutdown()      { return transitionRunStateTo(SHUTDOWN); }
    final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }

    /**
     * Transitions to at least the given state.
     *
     * @return {@code true} if not already at least at given state
     */
    private boolean transitionRunStateTo(int state) {
        for (;;) {
            int s = runState;
            if (s >= state)
                return false;
            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
                return true;
        }
    }

    /**
     * Tries to set status to active; fails on contention.
     */
    private boolean tryActivate() {
        if (!active) {
            if (!pool.tryIncrementActiveCount())
                return false;
            active = true;
        }
        return true;
    }

    /**
     * Tries to set status to inactive; fails on contention.
     */
    private boolean tryInactivate() {
        if (active) {
            if (!pool.tryDecrementActiveCount())
                return false;
            active = false;
        }
        return true;
    }

    /**
     * Computes the next value for the random victim probe. Scans
     * don't require a very high quality generator, but also not a
     * crummy one. Marsaglia xor-shift is cheap and works well.
     */
    private static int xorShift(int r) {
        r ^= (r << 13);
        r ^= (r >>> 17);
        return r ^ (r << 5);
    }

    // Lifecycle methods

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * ForkJoinTasks.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            pool.sync(this); // await first pool event
            mainLoop();
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    /**
     * Executes tasks until shut down.
     */
    private void mainLoop() {
        while (!isShutdown()) {
            ForkJoinTask<?> t = pollTask();
            if (t != null || (t = pollSubmission()) != null)
                t.quietlyExec();
            else if (tryInactivate())
                pool.sync(this);
        }
    }

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke super.onStart() at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        // Allocate while starting to improve chances of thread-local
        // isolation
        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        // Initial value of seed need not be especially random but
        // should differ across workers and must be nonzero
        int p = poolIndex + 1;
        seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread. If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
        // Execute remaining local tasks unless aborting or terminating
        while (exception == null && !pool.isTerminating() && base != sp) {
            try {
                ForkJoinTask<?> t = popTask();
                if (t != null)
                    t.quietlyExec();
            } catch (Throwable ex) {
                exception = ex;
            }
        }
        // Cancel other tasks, transition status, notify pool, and
        // propagate exception to uncaught exception handler
        try {
            do {} while (!tryInactivate()); // ensure inactive
            cancelTasks();
            runState = TERMINATED;
            pool.workerTerminated(this);
        } catch (Throwable ex) {   // Shouldn't ever happen
            if (exception == null) // but if so, at least rethrow it
                exception = ex;
        } finally {
            if (exception != null)
                ForkJoinTask.rethrowException(exception);
        }
    }

    // Intrinsics-based support for queue operations.

    /**
     * Writes, in store order, the given task into the given slot of
     * q. Caller must ensure q is non-null and index is in range.
     */
    private static void setSlot(ForkJoinTask<?>[] q, int i,
                                ForkJoinTask<?> t) {
        UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t);
    }

    /**
     * CASes the given slot of q to null. Caller must ensure q is
     * non-null and index is in range.
     */
    private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                       ForkJoinTask<?> t) {
        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
    }

    /**
     * Sets sp in store-order.
     */
    private void storeSp(int s) {
        UNSAFE.putOrderedInt(this, spOffset, s);
    }

    // Main queue methods

    /**
     * Pushes a task. Called only by current thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1;
        int s = sp;
        setSlot(q, s & mask, t);
        storeSp(++s);
        if ((s -= base) == 1)
            pool.signalWork();
        else if (s >= mask)
            growQueue();
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * either empty or contended.
     *
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t;
        ForkJoinTask<?>[] q;
        int i;
        int b;
        if (sp != (b = base) &&
            (q = queue) != null && // must read q after b
            (t = q[i = (q.length - 1) & b]) != null &&
            casSlotNull(q, i, t)) {
            base = b + 1;
            return t;
        }
        return null;
    }

    /**
     * Tries to take a task from the base of its own queue, activating
     * if necessary, failing only if empty. Called only by current
     * thread.
     *
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        int b;
        while (sp != (b = base)) {
            if (tryActivate()) {
                ForkJoinTask<?>[] q = queue;
                int i = (q.length - 1) & b;
                ForkJoinTask<?> t = q[i];
                if (t != null && casSlotNull(q, i, t)) {
                    base = b + 1;
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty. Ensures active status
     * if non-null. Called only by current thread.
     */
    final ForkJoinTask<?> popTask() {
        int s = sp;
        while (s != base) {
            if (tryActivate()) {
                ForkJoinTask<?>[] q = queue;
                int mask = q.length - 1;
                int i = (s - 1) & mask;
                ForkJoinTask<?> t = q[i];
                if (t == null || !casSlotNull(q, i, t))
                    break;
                storeSp(s - 1);
                return t;
            }
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if the topmost
     * element is the given task. Called only by current thread while
     * active.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1;
        int s = sp - 1;
        if (casSlotNull(q, s & mask, t)) {
            storeSp(s);
            return true;
        }
        return false;
    }

    /**
     * Returns the next task, or null if empty or contended.
     */
    final ForkJoinTask<?> peekTask() {
        ForkJoinTask<?>[] q = queue;
        if (q == null)
            return null;
        int mask = q.length - 1;
        int i = locallyFifo ? base : (sp - 1);
        return q[i & mask];
    }

    /**
     * Doubles queue array size. Transfers elements by emulating
     * steals (deqs) from the old array and placing them, oldest
     * first, into the new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int oldSize = oldQ.length;
        int newSize = oldSize << 1;
        if (newSize > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];

        int b = base;
        int bf = b + oldSize;
        int oldMask = oldSize - 1;
        int newMask = newSize - 1;
        do {
            int oldIndex = b & oldMask;
            ForkJoinTask<?> t = oldQ[oldIndex];
            if (t != null && !casSlotNull(oldQ, oldIndex, t))
                t = null;
            setSlot(newQ, b & newMask, t);
        } while (++b != bf);
        pool.signalWork();
    }

    /**
     * Tries to steal a task from another worker. Starts at a random
     * index of the workers array, and probes workers until finding
     * one with a non-empty queue or finding that all are empty. The
     * first n probes use random indices. If these are empty, it
     * resorts to a full circular traversal, which is necessary for
     * the caller to accurately set active status. Also restarts if
     * pool events occurred since the last scan, which forces a
     * refresh of the workers array, in case the barrier was
     * associated with a resize.
     *
     * This method must be both fast and quiet -- usually avoiding
     * memory accesses that could disrupt cache sharing etc., other
     * than those needed to check for and take tasks. This accounts
     * for, among other things, updating the random seed in place
     * without storing it until exit.
     *
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan() {
        ForkJoinTask<?> t = null;
        int r = seed;              // extract once to keep scan quiet
        ForkJoinWorkerThread[] ws; // refreshed on outer loop
        int mask;                  // must be power 2 minus 1 and > 0
        outer: do {
            if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
                int idx = r;
                int probes = ~mask; // use random index while negative
                for (;;) {
                    r = xorShift(r); // update random seed
                    ForkJoinWorkerThread v = ws[mask & idx];
                    if (v == null || v.sp == v.base) {
                        if (probes <= mask)
                            idx = (probes++ < 0) ? r : (idx + 1);
                        else
                            break;
                    }
                    else if (!tryActivate() || (t = v.deqTask()) == null)
                        continue outer; // restart on contention
                    else
                        break outer;
                }
            }
        } while (pool.hasNewSyncEvent(this)); // retry on pool events
        seed = r;
        return t;
    }

    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
        if (t == null && (t = scan()) != null)
            ++stealCount;
        return t;
    }

    /**
     * Gets a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        return locallyFifo ? locallyDeqTask() : popTask();
    }

    /**
     * Returns a pool submission, if one exists, activating first.
     *
     * @return a submission, if available
     */
    private ForkJoinTask<?> pollSubmission() {
        ForkJoinPool p = pool;
        while (p.hasQueuedSubmissions()) {
            ForkJoinTask<?> t;
            if (tryActivate() && (t = p.pollSubmission()) != null)
                return t;
        }
        return null;
    }

    // Methods accessed only by Pool

    /**
     * Removes and cancels all tasks in queue. Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> t;
        while (base != sp && (t = deqTask()) != null)
            t.cancelIgnoringExceptions();
    }

    /**
     * Drains tasks to the given collection c.
     *
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        ForkJoinTask<?> t;
        while (base != sp && (t = deqTask()) != null) {
            c.add(t);
            ++n;
        }
        return n;
    }

    /**
     * Gets and clears the steal count for accumulation by the pool.
     * Called only when known to be idle (in pool.sync and
     * termination).
     */
    final int getAndClearStealCount() {
        int sc = stealCount;
        stealCount = 0;
        return sc;
    }

    /**
     * Returns {@code true} if at least one worker in the given array
     * appears to have at least one queued task.
     *
     * @param ws array of workers
     */
    static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
        if (ws != null) {
            int len = ws.length;
            for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
                for (int i = 0; i < len; ++i) {
                    ForkJoinWorkerThread w = ws[i];
                    if (w != null && w.sp != w.base)
                        return true;
                }
            }
        }
        return false;
    }

    // Support methods for ForkJoinTask

    /**
     * Returns an estimate of the number of tasks in the queue.
     */
    final int getQueueSize() {
        // suppress momentarily negative values
        return Math.max(0, sp - base);
    }

    /**
     * Returns an estimate of the number of tasks, offset by a
     * function of the number of idle workers.
     */
    final int getEstimatedSurplusTaskCount() {
        // The halving approximates weighting idle vs non-idle workers
        return (sp - base) - (pool.getIdleThreadCount() >>> 1);
    }

    /**
     * Scans, returning early if joinMe is done.
     */
    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
        ForkJoinTask<?> t = pollTask();
        if (t != null && joinMe.status < 0 && sp == base) {
            pushTask(t); // unsteal if done and this task would be stealable
            t = null;
        }
        return t;
    }

    /**
     * Runs tasks until {@code pool.isQuiescent()}.
     */
    final void helpQuiescePool() {
        for (;;) {
            ForkJoinTask<?> t = pollTask();
            if (t != null)
                t.quietlyExec();
            else if (tryInactivate() && pool.isQuiescent())
                break;
        }
        do {} while (!tryActivate()); // re-activate on exit
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long spOffset =
        objectFieldOffset("sp", ForkJoinWorkerThread.class);
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinWorkerThread.class);
    private static final long qBase;
    private static final int qShift;

    static {
        qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        qShift = 31 - Integer.numberOfLeadingZeros(s);
    }

    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}
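The class javadoc above notes that subclasses exist only to add functionality via the onStart/onTermination hooks, and that using one requires supplying a custom ForkJoinWorkerThreadFactory to the pool. A minimal sketch of what that looks like from client code, using only the public java.util.concurrent API (the names LoggingWorker and CustomWorkerDemo are hypothetical, not part of this file):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class CustomWorkerDemo {
    // Hypothetical subclass: wraps the worker lifecycle with logging.
    static final class LoggingWorker extends ForkJoinWorkerThread {
        LoggingWorker(ForkJoinPool pool) {
            super(pool);
        }

        @Override
        protected void onStart() {
            super.onStart(); // per the javadoc, must run first
            System.out.println("worker " + getPoolIndex() + " starting");
        }

        @Override
        protected void onTermination(Throwable exception) {
            System.out.println("worker " + getPoolIndex() + " terminating");
            super.onTermination(exception); // per the javadoc, must run last
        }
    }

    public static void main(String[] args) {
        // ForkJoinWorkerThreadFactory has a single newThread(ForkJoinPool)
        // method, so a constructor reference serves as the factory (Java 8+).
        ForkJoinPool pool = new ForkJoinPool(
            2,                  // parallelism
            LoggingWorker::new, // the required custom factory
            null,               // default uncaught-exception handler
            false);             // false => default LIFO local queues
        int answer = pool.submit(() -> 6 * 7).join();
        System.out.println("answer = " + answer);
        pool.shutdown();
    }
}
```

Passing true as the last constructor argument selects the "async mode" described in the algorithm overview, where local tasks are processed in FIFO rather than LIFO order.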