1 |
|
/* |
2 |
|
* Written by Doug Lea with assistance from members of JCP JSR-166 |
3 |
|
* Expert Group and released to the public domain, as explained at |
4 |
< |
* http://creativecommons.org/licenses/publicdomain |
4 |
> |
* http://creativecommons.org/publicdomain/zero/1.0/ |
5 |
|
*/ |
6 |
|
|
7 |
|
package jsr166y; |
19 |
|
import java.util.concurrent.RejectedExecutionException; |
20 |
|
import java.util.concurrent.RunnableFuture; |
21 |
|
import java.util.concurrent.TimeUnit; |
22 |
– |
import java.util.concurrent.TimeoutException; |
22 |
|
import java.util.concurrent.atomic.AtomicInteger; |
23 |
|
import java.util.concurrent.locks.LockSupport; |
24 |
|
import java.util.concurrent.locks.ReentrantLock; |
101 |
|
* daemon} mode, there is typically no need to explicitly {@link |
102 |
|
* #shutdown} such a pool upon program exit. |
103 |
|
* |
104 |
< |
* <pre> |
104 |
> |
* <pre> {@code |
105 |
|
* static final ForkJoinPool mainPool = new ForkJoinPool(); |
106 |
|
* ... |
107 |
|
* public void sort(long[] array) { |
108 |
|
* mainPool.invoke(new SortTask(array, 0, array.length)); |
109 |
< |
* } |
111 |
< |
* </pre> |
109 |
> |
* }}</pre> |
110 |
|
* |
111 |
|
* <p><b>Implementation notes</b>: This implementation restricts the |
112 |
|
* maximum number of running threads to 32767. Attempts to create |
149 |
|
* Updates tend not to contend with each other except during |
150 |
|
* bursts while submitted tasks begin or end. In some cases when |
151 |
|
* they do contend, threads can instead do something else |
152 |
< |
* (usually, scan for tesks) until contention subsides. |
152 |
> |
* (usually, scan for tasks) until contention subsides. |
153 |
|
* |
154 |
|
* To enable packing, we restrict maximum parallelism to (1<<15)-1 |
155 |
|
* (which is far in excess of normal operating range) to allow |
193 |
|
* shutdown schemes. |
194 |
|
* |
195 |
|
* Wait Queuing. Unlike HPC work-stealing frameworks, we cannot |
196 |
< |
* let workers spin indefinitely scanning for tasks when none are |
197 |
< |
* can be immediately found, and we cannot start/resume workers |
198 |
< |
* unless there appear to be tasks available. On the other hand, |
199 |
< |
* we must quickly prod them into action when new tasks are |
200 |
< |
* submitted or generated. We park/unpark workers after placing |
201 |
< |
* in an event wait queue when they cannot find work. This "queue" |
202 |
< |
* is actually a simple Treiber stack, headed by the "id" field of |
203 |
< |
* ctl, plus a 15bit counter value to both wake up waiters (by |
204 |
< |
* advancing their count) and avoid ABA effects. Successors are |
205 |
< |
* held in worker field "nextWait". Queuing deals with several |
206 |
< |
* intrinsic races, mainly that a task-producing thread can miss |
207 |
< |
* seeing (and signalling) another thread that gave up looking for |
208 |
< |
* work but has not yet entered the wait queue. We solve this by |
209 |
< |
* requiring a full sweep of all workers both before (in scan()) |
210 |
< |
* and after (in awaitWork()) a newly waiting worker is added to |
211 |
< |
* the wait queue. During a rescan, the worker might release some |
212 |
< |
* other queued worker rather than itself, which has the same net |
213 |
< |
* effect. |
196 |
> |
* let workers spin indefinitely scanning for tasks when none can |
197 |
> |
* be found immediately, and we cannot start/resume workers unless |
198 |
> |
* there appear to be tasks available. On the other hand, we must |
199 |
> |
* quickly prod them into action when new tasks are submitted or |
200 |
> |
* generated. We park/unpark workers after placing in an event |
201 |
> |
* wait queue when they cannot find work. This "queue" is actually |
202 |
> |
* a simple Treiber stack, headed by the "id" field of ctl, plus a |
203 |
> |
* 15bit counter value to both wake up waiters (by advancing their |
204 |
> |
* count) and avoid ABA effects. Successors are held in worker |
205 |
> |
* field "nextWait". Queuing deals with several intrinsic races, |
206 |
> |
* mainly that a task-producing thread can miss seeing (and |
207 |
> |
* signalling) another thread that gave up looking for work but |
208 |
> |
* has not yet entered the wait queue. We solve this by requiring |
209 |
> |
* a full sweep of all workers both before (in scan()) and after |
210 |
> |
* (in tryAwaitWork()) a newly waiting worker is added to the wait |
211 |
> |
* queue. During a rescan, the worker might release some other |
212 |
> |
* queued worker rather than itself, which has the same net |
213 |
> |
* effect. Because enqueued workers may actually be rescanning |
214 |
> |
* rather than waiting, we set and clear the "parked" field of |
215 |
> |
* ForkJoinWorkerThread to reduce unnecessary calls to unpark. |
216 |
> |
* (Use of the parked field requires a secondary recheck to avoid |
217 |
> |
* missed signals.) |
218 |
|
* |
219 |
|
* Signalling. We create or wake up workers only when there |
220 |
|
* appears to be at least one task they might be able to find and |
231 |
|
* Trimming workers. To release resources after periods of lack of |
232 |
|
* use, a worker starting to wait when the pool is quiescent will |
233 |
|
* time out and terminate if the pool has remained quiescent for |
234 |
< |
* SHRINK_RATE nanosecs. |
234 |
> |
* SHRINK_RATE nanosecs. This will slowly propagate, eventually |
235 |
> |
* terminating all workers after long periods of non-use. |
236 |
|
* |
237 |
|
* Submissions. External submissions are maintained in an |
238 |
|
* array-based queue that is structured identically to |
239 |
< |
* ForkJoinWorkerThread queues (which see) except for the use of |
240 |
< |
* submissionLock in method addSubmission. Unlike worker queues, |
241 |
< |
* multiple external threads can add new submissions. |
239 |
> |
* ForkJoinWorkerThread queues except for the use of |
240 |
> |
* submissionLock in method addSubmission. Unlike the case for |
241 |
> |
* worker queues, multiple external threads can add new |
242 |
> |
* submissions, so adding requires a lock. |
243 |
|
* |
244 |
|
* Compensation. Beyond work-stealing support and lifecycle |
245 |
|
* control, the main responsibility of this framework is to take |
276 |
|
* if blocking would leave less than one active (non-waiting, |
277 |
|
* non-blocked) worker. Additionally, to avoid some false alarms |
278 |
|
* due to GC, lagging counters, system activity, etc, compensated |
279 |
< |
* blocking for joins is only attempted after a number of rechecks |
280 |
< |
* proportional to the current apparent deficit (where retries are |
281 |
< |
* interspersed with Thread.yield, for good citizenship). The |
282 |
< |
* variable blockedCount, incremented before blocking and |
283 |
< |
* decremented after, is sometimes needed to distinguish cases of |
284 |
< |
* waiting for work vs blocking on joins or other managed sync, |
285 |
< |
* but both the cases are equivalent for most pool control, so we |
286 |
< |
* can update non-atomically. (Additionally, contention on |
283 |
< |
* blockedCount alleviates some contention on ctl). |
279 |
> |
* blocking for joins is only attempted after rechecks stabilize |
280 |
> |
* (retries are interspersed with Thread.yield, for good |
281 |
> |
* citizenship). The variable blockedCount, incremented before |
282 |
> |
* blocking and decremented after, is sometimes needed to |
283 |
> |
* distinguish cases of waiting for work vs blocking on joins or |
284 |
> |
* other managed sync. Both cases are equivalent for most pool |
285 |
> |
* control, so we can update non-atomically. (Additionally, |
286 |
> |
* contention on blockedCount alleviates some contention on ctl). |
287 |
|
* |
288 |
|
* Shutdown and Termination. A call to shutdownNow atomically sets |
289 |
|
* the ctl stop bit and then (non-atomically) sets each workers |
290 |
|
* "terminate" status, cancels all unprocessed tasks, and wakes up |
291 |
|
* all waiting workers. Detecting whether termination should |
292 |
|
* commence after a non-abrupt shutdown() call requires more work |
293 |
< |
* and bookkeeping. We need consensus about quiesence (i.e., that |
293 |
> |
* and bookkeeping. We need consensus about quiescence (i.e., that |
294 |
|
* there is no more work) which is reflected in active counts so |
295 |
|
* long as there are no current blockers, as well as possible |
296 |
|
* re-evaluations during independent changes in blocking or |
465 |
|
/** |
466 |
|
* Main pool control -- a long packed with: |
467 |
|
* AC: Number of active running workers minus target parallelism (16 bits) |
468 |
< |
* TC: Number of total workers minus target parallelism (16bits) |
468 |
> |
* TC: Number of total workers minus target parallelism (16 bits) |
469 |
|
* ST: true if pool is terminating (1 bit) |
470 |
|
* EC: the wait count of top waiting thread (15 bits) |
471 |
|
* ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits) |
481 |
|
* negative, there is at least one waiting worker, and when e is |
482 |
|
* negative, the pool is terminating. To deal with these possibly |
483 |
|
* negative fields, we use casts in and out of "short" and/or |
484 |
< |
* signed shifts to maintain signedness. Note: AC_SHIFT is |
482 |
< |
* redundantly declared in ForkJoinWorkerThread in order to |
483 |
< |
* integrate a surplus-threads check. |
484 |
> |
* signed shifts to maintain signedness. |
485 |
|
*/ |
486 |
|
volatile long ctl; |
487 |
|
|
525 |
|
|
526 |
|
/** |
527 |
|
* Index (mod submission queue length) of next element to take |
528 |
< |
* from submission queue. |
528 |
> |
* from submission queue. Usage is identical to that for |
529 |
> |
* per-worker queues -- see ForkJoinWorkerThread internal |
530 |
> |
* documentation. |
531 |
|
*/ |
532 |
|
volatile int queueBase; |
533 |
|
|
534 |
|
/** |
535 |
|
* Index (mod submission queue length) of next element to add |
536 |
< |
* in submission queue. |
536 |
> |
* in submission queue. Usage is identical to that for |
537 |
> |
* per-worker queues -- see ForkJoinWorkerThread internal |
538 |
> |
* documentation. |
539 |
|
*/ |
540 |
|
int queueTop; |
541 |
|
|
545 |
|
volatile boolean shutdown; |
546 |
|
|
547 |
|
/** |
548 |
< |
* True if use local fifo, not default lifo, for local polling |
549 |
< |
* Read by, and replicated by ForkJoinWorkerThreads |
548 |
> |
* True if use local fifo, not default lifo, for local polling. |
549 |
> |
* Read by, and replicated by ForkJoinWorkerThreads. |
550 |
|
*/ |
551 |
|
final boolean locallyFifo; |
552 |
|
|
573 |
|
private int nextWorkerIndex; |
574 |
|
|
575 |
|
/** |
576 |
< |
* SeqLock and index masking for for updates to workers array. |
577 |
< |
* Locked when SG_UNIT is set. Unlocking clears bit by adding |
576 |
> |
* SeqLock and index masking for updates to workers array. Locked |
577 |
> |
* when SG_UNIT is set. Unlocking clears bit by adding |
578 |
|
* SG_UNIT. Staleness of read-only operations can be checked by |
579 |
|
* comparing scanGuard to value before the reads. The low 16 bits |
580 |
|
* (i.e, anding with SMASK) hold (the smallest power of two |
712 |
|
*/ |
713 |
|
private boolean scan(ForkJoinWorkerThread w, int a) { |
714 |
|
int g = scanGuard; // mask 0 avoids useless scans if only one active |
715 |
< |
int m = parallelism == 1 - a? 0 : g & SMASK; |
715 |
> |
int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; |
716 |
|
ForkJoinWorkerThread[] ws = workers; |
717 |
|
if (ws == null || ws.length <= m) // staleness check |
718 |
|
return false; |
759 |
|
} |
760 |
|
|
761 |
|
/** |
762 |
< |
* Tries to enqueue worker in wait queue and await change in |
763 |
< |
* worker's eventCount. Before blocking, rescans queues to avoid |
764 |
< |
* missed signals. If the pool is quiescent, possibly terminates |
765 |
< |
* worker upon exit. |
762 |
> |
* Tries to enqueue worker w in wait queue and await change in |
763 |
> |
* worker's eventCount. If the pool is quiescent and there is |
764 |
> |
* more than one worker, possibly terminates worker upon exit. |
765 |
> |
* Otherwise, before blocking, rescans queues to avoid missed |
766 |
> |
* signals. Upon finding work, releases at least one worker |
767 |
> |
* (which may be the current worker). Rescans restart upon |
768 |
> |
* detected staleness or failure to release due to |
769 |
> |
* contention. Note the unusual conventions about Thread.interrupt |
770 |
> |
* here and elsewhere: Because interrupts are used solely to alert |
771 |
> |
* threads to check termination, which is checked here anyway, we |
772 |
> |
* clear status (using Thread.interrupted) before any call to |
773 |
> |
* park, so that park does not immediately return due to status |
774 |
> |
* being set via some other unrelated call to interrupt in user |
775 |
> |
* code. |
776 |
|
* |
777 |
|
* @param w the calling worker |
778 |
|
* @param c the ctl value on entry |
780 |
|
*/ |
781 |
|
private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) { |
782 |
|
int v = w.eventCount; |
783 |
< |
w.nextWait = (int)c; // w's successor record |
783 |
> |
w.nextWait = (int)c; // w's successor record |
784 |
|
long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); |
785 |
|
if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { |
786 |
< |
long d = ctl; // return true if lost to a deq, to force rescan |
787 |
< |
return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L; |
786 |
> |
long d = ctl; // return true if lost to a deq, to force scan |
787 |
> |
return (int)d != (int)c && (d & AC_MASK) >= (c & AC_MASK); |
788 |
|
} |
789 |
< |
if (parallelism + (int)(c >> AC_SHIFT) == 1 && |
789 |
> |
for (int sc = w.stealCount; sc != 0;) { // accumulate stealCount |
790 |
> |
long s = stealCount; |
791 |
> |
if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc)) |
792 |
> |
sc = w.stealCount = 0; |
793 |
> |
else if (w.eventCount != v) |
794 |
> |
return true; // update next time |
795 |
> |
} |
796 |
> |
if ((!shutdown || !tryTerminate(false)) && |
797 |
> |
(int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 && |
798 |
|
blockedCount == 0 && quiescerCount == 0) |
799 |
< |
idleAwaitWork(w, v); // quiescent -- maybe shrink |
800 |
< |
|
778 |
< |
boolean rescanned = false; |
779 |
< |
for (int sc;;) { |
799 |
> |
idleAwaitWork(w, nc, c, v); // quiescent |
800 |
> |
for (boolean rescanned = false;;) { |
801 |
|
if (w.eventCount != v) |
802 |
|
return true; |
803 |
< |
if ((sc = w.stealCount) != 0) { |
783 |
< |
long s = stealCount; // accumulate stealCount |
784 |
< |
if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc)) |
785 |
< |
w.stealCount = 0; |
786 |
< |
} |
787 |
< |
else if (!rescanned) { |
803 |
> |
if (!rescanned) { |
804 |
|
int g = scanGuard, m = g & SMASK; |
805 |
|
ForkJoinWorkerThread[] ws = workers; |
806 |
|
if (ws != null && m < ws.length) { |
837 |
|
} |
838 |
|
|
839 |
|
/** |
840 |
< |
* If pool is quiescent, checks for termination, and waits for |
841 |
< |
* event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl |
842 |
< |
* has not changed, terminates the worker. Upon its termination |
843 |
< |
* (see deregisterWorker), it may wake up another worker to |
844 |
< |
* possibly repeat this process. |
840 |
> |
* If inactivating worker w has caused pool to become |
841 |
> |
* quiescent, check for pool termination, and wait for event |
842 |
> |
* for up to SHRINK_RATE nanosecs (rescans are unnecessary in |
843 |
> |
* this case because quiescence reflects consensus about lack |
844 |
> |
* of work). On timeout, if ctl has not changed, terminate the |
845 |
> |
* worker. Upon its termination (see deregisterWorker), it may |
846 |
> |
* wake up another worker to possibly repeat this process. |
847 |
|
* |
848 |
|
* @param w the calling worker |
849 |
< |
* @param v the eventCount w must wait until changed |
850 |
< |
*/ |
851 |
< |
private void idleAwaitWork(ForkJoinWorkerThread w, int v) { |
852 |
< |
ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs |
853 |
< |
if (shutdown) |
854 |
< |
tryTerminate(false); |
855 |
< |
long c = ctl; |
856 |
< |
long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) | |
857 |
< |
(long)(w.nextWait & E_MASK)); // ctl value to release w |
858 |
< |
if (w.eventCount == v && |
859 |
< |
parallelism + (int)(c >> AC_SHIFT) == 0 && |
860 |
< |
blockedCount == 0 && quiescerCount == 0) { |
843 |
< |
long startTime = System.nanoTime(); |
844 |
< |
Thread.interrupted(); |
845 |
< |
if (w.eventCount == v) { |
849 |
> |
* @param currentCtl the ctl value after enqueuing w |
850 |
> |
* @param prevCtl the ctl value if w terminated |
851 |
> |
* @param v the eventCount w awaits change |
852 |
> |
*/ |
853 |
> |
private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl, |
854 |
> |
long prevCtl, int v) { |
855 |
> |
if (w.eventCount == v) { |
856 |
> |
if (shutdown) |
857 |
> |
tryTerminate(false); |
858 |
> |
ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs |
859 |
> |
while (ctl == currentCtl) { |
860 |
> |
long startTime = System.nanoTime(); |
861 |
|
w.parked = true; |
862 |
< |
if (w.eventCount == v) |
862 |
> |
if (w.eventCount == v) // must recheck |
863 |
|
LockSupport.parkNanos(this, SHRINK_RATE); |
864 |
|
w.parked = false; |
865 |
< |
if (w.eventCount == v && ctl == c && |
866 |
< |
System.nanoTime() - startTime >= SHRINK_RATE && |
867 |
< |
UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { |
868 |
< |
w.terminate = true; |
869 |
< |
w.eventCount = ((int)c + EC_UNIT) & E_MASK; |
865 |
> |
if (w.eventCount != v) |
866 |
> |
break; |
867 |
> |
else if (System.nanoTime() - startTime < |
868 |
> |
SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop |
869 |
> |
Thread.interrupted(); // spurious wakeup |
870 |
> |
else if (UNSAFE.compareAndSwapLong(this, ctlOffset, |
871 |
> |
currentCtl, prevCtl)) { |
872 |
> |
w.terminate = true; // restore previous |
873 |
> |
w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK; |
874 |
> |
break; |
875 |
|
} |
876 |
|
} |
877 |
|
} |
907 |
|
|
908 |
|
/** |
909 |
|
* Creates or doubles submissionQueue array. |
910 |
< |
* Basically identical to ForkJoinWorkerThread version |
910 |
> |
* Basically identical to ForkJoinWorkerThread version. |
911 |
|
*/ |
912 |
|
private void growSubmissionQueue() { |
913 |
|
ForkJoinTask<?>[] oldQ = submissionQueue; |
947 |
|
int pc = parallelism; |
948 |
|
do { |
949 |
|
ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w; |
950 |
< |
int e, ac, tc, rc, i; |
950 |
> |
int e, ac, tc, i; |
951 |
|
long c = ctl; |
952 |
|
int u = (int)(c >>> 32); |
953 |
|
if ((e = (int)c) < 0) { |
987 |
|
} |
988 |
|
|
989 |
|
/** |
990 |
< |
* Decrements blockedCount and increments active count |
990 |
> |
* Decrements blockedCount and increments active count. |
991 |
|
*/ |
992 |
|
private void postBlock() { |
993 |
|
long c; |
994 |
|
do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, // no mask |
995 |
|
c = ctl, c + AC_UNIT)); |
996 |
|
int b; |
997 |
< |
do {} while(!UNSAFE.compareAndSwapInt(this, blockedCountOffset, |
998 |
< |
b = blockedCount, b - 1)); |
997 |
> |
do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset, |
998 |
> |
b = blockedCount, b - 1)); |
999 |
|
} |
1000 |
|
|
1001 |
|
/** |
1005 |
|
* @param joinMe the task |
1006 |
|
*/ |
1007 |
|
final void tryAwaitJoin(ForkJoinTask<?> joinMe) { |
988 |
– |
int s; |
1008 |
|
Thread.interrupted(); // clear interrupts before checking termination |
1009 |
|
if (joinMe.status >= 0) { |
1010 |
|
if (tryPreBlock()) { |
1011 |
|
joinMe.tryAwaitDone(0L); |
1012 |
|
postBlock(); |
1013 |
|
} |
1014 |
< |
if ((ctl & STOP_BIT) != 0L) |
1014 |
> |
else if ((ctl & STOP_BIT) != 0L) |
1015 |
|
joinMe.cancelIgnoringExceptions(); |
1016 |
|
} |
1017 |
|
} |
1018 |
|
|
1019 |
|
/** |
1020 |
|
* Possibly blocks the given worker waiting for joinMe to |
1021 |
< |
* complete or timeout |
1021 |
> |
* complete or timeout. |
1022 |
|
* |
1023 |
|
* @param joinMe the task |
1024 |
|
* @param millis the wait time for underlying Object.wait |
1054 |
|
} |
1055 |
|
|
1056 |
|
/** |
1057 |
< |
* If necessary, compensates for blocker, and blocks |
1057 |
> |
* If necessary, compensates for blocker, and blocks. |
1058 |
|
*/ |
1059 |
|
private void awaitBlocker(ManagedBlocker blocker) |
1060 |
|
throws InterruptedException { |
1146 |
|
ws[k] = w; |
1147 |
|
nextWorkerIndex = k + 1; |
1148 |
|
int m = g & SMASK; |
1149 |
< |
g = k >= m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); |
1149 |
> |
g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); |
1150 |
|
} |
1151 |
|
} finally { |
1152 |
|
scanGuard = g; |
1226 |
|
if ((int)(c >> AC_SHIFT) != -parallelism) |
1227 |
|
return false; |
1228 |
|
if (!shutdown || blockedCount != 0 || quiescerCount != 0 || |
1229 |
< |
queueTop - queueBase > 0) { |
1229 |
> |
queueBase != queueTop) { |
1230 |
|
if (ctl == c) // staleness check |
1231 |
|
return false; |
1232 |
|
continue; |
1235 |
|
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT)) |
1236 |
|
startTerminating(); |
1237 |
|
} |
1238 |
< |
if ((short)(c >>> TC_SHIFT) == -parallelism) { |
1239 |
< |
submissionLock.lock(); |
1240 |
< |
termination.signalAll(); |
1241 |
< |
submissionLock.unlock(); |
1238 |
> |
if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers |
1239 |
> |
final ReentrantLock lock = this.submissionLock; |
1240 |
> |
lock.lock(); |
1241 |
> |
try { |
1242 |
> |
termination.signalAll(); |
1243 |
> |
} finally { |
1244 |
> |
lock.unlock(); |
1245 |
> |
} |
1246 |
|
} |
1247 |
|
return true; |
1248 |
|
} |
1249 |
|
|
1250 |
|
/** |
1251 |
|
* Runs up to three passes through workers: (0) Setting |
1252 |
< |
* termination status for each worker, followed by wakeups up |
1253 |
< |
* queued workers (1) helping cancel tasks (2) interrupting |
1252 |
> |
* termination status for each worker, followed by wakeups up to |
1253 |
> |
* queued workers; (1) helping cancel tasks; (2) interrupting |
1254 |
|
* lagging threads (likely in external tasks, but possibly also |
1255 |
|
* blocked in joins). Each pass repeats previous steps because of |
1256 |
|
* potential lagging thread creation. |
1296 |
|
|
1297 |
|
/** |
1298 |
|
* Tries to set the termination status of waiting workers, and |
1299 |
< |
* then wake them up (after which they will terminate). |
1299 |
> |
* then wakes them up (after which they will terminate). |
1300 |
|
*/ |
1301 |
|
private void terminateWaiters() { |
1302 |
|
ForkJoinWorkerThread[] ws = workers; |
1321 |
|
// misc ForkJoinWorkerThread support |
1322 |
|
|
1323 |
|
/** |
1324 |
< |
* Increment or decrement quiescerCount. Needed only to prevent |
1324 |
> |
* Increments or decrements quiescerCount. Needed only to prevent |
1325 |
|
* triggering shutdown if a worker is transiently inactive while |
1326 |
|
* checking quiescence. |
1327 |
|
* |
1329 |
|
*/ |
1330 |
|
final void addQuiescerCount(int delta) { |
1331 |
|
int c; |
1332 |
< |
do {} while(!UNSAFE.compareAndSwapInt(this, quiescerCountOffset, |
1333 |
< |
c = quiescerCount, c + delta)); |
1332 |
> |
do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset, |
1333 |
> |
c = quiescerCount, c + delta)); |
1334 |
|
} |
1335 |
|
|
1336 |
|
/** |
1337 |
< |
* Directly increment or decrement active count without |
1338 |
< |
* queuing. This method is used to transiently assert inactivation |
1339 |
< |
* while checking quiescence. |
1337 |
> |
* Directly increments or decrements active count without queuing. |
1338 |
> |
* This method is used to transiently assert inactivation while |
1339 |
> |
* checking quiescence. |
1340 |
|
* |
1341 |
|
* @param delta 1 for increment, -1 for decrement |
1342 |
|
*/ |
1355 |
|
final int idlePerActive() { |
1356 |
|
// Approximate at powers of two for small values, saturate past 4 |
1357 |
|
int p = parallelism; |
1358 |
< |
int a = p + (int)(ctl >> AC_SHIFT); |
1359 |
< |
return (a > (p >>>= 1) ? 0 : |
1360 |
< |
a > (p >>>= 1) ? 1 : |
1361 |
< |
a > (p >>>= 1) ? 2 : |
1362 |
< |
a > (p >>>= 1) ? 4 : |
1363 |
< |
8); |
1358 |
> |
int a = p + (int)(ctl >> AC_SHIFT); |
1359 |
> |
return (a > (p >>>= 1) ? 0 : |
1360 |
> |
a > (p >>>= 1) ? 1 : |
1361 |
> |
a > (p >>>= 1) ? 2 : |
1362 |
> |
a > (p >>>= 1) ? 4 : |
1363 |
> |
8); |
1364 |
|
} |
1365 |
|
|
1366 |
|
// Exported methods |
1683 |
|
*/ |
1684 |
|
public int getRunningThreadCount() { |
1685 |
|
int r = parallelism + (int)(ctl >> AC_SHIFT); |
1686 |
< |
return r <= 0? 0 : r; // suppress momentarily negative values |
1686 |
> |
return (r <= 0) ? 0 : r; // suppress momentarily negative values |
1687 |
|
} |
1688 |
|
|
1689 |
|
/** |
1695 |
|
*/ |
1696 |
|
public int getActiveThreadCount() { |
1697 |
|
int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount; |
1698 |
< |
return r <= 0? 0 : r; // suppress momentarily negative values |
1698 |
> |
return (r <= 0) ? 0 : r; // suppress momentarily negative values |
1699 |
|
} |
1700 |
|
|
1701 |
|
/** |
1752 |
|
|
1753 |
|
/** |
1754 |
|
* Returns an estimate of the number of tasks submitted to this |
1755 |
< |
* pool that have not yet begun executing. This meThod may take |
1755 |
> |
* pool that have not yet begun executing. This method may take |
1756 |
|
* time proportional to the number of submissions. |
1757 |
|
* |
1758 |
|
* @return the number of queued submissions |
1850 |
|
int ac = rc + blockedCount; |
1851 |
|
String level; |
1852 |
|
if ((c & STOP_BIT) != 0) |
1853 |
< |
level = (tc == 0)? "Terminated" : "Terminating"; |
1853 |
> |
level = (tc == 0) ? "Terminated" : "Terminating"; |
1854 |
|
else |
1855 |
< |
level = shutdown? "Shutting down" : "Running"; |
1855 |
> |
level = shutdown ? "Shutting down" : "Running"; |
1856 |
|
return super.toString() + |
1857 |
|
"[" + level + |
1858 |
|
", parallelism = " + pc + |
1989 |
|
* {@code isReleasable} must return {@code true} if blocking is |
1990 |
|
* not necessary. Method {@code block} blocks the current thread |
1991 |
|
* if necessary (perhaps internally invoking {@code isReleasable} |
1992 |
< |
* before actually blocking). The unusual methods in this API |
1993 |
< |
* accommodate synchronizers that may, but don't usually, block |
1994 |
< |
* for long periods. Similarly, they allow more efficient internal |
1995 |
< |
* handling of cases in which additional workers may be, but |
1996 |
< |
* usually are not, needed to ensure sufficient parallelism. |
1997 |
< |
* Toward this end, implementations of method {@code isReleasable} |
1998 |
< |
* must be amenable to repeated invocation. |
1992 |
> |
* before actually blocking). These actions are performed by any |
1993 |
> |
* thread invoking {@link ForkJoinPool#managedBlock}. The |
1994 |
> |
* unusual methods in this API accommodate synchronizers that may, |
1995 |
> |
* but don't usually, block for long periods. Similarly, they |
1996 |
> |
* allow more efficient internal handling of cases in which |
1997 |
> |
* additional workers may be, but usually are not, needed to |
1998 |
> |
* ensure sufficient parallelism. Toward this end, |
1999 |
> |
* implementations of method {@code isReleasable} must be amenable |
2000 |
> |
* to repeated invocation. |
2001 |
|
* |
2002 |
|
* <p>For example, here is a ManagedBlocker based on a |
2003 |
|
* ReentrantLock: |
2115 |
|
modifyThreadPermission = new RuntimePermission("modifyThread"); |
2116 |
|
defaultForkJoinWorkerThreadFactory = |
2117 |
|
new DefaultForkJoinWorkerThreadFactory(); |
2093 |
– |
int s; |
2118 |
|
try { |
2119 |
|
UNSAFE = getUnsafe(); |
2120 |
< |
Class k = ForkJoinPool.class; |
2120 |
> |
Class<?> k = ForkJoinPool.class; |
2121 |
|
ctlOffset = UNSAFE.objectFieldOffset |
2122 |
|
(k.getDeclaredField("ctl")); |
2123 |
|
stealCountOffset = UNSAFE.objectFieldOffset |
2130 |
|
(k.getDeclaredField("scanGuard")); |
2131 |
|
nextWorkerNumberOffset = UNSAFE.objectFieldOffset |
2132 |
|
(k.getDeclaredField("nextWorkerNumber")); |
2109 |
– |
Class a = ForkJoinTask[].class; |
2110 |
– |
ABASE = UNSAFE.arrayBaseOffset(a); |
2111 |
– |
s = UNSAFE.arrayIndexScale(a); |
2133 |
|
} catch (Exception e) { |
2134 |
|
throw new Error(e); |
2135 |
|
} |
2136 |
+ |
Class<?> a = ForkJoinTask[].class; |
2137 |
+ |
ABASE = UNSAFE.arrayBaseOffset(a); |
2138 |
+ |
int s = UNSAFE.arrayIndexScale(a); |
2139 |
|
if ((s & (s-1)) != 0) |
2140 |
|
throw new Error("data type scale not a power of two"); |
2141 |
|
ASHIFT = 31 - Integer.numberOfLeadingZeros(s); |