17 |
|
* subclassable solely for the sake of adding functionality -- there |
18 |
|
* are no overridable methods dealing with scheduling or |
19 |
|
* execution. However, you can override initialization and termination |
20 |
< |
* cleanup methods surrounding the main task processing loop. If you |
21 |
< |
* do create such a subclass, you will also need to supply a custom |
20 |
> |
* methods surrounding the main task processing loop. If you do |
21 |
> |
* create such a subclass, you will also need to supply a custom |
22 |
|
* ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. |
23 |
< |
* |
23 |
> |
* |
24 |
|
*/ |
25 |
|
public class ForkJoinWorkerThread extends Thread { |
26 |
|
/* |
44 |
|
* of tasks. To accomplish this, we shift the CAS arbitrating pop |
45 |
|
* vs deq (steal) from being on the indices ("base" and "sp") to |
46 |
|
* the slots themselves (mainly via method "casSlotNull()"). So, |
47 |
< |
* both a successful pop and deq mainly entail CAS'ing a nonnull |
47 |
> |
* both a successful pop and deq mainly entail CAS'ing a non-null |
48 |
|
* slot to null. Because we rely on CASes of references, we do |
49 |
|
* not need tag bits on base or sp. They are simple ints as used |
50 |
|
* in any circular array-based queue (see for example ArrayDeque). |
56 |
|
* considered individually, is not wait-free. One thief cannot |
57 |
|
* successfully continue until another in-progress one (or, if |
58 |
|
* previously empty, a push) completes. However, in the |
59 |
< |
* aggregate, we ensure at least probablistic non-blockingness. If |
59 |
> |
* aggregate, we ensure at least probabilistic non-blockingness. If |
60 |
|
* an attempted steal fails, a thief always chooses a different |
61 |
|
* random victim target to try next. So, in order for one thief to |
62 |
|
* progress, it suffices for any in-progress deq or new push on |
75 |
|
* push) require store order and CASes (in pop and deq) require |
76 |
|
* (volatile) CAS semantics. Since these combinations aren't |
77 |
|
* supported using ordinary volatiles, the only way to accomplish |
78 |
< |
* these effciently is to use direct Unsafe calls. (Using external |
78 |
> |
* these efficiently is to use direct Unsafe calls. (Using external |
79 |
|
* AtomicIntegers and AtomicReferenceArrays for the indices and |
80 |
|
* array is significantly slower because of memory locality and |
81 |
|
* indirection effects.) Further, performance on most platforms is |
199 |
|
long lastEventCount; |
200 |
|
|
201 |
|
/** |
202 |
+ |
 * True if using local FIFO, rather than the default LIFO, for local polling |
203 |
+ |
*/ |
204 |
+ |
private boolean locallyFifo; |
205 |
+ |
|
206 |
+ |
/** |
207 |
|
* Creates a ForkJoinWorkerThread operating in the given pool. |
208 |
|
* @param pool the pool this thread works in |
209 |
|
* @throws NullPointerException if pool is null |
215 |
|
// Remaining initialization is deferred to onStart |
216 |
|
} |
217 |
|
|
218 |
< |
// Public access methods |
218 |
> |
// Public access methods |
219 |
|
|
220 |
|
/** |
221 |
|
* Returns the pool hosting this thread |
237 |
|
return poolIndex; |
238 |
|
} |
239 |
|
|
240 |
+ |
/** |
241 |
+ |
* Establishes local first-in-first-out scheduling mode for forked |
242 |
+ |
* tasks that are never joined. |
243 |
+ |
* @param async if true, use locally FIFO scheduling |
244 |
+ |
*/ |
245 |
+ |
void setAsyncMode(boolean async) { |
246 |
+ |
locallyFifo = async; |
247 |
+ |
} |
248 |
|
|
249 |
|
// Runstate management |
250 |
|
|
336 |
|
private void mainLoop() { |
337 |
|
while (!isShutdown()) { |
338 |
|
ForkJoinTask<?> t = pollTask(); |
339 |
< |
if (t != null || (t = pollSubmission()) != null) |
339 |
> |
if (t != null || (t = pollSubmission()) != null) |
340 |
|
t.quietlyExec(); |
341 |
|
else if (tryInactivate()) |
342 |
|
pool.sync(this); |
385 |
|
// propagate exception to uncaught exception handler |
386 |
|
try { |
387 |
|
do;while (!tryInactivate()); // ensure inactive |
388 |
< |
cancelTasks(); |
388 |
> |
cancelTasks(); |
389 |
|
runState = TERMINATED; |
390 |
|
pool.workerTerminated(this); |
391 |
|
} catch (Throwable ex) { // Shouldn't ever happen |
397 |
|
} |
398 |
|
} |
399 |
|
|
400 |
< |
// Intrinsics-based support for queue operations. |
400 |
> |
// Intrinsics-based support for queue operations. |
401 |
|
|
402 |
|
/** |
403 |
< |
* Add in store-order the given task at given slot of q to |
404 |
< |
* null. Caller must ensure q is nonnull and index is in range. |
403 |
> |
* Adds in store-order the given task at given slot of q to null. |
404 |
> |
* Caller must ensure q is non-null and index is in range. |
405 |
|
*/ |
406 |
|
private static void setSlot(ForkJoinTask<?>[] q, int i, |
407 |
|
ForkJoinTask<?> t){ |
409 |
|
} |
410 |
|
|
411 |
|
/** |
412 |
< |
* CAS given slot of q to null. Caller must ensure q is nonnull |
412 |
> |
* CAS given slot of q to null. Caller must ensure q is non-null |
413 |
|
* and index is in range. |
414 |
|
*/ |
415 |
|
private static boolean casSlotNull(ForkJoinTask<?>[] q, int i, |
428 |
|
|
429 |
|
/** |
430 |
|
* Pushes a task. Called only by current thread. |
431 |
< |
* @param t the task. Caller must ensure nonnull |
431 |
> |
* @param t the task. Caller must ensure non-null. |
432 |
|
*/ |
433 |
|
final void pushTask(ForkJoinTask<?> t) { |
434 |
|
ForkJoinTask<?>[] q = queue; |
447 |
|
* either empty or contended. |
448 |
|
* @return a task, or null if none or contended. |
449 |
|
*/ |
450 |
< |
private ForkJoinTask<?> deqTask() { |
450 |
> |
final ForkJoinTask<?> deqTask() { |
451 |
|
ForkJoinTask<?> t; |
452 |
|
ForkJoinTask<?>[] q; |
453 |
|
int i; |
464 |
|
|
465 |
|
/** |
466 |
|
* Returns a popped task, or null if empty. Ensures active status |
467 |
< |
* if nonnull. Called only by current thread. |
467 |
> |
* if non-null. Called only by current thread. |
468 |
|
*/ |
469 |
|
final ForkJoinTask<?> popTask() { |
470 |
|
int s = sp; |
487 |
|
* Specialized version of popTask to pop only if |
488 |
|
* topmost element is the given task. Called only |
489 |
|
* by current thread while active. |
490 |
< |
* @param t the task. Caller must ensure nonnull |
490 |
> |
* @param t the task. Caller must ensure non-null |
491 |
|
*/ |
492 |
|
final boolean unpushTask(ForkJoinTask<?> t) { |
493 |
|
ForkJoinTask<?>[] q = queue; |
501 |
|
} |
502 |
|
|
503 |
|
/** |
504 |
< |
* Returns next task to pop. |
504 |
> |
* Returns next task. |
505 |
|
*/ |
506 |
|
final ForkJoinTask<?> peekTask() { |
507 |
|
ForkJoinTask<?>[] q = queue; |
508 |
< |
return q == null? null : q[(sp - 1) & (q.length - 1)]; |
508 |
> |
if (q == null) |
509 |
> |
return null; |
510 |
> |
int mask = q.length - 1; |
511 |
> |
int i = locallyFifo? base : (sp - 1); |
512 |
> |
return q[i & mask]; |
513 |
|
} |
514 |
|
|
515 |
|
/** |
587 |
|
} |
588 |
|
|
589 |
|
/** |
590 |
< |
* Pops or steals a task |
590 |
> |
 * Gets and removes a local or stolen task |
591 |
|
* @return a task, if available |
592 |
|
*/ |
593 |
|
final ForkJoinTask<?> pollTask() { |
594 |
< |
ForkJoinTask<?> t = popTask(); |
594 |
> |
ForkJoinTask<?> t = locallyFifo? deqTask() : popTask(); |
595 |
|
if (t == null && (t = scan()) != null) |
596 |
|
++stealCount; |
597 |
|
return t; |
598 |
|
} |
599 |
|
|
600 |
|
/** |
601 |
+ |
 * Gets a local task |
602 |
+ |
* @return a task, if available |
603 |
+ |
*/ |
604 |
+ |
final ForkJoinTask<?> pollLocalTask() { |
605 |
+ |
return locallyFifo? deqTask() : popTask(); |
606 |
+ |
} |
607 |
+ |
|
608 |
+ |
/** |
609 |
|
* Returns a pool submission, if one exists, activating first. |
610 |
|
* @return a submission, if available |
611 |
|
*/ |
632 |
|
} |
633 |
|
|
634 |
|
/** |
635 |
+ |
* Drains tasks to given collection c |
636 |
+ |
* @return the number of tasks drained |
637 |
+ |
*/ |
638 |
+ |
final int drainTasksTo(Collection<ForkJoinTask<?>> c) { |
639 |
+ |
int n = 0; |
640 |
+ |
ForkJoinTask<?> t; |
641 |
+ |
while (base != sp && (t = deqTask()) != null) { |
642 |
+ |
c.add(t); |
643 |
+ |
++n; |
644 |
+ |
} |
645 |
+ |
return n; |
646 |
+ |
} |
647 |
+ |
|
648 |
+ |
/** |
649 |
|
* Get and clear steal count for accumulation by pool. Called |
650 |
|
* only when known to be idle (in pool.sync and termination). |
651 |
|
*/ |
704 |
|
} |
705 |
|
return t; |
706 |
|
} |
707 |
< |
|
707 |
> |
|
708 |
|
/** |
709 |
|
* Runs tasks until pool isQuiescent |
710 |
|
*/ |
711 |
|
final void helpQuiescePool() { |
712 |
|
for (;;) { |
713 |
|
ForkJoinTask<?> t = pollTask(); |
714 |
< |
if (t != null) |
714 |
> |
if (t != null) |
715 |
|
t.quietlyExec(); |
716 |
|
else if (tryInactivate() && pool.isQuiescent()) |
717 |
|
break; |
720 |
|
} |
721 |
|
|
722 |
|
// Temporary Unsafe mechanics for preliminary release |
723 |
+ |
private static Unsafe getUnsafe() throws Throwable { |
724 |
+ |
try { |
725 |
+ |
return Unsafe.getUnsafe(); |
726 |
+ |
} catch (SecurityException se) { |
727 |
+ |
try { |
728 |
+ |
return java.security.AccessController.doPrivileged |
729 |
+ |
(new java.security.PrivilegedExceptionAction<Unsafe>() { |
730 |
+ |
public Unsafe run() throws Exception { |
731 |
+ |
return getUnsafePrivileged(); |
732 |
+ |
}}); |
733 |
+ |
} catch (java.security.PrivilegedActionException e) { |
734 |
+ |
throw e.getCause(); |
735 |
+ |
} |
736 |
+ |
} |
737 |
+ |
} |
738 |
+ |
|
739 |
+ |
private static Unsafe getUnsafePrivileged() |
740 |
+ |
throws NoSuchFieldException, IllegalAccessException { |
741 |
+ |
Field f = Unsafe.class.getDeclaredField("theUnsafe"); |
742 |
+ |
f.setAccessible(true); |
743 |
+ |
return (Unsafe) f.get(null); |
744 |
+ |
} |
745 |
+ |
|
746 |
+ |
private static long fieldOffset(String fieldName) |
747 |
+ |
throws NoSuchFieldException { |
748 |
+ |
return _unsafe.objectFieldOffset |
749 |
+ |
(ForkJoinWorkerThread.class.getDeclaredField(fieldName)); |
750 |
+ |
} |
751 |
|
|
752 |
|
static final Unsafe _unsafe; |
753 |
|
static final long baseOffset; |
757 |
|
static final int qShift; |
758 |
|
static { |
759 |
|
try { |
760 |
< |
if (ForkJoinWorkerThread.class.getClassLoader() != null) { |
761 |
< |
Field f = Unsafe.class.getDeclaredField("theUnsafe"); |
762 |
< |
f.setAccessible(true); |
763 |
< |
_unsafe = (Unsafe)f.get(null); |
697 |
< |
} |
698 |
< |
else |
699 |
< |
_unsafe = Unsafe.getUnsafe(); |
700 |
< |
baseOffset = _unsafe.objectFieldOffset |
701 |
< |
(ForkJoinWorkerThread.class.getDeclaredField("base")); |
702 |
< |
spOffset = _unsafe.objectFieldOffset |
703 |
< |
(ForkJoinWorkerThread.class.getDeclaredField("sp")); |
704 |
< |
runStateOffset = _unsafe.objectFieldOffset |
705 |
< |
(ForkJoinWorkerThread.class.getDeclaredField("runState")); |
760 |
> |
_unsafe = getUnsafe(); |
761 |
> |
baseOffset = fieldOffset("base"); |
762 |
> |
spOffset = fieldOffset("sp"); |
763 |
> |
runStateOffset = fieldOffset("runState"); |
764 |
|
qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); |
765 |
|
int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); |
766 |
|
if ((s & (s-1)) != 0) |
767 |
|
throw new Error("data type scale not a power of two"); |
768 |
|
qShift = 31 - Integer.numberOfLeadingZeros(s); |
769 |
< |
} catch (Exception e) { |
769 |
> |
} catch (Throwable e) { |
770 |
|
throw new RuntimeException("Could not initialize intrinsics", e); |
771 |
|
} |
772 |
|
} |