52 |
|
* convenient form for informal monitoring. |
53 |
|
* |
54 |
|
* <p> As is the case with other ExecutorServices, there are three |
55 |
< |
* main task execution methods summarized in the follwoing |
55 |
> |
* main task execution methods summarized in the following |
56 |
|
* table. These are designed to be used by clients not already engaged |
57 |
|
* in fork/join computations in the current pool. The main forms of |
58 |
|
* these methods accept instances of {@code ForkJoinTask}, but |
138 |
|
* cache pollution effects.) |
139 |
|
* |
140 |
|
* Beyond work-stealing support and essential bookkeeping, the |
141 |
< |
* main responsibility of this framework is to arrange tactics for |
142 |
< |
* when one worker is waiting to join a task stolen (or always |
143 |
< |
* held by) another. Becauae we are multiplexing many tasks on to |
144 |
< |
* a pool of workers, we can't just let them block (as in |
145 |
< |
* Thread.join). We also cannot just reassign the joiner's |
146 |
< |
* run-time stack with another and replace it later, which would |
147 |
< |
* be a form of "continuation", that even if possible is not |
148 |
< |
* necessarily a good idea. Given that the creation costs of most |
149 |
< |
* threads on most systems mainly surrounds setting up runtime |
150 |
< |
* stacks, thread creation and switching is usually not much more |
151 |
< |
* expensive than stack creation and switching, and is more |
152 |
< |
* flexible). Instead we combine two tactics: |
141 |
> |
* main responsibility of this framework is to take actions when |
142 |
> |
* one worker is waiting to join a task stolen (or always held by) |
143 |
> |
* another. Because we are multiplexing many tasks on to a pool |
144 |
> |
* of workers, we can't just let them block (as in Thread.join). |
145 |
> |
* We also cannot just reassign the joiner's run-time stack with |
146 |
> |
* another and replace it later, which would be a form of |
147 |
> |
* "continuation", that even if possible is not necessarily a good |
148 |
> |
* idea. Given that the creation costs of most threads on most |
149 |
> |
* systems mainly surround setting up runtime stacks, thread |
150 |
> |
* creation and switching is usually not much more expensive than |
151 |
> |
* stack creation and switching, and is more flexible). Instead we |
152 |
> |
* combine two tactics: |
153 |
|
* |
154 |
< |
* 1. Arranging for the joiner to execute some task that it |
154 |
> |
* Helping: Arranging for the joiner to execute some task that it |
155 |
|
* would be running if the steal had not occurred. Method |
156 |
|
* ForkJoinWorkerThread.helpJoinTask tracks joining->stealing |
157 |
|
* links to try to find such a task. |
158 |
|
* |
159 |
< |
* 2. Unless there are already enough live threads, creating or |
160 |
< |
* or re-activating a spare thread to compensate for the |
161 |
< |
* (blocked) joiner until it unblocks. Spares then suspend |
162 |
< |
* at their next opportunity or eventually die if unused for |
163 |
< |
* too long. See below and the internal documentation |
164 |
< |
* for tryAwaitJoin for more details about compensation |
165 |
< |
* rules. |
159 |
> |
* Compensating: Unless there are already enough live threads, |
160 |
> |
* creating or re-activating a spare thread to compensate |
161 |
> |
* for the (blocked) joiner until it unblocks. Spares then |
162 |
> |
* suspend at their next opportunity or eventually die if |
163 |
> |
* unused for too long. See below and the internal |
164 |
> |
* documentation for tryAwaitJoin for more details about |
165 |
> |
* compensation rules. |
166 |
|
* |
167 |
|
* Because the determining existence of conservatively safe |
168 |
|
* helping targets, the availability of already-created spares, |
173 |
|
* increases application footprint, so we try to avoid it, within |
174 |
|
* reason. |
175 |
|
* |
176 |
< |
* The ManagedBlocker extension API can't use option (1) so uses a |
177 |
< |
* special version of (2) in method awaitBlocker. |
176 |
> |
* The ManagedBlocker extension API can't use helping so uses a |
177 |
> |
* special version of compensation in method awaitBlocker. |
178 |
|
* |
179 |
|
* The main throughput advantages of work-stealing stem from |
180 |
|
* decentralized control -- workers mostly steal tasks from each |
497 |
|
* making decisions about creating and suspending spare |
498 |
|
* threads. Updated only by CAS. Note that adding a new worker |
499 |
|
* requires incrementing both counts, since workers start off in |
500 |
< |
* running state. This field is also used for memory-fencing |
501 |
< |
* configuration parameters. |
500 |
> |
* running state. |
501 |
|
*/ |
502 |
|
private volatile int workerCounts; |
503 |
|
|
656 |
|
try { |
657 |
|
w = factory.newThread(this); |
658 |
|
} finally { // Adjust on either null or exceptional factory return |
659 |
< |
if (w == null) { |
659 |
> |
if (w == null) |
660 |
|
onWorkerCreationFailure(); |
662 |
– |
return null; |
663 |
– |
} |
661 |
|
} |
662 |
< |
w.start(recordWorker(w), ueh); |
662 |
> |
if (w != null) |
663 |
> |
w.start(recordWorker(w), ueh); |
664 |
|
return w; |
665 |
|
} |
666 |
|
|
670 |
|
private void onWorkerCreationFailure() { |
671 |
|
for (;;) { |
672 |
|
int wc = workerCounts; |
673 |
< |
if ((wc >>> TOTAL_COUNT_SHIFT) == 0) |
674 |
< |
Thread.yield(); // wait for other counts to settle |
673 |
> |
int rc = wc & RUNNING_COUNT_MASK; |
674 |
> |
int tc = wc >>> TOTAL_COUNT_SHIFT; |
675 |
> |
if (rc == 0 || wc == 0) |
676 |
> |
Thread.yield(); // must wait for other counts to settle |
677 |
|
else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
678 |
|
wc - (ONE_RUNNING|ONE_TOTAL))) |
679 |
|
break; |
682 |
|
} |
683 |
|
|
684 |
|
/** |
685 |
< |
* Creates and/or resumes enough workers to establish target |
686 |
< |
* parallelism, giving up if terminating or addWorker fails |
687 |
< |
* |
688 |
< |
* TODO: recast this to support lazier creation and automated |
689 |
< |
* parallelism maintenance |
685 |
> |
* Creates enough total workers to establish target parallelism, |
686 |
> |
* giving up if terminating or addWorker fails |
687 |
|
*/ |
688 |
< |
private void ensureEnoughWorkers() { |
689 |
< |
while ((runState & TERMINATING) == 0) { |
690 |
< |
int pc = parallelism; |
691 |
< |
int wc = workerCounts; |
692 |
< |
int rc = wc & RUNNING_COUNT_MASK; |
693 |
< |
int tc = wc >>> TOTAL_COUNT_SHIFT; |
694 |
< |
if (tc < pc) { |
698 |
< |
if (UNSAFE.compareAndSwapInt |
699 |
< |
(this, workerCountsOffset, |
700 |
< |
wc, wc + (ONE_RUNNING|ONE_TOTAL)) && |
701 |
< |
addWorker() == null) |
702 |
< |
break; |
703 |
< |
} |
704 |
< |
else if (tc > pc && rc < pc && |
705 |
< |
tc > (runState & ACTIVE_COUNT_MASK)) { |
706 |
< |
ForkJoinWorkerThread spare = null; |
707 |
< |
ForkJoinWorkerThread[] ws = workers; |
708 |
< |
int nws = ws.length; |
709 |
< |
for (int i = 0; i < nws; ++i) { |
710 |
< |
ForkJoinWorkerThread w = ws[i]; |
711 |
< |
if (w != null && w.isSuspended()) { |
712 |
< |
if ((workerCounts & RUNNING_COUNT_MASK) > pc) |
713 |
< |
return; |
714 |
< |
if (w.tryResumeSpare()) |
715 |
< |
incrementRunningCount(); |
716 |
< |
break; |
717 |
< |
} |
718 |
< |
} |
719 |
< |
} |
720 |
< |
else |
688 |
> |
private void ensureEnoughTotalWorkers() { |
689 |
> |
int wc; |
690 |
> |
while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < parallelism && |
691 |
> |
runState < TERMINATING) { |
692 |
> |
if ((UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
693 |
> |
wc, wc + (ONE_RUNNING|ONE_TOTAL)) && |
694 |
> |
addWorker() == null)) |
695 |
|
break; |
696 |
|
} |
697 |
|
} |
718 |
|
for (;;) { |
719 |
|
int wc = workerCounts; |
720 |
|
int rc = wc & RUNNING_COUNT_MASK; |
721 |
< |
if (rc - nr < 0 || (wc >>> TOTAL_COUNT_SHIFT) == 0) |
721 |
> |
int tc = wc >>> TOTAL_COUNT_SHIFT; |
722 |
> |
if (rc - nr < 0 || tc == 0) |
723 |
|
Thread.yield(); // back off if waiting for other updates |
724 |
|
else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
725 |
|
wc, wc - unit)) |
728 |
|
|
729 |
|
accumulateStealCount(w); // collect final count |
730 |
|
if (!tryTerminate(false)) |
731 |
< |
ensureEnoughWorkers(); |
731 |
> |
ensureEnoughTotalWorkers(); |
732 |
|
} |
733 |
|
|
734 |
|
// Waiting for and signalling events |
744 |
|
int n = ws.length; |
745 |
|
for (;;) { |
746 |
|
int i = ((int)(top & WAITER_ID_MASK)) - 1; |
747 |
< |
if (i < 0 || (int)(top >>> EVENT_COUNT_SHIFT) == eventCount) |
747 |
> |
int e = (int)(top >>> EVENT_COUNT_SHIFT); |
748 |
> |
if (i < 0 || e == eventCount) |
749 |
|
return; |
750 |
|
ForkJoinWorkerThread w; |
751 |
|
if (i < n && (w = ws[i]) != null && |
785 |
|
int n = ws.length; |
786 |
|
for (;;) { |
787 |
|
int i = ((int)(top & WAITER_ID_MASK)) - 1; |
788 |
< |
if (i < 0 || (int)(top >>> EVENT_COUNT_SHIFT) == ec) |
788 |
> |
int e = (int)(top >>> EVENT_COUNT_SHIFT); |
789 |
> |
if (i < 0 || e == ec) |
790 |
|
return; |
791 |
|
ForkJoinWorkerThread w; |
792 |
|
if (i < n && (w = ws[i]) != null && |
803 |
|
} |
804 |
|
|
805 |
|
/** |
806 |
< |
* If worker is inactive, blocks until terminating or event count |
807 |
< |
* advances from last value held by worker; in any case helps |
831 |
< |
* release others. |
806 |
> |
* Blocks worker until terminating or event count |
807 |
> |
* advances from last value held by worker |
808 |
|
* |
809 |
|
* @param w the calling worker thread |
834 |
– |
* @param retries the number of scans by caller failing to find work |
835 |
– |
* @return false if now too many threads running |
810 |
|
*/ |
811 |
< |
private boolean eventSync(ForkJoinWorkerThread w, int retries) { |
811 |
> |
private void eventSync(ForkJoinWorkerThread w) { |
812 |
|
int wec = w.lastEventCount; |
813 |
< |
if (retries > 1) { // can only block after 2nd miss |
814 |
< |
long nextTop = (((long)wec << EVENT_COUNT_SHIFT) | |
815 |
< |
((long)(w.poolIndex + 1))); |
816 |
< |
long top; |
817 |
< |
while ((runState < SHUTDOWN || !tryTerminate(false)) && |
818 |
< |
(((int)(top = eventWaiters) & WAITER_ID_MASK) == 0 || |
819 |
< |
(int)(top >>> EVENT_COUNT_SHIFT) == wec) && |
820 |
< |
eventCount == wec) { |
821 |
< |
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
822 |
< |
w.nextWaiter = top, nextTop)) { |
823 |
< |
accumulateStealCount(w); // transfer steals while idle |
824 |
< |
Thread.interrupted(); // clear/ignore interrupt |
825 |
< |
while (eventCount == wec) |
826 |
< |
w.doPark(); |
853 |
< |
break; |
854 |
< |
} |
813 |
> |
long nextTop = (((long)wec << EVENT_COUNT_SHIFT) | |
814 |
> |
((long)(w.poolIndex + 1))); |
815 |
> |
long top; |
816 |
> |
while ((runState < SHUTDOWN || !tryTerminate(false)) && |
817 |
> |
(((int)(top = eventWaiters) & WAITER_ID_MASK) == 0 || |
818 |
> |
(int)(top >>> EVENT_COUNT_SHIFT) == wec) && |
819 |
> |
eventCount == wec) { |
820 |
> |
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
821 |
> |
w.nextWaiter = top, nextTop)) { |
822 |
> |
accumulateStealCount(w); // transfer steals while idle |
823 |
> |
Thread.interrupted(); // clear/ignore interrupt |
824 |
> |
while (eventCount == wec) |
825 |
> |
w.doPark(); |
826 |
> |
break; |
827 |
|
} |
856 |
– |
wec = eventCount; |
828 |
|
} |
829 |
< |
releaseWaiters(); |
859 |
< |
int wc = workerCounts; |
860 |
< |
if ((wc & RUNNING_COUNT_MASK) <= parallelism) { |
861 |
< |
w.lastEventCount = wec; |
862 |
< |
return true; |
863 |
< |
} |
864 |
< |
if (wec != w.lastEventCount) // back up if may re-wait |
865 |
< |
w.lastEventCount = wec - (wc >>> TOTAL_COUNT_SHIFT); |
866 |
< |
return false; |
829 |
> |
w.lastEventCount = eventCount; |
830 |
|
} |
831 |
|
|
832 |
|
/** |
852 |
|
*/ |
853 |
|
final void preStep(ForkJoinWorkerThread w, int retries) { |
854 |
|
boolean active = w.active; |
855 |
< |
boolean inactivate = active && retries != 0; |
855 |
> |
boolean inactivate = active && retries > 0; |
856 |
|
for (;;) { |
857 |
|
int rs, wc; |
858 |
|
if (inactivate && |
860 |
|
rs = runState, rs - ONE_ACTIVE)) |
861 |
|
inactivate = active = w.active = false; |
862 |
|
if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= parallelism) { |
863 |
< |
if (active || eventSync(w, retries)) |
864 |
< |
break; |
863 |
> |
if (retries > 0) { |
864 |
> |
if (retries > 1 && !active) |
865 |
> |
eventSync(w); |
866 |
> |
releaseWaiters(); |
867 |
> |
} |
868 |
> |
break; |
869 |
|
} |
870 |
< |
else if (!(inactivate |= active) && // must inactivate to suspend |
870 |
> |
if (!(inactivate |= active) && // must inactivate to suspend |
871 |
|
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
872 |
|
wc, wc - ONE_RUNNING) && |
873 |
|
!w.suspendAsSpare()) // false if trimmed |
879 |
|
* Awaits join of the given task if enough threads, or can resume |
880 |
|
* or create a spare. Fails (in which case the given task might |
881 |
|
* not be done) upon contention or lack of decision about |
882 |
< |
* blocking. Returns void because caller must check |
916 |
< |
* task status on return anyway. |
882 |
> |
* blocking. |
883 |
|
* |
884 |
|
* We allow blocking if: |
885 |
|
* |
887 |
|
* parallelism level if this thread blocks. |
888 |
|
* |
889 |
|
* 2. A spare is resumed to replace this worker. We tolerate |
890 |
< |
* slop in the decision to replace if a spare is found without |
891 |
< |
* first decrementing run count. This may release too many, |
892 |
< |
* but if so, the superfluous ones will re-suspend via |
927 |
< |
* preStep(). |
890 |
> |
* races in the decision to replace when a spare is found. |
891 |
> |
* This may release too many, but if so, the superfluous ones |
892 |
> |
* will re-suspend via preStep(). |
893 |
|
* |
894 |
< |
* 3. After #spares repeated checks, there are no fewer than #spare |
894 |
> |
* 3. After #spares repeated retries, there are fewer than #spare |
895 |
|
* threads not running. We allow this slack to avoid hysteresis |
896 |
|
* and as a hedge against lag/uncertainty of running count |
897 |
|
* estimates when signalling or unblocking stalls. |
898 |
|
* |
899 |
< |
* 4. All existing workers are busy (as rechecked via repeated |
900 |
< |
* retries by caller) and a new spare is created. |
899 |
> |
* 4. All existing workers are busy (as rechecked via #spares |
900 |
> |
* repeated retries by caller) and a new spare is created. |
901 |
|
* |
902 |
< |
* If none of the above hold, we try to escape out by |
903 |
< |
* re-incrementing count and returning to caller, which can retry |
939 |
< |
* later. |
902 |
> |
* If none of the above hold, we escape out by re-incrementing |
903 |
> |
* count and returning to caller, which can retry later. |
904 |
|
* |
905 |
|
* @param joinMe the task to join |
906 |
< |
* @param retries if negative, then serve only as a precheck |
943 |
< |
* that the thread can be replaced by a spare. Otherwise, |
944 |
< |
* the number of repeated calls to this method returning busy |
945 |
< |
* @return true if the call must be retried because there |
946 |
< |
* none of the blocking checks hold |
906 |
> |
* @param retries the number of calls to this method for this join |
907 |
|
*/ |
908 |
< |
final boolean tryAwaitJoin(ForkJoinTask<?> joinMe, int retries) { |
949 |
< |
if (joinMe.status < 0) // precheck for cancellation |
950 |
< |
return false; |
951 |
< |
if ((runState & TERMINATING) != 0) { // shutting down |
952 |
< |
joinMe.cancelIgnoringExceptions(); |
953 |
< |
return false; |
954 |
< |
} |
955 |
< |
|
908 |
> |
final void tryAwaitJoin(ForkJoinTask<?> joinMe, int retries) { |
909 |
|
int pc = parallelism; |
910 |
|
boolean running = true; // false when running count decremented |
911 |
< |
outer:for (;;) { |
911 |
> |
outer:while (joinMe.status >= 0) { |
912 |
|
int wc = workerCounts; |
913 |
|
int rc = wc & RUNNING_COUNT_MASK; |
914 |
|
int tc = wc >>> TOTAL_COUNT_SHIFT; |
915 |
|
if (running) { // replace with spare or decrement count |
916 |
|
if (rc <= pc && tc > pc && |
917 |
|
(retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) { |
918 |
< |
ForkJoinWorkerThread[] ws = workers; |
918 |
> |
ForkJoinWorkerThread[] ws = workers; // search for spare |
919 |
|
int nws = ws.length; |
920 |
< |
for (int i = 0; i < nws; ++i) { // search for spare |
920 |
> |
for (int i = 0; i < nws; ++i) { |
921 |
|
ForkJoinWorkerThread w = ws[i]; |
922 |
< |
if (w != null) { |
922 |
> |
if (w != null && w.isSuspended()) { |
923 |
> |
if ((workerCounts & RUNNING_COUNT_MASK) > pc) |
924 |
> |
continue outer; |
925 |
|
if (joinMe.status < 0) |
926 |
< |
return false; |
927 |
< |
if (w.isSuspended()) { |
928 |
< |
if ((workerCounts & RUNNING_COUNT_MASK)>=pc && |
929 |
< |
w.tryResumeSpare()) { |
975 |
< |
running = false; |
976 |
< |
break outer; |
977 |
< |
} |
978 |
< |
continue outer; // rescan |
926 |
> |
break outer; |
927 |
> |
if (w.tryResumeSpare()) { |
928 |
> |
running = false; |
929 |
> |
break outer; |
930 |
|
} |
931 |
+ |
continue outer; // rescan on failure to resume |
932 |
|
} |
933 |
|
} |
934 |
|
} |
935 |
< |
if (retries < 0 || // < 0 means replacement check only |
936 |
< |
rc == 0 || joinMe.status < 0 || workerCounts != wc || |
985 |
< |
!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
986 |
< |
wc, wc - ONE_RUNNING)) |
987 |
< |
return false; // done or inconsistent or contended |
988 |
< |
running = false; |
989 |
< |
if (rc > pc) |
935 |
> |
if ((rc <= pc && (rc == 0 || --retries < 0)) || // no retry |
936 |
> |
joinMe.status < 0) |
937 |
|
break; |
938 |
+ |
if (workerCounts == wc && |
939 |
+ |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
940 |
+ |
wc, wc - ONE_RUNNING)) |
941 |
+ |
running = false; |
942 |
|
} |
943 |
|
else { // allow blocking if enough threads |
944 |
< |
if (rc >= pc || joinMe.status < 0) |
944 |
> |
int sc = tc - pc + 1; // = spares, plus the one to add |
945 |
> |
if (sc > 0 && rc > 0 && rc >= pc - sc && rc > pc - retries) |
946 |
> |
break; |
947 |
> |
if (--retries > sc && tc < MAX_THREADS && |
948 |
> |
tc == (runState & ACTIVE_COUNT_MASK) && |
949 |
> |
workerCounts == wc && |
950 |
> |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
951 |
> |
wc + (ONE_RUNNING|ONE_TOTAL))) { |
952 |
> |
addWorker(); |
953 |
|
break; |
995 |
– |
int sc = tc - pc + 1; // = spare threads, plus the one to add |
996 |
– |
if (retries > sc) { |
997 |
– |
if (rc > 0 && rc >= pc - sc) // allow slack |
998 |
– |
break; |
999 |
– |
if (tc < MAX_THREADS && |
1000 |
– |
tc == (runState & ACTIVE_COUNT_MASK) && |
1001 |
– |
workerCounts == wc && |
1002 |
– |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
1003 |
– |
wc+(ONE_RUNNING|ONE_TOTAL))) { |
1004 |
– |
addWorker(); |
1005 |
– |
break; |
1006 |
– |
} |
954 |
|
} |
955 |
< |
if (workerCounts == wc && // back out to allow rescan |
955 |
> |
if (workerCounts == wc && |
956 |
|
UNSAFE.compareAndSwapInt (this, workerCountsOffset, |
957 |
|
wc, wc + ONE_RUNNING)) { |
958 |
< |
releaseWaiters(); // help others progress |
959 |
< |
return true; // let caller retry |
958 |
> |
running = true; // back out; allow retry |
959 |
> |
break; |
960 |
|
} |
961 |
|
} |
962 |
|
} |
963 |
< |
// arrive here if can block |
964 |
< |
joinMe.internalAwaitDone(); |
965 |
< |
int c; // to inline incrementRunningCount |
966 |
< |
do {} while (!UNSAFE.compareAndSwapInt |
967 |
< |
(this, workerCountsOffset, |
968 |
< |
c = workerCounts, c + ONE_RUNNING)); |
969 |
< |
return false; |
963 |
> |
if (!running) { // can block |
964 |
> |
int c; // to inline incrementRunningCount |
965 |
> |
joinMe.internalAwaitDone(); |
966 |
> |
do {} while (!UNSAFE.compareAndSwapInt |
967 |
> |
(this, workerCountsOffset, |
968 |
> |
c = workerCounts, c + ONE_RUNNING)); |
969 |
> |
} |
970 |
|
} |
971 |
|
|
972 |
|
/** |
976 |
|
*/ |
977 |
|
final void awaitBlocker(ManagedBlocker blocker) |
978 |
|
throws InterruptedException { |
1032 |
– |
boolean done; |
1033 |
– |
if (done = blocker.isReleasable()) |
1034 |
– |
return; |
979 |
|
int pc = parallelism; |
980 |
+ |
boolean running = true; |
981 |
|
int retries = 0; |
982 |
< |
boolean running = true; // false when running count decremented |
983 |
< |
outer:for (;;) { |
982 |
> |
boolean done; |
983 |
> |
outer:while (!(done = blocker.isReleasable())) { |
984 |
|
int wc = workerCounts; |
985 |
|
int rc = wc & RUNNING_COUNT_MASK; |
986 |
|
int tc = wc >>> TOTAL_COUNT_SHIFT; |
987 |
|
if (running) { |
988 |
< |
if (rc <= pc && tc > pc && |
988 |
> |
if (rc <= pc && tc > pc && |
989 |
|
(retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) { |
990 |
|
ForkJoinWorkerThread[] ws = workers; |
991 |
|
int nws = ws.length; |
992 |
|
for (int i = 0; i < nws; ++i) { |
993 |
|
ForkJoinWorkerThread w = ws[i]; |
994 |
< |
if (w != null) { |
994 |
> |
if (w != null && w.isSuspended()) { |
995 |
> |
if ((workerCounts & RUNNING_COUNT_MASK) > pc) |
996 |
> |
continue outer; |
997 |
|
if (done = blocker.isReleasable()) |
998 |
< |
return; |
999 |
< |
if (w.isSuspended()) { |
1000 |
< |
if ((workerCounts & RUNNING_COUNT_MASK)>=pc && |
1001 |
< |
w.tryResumeSpare()) { |
1055 |
< |
running = false; |
1056 |
< |
break outer; |
1057 |
< |
} |
1058 |
< |
continue outer; // rescan |
998 |
> |
break outer; |
999 |
> |
if (w.tryResumeSpare()) { |
1000 |
> |
running = false; |
1001 |
> |
break outer; |
1002 |
|
} |
1003 |
+ |
continue outer; |
1004 |
|
} |
1005 |
|
} |
1006 |
< |
} |
1063 |
< |
if (done = blocker.isReleasable()) |
1064 |
< |
return; |
1065 |
< |
if (rc == 0 || workerCounts != wc || |
1066 |
< |
!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1067 |
< |
wc, wc - ONE_RUNNING)) |
1068 |
< |
continue; |
1069 |
< |
running = false; |
1070 |
< |
if (rc > pc) |
1071 |
< |
break; |
1072 |
< |
} |
1073 |
< |
else { |
1074 |
< |
if (rc >= pc || (done = blocker.isReleasable())) |
1075 |
< |
break; |
1076 |
< |
int sc = tc - pc + 1; |
1077 |
< |
if (retries++ > sc) { |
1078 |
< |
if (rc > 0 && rc >= pc - sc) |
1006 |
> |
if (done = blocker.isReleasable()) |
1007 |
|
break; |
1008 |
< |
if (tc < MAX_THREADS && |
1009 |
< |
tc == (runState & ACTIVE_COUNT_MASK) && |
1010 |
< |
workerCounts == wc && |
1011 |
< |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
1012 |
< |
wc+(ONE_RUNNING|ONE_TOTAL))) { |
1013 |
< |
addWorker(); |
1008 |
> |
} |
1009 |
> |
if (rc > 0 && workerCounts == wc && |
1010 |
> |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1011 |
> |
wc, wc - ONE_RUNNING)) { |
1012 |
> |
running = false; |
1013 |
> |
if (rc > pc) |
1014 |
|
break; |
1087 |
– |
} |
1015 |
|
} |
1016 |
+ |
} |
1017 |
+ |
else if (rc >= pc) |
1018 |
+ |
break; |
1019 |
+ |
else if (tc < MAX_THREADS && |
1020 |
+ |
tc == (runState & ACTIVE_COUNT_MASK) && |
1021 |
+ |
workerCounts == wc && |
1022 |
+ |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
1023 |
+ |
wc + (ONE_RUNNING|ONE_TOTAL))) { |
1024 |
+ |
addWorker(); |
1025 |
+ |
break; |
1026 |
+ |
} |
1027 |
+ |
else if (workerCounts == wc && |
1028 |
+ |
UNSAFE.compareAndSwapInt (this, workerCountsOffset, |
1029 |
+ |
wc, wc + ONE_RUNNING)) { |
1030 |
|
Thread.yield(); |
1031 |
+ |
++retries; |
1032 |
+ |
running = true; // allow rescan |
1033 |
|
} |
1034 |
|
} |
1035 |
|
|
1285 |
|
throw new RejectedExecutionException(); |
1286 |
|
submissionQueue.offer(task); |
1287 |
|
signalEvent(); |
1288 |
< |
ensureEnoughWorkers(); |
1288 |
> |
ensureEnoughTotalWorkers(); |
1289 |
|
} |
1290 |
|
|
1291 |
|
/** |