root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.5
Committed: Mon Jan 12 17:16:18 2009 UTC (15 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.4: +294 -462 lines
Log Message:
Split out ThreadLocalRandom; internal refactoring pass

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.util.*;
9     import java.util.concurrent.*;
10     import java.util.concurrent.atomic.*;
11     import java.util.concurrent.locks.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * A thread managed by a {@link ForkJoinPool}. This class is
17     * subclassable solely for the sake of adding functionality -- there
18     * are no overridable methods dealing with scheduling or
19     * execution. However, you can override initialization and termination
20     * cleanup methods surrounding the main task processing loop. If you
21     * do create such a subclass, you will also need to supply a custom
22     * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
23     *
24 dl 1.1 */
25     public class ForkJoinWorkerThread extends Thread {
26     /*
27     * Algorithm overview:
28     *
29     * 1. Work-Stealing: Work-stealing queues are special forms of
30     * Deques that support only three of the four possible
31     * end-operations -- push, pop, and deq (aka steal), and only do
32     * so under the constraints that push and pop are called only from
33     * the owning thread, while deq may be called from other threads.
34     * (If you are unfamiliar with them, you probably want to read
35     * Herlihy and Shavit's book "The Art of Multiprocessor
36     * Programming", chapter 16 describing these in more detail before
37     * proceeding.) The main work-stealing queue design is roughly
38     * similar to "Dynamic Circular Work-Stealing Deque" by David
39     * Chase and Yossi Lev, SPAA 2005
40     * (http://research.sun.com/scalable/pubs/index.html). The main
41     * difference ultimately stems from gc requirements that we null
42     * out taken slots as soon as we can, to maintain as small a
43     * footprint as possible even in programs generating huge numbers
44     * of tasks. To accomplish this, we shift the CAS arbitrating pop
45     * vs deq (steal) from being on the indices ("base" and "sp") to
46     * the slots themselves (mainly via method "casSlotNull()"). So,
47     * both a successful pop and deq mainly entail CAS'ing a nonnull
48     * slot to null. Because we rely on CASes of references, we do
49     * not need tag bits on base or sp. They are simple ints as used
50     * in any circular array-based queue (see for example ArrayDeque).
51     * Updates to the indices must still be ordered in a way that
52     * guarantees that sp == base means the queue is empty, but
53     * otherwise may err on the side of possibly making the queue
54     * appear nonempty when a push, pop, or deq has not fully
55     * committed. Note that this means that the deq operation,
56     * considered individually, is not wait-free. One thief cannot
57     * successfully continue until another in-progress one (or, if
58     * previously empty, a push) completes. However, in the
59     * aggregate, we ensure at least probabilistic non-blockingness. If
60     * an attempted steal fails, a thief always chooses a different
61     * random victim target to try next. So, in order for one thief to
62     * progress, it suffices for any in-progress deq or new push on
63     * any empty queue to complete. One reason this works well here is
64     * that apparently-nonempty often means soon-to-be-stealable,
65     * which gives threads a chance to activate if necessary before
66     * stealing (see below).
67     *
68     * Efficient implementation of this approach currently relies on
69     * an uncomfortable amount of "Unsafe" mechanics. To maintain
70     * correct orderings, reads and writes of variable base require
71     * volatile ordering. Variable sp does not require volatile write
72     * but needs cheaper store-ordering on writes. Because they are
73     * protected by volatile base reads, reads of the queue array and
74     * its slots do not need volatile load semantics, but writes (in
75     * push) require store order and CASes (in pop and deq) require
76     * (volatile) CAS semantics. Since these combinations aren't
77     * supported using ordinary volatiles, the only way to accomplish
78     * these efficiently is to use direct Unsafe calls. (Using external
79     * AtomicIntegers and AtomicReferenceArrays for the indices and
80     * array is significantly slower because of memory locality and
81     * indirection effects.) Further, performance on most platforms is
82     * very sensitive to placement and sizing of the (resizable) queue
83     * array. Even though these queues don't usually become all that
84     * big, the initial size must be large enough to counteract cache
85     * contention effects across multiple queues (especially in the
86     * presence of GC cardmarking). Also, to improve thread-locality,
87     * queues are currently initialized immediately after the thread
88     * gets the initial signal to start processing tasks. However,
89     * all queue-related methods except pushTask are written in a way
90     * that allows them to instead be lazily allocated and/or disposed
91     * of when empty. All together, these low-level implementation
92     * choices produce as much as a factor of 4 performance
93     * improvement compared to naive implementations, and enable the
94     * processing of billions of tasks per second, sometimes at the
95     * expense of ugliness.
96     *
97     * 2. Run control: The primary run control is based on a global
98     * counter (activeCount) held by the pool. It uses an algorithm
99     * similar to that in Herlihy and Shavit section 17.6 to cause
100     * threads to eventually block when all threads declare they are
101     * inactive. (See variable "scans".) For this to work, threads
102     * must be declared active when executing tasks, and before
103     * stealing a task. They must be inactive before blocking on the
104     * Pool Barrier (awaiting a new submission or other Pool
105     * event). In between, there is some free play which we take
106     * advantage of to avoid contention and rapid flickering of the
107     * global activeCount: If inactive, we activate only if a victim
108     * queue appears to be nonempty (see above). Similarly, a thread
109     * tries to inactivate only after a full scan of other threads.
110     * The net effect is that contention on activeCount is rarely a
111     * measurable performance issue. (There are also a few other cases
112     * where we scan for work rather than retry/block upon
113     * contention.)
114     *
115     * 3. Selection control. We maintain a policy of always choosing to
116     * run local tasks rather than stealing, and always trying to
117     * steal tasks before trying to run a new submission. All steals
118     * are currently performed in randomly-chosen deq-order. It may be
119     * worthwhile to bias these with locality / anti-locality
120     * information, but doing this well probably requires more
121     * lower-level information from JVMs than currently provided.
122     */
123    
124     /**
125     * Capacity of work-stealing queue array upon initialization.
126     * Must be a power of two. Initial size must be at least 2, but is
127     * padded to minimize cache effects.
128     */
129     private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
130    
131     /**
132     * Maximum work-stealing queue array size. Must be less than or
133 dl 1.5 * equal to 1 << 28 to ensure lack of index wraparound. (This
134     * is less than usual bounds, because we need leftshift by 3
135     * to be in int range).
136 dl 1.1 */
137 dl 1.5 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
138 dl 1.1
139     /**
140 dl 1.5 * The pool this thread works in. Accessed directly by ForkJoinTask
141 dl 1.1 */
142 dl 1.5 final ForkJoinPool pool;
143 dl 1.1
144     /**
145     * The work-stealing queue array. Size must be a power of two.
146 dl 1.5 * Initialized when thread starts, to improve memory locality.
147 dl 1.1 */
148     private ForkJoinTask<?>[] queue;
149    
150     /**
151     * Index (mod queue.length) of next queue slot to push to or pop
152     * from. It is written only by owner thread, via ordered store.
153     * Both sp and base are allowed to wrap around on overflow, but
154     * (sp - base) still estimates size.
155     */
156     private volatile int sp;
157    
158     /**
159     * Index (mod queue.length) of least valid queue slot, which is
160     * always the next position to steal from if nonempty.
161     */
162     private volatile int base;
163    
164     /**
165 dl 1.5 * Activity status. When true, this worker is considered active.
166     * Must be false upon construction. It must be true when executing
167     * tasks, and BEFORE stealing a task. It must be false before
168     * calling pool.sync
169 dl 1.1 */
170 dl 1.5 private boolean active;
171 dl 1.1
172     /**
173     * Run state of this worker. Supports simple versions of the usual
174     * shutdown/shutdownNow control.
175     */
176     private volatile int runState;
177    
178     /**
179 dl 1.5 * Seed for random number generator for choosing steal victims.
180     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
181 dl 1.1 */
182 dl 1.5 private int seed;
183 dl 1.1
184     /**
185     * Number of steals, transferred to pool when idle
186     */
187     private int stealCount;
188    
189     /**
190 dl 1.5 * Index of this worker in pool array. Set once by pool before
191     * running, and accessed directly by pool during cleanup etc
192 dl 1.1 */
193 dl 1.5 int poolIndex;
194 dl 1.1
195     /**
196 dl 1.5 * The last barrier event waited for. Accessed in pool callback
197     * methods, but only by current thread.
198 dl 1.1 */
199 dl 1.5 long lastEventCount;
200 dl 1.1
201     /**
202     * Creates a ForkJoinWorkerThread operating in the given pool.
203     * @param pool the pool this thread works in
204     * @throws NullPointerException if pool is null
205     */
206     protected ForkJoinWorkerThread(ForkJoinPool pool) {
207     if (pool == null) throw new NullPointerException();
208     this.pool = pool;
209 dl 1.5 // Note: poolIndex is set by pool during construction
210     // Remaining initialization is deferred to onStart
211 dl 1.1 }
212    
213 dl 1.5 // Public access methods
214 dl 1.2
215     /**
216 dl 1.4 * Returns the pool hosting this thread
217 dl 1.2 * @return the pool
218     */
219 dl 1.4 public ForkJoinPool getPool() {
220     return pool;
221 dl 1.2 }
222    
223     /**
224 dl 1.4 * Returns the index number of this thread in its pool. The
225     * returned value ranges from zero to the maximum number of
226     * threads (minus one) that have ever been created in the pool.
227     * This method may be useful for applications that track status or
228 dl 1.5 * collect results per-worker rather than per-task.
229 dl 1.2 * @return the index number.
230     */
231 dl 1.4 public int getPoolIndex() {
232     return poolIndex;
233 dl 1.2 }
234    
235 dl 1.5
236     // Runstate management
237    
238     // Runstate values. Order matters
239     private static final int RUNNING = 0;
240     private static final int SHUTDOWN = 1;
241     private static final int TERMINATING = 2;
242     private static final int TERMINATED = 3;
243    
244     final boolean isShutdown() { return runState >= SHUTDOWN; }
245     final boolean isTerminating() { return runState >= TERMINATING; }
246     final boolean isTerminated() { return runState == TERMINATED; }
247     final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
248     final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
249    
250     /**
251     * Transitions to at least the given state. Returns true if the
252     * run state was not already at least the given state.
253     */
254     private boolean transitionRunStateTo(int state) {
255     for (;;) {
256     int s = runState;
257     if (s >= state)
258     return false;
259     if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
260     return true;
261     }
262     }
263    
264     /**
265     * Try to set status to active; fail on contention
266     */
267     private boolean tryActivate() {
268     if (!active) {
269     if (!pool.tryIncrementActiveCount())
270     return false;
271     active = true;
272     }
273     return true;
274     }
275    
276     /**
277     * Try to set status to inactive; fail on contention
278     */
279     private boolean tryInactivate() {
280     if (active) {
281     if (!pool.tryDecrementActiveCount())
282     return false;
283     active = false;
284     }
285     return true;
286     }
287    
288     /**
289     * Computes next value for random victim probe. Scans don't
290     * require a very high quality generator, but also not a crummy
291     * one. Marsaglia xor-shift is cheap and works well.
292     */
293     private static int xorShift(int r) {
294     r ^= r << 1;
295     r ^= r >>> 3;
296     r ^= r << 10;
297     return r;
298     }
299    
300     // Lifecycle methods
301 dl 1.1
302     /**
303 dl 1.5 * This method is required to be public, but should never be
304     * called explicitly. It performs the main run loop to execute
305     * ForkJoinTasks.
306 dl 1.1 */
307 dl 1.5 public void run() {
308     Throwable exception = null;
309     try {
310     onStart();
311     pool.sync(this); // await first pool event
312     mainLoop();
313     } catch (Throwable ex) {
314     exception = ex;
315     } finally {
316     onTermination(exception);
317     }
318 dl 1.1 }
319    
320     /**
321 dl 1.5 * Execute tasks until shut down.
322 dl 1.1 */
323 dl 1.5 private void mainLoop() {
324     while (!isShutdown()) {
325     ForkJoinTask<?> t = pollTask();
326     if (t != null || (t = pollSubmission()) != null)
327     t.quietlyExec();
328     else if (tryInactivate())
329     pool.sync(this);
330     }
331 dl 1.1 }
332    
333 dl 1.5 /**
334     * Initializes internal state after construction but before
335     * processing any tasks. If you override this method, you must
336     * invoke super.onStart() at the beginning of the method.
337     * Initialization requires care: Most fields must have legal
338     * default values, to ensure that attempted accesses from other
339     * threads work correctly even before this thread starts
340     * processing tasks.
341     */
342     protected void onStart() {
343     // Allocate while starting to improve chances of thread-local
344     // isolation
345     queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
346     // Initial value of seed need not be especially random but
347     // should differ across workers and must be nonzero
348     int p = poolIndex + 1;
349     seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
350     }
351 dl 1.1
352     /**
353 dl 1.5 * Perform cleanup associated with termination of this worker
354     * thread. If you override this method, you must invoke
355     * super.onTermination at the end of the overridden method.
356     *
357     * @param exception the exception causing this thread to abort due
358     * to an unrecoverable error, or null if completed normally.
359 dl 1.1 */
360 dl 1.5 protected void onTermination(Throwable exception) {
361     // Execute remaining local tasks unless aborting or terminating
362     while (exception == null && !pool.isTerminating() && base != sp) {
363     try {
364     ForkJoinTask<?> t = popTask();
365     if (t != null)
366     t.quietlyExec();
367     } catch(Throwable ex) {
368     exception = ex;
369     }
370     }
371     // Cancel other tasks, transition status, notify pool, and
372     // propagate exception to uncaught exception handler
373     try {
374     do {} while (!tryInactivate()); // ensure inactive
375     cancelTasks();
376     runState = TERMINATED;
377     pool.workerTerminated(this);
378     } catch (Throwable ex) { // Shouldn't ever happen
379     if (exception == null) // but if so, at least rethrow
380     exception = ex;
381     } finally {
382     if (exception != null)
383     ForkJoinTask.rethrowException(exception);
384     }
385 dl 1.1 }
386    
387 dl 1.5 // Intrinsics-based support for queue operations.
388    
389 dl 1.1 /**
390     * Sets, in store-order, the given slot of q to the given task.
391     * Caller must ensure q is nonnull and index is in range.
392     */
393     private static void setSlot(ForkJoinTask<?>[] q, int i,
394     ForkJoinTask<?> t){
395     _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
396     }
397    
398     /**
399     * CAS given slot of q to null. Caller must ensure q is nonnull
400     * and index is in range.
401     */
402     private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
403     ForkJoinTask<?> t) {
404     return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
405     }
406    
407 dl 1.5 /**
408     * Sets sp in store-order.
409     */
410     private void storeSp(int s) {
411     _unsafe.putOrderedInt(this, spOffset, s);
412     }
413    
414 dl 1.1 // Main queue methods
415    
416     /**
417     * Pushes a task. Called only by current thread.
418     * @param t the task. Caller must ensure nonnull
419     */
420     final void pushTask(ForkJoinTask<?> t) {
421     ForkJoinTask<?>[] q = queue;
422     int mask = q.length - 1;
423     int s = sp;
424 dl 1.5 setSlot(q, s & mask, t);
425     storeSp(++s);
426 dl 1.1 if ((s -= base) == 1)
427 dl 1.5 pool.signalWork();
428 dl 1.1 else if (s >= mask)
429     growQueue();
430     }
431    
432     /**
433     * Tries to take a task from the base of the queue, failing if
434     * either empty or contended.
435     * @return a task, or null if none or contended.
436     */
437     private ForkJoinTask<?> deqTask() {
438 dl 1.5 ForkJoinTask<?> t;
439 dl 1.1 ForkJoinTask<?>[] q;
440     int i;
441     int b;
442     if (sp != (b = base) &&
443     (q = queue) != null && // must read q after b
444     (t = q[i = (q.length - 1) & b]) != null &&
445 dl 1.5 casSlotNull(q, i, t)) {
446 dl 1.1 base = b + 1;
447     return t;
448     }
449     return null;
450     }
451    
452     /**
453 dl 1.5 * Returns a popped task, or null if empty. Ensures active status
454     * if nonnull. Called only by current thread.
455 dl 1.1 */
456     final ForkJoinTask<?> popTask() {
457     int s = sp;
458 dl 1.5 while (s != base) {
459     if (tryActivate()) {
460     ForkJoinTask<?>[] q = queue;
461     int mask = q.length - 1;
462     int i = (s - 1) & mask;
463     ForkJoinTask<?> t = q[i];
464     if (t == null || !casSlotNull(q, i, t))
465     break;
466     storeSp(s - 1);
467     return t;
468     }
469 dl 1.1 }
470     return null;
471     }
472    
473     /**
474     * Specialized version of popTask to pop only if
475     * topmost element is the given task. Called only
476 dl 1.5 * by current thread while active.
477 dl 1.1 * @param t the task. Caller must ensure nonnull
478     */
479     final boolean unpushTask(ForkJoinTask<?> t) {
480     ForkJoinTask<?>[] q = queue;
481     int mask = q.length - 1;
482     int s = sp - 1;
483 dl 1.5 if (casSlotNull(q, s & mask, t)) {
484     storeSp(s);
485 dl 1.1 return true;
486     }
487     return false;
488     }
489    
490     /**
491     * Returns next task to pop.
492     */
493 dl 1.2 final ForkJoinTask<?> peekTask() {
494 dl 1.1 ForkJoinTask<?>[] q = queue;
495     return q == null? null : q[(sp - 1) & (q.length - 1)];
496     }
497    
498     /**
499     * Doubles queue array size. Transfers elements by emulating
500     * steals (deqs) from old array and placing, oldest first, into
501     * new array.
502     */
503     private void growQueue() {
504     ForkJoinTask<?>[] oldQ = queue;
505     int oldSize = oldQ.length;
506     int newSize = oldSize << 1;
507     if (newSize > MAXIMUM_QUEUE_CAPACITY)
508     throw new RejectedExecutionException("Queue capacity exceeded");
509     ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
510    
511     int b = base;
512     int bf = b + oldSize;
513     int oldMask = oldSize - 1;
514     int newMask = newSize - 1;
515     do {
516     int oldIndex = b & oldMask;
517     ForkJoinTask<?> t = oldQ[oldIndex];
518     if (t != null && !casSlotNull(oldQ, oldIndex, t))
519     t = null;
520     setSlot(newQ, b & newMask, t);
521     } while (++b != bf);
522 dl 1.5 pool.signalWork();
523 dl 1.1 }
524    
525     /**
526 dl 1.5 * Tries to steal a task from another worker. Starts at a random
527     * index of workers array, and probes workers until finding one
528     * with non-empty queue or finding that all are empty. It randomly
529     * selects the first n probes (n = workers array length). If these are empty, it
530     * resorts to a full circular traversal, which is necessary to
531     * accurately set active status by caller. Also restarts if pool
532     * events occurred since last scan, which forces refresh of
533     * workers array, in case barrier was associated with resize.
534 dl 1.1 *
535     * This method must be both fast and quiet -- usually avoiding
536     * memory accesses that could disrupt cache sharing etc other than
537     * those needed to check for and take tasks. This accounts for,
538     * among other things, updating random seed in place without
539 dl 1.5 * storing it until exit.
540 dl 1.1 *
541     * @return a task, or null if none found
542     */
543 dl 1.5 private ForkJoinTask<?> scan() {
544     ForkJoinTask<?> t = null;
545     int r = seed; // extract once to keep scan quiet
546     ForkJoinWorkerThread[] ws; // refreshed on outer loop
547     int mask; // must be power 2 minus 1 and > 0
548     outer:do {
549     if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
550 dl 1.1 int idx = r;
551 dl 1.5 int probes = ~mask; // use random index while negative
552 dl 1.1 for (;;) {
553 dl 1.5 r = xorShift(r); // update random seed
554     ForkJoinWorkerThread v = ws[mask & idx];
555     if (v == null || v.sp == v.base) {
556     if (probes <= mask)
557     idx = (probes++ < 0)? r : (idx + 1);
558     else
559     break;
560 dl 1.1 }
561 dl 1.5 else if (!tryActivate() || (t = v.deqTask()) == null)
562     continue outer; // restart on contention
563 dl 1.1 else
564 dl 1.5 break outer;
565 dl 1.1 }
566     }
567 dl 1.5 } while (pool.hasNewSyncEvent(this)); // retry on pool events
568     seed = r;
569     return t;
570 dl 1.1 }
571    
572     /**
573 dl 1.5 * Pops or steals a task
574     * @return a task, if available
575     */
576     final ForkJoinTask<?> pollTask() {
577     ForkJoinTask<?> t = popTask();
578     if (t == null && (t = scan()) != null)
579     ++stealCount;
580     return t;
581 dl 1.1 }
582    
583     /**
584 dl 1.5 * Returns a pool submission, if one exists, activating first.
585     * @return a submission, if available
586 dl 1.1 */
587 dl 1.5 private ForkJoinTask<?> pollSubmission() {
588     ForkJoinPool p = pool;
589     while (p.hasQueuedSubmissions()) {
590     ForkJoinTask<?> t;
591     if (tryActivate() && (t = p.pollSubmission()) != null)
592     return t;
593 dl 1.1 }
594 dl 1.5 return null;
595 dl 1.1 }
596 dl 1.5
597     // Methods accessed only by Pool
598    
599 dl 1.1 /**
600 dl 1.5 * Removes and cancels all tasks in queue. Can be called from any
601     * thread.
602 dl 1.1 */
603 dl 1.5 final void cancelTasks() {
604 dl 1.3 ForkJoinTask<?> t;
605 dl 1.5 while (base != sp && (t = deqTask()) != null)
606     t.cancelIgnoringExceptions();
607 dl 1.1 }
608    
609     /**
610 dl 1.5 * Get and clear steal count for accumulation by pool. Called
611     * only when known to be idle (in pool.sync and termination).
612 dl 1.1 */
613 dl 1.5 final int getAndClearStealCount() {
614     int sc = stealCount;
615     stealCount = 0;
616     return sc;
617     }
618    
619     /**
620     * Returns true if at least one worker in the given array appears
621     * to have at least one queued task.
622     * @param ws array of workers
623     */
624     static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
625     if (ws != null) {
626     int len = ws.length;
627     for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
628     for (int i = 0; i < len; ++i) {
629     ForkJoinWorkerThread w = ws[i];
630     if (w != null && w.sp != w.base)
631     return true;
632 dl 1.1 }
633     }
634     }
635 dl 1.5 return false;
636 dl 1.1 }
637    
638 dl 1.5 // Support methods for ForkJoinTask
639    
640 dl 1.1 /**
641 dl 1.2 * Returns an estimate of the number of tasks in the queue.
642     */
643     final int getQueueSize() {
644 dl 1.3 int n = sp - base;
645 dl 1.5 return n < 0? 0 : n; // suppress momentarily negative values
646 dl 1.2 }
647    
648     /**
649 dl 1.1 * Returns an estimate of the number of tasks, offset by a
650     * function of number of idle workers.
651     */
652     final int getEstimatedSurplusTaskCount() {
653 dl 1.3 // The halving approximates weighting idle vs non-idle workers
654 dl 1.1 return (sp - base) - (pool.getIdleThreadCount() >>> 1);
655     }
656    
657     /**
658 dl 1.5 * Polls for a task while joining; if joinMe is already done and the local queue is empty, un-steals the task and returns null
659 dl 1.1 */
660 dl 1.5 final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
661     ForkJoinTask<?> t = pollTask();
662     if (t != null && joinMe.status < 0 && sp == base) {
663     pushTask(t); // unsteal if done and this task would be stealable
664     t = null;
665     }
666     return t;
667 dl 1.1 }
668 dl 1.5
669 dl 1.1 /**
670 dl 1.5 * Runs tasks until pool isQuiescent
671 dl 1.1 */
672 dl 1.5 final void helpQuiescePool() {
673     for (;;) {
674     ForkJoinTask<?> t = pollTask();
675     if (t != null)
676     t.quietlyExec();
677     else if (tryInactivate() && pool.isQuiescent())
678     break;
679     }
680     do {} while (!tryActivate()); // re-activate on exit
681 dl 1.1 }
682    
683     // Temporary Unsafe mechanics for preliminary release
684    
685     static final Unsafe _unsafe;
686     static final long baseOffset;
687     static final long spOffset;
688 dl 1.5 static final long runStateOffset;
689 dl 1.1 static final long qBase;
690     static final int qShift;
691     static {
692     try {
693     if (ForkJoinWorkerThread.class.getClassLoader() != null) {
694     Field f = Unsafe.class.getDeclaredField("theUnsafe");
695     f.setAccessible(true);
696     _unsafe = (Unsafe)f.get(null);
697     }
698     else
699     _unsafe = Unsafe.getUnsafe();
700     baseOffset = _unsafe.objectFieldOffset
701     (ForkJoinWorkerThread.class.getDeclaredField("base"));
702     spOffset = _unsafe.objectFieldOffset
703     (ForkJoinWorkerThread.class.getDeclaredField("sp"));
704     runStateOffset = _unsafe.objectFieldOffset
705     (ForkJoinWorkerThread.class.getDeclaredField("runState"));
706     qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
707     int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
708     if ((s & (s-1)) != 0)
709     throw new Error("data type scale not a power of two");
710     qShift = 31 - Integer.numberOfLeadingZeros(s);
711     } catch (Exception e) {
712     throw new RuntimeException("Could not initialize intrinsics", e);
713     }
714     }
715     }
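
The algorithm overview in the listing describes arbitrating pop versus deq (steal) by CAS'ing a nonnull slot to null, rather than CAS'ing the indices. The following is a minimal sketch of that idea only, not the jsr166y implementation. It assumes a fixed capacity (there is no equivalent of growQueue), it uses AtomicReferenceArray and volatile ints in place of the Unsafe ordered stores (an approach the overview itself notes is slower), and it assumes a given task object is never enqueued a second time while an old reference to it may still be held by a thief. The class name SlotCasDeque and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical, simplified fixed-capacity work-stealing queue (illustration only). */
final class SlotCasDeque<T> {
    private final AtomicReferenceArray<T> slots; // length is a power of two
    private final int mask;                      // length - 1, used for index wrapping
    private volatile int base;                   // next index to steal from
    private volatile int sp;                     // next index to push to; written by owner only

    SlotCasDeque(int capacity) {
        if (capacity < 2 || (capacity & (capacity - 1)) != 0)
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        slots = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    /** Owner only. Returns false when full (the listing grows the array instead). */
    boolean push(T task) {
        int s = sp;
        if (s - base >= mask)
            return false;
        slots.set(s & mask, task); // publish the task before advancing sp
        sp = s + 1;
        return true;
    }

    /** Owner only. Takes the newest task, or null if empty or lost to a thief. */
    T pop() {
        int s = sp;
        if (s == base)
            return null;
        int i = (s - 1) & mask;
        T t = slots.get(i);
        if (t == null || !slots.compareAndSet(i, t, null))
            return null;           // the slot CAS, not the index, decides the winner
        sp = s - 1;
        return t;
    }

    /** Any thread. Takes the oldest task, or null if empty or contended. */
    T deq() {
        int b = base;
        if (sp == b)
            return null;
        int i = b & mask;
        T t = slots.get(i);
        if (t == null || !slots.compareAndSet(i, t, null))
            return null;
        base = b + 1;              // only the CAS winner advances base
        return t;
    }

    /** Estimate of queued tasks; momentarily negative values are suppressed. */
    int size() {
        int n = sp - base;
        return n < 0 ? 0 : n;
    }
}
```

As in the listing, a failed deq returns null instead of retrying, so a caller modelled on scan() would move on to another victim. With one element left, pop and deq target the same slot, and whichever thread wins the CAS on that slot gets the task.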