root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.35
Committed: Wed Jul 7 19:52:32 2010 UTC (13 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.34: +207 -155 lines
Log Message:
Simplify APIs. See concurrency-interest postings for rationale

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8
9 import java.util.concurrent.*;
10
11 import java.util.Random;
12 import java.util.Collection;
13 import java.util.concurrent.locks.LockSupport;
14
15 /**
16 * A thread managed by a {@link ForkJoinPool}. This class is
17 * subclassable solely for the sake of adding functionality -- there
18 * are no overridable methods dealing with scheduling or execution.
19 * However, you can override initialization and termination methods
20 * surrounding the main task processing loop. If you do create such a
21 * subclass, you will also need to supply a custom {@link
22 * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
23 * ForkJoinPool}.
24 *
25 * @since 1.7
26 * @author Doug Lea
27 */
28 public class ForkJoinWorkerThread extends Thread {
29 /*
30 * Overview:
31 *
32 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
33 * ForkJoinTasks. This class includes bookkeeping in support of
34 * worker activation, suspension, and lifecycle control described
35 * in more detail in the internal documentation of class
36 * ForkJoinPool. And as described further below, this class also
37 * includes special-cased support for some ForkJoinTask
38 * methods. But the main mechanics involve work-stealing:
39 *
40 * Work-stealing queues are special forms of Deques that support
41 * only three of the four possible end-operations -- push, pop,
42 * and deq (aka steal), under the further constraints that push
43 * and pop are called only from the owning thread, while deq may
44 * be called from other threads. (If you are unfamiliar with
45 * them, you probably want to read Herlihy and Shavit's book "The
46 * Art of Multiprocessor Programming", chapter 16 describing these
47 * in more detail before proceeding.) The main work-stealing
48 * queue design is roughly similar to those in the papers "Dynamic
49 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
50 * (http://research.sun.com/scalable/pubs/index.html) and
51 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
52 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
53 * The main differences ultimately stem from gc requirements that
54 * we null out taken slots as soon as we can, to maintain as small
55 * a footprint as possible even in programs generating huge
56 * numbers of tasks. To accomplish this, we shift the CAS
57 * arbitrating pop vs deq (steal) from being on the indices
58 * ("base" and "sp") to the slots themselves (mainly via method
59 * "casSlotNull()"). So, both a successful pop and deq mainly
60 * entail a CAS of a slot from non-null to null. Because we rely
61 * on CASes of references, we do not need tag bits on base or sp.
62 * They are simple ints as used in any circular array-based queue
63 * (see for example ArrayDeque). Updates to the indices must
64 * still be ordered in a way that guarantees that sp == base means
65 * the queue is empty, but otherwise may err on the side of
66 * possibly making the queue appear nonempty when a push, pop, or
67 * deq has not fully committed. Note that this means that the deq
68 * operation, considered individually, is not wait-free. One thief
69 * cannot successfully continue until another in-progress one (or,
70 * if previously empty, a push) completes. However, in the
71 * aggregate, we ensure at least probabilistic non-blockingness.
72 * If an attempted steal fails, a thief always chooses a different
73 * random victim target to try next. So, in order for one thief to
74 * progress, it suffices for any in-progress deq or new push on
75 * any empty queue to complete. One reason this works well here is
76 * that apparently-nonempty often means soon-to-be-stealable,
77 * which gives threads a chance to set activation status if
78 * necessary before stealing.
79 *
80 * This approach also enables support for "async mode" where local
81 * task processing is in FIFO, not LIFO order, simply by using a
82 * version of deq rather than pop when locallyFifo is true (as set
83 * by the ForkJoinPool). This allows use in message-passing
84 * frameworks in which tasks are never joined.
85 *
86 * When a worker would otherwise be blocked waiting to join a
87 * task, it first tries a form of linear helping: Each worker
88 * records (in field stolen) the most recent task it stole
89 * from some other worker. Plus, it records (in field joining) the
90 * task it is currently actively joining. Method joinTask uses
91 * these markers to try to find a worker to help (i.e., steal back
92 * a task from and execute it) that could hasten completion of the
93 * actively joined task. In essence, the joiner executes a task
94 * that would be on its own local deque had the to-be-joined task
95 * not been stolen. This may be seen as a conservative variant of
96 * the approach in Wagner & Calder "Leapfrogging: a portable
97 * technique for implementing efficient futures" SIGPLAN Notices,
98 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99 * in that: (1) We only maintain dependency links across workers
100 * upon steals, rather than maintain per-task bookkeeping. This
101 * requires a linear scan of workers array to locate stealers,
102 * which isolates cost to when it is needed, rather than adding to
103 * per-task overhead. (2) It is "shallow", ignoring nesting and
104 * potentially cyclic mutual steals. (3) It is intentionally
105 * racy: field joining is updated only while actively joining,
106 * which means that we could miss links in the chain during
107 * long-lived tasks, GC stalls etc. (4) We fall back to
108 * suspending the worker and if necessary replacing it with a
109 * spare (see ForkJoinPool.tryAwaitJoin).
110 *
111 * Efficient implementation of these algorithms currently relies on
112 * an uncomfortable amount of "Unsafe" mechanics. To maintain
113 * correct orderings, reads and writes of variable base require
114 * volatile ordering. Variable sp does not require volatile
115 * writes but still needs store-ordering, which we accomplish by
116 * pre-incrementing sp before filling the slot with an ordered
117 * store. (Pre-incrementing also enables backouts used in
118 * scanWhileJoining.) Because they are protected by volatile base
119 * reads, reads of the queue array and its slots by other threads
120 * do not need volatile load semantics, but writes (in push)
121 * require store order and CASes (in pop and deq) require
122 * (volatile) CAS semantics. (Michael, Saraswat, and Vechev's
123 * algorithm has similar properties, but without support for
124 * nulling slots.) Since these combinations aren't supported
125 * using ordinary volatiles, the only way to accomplish these
126 * efficiently is to use direct Unsafe calls. (Using external
127 * AtomicIntegers and AtomicReferenceArrays for the indices and
128 * array is significantly slower because of memory locality and
129 * indirection effects.)
130 *
131 * Further, performance on most platforms is very sensitive to
132 * placement and sizing of the (resizable) queue array. Even
133 * though these queues don't usually become all that big, the
134 * initial size must be large enough to counteract cache
135 * contention effects across multiple queues (especially in the
136 * presence of GC cardmarking). Also, to improve thread-locality,
137 * queues are initialized after starting. All together, these
138 * low-level implementation choices produce as much as a factor of
139 * 4 performance improvement compared to naive implementations,
140 * and enable the processing of billions of tasks per second,
141 * sometimes at the expense of ugliness.
142 */
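    /*
     * Editor's sketch (illustrative only, not used by this class): the
     * CAS-on-slot arbitration described above, written with an
     * AtomicReferenceArray instead of Unsafe. The owner pushes and pops
     * at the sp end; thieves "deq" (steal) at the base end, and both a
     * successful pop and a successful steal CAS a slot from non-null to
     * null, so neither index needs tag bits. Names here are hypothetical,
     * and resizing, signalling, and the finer memory-ordering tricks of
     * the real code below are omitted.
     */
    static final class SimpleWorkQueue {
        final java.util.concurrent.atomic.AtomicReferenceArray<Object> slots =
            new java.util.concurrent.atomic.AtomicReferenceArray<Object>(1 << 13);
        volatile int base;              // next slot to steal from
        int sp;                         // next slot to push to (owner writes only)

        void push(Object task) {        // owner only; no resize handling here
            slots.lazySet((sp++) & (slots.length() - 1), task); // ordered store
        }

        Object pop() {                  // owner only
            if (base == sp)
                return null;            // empty
            int s = sp - 1;
            int i = s & (slots.length() - 1);
            Object t = slots.get(i);
            if (t != null && slots.compareAndSet(i, t, null)) {
                sp = s;                 // commit only after winning the slot
                return t;
            }
            return null;                // lost race to a thief
        }

        Object steal() {                // any thread
            int b = base;
            if (b == sp)
                return null;            // empty
            int i = b & (slots.length() - 1);
            Object t = slots.get(i);
            if (t != null && base == b && slots.compareAndSet(i, t, null)) {
                base = b + 1;           // advance only after winning the slot
                return t;
            }
            return null;                // contended; caller picks another victim
        }
    }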
143
144 /**
145 * Generator for initial random seeds for random victim
146 * selection. This is used only to create initial seeds. Random
147 * steals use a cheaper xorshift generator per steal attempt. We
148 * expect only rare contention on seedGenerator, so just use a
149 * plain Random.
150 */
151 private static final Random seedGenerator = new Random();
152
153 /**
154 * The timeout value for suspending spares. Spare workers that
155 * remain unsignalled for more than this time may be trimmed
156 * (killed and removed from pool). Since our goal is to avoid
157 * long-term thread buildup, the exact value of timeout does not
158 * matter too much so long as it avoids most false-alarm timeouts
159 * under GC stalls or momentarily high system load.
160 */
161 private static final long SPARE_KEEPALIVE_NANOS =
162 5L * 1000L * 1000L * 1000L; // 5 secs
163
164 /**
165 * Capacity of work-stealing queue array upon initialization.
166 * Must be a power of two. Initial size must be at least 4, but is
167 * padded to minimize cache effects.
168 */
169 private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
170
171 /**
172 * Maximum work-stealing queue array size. Must be less than or
173 * equal to 1 << 28 to ensure lack of index wraparound. (This
174 * is less than usual bounds, because we need leftshift by 3
175 * to be in int range).
176 */
177 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
178
179 /**
180 * The pool this thread works in. Accessed directly by ForkJoinTask.
181 */
182 final ForkJoinPool pool;
183
184 /**
185 * The task most recently stolen from another worker
186 */
187 private volatile ForkJoinTask<?> stolen;
188
189 /**
190 * The task currently being joined, set only when actively
191 * trying to helpStealer.
192 */
193 private volatile ForkJoinTask<?> joining;
194
195 /**
196 * The work-stealing queue array. Size must be a power of two.
197 * Initialized in onStart, to improve memory locality.
198 */
199 private ForkJoinTask<?>[] queue;
200
201 /**
202 * Index (mod queue.length) of least valid queue slot, which is
203 * always the next position to steal from if nonempty.
204 */
205 private volatile int base;
206
207 /**
208 * Index (mod queue.length) of next queue slot to push to or pop
209 * from. It is written only by owner thread, and accessed by other
210 * threads only after reading (volatile) base. Both sp and base
211 * are allowed to wrap around on overflow, but (sp - base) still
212 * estimates size.
213 */
214 private int sp;
215
216 /**
217 * Run state of this worker. In addition to the usual run levels,
218 * tracks if this worker is suspended as a spare, and if it was
219 * killed (trimmed) while suspended. However, "active" status is
220 * maintained separately.
221 */
222 private volatile int runState;
223
224 private static final int TERMINATING = 0x01;
225 private static final int TERMINATED = 0x02;
226 private static final int SUSPENDED = 0x04; // inactive spare
227 private static final int TRIMMED = 0x08; // killed while suspended
228
229 /**
230 * Number of LockSupport.park calls to block this thread for
231 * suspension or event waits. Used for internal instrumentation;
232 * currently not exported but included because volatile write upon
233 * park also provides a workaround for a JVM bug.
234 */
235 volatile int parkCount;
236
237 /**
238 * Number of steals, transferred to and reset in pool callbacks when
239 * idle. Accessed directly by pool.
240 */
241 int stealCount;
242
243 /**
244 * Seed for random number generator for choosing steal victims.
245 * Uses Marsaglia xorshift. Must be initialized as nonzero.
246 */
247 private int seed;
248
249 /**
250 * Activity status. When true, this worker is considered active.
251 * Accessed directly by pool. Must be false upon construction.
252 */
253 boolean active;
254
255 /**
256 * True if using local fifo, not default lifo, for local polling.
257 * Shadows value from ForkJoinPool, which resets it if changed
258 * pool-wide.
259 */
260 private final boolean locallyFifo;
261
262 /**
263 * Index of this worker in pool array. Set once by pool before
264 * running, and accessed directly by pool to locate this worker in
265 * its workers array.
266 */
267 int poolIndex;
268
269 /**
270 * The last pool event waited for. Accessed only by pool in
271 * callback methods invoked within this thread.
272 */
273 int lastEventCount;
274
275 /**
276 * Encoded index and event count of next event waiter. Used only
277 * by ForkJoinPool for managing event waiters.
278 */
279 volatile long nextWaiter;
280
281 /**
282 * Creates a ForkJoinWorkerThread operating in the given pool.
283 *
284 * @param pool the pool this thread works in
285 * @throws NullPointerException if pool is null
286 */
287 protected ForkJoinWorkerThread(ForkJoinPool pool) {
288 this.pool = pool;
289 this.locallyFifo = pool.locallyFifo;
290 // To avoid exposing construction details to subclasses,
291 // remaining initialization is in start() and onStart()
292 }
293
294 /**
295 * Performs additional initialization and starts this thread
296 */
297 final void start(int poolIndex, UncaughtExceptionHandler ueh) {
298 this.poolIndex = poolIndex;
299 if (ueh != null)
300 setUncaughtExceptionHandler(ueh);
301 setDaemon(true);
302 start();
303 }
304
305 // Public/protected methods
306
307 /**
308 * Returns the pool hosting this thread.
309 *
310 * @return the pool
311 */
312 public ForkJoinPool getPool() {
313 return pool;
314 }
315
316 /**
317 * Returns the index number of this thread in its pool. The
318 * returned value ranges from zero to the maximum number of
319 * threads (minus one) that have ever been created in the pool.
320 * This method may be useful for applications that track status or
321 * collect results per-worker rather than per-task.
322 *
323 * @return the index number
324 */
325 public int getPoolIndex() {
326 return poolIndex;
327 }
328
329 /**
330 * Initializes internal state after construction but before
331 * processing any tasks. If you override this method, you must
332 * invoke super.onStart() at the beginning of the method.
333 * Initialization requires care: Most fields must have legal
334 * default values, to ensure that attempted accesses from other
335 * threads work correctly even before this thread starts
336 * processing tasks.
337 */
338 protected void onStart() {
339 int rs = seedGenerator.nextInt();
340 seed = rs == 0? 1 : rs; // seed must be nonzero
341
342 // Allocate name string and arrays in this thread
343 String pid = Integer.toString(pool.getPoolNumber());
344 String wid = Integer.toString(poolIndex);
345 setName("ForkJoinPool-" + pid + "-worker-" + wid);
346
347 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
348 }
349
350 /**
351 * Performs cleanup associated with termination of this worker
352 * thread. If you override this method, you must invoke
353 * {@code super.onTermination} at the end of the overridden method.
354 *
355 * @param exception the exception causing this thread to abort due
356 * to an unrecoverable error, or {@code null} if completed normally
357 */
358 protected void onTermination(Throwable exception) {
359 try {
360 stolen = null;
361 joining = null;
362 cancelTasks();
363 setTerminated();
364 pool.workerTerminated(this);
365 } catch (Throwable ex) { // Shouldn't ever happen
366 if (exception == null) // but if so, at least rethrow it
367 exception = ex;
368 } finally {
369 if (exception != null)
370 UNSAFE.throwException(exception);
371 }
372 }
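    /*
     * Editor's sketch (illustrative only): the kind of subclass and
     * ForkJoinWorkerThreadFactory the class javadoc refers to, overriding
     * only the onStart/onTermination hooks above. The names
     * LoggingWorkerThread and LoggingWorkerFactory are hypothetical; the
     * factory instance would be passed to a ForkJoinPool constructor that
     * accepts a ForkJoinWorkerThreadFactory.
     */
    static class LoggingWorkerThread extends ForkJoinWorkerThread {
        LoggingWorkerThread(ForkJoinPool pool) { super(pool); }
        @Override protected void onStart() {
            super.onStart();                      // required: run first
            System.out.println(getName() + " started");
        }
        @Override protected void onTermination(Throwable exception) {
            System.out.println(getName() + " terminating" +
                               (exception == null ? "" : ": " + exception));
            super.onTermination(exception);       // required: run last
        }
    }
    static class LoggingWorkerFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new LoggingWorkerThread(pool);
        }
    }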
373
374 /**
375 * This method is required to be public, but should never be
376 * called explicitly. It performs the main run loop to execute
377 * ForkJoinTasks.
378 */
379 public void run() {
380 Throwable exception = null;
381 try {
382 onStart();
383 mainLoop();
384 } catch (Throwable ex) {
385 exception = ex;
386 } finally {
387 onTermination(exception);
388 }
389 }
390
391 // helpers for run()
392
393 /**
394 * Finds and executes tasks and checks status while running.
395 */
396 private void mainLoop() {
397 boolean ran = false; // true if ran task in last loop iter
398 boolean prevRan = false; // true if ran on last or previous step
399 ForkJoinPool p = pool;
400 for (;;) {
401 p.preStep(this, prevRan);
402 if (runState != 0)
403 return;
404 ForkJoinTask<?> t; // try to get and run stolen or submitted task
405 if ((t = scan()) != null || (t = pollSubmission()) != null) {
406 t.tryExec();
407 if (base != sp)
408 runLocalTasks();
409 stolen = null;
410 prevRan = ran = true;
411 }
412 else {
413 prevRan = ran;
414 ran = false;
415 }
416 }
417 }
418
419 /**
420 * Runs local tasks until queue is empty or shut down. Call only
421 * while active.
422 */
423 private void runLocalTasks() {
424 while (runState == 0) {
425 ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
426 if (t != null)
427 t.tryExec();
428 else if (base == sp)
429 break;
430 }
431 }
432
433 /**
434 * If a submission exists, try to activate and take it
435 *
436 * @return a task, if available
437 */
438 private ForkJoinTask<?> pollSubmission() {
439 ForkJoinPool p = pool;
440 while (p.hasQueuedSubmissions()) {
441 if (active || (active = p.tryIncrementActiveCount())) {
442 ForkJoinTask<?> t = p.pollSubmission();
443 return t != null ? t : scan(); // if missed, rescan
444 }
445 }
446 return null;
447 }
448
449 /*
450 * Intrinsics-based atomic writes for queue slots. These are
451 * basically the same as methods in AtomicReferenceArray, but
452 * specialized for (1) ForkJoinTask elements (2) requirement that
453 * nullness and bounds checks have already been performed by
454 * callers and (3) effective offsets are known not to overflow
455 * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
456 * need a corresponding version for reads: plain array reads are OK
457 * because they are protected by other volatile reads and are
458 * confirmed by CASes.
459 *
460 * Most uses don't actually call these methods, but instead contain
461 * inlined forms that enable more predictable optimization. We
462 * don't define the version of write used in pushTask at all, but
463 * instead inline there a store-fenced array slot write.
464 */
465
466 /**
467 * CASes slot i of array q from t to null. Caller must ensure q is
468 * non-null and index is in range.
469 */
470 private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
471 ForkJoinTask<?> t) {
472 return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
473 }
474
475 /**
476 * Performs a volatile write of the given task at given slot of
477 * array q. Caller must ensure q is non-null and index is in
478 * range. This method is used only during resets and backouts.
479 */
480 private static final void writeSlot(ForkJoinTask<?>[] q, int i,
481 ForkJoinTask<?> t) {
482 UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
483 }
484
485 // queue methods
486
487 /**
488 * Pushes a task. Call only from this thread.
489 *
490 * @param t the task. Caller must ensure non-null.
491 */
492 final void pushTask(ForkJoinTask<?> t) {
493 ForkJoinTask<?>[] q = queue;
494 int mask = q.length - 1; // implicit assert q != null
495 int s = sp++; // ok to increment sp before slot write
496 UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
497 if ((s -= base) == 0)
498 pool.signalWork(); // was empty
499 else if (s == mask)
500 growQueue(); // is full
501 }
502
503 /**
504 * Tries to take a task from the base of the queue, failing if
505 * empty or contended. Note: Specializations of this code appear
506 * in locallyDeqTask and elsewhere.
507 *
508 * @return a task, or null if none or contended
509 */
510 final ForkJoinTask<?> deqTask() {
511 ForkJoinTask<?> t;
512 ForkJoinTask<?>[] q;
513 int b, i;
514 if ((b = base) != sp &&
515 (q = queue) != null && // must read q after b
516 (t = q[i = (q.length - 1) & b]) != null && base == b &&
517 UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
518 base = b + 1;
519 return t;
520 }
521 return null;
522 }
523
524 /**
525 * Tries to take a task from the base of own queue. Assumes active
526 * status. Called only by current thread.
527 *
528 * @return a task, or null if none
529 */
530 final ForkJoinTask<?> locallyDeqTask() {
531 ForkJoinTask<?>[] q = queue;
532 if (q != null) {
533 ForkJoinTask<?> t;
534 int b, i;
535 while (sp != (b = base)) {
536 if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
537 UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
538 t, null)) {
539 base = b + 1;
540 return t;
541 }
542 }
543 }
544 return null;
545 }
546
547 /**
548 * Returns a popped task, or null if empty. Assumes active status.
549 * Called only by current thread. (Note: a specialization of this
550 * code appears in popWhileJoining.)
551 */
552 final ForkJoinTask<?> popTask() {
553 int s;
554 ForkJoinTask<?>[] q;
555 if (base != (s = sp) && (q = queue) != null) {
556 int i = (q.length - 1) & --s;
557 ForkJoinTask<?> t = q[i];
558 if (t != null && UNSAFE.compareAndSwapObject
559 (q, (i << qShift) + qBase, t, null)) {
560 sp = s;
561 return t;
562 }
563 }
564 return null;
565 }
566
567 /**
568 * Specialized version of popTask to pop only if topmost element
569 * is the given task. Called only by current thread while
570 * active.
571 *
572 * @param t the task. Caller must ensure non-null.
573 */
574 final boolean unpushTask(ForkJoinTask<?> t) {
575 int s;
576 ForkJoinTask<?>[] q;
577 if (base != (s = sp) && (q = queue) != null &&
578 UNSAFE.compareAndSwapObject
579 (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
580 sp = s;
581 return true;
582 }
583 return false;
584 }
585
586 /**
587 * Returns next task or null if empty or contended
588 */
589 final ForkJoinTask<?> peekTask() {
590 ForkJoinTask<?>[] q = queue;
591 if (q == null)
592 return null;
593 int mask = q.length - 1;
594 int i = locallyFifo ? base : (sp - 1);
595 return q[i & mask];
596 }
597
598 /**
599 * Doubles queue array size. Transfers elements by emulating
600 * steals (deqs) from old array and placing, oldest first, into
601 * new array.
602 */
603 private void growQueue() {
604 ForkJoinTask<?>[] oldQ = queue;
605 int oldSize = oldQ.length;
606 int newSize = oldSize << 1;
607 if (newSize > MAXIMUM_QUEUE_CAPACITY)
608 throw new RejectedExecutionException("Queue capacity exceeded");
609 ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
610
611 int b = base;
612 int bf = b + oldSize;
613 int oldMask = oldSize - 1;
614 int newMask = newSize - 1;
615 do {
616 int oldIndex = b & oldMask;
617 ForkJoinTask<?> t = oldQ[oldIndex];
618 if (t != null && !casSlotNull(oldQ, oldIndex, t))
619 t = null;
620 writeSlot(newQ, b & newMask, t);
621 } while (++b != bf);
622 pool.signalWork();
623 }
624
625 /**
626 * Computes next value for random victim probe in scan(). Scans
627 * don't require a very high quality generator, but also not a
628 * crummy one. Marsaglia xor-shift is cheap and works well enough.
629 * Note: This is manually inlined in scan()
630 */
631 private static final int xorShift(int r) {
632 r ^= r << 13;
633 r ^= r >>> 17;
634 return r ^ (r << 5);
635 }
636
637 /**
638 * Tries to steal a task from another worker. Starts at a random
639 * index of workers array, and probes workers until finding one
640 * with non-empty queue or finding that all are empty. It
641 * randomly selects the first n probes. If these are empty, it
642 * resorts to a circular sweep, which is necessary to accurately
643 * set active status. (The circular sweep uses steps of
644 * approximately half the array size plus 1, to avoid bias
645 * stemming from leftmost packing of the array in ForkJoinPool.)
646 *
647 * This method must be both fast and quiet -- usually avoiding
648 * memory accesses that could disrupt cache sharing etc other than
649 * those needed to check for and take tasks (or to activate if not
650 * already active). This accounts for, among other things,
651 * updating random seed in place without storing it until exit.
652 *
653 * @return a task, or null if none found
654 */
655 private ForkJoinTask<?> scan() {
656 ForkJoinPool p = pool;
657 ForkJoinWorkerThread[] ws; // worker array
658 int n; // upper bound of #workers
659 if ((ws = p.workers) != null && (n = ws.length) > 1) {
660 boolean canSteal = active; // shadow active status
661 int r = seed; // extract seed once
662 int mask = n - 1;
663 int j = -n; // loop counter
664 int k = r; // worker index, random if j < 0
665 for (;;) {
666 ForkJoinWorkerThread v = ws[k & mask];
667 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
668 if (v != null && v.base != v.sp) {
669 if (canSteal || // ensure active status
670 (canSteal = active = p.tryIncrementActiveCount())) {
671 int b = v.base; // inline specialized deqTask
672 ForkJoinTask<?>[] q;
673 if (b != v.sp && (q = v.queue) != null) {
674 ForkJoinTask<?> t;
675 int i = (q.length - 1) & b;
676 long u = (i << qShift) + qBase; // raw offset
677 if ((t = q[i]) != null && v.base == b &&
678 UNSAFE.compareAndSwapObject(q, u, t, null)) {
679 stolen = t;
680 v.base = b + 1;
681 seed = r;
682 ++stealCount;
683 return t;
684 }
685 }
686 }
687 j = -n;
688 k = r; // restart on contention
689 }
690 else if (++j <= 0)
691 k = r;
692 else if (j <= n)
693 k += (n >>> 1) | 1;
694 else
695 break;
696 }
697 }
698 return null;
699 }
700
701 // Run State management
702
703 // status check methods used mainly by ForkJoinPool
704 final boolean isTerminating() { return (runState & TERMINATING) != 0; }
705 final boolean isTerminated() { return (runState & TERMINATED) != 0; }
706 final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
707 final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
708
709 /**
710 * Sets state to TERMINATING, also resuming if suspended.
711 */
712 final void shutdown() {
713 for (;;) {
714 int s = runState;
715 if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
716 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
717 (s & ~SUSPENDED) |
718 (TRIMMED|TERMINATING))) {
719 LockSupport.unpark(this);
720 break;
721 }
722 }
723 else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
724 s | TERMINATING))
725 break;
726 }
727 }
728
729 /**
730 * Sets state to TERMINATED. Called only by this thread.
731 */
732 private void setTerminated() {
733 int s;
734 do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
735 s = runState,
736 s | (TERMINATING|TERMINATED)));
737 }
738
739 /**
740 * Instrumented version of park used by ForkJoinPool.awaitEvent
741 */
742 final void doPark() {
743 ++parkCount;
744 LockSupport.park(this);
745 }
746
747 /**
748 * If suspended, tries to set status to unsuspended.
749 * Caller must unpark to actually resume
750 *
751 * @return true if successful
752 */
753 final boolean tryUnsuspend() {
754 int s = runState;
755 if ((s & SUSPENDED) != 0)
756 return UNSAFE.compareAndSwapInt(this, runStateOffset, s,
757 s & ~SUSPENDED);
758 return false;
759 }
760
761 /**
762 * Sets suspended status and blocks as spare until resumed,
763 * shutdown, or timed out.
764 *
765 * @return false if trimmed
766 */
767 final boolean suspendAsSpare() {
768 for (;;) { // set suspended unless terminating
769 int s = runState;
770 if ((s & TERMINATING) != 0) { // must kill
771 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
772 s | (TRIMMED | TERMINATING)))
773 return false;
774 }
775 else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
776 s | SUSPENDED))
777 break;
778 }
779 boolean timed;
780 long nanos;
781 long startTime;
782 if (poolIndex < pool.parallelism) {
783 timed = false;
784 nanos = 0L;
785 startTime = 0L;
786 }
787 else {
788 timed = true;
789 nanos = SPARE_KEEPALIVE_NANOS;
790 startTime = System.nanoTime();
791 }
792 pool.accumulateStealCount(this);
793 lastEventCount = 0; // reset upon resume
794 interrupted(); // clear/ignore interrupts
795 while ((runState & SUSPENDED) != 0) {
796 ++parkCount;
797 if (!timed)
798 LockSupport.park(this);
799 else if ((nanos -= (System.nanoTime() - startTime)) > 0)
800 LockSupport.parkNanos(this, nanos);
801 else { // try to trim on timeout
802 int s = runState;
803 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
804 (s & ~SUSPENDED) |
805 (TRIMMED|TERMINATING)))
806 return false;
807 }
808 }
809 return true;
810 }
811
812 // Misc support methods for ForkJoinPool
813
814 /**
815 * Returns an estimate of the number of tasks in the queue. Also
816 * used by ForkJoinTask.
817 */
818 final int getQueueSize() {
819 return -base + sp;
820 }
821
822 /**
823 * Removes and cancels all tasks in queue. Can be called from any
824 * thread.
825 */
826 final void cancelTasks() {
827 while (base != sp) {
828 ForkJoinTask<?> t = deqTask();
829 if (t != null)
830 t.cancelIgnoringExceptions();
831 }
832 }
833
834 /**
835 * Drains tasks to given collection c.
836 *
837 * @return the number of tasks drained
838 */
839 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
840 int n = 0;
841 while (base != sp) {
842 ForkJoinTask<?> t = deqTask();
843 if (t != null) {
844 c.add(t);
845 ++n;
846 }
847 }
848 return n;
849 }
850
851 // Support methods for ForkJoinTask
852
853 /**
854 * Possibly runs some tasks and/or blocks, until task is done.
855 *
856 * @param joinMe the task to join
857 */
858 final void joinTask(ForkJoinTask<?> joinMe) {
859 ForkJoinTask<?> prevJoining = joining;
860 joining = joinMe;
861 while (joinMe.status >= 0) {
862 int s = sp;
863 if (s == base) {
864 nonlocalJoinTask(joinMe);
865 break;
866 }
867 // process local task
868 ForkJoinTask<?> t;
869 ForkJoinTask<?>[] q = queue;
870 int i = (q.length - 1) & --s;
871 long u = (i << qShift) + qBase; // raw offset
872 if ((t = q[i]) != null &&
873 UNSAFE.compareAndSwapObject(q, u, t, null)) {
874 /*
875 * This recheck (and similarly in nonlocalJoinTask)
876 * handles cases where joinMe is independently
877 * cancelled or forced even though there is other work
878 * available. Back out of the pop by putting t back
879 * into slot before we commit by setting sp.
880 */
881 if (joinMe.status < 0) {
882 UNSAFE.putObjectVolatile(q, u, t);
883 break;
884 }
885 sp = s;
886 t.tryExec();
887 }
888 }
889 joining = prevJoining;
890 }
891
892 /**
893 * Tries to locate and help perform tasks for a stealer of the
894 * given task (or in turn one of its stealers), blocking (via
895 * pool.tryAwaitJoin) upon failure to find work. Traces
896 * stolen->joining links looking for a thread working on
897 * a descendant of the given task and with a non-empty queue to
898 * steal back and execute tasks from. Inhibits mutual steal chains
899 * and scans on outer joins upon nesting to avoid unbounded
900 * growth. Restarts search upon encountering inconsistencies.
901 * Tries to block if two passes agree that there are no remaining
902 * targets.
903 *
904 * @param joinMe the task to join
905 */
906 private void nonlocalJoinTask(ForkJoinTask<?> joinMe) {
907 ForkJoinPool p = pool;
908 int scans = p.parallelism; // give up if too many retries
909 ForkJoinTask<?> bottom = null; // target seen when can't descend
910 restart: while (joinMe.status >= 0) {
911 ForkJoinTask<?> target = null;
912 ForkJoinTask<?> next = joinMe;
913 while (scans >= 0 && next != null) {
914 --scans;
915 target = next;
916 next = null;
917 ForkJoinWorkerThread v = null;
918 ForkJoinWorkerThread[] ws = p.workers;
919 int n = ws.length;
920 for (int j = 0; j < n; ++j) {
921 ForkJoinWorkerThread w = ws[j];
922 if (w != null && w.stolen == target) {
923 v = w;
924 break;
925 }
926 }
927 if (v != null && v != this) {
928 ForkJoinTask<?> prevStolen = stolen;
929 int b;
930 ForkJoinTask<?>[] q;
931 while ((b = v.base) != v.sp && (q = v.queue) != null) {
932 int i = (q.length - 1) & b;
933 long u = (i << qShift) + qBase;
934 ForkJoinTask<?> t = q[i];
935 if (target.status < 0)
936 continue restart;
937 if (t != null && v.base == b &&
938 UNSAFE.compareAndSwapObject(q, u, t, null)) {
939 if (joinMe.status < 0) {
940 UNSAFE.putObjectVolatile(q, u, t);
941 return; // back out
942 }
943 stolen = t;
944 v.base = b + 1;
945 t.tryExec();
946 stolen = prevStolen;
947 }
948 if (joinMe.status < 0)
949 return;
950 }
951 next = v.joining;
952 }
953 if (target.status < 0)
954 continue restart; // inconsistent
955 if (joinMe.status < 0)
956 return;
957 }
958
959 if (bottom != target)
960 bottom = target; // recheck landing spot
961 else if (p.tryAwaitJoin(joinMe) < 0)
962 return; // successfully blocked
963 Thread.yield(); // tame spin in case too many active
964 }
965 }
966
967 /**
968 * Returns an estimate of the number of tasks, offset by a
969 * function of number of idle workers.
970 *
971 * This method provides a cheap heuristic guide for task
972 * partitioning when programmers, frameworks, tools, or languages
973 * have little or no idea about task granularity. In essence by
974 * offering this method, we ask users only about tradeoffs in
975 * overhead vs expected throughput and its variance, rather than
976 * how finely to partition tasks.
977 *
978 * In a steady state strict (tree-structured) computation, each
979 * thread makes available for stealing enough tasks for other
980 * threads to remain active. Inductively, if all threads play by
981 * the same rules, each thread should make available only a
982 * constant number of tasks.
983 *
984 * The minimum useful constant is just 1. But using a value of 1
985 * would require immediate replenishment upon each steal to
986 * maintain enough tasks, which is infeasible. Further,
987 * partitionings/granularities of offered tasks should minimize
988 * steal rates, which in general means that threads nearer the top
989 * of computation tree should generate more than those nearer the
990 * bottom. In perfect steady state, each thread is at
991 * approximately the same level of computation tree. However,
992 * producing extra tasks amortizes the uncertainty of progress and
993 * diffusion assumptions.
994 *
995 * So, users will want to use values larger, but not much larger,
996 * than 1 both to smooth over transient shortages and hedge
997 * against uneven progress, as traded off against the cost of
998 * extra task overhead. We leave the user to pick a threshold
999 * value to compare with the results of this call to guide
1000 * decisions, but recommend values such as 3.
1001 *
1002 * When all threads are active, it is on average OK to estimate
1003 * surplus strictly locally. In steady-state, if one thread is
1004 * maintaining say 2 surplus tasks, then so are others. So we can
1005 * just use estimated queue length (although note that (sp - base)
1006 * can be an overestimate because of stealers lagging increments
1007 * of base). However, this strategy alone leads to serious
1008 * mis-estimates in some non-steady-state conditions (ramp-up,
1009 * ramp-down, other stalls). We can detect many of these by
1010 * further considering the number of "idle" threads, which are
1011 * known to have zero queued tasks, and compensate by a factor of
1012 * (#idle/#active) threads.
1013 */
1014 final int getEstimatedSurplusTaskCount() {
1015 return sp - base - pool.idlePerActive();
1016 }
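    /*
     * Editor's sketch (illustrative only): how a divide-and-conquer task
     * might act on this estimate via ForkJoinTask.getSurplusQueuedTaskCount()
     * (the public counterpart of this method), using the recommended
     * threshold of about 3. SumTask, the sequential cutoff of 1024, and
     * the leaf loop are all hypothetical.
     */
    static class SumTask extends RecursiveAction {
        final long[] array; final int lo, hi;
        long result;
        SumTask(long[] array, int lo, int hi) {
            this.array = array; this.lo = lo; this.hi = hi;
        }
        protected void compute() {
            // Stop splitting once enough surplus tasks are already queued
            // locally, or the problem is small; otherwise fork one half.
            if (hi - lo <= 1024 || getSurplusQueuedTaskCount() > 3) {
                long s = 0;
                for (int i = lo; i < hi; ++i)
                    s += array[i];
                result = s;
            }
            else {
                int mid = (lo + hi) >>> 1;
                SumTask left = new SumTask(array, lo, mid);
                left.fork();                       // make one half stealable
                SumTask right = new SumTask(array, mid, hi);
                right.compute();                   // work on the other half
                left.join();
                result = left.result + right.result;
            }
        }
    }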
1017
1018 /**
1019 * Gets and removes a local task.
1020 *
1021 * @return a task, if available
1022 */
1023 final ForkJoinTask<?> pollLocalTask() {
1024 while (sp != base) {
1025 if (active || (active = pool.tryIncrementActiveCount()))
1026 return locallyFifo? locallyDeqTask() : popTask();
1027 }
1028 return null;
1029 }
1030
1031 /**
1032 * Gets and removes a local or stolen task.
1033 *
1034 * @return a task, if available
1035 */
1036 final ForkJoinTask<?> pollTask() {
1037 ForkJoinTask<?> t;
1038 return (t = pollLocalTask()) != null ? t : scan();
1039 }
1040
1041 /**
1042 * Runs tasks until {@code pool.isQuiescent()}.
1043 */
1044 final void helpQuiescePool() {
1045 for (;;) {
1046 ForkJoinTask<?> t = pollLocalTask();
1047 if (t != null || (t = scan()) != null) {
1048 t.tryExec();
1049 stolen = null;
1050 }
1051 else {
1052 ForkJoinPool p = pool;
1053 if (active) {
1054 active = false; // inactivate
1055 do {} while (!p.tryDecrementActiveCount());
1056 }
1057 if (p.isQuiescent()) {
1058 active = true; // re-activate
1059 do {} while (!p.tryIncrementActiveCount());
1060 return;
1061 }
1062 }
1063 }
1064 }
1065
1066 // Unsafe mechanics
1067
1068 private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1069 private static final long runStateOffset =
1070 objectFieldOffset("runState", ForkJoinWorkerThread.class);
1071 private static final long qBase =
1072 UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1073 private static final int qShift;
1074
1075 static {
1076 int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
1077 if ((s & (s-1)) != 0)
1078 throw new Error("data type scale not a power of two");
1079 qShift = 31 - Integer.numberOfLeadingZeros(s);
1080 }
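    /*
     * Editor's note: with a 4-byte array index scale (as with compressed
     * oops), qShift == 31 - numberOfLeadingZeros(4) == 2, so slot i sits
     * at raw offset qBase + (i << 2); with an 8-byte scale, qShift == 3.
     * The 1 << 28 cap on MAXIMUM_QUEUE_CAPACITY keeps (i << 3) within int
     * range before it is added to qBase.
     */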
1081
1082 private static long objectFieldOffset(String field, Class<?> klazz) {
1083 try {
1084 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1085 } catch (NoSuchFieldException e) {
1086 // Convert Exception to corresponding Error
1087 NoSuchFieldError error = new NoSuchFieldError(field);
1088 error.initCause(e);
1089 throw error;
1090 }
1091 }
1092
1093 /**
1094 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
1095 * Replace with a simple call to Unsafe.getUnsafe when integrating
1096 * into a jdk.
1097 *
1098 * @return a sun.misc.Unsafe
1099 */
1100 private static sun.misc.Unsafe getUnsafe() {
1101 try {
1102 return sun.misc.Unsafe.getUnsafe();
1103 } catch (SecurityException se) {
1104 try {
1105 return java.security.AccessController.doPrivileged
1106 (new java.security
1107 .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1108 public sun.misc.Unsafe run() throws Exception {
1109 java.lang.reflect.Field f = sun.misc
1110 .Unsafe.class.getDeclaredField("theUnsafe");
1111 f.setAccessible(true);
1112 return (sun.misc.Unsafe) f.get(null);
1113 }});
1114 } catch (java.security.PrivilegedActionException e) {
1115 throw new RuntimeException("Could not initialize intrinsics",
1116 e.getCause());
1117 }
1118 }
1119 }
1120 }