root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.1
Committed: Tue Jan 6 14:30:31 2009 UTC (15 years, 4 months ago) by dl
Branch: MAIN
Log Message:
Refactored and repackaged ForkJoin classes

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package jsr166y;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import sun.misc.Unsafe;
import java.lang.reflect.*;

/**
 * A thread that is internally managed by a ForkJoinPool to execute
 * ForkJoinTasks. This class additionally provides public
 * <tt>static</tt> methods accessing some basic scheduling and
 * execution mechanics for the <em>current</em>
 * ForkJoinWorkerThread. These methods may be invoked only from within
 * other ForkJoinTask computations. Attempts to invoke in other
 * contexts result in exceptions or errors including
 * ClassCastException. These methods enable construction of
 * special-purpose task classes, as well as specialized idioms
 * occasionally useful in ForkJoinTask processing.
 *
 * <p>The form of supported static methods reflects the fact that
 * worker threads may access and process tasks obtained in any of
 * three ways. In preference order: <em>Local</em> tasks are processed
 * in LIFO (newest first) order. <em>Stolen</em> tasks are obtained
 * from other threads in FIFO (oldest first) order, only if there are
 * no local tasks to run. <em>Submissions</em> form a FIFO queue
 * common to the entire pool, and are started only if no other
 * work is available.
 *
 * <p> This class is subclassable solely for the sake of adding
 * functionality -- there are no overridable methods dealing with
 * scheduling or execution. However, you can override initialization
 * and termination cleanup methods surrounding the main task
 * processing loop. If you do create such a subclass, you will also
 * need to supply a custom ForkJoinWorkerThreadFactory to use it in a
 * ForkJoinPool.
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Algorithm overview:
     *
     * 1. Work-Stealing: Work-stealing queues are special forms of
     * Deques that support only three of the four possible
     * end-operations -- push, pop, and deq (aka steal), and only do
     * so under the constraints that push and pop are called only from
     * the owning thread, while deq may be called from other threads.
     * (If you are unfamiliar with them, you probably want to read
     * Herlihy and Shavit's book "The Art of Multiprocessor
     * Programming", chapter 16, which describes these in more detail,
     * before proceeding.) The main work-stealing queue design is roughly
     * similar to "Dynamic Circular Work-Stealing Deque" by David
     * Chase and Yossi Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html). The main
     * difference ultimately stems from gc requirements that we null
     * out taken slots as soon as we can, to maintain as small a
     * footprint as possible even in programs generating huge numbers
     * of tasks. To accomplish this, we shift the CAS arbitrating pop
     * vs deq (steal) from being on the indices ("base" and "sp") to
     * the slots themselves (mainly via method "casSlotNull()"). So,
     * both a successful pop and deq mainly entail CAS'ing a nonnull
     * slot to null. Because we rely on CASes of references, we do
     * not need tag bits on base or sp. They are simple ints as used
     * in any circular array-based queue (see for example ArrayDeque).
     * Updates to the indices must still be ordered in a way that
     * guarantees that sp == base means the queue is empty, but
     * otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or deq have not fully
     * committed. Note that this means that the deq operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes. However, in the
     * aggregate, we ensure at least probabilistic non-blockingness. If
     * an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete. One reason this works well here is
     * that apparently-nonempty often means soon-to-be-stealable,
     * which gives threads a chance to activate if necessary before
     * stealing (see below).
     *
     * Efficient implementation of this approach currently relies on
     * an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable base require
     * volatile ordering. Variable sp does not require volatile write
     * but needs cheaper store-ordering on writes. Because they are
     * protected by volatile base reads, reads of the queue array and
     * its slots do not need volatile load semantics, but writes (in
     * push) require store order and CASes (in pop and deq) require
     * (volatile) CAS semantics. Since these combinations aren't
     * supported using ordinary volatiles, the only way to accomplish
     * these efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.) Further, performance on most platforms is
     * very sensitive to placement and sizing of the (resizable) queue
     * array. Even though these queues don't usually become all that
     * big, the initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are currently initialized immediately after the thread
     * gets the initial signal to start processing tasks. However,
     * all queue-related methods except pushTask are written in a way
     * that allows them to instead be lazily allocated and/or disposed
     * of when empty. Altogether, these low-level implementation
     * choices produce as much as a factor of 4 performance
     * improvement compared to naive implementations, and enable the
     * processing of billions of tasks per second, sometimes at the
     * expense of ugliness.
     *
     * 2. Run control: The primary run control is based on a global
     * counter (activeCount) held by the pool. It uses an algorithm
     * similar to that in Herlihy and Shavit section 17.6 to cause
     * threads to eventually block when all threads declare they are
     * inactive. (See variable "scans".) For this to work, threads
     * must be declared active when executing tasks, and before
     * stealing a task. They must be inactive before blocking on the
     * Pool Barrier (awaiting a new submission or other Pool
     * event). In between, there is some free play which we take
     * advantage of to avoid contention and rapid flickering of the
     * global activeCount: If inactive, we activate only if a victim
     * queue appears to be nonempty (see above). Similarly, a thread
     * tries to inactivate only after a full scan of other threads.
     * The net effect is that contention on activeCount is rarely a
     * measurable performance issue. (There are also a few other cases
     * where we scan for work rather than retry/block upon
     * contention.)
     *
     * 3. Selection control. We maintain a policy of always choosing to
     * run local tasks rather than stealing, and always trying to
     * steal tasks before trying to run a new submission. All steals
     * are currently performed in randomly-chosen deq-order. It may be
     * worthwhile to bias these with locality / anti-locality
     * information, but doing this well probably requires more
     * lower-level information from JVMs than currently provided.
     */
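
The central idea of item 1, arbitrating pop against steal by CASing the slot to null rather than CASing the indices, can be seen in isolation in the simplified sketch below. It is not the jsr166y code: it assumes java.util.concurrent.atomic.AtomicReferenceArray in place of the Unsafe intrinsics (lazySet standing in for the store-ordered putOrderedObject), uses a fixed capacity with no growQueue equivalent, omits pool signalling, and the class name SlotCasDeque is hypothetical. It mirrors the logic of pushTask, popTask and deqTask, but it has not been vetted as a concurrent implementation; the check below exercises it from one thread only.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch: fixed-capacity work-stealing deque in which pop and
// steal compete by CASing the *slot* to null, not by CASing the indices.
// AtomicReferenceArray stands in for the Unsafe calls used by jsr166y.
final class SlotCasDeque<T> {
    private final AtomicReferenceArray<T> slots;
    private final int mask;
    private volatile int sp;    // next slot to push to; written only by the owner
    private volatile int base;  // next slot to steal from

    SlotCasDeque(int capacity) {
        if (capacity < 2 || Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        slots = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    /** Owner thread only: store the task, then publish the new top index. */
    void push(T t) {
        int s = sp;
        if (s - base > mask)
            throw new IllegalStateException("full (this sketch does not grow)");
        slots.lazySet(s & mask, t);  // store-ordered write of the slot
        sp = s + 1;
    }

    /** Owner thread only: take the newest task (LIFO), or null. */
    T pop() {
        int s = sp;
        if (s == base)
            return null;
        int i = (s - 1) & mask;
        T t = slots.get(i);
        if (t != null && slots.compareAndSet(i, t, null)) {
            sp = s - 1;
            return t;
        }
        return null;  // empty, or a thief won the race for this slot
    }

    /** Any thread: take the oldest task (FIFO), or null if empty or contended. */
    T steal() {
        int b = base;
        if (b == sp)
            return null;
        int i = b & mask;
        T t = slots.get(i);
        if (t != null && slots.compareAndSet(i, t, null)) {
            base = b + 1;
            return t;
        }
        return null;
    }
}
```

The owner works at the sp end and thieves at the base end, so after pushing "a", "b", "c" a pop returns "c" while a steal returns "a".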

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 2, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum work-stealing queue array size. Must be less than or
     * equal to 1 << 30 to ensure lack of index wraparound.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 30;

    /**
     * Generator of seeds for per-thread random numbers.
     */
    private static final Random randomSeedGenerator = new Random();

    /**
     * The work-stealing queue array. Size must be a power of two.
     */
    private ForkJoinTask<?>[] queue;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by the owner thread, via ordered store.
     * Both sp and base are allowed to wrap around on overflow, but
     * (sp - base) still estimates size.
     */
    private volatile int sp;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    private volatile int base;

    /**
     * The pool this thread works in.
     */
    final ForkJoinPool pool;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool during cleanup etc.
     */
    int poolIndex;

    /**
     * Run state of this worker. Supports simple versions of the usual
     * shutdown/shutdownNow control.
     */
    private volatile int runState;

    // Runstate values. Order matters
    private static final int RUNNING     = 0;
    private static final int SHUTDOWN    = 1;
    private static final int TERMINATING = 2;
    private static final int TERMINATED  = 3;

    /**
     * Activity status. When true, this worker is considered active.
     * Must be false upon construction. It must be true when executing
     * tasks, and BEFORE stealing a task. It must be false before
     * blocking on the Pool Barrier.
     */
    private boolean active;

    /**
     * Number of steals, transferred to pool when idle.
     */
    private int stealCount;

    /**
     * Seed for random number generator for choosing steal victims.
     */
    private int randomVictimSeed;

    /**
     * Seed for the embedded java.util.Random-style generator
     * (see nextJURandom).
     */
    private long juRandomSeed;

    /**
     * The last barrier event waited for.
     */
    private long eventCount;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        if (pool == null) throw new NullPointerException();
        this.pool = pool;
        // remaining initialization deferred to onStart
    }

    // Access methods used by Pool

    /**
     * Gets and clears steal count for accumulation by pool. Called
     * only when known to be idle (in pool.sync and termination).
     */
    final int getAndClearStealCount() {
        int sc = stealCount;
        stealCount = 0;
        return sc;
    }

    /**
     * Returns estimate of the number of tasks in the queue, without
     * correcting for transient negative values.
     */
    final int getRawQueueSize() {
        return sp - base;
    }

    // Intrinsics-based support for queue operations.
    // Currently these three (setSp, setSlot, casSlotNull) are
    // usually manually inlined to improve performance

    /**
     * Sets sp in store-order.
     */
    private void setSp(int s) {
        _unsafe.putOrderedInt(this, spOffset, s);
    }

    /**
     * Sets, in store-order, the given slot of q to the given task.
     * Caller must ensure q is nonnull and index is in range.
     */
    private static void setSlot(ForkJoinTask<?>[] q, int i,
                                ForkJoinTask<?> t){
        _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
    }

    /**
     * CASes the given slot of q from t to null. Caller must ensure q
     * is nonnull and index is in range.
     */
    private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                       ForkJoinTask<?> t) {
        return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
    }

    // Main queue methods

    /**
     * Pushes a task. Called only by current thread.
     * @param t the task. Caller must ensure nonnull
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1;
        int s = sp;
        _unsafe.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
        _unsafe.putOrderedInt(this, spOffset, ++s);
        if ((s -= base) == 1)
            pool.signalNonEmptyWorkerQueue();
        else if (s >= mask)
            growQueue();
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * either empty or contended.
     * @return a task, or null if none or contended.
     */
    private ForkJoinTask<?> deqTask() {
        ForkJoinTask<?>[] q;
        ForkJoinTask<?> t;
        int i;
        int b;
        if (sp != (b = base) &&
            (q = queue) != null && // must read q after b
            (t = q[i = (q.length - 1) & b]) != null &&
            _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
            base = b + 1;
            return t;
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty. Called only by
     * current thread.
     */
    final ForkJoinTask<?> popTask() {
        ForkJoinTask<?> t;
        int i;
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1;
        int s = sp;
        if (s != base &&
            (t = q[i = (s - 1) & mask]) != null &&
            _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
            _unsafe.putOrderedInt(this, spOffset, s - 1);
            return t;
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if
     * topmost element is the given task. Called only
     * by current thread.
     * @param t the task. Caller must ensure nonnull
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1;
        int s = sp - 1;
        if (_unsafe.compareAndSwapObject(q, ((s & mask) << qShift) + qBase,
                                         t, null)) {
            _unsafe.putOrderedInt(this, spOffset, s);
            return true;
        }
        return false;
    }

    /**
     * Returns next task to pop.
     */
    private ForkJoinTask<?> peekTask() {
        ForkJoinTask<?>[] q = queue;
        return q == null? null : q[(sp - 1) & (q.length - 1)];
    }

    /**
     * Doubles queue array size. Transfers elements by emulating
     * steals (deqs) from old array and placing, oldest first, into
     * new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int oldSize = oldQ.length;
        int newSize = oldSize << 1;
        if (newSize > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];

        int b = base;
        int bf = b + oldSize;
        int oldMask = oldSize - 1;
        int newMask = newSize - 1;
        do {
            int oldIndex = b & oldMask;
            ForkJoinTask<?> t = oldQ[oldIndex];
            if (t != null && !casSlotNull(oldQ, oldIndex, t))
                t = null;
            setSlot(newQ, b & newMask, t);
        } while (++b != bf);
        pool.signalIdleWorkers(false);
    }
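
Because sp and base are unbounded ints used only modulo the array length, growQueue can move each live logical index b from slot b & oldMask to slot b & newMask without renumbering either index, and its "b != bf" loop test stays correct even when b wraps past Integer.MAX_VALUE. A single-threaded sketch of just that re-indexing (no CAS, no signalling; the class name CircularGrow is hypothetical):

```java
// Hypothetical sketch of growQueue's re-indexing, single-threaded only.
final class CircularGrow {
    /** Copies logical indices [base, base + oldQ.length) into an array twice as large. */
    static Object[] grow(Object[] oldQ, int base) {
        int oldSize = oldQ.length;              // assumed to be a power of two
        Object[] newQ = new Object[oldSize << 1];
        int oldMask = oldSize - 1;
        int newMask = newQ.length - 1;
        // "b != bf" rather than "b < bf" keeps the loop correct if b wraps around
        for (int b = base, bf = base + oldSize; b != bf; ++b)
            newQ[b & newMask] = oldQ[b & oldMask];
        return newQ;
    }
}
```

For example, with base = 6 in a length-4 array, logical indices 6, 7, 8, 9 occupy old slots 2, 3, 0, 1 and land in new slots 6, 7, 0, 1, leaving slots 2 through 5 empty.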

    // Runstate management

    final boolean isShutdown()    { return runState >= SHUTDOWN;  }
    final boolean isTerminating() { return runState >= TERMINATING;  }
    final boolean isTerminated()  { return runState == TERMINATED; }
    final boolean shutdown()      { return transitionRunStateTo(SHUTDOWN); }
    final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }

    /**
     * Transitions to at least the given state. Returns true if not
     * already at least the given state.
     */
    private boolean transitionRunStateTo(int state) {
        for (;;) {
            int s = runState;
            if (s >= state)
                return false;
            if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
                return true;
        }
    }
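
transitionRunStateTo depends on the run-state constants being ordered: the CAS loop can only move the state forward, and it returns true only for the call that actually made the move. The same pattern, sketched with a public AtomicInteger in place of Unsafe.compareAndSwapInt (the class name MonotonicRunState is hypothetical):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone version of the monotonic run-state pattern.
final class MonotonicRunState {
    static final int RUNNING = 0, SHUTDOWN = 1, TERMINATING = 2, TERMINATED = 3;
    private final AtomicInteger runState = new AtomicInteger(RUNNING);

    /** Moves to at least {@code state}; true only if this call performed the transition. */
    boolean transitionTo(int state) {
        for (;;) {
            int s = runState.get();
            if (s >= state)
                return false;            // already there or further along: never move back
            if (runState.compareAndSet(s, state))
                return true;
            // CAS lost to a concurrent transition; re-read and retry
        }
    }

    boolean isShutdown()    { return runState.get() >= SHUTDOWN; }
    boolean isTerminating() { return runState.get() >= TERMINATING; }
    boolean isTerminated()  { return runState.get() == TERMINATED; }
}
```

Because shutdownNow targets a later state than shutdown, calling shutdown after shutdownNow is a harmless no-op that returns false.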

    /**
     * Ensures status is active and, if necessary, adjusts pool active count.
     */
    final void activate() {
        if (!active) {
            active = true;
            pool.incrementActiveCount();
        }
    }

    /**
     * Ensures status is inactive and, if necessary, adjusts pool active count.
     */
    final void inactivate() {
        if (active) {
            active = false;
            pool.decrementActiveCount();
        }
    }

    // Lifecycle methods

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke super.onStart() at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        juRandomSeed = randomSeedGenerator.nextLong();
        do {} while ((randomVictimSeed = nextRandomInt()) == 0); // must be nonzero
        if (queue == null)
            queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];

        // Heuristically allow one initial thread to warm up; others wait
        if (poolIndex < pool.getParallelism() - 1) {
            eventCount = pool.sync(this, 0);
            activate();
        }
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread. If you override this method, you must invoke
     * super.onTermination at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or null if completed normally.
     */
    protected void onTermination(Throwable exception) {
        try {
            clearLocalTasks();
            inactivate();
            cancelTasks();
        } finally {
            terminate(exception);
        }
    }

    /**
     * Notifies pool of termination and, if exception is nonnull,
     * rethrows it to trigger this thread's uncaughtExceptionHandler.
     */
    private void terminate(Throwable exception) {
        transitionRunStateTo(TERMINATED);
        try {
            pool.workerTerminated(this);
        } finally {
            if (exception != null)
                ForkJoinTask.rethrowException(exception);
        }
    }

    /**
     * Runs local tasks on exit from main.
     */
    private void clearLocalTasks() {
        while (base != sp && !pool.isTerminating()) {
            ForkJoinTask<?> t = popTask();
            if (t != null) {
                activate(); // ensure active status
                t.quietlyExec();
            }
        }
    }

    /**
     * Removes and cancels all tasks in queue. Can be called from any
     * thread.
     */
    final void cancelTasks() {
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoreExceptions();
        }
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * ForkJoinTasks.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            while (!isShutdown())
                step();
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    /**
     * Main top-level action.
     */
    private void step() {
        ForkJoinTask<?> t = sp != base? popTask() : null;
        if (t != null || (t = scan(null, true)) != null) {
            activate();
            t.quietlyExec();
        }
        else {
            inactivate();
            eventCount = pool.sync(this, eventCount);
        }
    }

    // scanning for and stealing tasks

    /**
     * Computes next value for random victim probe. Scans don't
     * require a very high quality generator, but also not a crummy
     * one. Marsaglia xor-shift is cheap and works well.
     *
     * This method is currently unused; it is manually inlined in scan.
     */
    private static int xorShift(int r) {
        r ^= r << 1;
        r ^= r >>> 3;
        r ^= r << 10;
        return r;
    }
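
xorShift is Marsaglia's xorshift with shift triple (1, 3, 10). Each of its three steps (x ^= x << a, or x ^= x >>> a) is an invertible map on 32-bit ints, so the whole step is a bijection that fixes 0: a nonzero seed can never reach zero, which is why onStart insists on a nonzero randomVictimSeed. A quick check of those properties, with a hypothetical class name and a helper that mimics the ws[mask & idx] probe:

```java
// Hypothetical check of the xorshift step that scan() inlines.
final class XorShiftDemo {
    static int xorShift(int r) {
        r ^= r << 1;
        r ^= r >>> 3;
        r ^= r << 10;
        return r;
    }

    /** Index probed for seed r in a workers array of length n (a power of two). */
    static int probeIndex(int r, int n) {
        return r & (n - 1);  // the mask also clears the sign bit, so the index is never negative
    }
}
```

Starting from 1, one step gives 1 ^ 2 = 3, then 3 ^ (3 >>> 3) = 3, then 3 ^ (3 << 10) = 3075; a zero seed, by contrast, stays zero forever and would make every probe hit index 0.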

    /**
     * Tries to steal a task from another worker and/or, if enabled,
     * submission queue. Starts at a random index of workers array,
     * and probes workers until finding one with non-empty queue or
     * finding that all are empty. It randomly selects the first n-1
     * probes. If these are empty, it resorts to full circular
     * traversal, which is necessary to accurately set active status
     * by caller. Also restarts if pool barrier has tripped since last
     * scan, which forces refresh of workers array, in case barrier
     * was associated with resize.
     *
     * This method must be both fast and quiet -- usually avoiding
     * memory accesses that could disrupt cache sharing, other than
     * those needed to check for and take tasks. This accounts for,
     * among other things, updating random seed in place without
     * storing it until exit. (Note that we only need to store it if
     * we found a task; otherwise it doesn't matter if we start at the
     * same place next time.)
     *
     * @param joinMe if non-null, a task whose completion causes an
     * early exit
     * @param checkSubmissions true if OK to take submissions
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan(ForkJoinTask<?> joinMe,
                                 boolean checkSubmissions) {
        ForkJoinPool p = pool;
        if (p == null)                    // Never null, but avoids
            return null;                  //   implicit nullchecks below
        int r = randomVictimSeed;         // extract once to keep scan quiet
        restart:                          // outer loop refreshes ws array
        while (joinMe == null || joinMe.status >= 0) {
            int mask;
            ForkJoinWorkerThread[] ws = p.workers;
            if (ws != null && (mask = ws.length - 1) > 0) {
                int probes = -mask;       // use random index while negative
                int idx = r;
                for (;;) {
                    ForkJoinWorkerThread v;
                    // inlined xorshift to update seed
                    r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                    if ((v = ws[mask & idx]) != null && v.sp != v.base) {
                        ForkJoinTask<?> t;
                        activate();
                        if ((joinMe == null || joinMe.status >= 0) &&
                            (t = v.deqTask()) != null) {
                            randomVictimSeed = r;
                            ++stealCount;
                            return t;
                        }
                        continue restart; // restart on contention
                    }
                    if ((probes >> 1) <= mask) // n-1 random then circular
                        idx = (probes++ < 0)? r : (idx + 1);
                    else
                        break;
                }
            }
            if (checkSubmissions && p.hasQueuedSubmissions()) {
                activate();
                ForkJoinTask<?> t = p.pollSubmission();
                if (t != null)
                    return t;
            }
            else {
                long ec = eventCount;     // restart on pool event
                if ((eventCount = p.getEventCount()) == ec)
                    break;
            }
        }
        return null;
    }
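
The probe schedule in scan is easiest to see by replaying only its index arithmetic for a workers array of length n whose queues are all empty (so no steal, no submission and no restart happens). Traced by hand from the loop above, the first n probes use random indices (the stored seed, then n - 1 freshly generated values) and the next 2n walk circularly from the last random index, for 3n probes in total. The class and method names below are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of scan()'s probe indices when every queue is empty.
final class ScanProbeOrder {
    static List<Integer> probeIndices(int seed, int n) {
        int mask = n - 1;               // n: workers array length, a power of two >= 2
        List<Integer> probed = new ArrayList<>();
        int r = seed;
        int probes = -mask;             // random index while negative
        int idx = r;
        for (;;) {
            r ^= r << 1; r ^= r >>> 3; r ^= r << 10;  // same inlined xorshift as scan()
            probed.add(mask & idx);     // scan() inspects ws[mask & idx] here
            if ((probes >> 1) <= mask)
                idx = (probes++ < 0) ? r : (idx + 1);
            else
                break;
        }
        return probed;
    }
}
```

The circular tail covers every index twice, which is what lets the caller trust that "no task found" means every queue was observed empty at least once.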

    /**
     * Callback from pool.sync to rescan before blocking. If a
     * task is found, it is pushed so it can be executed upon return.
     * @return true if found and pushed a task
     */
    final boolean prescan() {
        ForkJoinTask<?> t = scan(null, true);
        if (t != null) {
            pushTask(t);
            return true;
        }
        else {
            inactivate();
            return false;
        }
    }

    /**
     * Implements ForkJoinTask.helpJoin.
     */
    final int helpJoinTask(ForkJoinTask<?> joinMe) {
        ForkJoinTask<?> t = null;
        int s;
        while ((s = joinMe.status) >= 0) {
            if (t == null) {
                if ((t = scan(joinMe, false)) == null)  // block if no work
                    return joinMe.awaitDone(this, false);
                // else recheck status before exec
            }
            else {
                t.quietlyExec();
                t = null;
            }
        }
        if (t != null) // unsteal
            pushTask(t);
        return s;
    }

    // Support for public static and/or ForkJoinTask methods

    /**
     * Returns an estimate of the number of tasks in the queue.
     */
    final int getQueueSize() {
        int b = base;
        int n = sp - b;
        return n <= 0? 0 : n; // suppress momentarily negative values
    }

    /**
     * Runs one popped task, if available.
     * @return true if ran a task
     */
    private boolean runLocalTask() {
        ForkJoinTask<?> t = popTask();
        if (t == null)
            return false;
        t.quietlyExec();
        return true;
    }

    /**
     * Pops or steals a task.
     * @return task, or null if none available
     */
    private ForkJoinTask<?> getLocalOrStolenTask() {
        ForkJoinTask<?> t = popTask();
        return t != null? t : scan(null, false);
    }

    /**
     * Runs a popped or stolen task, if available.
     * @return true if ran a task
     */
    private boolean runLocalOrStolenTask() {
        ForkJoinTask<?> t = getLocalOrStolenTask();
        if (t == null)
            return false;
        t.quietlyExec();
        return true;
    }

    /**
     * Runs tasks until the pool is quiescent.
     */
    final void helpQuiescePool() {
        activate();
        for (;;) {
            if (!runLocalOrStolenTask()) {
                inactivate();
                if (pool.isQuiescent()) {
                    activate(); // re-activate on exit
                    break;
                }
            }
        }
    }

    /**
     * Returns an estimate of the number of tasks, offset by half
     * (rounded down) the number of idle workers.
     */
    final int getEstimatedSurplusTaskCount() {
        return (sp - base) - (pool.getIdleThreadCount() >>> 1);
    }

    // Public methods on current thread

    /**
     * Returns the pool hosting the current task execution.
     * @return the pool
     */
    public static ForkJoinPool getPool() {
        return ((ForkJoinWorkerThread)(Thread.currentThread())).pool;
    }

    /**
     * Returns the index number of the current worker thread in its
     * pool. The returned value ranges from zero to the maximum
     * number of threads (minus one) that have ever been created in
     * the pool. This method may be useful for applications that
     * track status or collect results per-worker rather than
     * per-task.
     * @return the index number.
     */
    public static int getPoolIndex() {
        return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex;
    }

    /**
     * Returns an estimate of the number of tasks waiting to be run by
     * the current worker thread. This value may be useful for
     * heuristic decisions about whether to fork other tasks.
     * @return the number of tasks
     */
    public static int getLocalQueueSize() {
        return ((ForkJoinWorkerThread)(Thread.currentThread())).
            getQueueSize();
    }
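
The "whether to fork" heuristic mentioned for getLocalQueueSize (and the idle-adjusted estimate in getEstimatedSurplusTaskCount) is typically used as a fork threshold: keep splitting only while this worker is not already holding a surplus of stealable tasks. As an assumption for illustration, the sketch targets the later java.util.concurrent API (JDK 7 and up), where the public static ForkJoinTask.getSurplusQueuedTaskCount() serves a similar purpose, rather than this jsr166y class. The class name SurplusSum and both thresholds are hypothetical choices.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch (java.util.concurrent, not jsr166y): fork only while this worker
// does not already hold a surplus of queued tasks that idle workers could steal.
final class SurplusSum extends RecursiveTask<Long> {
    private static final int SEQUENTIAL_THRESHOLD = 1_000; // illustrative
    private static final int MAX_SURPLUS = 3;              // illustrative
    private final long[] a;
    private final int lo, hi;

    SurplusSum(long[] a, int lo, int hi) {
        this.a = a; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo > SEQUENTIAL_THRESHOLD
                && getSurplusQueuedTaskCount() <= MAX_SURPLUS) {
            int mid = (lo + hi) >>> 1;
            SurplusSum left = new SurplusSum(a, lo, mid);
            left.fork();                                    // pushed onto this worker's queue
            long right = new SurplusSum(a, mid, hi).compute();
            return right + left.join();
        }
        long sum = 0;                                       // small range, or enough work queued
        for (int i = lo; i < hi; i++)
            sum += a[i];
        return sum;
    }
}
```

Whatever the heuristic decides, the result is the same; only the number of forked subtasks changes, which is why such estimates can afford to be approximate.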
785    
786     /**
787     * Returns, but does not remove or execute, the next task locally
788     * queued for execution by the current worker thread. There is no
789     * guarantee that this task will be the next one actually returned
790     * or executed from other polling or execution methods.
791     * @return the next task or null if none
792     */
793     public static ForkJoinTask<?> peekLocalTask() {
794     return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
795     }
796    
797     /**
798     * Removes and returns, without executing, the next task queued
799     * for execution in the current worker thread's local queue.
800     * @return the next task to execute, or null if none
801     */
802     public static ForkJoinTask<?> pollLocalTask() {
803     return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
804     }
805    
806     /**
807     * Execute the next task locally queued by the current worker, if
808     * one is available.
809     * @return true if a task was run; a false return indicates
810     * that no task was available.
811     */
812     public static boolean executeLocalTask() {
813     return ((ForkJoinWorkerThread)(Thread.currentThread())).
814     runLocalTask();
815     }
816    
817     /**
818     * Removes and returns, without executing, the next task queued
819     * for execution in the current worker thread's local queue or if
820     * none, a task stolen from another worker, if one is available.
821     * A null return does not necessarily imply that all tasks are
822     * completed, only that there are currently none available.
823     * @return the next task to execute, or null if none
824     */
825     public static ForkJoinTask<?> pollTask() {
826     return ((ForkJoinWorkerThread)(Thread.currentThread())).
827     getLocalOrStolenTask();
828     }
829    
830     /**
831     * Helps this program complete by processing a local or stolen
832     * task, if one is available. This method may be useful when
833     * several tasks are forked, and only one of them must be joined,
834     * as in:
835     *
836     * <pre>
837     * while (!t1.isDone() &amp;&amp; !t2.isDone())
838     * ForkJoinWorkerThread.executeTask();
839     * </pre>
840     *
841     * @return true if a task was run; a false return indicates
842     * that no task was available.
843     */
844     public static boolean executeTask() {
845     return ((ForkJoinWorkerThread)(Thread.currentThread())).
846     runLocalOrStolenTask();
847     }
848    
849     // Per-worker exported random numbers
850    
851     // Same constants as java.util.Random
852     static final long JURandomMultiplier = 0x5DEECE66DL;
853     static final long JURandomAddend = 0xBL;
854     static final long JURandomMask = (1L << 48) - 1;
855    
856     private final int nextJURandom(int bits) {
857     long next = (juRandomSeed * JURandomMultiplier + JURandomAddend) &
858     JURandomMask;
859     juRandomSeed = next;
860     return (int)(next >>> (48 - bits));
861     }
862    
863     private final int nextJURandomInt(int n) {
864     if (n <= 0)
865     throw new IllegalArgumentException("n must be positive");
866     int bits = nextJURandom(31);
867     if ((n & -n) == n)
868     return (int)((n * (long)bits) >> 31);
869    
870     for (;;) {
871     int val = bits % n;
872     if (bits - val + (n-1) >= 0)
873     return val;
874     bits = nextJURandom(31);
875     }
876     }
877    
878     private final long nextJURandomLong() {
879     return ((long)(nextJURandom(32)) << 32) + nextJURandom(32);
880     }
881    
882     private final long nextJURandomLong(long n) {
883     if (n <= 0)
884     throw new IllegalArgumentException("n must be positive");
885     long offset = 0;
886     while (n >= Integer.MAX_VALUE) { // randomly pick half range
887     int bits = nextJURandom(2); // 2nd bit for odd vs even split
888     long half = n >>> 1;
889     long nextn = ((bits & 2) == 0)? half : n - half;
890     if ((bits & 1) == 0)
891     offset += n - nextn;
892     n = nextn;
893     }
894     return offset + nextJURandomInt((int)n);
895     }
896    
897     private final double nextJURandomDouble() {
898     return (((long)(nextJURandom(26)) << 27) + nextJURandom(27))
899     / (double)(1L << 53);
900     }
901    
902     /**
903     * Returns a random integer using a per-worker random
904     * number generator with the same properties as
905     * {@link java.util.Random#nextInt}.
906     * @return the next pseudorandom, uniformly distributed {@code int}
907     * value from this worker's random number generator's sequence
908     */
909     public static int nextRandomInt() {
910     return ((ForkJoinWorkerThread)(Thread.currentThread())).
911     nextJURandom(32);
912     }
913    
914     /**
915     * Returns a random integer using a per-worker random
916     * number generator with the same properties as
917     * {@link java.util.Random#nextInt(int)}.
918     * @param n the bound on the random number to be returned. Must be
919     * positive.
920     * @return the next pseudorandom, uniformly distributed {@code int}
921     * value between {@code 0} (inclusive) and {@code n} (exclusive)
922     * from this worker's random number generator's sequence
923     * @throws IllegalArgumentException if n is not positive
924     */
925     public static int nextRandomInt(int n) {
926     return ((ForkJoinWorkerThread)(Thread.currentThread())).
927     nextJURandomInt(n);
928     }
929    
930     /**
931     * Returns a random long using a per-worker random
932     * number generator with the same properties as
933     * {@link java.util.Random#nextLong}.
934     * @return the next pseudorandom, uniformly distributed {@code long}
935     * value from this worker's random number generator's sequence
936     */
937     public static long nextRandomLong() {
938     return ((ForkJoinWorkerThread)(Thread.currentThread())).
939     nextJURandomLong();
940     }
941    
942     /**
943     * Returns a random long using a per-worker random
944     * number generator with properties analogous to those of
945     * {@link java.util.Random#nextInt(int)}, but with a {@code long} bound.
946     * @param n the bound on the random number to be returned. Must be
947     * positive.
948     * @return the next pseudorandom, uniformly distributed {@code long}
949     * value between {@code 0} (inclusive) and {@code n} (exclusive)
950     * from this worker's random number generator's sequence
951     * @throws IllegalArgumentException if n is not positive
952     */
953     public static long nextRandomLong(long n) {
954     return ((ForkJoinWorkerThread)(Thread.currentThread())).
955     nextJURandomLong(n);
956     }
957    
958     /**
959     * Returns a random double using a per-worker random
960     * number generator with the same properties as
961     * {@link java.util.Random#nextDouble}.
962     * @return the next pseudorandom, uniformly distributed {@code double}
963     * value between {@code 0.0} (inclusive) and {@code 1.0} (exclusive)
964     * from this worker's random number generator's sequence
965     */
966     public static double nextRandomDouble() {
967     return ((ForkJoinWorkerThread)(Thread.currentThread())).
968     nextJURandomDouble();
969     }
970    
971     // Temporary Unsafe mechanics for preliminary release
972    
973     static final Unsafe _unsafe;
974     static final long baseOffset;
975     static final long spOffset;
976     static final long qBase;
977     static final int qShift;
978     static final long runStateOffset;
979     static {
980     try {
981     if (ForkJoinWorkerThread.class.getClassLoader() != null) {
982     Field f = Unsafe.class.getDeclaredField("theUnsafe");
983     f.setAccessible(true);
984     _unsafe = (Unsafe)f.get(null);
985     }
986     else
987     _unsafe = Unsafe.getUnsafe();
988     baseOffset = _unsafe.objectFieldOffset
989     (ForkJoinWorkerThread.class.getDeclaredField("base"));
990     spOffset = _unsafe.objectFieldOffset
991     (ForkJoinWorkerThread.class.getDeclaredField("sp"));
992     runStateOffset = _unsafe.objectFieldOffset
993     (ForkJoinWorkerThread.class.getDeclaredField("runState"));
994     qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
995     int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
996     if ((s & (s-1)) != 0)
997     throw new Error("data type scale not a power of two");
998     qShift = 31 - Integer.numberOfLeadingZeros(s);
999     } catch (Exception e) {
1000     throw new RuntimeException("Could not initialize intrinsics", e);
1001     }
1002     }
1003     }
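
The per-worker random helpers reuse the 48-bit linear congruential generator and the constants of `java.util.Random`. As a minimal sketch, assume a hypothetical standalone class `WorkerLcg` that copies the private helpers `nextJURandom`, `nextJURandomInt`, `nextJURandomLong` and `nextJURandomDouble`, and scrambles its seed the way `new java.util.Random(seed)` does. How `ForkJoinWorkerThread` actually seeds `juRandomSeed` is not shown in this excerpt, so that constructor is an assumption. With matching seeds, its output can be checked directly against `java.util.Random`:

```java
import java.util.Random;

/**
 * Hypothetical standalone copy of the private per-worker helpers
 * (nextJURandom, nextJURandomInt, nextJURandomLong, nextJURandomDouble).
 * The class name and the seed-scrambling constructor are assumptions.
 */
final class WorkerLcg {
    // Same constants as java.util.Random
    static final long MULTIPLIER = 0x5DEECE66DL;
    static final long ADDEND = 0xBL;
    static final long MASK = (1L << 48) - 1;

    private long seed;

    /** Scrambles the user seed the way new java.util.Random(seed) does. */
    WorkerLcg(long userSeed) {
        seed = (userSeed ^ MULTIPLIER) & MASK;
    }

    /** Advances the 48-bit LCG and returns its top {@code bits} bits. */
    int next(int bits) {
        seed = (seed * MULTIPLIER + ADDEND) & MASK;
        return (int) (seed >>> (48 - bits));
    }

    int nextInt() {
        return next(32);
    }

    int nextInt(int n) {
        if (n <= 0)
            throw new IllegalArgumentException("n must be positive");
        int bits = next(31);
        if ((n & -n) == n)                          // n is a power of two:
            return (int) ((n * (long) bits) >> 31); // take the high-order bits
        for (;;) {
            int val = bits % n;
            if (bits - val + (n - 1) >= 0)          // no int overflow: bits is not in
                return val;                         // the incomplete last block, accept
            bits = next(31);                        // otherwise reject and redraw
        }
    }

    long nextLong() {
        return ((long) next(32) << 32) + next(32);
    }

    double nextDouble() {
        // 53 random bits scaled into [0.0, 1.0)
        return (((long) next(26) << 27) + next(27)) / (double) (1L << 53);
    }
}
```

One design point this makes visible: `java.util.Random` advances its seed with an atomic compare-and-set so that it can be shared between threads, whereas the static accessors above only ever touch the generator of `Thread.currentThread()`, so a plain `long` field is sufficient.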
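
For bounds of at least `Integer.MAX_VALUE`, `nextJURandomLong(long)` repeatedly halves the range until the bound fits in an `int`. At each step, one random bit chooses the size of the surviving half (this only matters when `n` is odd), and another chooses whether that half is the lower or the upper part of the range. The sketch below replays that loop as a hypothetical static helper driven by a caller-supplied `java.util.Random`. Here `rnd.nextInt(4)` stands in for the private two-bit draw `nextJURandom(2)`.

```java
import java.util.Random;

/** Hypothetical standalone version of the range-halving loop in nextJURandomLong(long). */
final class BoundedLong {
    static long nextLong(Random rnd, long n) {
        if (n <= 0)
            throw new IllegalArgumentException("n must be positive");
        long offset = 0;
        // Halve the range until it fits into a positive int bound.
        while (n >= Integer.MAX_VALUE) {
            int bits = rnd.nextInt(4);              // two random bits
            long half = n >>> 1;
            // bit 1 picks the size of the surviving sub-range (floor or ceiling of n/2)
            long nextn = ((bits & 2) == 0) ? half : n - half;
            // bit 0 clear: keep the upper sub-range [n - nextn, n); set: keep [0, nextn)
            if ((bits & 1) == 0)
                offset += n - nextn;
            n = nextn;
        }
        // Invariant: the result lies in [offset, offset + n) inside the original range.
        return offset + rnd.nextInt((int) n);
    }
}
```

By our reading, when `n` is odd the two candidate halves differ by one element, so each halving step is very slightly biased. Because the loop only runs while `n` is at least `Integer.MAX_VALUE`, the relative bias per step is on the order of `1/n`.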
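
The static initializer records the base offset of `ForkJoinTask[]` and converts its per-element index scale into a shift count, `qShift = log2(scale)`. This is presumably so that the byte offset of element `i` can be computed as `qBase + (i << qShift)` without a multiply. The code that uses these fields is not part of this excerpt. The sketch below shows that arithmetic in plain Java. The names are hypothetical, and base and scale are passed in as parameters rather than read through `sun.misc.Unsafe` (on typical JVMs the reference scale is 4 or 8 bytes, but that is JVM-dependent).

```java
/**
 * Hypothetical illustration of the qBase/qShift arithmetic in the static
 * initializer. Real values come from Unsafe.arrayBaseOffset and
 * Unsafe.arrayIndexScale; here they are plain parameters.
 */
final class ArrayOffsets {
    /** log2 of a power-of-two scale, computed as in the static initializer. */
    static int shiftFor(int scale) {
        // Extra scale <= 0 guard (not in the original): 0 would pass the power-of-two test.
        if (scale <= 0 || (scale & (scale - 1)) != 0)
            throw new IllegalArgumentException("data type scale not a power of two");
        return 31 - Integer.numberOfLeadingZeros(scale);
    }

    /** Byte offset of element i: base + i * scale, computed with a shift. */
    static long elementOffset(long base, int shift, int i) {
        return base + ((long) i << shift);
    }
}
```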