root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
Revision: 1.66
Committed: Wed Jun 1 21:04:30 2011 UTC by jsr166
Branch: MAIN
Changes since 1.65: +2 -2 lines
Log Message:
fix javac 7 warnings

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166y;
8
9 import java.util.Collection;
10 import java.util.concurrent.RejectedExecutionException;
11
12 /**
13 * A thread managed by a {@link ForkJoinPool}, which executes
14 * {@link ForkJoinTask}s.
15 * This class is subclassable solely for the sake of adding
16 * functionality -- there are no overridable methods dealing with
17 * scheduling or execution. However, you can override initialization
18 * and termination methods surrounding the main task processing loop.
19 * If you do create such a subclass, you will also need to supply a
20 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
21 * in a {@code ForkJoinPool}.
22 *
23 * @since 1.7
24 * @author Doug Lea
25 */
26 public class ForkJoinWorkerThread extends Thread {
27 /*
28 * Overview:
29 *
30 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
31 * ForkJoinTasks. This class includes bookkeeping in support of
32 * worker activation, suspension, and lifecycle control described
33 * in more detail in the internal documentation of class
34 * ForkJoinPool. And as described further below, this class also
35 * includes special-cased support for some ForkJoinTask
36 * methods. But the main mechanics involve work-stealing:
37 *
38 * Work-stealing queues are special forms of Deques that support
39 * only three of the four possible end-operations -- push, pop,
40 * and deq (aka steal), under the further constraints that push
41 * and pop are called only from the owning thread, while deq may
42 * be called from other threads. (If you are unfamiliar with
43 * them, you probably want to read Herlihy and Shavit's book "The
44 * Art of Multiprocessor Programming", chapter 16 describing these
45 * in more detail before proceeding.) The main work-stealing
46 * queue design is roughly similar to those in the papers "Dynamic
47 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
48 * (http://research.sun.com/scalable/pubs/index.html) and
49 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
50 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
51 * The main differences ultimately stem from gc requirements that
52 * we null out taken slots as soon as we can, to maintain as small
53 * a footprint as possible even in programs generating huge
54 * numbers of tasks. To accomplish this, we shift the CAS
55 * arbitrating pop vs deq (steal) from being on the indices
56 * ("queueBase" and "queueTop") to the slots themselves (mainly
57 * via method "casSlotNull()"). So, both a successful pop and deq
58 * mainly entail a CAS of a slot from non-null to null. Because
59 * we rely on CASes of references, we do not need tag bits on
60 * queueBase or queueTop. They are simple ints as used in any
61 * circular array-based queue (see for example ArrayDeque).
62 * Updates to the indices must still be ordered in a way that
63 * guarantees that queueTop == queueBase means the queue is empty,
64 * but otherwise may err on the side of possibly making the queue
65 * appear nonempty when a push, pop, or deq has not fully
66 * committed. Note that this means that the deq operation,
67 * considered individually, is not wait-free. One thief cannot
68 * successfully continue until another in-progress one (or, if
69 * previously empty, a push) completes. However, in the
70 * aggregate, we ensure at least probabilistic non-blockingness.
71 * If an attempted steal fails, a thief always chooses a different
72 * random victim target to try next. So, in order for one thief to
73 * progress, it suffices for any in-progress deq or new push on
74 * any empty queue to complete.
75 *
76 * This approach also enables support for "async mode" where local
77 * task processing is in FIFO, not LIFO order, simply by using a
78 * version of deq rather than pop when locallyFifo is true (as set
79 * by the ForkJoinPool). This allows use in message-passing
80 * frameworks in which tasks are never joined. However, neither
81 * mode considers affinities, loads, cache localities, etc., so
82 * they rarely provide the best possible performance on a given
83 * machine, but they portably provide good throughput by averaging over
84 * these factors. (Further, even if we did try to use such
85 * information, we do not usually have a basis for exploiting
86 * it. For example, some sets of tasks profit from cache
87 * affinities, but others are harmed by cache pollution effects.)
88 *
89 * When a worker would otherwise be blocked waiting to join a
90 * task, it first tries a form of linear helping: Each worker
91 * records (in field currentSteal) the most recent task it stole
92 * from some other worker. Plus, it records (in field currentJoin)
93 * the task it is currently actively joining. Method joinTask uses
94 * these markers to try to find a worker to help (i.e., steal back
95 * a task from and execute it) that could hasten completion of the
96 * actively joined task. In essence, the joiner executes a task
97 * that would be on its own local deque had the to-be-joined task
98 * not been stolen. This may be seen as a conservative variant of
99 * the approach in Wagner & Calder "Leapfrogging: a portable
100 * technique for implementing efficient futures" SIGPLAN Notices,
101 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
102 * in that: (1) We only maintain dependency links across workers
103 * upon steals, rather than use per-task bookkeeping. This may
104 * require a linear scan of workers array to locate stealers, but
105 * usually doesn't because stealers leave hints (that may become
106 * stale/wrong) of where to locate them. This isolates cost to
107 * when it is needed, rather than adding to per-task overhead.
108 * (2) It is "shallow", ignoring nesting and potentially cyclic
109 * mutual steals. (3) It is intentionally racy: field currentJoin
110 * is updated only while actively joining, which means that we
111 * miss links in the chain during long-lived tasks, GC stalls etc
112 * (which is OK since blocking in such cases is usually a good
113 * idea). (4) We bound the number of attempts to find work (see
114 * MAX_HELP) and fall back to suspending the worker and if
115 * necessary replacing it with another.
116 *
117 * Efficient implementation of these algorithms currently relies
118 * on an uncomfortable amount of "Unsafe" mechanics. To maintain
119 * correct orderings, reads and writes of variable queueBase
120 * require volatile ordering. Variable queueTop need not be
121 * volatile because non-local reads always follow those of
122 * queueBase. Similarly, because they are protected by volatile
123 * queueBase reads, reads of the queue array and its slots by
124 * other threads do not need volatile load semantics, but writes
125 * (in push) require store order and CASes (in pop and deq)
126 * require (volatile) CAS semantics. (Michael, Saraswat, and
127 * Vechev's algorithm has similar properties, but without support
128 * for nulling slots.) Since these combinations aren't supported
129 * using ordinary volatiles, the only way to accomplish these
130 * efficiently is to use direct Unsafe calls. (Using external
131 * AtomicIntegers and AtomicReferenceArrays for the indices and
132 * array is significantly slower because of memory locality and
133 * indirection effects.)
134 *
135 * Further, performance on most platforms is very sensitive to
136 * placement and sizing of the (resizable) queue array. Even
137 * though these queues don't usually become all that big, the
138 * initial size must be large enough to counteract cache
139 * contention effects across multiple queues (especially in the
140 * presence of GC cardmarking). Also, to improve thread-locality,
141 * queues are initialized after starting.
142 */
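/*
 * For illustration only (not used by this class): the CAS-on-slot
 * arbitration described in the overview above can be sketched with an
 * AtomicReferenceArray-based deque. As noted above, such a version is
 * significantly slower than the Unsafe-based code below, but it has the
 * same structure: push and pop are owner-only, steal may be called by
 * any thread, and both pop and steal claim a slot by CASing it from
 * non-null to null. Resizing, signalling, and some emptiness checks are
 * omitted; names here are hypothetical.
 *
 *   import java.util.concurrent.atomic.AtomicReferenceArray;
 *
 *   class SketchWorkQueue {
 *       final AtomicReferenceArray<ForkJoinTask<?>> slots =
 *           new AtomicReferenceArray<ForkJoinTask<?>>(1 << 13);
 *       volatile int base;   // index of next slot to steal (deq) from
 *       int top;             // index of next slot to push to (owner only)
 *
 *       void push(ForkJoinTask<?> t) {            // owner thread only
 *           slots.lazySet(top & (slots.length() - 1), t); // ordered write
 *           top = top + 1;                        // growth/signal omitted
 *       }
 *       ForkJoinTask<?> pop() {                   // owner thread only (LIFO)
 *           int s = top - 1, i = s & (slots.length() - 1);
 *           ForkJoinTask<?> t = slots.get(i);
 *           if (t != null && slots.compareAndSet(i, t, null)) {
 *               top = s;
 *               return t;
 *           }
 *           return null;                          // empty, or lost slot to a stealer
 *       }
 *       ForkJoinTask<?> steal() {                 // any thread (FIFO)
 *           int b = base, i = b & (slots.length() - 1);
 *           ForkJoinTask<?> t = slots.get(i);
 *           if (t != null && base == b && slots.compareAndSet(i, t, null)) {
 *               base = b + 1;
 *               return t;
 *           }
 *           return null;                          // empty or contended
 *       }
 *   }
 */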
143
144 /**
145 * Mask for pool indices encoded as shorts
146 */
147 private static final int SMASK = 0xffff;
148
149 /**
150 * Capacity of work-stealing queue array upon initialization.
151 * Must be a power of two. Initial size must be at least 4, but is
152 * padded to minimize cache effects.
153 */
154 private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
155
156 /**
157 * Maximum size for queue array. Must be a power of two
158 * less than or equal to 1 << (31 - width of array entry) to
159 * ensure lack of index wraparound, but is capped at a lower
160 * value to help users trap runaway computations.
161 */
162 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
163
164 /**
165 * The work-stealing queue array. Size must be a power of two.
166 * Initialized when started (as opposed to when constructed), to
167 * improve memory locality.
168 */
169 ForkJoinTask<?>[] queue;
170
171 /**
172 * The pool this thread works in. Accessed directly by ForkJoinTask.
173 */
174 final ForkJoinPool pool;
175
176 /**
177 * Index (mod queue.length) of next queue slot to push to or pop
178 * from. It is written only by owner thread, and accessed by other
179 * threads only after reading (volatile) queueBase. Both queueTop
180 * and queueBase are allowed to wrap around on overflow, but
181 * (queueTop - queueBase) still estimates size.
182 */
183 int queueTop;
184
185 /**
186 * Index (mod queue.length) of least valid queue slot, which is
187 * always the next position to steal from if nonempty.
188 */
189 volatile int queueBase;
190
191 /**
192 * The index of most recent stealer, used as a hint to avoid
193 * traversal in method helpJoinTask. This is only a hint because a
194 * worker might have had multiple steals and this only holds one
195 * of them (usually the most current). Declared non-volatile,
196 * relying on other prevailing sync to keep reasonably current.
197 */
198 int stealHint;
199
200 /**
201 * Index of this worker in pool array. Set once by pool before
202 * running, and accessed directly by pool to locate this worker in
203 * its workers array.
204 */
205 final int poolIndex;
206
207 /**
208 * Encoded record for pool task waits. Usages are always
209 * surrounded by volatile reads/writes
210 */
211 int nextWait;
212
213 /**
214 * Complement of poolIndex, offset by count of entries of task
215 * waits. Accessed by ForkJoinPool to manage event waiters.
216 */
217 volatile int eventCount;
218
219 /**
220 * Seed for random number generator for choosing steal victims.
221 * Uses Marsaglia xorshift. Must be initialized as nonzero.
222 */
223 int seed;
224
225 /**
226 * Number of steals. Directly accessed (and reset) by pool when
227 * idle.
228 */
229 int stealCount;
230
231 /**
232 * True if this worker should or did terminate
233 */
234 volatile boolean terminate;
235
236 /**
237 * Set to true before LockSupport.park; false on return
238 */
239 volatile boolean parked;
240
241 /**
242 * True if using local FIFO, rather than the default LIFO, for local polling.
243 * Shadows value from ForkJoinPool.
244 */
245 final boolean locallyFifo;
246
247 /**
248 * The task most recently stolen from another worker (or
249 * submission queue). All uses are surrounded by enough volatile
250 * reads/writes to maintain as non-volatile.
251 */
252 ForkJoinTask<?> currentSteal;
253
254 /**
255 * The task currently being joined, set only when actively trying
256 * to help other stealers in helpJoinTask. All uses are surrounded
257 * by enough volatile reads/writes to maintain as non-volatile.
258 */
259 ForkJoinTask<?> currentJoin;
260
261 /**
262 * Creates a ForkJoinWorkerThread operating in the given pool.
263 *
264 * @param pool the pool this thread works in
265 * @throws NullPointerException if pool is null
266 */
267 protected ForkJoinWorkerThread(ForkJoinPool pool) {
268 super(pool.nextWorkerName());
269 this.pool = pool;
270 int k = pool.registerWorker(this);
271 poolIndex = k;
272 eventCount = ~k & SMASK; // clear wait count
273 locallyFifo = pool.locallyFifo;
274 Thread.UncaughtExceptionHandler ueh = pool.ueh;
275 if (ueh != null)
276 setUncaughtExceptionHandler(ueh);
277 setDaemon(true);
278 }
279
280 // Public methods
281
282 /**
283 * Returns the pool hosting this thread.
284 *
285 * @return the pool
286 */
287 public ForkJoinPool getPool() {
288 return pool;
289 }
290
291 /**
292 * Returns the index number of this thread in its pool. The
293 * returned value ranges from zero to the maximum number of
294 * threads (minus one) that have ever been created in the pool.
295 * This method may be useful for applications that track status or
296 * collect results per-worker rather than per-task.
297 *
298 * @return the index number
299 */
300 public int getPoolIndex() {
301 return poolIndex;
302 }
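/*
 * A minimal sketch (hypothetical user code, not part of this class) of
 * the per-worker bookkeeping that getPoolIndex() is intended for:
 * results are accumulated into one slot per worker rather than per task.
 *
 *   import java.util.concurrent.atomic.AtomicLongArray;
 *
 *   class PerWorkerCounters {
 *       // sized to the maximum encodable worker index (see SMASK)
 *       final AtomicLongArray taskCounts = new AtomicLongArray(1 << 16);
 *       void record() {
 *           Thread t = Thread.currentThread();
 *           if (t instanceof ForkJoinWorkerThread) {
 *               int i = ((ForkJoinWorkerThread) t).getPoolIndex();
 *               taskCounts.incrementAndGet(i);
 *           }
 *       }
 *   }
 */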
303
304 // Randomization
305
306 /**
307 * Computes next value for random victim probes and backoffs.
308 * Scans don't require a very high quality generator, but also not
309 * a crummy one. Marsaglia xor-shift is cheap and works well
310 * enough. Note: This is manually inlined in FJP.scan() to avoid
311 * writes inside busy loops.
312 */
313 private int nextSeed() {
314 int r = seed;
315 r ^= r << 13;
316 r ^= r >>> 17;
317 r ^= r << 5;
318 return seed = r;
319 }
320
321 // Run State management
322
323 /**
324 * Initializes internal state after construction but before
325 * processing any tasks. If you override this method, you must
326 * invoke {@code super.onStart()} at the beginning of the method.
327 * Initialization requires care: Most fields must have legal
328 * default values, to ensure that attempted accesses from other
329 * threads work correctly even before this thread starts
330 * processing tasks.
331 */
332 protected void onStart() {
333 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
334 int r = ForkJoinPool.workerSeedGenerator.nextInt();
335 seed = (r == 0) ? 1 : r; // must be nonzero
336 }
337
338 /**
339 * Performs cleanup associated with termination of this worker
340 * thread. If you override this method, you must invoke
341 * {@code super.onTermination} at the end of the overridden method.
342 *
343 * @param exception the exception causing this thread to abort due
344 * to an unrecoverable error, or {@code null} if completed normally
345 */
346 protected void onTermination(Throwable exception) {
347 try {
348 terminate = true;
349 cancelTasks();
350 pool.deregisterWorker(this, exception);
351 } catch (Throwable ex) { // Shouldn't ever happen
352 if (exception == null) // but if so, at least rethrow it
353 exception = ex;
354 } finally {
355 if (exception != null)
356 UNSAFE.throwException(exception);
357 }
358 }
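/*
 * A minimal sketch (hypothetical subclass, using only the public API) of
 * the extension pattern described in the class javadoc: override
 * onStart/onTermination and install the subclass through a custom
 * ForkJoinPool.ForkJoinWorkerThreadFactory.
 *
 *   class LoggingWorkerThread extends ForkJoinWorkerThread {
 *       LoggingWorkerThread(ForkJoinPool pool) { super(pool); }
 *       protected void onStart() {
 *           super.onStart();                    // must run first
 *           System.out.println("worker " + getPoolIndex() + " starting");
 *       }
 *       protected void onTermination(Throwable exception) {
 *           try {
 *               System.out.println("worker " + getPoolIndex() + " exiting");
 *           } finally {
 *               super.onTermination(exception); // must run last
 *           }
 *       }
 *   }
 *
 *   ForkJoinPool.ForkJoinWorkerThreadFactory factory =
 *       new ForkJoinPool.ForkJoinWorkerThreadFactory() {
 *           public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *               return new LoggingWorkerThread(pool);
 *           }
 *       };
 *   ForkJoinPool pool = new ForkJoinPool(
 *       Runtime.getRuntime().availableProcessors(), factory, null, false);
 */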
359
360 /**
361 * This method is required to be public, but should never be
362 * called explicitly. It performs the main run loop to execute
363 * {@link ForkJoinTask}s.
364 */
365 public void run() {
366 Throwable exception = null;
367 try {
368 onStart();
369 pool.work(this);
370 } catch (Throwable ex) {
371 exception = ex;
372 } finally {
373 onTermination(exception);
374 }
375 }
376
377 /*
378 * Intrinsics-based atomic writes for queue slots. These are
379 * basically the same as methods in AtomicReferenceArray, but
380 * specialized for (1) ForkJoinTask elements (2) requirement that
381 * nullness and bounds checks have already been performed by
382 * callers and (3) effective offsets are known not to overflow
383 * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
384 * need corresponding version for reads: plain array reads are OK
385 * because they are protected by other volatile reads and are
386 * confirmed by CASes.
387 *
388 * Most uses don't actually call these methods, but instead
389 * contain inlined forms that enable more predictable
390 * optimization. We don't define the version of write used in
391 * pushTask at all, but instead inline there a store-fenced array
392 * slot write.
393 *
394 * Also in most methods, as a performance (not correctness) issue,
395 * we'd like to encourage compilers not to arbitrarily postpone
396 * setting queueTop after writing slot. Currently there is no
397 * intrinsic for arranging this, but using Unsafe putOrderedInt
398 * may be a preferable strategy on some compilers even though its
399 * main effect is a pre-, not post- fence. To simplify possible
400 * changes, the option is left in comments next to the associated
401 * assignments.
402 */
403
404 /**
405 * CASes slot i of array q from t to null. Caller must ensure q is
406 * non-null and index is in range.
407 */
408 private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
409 ForkJoinTask<?> t) {
410 return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
411 }
412
413 /**
414 * Performs a volatile write of the given task at given slot of
415 * array q. Caller must ensure q is non-null and index is in
416 * range. This method is used only during resets and backouts.
417 */
418 private static final void writeSlot(ForkJoinTask<?>[] q, int i,
419 ForkJoinTask<?> t) {
420 UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
421 }
422
423 // queue methods
424
425 /**
426 * Pushes a task. Call only from this thread.
427 *
428 * @param t the task. Caller must ensure non-null.
429 */
430 final void pushTask(ForkJoinTask<?> t) {
431 ForkJoinTask<?>[] q; int s, m;
432 if ((q = queue) != null) { // ignore if queue removed
433 long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
434 UNSAFE.putOrderedObject(q, u, t);
435 queueTop = s + 1; // or use putOrderedInt
436 if ((s -= queueBase) <= 2)
437 pool.signalWork();
438 else if (s == m)
439 growQueue();
440 }
441 }
442
443 /**
444 * Creates or doubles queue array. Transfers elements by
445 * emulating steals (deqs) from the old array and placing them,
446 * oldest first, into the new array.
447 */
448 private void growQueue() {
449 ForkJoinTask<?>[] oldQ = queue;
450 int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
451 if (size > MAXIMUM_QUEUE_CAPACITY)
452 throw new RejectedExecutionException("Queue capacity exceeded");
453 if (size < INITIAL_QUEUE_CAPACITY)
454 size = INITIAL_QUEUE_CAPACITY;
455 ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
456 int mask = size - 1;
457 int top = queueTop;
458 int oldMask;
459 if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
460 for (int b = queueBase; b != top; ++b) {
461 long u = ((b & oldMask) << ASHIFT) + ABASE;
462 Object x = UNSAFE.getObjectVolatile(oldQ, u);
463 if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
464 UNSAFE.putObjectVolatile
465 (q, ((b & mask) << ASHIFT) + ABASE, x);
466 }
467 }
468 }
469
470 /**
471 * Tries to take a task from the base of the queue, failing if
472 * empty or contended. Note: Specializations of this code appear
473 * in locallyDeqTask and elsewhere.
474 *
475 * @return a task, or null if none or contended
476 */
477 final ForkJoinTask<?> deqTask() {
478 ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
479 if (queueTop != (b = queueBase) &&
480 (q = queue) != null && // must read q after b
481 (i = (q.length - 1) & b) >= 0 &&
482 (t = q[i]) != null && queueBase == b &&
483 UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
484 queueBase = b + 1;
485 return t;
486 }
487 return null;
488 }
489
490 /**
491 * Tries to take a task from the base of the worker's own queue. Called only
492 * by this thread.
493 *
494 * @return a task, or null if none
495 */
496 final ForkJoinTask<?> locallyDeqTask() {
497 ForkJoinTask<?> t; int m, b, i;
498 ForkJoinTask<?>[] q = queue;
499 if (q != null && (m = q.length - 1) >= 0) {
500 while (queueTop != (b = queueBase)) {
501 if ((t = q[i = m & b]) != null &&
502 queueBase == b &&
503 UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
504 t, null)) {
505 queueBase = b + 1;
506 return t;
507 }
508 }
509 }
510 return null;
511 }
512
513 /**
514 * Returns a popped task, or null if empty.
515 * Called only by this thread.
516 */
517 private ForkJoinTask<?> popTask() {
518 int m;
519 ForkJoinTask<?>[] q = queue;
520 if (q != null && (m = q.length - 1) >= 0) {
521 for (int s; (s = queueTop) != queueBase;) {
522 int i = m & --s;
523 long u = (i << ASHIFT) + ABASE; // raw offset
524 ForkJoinTask<?> t = q[i];
525 if (t == null) // lost to stealer
526 break;
527 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
528 queueTop = s; // or putOrderedInt
529 return t;
530 }
531 }
532 }
533 return null;
534 }
535
536 /**
537 * Specialized version of popTask to pop only if topmost element
538 * is the given task. Called only by this thread.
539 *
540 * @param t the task. Caller must ensure non-null.
541 */
542 final boolean unpushTask(ForkJoinTask<?> t) {
543 ForkJoinTask<?>[] q;
544 int s;
545 if ((q = queue) != null && (s = queueTop) != queueBase &&
546 UNSAFE.compareAndSwapObject
547 (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
548 queueTop = s; // or putOrderedInt
549 return true;
550 }
551 return false;
552 }
553
554 /**
555 * Returns next task, or null if empty or contended.
556 */
557 final ForkJoinTask<?> peekTask() {
558 int m;
559 ForkJoinTask<?>[] q = queue;
560 if (q == null || (m = q.length - 1) < 0)
561 return null;
562 int i = locallyFifo ? queueBase : (queueTop - 1);
563 return q[i & m];
564 }
565
566 // Support methods for ForkJoinPool
567
568 /**
569 * Runs the given task, plus any local tasks until queue is empty
570 */
571 final void execTask(ForkJoinTask<?> t) {
572 currentSteal = t;
573 for (;;) {
574 if (t != null)
575 t.doExec();
576 if (queueTop == queueBase)
577 break;
578 t = locallyFifo ? locallyDeqTask() : popTask();
579 }
580 ++stealCount;
581 currentSteal = null;
582 }
583
584 /**
585 * Removes and cancels all tasks in queue. Can be called from any
586 * thread.
587 */
588 final void cancelTasks() {
589 ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
590 if (cj != null && cj.status >= 0)
591 cj.cancelIgnoringExceptions();
592 ForkJoinTask<?> cs = currentSteal;
593 if (cs != null && cs.status >= 0)
594 cs.cancelIgnoringExceptions();
595 while (queueBase != queueTop) {
596 ForkJoinTask<?> t = deqTask();
597 if (t != null)
598 t.cancelIgnoringExceptions();
599 }
600 }
601
602 /**
603 * Drains tasks to given collection c.
604 *
605 * @return the number of tasks drained
606 */
607 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
608 int n = 0;
609 while (queueBase != queueTop) {
610 ForkJoinTask<?> t = deqTask();
611 if (t != null) {
612 c.add(t);
613 ++n;
614 }
615 }
616 return n;
617 }
618
619 // Support methods for ForkJoinTask
620
621 /**
622 * Returns an estimate of the number of tasks in the queue.
623 */
624 final int getQueueSize() {
625 return queueTop - queueBase;
626 }
627
628 /**
629 * Gets and removes a local task.
630 *
631 * @return a task, if available
632 */
633 final ForkJoinTask<?> pollLocalTask() {
634 return locallyFifo ? locallyDeqTask() : popTask();
635 }
636
637 /**
638 * Gets and removes a local or stolen task.
639 *
640 * @return a task, if available
641 */
642 final ForkJoinTask<?> pollTask() {
643 ForkJoinWorkerThread[] ws;
644 ForkJoinTask<?> t = pollLocalTask();
645 if (t != null || (ws = pool.workers) == null)
646 return t;
647 int n = ws.length; // cheap version of FJP.scan
648 int steps = n << 1;
649 int r = nextSeed();
650 int i = 0;
651 while (i < steps) {
652 ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
653 if (w != null && w.queueBase != w.queueTop && w.queue != null) {
654 if ((t = w.deqTask()) != null)
655 return t;
656 i = 0;
657 }
658 }
659 return null;
660 }
661
662 /**
663 * The maximum stolen->joining link depth allowed in helpJoinTask,
664 * as well as the maximum number of retries (allowing on average
665 * one staleness retry per level) per attempt to instead try
666 * compensation. Depths for legitimate chains are unbounded, but
667 * we use a fixed constant to avoid (otherwise unchecked) cycles
668 * and bound staleness of traversal parameters at the expense of
669 * sometimes blocking when we could be helping.
670 */
671 private static final int MAX_HELP = 16;
672
673 /**
674 * Possibly runs some tasks and/or blocks, until joinMe is done.
675 *
676 * @param joinMe the task to join
677 * @return completion status on exit
678 */
679 final int joinTask(ForkJoinTask<?> joinMe) {
680 ForkJoinTask<?> prevJoin = currentJoin;
681 currentJoin = joinMe;
682 for (int s, retries = MAX_HELP;;) {
683 if ((s = joinMe.status) < 0) {
684 currentJoin = prevJoin;
685 return s;
686 }
687 if (retries > 0) {
688 if (queueTop != queueBase) {
689 if (!localHelpJoinTask(joinMe))
690 retries = 0; // cannot help
691 }
692 else if (retries == MAX_HELP >>> 1) {
693 --retries; // check uncommon case
694 if (tryDeqAndExec(joinMe) >= 0)
695 Thread.yield(); // for politeness
696 }
697 else
698 retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
699 }
700 else {
701 retries = MAX_HELP; // restart if not done
702 pool.tryAwaitJoin(joinMe);
703 }
704 }
705 }
706
707 /**
708 * If present, pops and executes the given task, or any other
709 * cancelled task
710 *
711 * @return false if any other non-cancelled task exists in local queue
712 */
713 private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
714 int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
715 if ((s = queueTop) != queueBase && (q = queue) != null &&
716 (i = (q.length - 1) & --s) >= 0 &&
717 (t = q[i]) != null) {
718 if (t != joinMe && t.status >= 0)
719 return false;
720 if (UNSAFE.compareAndSwapObject
721 (q, (i << ASHIFT) + ABASE, t, null)) {
722 queueTop = s; // or putOrderedInt
723 t.doExec();
724 }
725 }
726 return true;
727 }
728
729 /**
730 * Tries to locate and execute tasks for a stealer of the given
731 * task, or in turn one of its stealers. Traces
732 * currentSteal->currentJoin links looking for a thread working on
733 * a descendant of the given task and with a non-empty queue to
734 * steal back and execute tasks from. The implementation is very
735 * branchy to cope with potential inconsistencies or loops
736 * encountering chains that are stale, unknown, or of length
737 * greater than MAX_HELP links. All of these cases are dealt with
738 * by having the caller simply retry.
739 *
740 * @param joinMe the task to join
742 * @return true if ran a task
743 */
744 private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
745 boolean helped = false;
746 int m = pool.scanGuard & SMASK;
747 ForkJoinWorkerThread[] ws = pool.workers;
748 if (ws != null && ws.length > m && joinMe.status >= 0) {
749 int levels = MAX_HELP; // remaining chain length
750 ForkJoinTask<?> task = joinMe; // base of chain
751 outer:for (ForkJoinWorkerThread thread = this;;) {
752 // Try to find v, the stealer of task, by first using hint
753 ForkJoinWorkerThread v = ws[thread.stealHint & m];
754 if (v == null || v.currentSteal != task) {
755 for (int j = 0; ;) { // search array
756 if ((v = ws[j]) != null && v.currentSteal == task) {
757 thread.stealHint = j;
758 break; // save hint for next time
759 }
760 if (++j > m)
761 break outer; // can't find stealer
762 }
763 }
764 // Try to help v, using specialized form of deqTask
765 for (;;) {
766 ForkJoinTask<?>[] q; int b, i;
767 if (joinMe.status < 0)
768 break outer;
769 if ((b = v.queueBase) == v.queueTop ||
770 (q = v.queue) == null ||
771 (i = (q.length-1) & b) < 0)
772 break; // empty
773 long u = (i << ASHIFT) + ABASE;
774 ForkJoinTask<?> t = q[i];
775 if (task.status < 0)
776 break outer; // stale
777 if (t != null && v.queueBase == b &&
778 UNSAFE.compareAndSwapObject(q, u, t, null)) {
779 v.queueBase = b + 1;
780 v.stealHint = poolIndex;
781 ForkJoinTask<?> ps = currentSteal;
782 currentSteal = t;
783 t.doExec();
784 currentSteal = ps;
785 helped = true;
786 }
787 }
788 // Try to descend to find v's stealer
789 ForkJoinTask<?> next = v.currentJoin;
790 if (--levels > 0 && task.status >= 0 &&
791 next != null && next != task) {
792 task = next;
793 thread = v;
794 }
795 else
796 break; // max levels, stale, dead-end, or cyclic
797 }
798 }
799 return helped;
800 }
801
802 /**
803 * Performs an uncommon case for joinTask: if task t is at the base of
804 * some worker's queue, steals and executes it.
805 *
806 * @param t the task
807 * @return t's status
808 */
809 private int tryDeqAndExec(ForkJoinTask<?> t) {
810 int m = pool.scanGuard & SMASK;
811 ForkJoinWorkerThread[] ws = pool.workers;
812 if (ws != null && ws.length > m && t.status >= 0) {
813 for (int j = 0; j <= m; ++j) {
814 ForkJoinTask<?>[] q; int b, i;
815 ForkJoinWorkerThread v = ws[j];
816 if (v != null &&
817 (b = v.queueBase) != v.queueTop &&
818 (q = v.queue) != null &&
819 (i = (q.length - 1) & b) >= 0 &&
820 q[i] == t) {
821 long u = (i << ASHIFT) + ABASE;
822 if (v.queueBase == b &&
823 UNSAFE.compareAndSwapObject(q, u, t, null)) {
824 v.queueBase = b + 1;
825 v.stealHint = poolIndex;
826 ForkJoinTask<?> ps = currentSteal;
827 currentSteal = t;
828 t.doExec();
829 currentSteal = ps;
830 }
831 break;
832 }
833 }
834 }
835 return t.status;
836 }
837
838 /**
839 * Implements ForkJoinTask.getSurplusQueuedTaskCount(). Returns
840 * an estimate of the number of tasks, offset by a function of
841 * the number of idle workers.
842 *
843 * This method provides a cheap heuristic guide for task
844 * partitioning when programmers, frameworks, tools, or languages
845 * have little or no idea about task granularity. In essence by
846 * offering this method, we ask users only about tradeoffs in
847 * overhead vs expected throughput and its variance, rather than
848 * how finely to partition tasks.
849 *
850 * In a steady state strict (tree-structured) computation, each
851 * thread makes available for stealing enough tasks for other
852 * threads to remain active. Inductively, if all threads play by
853 * the same rules, each thread should make available only a
854 * constant number of tasks.
855 *
856 * The minimum useful constant is just 1. But using a value of 1
857 * would require immediate replenishment upon each steal to
858 * maintain enough tasks, which is infeasible. Further,
859 * partitionings/granularities of offered tasks should minimize
860 * steal rates, which in general means that threads nearer the top
861 * of computation tree should generate more than those nearer the
862 * bottom. In perfect steady state, each thread is at
863 * approximately the same level of computation tree. However,
864 * producing extra tasks amortizes the uncertainty of progress and
865 * diffusion assumptions.
866 *
867 * So, users will want to use values larger, but not much larger
868 * than 1 to both smooth over transient shortages and hedge
869 * against uneven progress, as traded off against the cost of
870 * extra task overhead. We leave the user to pick a threshold
871 * value to compare with the results of this call to guide
872 * decisions, but recommend values such as 3.
873 *
874 * When all threads are active, it is on average OK to estimate
875 * surplus strictly locally. In steady-state, if one thread is
876 * maintaining say 2 surplus tasks, then so are others. So we can
877 * just use estimated queue length (although note that (queueTop -
878 * queueBase) can be an overestimate because of stealers lagging
879 * increments of queueBase). However, this strategy alone leads
880 * to serious mis-estimates in some non-steady-state conditions
881 * (ramp-up, ramp-down, other stalls). We can detect many of these
882 * by further considering the number of "idle" threads that are
883 * known to have zero queued tasks, so we compensate by a factor of
884 * (#idle/#active) threads.
885 */
886 final int getEstimatedSurplusTaskCount() {
887 return queueTop - queueBase - pool.idlePerActive();
888 }
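/*
 * A minimal sketch (hypothetical user task) of the partitioning
 * heuristic discussed above, via the public
 * ForkJoinTask.getSurplusQueuedTaskCount() method that this one
 * implements: keep forking while the local surplus is at or below a
 * small threshold such as the recommended 3, otherwise compute
 * directly. The sequential cutoff (1024) is arbitrary.
 *
 *   class SumTask extends RecursiveTask<Long> {
 *       final long[] a; final int lo, hi;
 *       SumTask(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }
 *       protected Long compute() {
 *           if (hi - lo > 1024 &&
 *               ForkJoinTask.getSurplusQueuedTaskCount() <= 3) {
 *               int mid = (lo + hi) >>> 1;
 *               SumTask left = new SumTask(a, lo, mid);
 *               SumTask right = new SumTask(a, mid, hi);
 *               left.fork();                    // make work available to stealers
 *               return right.compute() + left.join();
 *           }
 *           long s = 0;
 *           for (int i = lo; i < hi; ++i) s += a[i];
 *           return s;
 *       }
 *   }
 */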
889
890 /**
891 * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
892 * pool's active count ctl maintenance, but rather than blocking
893 * when tasks cannot be found, we rescan until all others cannot
894 * find tasks either. The bracketing by pool quiescerCounts
895 * updates suppresses pool auto-shutdown mechanics that could
896 * otherwise prematurely terminate the pool because all threads
897 * appear to be inactive.
898 */
899 final void helpQuiescePool() {
900 boolean active = true;
901 ForkJoinTask<?> ps = currentSteal; // to restore below
902 ForkJoinPool p = pool;
903 p.addQuiescerCount(1);
904 for (;;) {
905 ForkJoinWorkerThread[] ws = p.workers;
906 ForkJoinWorkerThread v = null;
907 int n;
908 if (queueTop != queueBase)
909 v = this;
910 else if (ws != null && (n = ws.length) > 1) {
911 ForkJoinWorkerThread w;
912 int r = nextSeed(); // cheap version of FJP.scan
913 int steps = n << 1;
914 for (int i = 0; i < steps; ++i) {
915 if ((w = ws[(i + r) & (n - 1)]) != null &&
916 w.queueBase != w.queueTop) {
917 v = w;
918 break;
919 }
920 }
921 }
922 if (v != null) {
923 ForkJoinTask<?> t;
924 if (!active) {
925 active = true;
926 p.addActiveCount(1);
927 }
928 if ((t = (v != this) ? v.deqTask() :
929 locallyFifo ? locallyDeqTask() : popTask()) != null) {
930 currentSteal = t;
931 t.doExec();
932 currentSteal = ps;
933 }
934 }
935 else {
936 if (active) {
937 active = false;
938 p.addActiveCount(-1);
939 }
940 if (p.isQuiescent()) {
941 p.addActiveCount(1);
942 p.addQuiescerCount(-1);
943 break;
944 }
945 }
946 }
947 }
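/*
 * A minimal sketch (hypothetical user task) of the pattern this method
 * supports, reached through the public ForkJoinTask.helpQuiesce(): fork
 * a batch of subtasks whose results are never individually joined, then
 * help run (and steal) tasks until the pool is quiescent.
 *
 *   class RunAllTask extends RecursiveAction {
 *       final java.util.List<Runnable> work;
 *       RunAllTask(java.util.List<Runnable> work) { this.work = work; }
 *       protected void compute() {
 *           for (final Runnable r : work)
 *               new RecursiveAction() {
 *                   protected void compute() { r.run(); }
 *               }.fork();
 *           helpQuiesce();   // process tasks until all forked work is done
 *       }
 *   }
 */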
948
949 // Unsafe mechanics
950 private static final sun.misc.Unsafe UNSAFE;
951 private static final long ABASE;
952 private static final int ASHIFT;
953
954 static {
955 int s;
956 try {
957 UNSAFE = getUnsafe();
958 Class<?> a = ForkJoinTask[].class;
959 ABASE = UNSAFE.arrayBaseOffset(a);
960 s = UNSAFE.arrayIndexScale(a);
961 } catch (Exception e) {
962 throw new Error(e);
963 }
964 if ((s & (s-1)) != 0)
965 throw new Error("data type scale not a power of two");
966 ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
967 }
968
969 /**
970 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
971 * Replace with a simple call to Unsafe.getUnsafe when integrating
972 * into a jdk.
973 *
974 * @return a sun.misc.Unsafe
975 */
976 private static sun.misc.Unsafe getUnsafe() {
977 try {
978 return sun.misc.Unsafe.getUnsafe();
979 } catch (SecurityException se) {
980 try {
981 return java.security.AccessController.doPrivileged
982 (new java.security
983 .PrivilegedExceptionAction<sun.misc.Unsafe>() {
984 public sun.misc.Unsafe run() throws Exception {
985 java.lang.reflect.Field f = sun.misc
986 .Unsafe.class.getDeclaredField("theUnsafe");
987 f.setAccessible(true);
988 return (sun.misc.Unsafe) f.get(null);
989 }});
990 } catch (java.security.PrivilegedActionException e) {
991 throw new RuntimeException("Could not initialize intrinsics",
992 e.getCause());
993 }
994 }
995 }
996 }