60 |
|
* Runnable}- or {@code Callable}- based activities as well. However, |
61 |
|
* tasks that are already executing in a pool should normally |
62 |
|
* <em>NOT</em> use these pool execution methods, but instead use the |
63 |
< |
* within-computation forms listed in the table. |
63 |
> |
* within-computation forms listed in the table. |
64 |
|
* |
65 |
|
* <table BORDER CELLPADDING=3 CELLSPACING=1> |
66 |
|
* <tr> |
84 |
|
* <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> |
85 |
|
* </tr> |
86 |
|
* </table> |
87 |
< |
* |
87 |
> |
* |
88 |
|
* <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is |
89 |
|
* used for all parallel task execution in a program or subsystem. |
90 |
|
* Otherwise, use would not usually outweigh the construction and |
171 |
|
* ForkJoinWorkerThread.joinTask) interleave these options until |
172 |
|
* successful. Creating a new spare always succeeds, but also |
173 |
|
* increases application footprint, so we try to avoid it, within |
174 |
< |
* reason. |
174 |
> |
* reason. |
175 |
|
* |
176 |
|
* The ManagedBlocker extension API can't use option (1) so uses a |
177 |
|
* special version of (2) in method awaitBlocker. |
539 |
|
final void incrementRunningCount() { |
540 |
|
int c; |
541 |
|
do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
542 |
< |
c = workerCounts, |
542 |
> |
c = workerCounts, |
543 |
|
c + ONE_RUNNING)); |
544 |
|
} |
545 |
|
|
689 |
|
* parallelism maintenance |
690 |
|
*/ |
691 |
|
private void ensureEnoughWorkers() { |
692 |
< |
for (;;) { |
692 |
> |
while ((runState & TERMINATING) == 0) { |
693 |
|
int pc = parallelism; |
694 |
|
int wc = workerCounts; |
695 |
|
int rc = wc & RUNNING_COUNT_MASK; |
696 |
|
int tc = wc >>> TOTAL_COUNT_SHIFT; |
697 |
|
if (tc < pc) { |
698 |
< |
if (runState == TERMINATING || |
699 |
< |
(UNSAFE.compareAndSwapInt |
700 |
< |
(this, workerCountsOffset, |
701 |
< |
wc, wc + (ONE_RUNNING|ONE_TOTAL)) && |
702 |
< |
addWorker() == null)) |
698 |
> |
if (UNSAFE.compareAndSwapInt |
699 |
> |
(this, workerCountsOffset, |
700 |
> |
wc, wc + (ONE_RUNNING|ONE_TOTAL)) && |
701 |
> |
addWorker() == null) |
702 |
|
break; |
703 |
|
} |
704 |
< |
else if (tc > pc && rc < pc && |
704 |
> |
else if (tc > pc && rc < pc && |
705 |
|
tc > (runState & ACTIVE_COUNT_MASK)) { |
706 |
|
ForkJoinWorkerThread spare = null; |
707 |
|
ForkJoinWorkerThread[] ws = workers; |
708 |
|
int nws = ws.length; |
709 |
< |
for (int i = 0; i < nws; ++i) { |
709 |
> |
for (int i = 0; i < nws; ++i) { |
710 |
|
ForkJoinWorkerThread w = ws[i]; |
711 |
|
if (w != null && w.isSuspended()) { |
712 |
< |
if ((workerCounts & RUNNING_COUNT_MASK) > pc || |
714 |
< |
runState == TERMINATING) |
712 |
> |
if ((workerCounts & RUNNING_COUNT_MASK) > pc) |
713 |
|
return; |
714 |
|
if (w.tryResumeSpare()) |
715 |
|
incrementRunningCount(); |
790 |
|
*/ |
791 |
|
private void signalEvent() { |
792 |
|
int c; |
793 |
< |
do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset, |
793 |
> |
do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset, |
794 |
|
c = eventCount, c+1)); |
795 |
|
releaseWaiters(); |
796 |
|
} |
917 |
|
* |
918 |
|
* We allow blocking if: |
919 |
|
* |
920 |
< |
* 1. There would still be at least as many running threads as |
920 |
> |
* 1. There would still be at least as many running threads as |
921 |
|
* parallelism level if this thread blocks. |
922 |
|
* |
923 |
|
* 2. A spare is resumed to replace this worker. We tolerate |
927 |
|
* preStep(). |
928 |
|
* |
929 |
|
* 3. After #spares repeated checks, there are no fewer than #spare |
930 |
< |
* threads not running. We allow this slack to avoid hysteresis |
931 |
< |
* and as a hedge against lag/uncertainty of running count |
930 |
> |
* threads not running. We allow this slack to avoid hysteresis |
931 |
> |
* and as a hedge against lag/uncertainty of running count |
932 |
|
* estimates when signalling or unblocking stalls. |
933 |
|
* |
934 |
|
* 4. All existing workers are busy (as rechecked via repeated |
935 |
|
* retries by caller) and a new spare is created. |
936 |
< |
* |
936 |
> |
* |
937 |
|
* If none of the above hold, we try to escape out by |
938 |
|
* re-incrementing count and returning to caller, which can retry |
939 |
|
* later. |
946 |
|
* none of the blocking checks hold |
947 |
|
*/ |
948 |
|
final boolean tryAwaitJoin(ForkJoinTask<?> joinMe, int retries) { |
949 |
< |
if (joinMe.status < 0) // precheck to prime loop |
949 |
> |
if (joinMe.status < 0) // precheck for cancellation |
950 |
> |
return false; |
951 |
> |
if ((runState & TERMINATING) != 0) { // shutting down |
952 |
> |
joinMe.cancelIgnoringExceptions(); |
953 |
|
return false; |
954 |
+ |
} |
955 |
+ |
|
956 |
|
int pc = parallelism; |
957 |
|
boolean running = true; // false when running count decremented |
958 |
|
outer:for (;;) { |
960 |
|
int rc = wc & RUNNING_COUNT_MASK; |
961 |
|
int tc = wc >>> TOTAL_COUNT_SHIFT; |
962 |
|
if (running) { // replace with spare or decrement count |
963 |
< |
if (rc <= pc && tc > pc && |
963 |
> |
if (rc <= pc && tc > pc && |
964 |
|
(retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) { |
965 |
|
ForkJoinWorkerThread[] ws = workers; |
966 |
|
int nws = ws.length; |
982 |
|
} |
983 |
|
if (retries < 0 || // < 0 means replacement check only |
984 |
|
rc == 0 || joinMe.status < 0 || workerCounts != wc || |
985 |
< |
!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
985 |
> |
!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
986 |
|
wc, wc - ONE_RUNNING)) |
987 |
|
return false; // done or inconsistent or contended |
988 |
|
running = false; |
996 |
|
if (retries > sc) { |
997 |
|
if (rc > 0 && rc >= pc - sc) // allow slack |
998 |
|
break; |
999 |
< |
if (tc < MAX_THREADS && |
1000 |
< |
tc == (runState & ACTIVE_COUNT_MASK) && |
999 |
> |
if (tc < MAX_THREADS && |
1000 |
> |
tc == (runState & ACTIVE_COUNT_MASK) && |
1001 |
|
workerCounts == wc && |
1002 |
|
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
1003 |
|
wc+(ONE_RUNNING|ONE_TOTAL))) { |
1039 |
|
int wc = workerCounts; |
1040 |
|
int rc = wc & RUNNING_COUNT_MASK; |
1041 |
|
int tc = wc >>> TOTAL_COUNT_SHIFT; |
1042 |
< |
if (running) { |
1043 |
< |
if (rc <= pc && tc > pc && |
1042 |
> |
if (running) { |
1043 |
> |
if (rc <= pc && tc > pc && |
1044 |
|
(retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) { |
1045 |
|
ForkJoinWorkerThread[] ws = workers; |
1046 |
|
int nws = ws.length; |
1047 |
< |
for (int i = 0; i < nws; ++i) { |
1047 |
> |
for (int i = 0; i < nws; ++i) { |
1048 |
|
ForkJoinWorkerThread w = ws[i]; |
1049 |
|
if (w != null) { |
1050 |
|
if (done = blocker.isReleasable()) |
1063 |
|
if (done = blocker.isReleasable()) |
1064 |
|
return; |
1065 |
|
if (rc == 0 || workerCounts != wc || |
1066 |
< |
!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1066 |
> |
!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1067 |
|
wc, wc - ONE_RUNNING)) |
1068 |
|
continue; |
1069 |
|
running = false; |
1070 |
|
if (rc > pc) |
1071 |
|
break; |
1072 |
|
} |
1073 |
< |
else { |
1073 |
> |
else { |
1074 |
|
if (rc >= pc || (done = blocker.isReleasable())) |
1075 |
|
break; |
1076 |
|
int sc = tc - pc + 1; |
1077 |
|
if (retries++ > sc) { |
1078 |
|
if (rc > 0 && rc >= pc - sc) |
1079 |
|
break; |
1080 |
< |
if (tc < MAX_THREADS && |
1081 |
< |
tc == (runState & ACTIVE_COUNT_MASK) && |
1080 |
> |
if (tc < MAX_THREADS && |
1081 |
> |
tc == (runState & ACTIVE_COUNT_MASK) && |
1082 |
|
workerCounts == wc && |
1083 |
|
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
1084 |
|
wc+(ONE_RUNNING|ONE_TOTAL))) { |
1089 |
|
Thread.yield(); |
1090 |
|
} |
1091 |
|
} |
1092 |
< |
|
1092 |
> |
|
1093 |
|
try { |
1094 |
|
if (!done) |
1095 |
|
do {} while (!blocker.isReleasable() && !blocker.block()); |
1101 |
|
c = workerCounts, c + ONE_RUNNING)); |
1102 |
|
} |
1103 |
|
} |
1104 |
< |
} |
1104 |
> |
} |
1105 |
|
|
1106 |
|
/** |
1107 |
|
* Possibly initiates and/or completes termination. |
1278 |
|
* use {@link java.lang.Runtime#availableProcessors}. |
1279 |
|
* @param factory the factory for creating new threads. For default value, |
1280 |
|
* use {@link #defaultForkJoinWorkerThreadFactory}. |
1281 |
< |
* @param handler the handler for internal worker threads that |
1282 |
< |
* terminate due to unrecoverable errors encountered while executing |
1281 |
> |
* @param handler the handler for internal worker threads that |
1282 |
> |
* terminate due to unrecoverable errors encountered while executing |
1283 |
|
* tasks. For default value, use <code>null</code>. |
1284 |
< |
* @param asyncMode if true, |
1284 |
> |
* @param asyncMode if true, |
1285 |
|
* establishes local first-in-first-out scheduling mode for forked |
1286 |
|
* tasks that are never joined. This mode may be more appropriate |
1287 |
|
* than default locally stack-based mode in applications in which |
1295 |
|
* because it does not hold {@link |
1296 |
|
* java.lang.RuntimePermission}{@code ("modifyThread")} |
1297 |
|
*/ |
1298 |
< |
public ForkJoinPool(int parallelism, |
1298 |
> |
public ForkJoinPool(int parallelism, |
1299 |
|
ForkJoinWorkerThreadFactory factory, |
1300 |
|
Thread.UncaughtExceptionHandler handler, |
1301 |
|
boolean asyncMode) { |
1348 |
|
/** |
1349 |
|
* Performs the given task, returning its result upon completion. |
1350 |
|
* If the caller is already engaged in a fork/join computation in |
1351 |
< |
* the current pool, this method is equivalent in effect to |
1351 |
> |
* the current pool, this method is equivalent in effect to |
1352 |
|
* {@link ForkJoinTask#invoke}. |
1353 |
|
* |
1354 |
|
* @param task the task |
1365 |
|
/** |
1366 |
|
* Arranges for (asynchronous) execution of the given task. |
1367 |
|
* If the caller is already engaged in a fork/join computation in |
1368 |
< |
* the current pool, this method is equivalent in effect to |
1368 |
> |
* the current pool, this method is equivalent in effect to |
1369 |
|
* {@link ForkJoinTask#fork}. |
1370 |
|
* |
1371 |
|
* @param task the task |
1396 |
|
/** |
1397 |
|
* Submits a ForkJoinTask for execution. |
1398 |
|
* If the caller is already engaged in a fork/join computation in |
1399 |
< |
* the current pool, this method is equivalent in effect to |
1399 |
> |
* the current pool, this method is equivalent in effect to |
1400 |
|
* {@link ForkJoinTask#fork}. |
1401 |
|
* |
1402 |
|
* @param task the task to submit |