root/jsr166/jsr166/src/jsr166y/forkjoin/ForkJoinWorkerThread.java
Revision: 1.32
Committed: Tue Jan 6 14:34:59 2009 UTC (15 years, 5 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.31: +0 -0 lines
State: FILE REMOVED
Log Message:
Repackaging

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y.forkjoin;
8 import jsr166y.*;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12 import java.util.concurrent.locks.*;
13 import sun.misc.Unsafe;
14 import java.lang.reflect.*;
15
16 /**
17 * A thread that is internally managed by a ForkJoinPool to execute
18 * ForkJoinTasks. This class additionally provides public
19 * <tt>static</tt> methods accessing some basic scheduling and
20 * execution mechanics for the <em>current</em>
21 * ForkJoinWorkerThread. These methods may be invoked only from within
22 * other ForkJoinTask computations. Attempts to invoke them in other
23 * contexts result in exceptions or errors including
24 * ClassCastException. These methods enable construction of
25 * special-purpose task classes, as well as specialized idioms
26 * occasionally useful in ForkJoinTask processing.
27 *
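 * <p>For example (an illustrative sketch added here for exposition, not
 * taken from the original documentation), a task might use the worker's
 * pool index to collect per-worker statistics; the <tt>stepCounts</tt>
 * array is hypothetical, with one slot per worker:
 * <pre>
 * protected void compute() {
 *   // each slot of stepCounts is written only by its owning worker
 *   stepCounts[ForkJoinWorkerThread.getPoolIndex()]++;
 *   // ... remainder of the computation ...
 * }
 * </pre>
 *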
28 * <p>The form of supported static methods reflects the fact that
29 * worker threads may access and process tasks obtained in any of
30 * three ways. In preference order: <em>Local</em> tasks are processed
31 * in LIFO (newest first) order. <em>Stolen</em> tasks are obtained
32 * from other threads in FIFO (oldest first) order, only if there are
33 * no local tasks to run. <em>Submissions</em> form a FIFO queue
34 * common to the entire pool, and are started only if no other
35 * work is available.
36 *
37 * <p> This class also includes utility methods for accessing and
38 * manipulating submissions to the pool, in support of extensions that
39 * provide more extensive error recovery and/or alternate forms of
40 * execution.
41 *
42 * <p> This class is subclassable solely for the sake of adding
43 * functionality -- there are no overridable methods dealing with
44 * scheduling or execution. However, you can override initialization
45 * and termination cleanup methods surrounding the main task
46 * processing loop. If you do create such a subclass, you will also
47 * need to supply a custom ForkJoinWorkerThreadFactory to use it in a
48 * ForkJoinPool.
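 *
 * <p>For example (an illustrative sketch added here for exposition, not
 * taken from the original documentation), a subclass might hook the
 * lifecycle methods as follows, and then be instantiated by a custom
 * ForkJoinWorkerThreadFactory supplied to the pool:
 * <pre>
 * class MyWorkerThread extends ForkJoinWorkerThread {
 *   MyWorkerThread(ForkJoinPool pool) { super(pool); }
 *   protected void onStart() {
 *     super.onStart();                // required at the beginning
 *     // hypothetical per-thread setup, e.g. thread-local buffers
 *   }
 *   protected void onTermination(Throwable exception) {
 *     // hypothetical per-thread cleanup, e.g. flushing statistics
 *     super.onTermination(exception); // required at the end
 *   }
 * }
 * </pre>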
49 */
50 public class ForkJoinWorkerThread extends Thread {
51
52 /*
53 * Algorithm overview:
54 *
55 * 1. Work-Stealing: Work-stealing queues are special forms of
56 * Deques that support only three of the four possible
57 * end-operations -- push, pop, and deq (aka steal), and only do
58 * so under the constraints that push and pop are called only from
59 * the owning thread, while deq may be called from other threads.
60 * (If you are unfamiliar with them, you probably want to read
61 * Herlihy and Shavit's book "The Art of Multiprocessor
62 * programming", chapter 16 describing these in more detail before
63 * proceeding.) The main work-stealing queue design is roughly
64 * similar to "Dynamic Circular Work-Stealing Deque" by David
65 * Chase and Yossi Lev, SPAA 2005
66 * (http://research.sun.com/scalable/pubs/index.html). The main
67 * difference ultimately stems from gc requirements that we null
68 * out taken slots as soon as we can, to maintain as small a
69 * footprint as possible even in programs generating huge numbers
70 * of tasks. To accomplish this, we shift the CAS arbitrating pop
71 * vs deq (steal) from being on the indices ("base" and "sp") to
72 * the slots themselves (mainly via method "casSlotNull()"). So,
73 * both a successful pop and deq mainly entail CAS'ing a nonnull
74 * slot to null. Because we rely on CASes of references, we do
75 * not need tag bits on base or sp. They are simple ints as used
76 * in any circular array-based queue (see for example ArrayDeque).
77 * Updates to the indices must still be ordered in a way that
78 * guarantees that (sp - base) <= 0 means the queue is empty, but
79 * otherwise may err on the side of possibly making the queue
80 * appear nonempty when a push, pop, or deq has not fully
81 * committed. Note that this means that the deq operation,
82 * considered individually, is not wait-free. One thief cannot
83 * successfully continue until another in-progress one (or, if
84 * previously empty, a push) completes. However, in the
85 * aggregate, we ensure at least probabilistic non-blockingness. If
86 * an attempted steal fails, a thief always chooses a different
87 * random victim target to try next. So, in order for one thief to
88 * progress, it suffices for any in-progress deq or new push on
89 * any empty queue to complete. One reason this works well here is
90 * that apparently-nonempty often means soon-to-be-stealable,
91 * which gives threads a chance to activate if necessary before
92 * stealing (see below).
93 *
94 * Efficient implementation of this approach currently relies on
95 * an uncomfortable amount of "Unsafe" mechanics. To maintain
96 * correct orderings, reads and writes of variable base require
97 * volatile ordering. Variable sp does not require volatile loads
98 * (so long as other threads read base first), but requires
99 * store-ordering on writes. Because they are protected by
100 * volatile base reads, reads of the queue array and its slots do
101 * not need volatile load semantics, but writes (in push) require
102 * store order and CASes (in pop and deq) require (volatile) CAS
103 * semantics. Since these combinations aren't supported using
104 * ordinary volatiles, the only way to accomplish these efficiently
105 * is to use direct Unsafe calls. (Using external AtomicIntegers
106 * and AtomicReferenceArrays for the indices and array is
107 * significantly slower because of memory locality and indirection
108 * effects.) Further, performance on most platforms is very
109 * sensitive to placement and sizing of the (resizable) queue
110 * array. Even though these queues don't usually become all that
111 * big, the initial size must be large enough to counteract cache
112 * contention effects across multiple queues (especially in the
113 * presence of GC cardmarking). Also, to improve thread-locality,
114 * queues are currently initialized immediately after the thread
115 * gets the initial signal to start processing tasks. However,
116 * all queue-related methods except pushTask are written in a way
117 * that allows them to instead be lazily allocated and/or disposed
118 * of when empty. All together, these low-level implementation
119 * choices produce as much as a factor of 4 performance
120 * improvement compared to naive implementations, and enable the
121 * processing of billions of tasks per second, sometimes at the
122 * expense of ugliness.
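 * (A simplified illustrative sketch of this slot-CAS arbitration,
 * using AtomicReferenceArray in place of Unsafe, appears just after
 * this comment block.)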
123 *
124 * 2. Run control: The primary run control is based on a global
125 * counter (activeCount) held by the pool. It uses an algorithm
126 * similar to that in Herlihy and Shavit section 17.6 to cause
127 * threads to eventually block when all threads declare they are
128 * inactive. (See variable "scans".) For this to work, threads
129 * must be declared active when executing tasks, and before
130 * stealing a task. They must be inactive before blocking on the
131 * PoolBarrier (awaiting a new submission or other Pool event). In
132 * between, there is some free play which we take advantage of to
133 * avoid contention and rapid flickering of the global
134 * activeCount: If inactive, we activate only if a victim queue
135 * appears to be nonempty (see above), and even then, back off,
136 * looking for another victim if the attempt (CAS) to increase
137 * activeCount fails. Similarly, a thread tries to inactivate
138 * only after a full scan of other threads, and if the attempted
139 * decrement fails, rescans instead. The net effect is that
140 * contention on activeCount is rarely a measurable performance
141 * issue. (There are also a few other cases where we scan for work
142 * rather than retry/block upon contention.)
143 *
144 * Unlike in previous incarnations of this framework, we do not
145 * ever block worker threads while submissions are executing
146 * (i.e., activeCount is nonzero). Doing so can lead to anomalies
147 * (like convoying of dependent threads) and overheads that negate
148 * benefits. To compensate, we ensure that threads looking for
149 * work are extremely well-behaved. Scans (mainly in
150 * getStolenTask; also getSubmission and scanWhileJoining) do not
151 * modify any variables that might disrupt caches (except, when
152 * necessary, activation status) and probe only the base/sp fields
153 * of other threads unless they appear non-empty. We also
154 * occasionally perform Thread.yields, which may or may not
155 * improve good citizenship. It may be possible to replace this
156 * with a different advisory blocking scheme that better shields
157 * users from the effects of poor ForkJoin task design causing
158 * imbalances, in turn causing excessive spins.
159 *
160 * 3. Selection control. We maintain the policy of always choosing to
161 * run local tasks rather than stealing, and always trying to
162 * steal tasks before trying to run a new submission. This shows
163 * up in different ways in different cases though, accounting for
164 * the number of different run/get methods. All steals are
165 * currently performed in randomly-chosen deq-order. It may be
166 * worthwhile to bias these with locality / anti-locality
167 * information, but doing this well probably requires more
168 * lower-level information from JVMs than currently provided.
169 */
170
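    // -------------------------------------------------------------------
    // Editor's illustrative sketch -- NOT part of the original jsr166y
    // sources and not used anywhere in this class. It shows, in greatly
    // simplified form, the slot-CAS arbitration of pop vs deq (steal)
    // described in point 1 of the overview above, using
    // AtomicReferenceArray and volatile indices in place of Unsafe
    // ordered stores, and omitting resizing (so at most the initial
    // capacity of tasks may be outstanding). All names are illustrative.
    static final class SlotCasDequeSketch {
        final AtomicReferenceArray<Object> slots =
            new AtomicReferenceArray<Object>(1 << 13); // never resized here
        volatile int sp;    // next slot to push to (written only by owner)
        volatile int base;  // next slot to steal from

        void push(Object t) {               // owner thread only
            int s = sp;
            slots.set(s & (slots.length() - 1), t);
            sp = s + 1;                     // publish after the slot is written
        }

        Object pop() {                      // owner thread only
            int s = sp - 1;
            if (s - base < 0)
                return null;                // appears empty
            int i = s & (slots.length() - 1);
            Object t = slots.get(i);
            if (t != null && slots.compareAndSet(i, t, null)) {
                sp = s;                     // commit only after winning the CAS
                return t;
            }
            return null;                    // lost the race to a thief
        }

        Object steal() {                    // may be called by any thread
            int b = base;
            if (b - sp >= 0)
                return null;                // appears empty
            int i = b & (slots.length() - 1);
            Object t = slots.get(i);
            if (t != null && slots.compareAndSet(i, t, null)) {
                base = b + 1;               // commit only after winning the CAS
                return t;
            }
            return null;                    // contended; try a different victim
        }
    }
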
171 /**
172 * Capacity of work-stealing queue array upon initialization.
173 * Must be a power of two. Initial size must be at least 2, but is
174 * padded to minimize cache effects.
175 */
176 private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
177
178 /**
179 * Maximum work-stealing queue array size. Must be less than or
180 * equal to 1 << 30 to ensure lack of index wraparound.
181 */
182 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 30;
183
184 /**
185 * Generator of seeds for per-thread random numbers.
186 */
187 private static final Random randomSeedGenerator = new Random();
188
189 /**
190 * Run state of this worker.
191 */
192 private final RunState runState;
193
194 /**
195 * The pool this thread works in.
196 */
197 private final ForkJoinPool pool;
198
199 /**
200 * The work-stealing queue array. Size must be a power of two.
201 */
202 private ForkJoinTask<?>[] queue;
203
204 /**
205 * Index (mod queue.length) of next queue slot to push to or pop
206 * from. It is written only by owner thread, via ordered store.
207 * Both sp and base are allowed to wrap around on overflow, but
208 * (sp - base) still estimates size. To improve array locality,
209 * indices are occasionally renormalized to zero (see
210 * tryInactivate).
211 */
212 private int sp;
213
214 /**
215 * Index (mod queue.length) of least valid queue slot, which is
216 * always the next position to steal from if nonempty.
217 */
218 private volatile int base;
219
220 /**
221 * Activity status and pause control. When zero, this worker is
222 * considered active. Nonzero values indicate number of empty
223 * scans (see getStolenTask) to control pausing. The value must
224 * be nonzero upon construction. It must be zero when executing
225 * tasks, and BEFORE stealing a task. It must be nonzero before
226 * blocking on the PoolBarrier.
227 */
228 private int scans;
229
230 /**
231 * Seed for random number generator for choosing steal victims
232 */
233 private int randomVictimSeed;
234
235 /**
236 * Number of steals, transferred to fullStealCount when idle
237 */
238 private int stealCount;
239
240 /**
241 * Number of steals, maintained just for monitoring purposes.
242 */
243 private volatile long fullStealCount;
244
245 /**
246 * Seed for juRandom methods.
247 */
248 private long juRandomSeed;
249
250 /**
251 * The last event count waited for
252 */
253 private long eventCount;
254
255 /**
256 * Index of this worker in pool array. Set once by pool before running.
257 */
258 private int poolIndex;
259
260 // Padding to help avoid cacheline sharing across workers
261 private int pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
262 private int pad8, pad9, pada, padb, padc, padd, pade, padf;
263
264 /**
265 * Creates a ForkJoinWorkerThread operating in the given pool.
266 * @param pool the pool this thread works in
267 * @throws NullPointerException if pool is null
268 */
269 protected ForkJoinWorkerThread(ForkJoinPool pool) {
270 if (pool == null) throw new NullPointerException();
271 this.pool = pool;
272 this.runState = new RunState();
273 this.scans = 1;
274 int rseed = randomSeedGenerator.nextInt();
275 this.randomVictimSeed = (rseed == 0)? 1 : rseed; // must be nonzero
276 this.juRandomSeed = randomSeedGenerator.nextLong();
277 }
278
279 // Initialization and access methods used by Pool
280
281 final void setWorkerPoolIndex(int i) {
282 poolIndex = i;
283 }
284
285 final int getWorkerPoolIndex() {
286 return poolIndex;
287 }
288
289 final RunState getRunState() {
290 return runState;
291 }
292
293 final long getWorkerStealCount() {
294 return fullStealCount + stealCount; // can peek at local count too
295 }
296
297 // Primitive support for queue operations
298
299 /**
300 * Sets sp in store-order.
301 */
302 private final void setSp(int s) {
303 _unsafe.putOrderedInt(this, spOffset, s);
304 }
305
306 /**
307 * Sets, in store-order, the given slot of q to the given
308 * task. Caller must ensure q is nonnull and index is in range.
309 */
310 private static final void setSlot(ForkJoinTask<?>[] q, int i,
311 ForkJoinTask<?> t){
312 _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
313 }
314
315 /**
316 * CAS given slot of q to null. Caller must ensure q is nonnull
317 * and index is in range.
318 */
319 private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
320 ForkJoinTask<?> t) {
321 return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
322 }
323
324 // Main queue methods
325
326 /**
327 * Returns an estimate of the number of tasks in the queue.
328 */
329 final int getQueueSize() {
330 int n = sp - base;
331 return n < 0? 0 : n; // suppress momentarily negative values
332 }
333
334 /**
335 * Pushes a task. Called only by current thread.
336 * @param t the task. Caller must ensure nonnull
337 */
338 final void pushTask(ForkJoinTask<?> t) {
339 ForkJoinTask<?>[] q = queue;
340 int mask = q.length - 1;
341 int s = sp;
342 setSp(s + 1);
343 setSlot(q, s & mask, t);
344 if (mask <= s + 1 - base)
345 growQueue();
346 }
347
348 /**
349 * Tries to take a task from the base of the queue, failing if
350 * either empty or contended.
351 * @return a task, or null if none or contended.
352 */
353 private final ForkJoinTask<?> deqTask() {
354 ForkJoinTask<?> t;
355 int i;
356 int b = base;
357 ForkJoinTask<?>[] q = queue;
358 if (b - sp < 0 &&
359 q != null &&
360 (t = q[i = b & (q.length - 1)]) != null &&
361 casSlotNull(q, i, t)) {
362 base = b + 1;
363 return t;
364 }
365 return null;
366 }
367
368 /**
369 * Returns a popped task, or null if empty. Called only by
370 * current thread.
371 */
372 final ForkJoinTask<?> popTask() {
373 ForkJoinTask<?> t;
374 int i;
375 int s;
376 ForkJoinTask<?>[] q = queue;
377 if (q != null &&
378 (s = sp - 1) - base >= 0 &&
379 (t = q[i = s & (q.length - 1)]) != null &&
380 casSlotNull(q, i, t)) {
381 setSp(s);
382 return t;
383 }
384 return null;
385 }
386
387 /**
388 * Same as popTask, but with implementation biased to expect a
389 * task to be available
390 */
391 private final ForkJoinTask<?> expectedPopTask() {
392 int s;
393 ForkJoinTask<?>[] q = queue;
394 if (q != null) {
395 int i = (q.length - 1) & (s = sp - 1);
396 ForkJoinTask<?> t = q[i];
397 if (casSlotNull(q, i, t) && t != null) {
398 setSp(s);
399 return t;
400 }
401 }
402 return null;
403 }
404
405 /**
406 * Specialized version of popTask to pop only if
407 * topmost element is the given task.
408 * @param t the task to match (null is never matched)
409 */
410 final boolean popIfNext(ForkJoinTask<?> t) {
411 int s;
412 ForkJoinTask<?>[] q = queue;
413 if (t != null && q != null &&
414 casSlotNull(q, (q.length - 1) & (s = sp - 1), t)) {
415 setSp(s);
416 return true;
417 }
418 return false;
419 }
420
421 /**
422 * Returns next task to pop.
423 */
424 final ForkJoinTask<?> peekTask() {
425 ForkJoinTask<?>[] q = queue;
426 return q == null? null : q[(sp - 1) & (q.length - 1)];
427 }
428
429 /**
430 * Doubles queue array size. Transfers elements by emulating
431 * steals (deqs) from old array and placing, oldest first, into
432 * new array.
433 */
434 private final void growQueue() {
435 ForkJoinTask<?>[] oldQ = queue;
436 int oldSize = oldQ.length;
437 int newSize = oldSize << 1;
438 if (newSize > MAXIMUM_QUEUE_CAPACITY)
439 throw new RejectedExecutionException("Queue capacity exceeded");
440 ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
441
442 int b = base;
443 int bf = b + oldSize;
444 int oldMask = oldSize - 1;
445 int newMask = newSize - 1;
446 do {
447 int oldIndex = b & oldMask;
448 ForkJoinTask<?> t = oldQ[oldIndex];
449 if (t != null && !casSlotNull(oldQ, oldIndex, t))
450 t = null;
451 setSlot(newQ, b & newMask, t);
452 } while (++b != bf);
453 }
454
455 // Activation control
456
457 /**
458 * Unconditionally set status to active and adjust activeCount
459 */
460 private final void ensureActive() {
461 if (scans != 0) {
462 scans = 0;
463 pool.incrementActiveCount();
464 }
465 }
466
467 /**
468 * Try to activate but fail on contention on active worker counter
469 * @return true if now active
470 */
471 private final boolean tryActivate() {
472 if (scans != 0) {
473 if (!pool.tryIncrementActiveCount())
474 return false;
475 scans = 0;
476 }
477 return true;
478 }
479
480 /**
481 * Unconditionally inactivate. Does not block even if activeCount
482 * now zero. (Use tryInactivate instead.) Needed for helpQuiesce.
483 */
484 private final void ensureInactive() {
485 if (scans == 0) {
486 scans = 1;
487 pool.decrementActiveCount();
488 }
489 }
490
491 /**
492 * Possibly inactivate and block or pause waiting for work. If
493 * pool is quiescent, before blocking, transfer local steal count
494 * to volatile field. Also renormalizes queue indices to improve
495 * future queue array locality. Note that this can cause ongoing
496 * steals to momentarily believe queue is nonempty but will still
497 * fail to extract a task, which at most may cause them to
498 * unnecessarily activate, but even this is minimized by only
499 * doing this upon quiescence.
500 *
501 * Precondition: Local queue is empty, and at least one full scan
502 * of other worker queues and submissions failed to find a task.
503 *
504 * @return true if pool apparently idle on entry to this method
505 */
506 private final boolean tryInactivate() {
507 if (scans == 0 && !pool.tryDecrementActiveCount())
508 return false;
509 ++scans;
510 if (pool.getActiveThreadCount() != 0) {
511 if (scans >= SCANS_PER_PAUSE) {
512 scans = 1;
513 pauseAwaitingWork(null);
514 }
515 return false;
516 }
517 if (sp != 0) { // renormalize indices
518 setSp(0);
519 base = 0;
520 }
521 int sc = stealCount; // accumulate steals
522 if (sc != 0) {
523 long fsc = fullStealCount + sc;
524 stealCount = 0;
525 fullStealCount = fsc;
526 }
527 eventCount = pool.barrierSync(eventCount);
528 return true;
529 }
530
531 // Support for pausing when inactive
532
533 /**
534 * The number of empty steal attempts before pausing. Must be a
535 * power of two.
536 */
537 private static final int PROBES_PER_PAUSE = (1 << 10);
538
539 /**
540 * The number of empty scans (== probe each worker at least once)
541 * before pausing. Based on actual number of processors, not
542 * actual poolSize, since this better estimates effects of memory
543 * stalls etc on larger machines.
544 */
545 private static final int SCANS_PER_PAUSE =
546 PROBES_PER_PAUSE / Runtime.getRuntime().availableProcessors();
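    // For example (illustrative): with PROBES_PER_PAUSE of 1024 on an
    // 8-processor machine, this evaluates to 1024 / 8 = 128 empty scans
    // before each pause.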
547
548 /**
549 * Politely stall when we cannot find a task to run. First check run
550 * status and cancel task if stopping. Currently, pauses are
551 * implemented only as yield, but may someday incorporate advisory
552 * blocking.
553 * @param joinMe if nonnull, a task to cancel if stopping
554 */
555 private final void pauseAwaitingWork(ForkJoinTask<?> joinMe) {
556 if (runState.isAtLeastStopping()) {
557 if (joinMe != null)
558 joinMe.cancel();
559 }
560 else
561 Thread.yield();
562 }
563
564 // Lifecycle methods
565
566 /**
567 * Initializes internal state after construction but before
568 * processing any tasks. If you override this method, you must
569 * invoke super.onStart() at the beginning of the method.
570 * Initialization requires care: Most fields must have legal
571 * default values, to ensure that attempted accesses from other
572 * threads work correctly even before this thread starts
573 * processing tasks.
574 */
575 protected void onStart() {
576 // wait for start signal before allocating queue array
577 eventCount = pool.barrierSync(0);
578 if (queue == null)
579 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
580 }
581
582 /**
583 * Perform cleanup associated with termination of this worker
584 * thread. If you override this method, you must invoke
585 * super.onTermination at the end of the overridden method.
586 *
587 * @param exception the exception causing this thread to abort due
588 * to an unrecoverable error, or null if completed normally.
589 */
590 protected void onTermination(Throwable exception) {
591 try {
592 clearLocalTasks();
593 ensureInactive();
594 cancelTasks();
595 runState.transitionToTerminated();
596 } finally {
597 pool.workerTerminated(this, exception);
598 }
599 }
600
601 /**
602 * This method is required to be public, but should never be
603 * called explicitly. It performs the main run loop to execute
604 * ForkJoinTasks.
605 */
606 public void run() {
607 try {
608 onStart();
609 mainLoop();
610 onTermination(null);
611 } catch (Throwable ex) {
612 onTermination(ex);
613 }
614 }
615
616 // Methods for running submissions, stolen and/or local tasks
617
618 /**
619 * Main run loop. On each step prefer running a submission
620 * if previously inactive, else prefer stolen task. If a
621 * task was run, also run any other subtasks it pushed;
622 * otherwise inactivate.
623 */
624 private final void mainLoop() {
625 boolean preferSubmission = true;
626 while (runState.isRunning()) {
627 if ((preferSubmission || !runStolenTask()) && !runSubmission())
628 preferSubmission = !preferSubmission && tryInactivate();
629 else {
630 runLocalTasks();
631 preferSubmission = false;
632 }
633 }
634 }
635
636 /**
637 * Runs all tasks on local queue
638 */
639 private final void runLocalTasks() {
640 ForkJoinTask<?> t;
641 while ((t = expectedPopTask()) != null)
642 t.exec();
643 }
644
645 /**
646 * Runs a stolen task if one exists.
647 * @return true if ran a task
648 */
649 private final boolean runStolenTask() {
650 ForkJoinTask<?> t = getStolenTask();
651 if (t != null) {
652 t.exec();
653 return true;
654 }
655 return false;
656 }
657
658 /**
659 * Runs a submission if one exists.
660 * @return true if ran a task
661 */
662 private final boolean runSubmission() {
663 Submission<?> s = getSubmission();
664 if (s != null) {
665 s.exec();
666 return true;
667 }
668 return false;
669 }
670
671 /**
672 * Returns a submission, if one exists, activating first if necessary
673 */
674 private final Submission<?> getSubmission() {
675 while (pool.mayHaveQueuedSubmissions()) {
676 Submission<?> s;
677 if (tryActivate() && (s = pool.pollSubmission()) != null)
678 return s;
679 }
680 return null;
681 }
682
683 /**
684 * Runs one popped task, if available
685 * @return true if ran a task
686 */
687 private final boolean runLocalTask() {
688 ForkJoinTask<?> t = popTask();
689 if (t != null) {
690 t.exec();
691 return true;
692 }
693 return false;
694 }
695
696 /**
697 * Pops or steals a task
698 * @return task, or null if none available
699 */
700 private final ForkJoinTask<?> getLocalOrStolenTask() {
701 ForkJoinTask<?> t = popTask();
702 return t != null? t : getStolenTask();
703 }
704
705 /**
706 * Runs one popped or stolen task, if available
707 * @return true if ran a task
708 */
709 private final boolean runLocalOrStolenTask() {
710 ForkJoinTask<?> t = getLocalOrStolenTask();
711 if (t != null) {
712 t.exec();
713 return true;
714 }
715 return false;
716 }
717
718 /**
719 * Runs tasks until activeCount zero
720 */
721 private final void runUntilQuiescent() {
722 for (;;) {
723 ForkJoinTask<?> t = getLocalOrStolenTask();
724 if (t != null) {
725 ensureActive();
726 t.exec();
727 }
728 else {
729 ensureInactive();
730 if (pool.getActiveThreadCount() == 0) {
731 ensureActive(); // reactivate on exit
732 break;
733 }
734 }
735 }
736 }
737
738 // Stealing tasks
739
740 /**
741 * Computes next value for random victim probe. Scans don't
742 * require a very high quality generator, but also not a crummy
743 * one. Marsaglia xor-shift is cheap and works well.
744 */
745 private static final int xorShift(int r) {
746 r ^= r << 1;
747 r ^= r >>> 3;
748 return r ^ (r << 10);
749 }
750
751 /**
752 * Tries to steal a task from another worker. Starts at a random
753 * index of workers array, and probes workers until finding one
754 * with non-empty queue or finding that all are empty. It
755 * randomly selects the first n-1 probes. If these are empty, it
756 * resorts to a full circular traversal, which is necessary to
757 * accurately set active status by caller.
758 *
759 * This method must be both fast and quiet -- avoiding as much as
760 * possible memory accesses that could disrupt cache sharing etc
761 * other than those needed to check for and take tasks. This
762 * accounts for, among other things, updating random seed in place
763 * without storing it until exit. (Note that we only need to store
764 * it if we found a task; otherwise it doesn't matter if we start
765 * at the same place next time.)
766 *
767 * @return a task, or null if none found
768 */
769 private final ForkJoinTask<?> getStolenTask() {
770 final ForkJoinWorkerThread[] ws = pool.workers;
771 final int mask = ws.length - 1; // must be power of 2 minus 1
772 int probes = -mask; // use random index while negative
773 int r = randomVictimSeed; // extract once to keep scan quiet
774 int idx = r;
775 ForkJoinTask<?> t = null;
776 do {
777 ForkJoinWorkerThread v = ws[mask & idx];
778 r = xorShift(r); // update seed
779 if (v != null && v.base - v.sp < 0) { // apparently nonempty
780 if (tryActivate() && (t = v.deqTask()) != null) {
781 randomVictimSeed = r;
782 ++stealCount;
783 break;
784 }
785 probes = -mask; // restart on contention
786 idx = r;
787 continue;
788 }
789 idx = probes < 0? r : (idx + 1); // n-1 random then circular
790 } while (probes++ <= mask);
791 return t;
792 }
793
794 /**
795 * Tries to steal tasks while waiting for join. Similar to
796 * getStolenTask except intersperses checks for completion and
797 * shutdown.
798 * @return a task, or null if joinMe is completed
799 */
800 private final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
801 ForkJoinWorkerThread[] ws = pool.workers;
802 int mask = ws.length - 1;
803 int r = randomVictimSeed;
804 int idx = r;
805 int probes = 0;
806 ForkJoinTask<?> t = null;
807 for (;;) {
808 ForkJoinWorkerThread v = ws[idx & mask];
809 r = xorShift(r);
810 if (joinMe.status < 0)
811 break;
812 if (v != null && (t = v.deqTask()) != null) {
813 randomVictimSeed = r;
814 ++stealCount;
815 break;
816 }
817 if ((++probes & (PROBES_PER_PAUSE - 1)) == 0)
818 pauseAwaitingWork(joinMe);
819 idx = probes <= mask? r: (idx + 1); // n-1 random then circular
820 }
821 return t;
822 }
823
824 // Support for core ForkJoinTask methods
825
826 /**
827 * Implements ForkJoinTask.quietlyJoin
828 */
829 final void helpJoinTask(ForkJoinTask<?> joinMe) {
830 ForkJoinTask<?> t;
831 while (joinMe.status >= 0 &&
832 ((t = popTask()) != null ||
833 (t = scanWhileJoining(joinMe)) != null))
834 t.exec();
835 }
836
837 /**
838 * Implements RecursiveAction.forkJoin
839 */
840 final void doForkJoin(RecursiveAction t1, RecursiveAction t2) {
841 if (t1.status >= 0 && t2.status >= 0) {
842 pushTask(t2);
843 if (t1.rawExec()) {
844 if (popIfNext(t2)) {
845 if (t2.rawExec())
846 return;
847 }
848 else {
849 helpJoinTask(t2);
850 if (t2.completedNormally())
851 return;
852 }
853 }
854 }
855 Throwable ex;
856 if ((ex = t1.getException()) != null)
857 t2.cancel();
858 else if ((ex = t2.getException()) != null)
859 t1.cancel();
860 if (ex != null)
861 ForkJoinTask.rethrowException(ex);
862 }
863
864 /**
865 * Timeout version of helpJoin needed for the Submission class.
866 * Returns false if it timed out before the task completed.
867 */
868 final boolean doTimedJoinTask(ForkJoinTask<?> joinMe, long nanos) {
869 long startTime = System.nanoTime();
870
871 for (;;) {
872 ForkJoinTask<?> t;
873 if (joinMe.isDone())
874 return true;
875 else if ((t = getLocalOrStolenTask()) != null)
876 t.exec();
877 else if (runState.isAtLeastStopping())
878 return false;
879 else if (nanos - (System.nanoTime() - startTime) <= 0)
880 return false;
881 }
882 }
883
884 // Cleanup support
885
886 /**
887 * Run or cancel all local tasks on exit from main.
888 */
889 private final void clearLocalTasks() {
890 while (sp - base > 0) {
891 ForkJoinTask<?> t = popTask();
892 if (t != null) {
893 if (runState.isAtLeastStopping())
894 t.setCancelled(); // avoid exceptions due to cancel()
895 else
896 t.exec();
897 }
898 }
899 }
900
901 /**
902 * Removes and cancels all tasks in queue. Can be called from any
903 * thread.
904 */
905 final void cancelTasks() {
906 while (sp - base > 0) {
907 ForkJoinTask<?> t = deqTask();
908 if (t != null) // avoid exceptions due to cancel()
909 t.setCancelled();
910 }
911 }
912
913
914 // Public methods on current thread
915
916 /**
917 * Returns the pool hosting the current task execution.
918 * @return the pool
919 */
920 public static ForkJoinPool getPool() {
921 return ((ForkJoinWorkerThread)(Thread.currentThread())).pool;
922 }
923
924 /**
925 * Returns the index number of the current worker thread in its
926 * pool. The return value is in the range
927 * <tt>0...getPool().getPoolSize()-1</tt>. This method may be
928 * useful for applications that track status or collect results
929 * per-worker rather than per-task.
930 * @return the index number.
931 */
932 public static int getPoolIndex() {
933 return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex;
934 }
935
936 /**
937 * Returns an estimate of the number of tasks waiting to be run by
938 * the current worker thread. This value may be useful for
939 * heuristic decisions about whether to fork other tasks.
940 * @return the number of tasks
941 */
942 public static int getLocalQueueSize() {
943 return ((ForkJoinWorkerThread)(Thread.currentThread())).
944 getQueueSize();
945 }
946
947 /**
948 * Returns, but does not remove or execute, the next task locally
949 * queued for execution by the current worker thread. There is no
950 * guarantee that this task will be the next one actually returned
951 * or executed from other polling or execution methods.
952 * @return the next task or null if none
953 */
954 public static ForkJoinTask<?> peekLocalTask() {
955 return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
956 }
957
958 /**
959 * Removes and returns, without executing, the next task queued
960 * for execution in the current worker thread's local queue.
961 * @return the next task to execute, or null if none
962 */
963 public static ForkJoinTask<?> pollLocalTask() {
964 return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
965 }
966
967 /**
968 * Execute the next task locally queued by the current worker, if
969 * one is available.
970 * @return true if a task was run; a false return indicates
971 * that no task was available.
972 */
973 public static boolean executeLocalTask() {
974 return ((ForkJoinWorkerThread)(Thread.currentThread())).
975 runLocalTask();
976 }
977
978 /**
979 * Removes and returns, without executing, the next task queued
980 * for execution in the current worker thread's local queue or if
981 * none, a task stolen from another worker, if one is available.
982 * A null return does not necessarily imply that all tasks are
983 * completed, only that there are currently none available.
984 * @return the next task to execute, or null if none
985 */
986 public static ForkJoinTask<?> pollTask() {
987 return ((ForkJoinWorkerThread)(Thread.currentThread())).
988 getLocalOrStolenTask();
989 }
990
991 /**
992 * Helps this program complete by processing a local, stolen or
993 * submitted task, if one is available. This method may be useful
994 * when several tasks are forked, and only one of them must be
995 * joined, as in:
996 * <pre>
997 * while (!t1.isDone() &amp;&amp; !t2.isDone())
998 * ForkJoinWorkerThread.executeTask();
999 * </pre>
1000 *
1001 * @return true if a task was run; a false return indicates
1002 * that no task was available.
1003 */
1004 public static boolean executeTask() {
1005 return ((ForkJoinWorkerThread)(Thread.currentThread())).
1006 runLocalOrStolenTask();
1007 }
1008
1009 /**
1010 * Executes tasks (but not new submissions) until the pool
1011 * isQuiescent.
1012 */
1013 public static void helpQuiesce() {
1014 ((ForkJoinWorkerThread)(Thread.currentThread())).
1015 runUntilQuiescent();
1016 }
1017
1018 /**
1019 * Returns an estimate of how many more locally queued tasks there
1020 * are than idle worker threads that might steal them. This value
1021 * may be useful for heuristic decisions about whether to fork
1022 * other tasks. In many usages of ForkJoinTasks, at steady state,
1023 * each worker should aim to maintain a small constant number (for
1024 * example, 3) of stealable tasks, plus more if there are idle
1025 * workers.
1026 *
1027 * <p><b>Sample Usage.</b> Here is a variant version of
1028 * <tt>compute</tt> for the {@link BinaryAsyncAction} Fib example
1029 * using getEstimatedSurplusTaskCount to dynamically determine
1030 * sequential threshold:
1031 *
1032 * <pre>
1033 * protected void compute() {
1034 * Fib f = this;
1035 * while (f.n &gt; 1 &amp;&amp;
1036 * ForkJoinWorkerThread.getEstimatedSurplusTaskCount() &lt;= 3) {
1037 * Fib left = new Fib(f.n - 1);
1038 * Fib right = new Fib(f.n - 2);
1039 * f.linkSubtasks(left, right);
1040 * right.fork(); // fork right
1041 * f = left; // loop on left
1042 * }
1043 * f.result = sequentiallyComputeFibonacci(f.n);
1044 * f.finish();
1045 * }
1046 * }
1047 * </pre>
1048 *
1049 * @return the number of tasks, which is negative if there are
1050 * fewer tasks than idle workers
1051 */
1052 public static int getEstimatedSurplusTaskCount() {
1053 return ((ForkJoinWorkerThread)(Thread.currentThread()))
1054 .estimatedSurplusTaskCount();
1055 }
1056
1057 final int estimatedSurplusTaskCount() {
1058 return (sp - base) - pool.getIdleThreadCount();
1059 }
1060
1061 /**
1062 * Removes, without executing, the given task from the
1063 * queue hosting current execution only if it would be the next
1064 * task executed by the current worker. Among other usages, this
1065 * method may be used to bypass task execution during
1066 * cancellation.
1067 *
1068 * <p><b>Sample Usage.</b> This method may help counterbalance
1069 * effects of dynamic task thresholding. If using a threshold that
1070 * typically generates too many tasks, then this method may be
1071 * used to more cheaply execute excess ones. Here is a dynamically
1072 * tuned version of the {@link RecursiveAction} Applyer example:
1073 *
1074 * <pre>
1075 * class Applyer extends RecursiveAction {
1076 * final double[] array;
1077 * final int lo, hi, seqSize;
1078 * double result;
1079 * Applyer next; // keeps track of right-hand-side tasks
1080 * Applyer(double[] array, int lo, int hi, int seqSize, Applyer next) {
1081 * this.array = array; this.lo = lo; this.hi = hi;
1082 * this.seqSize = seqSize; this.next = next;
1083 * }
1084 *
1085 * double atLeaf(int l, int h) {
1086 * double sum = 0;
1087 * for (int i = l; i &lt; h; ++i) // perform leftmost base step
1088 * sum += array[i] * array[i];
1089 * return sum;
1090 * }
1091 *
1092 * protected void compute() {
1093 * int l = lo;
1094 * int h = hi;
1095 * Applyer right = null;
1096 * while (h - l &gt; 1 &amp;&amp;
1097 * ForkJoinWorkerThread.getEstimatedSurplusTaskCount() &lt;= 3) {
1098 * int mid = (l + h) &gt;&gt;&gt; 1;
1099 * right = new Applyer(array, mid, h, seqSize, right);
1100 * right.fork();
1101 * h = mid;
1102 * }
1103 * double sum = atLeaf(l, h);
1104 * while (right != null &amp;&amp; // direct compute unstolen tasks
1105 * ForkJoinWorkerThread.removeIfNextLocalTask(right)) {
1106 * sum += right.atLeaf(right.lo, right.hi);
1107 * right = right.next;
1108 * }
1109 * while (right != null) { // join remaining right-hand sides
1110 * right.join();
1111 * sum += right.result;
1112 * right = right.next;
1113 * }
1114 * result = sum;
1115 * }
1116 * }
1117 * </pre>
1118 *
1119 * @param task the task
1120 * @return true if removed
1121 */
1122 public static boolean removeIfNextLocalTask(ForkJoinTask<?> task) {
1123 return ((ForkJoinWorkerThread)(Thread.currentThread())).popIfNext(task);
1124 }
1125
1126 // Support for alternate handling of submissions
1127
1128 /**
1129 * Removes and returns the next unexecuted submission to the given
1130 * pool, if one is available. To access a submission from the
1131 * current worker's pool, use <tt>pollSubmission(getPool())</tt>.
1132 * This method may be useful for draining tasks during exception
1133 * recovery and for re-assigning work in systems with multiple
1134 * pools.
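 *
 * <p>For example (an illustrative sketch added here for exposition, not
 * taken from the original documentation), pending submissions could be
 * drained and forced to fail while recovering a pool:
 * <pre>
 * Future&lt;?&gt; s;
 * while ((s = ForkJoinWorkerThread.pollSubmission(pool)) != null)
 *   ForkJoinWorkerThread.forceCompletionExceptionally
 *     (s, new RuntimeException("pool is being drained"));
 * </pre>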
1135 * @param pool the pool
1136 * @return the next submission, or null if none
1137 */
1138 public static Future<?> pollSubmission(ForkJoinPool pool) {
1139 return pool.pollSubmission();
1140 }
1141
1142 /**
1143 * If the given argument represents a submission to a ForkJoinPool
1144 * (normally, one returned by <tt>pollSubmission</tt>), returns
1145 * the actual task submitted to the pool. This method may be
1146 * useful for alternate handling of drained submissions.
1147 * @param submission the submission
1148 * @return the underlying task
1149 * @throws IllegalArgumentException if the given future does
1150 * not represent a submission to a pool
1151 */
1152 public static <V> ForkJoinTask<V> getSubmittedTask(Future<V> submission) {
1153 try {
1154 return ((Submission)submission).getSubmittedTask();
1155 } catch (ClassCastException ex) {
1156 throw new IllegalArgumentException();
1157 }
1158 }
1159
1160 /**
1161 * If the argument represents a submission to a ForkJoinPool
1162 * (normally, one returned by <tt>pollSubmission</tt>), causes it
1163 * to be ready with the given value returned upon invocation of
1164 * its <tt>get()</tt> method, regardless of the status of the
1165 * underlying ForkJoinTask. This method may be useful for
1166 * alternate handling of drained submissions.
1167 * @param submission the submission
1168 * @param value the result to be returned by the submission
1169 * @throws IllegalArgumentException if the given future does
1170 * not represent a submission to a pool
1171 */
1172 public static <V> void forceCompletion(Future<V> submission, V value) {
1173 try {
1174 ((Submission)submission).finishTask(value);
1175 } catch (ClassCastException ex) {
1176 throw new IllegalArgumentException();
1177 }
1178 }
1179
1180 /**
1181 * If the argument represents a submission to a ForkJoinPool
1182 * (normally, one returned by <tt>pollSubmission</tt>), causes it
1183 * to be ready with the given exception thrown on invocation of
1184 * its <tt>get()</tt> method, regardless of the status of the
1185 * underlying ForkJoinTask. This method may be useful for
1186 * alternate handling of drained submissions.
1187 * @param submission the submission
1188 * @param exception the exception to be thrown on access
1189 * @throws IllegalArgumentException if the exception is
1190 * not a RuntimeException or Error
1191 * @throws IllegalArgumentException if the given future does
1192 * not represent a submission to a pool
1193 */
1194 public static <V> void forceCompletionExceptionally(Future<V> submission,
1195 Throwable exception) {
1196 if (!(exception instanceof RuntimeException) &&
1197 !(exception instanceof Error))
1198 throw new IllegalArgumentException();
1199 try {
1200 ((Submission)submission).finishTaskExceptionally(exception);
1201 } catch (ClassCastException ex) {
1202 throw new IllegalArgumentException();
1203 }
1204 }
1205
1206 // Per-worker exported random numbers
1207 // Same constants as java.util.Random
1208 final static long JURandomMultiplier = 0x5DEECE66DL;
1209 final static long JURandomAddend = 0xBL;
1210 final static long JURandomMask = (1L << 48) - 1;
1211
1212 private final int nextJURandom(int bits) {
1213 long next = (juRandomSeed * JURandomMultiplier + JURandomAddend) &
1214 JURandomMask;
1215 juRandomSeed = next;
1216 return (int)(next >>> (48 - bits));
1217 }
1218
1219 private final int nextJURandomInt(int n) {
1220 if (n <= 0)
1221 throw new IllegalArgumentException("n must be positive");
1222 int bits = nextJURandom(31);
1223 if ((n & -n) == n)
1224 return (int)((n * (long)bits) >> 31);
1225
1226 for (;;) {
1227 int val = bits % n;
1228 if (bits - val + (n-1) >= 0)
1229 return val;
1230 bits = nextJURandom(31);
1231 }
1232 }
1233
1234 private final long nextJURandomLong() {
1235 return ((long)(nextJURandom(32)) << 32) + nextJURandom(32);
1236 }
1237
1238 private final long nextJURandomLong(long n) {
1239 if (n <= 0)
1240 throw new IllegalArgumentException("n must be positive");
1241 long offset = 0;
1242 while (n >= Integer.MAX_VALUE) { // randomly pick half range
1243 int bits = nextJURandom(2); // 2nd bit for odd vs even split
1244 long half = n >>> 1;
1245 long nextn = ((bits & 2) == 0)? half : n - half;
1246 if ((bits & 1) == 0)
1247 offset += n - nextn;
1248 n = nextn;
1249 }
1250 return offset + nextJURandomInt((int)n);
1251 }
1252
1253 private final double nextJURandomDouble() {
1254 return (((long)(nextJURandom(26)) << 27) + nextJURandom(27))
1255 / (double)(1L << 53);
1256 }
1257
1258 /**
1259 * Returns a random integer using a per-worker random
1260 * number generator with the same properties as
1261 * {@link java.util.Random#nextInt}
1262 * @return the next pseudorandom, uniformly distributed {@code int}
1263 * value from this worker's random number generator's sequence
1264 */
1265 public static int nextRandomInt() {
1266 return ((ForkJoinWorkerThread)(Thread.currentThread())).
1267 nextJURandom(32);
1268 }
1269
1270 /**
1271 * Returns a random integer using a per-worker random
1272 * number generator with the same properties as
1273 * {@link java.util.Random#nextInt(int)}
1274 * @param n the bound on the random number to be returned. Must be
1275 * positive.
1276 * @return the next pseudorandom, uniformly distributed {@code int}
1277 * value between {@code 0} (inclusive) and {@code n} (exclusive)
1278 * from this worker's random number generator's sequence
1279 * @throws IllegalArgumentException if n is not positive
1280 */
1281 public static int nextRandomInt(int n) {
1282 return ((ForkJoinWorkerThread)(Thread.currentThread())).
1283 nextJURandomInt(n);
1284 }
1285
1286 /**
1287 * Returns a random long using a per-worker random
1288 * number generator with the same properties as
1289 * {@link java.util.Random#nextLong}
1290 * @return the next pseudorandom, uniformly distributed {@code long}
1291 * value from this worker's random number generator's sequence
1292 */
1293 public static long nextRandomLong() {
1294 return ((ForkJoinWorkerThread)(Thread.currentThread())).
1295 nextJURandomLong();
1296 }
1297
1298 /**
1299 * Returns a random long using a per-worker random
1300 * number generator with properties analogous to
1301 * {@link java.util.Random#nextInt(int)}, but for {@code long} values.
1302 * @param n the bound on the random number to be returned. Must be
1303 * positive.
1304 * @return the next pseudorandom, uniformly distributed {@code long}
1305 * value between {@code 0} (inclusive) and {@code n} (exclusive)
1306 * from this worker's random number generator's sequence
1307 * @throws IllegalArgumentException if n is not positive
1308 */
1309 public static long nextRandomLong(long n) {
1310 return ((ForkJoinWorkerThread)(Thread.currentThread())).
1311 nextJURandomLong(n);
1312 }
1313
1314 /**
1315 * Returns a random double using a per-worker random
1316 * number generator with the same properties as
1317 * {@link java.util.Random#nextDouble}
1318 * @return the next pseudorandom, uniformly distributed {@code double}
1319 * value between {@code 0.0} and {@code 1.0} from this
1320 * worker's random number generator's sequence
1321 */
1322 public static double nextRandomDouble() {
1323 return ((ForkJoinWorkerThread)(Thread.currentThread())).
1324 nextJURandomDouble();
1325 }
1326
1327 // Temporary Unsafe mechanics for preliminary release
1328
1329 static final Unsafe _unsafe;
1330 static final long baseOffset;
1331 static final long spOffset;
1332 static final long qBase;
1333 static final int qShift;
1334 static {
1335 try {
1336 if (ForkJoinWorkerThread.class.getClassLoader() != null) {
1337 Field f = Unsafe.class.getDeclaredField("theUnsafe");
1338 f.setAccessible(true);
1339 _unsafe = (Unsafe)f.get(null);
1340 }
1341 else
1342 _unsafe = Unsafe.getUnsafe();
1343 baseOffset = _unsafe.objectFieldOffset
1344 (ForkJoinWorkerThread.class.getDeclaredField("base"));
1345 spOffset = _unsafe.objectFieldOffset
1346 (ForkJoinWorkerThread.class.getDeclaredField("sp"));
1347 qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
1348 int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
1349 if ((s & (s-1)) != 0)
1350 throw new Error("data type scale not a power of two");
1351 qShift = 31 - Integer.numberOfLeadingZeros(s);
1352 } catch (Exception e) {
1353 throw new RuntimeException("Could not initialize intrinsics", e);
1354 }
1355 }
1356
1357 }