root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.1
Committed: Tue Jan 6 14:30:31 2009 UTC (15 years, 4 months ago) by dl
Branch: MAIN
Log Message:
Refactored and repackaged ForkJoin classes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.util.*;
9 import java.util.concurrent.*;
10 import java.util.concurrent.atomic.*;
11 import java.util.concurrent.locks.*;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
14
15 /**
16 * A thread that is internally managed by a ForkJoinPool to execute
17 * ForkJoinTasks. This class additionally provides public
18 * <tt>static</tt> methods accessing some basic scheduling and
19 * execution mechanics for the <em>current</em>
20 * ForkJoinWorkerThread. These methods may be invoked only from within
21 * other ForkJoinTask computations. Attempts to invoke in other
22 * contexts result in exceptions or errors including
23 * ClassCastException. These methods enable construction of
24 * special-purpose task classes, as well as specialized idioms
25 * occasionally useful in ForkJoinTask processing.
26 *
27 * <p>The form of supported static methods reflects the fact that
28 * worker threads may access and process tasks obtained in any of
29 * three ways. In preference order: <em>Local</em> tasks are processed
30 * in LIFO (newest first) order. <em>Stolen</em> tasks are obtained
31 * from other threads in FIFO (oldest first) order, only if there are
32 * no local tasks to run. <em>Submissions</em> form a FIFO queue
33 * common to the entire pool, and are started only if no other
34 * work is available.
35 *
36 * <p> This class is subclassable solely for the sake of adding
37 * functionality -- there are no overridable methods dealing with
38 * scheduling or execution. However, you can override initialization
39 * and termination cleanup methods surrounding the main task
40 * processing loop. If you do create such a subclass, you will also
41 * need to supply a custom ForkJoinWorkerThreadFactory to use it in a
42 * ForkJoinPool.
43 */
44 public class ForkJoinWorkerThread extends Thread {
45 /*
46 * Algorithm overview:
47 *
48 * 1. Work-Stealing: Work-stealing queues are special forms of
49 * Deques that support only three of the four possible
50 * end-operations -- push, pop, and deq (aka steal), and only do
51 * so under the constraints that push and pop are called only from
52 * the owning thread, while deq may be called from other threads.
53 * (If you are unfamiliar with them, you probably want to read
54 * Herlihy and Shavit's book "The Art of Multiprocessor
55 * Programming", chapter 16 describing these in more detail before
56 * proceeding.) The main work-stealing queue design is roughly
57 * similar to "Dynamic Circular Work-Stealing Deque" by David
58 * Chase and Yossi Lev, SPAA 2005
59 * (http://research.sun.com/scalable/pubs/index.html). The main
60 * difference ultimately stems from gc requirements that we null
61 * out taken slots as soon as we can, to maintain as small a
62 * footprint as possible even in programs generating huge numbers
63 * of tasks. To accomplish this, we shift the CAS arbitrating pop
64 * vs deq (steal) from being on the indices ("base" and "sp") to
65 * the slots themselves (mainly via method "casSlotNull()"). So,
66 * both a successful pop and deq mainly entail CAS'ing a nonnull
67 * slot to null. Because we rely on CASes of references, we do
68 * not need tag bits on base or sp. They are simple ints as used
69 * in any circular array-based queue (see for example ArrayDeque).
70 * Updates to the indices must still be ordered in a way that
71 * guarantees that sp == base means the queue is empty, but
72 * otherwise may err on the side of possibly making the queue
73 * appear nonempty when a push, pop, or deq has not fully
74 * committed. Note that this means that the deq operation,
75 * considered individually, is not wait-free. One thief cannot
76 * successfully continue until another in-progress one (or, if
77 * previously empty, a push) completes. However, in the
78 * aggregate, we ensure at least probabilistic non-blockingness. If
79 * an attempted steal fails, a thief always chooses a different
80 * random victim target to try next. So, in order for one thief to
81 * progress, it suffices for any in-progress deq or new push on
82 * any empty queue to complete. One reason this works well here is
83 * that apparently-nonempty often means soon-to-be-stealable,
84 * which gives threads a chance to activate if necessary before
85 * stealing (see below).
86 *
87 * Efficient implementation of this approach currently relies on
88 * an uncomfortable amount of "Unsafe" mechanics. To maintain
89 * correct orderings, reads and writes of variable base require
90 * volatile ordering. Variable sp does not require volatile write
91 * but needs cheaper store-ordering on writes. Because they are
92 * protected by volatile base reads, reads of the queue array and
93 * its slots do not need volatile load semantics, but writes (in
94 * push) require store order and CASes (in pop and deq) require
95 * (volatile) CAS semantics. Since these combinations aren't
96 * supported using ordinary volatiles, the only way to accomplish
97 * these efficiently is to use direct Unsafe calls. (Using external
98 * AtomicIntegers and AtomicReferenceArrays for the indices and
99 * array is significantly slower because of memory locality and
100 * indirection effects.) Further, performance on most platforms is
101 * very sensitive to placement and sizing of the (resizable) queue
102 * array. Even though these queues don't usually become all that
103 * big, the initial size must be large enough to counteract cache
104 * contention effects across multiple queues (especially in the
105 * presence of GC cardmarking). Also, to improve thread-locality,
106 * queues are currently initialized immediately after the thread
107 * gets the initial signal to start processing tasks. However,
108 * all queue-related methods except pushTask are written in a way
109 * that allows them to instead be lazily allocated and/or disposed
110 * of when empty. All together, these low-level implementation
111 * choices produce as much as a factor of 4 performance
112 * improvement compared to naive implementations, and enable the
113 * processing of billions of tasks per second, sometimes at the
114 * expense of ugliness.
115 *
116 * 2. Run control: The primary run control is based on a global
117 * counter (activeCount) held by the pool. It uses an algorithm
118 * similar to that in Herlihy and Shavit section 17.6 to cause
119 * threads to eventually block when all threads declare they are
120 * inactive. (See variable "scans".) For this to work, threads
121 * must be declared active when executing tasks, and before
122 * stealing a task. They must be inactive before blocking on the
123 * Pool Barrier (awaiting a new submission or other Pool
124 * event). In between, there is some free play which we take
125 * advantage of to avoid contention and rapid flickering of the
126 * global activeCount: If inactive, we activate only if a victim
127 * queue appears to be nonempty (see above). Similarly, a thread
128 * tries to inactivate only after a full scan of other threads.
129 * The net effect is that contention on activeCount is rarely a
130 * measurable performance issue. (There are also a few other cases
131 * where we scan for work rather than retry/block upon
132 * contention.)
133 *
134 * 3. Selection control. We maintain a policy of always choosing to
135 * run local tasks rather than stealing, and always trying to
136 * steal tasks before trying to run a new submission. All steals
137 * are currently performed in randomly-chosen deq-order. It may be
138 * worthwhile to bias these with locality / anti-locality
139 * information, but doing this well probably requires more
140 * lower-level information from JVMs than currently provided.
141 */
142
143 /**
144 * Capacity of work-stealing queue array upon initialization.
145 * Must be a power of two. Initial size must be at least 2, but is
146 * padded to minimize cache effects.
147 */
148 private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
149
150 /**
151 * Maximum work-stealing queue array size. Must be less than or
152 * equal to 1 << 30 to ensure lack of index wraparound.
153 */
154 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 30;
155
156 /**
157 * Generator of seeds for per-thread random numbers.
158 */
159 private static final Random randomSeedGenerator = new Random();
160
161 /**
162 * The work-stealing queue array. Size must be a power of two.
163 */
164 private ForkJoinTask<?>[] queue;
165
166 /**
167 * Index (mod queue.length) of next queue slot to push to or pop
168 * from. It is written only by owner thread, via ordered store.
169 * Both sp and base are allowed to wrap around on overflow, but
170 * (sp - base) still estimates size.
171 */
172 private volatile int sp;
173
174 /**
175 * Index (mod queue.length) of least valid queue slot, which is
176 * always the next position to steal from if nonempty.
177 */
178 private volatile int base;
179
180 /**
181 * The pool this thread works in.
182 */
183 final ForkJoinPool pool;
184
185 /**
186 * Index of this worker in pool array. Set once by pool before
187 * running, and accessed directly by pool during cleanup etc
188 */
189 int poolIndex;
190
191 /**
192 * Run state of this worker. Supports simple versions of the usual
193 * shutdown/shutdownNow control.
194 */
195 private volatile int runState;
196
197 // Runstate values. Order matters
198 private static final int RUNNING = 0;
199 private static final int SHUTDOWN = 1;
200 private static final int TERMINATING = 2;
201 private static final int TERMINATED = 3;
202
203 /**
204 * Activity status. When true, this worker is considered active.
205 * Must be false upon construction. It must be true when executing
206 * tasks, and BEFORE stealing a task. It must be false before
207 * blocking on the Pool Barrier.
208 */
209 private boolean active;
210
211 /**
212 * Number of steals, transferred to pool when idle
213 */
214 private int stealCount;
215
216 /**
217 * Seed for random number generator for choosing steal victims
218 */
219 private int randomVictimSeed;
220
221 /**
222 * Seed for the embedded java.util.Random-style generator (see nextJURandom)
223 */
224 private long juRandomSeed;
225
226 /**
227 * The last barrier event waited for
228 */
229 private long eventCount;
230
231 /**
232 * Creates a ForkJoinWorkerThread operating in the given pool.
233 * @param pool the pool this thread works in
234 * @throws NullPointerException if pool is null
235 */
236 protected ForkJoinWorkerThread(ForkJoinPool pool) {
237 if (pool == null) throw new NullPointerException();
238 this.pool = pool;
239 // remaining initialization deferred to onStart
240 }
241
242 // Access methods used by Pool
243
244 /**
245 * Get and clear steal count for accumulation by pool. Called
246 * only when known to be idle (in pool.sync and termination).
247 */
248 final int getAndClearStealCount() {
249 int sc = stealCount;
250 stealCount = 0;
251 return sc;
252 }
253
254 /**
255 * Returns estimate of the number of tasks in the queue, without
256 * correcting for transient negative values
257 */
258 final int getRawQueueSize() {
259 return sp - base;
260 }
261
262 // Intrinsics-based support for queue operations.
263 // Currently these three (setSp, setSlot, casSlotNull) are
264 // usually manually inlined to improve performance
265
266 /**
267 * Sets sp in store-order.
268 */
269 private void setSp(int s) {
270 _unsafe.putOrderedInt(this, spOffset, s);
271 }
272
273 /**
274 * Sets in store-order the given task at given slot of q.
275 * Caller must ensure q is nonnull and index is in range.
276 */
277 private static void setSlot(ForkJoinTask<?>[] q, int i,
278 ForkJoinTask<?> t){
279 _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
280 }
281
282 /**
283 * CAS given slot of q to null. Caller must ensure q is nonnull
284 * and index is in range.
285 */
286 private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
287 ForkJoinTask<?> t) {
288 return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
289 }
290
291 // Main queue methods
292
293 /**
294 * Pushes a task. Called only by current thread.
295 * @param t the task. Caller must ensure nonnull
296 */
297 final void pushTask(ForkJoinTask<?> t) {
298 ForkJoinTask<?>[] q = queue;
299 int mask = q.length - 1;
300 int s = sp;
301 _unsafe.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
302 _unsafe.putOrderedInt(this, spOffset, ++s);
303 if ((s -= base) == 1)
304 pool.signalNonEmptyWorkerQueue();
305 else if (s >= mask)
306 growQueue();
307 }
308
309 /**
310 * Tries to take a task from the base of the queue, failing if
311 * either empty or contended.
312 * @return a task, or null if none or contended.
313 */
314 private ForkJoinTask<?> deqTask() {
315 ForkJoinTask<?>[] q;
316 ForkJoinTask<?> t;
317 int i;
318 int b;
319 if (sp != (b = base) &&
320 (q = queue) != null && // must read q after b
321 (t = q[i = (q.length - 1) & b]) != null &&
322 _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
323 base = b + 1;
324 return t;
325 }
326 return null;
327 }
328
329 /**
330 * Returns a popped task, or null if empty. Called only by
331 * current thread.
332 */
333 final ForkJoinTask<?> popTask() {
334 ForkJoinTask<?> t;
335 int i;
336 ForkJoinTask<?>[] q = queue;
337 int mask = q.length - 1;
338 int s = sp;
339 if (s != base &&
340 (t = q[i = (s - 1) & mask]) != null &&
341 _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
342 _unsafe.putOrderedInt(this, spOffset, s - 1);
343 return t;
344 }
345 return null;
346 }
347
348 /**
349 * Specialized version of popTask to pop only if
350 * topmost element is the given task. Called only
351 * by current thread.
352 * @param t the task. Caller must ensure nonnull
353 */
354 final boolean unpushTask(ForkJoinTask<?> t) {
355 ForkJoinTask<?>[] q = queue;
356 int mask = q.length - 1;
357 int s = sp - 1;
358 if (_unsafe.compareAndSwapObject(q, ((s & mask) << qShift) + qBase,
359 t, null)) {
360 _unsafe.putOrderedInt(this, spOffset, s);
361 return true;
362 }
363 return false;
364 }
365
366 /**
367 * Returns next task to pop.
368 */
369 private ForkJoinTask<?> peekTask() {
370 ForkJoinTask<?>[] q = queue;
371 return q == null? null : q[(sp - 1) & (q.length - 1)];
372 }
373
374 /**
375 * Doubles queue array size. Transfers elements by emulating
376 * steals (deqs) from old array and placing, oldest first, into
377 * new array.
378 */
379 private void growQueue() {
380 ForkJoinTask<?>[] oldQ = queue;
381 int oldSize = oldQ.length;
382 int newSize = oldSize << 1;
383 if (newSize > MAXIMUM_QUEUE_CAPACITY)
384 throw new RejectedExecutionException("Queue capacity exceeded");
385 ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
386
387 int b = base;
388 int bf = b + oldSize;
389 int oldMask = oldSize - 1;
390 int newMask = newSize - 1;
391 do {
392 int oldIndex = b & oldMask;
393 ForkJoinTask<?> t = oldQ[oldIndex];
394 if (t != null && !casSlotNull(oldQ, oldIndex, t))
395 t = null;
396 setSlot(newQ, b & newMask, t);
397 } while (++b != bf);
398 pool.signalIdleWorkers(false);
399 }
400
401 // Runstate management
402
403 final boolean isShutdown() { return runState >= SHUTDOWN; }
404 final boolean isTerminating() { return runState >= TERMINATING; }
405 final boolean isTerminated() { return runState == TERMINATED; }
406 final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
407 final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
408
409 /**
410 * Transition to at least the given state. Return true if not
411 * already at least given state.
412 */
413 private boolean transitionRunStateTo(int state) {
414 for (;;) {
415 int s = runState;
416 if (s >= state)
417 return false;
418 if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
419 return true;
420 }
421 }
422
423 /**
424 * Ensure status is active and if necessary adjust pool active count
425 */
426 final void activate() {
427 if (!active) {
428 active = true;
429 pool.incrementActiveCount();
430 }
431 }
432
433 /**
434 * Ensure status is inactive and if necessary adjust pool active count
435 */
436 final void inactivate() {
437 if (active) {
438 active = false;
439 pool.decrementActiveCount();
440 }
441 }
442
443 // Lifecycle methods
444
445 /**
446 * Initializes internal state after construction but before
447 * processing any tasks. If you override this method, you must
448 * invoke super.onStart() at the beginning of the method.
449 * Initialization requires care: Most fields must have legal
450 * default values, to ensure that attempted accesses from other
451 * threads work correctly even before this thread starts
452 * processing tasks.
453 */
454 protected void onStart() {
455 juRandomSeed = randomSeedGenerator.nextLong();
456 do {} while ((randomVictimSeed = nextRandomInt()) == 0); // must be nonzero
457 if (queue == null)
458 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
459
460 // Heuristically allow one initial thread to warm up; others wait
461 if (poolIndex < pool.getParallelism() - 1) {
462 eventCount = pool.sync(this, 0);
463 activate();
464 }
465 }
466
467 /**
468 * Perform cleanup associated with termination of this worker
469 * thread. If you override this method, you must invoke
470 * super.onTermination at the end of the overridden method.
471 *
472 * @param exception the exception causing this thread to abort due
473 * to an unrecoverable error, or null if completed normally.
474 */
475 protected void onTermination(Throwable exception) {
476 try {
477 clearLocalTasks();
478 inactivate();
479 cancelTasks();
480 } finally {
481 terminate(exception);
482 }
483 }
484
485 /**
486 * Notify pool of termination and, if exception is nonnull,
487 * rethrow it to trigger this thread's uncaughtExceptionHandler
488 */
489 private void terminate(Throwable exception) {
490 transitionRunStateTo(TERMINATED);
491 try {
492 pool.workerTerminated(this);
493 } finally {
494 if (exception != null)
495 ForkJoinTask.rethrowException(exception);
496 }
497 }
498
499 /**
500 * Run local tasks on exit from main.
501 */
502 private void clearLocalTasks() {
503 while (base != sp && !pool.isTerminating()) {
504 ForkJoinTask<?> t = popTask();
505 if (t != null) {
506 activate(); // ensure active status
507 t.quietlyExec();
508 }
509 }
510 }
511
512 /**
513 * Removes and cancels all tasks in queue. Can be called from any
514 * thread.
515 */
516 final void cancelTasks() {
517 while (base != sp) {
518 ForkJoinTask<?> t = deqTask();
519 if (t != null)
520 t.cancelIgnoreExceptions();
521 }
522 }
523
524 /**
525 * This method is required to be public, but should never be
526 * called explicitly. It performs the main run loop to execute
527 * ForkJoinTasks.
528 */
529 public void run() {
530 Throwable exception = null;
531 try {
532 onStart();
533 while (!isShutdown())
534 step();
535 } catch (Throwable ex) {
536 exception = ex;
537 } finally {
538 onTermination(exception);
539 }
540 }
541
542 /**
543 * Main top-level action.
544 */
545 private void step() {
546 ForkJoinTask<?> t = sp != base? popTask() : null;
547 if (t != null || (t = scan(null, true)) != null) {
548 activate();
549 t.quietlyExec();
550 }
551 else {
552 inactivate();
553 eventCount = pool.sync(this, eventCount);
554 }
555 }
556
557 // scanning for and stealing tasks
558
559 /**
560 * Computes next value for random victim probe. Scans don't
561 * require a very high quality generator, but also not a crummy
562 * one. Marsaglia xor-shift is cheap and works well.
563 *
564 * This method is currently unused; its body is manually inlined in scan.
565 */
566 private static int xorShift(int r) {
567 r ^= r << 1;
568 r ^= r >>> 3;
569 r ^= r << 10;
570 return r;
571 }
572
573 /**
574 * Tries to steal a task from another worker and/or, if enabled,
575 * submission queue. Starts at a random index of workers array,
576 * and probes workers until finding one with non-empty queue or
577 * finding that all are empty. It randomly selects the first n-1
578 * probes. If these are empty, it resorts to full circular
579 * traversal, which is necessary to accurately set active status
580 * by caller. Also restarts if pool barrier has tripped since last
581 * scan, which forces refresh of workers array, in case barrier
582 * was associated with resize.
583 *
584 * This method must be both fast and quiet -- usually avoiding
585 * memory accesses that could disrupt cache sharing etc other than
586 * those needed to check for and take tasks. This accounts for,
587 * among other things, updating random seed in place without
588 * storing it until exit. (Note that we only need to store it if
589 * we found a task; otherwise it doesn't matter if we start at the
590 * same place next time.)
591 *
592 * @param joinMe if nonnull, exit early once this task is done
593 * @param checkSubmissions true if OK to take submissions
594 * @return a task, or null if none found
595 */
596 private ForkJoinTask<?> scan(ForkJoinTask<?> joinMe,
597 boolean checkSubmissions) {
598 ForkJoinPool p = pool;
599 if (p == null) // Never null, but avoids
600 return null; // implicit nullchecks below
601 int r = randomVictimSeed; // extract once to keep scan quiet
602 restart: // outer loop refreshes ws array
603 while (joinMe == null || joinMe.status >= 0) {
604 int mask;
605 ForkJoinWorkerThread[] ws = p.workers;
606 if (ws != null && (mask = ws.length - 1) > 0) {
607 int probes = -mask; // use random index while negative
608 int idx = r;
609 for (;;) {
610 ForkJoinWorkerThread v;
611 // inlined xorshift to update seed
612 r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
613 if ((v = ws[mask & idx]) != null && v.sp != v.base) {
614 ForkJoinTask<?> t;
615 activate();
616 if ((joinMe == null || joinMe.status >= 0) &&
617 (t = v.deqTask()) != null) {
618 randomVictimSeed = r;
619 ++stealCount;
620 return t;
621 }
622 continue restart; // restart on contention
623 }
624 if ((probes >> 1) <= mask) // n-1 random then circular
625 idx = (probes++ < 0)? r : (idx + 1);
626 else
627 break;
628 }
629 }
630 if (checkSubmissions && p.hasQueuedSubmissions()) {
631 activate();
632 ForkJoinTask<?> t = p.pollSubmission();
633 if (t != null)
634 return t;
635 }
636 else {
637 long ec = eventCount; // restart on pool event
638 if ((eventCount = p.getEventCount()) == ec)
639 break;
640 }
641 }
642 return null;
643 }
644
645 /**
646 * Callback from pool.sync to rescan before blocking. If a
647 * task is found, it is pushed so it can be executed upon return.
648 * @return true if found and pushed a task
649 */
650 final boolean prescan() {
651 ForkJoinTask<?> t = scan(null, true);
652 if (t != null) {
653 pushTask(t);
654 return true;
655 }
656 else {
657 inactivate();
658 return false;
659 }
660 }
661
662 /**
663 * Implements ForkJoinTask.helpJoin
664 */
665 final int helpJoinTask(ForkJoinTask<?> joinMe) {
666 ForkJoinTask<?> t = null;
667 int s;
668 while ((s = joinMe.status) >= 0) {
669 if (t == null) {
670 if ((t = scan(joinMe, false)) == null) // block if no work
671 return joinMe.awaitDone(this, false);
672 // else recheck status before exec
673 }
674 else {
675 t.quietlyExec();
676 t = null;
677 }
678 }
679 if (t != null) // unsteal
680 pushTask(t);
681 return s;
682 }
683
684 // Support for public static and/or ForkJoinTask methods
685
686 /**
687 * Returns an estimate of the number of tasks in the queue.
688 */
689 final int getQueueSize() {
690 int b = base;
691 int n = sp - b;
692 return n <= 0? 0 : n; // suppress momentarily negative values
693 }
694
695 /**
696 * Runs one popped task, if available
697 * @return true if ran a task
698 */
699 private boolean runLocalTask() {
700 ForkJoinTask<?> t = popTask();
701 if (t == null)
702 return false;
703 t.quietlyExec();
704 return true;
705 }
706
707 /**
708 * Pops or steals a task
709 * @return task, or null if none available
710 */
711 private ForkJoinTask<?> getLocalOrStolenTask() {
712 ForkJoinTask<?> t = popTask();
713 return t != null? t : scan(null, false);
714 }
715
716 /**
717 * Runs a popped or stolen task, if available
718 * @return true if ran a task
719 */
720 private boolean runLocalOrStolenTask() {
721 ForkJoinTask<?> t = getLocalOrStolenTask();
722 if (t == null)
723 return false;
724 t.quietlyExec();
725 return true;
726 }
727
728 /**
729 * Runs tasks until pool isQuiescent
730 */
731 final void helpQuiescePool() {
732 activate();
733 for (;;) {
734 if (!runLocalOrStolenTask()) {
735 inactivate();
736 if (pool.isQuiescent()) {
737 activate(); // re-activate on exit
738 break;
739 }
740 }
741 }
742 }
743
744 /**
745 * Returns an estimate of the number of tasks, offset by a
746 * function of number of idle workers.
747 */
748 final int getEstimatedSurplusTaskCount() {
749 return (sp - base) - (pool.getIdleThreadCount() >>> 1);
750 }
751
752 // Public methods on current thread
753
754 /**
755 * Returns the pool hosting the current task execution.
756 * @return the pool
757 */
758 public static ForkJoinPool getPool() {
759 return ((ForkJoinWorkerThread)(Thread.currentThread())).pool;
760 }
761
762 /**
763 * Returns the index number of the current worker thread in its
764 * pool. The returned value ranges from zero to the maximum
765 * number of threads (minus one) that have ever been created in
766 * the pool. This method may be useful for applications that
767 * track status or collect results per-worker rather than
768 * per-task.
769 * @return the index number.
770 */
771 public static int getPoolIndex() {
772 return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex;
773 }
774
775 /**
776 * Returns an estimate of the number of tasks waiting to be run by
777 * the current worker thread. This value may be useful for
778 * heuristic decisions about whether to fork other tasks.
779 * @return the number of tasks
780 */
781 public static int getLocalQueueSize() {
782 return ((ForkJoinWorkerThread)(Thread.currentThread())).
783 getQueueSize();
784 }
785
786 /**
787 * Returns, but does not remove or execute, the next task locally
788 * queued for execution by the current worker thread. There is no
789 * guarantee that this task will be the next one actually returned
790 * or executed from other polling or execution methods.
791 * @return the next task or null if none
792 */
793 public static ForkJoinTask<?> peekLocalTask() {
794 return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
795 }
796
797 /**
798 * Removes and returns, without executing, the next task queued
799 * for execution in the current worker thread's local queue.
800 * @return the next task to execute, or null if none
801 */
802 public static ForkJoinTask<?> pollLocalTask() {
803 return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
804 }
805
806 /**
807 * Execute the next task locally queued by the current worker, if
808 * one is available.
809 * @return true if a task was run; a false return indicates
810 * that no task was available.
811 */
812 public static boolean executeLocalTask() {
813 return ((ForkJoinWorkerThread)(Thread.currentThread())).
814 runLocalTask();
815 }
816
817 /**
818 * Removes and returns, without executing, the next task queued
819 * for execution in the current worker thread's local queue or if
820 * none, a task stolen from another worker, if one is available.
821 * A null return does not necessarily imply that all tasks are
822 * completed, only that there are currently none available.
823 * @return the next task to execute, or null if none
824 */
825 public static ForkJoinTask<?> pollTask() {
826 return ((ForkJoinWorkerThread)(Thread.currentThread())).
827 getLocalOrStolenTask();
828 }
829
830 /**
831 * Helps this program complete by processing a local or stolen
832 * task, if one is available. This method may be useful when
833 * several tasks are forked, and only one of them must be joined,
834 * as in:
835 *
836 * <pre>
837 * while (!t1.isDone() &amp;&amp; !t2.isDone())
838 * ForkJoinWorkerThread.executeTask();
839 * </pre>
840 *
841 * @return true if a task was run; a false return indicates
842 * that no task was available.
843 */
844 public static boolean executeTask() {
845 return ((ForkJoinWorkerThread)(Thread.currentThread())).
846 runLocalOrStolenTask();
847 }
848
849 // Per-worker exported random numbers
850
851 // Same constants as java.util.Random
852 final static long JURandomMultiplier = 0x5DEECE66DL;
853 final static long JURandomAddend = 0xBL;
854 final static long JURandomMask = (1L << 48) - 1;
855
856 private final int nextJURandom(int bits) {
857 long next = (juRandomSeed * JURandomMultiplier + JURandomAddend) &
858 JURandomMask;
859 juRandomSeed = next;
860 return (int)(next >>> (48 - bits));
861 }
862
863 private final int nextJURandomInt(int n) {
864 if (n <= 0)
865 throw new IllegalArgumentException("n must be positive");
866 int bits = nextJURandom(31);
867 if ((n & -n) == n)
868 return (int)((n * (long)bits) >> 31);
869
870 for (;;) {
871 int val = bits % n;
872 if (bits - val + (n-1) >= 0)
873 return val;
874 bits = nextJURandom(31);
875 }
876 }
877
878 private final long nextJURandomLong() {
879 return ((long)(nextJURandom(32)) << 32) + nextJURandom(32);
880 }
881
882 private final long nextJURandomLong(long n) {
883 if (n <= 0)
884 throw new IllegalArgumentException("n must be positive");
885 long offset = 0;
886 while (n >= Integer.MAX_VALUE) { // randomly pick half range
887 int bits = nextJURandom(2); // 2nd bit for odd vs even split
888 long half = n >>> 1;
889 long nextn = ((bits & 2) == 0)? half : n - half;
890 if ((bits & 1) == 0)
891 offset += n - nextn;
892 n = nextn;
893 }
894 return offset + nextJURandomInt((int)n);
895 }
896
897 private final double nextJURandomDouble() {
898 return (((long)(nextJURandom(26)) << 27) + nextJURandom(27))
899 / (double)(1L << 53);
900 }
901
902 /**
903 * Returns a random integer using a per-worker random
904 * number generator with the same properties as
905 * {@link java.util.Random#nextInt}
906 * @return the next pseudorandom, uniformly distributed {@code int}
907 * value from this worker's random number generator's sequence
908 */
909 public static int nextRandomInt() {
910 return ((ForkJoinWorkerThread)(Thread.currentThread())).
911 nextJURandom(32);
912 }
913
914 /**
915 * Returns a random integer using a per-worker random
916 * number generator with the same properties as
917 * {@link java.util.Random#nextInt(int)}
918 * @param n the bound on the random number to be returned. Must be
919 * positive.
920 * @return the next pseudorandom, uniformly distributed {@code int}
921 * value between {@code 0} (inclusive) and {@code n} (exclusive)
922 * from this worker's random number generator's sequence
923 * @throws IllegalArgumentException if n is not positive
924 */
925 public static int nextRandomInt(int n) {
926 return ((ForkJoinWorkerThread)(Thread.currentThread())).
927 nextJURandomInt(n);
928 }
929
930 /**
931 * Returns a random long using a per-worker random
932 * number generator with the same properties as
933 * {@link java.util.Random#nextLong}
934 * @return the next pseudorandom, uniformly distributed {@code long}
935 * value from this worker's random number generator's sequence
936 */
937 public static long nextRandomLong() {
938 return ((ForkJoinWorkerThread)(Thread.currentThread())).
939 nextJURandomLong();
940 }
941
942 /**
943 * Returns a random long using a per-worker random
944 * number generator with properties analogous to
945 * {@link java.util.Random#nextInt(int)}
946 * @param n the bound on the random number to be returned. Must be
947 * positive.
948 * @return the next pseudorandom, uniformly distributed {@code long}
949 * value between {@code 0} (inclusive) and {@code n} (exclusive)
950 * from this worker's random number generator's sequence
951 * @throws IllegalArgumentException if n is not positive
952 */
953 public static long nextRandomLong(long n) {
954 return ((ForkJoinWorkerThread)(Thread.currentThread())).
955 nextJURandomLong(n);
956 }
957
958 /**
959 * Returns a random double using a per-worker random
960 * number generator with the same properties as
961 * {@link java.util.Random#nextDouble}
962 * @return the next pseudorandom, uniformly distributed {@code double}
963 * value between {@code 0.0} and {@code 1.0} from this
964 * worker's random number generator's sequence
965 */
966 public static double nextRandomDouble() {
967 return ((ForkJoinWorkerThread)(Thread.currentThread())).
968 nextJURandomDouble();
969 }
970
971 // Temporary Unsafe mechanics for preliminary release
972
973 static final Unsafe _unsafe;
974 static final long baseOffset;
975 static final long spOffset;
976 static final long qBase;
977 static final int qShift;
978 static final long runStateOffset;
979 static {
980 try {
981 if (ForkJoinWorkerThread.class.getClassLoader() != null) {
982 Field f = Unsafe.class.getDeclaredField("theUnsafe");
983 f.setAccessible(true);
984 _unsafe = (Unsafe)f.get(null);
985 }
986 else
987 _unsafe = Unsafe.getUnsafe();
988 baseOffset = _unsafe.objectFieldOffset
989 (ForkJoinWorkerThread.class.getDeclaredField("base"));
990 spOffset = _unsafe.objectFieldOffset
991 (ForkJoinWorkerThread.class.getDeclaredField("sp"));
992 runStateOffset = _unsafe.objectFieldOffset
993 (ForkJoinWorkerThread.class.getDeclaredField("runState"));
994 qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
995 int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
996 if ((s & (s-1)) != 0)
997 throw new Error("data type scale not a power of two");
998 qShift = 31 - Integer.numberOfLeadingZeros(s);
999 } catch (Exception e) {
1000 throw new RuntimeException("Could not initialize intrinsics", e);
1001 }
1002 }
1003 }
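
The algorithm overview in the class comment describes arbitrating pop versus deq (steal) by CASing the slot itself to null rather than CASing the indices. The following is a minimal sketch of that idea, not the jsr166y implementation. The class name `SlotCasDeque` and its methods are hypothetical. It assumes a fixed-capacity array (no `growQueue`), and it uses `AtomicReferenceArray` with plain volatile indices in place of the `Unsafe` ordered stores, which the comment notes is slower. It is meant to show the control flow only: a thief stalled across a full wraparound of the array could still take the wrong slot, so it is not hardened for concurrent production use.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical illustration of slot-CAS arbitration; not the original code. */
final class SlotCasDeque<T> {
    private final AtomicReferenceArray<T> slots; // fixed size, power of two
    private final int mask;
    private volatile int sp;   // next slot to push to; written only by owner
    private volatile int base; // next slot to steal from

    SlotCasDeque(int capacity) {
        if (capacity < 2 || (capacity & (capacity - 1)) != 0)
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        slots = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    /** Owner only. Returns false when full (the original grows the array instead). */
    boolean push(T task) {
        int s = sp;
        if (s - base >= mask)
            return false;
        slots.set(s & mask, task); // publish the slot before the index
        sp = s + 1;
        return true;
    }

    /** Owner only: takes the newest task (LIFO), or null if empty or contended. */
    T pop() {
        int s = sp;
        if (s == base)
            return null;
        int i = (s - 1) & mask;
        T t = slots.get(i);
        if (t != null && slots.compareAndSet(i, t, null)) { // CAS slot to null
            sp = s - 1;
            return t;
        }
        return null;
    }

    /** Any thread: takes the oldest task (FIFO), or null if empty or contended. */
    T steal() {
        int b = base;
        if (sp == b)
            return null;
        int i = b & mask;
        T t = slots.get(i);
        if (t != null && slots.compareAndSet(i, t, null)) { // CAS slot to null
            base = b + 1;
            return t;
        }
        return null;
    }

    /** Size estimate; momentarily negative values are reported as zero. */
    int size() {
        int n = sp - base;
        return n <= 0 ? 0 : n;
    }

    public static void main(String[] args) {
        SlotCasDeque<String> q = new SlotCasDeque<>(4);
        q.push("a"); q.push("b"); q.push("c");
        System.out.println(q.pop());   // c (newest, taken by owner)
        System.out.println(q.steal()); // a (oldest, taken by thief)
        System.out.println(q.size());  // 1
    }
}
```

As in the original, both a successful pop and a successful steal come down to swapping a nonnull slot to null, so a taken slot no longer holds a reference to its task, and a failed steal simply returns null so the caller can try another victim.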