ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
Revision: 1.7
Committed: Mon Aug 3 01:18:07 2009 UTC (14 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.6: +6 -5 lines
Log Message:
sync with jsr166 package

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.Collection;
10
11 /**
12 * A thread managed by a {@link ForkJoinPool}. This class is
13 * subclassable solely for the sake of adding functionality -- there
14 * are no overridable methods dealing with scheduling or execution.
15 * However, you can override initialization and termination methods
16 * surrounding the main task processing loop. If you do create such a
17 * subclass, you will also need to supply a custom {@link
18 * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
19 * ForkJoinPool}.
20 *
21 * @since 1.7
22 * @author Doug Lea
23 */
24 public class ForkJoinWorkerThread extends Thread {
25 /*
26 * Algorithm overview:
27 *
28 * 1. Work-Stealing: Work-stealing queues are special forms of
29 * Deques that support only three of the four possible
30 * end-operations -- push, pop, and deq (aka steal), and only do
31 * so under the constraints that push and pop are called only from
32 * the owning thread, while deq may be called from other threads.
33 * (If you are unfamiliar with them, you probably want to read
34 * Herlihy and Shavit's book "The Art of Multiprocessor
35 * programming", chapter 16 describing these in more detail before
36 * proceeding.) The main work-stealing queue design is roughly
37 * similar to "Dynamic Circular Work-Stealing Deque" by David
38 * Chase and Yossi Lev, SPAA 2005
39 * (http://research.sun.com/scalable/pubs/index.html). The main
40 * difference ultimately stems from gc requirements that we null
41 * out taken slots as soon as we can, to maintain as small a
42 * footprint as possible even in programs generating huge numbers
43 * of tasks. To accomplish this, we shift the CAS arbitrating pop
44 * vs deq (steal) from being on the indices ("base" and "sp") to
45 * the slots themselves (mainly via method "casSlotNull()"). So,
46 * both a successful pop and deq mainly entail CAS'ing a non-null
47 * slot to null. Because we rely on CASes of references, we do
48 * not need tag bits on base or sp. They are simple ints as used
49 * in any circular array-based queue (see for example ArrayDeque).
50 * Updates to the indices must still be ordered in a way that
51 * guarantees that (sp - base) > 0 means the queue is empty, but
52 * otherwise may err on the side of possibly making the queue
53 * appear nonempty when a push, pop, or deq have not fully
54 * committed. Note that this means that the deq operation,
55 * considered individually, is not wait-free. One thief cannot
56 * successfully continue until another in-progress one (or, if
57 * previously empty, a push) completes. However, in the
58 * aggregate, we ensure at least probabilistic non-blockingness. If
59 * an attempted steal fails, a thief always chooses a different
60 * random victim target to try next. So, in order for one thief to
61 * progress, it suffices for any in-progress deq or new push on
62 * any empty queue to complete. One reason this works well here is
63 * that apparently-nonempty often means soon-to-be-stealable,
64 * which gives threads a chance to activate if necessary before
65 * stealing (see below).
66 *
67 * This approach also enables support for "async mode" where local
68 * task processing is in FIFO, not LIFO order; simply by using a
69 * version of deq rather than pop when locallyFifo is true (as set
70 * by the ForkJoinPool). This allows use in message-passing
71 * frameworks in which tasks are never joined.
72 *
73 * Efficient implementation of this approach currently relies on
74 * an uncomfortable amount of "Unsafe" mechanics. To maintain
75 * correct orderings, reads and writes of variable base require
76 * volatile ordering. Variable sp does not require volatile write
77 * but needs cheaper store-ordering on writes. Because they are
78 * protected by volatile base reads, reads of the queue array and
79 * its slots do not need volatile load semantics, but writes (in
80 * push) require store order and CASes (in pop and deq) require
81 * (volatile) CAS semantics. Since these combinations aren't
82 * supported using ordinary volatiles, the only way to accomplish
83 * these efficiently is to use direct Unsafe calls. (Using external
84 * AtomicIntegers and AtomicReferenceArrays for the indices and
85 * array is significantly slower because of memory locality and
86 * indirection effects.) Further, performance on most platforms is
87 * very sensitive to placement and sizing of the (resizable) queue
88 * array. Even though these queues don't usually become all that
89 * big, the initial size must be large enough to counteract cache
90 * contention effects across multiple queues (especially in the
91 * presence of GC cardmarking). Also, to improve thread-locality,
92 * queues are currently initialized immediately after the thread
93 * gets the initial signal to start processing tasks. However,
94 * all queue-related methods except pushTask are written in a way
95 * that allows them to instead be lazily allocated and/or disposed
96 * of when empty. All together, these low-level implementation
97 * choices produce as much as a factor of 4 performance
98 * improvement compared to naive implementations, and enable the
99 * processing of billions of tasks per second, sometimes at the
100 * expense of ugliness.
101 *
102 * 2. Run control: The primary run control is based on a global
103 * counter (activeCount) held by the pool. It uses an algorithm
104 * similar to that in Herlihy and Shavit section 17.6 to cause
105 * threads to eventually block when all threads declare they are
106 * inactive. (See variable "scans".) For this to work, threads
107 * must be declared active when executing tasks, and before
108 * stealing a task. They must be inactive before blocking on the
109 * Pool Barrier (awaiting a new submission or other Pool
110 * event). In between, there is some free play which we take
111 * advantage of to avoid contention and rapid flickering of the
112 * global activeCount: If inactive, we activate only if a victim
113 * queue appears to be nonempty (see above). Similarly, a thread
114 * tries to inactivate only after a full scan of other threads.
115 * The net effect is that contention on activeCount is rarely a
116 * measurable performance issue. (There are also a few other cases
117 * where we scan for work rather than retry/block upon
118 * contention.)
119 *
120 * 3. Selection control. We maintain policy of always choosing to
121 * run local tasks rather than stealing, and always trying to
122 * steal tasks before trying to run a new submission. All steals
123 * are currently performed in randomly-chosen deq-order. It may be
124 * worthwhile to bias these with locality / anti-locality
125 * information, but doing this well probably requires more
126 * lower-level information from JVMs than currently provided.
127 */
128
129 /**
130 * Capacity of work-stealing queue array upon initialization.
131 * Must be a power of two. Initial size must be at least 2, but is
132 * padded to minimize cache effects.
133 */
134 private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
135
136 /**
137 * Maximum work-stealing queue array size. Must be less than or
138 * equal to 1 << 28 to ensure lack of index wraparound. (This
139 * is less than usual bounds, because we need leftshift by 3
140 * to be in int range).
141 */
142 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
143
144 /**
145 * The pool this thread works in. Accessed directly by ForkJoinTask.
146 */
147 final ForkJoinPool pool;
148
149 /**
150 * The work-stealing queue array. Size must be a power of two.
151 * Initialized when thread starts, to improve memory locality.
152 */
153 private ForkJoinTask<?>[] queue;
154
155 /**
156 * Index (mod queue.length) of next queue slot to push to or pop
157 * from. It is written only by owner thread, via ordered store.
158 * Both sp and base are allowed to wrap around on overflow, but
159 * (sp - base) still estimates size.
160 */
161 private volatile int sp;
162
163 /**
164 * Index (mod queue.length) of least valid queue slot, which is
165 * always the next position to steal from if nonempty.
166 */
167 private volatile int base;
168
169 /**
170 * Activity status. When true, this worker is considered active.
171 * Must be false upon construction. It must be true when executing
172 * tasks, and BEFORE stealing a task. It must be false before
173 * calling pool.sync.
174 */
175 private boolean active;
176
177 /**
178 * Run state of this worker. Supports simple versions of the usual
179 * shutdown/shutdownNow control.
180 */
181 private volatile int runState;
182
183 /**
184 * Seed for random number generator for choosing steal victims.
185 * Uses Marsaglia xorshift. Must be nonzero upon initialization.
186 */
187 private int seed;
188
189 /**
190 * Number of steals, transferred to pool when idle
191 */
192 private int stealCount;
193
194 /**
195 * Index of this worker in pool array. Set once by pool before
196 * running, and accessed directly by pool during cleanup etc.
197 */
198 int poolIndex;
199
200 /**
201 * The last barrier event waited for. Accessed in pool callback
202 * methods, but only by current thread.
203 */
204 long lastEventCount;
205
206 /**
207 * True if use local fifo, not default lifo, for local polling
208 */
209 private boolean locallyFifo;
210
211 /**
212 * Creates a ForkJoinWorkerThread operating in the given pool.
213 *
214 * @param pool the pool this thread works in
215 * @throws NullPointerException if pool is null
216 */
217 protected ForkJoinWorkerThread(ForkJoinPool pool) {
218 if (pool == null) throw new NullPointerException();
219 this.pool = pool;
220 // Note: poolIndex is set by pool during construction
221 // Remaining initialization is deferred to onStart
222 }
223
224 // Public access methods
225
226 /**
227 * Returns the pool hosting this thread.
228 *
229 * @return the pool
230 */
231 public ForkJoinPool getPool() {
232 return pool;
233 }
234
235 /**
236 * Returns the index number of this thread in its pool. The
237 * returned value ranges from zero to the maximum number of
238 * threads (minus one) that have ever been created in the pool.
239 * This method may be useful for applications that track status or
240 * collect results per-worker rather than per-task.
241 *
242 * @return the index number
243 */
244 public int getPoolIndex() {
245 return poolIndex;
246 }
247
248 /**
249 * Establishes local first-in-first-out scheduling mode for forked
250 * tasks that are never joined.
251 *
252 * @param async if true, use locally FIFO scheduling
253 */
254 void setAsyncMode(boolean async) {
255 locallyFifo = async;
256 }
257
258 // Runstate management
259
260 // Runstate values. Order matters
261 private static final int RUNNING = 0;
262 private static final int SHUTDOWN = 1;
263 private static final int TERMINATING = 2;
264 private static final int TERMINATED = 3;
265
266 final boolean isShutdown() { return runState >= SHUTDOWN; }
267 final boolean isTerminating() { return runState >= TERMINATING; }
268 final boolean isTerminated() { return runState == TERMINATED; }
269 final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
270 final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
271
272 /**
273 * Transitions to at least the given state.
274 *
275 * @return {@code true} if not already at least at given state
276 */
277 private boolean transitionRunStateTo(int state) {
278 for (;;) {
279 int s = runState;
280 if (s >= state)
281 return false;
282 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
283 return true;
284 }
285 }
286
287 /**
288 * Tries to set status to active; fails on contention.
289 */
290 private boolean tryActivate() {
291 if (!active) {
292 if (!pool.tryIncrementActiveCount())
293 return false;
294 active = true;
295 }
296 return true;
297 }
298
299 /**
300 * Tries to set status to inactive; fails on contention.
301 */
302 private boolean tryInactivate() {
303 if (active) {
304 if (!pool.tryDecrementActiveCount())
305 return false;
306 active = false;
307 }
308 return true;
309 }
310
311 /**
312 * Computes next value for random victim probe. Scans don't
313 * require a very high quality generator, but also not a crummy
314 * one. Marsaglia xor-shift is cheap and works well.
315 */
316 private static int xorShift(int r) {
317 r ^= (r << 13);
318 r ^= (r >>> 17);
319 return r ^ (r << 5);
320 }
321
322 // Lifecycle methods
323
324 /**
325 * This method is required to be public, but should never be
326 * called explicitly. It performs the main run loop to execute
327 * ForkJoinTasks.
328 */
329 public void run() {
330 Throwable exception = null;
331 try {
332 onStart();
333 pool.sync(this); // await first pool event
334 mainLoop();
335 } catch (Throwable ex) {
336 exception = ex;
337 } finally {
338 onTermination(exception);
339 }
340 }
341
342 /**
343 * Executes tasks until shut down.
344 */
345 private void mainLoop() {
346 while (!isShutdown()) {
347 ForkJoinTask<?> t = pollTask();
348 if (t != null || (t = pollSubmission()) != null)
349 t.quietlyExec();
350 else if (tryInactivate())
351 pool.sync(this);
352 }
353 }
354
355 /**
356 * Initializes internal state after construction but before
357 * processing any tasks. If you override this method, you must
358 * invoke super.onStart() at the beginning of the method.
359 * Initialization requires care: Most fields must have legal
360 * default values, to ensure that attempted accesses from other
361 * threads work correctly even before this thread starts
362 * processing tasks.
363 */
364 protected void onStart() {
365 // Allocate while starting to improve chances of thread-local
366 // isolation
367 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
368 // Initial value of seed need not be especially random but
369 // should differ across workers and must be nonzero
370 int p = poolIndex + 1;
371 seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
372 }
373
374 /**
375 * Performs cleanup associated with termination of this worker
376 * thread. If you override this method, you must invoke
377 * {@code super.onTermination} at the end of the overridden method.
378 *
379 * @param exception the exception causing this thread to abort due
380 * to an unrecoverable error, or {@code null} if completed normally
381 */
382 protected void onTermination(Throwable exception) {
383 // Execute remaining local tasks unless aborting or terminating
384 while (exception == null && !pool.isTerminating() && base != sp) {
385 try {
386 ForkJoinTask<?> t = popTask();
387 if (t != null)
388 t.quietlyExec();
389 } catch (Throwable ex) {
390 exception = ex;
391 }
392 }
393 // Cancel other tasks, transition status, notify pool, and
394 // propagate exception to uncaught exception handler
395 try {
396 do {} while (!tryInactivate()); // ensure inactive
397 cancelTasks();
398 runState = TERMINATED;
399 pool.workerTerminated(this);
400 } catch (Throwable ex) { // Shouldn't ever happen
401 if (exception == null) // but if so, at least rethrown
402 exception = ex;
403 } finally {
404 if (exception != null)
405 ForkJoinTask.rethrowException(exception);
406 }
407 }
408
409 // Intrinsics-based support for queue operations.
410
411 /**
412 * Adds in store-order the given task at given slot of q to null.
413 * Caller must ensure q is non-null and index is in range.
414 */
415 private static void setSlot(ForkJoinTask<?>[] q, int i,
416 ForkJoinTask<?> t) {
417 UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t);
418 }
419
420 /**
421 * CAS given slot of q to null. Caller must ensure q is non-null
422 * and index is in range.
423 */
424 private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
425 ForkJoinTask<?> t) {
426 return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
427 }
428
429 /**
430 * Sets sp in store-order.
431 */
432 private void storeSp(int s) {
433 UNSAFE.putOrderedInt(this, spOffset, s);
434 }
435
436 // Main queue methods
437
438 /**
439 * Pushes a task. Called only by current thread.
440 *
441 * @param t the task. Caller must ensure non-null.
442 */
443 final void pushTask(ForkJoinTask<?> t) {
444 ForkJoinTask<?>[] q = queue;
445 int mask = q.length - 1;
446 int s = sp;
447 setSlot(q, s & mask, t);
448 storeSp(++s);
449 if ((s -= base) == 1)
450 pool.signalWork();
451 else if (s >= mask)
452 growQueue();
453 }
454
455 /**
456 * Tries to take a task from the base of the queue, failing if
457 * either empty or contended.
458 *
459 * @return a task, or null if none or contended
460 */
461 final ForkJoinTask<?> deqTask() {
462 ForkJoinTask<?> t;
463 ForkJoinTask<?>[] q;
464 int i;
465 int b;
466 if (sp != (b = base) &&
467 (q = queue) != null && // must read q after b
468 (t = q[i = (q.length - 1) & b]) != null &&
469 casSlotNull(q, i, t)) {
470 base = b + 1;
471 return t;
472 }
473 return null;
474 }
475
476 /**
477 * Tries to take a task from the base of own queue, activating if
478 * necessary, failing only if empty. Called only by current thread.
479 *
480 * @return a task, or null if none
481 */
482 final ForkJoinTask<?> locallyDeqTask() {
483 int b;
484 while (sp != (b = base)) {
485 if (tryActivate()) {
486 ForkJoinTask<?>[] q = queue;
487 int i = (q.length - 1) & b;
488 ForkJoinTask<?> t = q[i];
489 if (t != null && casSlotNull(q, i, t)) {
490 base = b + 1;
491 return t;
492 }
493 }
494 }
495 return null;
496 }
497
498 /**
499 * Returns a popped task, or null if empty. Ensures active status
500 * if non-null. Called only by current thread.
501 */
502 final ForkJoinTask<?> popTask() {
503 int s = sp;
504 while (s != base) {
505 if (tryActivate()) {
506 ForkJoinTask<?>[] q = queue;
507 int mask = q.length - 1;
508 int i = (s - 1) & mask;
509 ForkJoinTask<?> t = q[i];
510 if (t == null || !casSlotNull(q, i, t))
511 break;
512 storeSp(s - 1);
513 return t;
514 }
515 }
516 return null;
517 }
518
519 /**
520 * Specialized version of popTask to pop only if
521 * topmost element is the given task. Called only
522 * by current thread while active.
523 *
524 * @param t the task. Caller must ensure non-null.
525 */
526 final boolean unpushTask(ForkJoinTask<?> t) {
527 ForkJoinTask<?>[] q = queue;
528 int mask = q.length - 1;
529 int s = sp - 1;
530 if (casSlotNull(q, s & mask, t)) {
531 storeSp(s);
532 return true;
533 }
534 return false;
535 }
536
537 /**
538 * Returns next task or null if empty or contended
539 */
540 final ForkJoinTask<?> peekTask() {
541 ForkJoinTask<?>[] q = queue;
542 if (q == null)
543 return null;
544 int mask = q.length - 1;
545 int i = locallyFifo ? base : (sp - 1);
546 return q[i & mask];
547 }
548
549 /**
550 * Doubles queue array size. Transfers elements by emulating
551 * steals (deqs) from old array and placing, oldest first, into
552 * new array.
553 */
554 private void growQueue() {
555 ForkJoinTask<?>[] oldQ = queue;
556 int oldSize = oldQ.length;
557 int newSize = oldSize << 1;
558 if (newSize > MAXIMUM_QUEUE_CAPACITY)
559 throw new RejectedExecutionException("Queue capacity exceeded");
560 ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
561
562 int b = base;
563 int bf = b + oldSize;
564 int oldMask = oldSize - 1;
565 int newMask = newSize - 1;
566 do {
567 int oldIndex = b & oldMask;
568 ForkJoinTask<?> t = oldQ[oldIndex];
569 if (t != null && !casSlotNull(oldQ, oldIndex, t))
570 t = null;
571 setSlot(newQ, b & newMask, t);
572 } while (++b != bf);
573 pool.signalWork();
574 }
575
576 /**
577 * Tries to steal a task from another worker. Starts at a random
578 * index of workers array, and probes workers until finding one
579 * with non-empty queue or finding that all are empty. It
580 * randomly selects the first n probes. If these are empty, it
581 * resorts to a full circular traversal, which is necessary to
582 * accurately set active status by caller. Also restarts if pool
583 * events occurred since last scan, which forces refresh of
584 * workers array, in case barrier was associated with resize.
585 *
586 * This method must be both fast and quiet -- usually avoiding
587 * memory accesses that could disrupt cache sharing etc other than
588 * those needed to check for and take tasks. This accounts for,
589 * among other things, updating random seed in place without
590 * storing it until exit.
591 *
592 * @return a task, or null if none found
593 */
594 private ForkJoinTask<?> scan() {
595 ForkJoinTask<?> t = null;
596 int r = seed; // extract once to keep scan quiet
597 ForkJoinWorkerThread[] ws; // refreshed on outer loop
598 int mask; // must be power 2 minus 1 and > 0
599 outer:do {
600 if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
601 int idx = r;
602 int probes = ~mask; // use random index while negative
603 for (;;) {
604 r = xorShift(r); // update random seed
605 ForkJoinWorkerThread v = ws[mask & idx];
606 if (v == null || v.sp == v.base) {
607 if (probes <= mask)
608 idx = (probes++ < 0) ? r : (idx + 1);
609 else
610 break;
611 }
612 else if (!tryActivate() || (t = v.deqTask()) == null)
613 continue outer; // restart on contention
614 else
615 break outer;
616 }
617 }
618 } while (pool.hasNewSyncEvent(this)); // retry on pool events
619 seed = r;
620 return t;
621 }
622
623 /**
624 * Gets and removes a local or stolen task.
625 *
626 * @return a task, if available
627 */
628 final ForkJoinTask<?> pollTask() {
629 ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
630 if (t == null && (t = scan()) != null)
631 ++stealCount;
632 return t;
633 }
634
635 /**
636 * Gets a local task.
637 *
638 * @return a task, if available
639 */
640 final ForkJoinTask<?> pollLocalTask() {
641 return locallyFifo ? locallyDeqTask() : popTask();
642 }
643
644 /**
645 * Returns a pool submission, if one exists, activating first.
646 *
647 * @return a submission, if available
648 */
649 private ForkJoinTask<?> pollSubmission() {
650 ForkJoinPool p = pool;
651 while (p.hasQueuedSubmissions()) {
652 ForkJoinTask<?> t;
653 if (tryActivate() && (t = p.pollSubmission()) != null)
654 return t;
655 }
656 return null;
657 }
658
659 // Methods accessed only by Pool
660
661 /**
662 * Removes and cancels all tasks in queue. Can be called from any
663 * thread.
664 */
665 final void cancelTasks() {
666 ForkJoinTask<?> t;
667 while (base != sp && (t = deqTask()) != null)
668 t.cancelIgnoringExceptions();
669 }
670
671 /**
672 * Drains tasks to given collection c.
673 *
674 * @return the number of tasks drained
675 */
676 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
677 int n = 0;
678 ForkJoinTask<?> t;
679 while (base != sp && (t = deqTask()) != null) {
680 c.add(t);
681 ++n;
682 }
683 return n;
684 }
685
686 /**
687 * Gets and clears steal count for accumulation by pool. Called
688 * only when known to be idle (in pool.sync and termination).
689 */
690 final int getAndClearStealCount() {
691 int sc = stealCount;
692 stealCount = 0;
693 return sc;
694 }
695
696 /**
697 * Returns {@code true} if at least one worker in the given array
698 * appears to have at least one queued task.
699 *
700 * @param ws array of workers
701 */
702 static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
703 if (ws != null) {
704 int len = ws.length;
705 for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
706 for (int i = 0; i < len; ++i) {
707 ForkJoinWorkerThread w = ws[i];
708 if (w != null && w.sp != w.base)
709 return true;
710 }
711 }
712 }
713 return false;
714 }
715
716 // Support methods for ForkJoinTask
717
718 /**
719 * Returns an estimate of the number of tasks in the queue.
720 */
721 final int getQueueSize() {
722 // suppress momentarily negative values
723 return Math.max(0, sp - base);
724 }
725
726 /**
727 * Returns an estimate of the number of tasks, offset by a
728 * function of number of idle workers.
729 */
730 final int getEstimatedSurplusTaskCount() {
731 // The halving approximates weighting idle vs non-idle workers
732 return (sp - base) - (pool.getIdleThreadCount() >>> 1);
733 }
734
735 /**
736 * Scans, returning early if joinMe done.
737 */
738 final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
739 ForkJoinTask<?> t = pollTask();
740 if (t != null && joinMe.status < 0 && sp == base) {
741 pushTask(t); // unsteal if done and this task would be stealable
742 t = null;
743 }
744 return t;
745 }
746
747 /**
748 * Runs tasks until {@code pool.isQuiescent()}.
749 */
750 final void helpQuiescePool() {
751 for (;;) {
752 ForkJoinTask<?> t = pollTask();
753 if (t != null)
754 t.quietlyExec();
755 else if (tryInactivate() && pool.isQuiescent())
756 break;
757 }
758 do {} while (!tryActivate()); // re-activate on exit
759 }
760
761 // Unsafe mechanics
762
763 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
764 private static final long spOffset =
765 objectFieldOffset("sp", ForkJoinWorkerThread.class);
766 private static final long runStateOffset =
767 objectFieldOffset("runState", ForkJoinWorkerThread.class);
768 private static final long qBase;
769 private static final int qShift;
770
771 static {
772 qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
773 int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
774 if ((s & (s-1)) != 0)
775 throw new Error("data type scale not a power of two");
776 qShift = 31 - Integer.numberOfLeadingZeros(s);
777 }
778
779 private static long objectFieldOffset(String field, Class<?> klazz) {
780 try {
781 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
782 } catch (NoSuchFieldException e) {
783 // Convert Exception to corresponding Error
784 NoSuchFieldError error = new NoSuchFieldError(field);
785 error.initCause(e);
786 throw error;
787 }
788 }
789 }