6 |
|
|
7 |
|
package jsr166y; |
8 |
|
|
9 |
– |
import java.util.concurrent.*; |
10 |
– |
|
9 |
|
import java.util.Random; |
10 |
|
import java.util.Collection; |
11 |
|
import java.util.concurrent.locks.LockSupport; |
12 |
+ |
import java.util.concurrent.RejectedExecutionException; |
13 |
|
|
14 |
|
/** |
15 |
|
* A thread managed by a {@link ForkJoinPool}. This class is |
171 |
|
|
172 |
|
/** |
173 |
|
* Maximum work-stealing queue array size. Must be less than or |
174 |
< |
* equal to 1 << 28 to ensure lack of index wraparound. (This |
175 |
< |
* is less than usual bounds, because we need leftshift by 3 |
176 |
< |
* to be in int range). |
174 |
> |
* equal to 1 << (31 - width of array entry) to ensure lack of |
175 |
> |
* index wraparound. The value is set in the static block |
176 |
> |
* at the end of this file after obtaining width. |
177 |
|
*/ |
178 |
< |
private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; |
178 |
> |
private static final int MAXIMUM_QUEUE_CAPACITY; |
179 |
|
|
180 |
|
/** |
181 |
|
* The pool this thread works in. Accessed directly by ForkJoinTask. |
229 |
|
private static final int TRIMMED = 0x08; // killed while suspended |
230 |
|
|
231 |
|
/** |
232 |
< |
* Number of steals, transferred and reset in pool callbacks pool |
233 |
< |
* when idle Accessed directly by pool. |
232 |
> |
* Number of steals. Directly accessed (and reset) by |
233 |
> |
* pool.tryAccumulateStealCount when idle. |
234 |
|
*/ |
235 |
|
int stealCount; |
236 |
|
|
285 |
|
|
286 |
|
/** |
287 |
|
* The task currently being joined, set only when actively trying |
288 |
< |
* to helpStealer. Written only by current thread, but read by |
289 |
< |
* others. |
288 |
> |
* to help other stealers in helpJoinTask. Written only by this |
289 |
> |
* thread, but read by others. |
290 |
|
*/ |
291 |
|
private volatile ForkJoinTask<?> currentJoin; |
292 |
|
|
293 |
|
/** |
294 |
|
* The task most recently stolen from another worker (or |
295 |
< |
* submission queue). Written only by current thread, but read by |
295 |
> |
* submission queue). Written only by this thread, but read by |
296 |
|
* others. |
297 |
|
*/ |
298 |
|
private volatile ForkJoinTask<?> currentSteal; |
312 |
|
} |
313 |
|
|
314 |
|
/** |
315 |
< |
* Performs additional initialization and starts this thread |
315 |
> |
* Performs additional initialization and starts this thread. |
316 |
|
*/ |
317 |
|
final void start(int poolIndex, UncaughtExceptionHandler ueh) { |
318 |
|
this.poolIndex = poolIndex; |
348 |
|
/** |
349 |
|
* Initializes internal state after construction but before |
350 |
|
* processing any tasks. If you override this method, you must |
351 |
< |
* invoke super.onStart() at the beginning of the method. |
351 |
> |
* invoke @code{super.onStart()} at the beginning of the method. |
352 |
|
* Initialization requires care: Most fields must have legal |
353 |
|
* default values, to ensure that attempted accesses from other |
354 |
|
* threads work correctly even before this thread starts |
415 |
|
// helpers for run() |
416 |
|
|
417 |
|
/** |
418 |
< |
* Find and execute tasks and check status while running |
418 |
> |
* Finds and executes tasks, and checks status while running. |
419 |
|
*/ |
420 |
|
private void mainLoop() { |
421 |
|
boolean ran = false; // true if ran a task on last step |
429 |
|
} |
430 |
|
|
431 |
|
/** |
432 |
< |
* Try to steal a task and execute it |
432 |
> |
* Tries to steal a task and execute it. |
433 |
|
* |
434 |
|
* @return true if ran a task |
435 |
|
*/ |
446 |
|
} |
447 |
|
|
448 |
|
/** |
449 |
< |
* If a submission exists, try to activate and run it; |
449 |
> |
* If a submission exists, try to activate and run it. |
450 |
|
* |
451 |
|
* @return true if ran a task |
452 |
|
*/ |
453 |
|
private boolean tryExecSubmission() { |
454 |
|
ForkJoinPool p = pool; |
455 |
+ |
// This loop is needed in case attempt to activate fails, in |
456 |
+ |
// which case we only retry if there still appears to be a |
457 |
+ |
// submission. |
458 |
|
while (p.hasQueuedSubmissions()) { |
459 |
|
ForkJoinTask<?> t; int a; |
460 |
|
if (active || // inline p.tryIncrementActiveCount |
479 |
|
*/ |
480 |
|
private void execLocalTasks() { |
481 |
|
while (runState == 0) { |
482 |
< |
ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask(); |
482 |
> |
ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask(); |
483 |
|
if (t != null) |
484 |
|
t.quietlyExec(); |
485 |
|
else if (sp == base) |
489 |
|
|
490 |
|
/* |
491 |
|
* Intrinsics-based atomic writes for queue slots. These are |
492 |
< |
* basically the same as methods in AtomicObjectArray, but |
492 |
> |
* basically the same as methods in AtomicReferenceArray, but |
493 |
|
* specialized for (1) ForkJoinTask elements (2) requirement that |
494 |
|
* nullness and bounds checks have already been performed by |
495 |
|
* callers and (3) effective offsets are known not to overflow |
496 |
|
* from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't |
497 |
|
* need corresponding version for reads: plain array reads are OK |
498 |
< |
* because they protected by other volatile reads and are |
498 |
> |
* because they are protected by other volatile reads and are |
499 |
|
* confirmed by CASes. |
500 |
|
* |
501 |
|
* Most uses don't actually call these methods, but instead contain |
519 |
|
* range. This method is used only during resets and backouts. |
520 |
|
*/ |
521 |
|
private static final void writeSlot(ForkJoinTask<?>[] q, int i, |
522 |
< |
ForkJoinTask<?> t) { |
522 |
> |
ForkJoinTask<?> t) { |
523 |
|
UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t); |
524 |
|
} |
525 |
|
|
564 |
|
|
565 |
|
/** |
566 |
|
* Tries to take a task from the base of own queue. Assumes active |
567 |
< |
* status. Called only by current thread. |
567 |
> |
* status. Called only by this thread. |
568 |
|
* |
569 |
|
* @return a task, or null if none |
570 |
|
*/ |
587 |
|
|
588 |
|
/** |
589 |
|
* Returns a popped task, or null if empty. Assumes active status. |
590 |
< |
* Called only by current thread. |
590 |
> |
* Called only by this thread. |
591 |
|
*/ |
592 |
|
private ForkJoinTask<?> popTask() { |
593 |
|
ForkJoinTask<?>[] q = queue; |
611 |
|
|
612 |
|
/** |
613 |
|
* Specialized version of popTask to pop only if topmost element |
614 |
< |
* is the given task. Called only by current thread while |
613 |
< |
* active. |
614 |
> |
* is the given task. Called only by this thread while active. |
615 |
|
* |
616 |
|
* @param t the task. Caller must ensure non-null. |
617 |
|
*/ |
629 |
|
} |
630 |
|
|
631 |
|
/** |
632 |
< |
* Returns next task or null if empty or contended |
632 |
> |
* Returns next task, or null if empty or contended. |
633 |
|
*/ |
634 |
|
final ForkJoinTask<?> peekTask() { |
635 |
|
ForkJoinTask<?>[] q = queue; |
671 |
|
* Computes next value for random victim probe in scan(). Scans |
672 |
|
* don't require a very high quality generator, but also not a |
673 |
|
* crummy one. Marsaglia xor-shift is cheap and works well enough. |
674 |
< |
* Note: This is manually inlined in scan() |
674 |
> |
* Note: This is manually inlined in scan(). |
675 |
|
*/ |
676 |
|
private static final int xorShift(int r) { |
677 |
|
r ^= r << 13; |
749 |
|
// Run State management |
750 |
|
|
751 |
|
// status check methods used mainly by ForkJoinPool |
752 |
< |
final boolean isRunning() { return runState == 0; } |
753 |
< |
final boolean isTerminating() { return (runState & TERMINATING) != 0; } |
754 |
< |
final boolean isTerminated() { return (runState & TERMINATED) != 0; } |
755 |
< |
final boolean isSuspended() { return (runState & SUSPENDED) != 0; } |
756 |
< |
final boolean isTrimmed() { return (runState & TRIMMED) != 0; } |
752 |
> |
final boolean isRunning() { return runState == 0; } |
753 |
> |
final boolean isTerminated() { return (runState & TERMINATED) != 0; } |
754 |
> |
final boolean isSuspended() { return (runState & SUSPENDED) != 0; } |
755 |
> |
final boolean isTrimmed() { return (runState & TRIMMED) != 0; } |
756 |
> |
|
757 |
> |
final boolean isTerminating() { |
758 |
> |
if ((runState & TERMINATING) != 0) |
759 |
> |
return true; |
760 |
> |
if (pool.isAtLeastTerminating()) { // propagate pool state |
761 |
> |
shutdown(); |
762 |
> |
return true; |
763 |
> |
} |
764 |
> |
return false; |
765 |
> |
} |
766 |
|
|
767 |
|
/** |
768 |
|
* Sets state to TERMINATING. Does NOT unpark or interrupt |
786 |
|
} |
787 |
|
|
788 |
|
/** |
789 |
< |
* Sets state to TERMINATED. Called only by onTermination() |
789 |
> |
* Sets state to TERMINATED. Called only by onTermination(). |
790 |
|
*/ |
791 |
|
private void setTerminated() { |
792 |
|
int s; |
907 |
|
if (active || |
908 |
|
(active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, |
909 |
|
a = p.runState, a + 1))) |
910 |
< |
return locallyFifo? locallyDeqTask() : popTask(); |
910 |
> |
return locallyFifo ? locallyDeqTask() : popTask(); |
911 |
|
} |
912 |
|
return null; |
913 |
|
} |
931 |
|
* Possibly runs some tasks and/or blocks, until task is done. |
932 |
|
* |
933 |
|
* @param joinMe the task to join |
934 |
+ |
* @param timed true if use timed wait |
935 |
+ |
* @param nanos wait time if timed |
936 |
|
*/ |
937 |
< |
final void joinTask(ForkJoinTask<?> joinMe) { |
937 |
> |
final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) { |
938 |
|
// currentJoin only written by this thread; only need ordered store |
939 |
|
ForkJoinTask<?> prevJoin = currentJoin; |
940 |
|
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); |
941 |
< |
if (sp != base) |
942 |
< |
localHelpJoinTask(joinMe); |
943 |
< |
if (joinMe.status >= 0) |
944 |
< |
pool.awaitJoin(joinMe, this); |
941 |
> |
if (isTerminating()) // cancel if shutting down |
942 |
> |
joinMe.cancelIgnoringExceptions(); |
943 |
> |
else |
944 |
> |
pool.awaitJoin(joinMe, this, timed, nanos); |
945 |
|
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); |
946 |
|
} |
947 |
|
|
948 |
|
/** |
949 |
|
* Run tasks in local queue until given task is done. |
950 |
+ |
* Not currently used because it complicates semantics. |
951 |
|
* |
952 |
|
* @param joinMe the task to join |
953 |
|
*/ |
980 |
|
} |
981 |
|
|
982 |
|
/** |
983 |
< |
* Unless terminating, tries to locate and help perform tasks for |
984 |
< |
* a stealer of the given task, or in turn one of its stealers. |
985 |
< |
* Traces currentSteal->currentJoin links looking for a thread |
986 |
< |
* working on a descendant of the given task and with a non-empty |
987 |
< |
* queue to steal back and execute tasks from. |
983 |
> |
* Tries to locate and help perform tasks for a stealer of the |
984 |
> |
* given task, or in turn one of its stealers. Traces |
985 |
> |
* currentSteal->currentJoin links looking for a thread working on |
986 |
> |
* a descendant of the given task and with a non-empty queue to |
987 |
> |
* steal back and execute tasks from. |
988 |
|
* |
989 |
|
* The implementation is very branchy to cope with potential |
990 |
|
* inconsistencies or loops encountering chains that are stale, |
1000 |
|
int n; |
1001 |
|
if (joinMe.status < 0) // already done |
1002 |
|
return; |
990 |
– |
if ((runState & TERMINATING) != 0) { // cancel if shutting down |
991 |
– |
joinMe.cancelIgnoringExceptions(); |
992 |
– |
return; |
993 |
– |
} |
1003 |
|
if ((ws = pool.workers) == null || (n = ws.length) <= 1) |
1004 |
|
return; // need at least 2 workers |
1005 |
|
|
1012 |
|
for (int j = 0; ; ++j) { // search array |
1013 |
|
if (j < n) { |
1014 |
|
ForkJoinTask<?> vs; |
1015 |
< |
if ((v = ws[j]) != null && |
1015 |
> |
if ((v = ws[j]) != null && v != this && |
1016 |
|
(vs = v.currentSteal) != null) { |
1017 |
|
if (joinMe.status < 0 || task.status < 0) |
1018 |
|
return; // stale or done |
1064 |
|
} |
1065 |
|
|
1066 |
|
/** |
1067 |
+ |
* Implements ForkJoinTask.getSurplusQueuedTaskCount(). |
1068 |
|
* Returns an estimate of the number of tasks, offset by a |
1069 |
|
* function of number of idle workers. |
1070 |
|
* |
1167 |
|
if ((s & (s-1)) != 0) |
1168 |
|
throw new Error("data type scale not a power of two"); |
1169 |
|
qShift = 31 - Integer.numberOfLeadingZeros(s); |
1170 |
+ |
MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift); |
1171 |
|
} |
1172 |
|
|
1173 |
|
private static long objectFieldOffset(String field, Class<?> klazz) { |