root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.2
Committed: Wed Jan 7 16:07:37 2009 UTC (15 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.1: +53 -163 lines
Log Message:
Improved documentation; moved methods to improve javadoc flow; regularized extension APIs

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.util.*;
9 import java.util.concurrent.*;
10 import java.util.concurrent.atomic.*;
11 import java.util.concurrent.locks.*;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
14
15 /**
16 * A thread managed by a {@link ForkJoinPool}. This class is
17 * subclassable solely for the sake of adding functionality -- there
18 * are no overridable methods dealing with scheduling or
19 * execution. However, you can override initialization and termination
20 * cleanup methods surrounding the main task processing loop. If you
21 * do create such a subclass, you will also need to supply a custom
22 * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
23 *
24 * <p>This class also provides methods for generating per-thread
25 * random numbers, with the same properties as {@link
26 * java.util.Random} but with each generator isolated from those of
27 * other threads.
28 */
29 public class ForkJoinWorkerThread extends Thread {
30 /*
31 * Algorithm overview:
32 *
33 * 1. Work-Stealing: Work-stealing queues are special forms of
34 * Deques that support only three of the four possible
35 * end-operations -- push, pop, and deq (aka steal), and only do
36 * so under the constraints that push and pop are called only from
37 * the owning thread, while deq may be called from other threads.
38 * (If you are unfamiliar with them, you probably want to read
39 * Herlihy and Shavit's book "The Art of Multiprocessor
40 * Programming", chapter 16 describing these in more detail before
41 * proceeding.) The main work-stealing queue design is roughly
42 * similar to "Dynamic Circular Work-Stealing Deque" by David
43 * Chase and Yossi Lev, SPAA 2005
44 * (http://research.sun.com/scalable/pubs/index.html). The main
45 * difference ultimately stems from gc requirements that we null
46 * out taken slots as soon as we can, to maintain as small a
47 * footprint as possible even in programs generating huge numbers
48 * of tasks. To accomplish this, we shift the CAS arbitrating pop
49 * vs deq (steal) from being on the indices ("base" and "sp") to
50 * the slots themselves (mainly via method "casSlotNull()"). So,
51 * both a successful pop and deq mainly entail CAS'ing a nonnull
52 * slot to null. Because we rely on CASes of references, we do
53 * not need tag bits on base or sp. They are simple ints as used
54 * in any circular array-based queue (see for example ArrayDeque).
55 * Updates to the indices must still be ordered in a way that
56 * guarantees that sp == base means the queue is empty, but
57 * otherwise may err on the side of possibly making the queue
58 * appear nonempty when a push, pop, or deq have not fully
59 * committed. Note that this means that the deq operation,
60 * considered individually, is not wait-free. One thief cannot
61 * successfully continue until another in-progress one (or, if
62 * previously empty, a push) completes. However, in the
63 * aggregate, we ensure at least probabilistic non-blockingness. If
64 * an attempted steal fails, a thief always chooses a different
65 * random victim target to try next. So, in order for one thief to
66 * progress, it suffices for any in-progress deq or new push on
67 * any empty queue to complete. One reason this works well here is
68 * that apparently-nonempty often means soon-to-be-stealable,
69 * which gives threads a chance to activate if necessary before
70 * stealing (see below).
71 *
72 * Efficient implementation of this approach currently relies on
73 * an uncomfortable amount of "Unsafe" mechanics. To maintain
74 * correct orderings, reads and writes of variable base require
75 * volatile ordering. Variable sp does not require volatile write
76 * but needs cheaper store-ordering on writes. Because they are
77 * protected by volatile base reads, reads of the queue array and
78 * its slots do not need volatile load semantics, but writes (in
79 * push) require store order and CASes (in pop and deq) require
80 * (volatile) CAS semantics. Since these combinations aren't
81 * supported using ordinary volatiles, the only way to accomplish
82 * these efficiently is to use direct Unsafe calls. (Using external
83 * AtomicIntegers and AtomicReferenceArrays for the indices and
84 * array is significantly slower because of memory locality and
85 * indirection effects.) Further, performance on most platforms is
86 * very sensitive to placement and sizing of the (resizable) queue
87 * array. Even though these queues don't usually become all that
88 * big, the initial size must be large enough to counteract cache
89 * contention effects across multiple queues (especially in the
90 * presence of GC cardmarking). Also, to improve thread-locality,
91 * queues are currently initialized immediately after the thread
92 * gets the initial signal to start processing tasks. However,
93 * all queue-related methods except pushTask are written in a way
94 * that allows them to instead be lazily allocated and/or disposed
95 * of when empty. All together, these low-level implementation
96 * choices produce as much as a factor of 4 performance
97 * improvement compared to naive implementations, and enable the
98 * processing of billions of tasks per second, sometimes at the
99 * expense of ugliness.
100 *
101 * 2. Run control: The primary run control is based on a global
102 * counter (activeCount) held by the pool. It uses an algorithm
103 * similar to that in Herlihy and Shavit section 17.6 to cause
104 * threads to eventually block when all threads declare they are
105 * inactive. (See variable "scans".) For this to work, threads
106 * must be declared active when executing tasks, and before
107 * stealing a task. They must be inactive before blocking on the
108 * Pool Barrier (awaiting a new submission or other Pool
109 * event). In between, there is some free play which we take
110 * advantage of to avoid contention and rapid flickering of the
111 * global activeCount: If inactive, we activate only if a victim
112 * queue appears to be nonempty (see above). Similarly, a thread
113 * tries to inactivate only after a full scan of other threads.
114 * The net effect is that contention on activeCount is rarely a
115 * measurable performance issue. (There are also a few other cases
116 * where we scan for work rather than retry/block upon
117 * contention.)
118 *
119 * 3. Selection control. We maintain a policy of always choosing to
120 * run local tasks rather than stealing, and always trying to
121 * steal tasks before trying to run a new submission. All steals
122 * are currently performed in randomly-chosen deq-order. It may be
123 * worthwhile to bias these with locality / anti-locality
124 * information, but doing this well probably requires more
125 * lower-level information from JVMs than currently provided.
126 */
127
128 /**
129 * Capacity of work-stealing queue array upon initialization.
130 * Must be a power of two. Initial size must be at least 2, but is
131 * padded to minimize cache effects.
132 */
133 private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
134
135 /**
136 * Maximum work-stealing queue array size. Must be less than or
137 * equal to 1 << 30 to ensure lack of index wraparound.
138 */
139 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 30;
140
141 /**
142 * Generator of seeds for per-thread random numbers.
143 */
144 private static final Random randomSeedGenerator = new Random();
145
146 /**
147 * The work-stealing queue array. Size must be a power of two.
148 */
149 private ForkJoinTask<?>[] queue;
150
151 /**
152 * Index (mod queue.length) of next queue slot to push to or pop
153 * from. It is written only by owner thread, via ordered store.
154 * Both sp and base are allowed to wrap around on overflow, but
155 * (sp - base) still estimates size.
156 */
157 private volatile int sp;
158
159 /**
160 * Index (mod queue.length) of least valid queue slot, which is
161 * always the next position to steal from if nonempty.
162 */
163 private volatile int base;
164
165 /**
166 * The pool this thread works in.
167 */
168 final ForkJoinPool pool;
169
170 /**
171 * Index of this worker in pool array. Set once by pool before
172 * running, and accessed directly by pool during cleanup etc
173 */
174 int poolIndex;
175
176 /**
177 * Run state of this worker. Supports simple versions of the usual
178 * shutdown/shutdownNow control.
179 */
180 private volatile int runState;
181
182 // Runstate values. Order matters
183 private static final int RUNNING = 0;
184 private static final int SHUTDOWN = 1;
185 private static final int TERMINATING = 2;
186 private static final int TERMINATED = 3;
187
188 /**
189 * Activity status. When true, this worker is considered active.
190 * Must be false upon construction. It must be true when executing
191 * tasks, and BEFORE stealing a task. It must be false before
192 * blocking on the Pool Barrier.
193 */
194 private boolean active;
195
196 /**
197 * Number of steals, transferred to pool when idle
198 */
199 private int stealCount;
200
201 /**
202 * Seed for random number generator for choosing steal victims
203 */
204 private int randomVictimSeed;
205
206 /**
207 * Seed for embedded Jurandom
208 */
209 private long juRandomSeed;
210
211 /**
212 * The last barrier event waited for
213 */
214 private long eventCount;
215
216 /**
217 * Creates a ForkJoinWorkerThread operating in the given pool.
218 * @param pool the pool this thread works in
219 * @throws NullPointerException if pool is null
220 */
221 protected ForkJoinWorkerThread(ForkJoinPool pool) {
222 if (pool == null) throw new NullPointerException();
223 this.pool = pool;
224 // remaining initialization deferred to onStart
225 }
226
227 // public access methods
228
229 /**
230 * Returns the pool hosting the current task execution.
231 * @return the pool
232 */
233 public static ForkJoinPool getPool() {
234 return ((ForkJoinWorkerThread)(Thread.currentThread())).pool;
235 }
236
237 /**
238 * Returns the index number of the current worker thread in its
239 * pool. The returned value ranges from zero to the maximum
240 * number of threads (minus one) that have ever been created in
241 * the pool. This method may be useful for applications that
242 * track status or collect results on a per-worker basis.
243 * @return the index number.
244 */
245 public static int getPoolIndex() {
246 return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex;
247 }
248
249 // Access methods used by Pool
250
251 /**
252 * Get and clear steal count for accumulation by pool. Called
253 * only when known to be idle (in pool.sync and termination).
254 */
255 final int getAndClearStealCount() {
256 int sc = stealCount;
257 stealCount = 0;
258 return sc;
259 }
260
261 /**
262 * Returns estimate of the number of tasks in the queue, without
263 * correcting for transient negative values
264 */
265 final int getRawQueueSize() {
266 return sp - base;
267 }
268
269 // Intrinsics-based support for queue operations.
270 // Currently these three (setSp, setSlot, casSlotNull) are
271 // usually manually inlined to improve performance
272
273 /**
274 * Sets sp in store-order.
275 */
276 private void setSp(int s) {
277 _unsafe.putOrderedInt(this, spOffset, s);
278 }
279
280 /**
281 * Sets, in store-order, the given task at the given slot of q.
282 * Caller must ensure q is nonnull and index is in range.
283 */
284 private static void setSlot(ForkJoinTask<?>[] q, int i,
285 ForkJoinTask<?> t){
286 _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
287 }
288
289 /**
290 * CAS given slot of q to null. Caller must ensure q is nonnull
291 * and index is in range.
292 */
293 private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
294 ForkJoinTask<?> t) {
295 return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
296 }
297
298 // Main queue methods
299
300 /**
301 * Pushes a task. Called only by current thread.
302 * @param t the task. Caller must ensure nonnull
303 */
304 final void pushTask(ForkJoinTask<?> t) {
305 ForkJoinTask<?>[] q = queue;
306 int mask = q.length - 1;
307 int s = sp;
308 _unsafe.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
309 _unsafe.putOrderedInt(this, spOffset, ++s);
310 if ((s -= base) == 1)
311 pool.signalNonEmptyWorkerQueue();
312 else if (s >= mask)
313 growQueue();
314 }
315
316 /**
317 * Tries to take a task from the base of the queue, failing if
318 * either empty or contended.
319 * @return a task, or null if none or contended.
320 */
321 private ForkJoinTask<?> deqTask() {
322 ForkJoinTask<?>[] q;
323 ForkJoinTask<?> t;
324 int i;
325 int b;
326 if (sp != (b = base) &&
327 (q = queue) != null && // must read q after b
328 (t = q[i = (q.length - 1) & b]) != null &&
329 _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
330 base = b + 1;
331 return t;
332 }
333 return null;
334 }
335
336 /**
337 * Returns a popped task, or null if empty. Called only by
338 * current thread.
339 */
340 final ForkJoinTask<?> popTask() {
341 ForkJoinTask<?> t;
342 int i;
343 ForkJoinTask<?>[] q = queue;
344 int mask = q.length - 1;
345 int s = sp;
346 if (s != base &&
347 (t = q[i = (s - 1) & mask]) != null &&
348 _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
349 _unsafe.putOrderedInt(this, spOffset, s - 1);
350 return t;
351 }
352 return null;
353 }
354
355 /**
356 * Specialized version of popTask to pop only if
357 * topmost element is the given task. Called only
358 * by current thread.
359 * @param t the task. Caller must ensure nonnull
360 */
361 final boolean unpushTask(ForkJoinTask<?> t) {
362 ForkJoinTask<?>[] q = queue;
363 int mask = q.length - 1;
364 int s = sp - 1;
365 if (_unsafe.compareAndSwapObject(q, ((s & mask) << qShift) + qBase,
366 t, null)) {
367 _unsafe.putOrderedInt(this, spOffset, s);
368 return true;
369 }
370 return false;
371 }
372
373 /**
374 * Returns next task to pop.
375 */
376 final ForkJoinTask<?> peekTask() {
377 ForkJoinTask<?>[] q = queue;
378 return q == null? null : q[(sp - 1) & (q.length - 1)];
379 }
380
381 /**
382 * Doubles queue array size. Transfers elements by emulating
383 * steals (deqs) from old array and placing, oldest first, into
384 * new array.
385 */
386 private void growQueue() {
387 ForkJoinTask<?>[] oldQ = queue;
388 int oldSize = oldQ.length;
389 int newSize = oldSize << 1;
390 if (newSize > MAXIMUM_QUEUE_CAPACITY)
391 throw new RejectedExecutionException("Queue capacity exceeded");
392 ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
393
394 int b = base;
395 int bf = b + oldSize;
396 int oldMask = oldSize - 1;
397 int newMask = newSize - 1;
398 do {
399 int oldIndex = b & oldMask;
400 ForkJoinTask<?> t = oldQ[oldIndex];
401 if (t != null && !casSlotNull(oldQ, oldIndex, t))
402 t = null;
403 setSlot(newQ, b & newMask, t);
404 } while (++b != bf);
405 pool.signalIdleWorkers(false);
406 }
407
408 // Runstate management
409
410 final boolean isShutdown() { return runState >= SHUTDOWN; }
411 final boolean isTerminating() { return runState >= TERMINATING; }
412 final boolean isTerminated() { return runState == TERMINATED; }
413 final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
414 final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
415
416 /**
417 * Transition to at least the given state. Return true if not
418 * already at least given state.
419 */
420 private boolean transitionRunStateTo(int state) {
421 for (;;) {
422 int s = runState;
423 if (s >= state)
424 return false;
425 if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
426 return true;
427 }
428 }
429
430 /**
431 * Ensure status is active and if necessary adjust pool active count
432 */
433 final void activate() {
434 if (!active) {
435 active = true;
436 pool.incrementActiveCount();
437 }
438 }
439
440 /**
441 * Ensure status is inactive and if necessary adjust pool active count
442 */
443 final void inactivate() {
444 if (active) {
445 active = false;
446 pool.decrementActiveCount();
447 }
448 }
449
450 // Lifecycle methods
451
452 /**
453 * Initializes internal state after construction but before
454 * processing any tasks. If you override this method, you must
455 * invoke super.onStart() at the beginning of the method.
456 * Initialization requires care: Most fields must have legal
457 * default values, to ensure that attempted accesses from other
458 * threads work correctly even before this thread starts
459 * processing tasks.
460 */
461 protected void onStart() {
462 juRandomSeed = randomSeedGenerator.nextLong();
463 do {} while ((randomVictimSeed = nextRandomInt()) == 0); // must be nonzero
464 if (queue == null)
465 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
466
467 // Heuristically allow one initial thread to warm up; others wait
468 if (poolIndex < pool.getParallelism() - 1) {
469 eventCount = pool.sync(this, 0);
470 activate();
471 }
472 }
473
474 /**
475 * Perform cleanup associated with termination of this worker
476 * thread. If you override this method, you must invoke
477 * super.onTermination at the end of the overridden method.
478 *
479 * @param exception the exception causing this thread to abort due
480 * to an unrecoverable error, or null if completed normally.
481 */
482 protected void onTermination(Throwable exception) {
483 try {
484 clearLocalTasks();
485 inactivate();
486 cancelTasks();
487 } finally {
488 terminate(exception);
489 }
490 }
491
492 /**
493 * Notify pool of termination and, if exception is nonnull,
494 * rethrow it to trigger this thread's uncaughtExceptionHandler
495 */
496 private void terminate(Throwable exception) {
497 transitionRunStateTo(TERMINATED);
498 try {
499 pool.workerTerminated(this);
500 } finally {
501 if (exception != null)
502 ForkJoinTask.rethrowException(exception);
503 }
504 }
505
506 /**
507 * Run local tasks on exit from main.
508 */
509 private void clearLocalTasks() {
510 while (base != sp && !pool.isTerminating()) {
511 ForkJoinTask<?> t = popTask();
512 if (t != null) {
513 activate(); // ensure active status
514 t.quietlyExec();
515 }
516 }
517 }
518
519 /**
520 * Removes and cancels all tasks in queue. Can be called from any
521 * thread.
522 */
523 final void cancelTasks() {
524 while (base != sp) {
525 ForkJoinTask<?> t = deqTask();
526 if (t != null)
527 t.cancelIgnoreExceptions();
528 }
529 }
530
531 /**
532 * This method is required to be public, but should never be
533 * called explicitly. It performs the main run loop to execute
534 * ForkJoinTasks.
535 */
536 public void run() {
537 Throwable exception = null;
538 try {
539 onStart();
540 while (!isShutdown())
541 step();
542 } catch (Throwable ex) {
543 exception = ex;
544 } finally {
545 onTermination(exception);
546 }
547 }
548
549 /**
550 * Main top-level action.
551 */
552 private void step() {
553 ForkJoinTask<?> t = sp != base? popTask() : null;
554 if (t != null || (t = scan(null, true)) != null) {
555 activate();
556 t.quietlyExec();
557 }
558 else {
559 inactivate();
560 eventCount = pool.sync(this, eventCount);
561 }
562 }
563
564 // scanning for and stealing tasks
565
566 /**
567 * Computes next value for random victim probe. Scans don't
568 * require a very high quality generator, but also not a crummy
569 * one. Marsaglia xor-shift is cheap and works well.
570 *
571 * This method is currently unused; its body is manually inlined in scan.
572 */
573 private static int xorShift(int r) {
574 r ^= r << 1;
575 r ^= r >>> 3;
576 r ^= r << 10;
577 return r;
578 }
579
580 /**
581 * Tries to steal a task from another worker and/or, if enabled,
582 * submission queue. Starts at a random index of workers array,
583 * and probes workers until finding one with non-empty queue or
584 * finding that all are empty. It randomly selects the first n-1
585 * probes. If these are empty, it resorts to full circular
586 * traversal, which is necessary to accurately set active status
587 * by caller. Also restarts if pool barrier has tripped since last
588 * scan, which forces refresh of workers array, in case barrier
589 * was associated with resize.
590 *
591 * This method must be both fast and quiet -- usually avoiding
592 * memory accesses that could disrupt cache sharing etc other than
593 * those needed to check for and take tasks. This accounts for,
594 * among other things, updating random seed in place without
595 * storing it until exit. (Note that we only need to store it if
596 * we found a task; otherwise it doesn't matter if we start at the
597 * same place next time.)
598 *
599 * @param joinMe if nonnull, exit early once this task is done
600 * @param checkSubmissions true if OK to take submissions
601 * @return a task, or null if none found
602 */
603 private ForkJoinTask<?> scan(ForkJoinTask<?> joinMe,
604 boolean checkSubmissions) {
605 ForkJoinPool p = pool;
606 if (p == null) // Never null, but avoids
607 return null; // implicit nullchecks below
608 int r = randomVictimSeed; // extract once to keep scan quiet
609 restart: // outer loop refreshes ws array
610 while (joinMe == null || joinMe.status >= 0) {
611 int mask;
612 ForkJoinWorkerThread[] ws = p.workers;
613 if (ws != null && (mask = ws.length - 1) > 0) {
614 int probes = -mask; // use random index while negative
615 int idx = r;
616 for (;;) {
617 ForkJoinWorkerThread v;
618 // inlined xorshift to update seed
619 r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
620 if ((v = ws[mask & idx]) != null && v.sp != v.base) {
621 ForkJoinTask<?> t;
622 activate();
623 if ((joinMe == null || joinMe.status >= 0) &&
624 (t = v.deqTask()) != null) {
625 randomVictimSeed = r;
626 ++stealCount;
627 return t;
628 }
629 continue restart; // restart on contention
630 }
631 if ((probes >> 1) <= mask) // n-1 random then circular
632 idx = (probes++ < 0)? r : (idx + 1);
633 else
634 break;
635 }
636 }
637 if (checkSubmissions && p.hasQueuedSubmissions()) {
638 activate();
639 ForkJoinTask<?> t = p.pollSubmission();
640 if (t != null)
641 return t;
642 }
643 else {
644 long ec = eventCount; // restart on pool event
645 if ((eventCount = p.getEventCount()) == ec)
646 break;
647 }
648 }
649 return null;
650 }
651
652 /**
653 * Callback from pool.sync to rescan before blocking. If a
654 * task is found, it is pushed so it can be executed upon return.
655 * @return true if found and pushed a task
656 */
657 final boolean prescan() {
658 ForkJoinTask<?> t = scan(null, true);
659 if (t != null) {
660 pushTask(t);
661 return true;
662 }
663 else {
664 inactivate();
665 return false;
666 }
667 }
668
669 // Support for ForkJoinTask methods
670
671 /**
672 * Implements ForkJoinTask.helpJoin
673 */
674 final int helpJoinTask(ForkJoinTask<?> joinMe) {
675 ForkJoinTask<?> t = null;
676 int s;
677 while ((s = joinMe.status) >= 0) {
678 if (t == null) {
679 if ((t = scan(joinMe, false)) == null) // block if no work
680 return joinMe.awaitDone(this, false);
681 // else recheck status before exec
682 }
683 else {
684 t.quietlyExec();
685 t = null;
686 }
687 }
688 if (t != null) // unsteal
689 pushTask(t);
690 return s;
691 }
692
693 /**
694 * Pops or steals a task
695 * @return task, or null if none available
696 */
697 final ForkJoinTask<?> getLocalOrStolenTask() {
698 ForkJoinTask<?> t = popTask();
699 return t != null? t : scan(null, false);
700 }
701
702 /**
703 * Runs tasks until pool isQuiescent
704 */
705 final void helpQuiescePool() {
706 for (;;) {
707 ForkJoinTask<?> t = getLocalOrStolenTask();
708 if (t != null) {
709 activate();
710 t.quietlyExec();
711 }
712 else {
713 inactivate();
714 if (pool.isQuiescent()) {
715 activate(); // re-activate on exit
716 break;
717 }
718 }
719 }
720 }
721
722 /**
723 * Returns an estimate of the number of tasks in the queue.
724 */
725 final int getQueueSize() {
726 int b = base;
727 int n = sp - b;
728 return n <= 0? 0 : n; // suppress momentarily negative values
729 }
730
731 /**
732 * Returns an estimate of the number of tasks, offset by a
733 * function of number of idle workers.
734 */
735 final int getEstimatedSurplusTaskCount() {
736 return (sp - base) - (pool.getIdleThreadCount() >>> 1);
737 }
738
739 // Per-worker exported random numbers
740
741 // Same constants as java.util.Random
742 final static long JURandomMultiplier = 0x5DEECE66DL;
743 final static long JURandomAddend = 0xBL;
744 final static long JURandomMask = (1L << 48) - 1;
745
746 private final int nextJURandom(int bits) {
747 long next = (juRandomSeed * JURandomMultiplier + JURandomAddend) &
748 JURandomMask;
749 juRandomSeed = next;
750 return (int)(next >>> (48 - bits));
751 }
752
753 private final int nextJURandomInt(int n) {
754 if (n <= 0)
755 throw new IllegalArgumentException("n must be positive");
756 int bits = nextJURandom(31);
757 if ((n & -n) == n)
758 return (int)((n * (long)bits) >> 31);
759
760 for (;;) {
761 int val = bits % n;
762 if (bits - val + (n-1) >= 0)
763 return val;
764 bits = nextJURandom(31);
765 }
766 }
767
768 private final long nextJURandomLong() {
769 return ((long)(nextJURandom(32)) << 32) + nextJURandom(32);
770 }
771
772 private final long nextJURandomLong(long n) {
773 if (n <= 0)
774 throw new IllegalArgumentException("n must be positive");
775 long offset = 0;
776 while (n >= Integer.MAX_VALUE) { // randomly pick half range
777 int bits = nextJURandom(2); // 2nd bit for odd vs even split
778 long half = n >>> 1;
779 long nextn = ((bits & 2) == 0)? half : n - half;
780 if ((bits & 1) == 0)
781 offset += n - nextn;
782 n = nextn;
783 }
784 return offset + nextJURandomInt((int)n);
785 }
786
787 private final double nextJURandomDouble() {
788 return (((long)(nextJURandom(26)) << 27) + nextJURandom(27))
789 / (double)(1L << 53);
790 }
791
792 /**
793 * Returns a random integer using a per-worker random
794 * number generator with the same properties as
795 * {@link java.util.Random#nextInt}
796 * @return the next pseudorandom, uniformly distributed {@code int}
797 * value from this worker's random number generator's sequence
798 */
799 public static int nextRandomInt() {
800 return ((ForkJoinWorkerThread)(Thread.currentThread())).
801 nextJURandom(32);
802 }
803
804 /**
805 * Returns a random integer using a per-worker random
806 * number generator with the same properties as
807 * {@link java.util.Random#nextInt(int)}
808 * @param n the bound on the random number to be returned. Must be
809 * positive.
810 * @return the next pseudorandom, uniformly distributed {@code int}
811 * value between {@code 0} (inclusive) and {@code n} (exclusive)
812 * from this worker's random number generator's sequence
813 * @throws IllegalArgumentException if n is not positive
814 */
815 public static int nextRandomInt(int n) {
816 return ((ForkJoinWorkerThread)(Thread.currentThread())).
817 nextJURandomInt(n);
818 }
819
820 /**
821 * Returns a random long using a per-worker random
822 * number generator with the same properties as
823 * {@link java.util.Random#nextLong}
824 * @return the next pseudorandom, uniformly distributed {@code long}
825 * value from this worker's random number generator's sequence
826 */
827 public static long nextRandomLong() {
828 return ((ForkJoinWorkerThread)(Thread.currentThread())).
829 nextJURandomLong();
830 }
831
832 /**
833 * Returns a random long using a per-worker random
834 * number generator with properties analogous to
835 * {@link java.util.Random#nextInt(int)}
836 * @param n the bound on the random number to be returned. Must be
837 * positive.
838 * @return the next pseudorandom, uniformly distributed {@code long}
839 * value between {@code 0} (inclusive) and {@code n} (exclusive)
840 * from this worker's random number generator's sequence
841 * @throws IllegalArgumentException if n is not positive
842 */
843 public static long nextRandomLong(long n) {
844 return ((ForkJoinWorkerThread)(Thread.currentThread())).
845 nextJURandomLong(n);
846 }
847
848 /**
849 * Returns a random double using a per-worker random
850 * number generator with the same properties as
851 * {@link java.util.Random#nextDouble}
852 * @return the next pseudorandom, uniformly distributed {@code double}
853 * value between {@code 0.0} and {@code 1.0} from this
854 * worker's random number generator's sequence
855 */
856 public static double nextRandomDouble() {
857 return ((ForkJoinWorkerThread)(Thread.currentThread())).
858 nextJURandomDouble();
859 }
860
861 // Temporary Unsafe mechanics for preliminary release
862
863 static final Unsafe _unsafe;
864 static final long baseOffset;
865 static final long spOffset;
866 static final long qBase;
867 static final int qShift;
868 static final long runStateOffset;
869 static {
870 try {
871 if (ForkJoinWorkerThread.class.getClassLoader() != null) {
872 Field f = Unsafe.class.getDeclaredField("theUnsafe");
873 f.setAccessible(true);
874 _unsafe = (Unsafe)f.get(null);
875 }
876 else
877 _unsafe = Unsafe.getUnsafe();
878 baseOffset = _unsafe.objectFieldOffset
879 (ForkJoinWorkerThread.class.getDeclaredField("base"));
880 spOffset = _unsafe.objectFieldOffset
881 (ForkJoinWorkerThread.class.getDeclaredField("sp"));
882 runStateOffset = _unsafe.objectFieldOffset
883 (ForkJoinWorkerThread.class.getDeclaredField("runState"));
884 qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
885 int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
886 if ((s & (s-1)) != 0)
887 throw new Error("data type scale not a power of two");
888 qShift = 31 - Integer.numberOfLeadingZeros(s);
889 } catch (Exception e) {
890 throw new RuntimeException("Could not initialize intrinsics", e);
891 }
892 }
893 }
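
The slot-CAS scheme described in the algorithm overview (push and pop by the owner, deq by thieves, with pop vs deq arbitrated by CAS'ing a nonnull slot to null) can be illustrated with a much simplified sketch. This is not the code above: it assumes a fixed capacity (no growQueue), uses AtomicReferenceArray and plain volatile ints in place of the Unsafe ordered stores (which the overview notes is slower), and has not been hardened against every interleaving. The class name SlotCasDequeSketch and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Illustrative sketch only; all names are hypothetical and the queue never resizes.
final class SlotCasDequeSketch<T> {
    private final AtomicReferenceArray<T> slots;
    private final int mask;
    private volatile int sp;   // next slot to push to or pop from; written only by owner
    private volatile int base; // next slot to steal from

    SlotCasDequeSketch(int capacity) {
        if (capacity < 2 || (capacity & (capacity - 1)) != 0)
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        slots = new AtomicReferenceArray<T>(capacity);
        mask = capacity - 1;
    }

    // Owner only. Returns false when full (the real class grows the array instead).
    boolean push(T task) {
        if (task == null) throw new NullPointerException();
        int s = sp;
        if (s - base > mask)
            return false;
        slots.set(s & mask, task); // publish the slot before advancing sp
        sp = s + 1;
        return true;
    }

    // Owner only. Takes the most recently pushed task, or null if empty or contended.
    T pop() {
        int s = sp;
        if (s == base)
            return null;
        int i = (s - 1) & mask;
        T t = slots.get(i);
        if (t != null && slots.compareAndSet(i, t, null)) { // CAS decides pop vs steal
            sp = s - 1;
            return t;
        }
        return null;
    }

    // Any thread. Takes the oldest task, or null if empty or contended.
    T steal() {
        int b = base;
        if (sp == b)
            return null;
        int i = b & mask;
        T t = slots.get(i);
        if (t != null && slots.compareAndSet(i, t, null)) {
            base = b + 1; // index is advanced only after winning the slot
            return t;
        }
        return null;
    }

    public static void main(String[] args) {
        SlotCasDequeSketch<String> d = new SlotCasDequeSketch<String>(4);
        d.push("a"); d.push("b"); d.push("c");
        System.out.println(d.steal()); // oldest task: a
        System.out.println(d.pop());   // newest task: c
        System.out.println(d.pop());   // b
        System.out.println(d.pop());   // null, queue is empty
    }
}
```

Because the winner of a task is decided by the CAS on the slot, sp and base can stay simple ints without tag bits, and a taken slot is already null, which is the gc-motivated property the overview describes. A thief that loses the CAS simply returns null; in the class above, scan() then restarts and picks another victim.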