259 |
|
* workers that previously could not find a task to now find one: |
260 |
|
* Submission of a new task to the pool, or another worker pushing |
261 |
|
* a task onto a previously empty queue. (We also use this |
262 |
< |
* mechanism for termination actions that require wakeups of idle |
263 |
< |
* workers). Each worker maintains its last known event count, |
264 |
< |
* and blocks when a scan for work did not find a task AND its |
265 |
< |
* lastEventCount matches the current eventCount. Waiting idle |
266 |
< |
* workers are recorded in a variant of Treiber stack headed by |
267 |
< |
* field eventWaiters which, when nonzero, encodes the thread |
268 |
< |
* index and count awaited for by the worker thread most recently |
269 |
< |
* calling eventSync. This thread in turn has a record (field |
270 |
< |
* nextEventWaiter) for the next waiting worker. In addition to |
271 |
< |
* allowing simpler decisions about need for wakeup, the event |
272 |
< |
* count bits in eventWaiters serve the role of tags to avoid ABA |
273 |
< |
* errors in Treiber stacks. To reduce delays in task diffusion, |
274 |
< |
* workers not otherwise occupied may invoke method |
275 |
< |
* releaseEventWaiters, which removes and signals (unparks) workers |
276 |
< |
* not waiting on current count. To minimize task production |
277 |
< |
* stalls associated with signalling, any worker |
278 |
< |
* pushing a task on an empty queue invokes the weaker method |
279 |
< |
* signalWork, which only releases idle workers until it detects |
280 |
< |
* interference by other threads trying to release, and lets them |
281 |
< |
* take over. The net effect is a tree-like diffusion of signals, |
282 |
< |
* where released threads (and possibly others) help with unparks. |
283 |
< |
* To further reduce contention effects a bit, failed CASes to |
284 |
< |
* increment field eventCount are tolerated without retries. |
262 |
> |
* mechanism for configuration and termination actions that |
263 |
> |
* require wakeups of idle workers). Each worker maintains its |
264 |
> |
* last known event count, and blocks when a scan for work did not |
265 |
> |
* find a task AND its lastEventCount matches the current |
266 |
> |
* eventCount. Waiting idle workers are recorded in a variant of |
267 |
> |
* Treiber stack headed by field eventWaiters which, when nonzero, |
268 |
> |
* encodes the thread index and count awaited for by the worker |
269 |
> |
* thread most recently calling eventSync. This thread in turn has |
270 |
> |
* a record (field nextEventWaiter) for the next waiting worker. |
271 |
> |
* In addition to allowing simpler decisions about need for |
272 |
> |
* wakeup, the event count bits in eventWaiters serve the role of |
273 |
> |
* tags to avoid ABA errors in Treiber stacks. Upon any wakeup, |
274 |
> |
* released threads also try to release others (but give up upon |
275 |
> |
* contention to reduce useless flailing). The net effect is a |
276 |
> |
* tree-like diffusion of signals, where released threads (and |
277 |
> |
* possibly others) help with unparks. To further reduce |
278 |
> |
* contention effects a bit, failed CASes to increment field |
279 |
> |
* eventCount are tolerated without retries in signalWork. |
280 |
|
* Conceptually they are merged into the same event, which is OK |
281 |
|
* when their only purpose is to enable workers to scan for work. |
282 |
|
* |
299 |
|
* may become a spare at about the same time as another is |
300 |
|
* needlessly being created. We counteract this and related slop |
301 |
|
* in part by requiring resumed spares to immediately recheck (in |
302 |
< |
* preStep) to see whether they should re-suspend. To avoid |
308 |
< |
* long-term build-up of spares, the oldest spare (see |
309 |
< |
* ForkJoinWorkerThread.suspendAsSpare) occasionally wakes up if |
310 |
< |
* not signalled and calls tryTrimSpare, which uses two different |
311 |
< |
* thresholds: Always killing if the number of spares is greater |
312 |
< |
* than 25% of total, and killing others only at a slower rate |
313 |
< |
* (UNUSED_SPARE_TRIM_RATE_NANOS). |
302 |
> |
* preStep) to see whether they should re-suspend. |
303 |
|
* |
304 |
< |
* 6. Deciding when to create new workers. The main dynamic |
304 |
> |
* 6. Killing off unneeded workers. The Spare and Event queues use |
305 |
> |
* similar mechanisms to shed unused workers: The oldest (first) |
306 |
> |
* waiter uses a timed rather than hard wait. When this wait times |
307 |
> |
* out without a normal wakeup, it tries to shut down any one (for |
308 |
> |
* convenience the newest) other waiter via tryShutdownSpare or |
309 |
> |
* tryShutdownWaiter, respectively. The wakeup intervals for |
310 |
> |
* spares are much shorter than for event waiters. Together, they will |
311 |
> |
* eventually reduce the number of worker threads to a minimum of |
312 |
> |
* one after a long enough period without use. |
313 |
> |
* |
314 |
> |
* 7. Deciding when to create new workers. The main dynamic |
315 |
|
* control in this class is deciding when to create extra threads |
316 |
|
* in method helpMaintainParallelism. We would like to keep |
317 |
|
* exactly #parallelism threads running, which is an impossible |
325 |
|
* impedes accuracy. Our main defense is to allow some slack in |
326 |
|
* creation thresholds, using rules that reflect the fact that the |
327 |
|
* more threads we have running, the more likely that we are |
328 |
< |
* underestimating the number of running threads. The rules also |
329 |
< |
* better cope with the fact that some of the methods in this |
328 |
> |
* underestimating the number of running threads. (We also include |
329 |
> |
* some heuristic use of Thread.yield when all workers appear to |
330 |
> |
* be busy, to improve likelihood of counts settling.) The rules |
331 |
> |
* also better cope with the fact that some of the methods in this |
332 |
|
* class tend to never become compiled (but are interpreted), so |
333 |
|
* some components of the entire set of controls might execute 100 |
334 |
|
* times faster than others. And similarly for cases where the |
420 |
|
new AtomicInteger(); |
421 |
|
|
422 |
|
/** |
423 |
+ |
* The wakeup interval (in nanoseconds) for the oldest worker |
424 |
+ |
* waiting for an event to invoke tryShutdownWaiter to shrink |
425 |
+ |
* the number of workers. The exact value does not matter too |
426 |
+ |
* much, but should be long enough to slowly release resources |
427 |
+ |
* during long idle periods without disrupting normal use. |
428 |
+ |
*/ |
429 |
+ |
private static final long SHRINK_RATE_NANOS = |
430 |
+ |
60L * 1000L * 1000L * 1000L; // one minute |
431 |
+ |
|
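A rough stand-alone sketch of the shrink protocol this constant drives, as described above and implemented in awaitEvent below; everything except SHRINK_RATE_NANOS is a hypothetical name. Only the oldest waiter parks with a deadline, and a timeout that was not a real wakeup triggers a single shutdown attempt:

    import java.util.concurrent.locks.LockSupport;

    // Illustrative model: an idle pool sheds one thread per quiet interval.
    abstract class ShrinkingWaiter extends Thread {
        static final long SHRINK_RATE_NANOS = 60L * 1000L * 1000L * 1000L;

        // Used in place of a plain park when this thread heads the queue.
        void awaitAsOldest() {
            long start = System.nanoTime();
            LockSupport.parkNanos(this, SHRINK_RATE_NANOS);
            // A signal arriving near the deadline still counts as use; only
            // a full quiet interval triggers a (single) shutdown attempt.
            if (System.nanoTime() - start >= SHRINK_RATE_NANOS)
                tryShutdownNewestPeer();
        }

        // Hypothetical hook: dequeue and terminate the newest other waiter.
        abstract void tryShutdownNewestPeer();
    }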
432 |
+ |
/** |
433 |
|
* Absolute bound for parallelism level. Twice this number plus |
434 |
|
* one (i.e., 0xffff) must fit into a 16-bit field to enable |
435 |
|
* word-packing for some counts and indices. |
474 |
|
private volatile long stealCount; |
475 |
|
|
476 |
|
/** |
466 |
– |
* The last nanoTime that a spare thread was trimmed |
467 |
– |
*/ |
468 |
– |
private volatile long trimTime; |
469 |
– |
|
470 |
– |
/** |
471 |
– |
* The rate at which to trim unused spares |
472 |
– |
*/ |
473 |
– |
static final long UNUSED_SPARE_TRIM_RATE_NANOS = |
474 |
– |
1000L * 1000L * 1000L; // 1 sec |
475 |
– |
|
476 |
– |
/** |
477 |
|
* Encoded record of top of Treiber stack of threads waiting for |
478 |
|
* events. The top 32 bits contain the count being waited for. The |
479 |
|
* bottom 16 bits contain one plus the pool index of waiting |
514 |
|
* These are bundled together to ensure consistent read for |
515 |
|
* termination checks (i.e., that runLevel is at least SHUTDOWN |
516 |
|
* and active threads is zero). |
517 |
+ |
* |
518 |
+ |
* Notes: Most direct CASes are dependent on these bitfield |
519 |
+ |
* positions. Also, this field is non-private to enable direct |
520 |
+ |
* performance-sensitive CASes in ForkJoinWorkerThread. |
521 |
|
*/ |
522 |
< |
private volatile int runState; |
522 |
> |
volatile int runState; |
523 |
|
|
524 |
|
// Note: The order among run level values matters. |
525 |
|
private static final int RUNLEVEL_SHIFT = 16; |
527 |
|
private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1); |
528 |
|
private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2); |
529 |
|
private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1; |
526 |
– |
private static final int ONE_ACTIVE = 1; // active update delta |
530 |
|
|
531 |
|
/** |
532 |
|
* Holds number of total (i.e., created and not yet terminated) |
568 |
|
private final int poolNumber; |
569 |
|
|
570 |
|
|
571 |
< |
// Utilities for CASing fields. Note that several of these |
572 |
< |
// are manually inlined by callers |
571 |
> |
// Utilities for CASing fields. Note that most of these |
572 |
> |
// are usually manually inlined by callers |
573 |
|
|
574 |
|
/** |
575 |
|
* Increments running count part of workerCounts |
602 |
|
private void decrementWorkerCounts(int dr, int dt) { |
603 |
|
for (;;) { |
604 |
|
int wc = workerCounts; |
602 |
– |
if (wc == 0 && (runState & TERMINATED) != 0) |
603 |
– |
return; // lagging termination on a backout |
605 |
|
if ((wc & RUNNING_COUNT_MASK) - dr < 0 || |
606 |
< |
(wc >>> TOTAL_COUNT_SHIFT) - dt < 0) |
606 |
> |
(wc >>> TOTAL_COUNT_SHIFT) - dt < 0) { |
607 |
> |
if ((runState & TERMINATED) != 0) |
608 |
> |
return; // lagging termination on a backout |
609 |
|
Thread.yield(); |
610 |
+ |
} |
611 |
|
if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
612 |
|
wc, wc - (dr + dt))) |
613 |
|
return; |
632 |
|
final boolean tryIncrementActiveCount() { |
633 |
|
int c; |
634 |
|
return UNSAFE.compareAndSwapInt(this, runStateOffset, |
635 |
< |
c = runState, c + ONE_ACTIVE); |
635 |
> |
c = runState, c + 1); |
636 |
|
} |
637 |
|
|
638 |
|
/** |
642 |
|
final boolean tryDecrementActiveCount() { |
643 |
|
int c; |
644 |
|
return UNSAFE.compareAndSwapInt(this, runStateOffset, |
645 |
< |
c = runState, c - ONE_ACTIVE); |
645 |
> |
c = runState, c - 1); |
646 |
|
} |
647 |
|
|
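The bare c + 1 and c - 1 in the two methods above are safe because the active count occupies the low ACTIVE_COUNT_MASK bits of runState, and the MAX_WORKERS bound keeps it far from carrying into the run-level bits. A small illustrative model, with AtomicInteger standing in for the Unsafe-based CAS (names mirror the fields above but this is not pool code):

    import java.util.concurrent.atomic.AtomicInteger;

    class PackedRunState {
        static final int RUNLEVEL_SHIFT    = 16;
        static final int SHUTDOWN          = 1 << RUNLEVEL_SHIFT;
        static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;

        final AtomicInteger runState = new AtomicInteger();

        // Adding 1 changes only the count bits while count < ACTIVE_COUNT_MASK.
        boolean tryIncrementActiveCount() {
            int c = runState.get();
            return runState.compareAndSet(c, c + 1);
        }

        int activeCount()    { return runState.get() & ACTIVE_COUNT_MASK; }
        boolean isShutdown() { return runState.get() >= SHUTDOWN; }
    }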
648 |
|
/** |
709 |
|
* Tries to create and add a new worker. Assumes that worker counts |
710 |
|
* are already updated to accommodate the worker, so adjusts on |
711 |
|
* failure. |
712 |
+ |
* |
713 |
+ |
* @return the worker, or null on failure |
714 |
|
*/ |
715 |
< |
private void addWorker() { |
715 |
> |
private ForkJoinWorkerThread addWorker() { |
716 |
|
ForkJoinWorkerThread w = null; |
717 |
|
try { |
718 |
|
w = factory.newThread(this); |
722 |
|
tryTerminate(false); // in case of failure during shutdown |
723 |
|
} |
724 |
|
} |
725 |
< |
if (w != null) |
725 |
> |
if (w != null) { |
726 |
|
w.start(recordWorker(w), ueh); |
727 |
+ |
advanceEventCount(); |
728 |
+ |
} |
729 |
+ |
return w; |
730 |
|
} |
731 |
|
|
732 |
|
/** |
749 |
|
/** |
750 |
|
* Releases workers blocked on a count not equal to current count. |
751 |
|
* Normally called after precheck that eventWaiters isn't zero to |
752 |
< |
* avoid wasted array checks. |
753 |
< |
* |
745 |
< |
* @param signalling true if caller is a signalling worker so can |
746 |
< |
* exit upon (conservatively) detected contention by other threads |
747 |
< |
* who will continue to release |
752 |
> |
* avoid wasted array checks. Gives up upon a change in count or |
753 |
> |
* contention, letting other workers take over. |
754 |
|
*/ |
755 |
< |
private void releaseEventWaiters(boolean signalling) { |
755 |
> |
private void releaseEventWaiters() { |
756 |
|
ForkJoinWorkerThread[] ws = workers; |
757 |
|
int n = ws.length; |
758 |
< |
long h; // head of stack |
759 |
< |
ForkJoinWorkerThread w; int id, ec; |
760 |
< |
while ((id = ((int)((h = eventWaiters) & WAITER_ID_MASK)) - 1) >= 0 && |
761 |
< |
(int)(h >>> EVENT_COUNT_SHIFT) != (ec = eventCount) && |
762 |
< |
id < n && (w = ws[id]) != null) { |
763 |
< |
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
764 |
< |
h, h = w.nextWaiter)) |
765 |
< |
LockSupport.unpark(w); |
766 |
< |
if (signalling && (eventCount != ec || eventWaiters != h)) |
758 |
> |
long h = eventWaiters; |
759 |
> |
int ec = eventCount; |
760 |
> |
ForkJoinWorkerThread w; int id; |
761 |
> |
while ((int)(h >>> EVENT_COUNT_SHIFT) != ec && |
762 |
> |
(id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && |
763 |
> |
id < n && (w = ws[id]) != null && |
764 |
> |
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
765 |
> |
h, h = w.nextWaiter)) { |
766 |
> |
LockSupport.unpark(w); |
767 |
> |
if (eventWaiters != h || eventCount != ec) |
768 |
|
break; |
769 |
|
} |
770 |
|
} |
777 |
|
int c; // try to increment event count -- CAS failure OK |
778 |
|
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); |
779 |
|
if (eventWaiters != 0L) |
780 |
< |
releaseEventWaiters(true); |
780 |
> |
releaseEventWaiters(); |
781 |
|
} |
782 |
|
|
783 |
|
/** |
784 |
< |
* Blocks worker until terminating or event count |
785 |
< |
* advances from last value held by worker |
784 |
> |
* Adds the given worker to the event queue and blocks until |
785 |
> |
* terminating or event count advances from the worker's |
786 |
> |
* lastEventCount value |
787 |
|
* |
788 |
|
* @param w the calling worker thread |
789 |
|
*/ |
790 |
|
private void eventSync(ForkJoinWorkerThread w) { |
791 |
< |
int wec = w.lastEventCount; |
792 |
< |
long nh = (((long)wec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); |
791 |
> |
int ec = w.lastEventCount; |
792 |
> |
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); |
793 |
|
long h; |
794 |
|
while ((runState < SHUTDOWN || !tryTerminate(false)) && |
795 |
< |
((h = eventWaiters) == 0L || |
796 |
< |
(int)(h >>> EVENT_COUNT_SHIFT) == wec) && |
797 |
< |
eventCount == wec) { |
795 |
> |
(((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 || |
796 |
> |
(int)(h >>> EVENT_COUNT_SHIFT) == ec) && |
797 |
> |
eventCount == ec) { |
798 |
|
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
799 |
|
w.nextWaiter = h, nh)) { |
800 |
< |
while (runState < TERMINATING && eventCount == wec) { |
801 |
< |
if (!tryAccumulateStealCount(w)) // transfer while idle |
802 |
< |
continue; |
803 |
< |
Thread.interrupted(); // clear/ignore interrupt |
804 |
< |
if (eventCount != wec) |
805 |
< |
break; |
800 |
> |
awaitEvent(w, ec); |
801 |
> |
break; |
802 |
> |
} |
803 |
> |
} |
804 |
> |
} |
805 |
> |
|
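The push CAS in eventSync above installs nh, whose high half is the worker's event count and whose low bits are its index plus one: the tagged-head layout described in the overview. As a minimal stand-alone model of that idea (hypothetical class and names, AtomicLong standing in for the Unsafe-based CAS on eventWaiters):

    import java.util.concurrent.atomic.AtomicLong;

    // Treiber stack whose head word carries a count tag to defeat ABA.
    class TaggedWaiterStack {
        static final int  EVENT_COUNT_SHIFT = 32;
        static final long WAITER_ID_MASK    = (1L << 16) - 1L;

        final AtomicLong top = new AtomicLong(); // plays the role of eventWaiters
        final long[] next;                       // plays the role of nextWaiter links

        TaggedWaiterStack(int capacity) { next = new long[capacity]; }

        // Push waiter 'id', recording the count it observed before blocking.
        void push(int id, int count) {
            long nh = ((long) count << EVENT_COUNT_SHIFT) | (long) (id + 1);
            long h;
            do {
                h = top.get();
                next[id] = h;                    // link to previous head
            } while (!top.compareAndSet(h, nh));
        }

        // Pop the top waiter, or -1 if empty. A stale head fails its CAS
        // even if the same index was re-pushed, because the tag differs.
        int pop() {
            for (;;) {
                long h = top.get();
                int id = (int) (h & WAITER_ID_MASK) - 1;
                if (id < 0)
                    return -1;
                if (top.compareAndSet(h, next[id]))
                    return id;
            }
        }
    }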
806 |
> |
/** |
807 |
> |
* Blocks the given worker (that has already been entered as an |
808 |
> |
* event waiter) until terminating or event count advances from |
809 |
> |
* the given value. The oldest (first) waiter uses a timed wait to |
810 |
> |
* occasionally one-by-one shrink the number of workers (to a |
811 |
> |
* minimum of one) if the pool has not been used for extended |
812 |
> |
* periods. |
813 |
> |
* |
814 |
> |
* @param w the calling worker thread |
815 |
> |
* @param ec the count |
816 |
> |
*/ |
817 |
> |
private void awaitEvent(ForkJoinWorkerThread w, int ec) { |
818 |
> |
while (eventCount == ec) { |
819 |
> |
if (tryAccumulateStealCount(w)) { // transfer while idle |
820 |
> |
boolean untimed = (w.nextWaiter != 0L || |
821 |
> |
(workerCounts & RUNNING_COUNT_MASK) <= 1); |
822 |
> |
long startTime = untimed ? 0L : System.nanoTime(); |
823 |
> |
Thread.interrupted(); // clear/ignore interrupt |
824 |
> |
if (eventCount != ec || !w.isRunning() || |
825 |
> |
runState >= TERMINATING) // recheck after clear |
826 |
> |
break; |
827 |
> |
if (untimed) |
828 |
|
LockSupport.park(w); |
829 |
+ |
else { |
830 |
+ |
LockSupport.parkNanos(w, SHRINK_RATE_NANOS); |
831 |
+ |
if (eventCount != ec || !w.isRunning() || |
832 |
+ |
runState >= TERMINATING) |
833 |
+ |
break; |
834 |
+ |
if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS) |
835 |
+ |
tryShutdownWaiter(ec); |
836 |
|
} |
800 |
– |
break; |
837 |
|
} |
838 |
|
} |
839 |
< |
w.lastEventCount = eventCount; |
839 |
> |
} |
840 |
> |
|
841 |
> |
/** |
842 |
> |
* Callback from the oldest waiter in awaitEvent waking up after a |
843 |
> |
* period of non-use. Tries (once) to shutdown an event waiter (or |
844 |
> |
* a spare, if one exists). Note that we don't need CAS or locks |
845 |
> |
* here because the method is called only from one thread |
846 |
> |
* occasionally waking (and even misfires are OK). Note that |
847 |
> |
* until the shutdown worker fully terminates, workerCounts |
848 |
> |
* will overestimate total count, which is tolerable. |
849 |
> |
* |
850 |
> |
* @param ec the event count waited on by caller (to abort |
851 |
> |
* attempt if count has since changed). |
852 |
> |
*/ |
853 |
> |
private void tryShutdownWaiter(int ec) { |
854 |
> |
if (spareWaiters != 0) { // prefer killing spares |
855 |
> |
tryShutdownSpare(); |
856 |
> |
return; |
857 |
> |
} |
858 |
> |
ForkJoinWorkerThread[] ws = workers; |
859 |
> |
int n = ws.length; |
860 |
> |
long h = eventWaiters; |
861 |
> |
ForkJoinWorkerThread w; int id; long nh; |
862 |
> |
if (runState == 0 && |
863 |
> |
submissionQueue.isEmpty() && |
864 |
> |
eventCount == ec && |
865 |
> |
(id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && |
866 |
> |
id < n && (w = ws[id]) != null && |
867 |
> |
(nh = w.nextWaiter) != 0L && // keep at least one worker |
868 |
> |
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) { |
869 |
> |
w.shutdown(); |
870 |
> |
LockSupport.unpark(w); |
871 |
> |
} |
872 |
> |
releaseEventWaiters(); |
873 |
|
} |
874 |
|
|
875 |
|
// Maintaining spares |
878 |
|
* Pushes worker onto the spare stack |
879 |
|
*/ |
880 |
|
final void pushSpare(ForkJoinWorkerThread w) { |
881 |
< |
int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex+1); |
881 |
> |
int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1); |
882 |
|
do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset, |
883 |
|
w.nextSpare = spareWaiters, ns)); |
884 |
|
} |
885 |
|
|
886 |
|
/** |
887 |
< |
* Tries (once) to resume a spare if running count is less than |
888 |
< |
* target parallelism. Fails on contention or stale workers. |
887 |
> |
* Callback from oldest spare occasionally waking up. Tries |
888 |
> |
* (once) to shut down a spare. Same idea as tryShutdownWaiter. |
889 |
|
*/ |
890 |
< |
private void tryResumeSpare() { |
890 |
> |
final void tryShutdownSpare() { |
891 |
|
int sw, id; |
892 |
|
ForkJoinWorkerThread w; |
893 |
|
ForkJoinWorkerThread[] ws; |
894 |
|
if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 && |
895 |
|
id < (ws = workers).length && (w = ws[id]) != null && |
896 |
< |
(workerCounts & RUNNING_COUNT_MASK) < parallelism && |
828 |
< |
eventWaiters == 0L && |
829 |
< |
spareWaiters == sw && |
896 |
> |
(workerCounts & RUNNING_COUNT_MASK) >= parallelism && |
897 |
|
UNSAFE.compareAndSwapInt(this, spareWaitersOffset, |
898 |
< |
sw, w.nextSpare) && |
899 |
< |
w.tryUnsuspend()) { |
833 |
< |
int c; // try increment; if contended, finish after unpark |
834 |
< |
boolean inc = UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
835 |
< |
c = workerCounts, |
836 |
< |
c + ONE_RUNNING); |
898 |
> |
sw, w.nextSpare)) { |
899 |
> |
w.shutdown(); |
900 |
|
LockSupport.unpark(w); |
901 |
< |
if (!inc) { |
902 |
< |
do {} while(!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
903 |
< |
c = workerCounts, |
904 |
< |
c + ONE_RUNNING)); |
901 |
> |
advanceEventCount(); |
902 |
> |
} |
903 |
> |
} |
904 |
> |
|
905 |
> |
/** |
906 |
> |
* Tries (once) to resume a spare if worker counts match |
907 |
> |
* the given count. |
908 |
> |
* |
909 |
> |
* @param wc workerCounts value on invocation of this method |
910 |
> |
*/ |
911 |
> |
private void tryResumeSpare(int wc) { |
912 |
> |
ForkJoinWorkerThread[] ws = workers; |
913 |
> |
int n = ws.length; |
914 |
> |
int sw, id, rs; ForkJoinWorkerThread w; |
915 |
> |
if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 && |
916 |
> |
id < n && (w = ws[id]) != null && |
917 |
> |
(rs = runState) < TERMINATING && |
918 |
> |
eventWaiters == 0L && workerCounts == wc) { |
919 |
> |
// In case all workers busy, heuristically back off to let settle |
920 |
> |
Thread.yield(); |
921 |
> |
if (eventWaiters == 0L && runState == rs && // recheck |
922 |
> |
workerCounts == wc && spareWaiters == sw && |
923 |
> |
UNSAFE.compareAndSwapInt(this, spareWaitersOffset, |
924 |
> |
sw, w.nextSpare)) { |
925 |
> |
int c; // increment running count before resume |
926 |
> |
do {} while (!UNSAFE.compareAndSwapInt |
927 |
> |
(this, workerCountsOffset, |
928 |
> |
c = workerCounts, c + ONE_RUNNING)); |
929 |
> |
if (w.tryUnsuspend()) |
930 |
> |
LockSupport.unpark(w); |
931 |
> |
else // back out if w was shutdown |
932 |
> |
decrementWorkerCounts(ONE_RUNNING, 0); |
933 |
|
} |
934 |
|
} |
935 |
|
} |
936 |
|
|
937 |
+ |
// adding workers on demand |
938 |
+ |
|
939 |
|
/** |
940 |
< |
* Callback from oldest spare occasionally waking up. Tries |
941 |
< |
* (once) to shutdown a spare if more than 25% spare overage, or |
942 |
< |
* if UNUSED_SPARE_TRIM_RATE_NANOS have elapsed and there are at |
943 |
< |
* least #parallelism running threads. Note that we don't need CAS |
851 |
< |
* or locks here because the method is called only from the oldest |
852 |
< |
* suspended spare occasionally waking (and even misfires are OK). |
853 |
< |
* |
854 |
< |
* @param now the wake up nanoTime of caller |
855 |
< |
*/ |
856 |
< |
final void tryTrimSpare(long now) { |
857 |
< |
long lastTrim = trimTime; |
858 |
< |
trimTime = now; |
859 |
< |
helpMaintainParallelism(); // first, help wake up any needed spares |
860 |
< |
int sw, id; |
861 |
< |
ForkJoinWorkerThread w; |
862 |
< |
ForkJoinWorkerThread[] ws; |
940 |
> |
* Adds one or more workers if needed to establish target parallelism. |
941 |
> |
* Retries upon contention. |
942 |
> |
*/ |
943 |
> |
private void addWorkerIfBelowTarget() { |
944 |
|
int pc = parallelism; |
945 |
< |
int wc = workerCounts; |
946 |
< |
if ((wc & RUNNING_COUNT_MASK) >= pc && |
947 |
< |
(((wc >>> TOTAL_COUNT_SHIFT) - pc) > (pc >>> 2) + 1 ||// approx 25% |
948 |
< |
now - lastTrim >= UNUSED_SPARE_TRIM_RATE_NANOS) && |
949 |
< |
(id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 && |
950 |
< |
id < (ws = workers).length && (w = ws[id]) != null && |
951 |
< |
UNSAFE.compareAndSwapInt(this, spareWaitersOffset, |
952 |
< |
sw, w.nextSpare)) |
953 |
< |
w.shutdown(false); |
945 |
> |
int wc; |
946 |
> |
while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < pc && |
947 |
> |
runState < TERMINATING) { |
948 |
> |
if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
949 |
> |
wc + (ONE_RUNNING|ONE_TOTAL))) { |
950 |
> |
if (addWorker() == null) |
951 |
> |
break; |
952 |
> |
} |
953 |
> |
} |
954 |
> |
} |
955 |
> |
|
956 |
> |
/** |
957 |
> |
* Tries (once) to add a new worker if all existing workers are |
958 |
> |
* busy, and there are either no running workers or the deficit is |
959 |
> |
* at least twice the surplus. |
960 |
> |
* |
961 |
> |
* @param wc workerCounts value on invocation of this method |
962 |
> |
*/ |
963 |
> |
private void tryAddWorkerIfBusy(int wc) { |
964 |
> |
int tc, rc, rs; |
965 |
> |
int pc = parallelism; |
966 |
> |
if ((tc = wc >>> TOTAL_COUNT_SHIFT) < MAX_WORKERS && |
967 |
> |
((rc = wc & RUNNING_COUNT_MASK) == 0 || |
968 |
> |
rc < pc - ((tc - pc) << 1)) && |
969 |
> |
(rs = runState) < TERMINATING && |
970 |
> |
(rs & ACTIVE_COUNT_MASK) == tc) { |
971 |
> |
// Since all workers busy, heuristically back off to let settle |
972 |
> |
Thread.yield(); |
973 |
> |
if (eventWaiters == 0L && spareWaiters == 0 && // recheck |
974 |
> |
runState == rs && workerCounts == wc && |
975 |
> |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
976 |
> |
wc + (ONE_RUNNING|ONE_TOTAL))) |
977 |
> |
addWorker(); |
978 |
> |
} |
979 |
|
} |
980 |
|
|
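The guard rc < pc - ((tc - pc) << 1) in tryAddWorkerIfBusy above is the "deficit at least twice the surplus" rule from its javadoc. A tiny illustrative check (hypothetical helper, not pool code):

    class AddWorkerRule {
        // rc = running, tc = total, pc = target parallelism.
        // Add if nothing runs, or the deficit (pc - rc) exceeds
        // twice the surplus (tc - pc).
        static boolean shouldAdd(int rc, int tc, int pc) {
            return rc == 0 || rc < pc - ((tc - pc) << 1);
        }
    }

For example, with pc = 8 and tc = 10 the surplus is 2, so a worker is added only while fewer than 8 - 4 = 4 are running: shouldAdd(3, 10, 8) is true, shouldAdd(4, 10, 8) is false.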
981 |
|
/** |
982 |
|
* Does at most one of: |
983 |
|
* |
984 |
|
* 1. Help wake up existing workers waiting for work via |
985 |
< |
* releaseEventWaiters. (If any exist, then it probably doesn't |
985 |
> |
* releaseEventWaiters. (If any exist, then it doesn't |
986 |
|
* matter right now if under target parallelism level.) |
987 |
|
* |
988 |
< |
* 2. If below parallelism level and a spare exists, try (once) |
883 |
< |
* to resume it via tryResumeSpare. |
988 |
> |
* 2. If a spare exists, try (once) to resume it via tryResumeSpare. |
989 |
|
* |
990 |
< |
* 3. If neither of the above, tries (once) to add a new |
991 |
< |
* worker if either there are not enough total, or if all |
992 |
< |
* existing workers are busy, there are either no running |
993 |
< |
* workers or the deficit is at least twice the surplus. |
990 |
> |
* 3. If there are not enough total workers, add some |
991 |
> |
* via addWorkerIfBelowTarget. |
992 |
> |
* |
993 |
> |
* 4. Try (once) to add a new worker if all existing workers |
994 |
> |
* are busy, via tryAddWorkerIfBusy. |
995 |
|
*/ |
996 |
|
private void helpMaintainParallelism() { |
997 |
< |
// uglified to work better when not compiled |
998 |
< |
int pc, wc, rc, tc, rs; long h; |
893 |
< |
if ((h = eventWaiters) != 0L) { |
997 |
> |
long h; int pc, wc; |
998 |
> |
if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0) { |
999 |
|
if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount) |
1000 |
< |
releaseEventWaiters(false); // avoid useless call |
1000 |
> |
releaseEventWaiters(); // avoid useless call |
1001 |
|
} |
1002 |
|
else if ((pc = parallelism) > |
1003 |
< |
(rc = ((wc = workerCounts) & RUNNING_COUNT_MASK))) { |
1003 |
> |
((wc = workerCounts) & RUNNING_COUNT_MASK)) { |
1004 |
|
if (spareWaiters != 0) |
1005 |
< |
tryResumeSpare(); |
1006 |
< |
else if ((rs = runState) < TERMINATING && |
1007 |
< |
((tc = wc >>> TOTAL_COUNT_SHIFT) < pc || |
1008 |
< |
(tc == (rs & ACTIVE_COUNT_MASK) && // all busy |
1009 |
< |
(rc == 0 || // must add |
905 |
< |
rc < pc - ((tc - pc) << 1)) && // within slack |
906 |
< |
tc < MAX_WORKERS && runState == rs)) && // recheck busy |
907 |
< |
workerCounts == wc && |
908 |
< |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, |
909 |
< |
wc + (ONE_RUNNING|ONE_TOTAL))) |
910 |
< |
addWorker(); |
1005 |
> |
tryResumeSpare(wc); |
1006 |
> |
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) |
1007 |
> |
addWorkerIfBelowTarget(); |
1008 |
> |
else |
1009 |
> |
tryAddWorkerIfBusy(wc); |
1010 |
|
} |
1011 |
|
} |
1012 |
|
|
1013 |
|
/** |
1014 |
|
* Callback from workers invoked upon each top-level action (i.e., |
1015 |
< |
* stealing a task or taking a submission and running |
1016 |
< |
* it). Performs one or more of the following: |
1015 |
> |
* stealing a task or taking a submission and running it). |
1016 |
> |
* Performs one or more of the following: |
1017 |
|
* |
1018 |
< |
* 1. If the worker cannot find work (misses > 0), updates its |
1019 |
< |
* active status to inactive and updates activeCount unless |
1020 |
< |
* this is the first miss and there is contention, in which |
922 |
< |
* case it may try again (either in this or a subsequent |
923 |
< |
* call). |
1018 |
> |
* 1. If the worker is active, try to set its active status to |
1019 |
> |
* inactive and update activeCount. On contention, we may try |
1020 |
> |
* again on this or a subsequent call. |
1021 |
|
* |
1022 |
< |
* 2. If there are at least 2 misses, awaits the next task event |
926 |
< |
* via eventSync |
1022 |
> |
* 2. Release any existing event waiters that are now releasable. |
1023 |
|
* |
1024 |
< |
* 3. If there are too many running threads, suspends this worker |
1025 |
< |
* (first forcing inactivation if necessary). If it is not |
1024 |
> |
* 3. If there are too many running threads, suspend this worker |
1025 |
> |
* (first forcing inactivation if necessary). If it is not |
1026 |
|
* needed, it may be killed while suspended via |
1027 |
< |
* tryTrimSpare. Otherwise, upon resume it rechecks to make |
1027 |
> |
* tryShutdownSpare. Otherwise, upon resume it rechecks to make |
1028 |
|
* sure that it is still needed. |
1029 |
|
* |
1030 |
< |
* 4. Helps release and/or reactivate other workers via |
1031 |
< |
* helpMaintainParallelism |
1030 |
> |
* 4. If more than 1 miss, await the next task event via |
1031 |
> |
* eventSync (first forcing inactivation if necessary), upon |
1032 |
> |
* which the worker may also be killed, via tryShutdownWaiter. |
1033 |
> |
* |
1034 |
> |
* 5. Help reactivate other workers via helpMaintainParallelism. |
1035 |
|
* |
1036 |
|
* @param w the worker |
1037 |
|
* @param misses the number of scans by caller failing to find work |
1038 |
< |
* (saturating at 2 just to avoid wraparound) |
1038 |
> |
* (saturating at 2 to avoid wraparound) |
1039 |
|
*/ |
1040 |
|
final void preStep(ForkJoinWorkerThread w, int misses) { |
1041 |
|
boolean active = w.active; |
1042 |
|
int pc = parallelism; |
1043 |
|
for (;;) { |
1044 |
< |
int wc = workerCounts; |
1045 |
< |
int rc = wc & RUNNING_COUNT_MASK; |
1046 |
< |
if (active && (misses > 0 || rc > pc)) { |
1047 |
< |
int rs; // try inactivate |
1048 |
< |
if (UNSAFE.compareAndSwapInt(this, runStateOffset, |
1049 |
< |
rs = runState, rs - ONE_ACTIVE)) |
1050 |
< |
active = w.active = false; |
1051 |
< |
else if (misses > 1 || rc > pc || |
1052 |
< |
(rs & ACTIVE_COUNT_MASK) >= pc) |
954 |
< |
continue; // force inactivate |
1044 |
> |
int rs, wc, rc, ec; long h; |
1045 |
> |
if (active && UNSAFE.compareAndSwapInt(this, runStateOffset, |
1046 |
> |
rs = runState, rs - 1)) |
1047 |
> |
active = w.active = false; |
1048 |
> |
if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0 && |
1049 |
> |
(int)(h >>> EVENT_COUNT_SHIFT) != eventCount) { |
1050 |
> |
releaseEventWaiters(); |
1051 |
> |
if (misses > 1) |
1052 |
> |
continue; // clear before sync below |
1053 |
|
} |
1054 |
< |
if (misses > 1) { |
1055 |
< |
misses = 0; // don't re-sync |
1056 |
< |
eventSync(w); // continue loop to recheck rc |
959 |
< |
} |
960 |
< |
else if (rc > pc) { |
961 |
< |
if (workerCounts == wc && // try to suspend as spare |
1054 |
> |
if ((rc = ((wc = workerCounts) & RUNNING_COUNT_MASK)) > pc) { |
1055 |
> |
if (!active && // must inactivate to suspend |
1056 |
> |
workerCounts == wc && // try to suspend as spare |
1057 |
|
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1058 |
< |
wc, wc - ONE_RUNNING) && |
1059 |
< |
!w.suspendAsSpare()) // false if killed |
1060 |
< |
break; |
1058 |
> |
wc, wc - ONE_RUNNING)) { |
1059 |
> |
w.suspendAsSpare(); |
1060 |
> |
if (!w.isRunning()) |
1061 |
> |
break; // was killed while spare |
1062 |
> |
} |
1063 |
> |
continue; |
1064 |
|
} |
1065 |
< |
else { |
1066 |
< |
if (rc < pc || eventWaiters != 0L) |
1067 |
< |
helpMaintainParallelism(); |
1068 |
< |
break; |
1065 |
> |
if (misses > 0) { |
1066 |
> |
if ((ec = eventCount) == w.lastEventCount && misses > 1) { |
1067 |
> |
if (!active) { // must inactivate to sync |
1068 |
> |
eventSync(w); |
1069 |
> |
if (w.isRunning()) |
1070 |
> |
misses = 1; // don't re-sync |
1071 |
> |
else |
1072 |
> |
break; // was killed while waiting |
1073 |
> |
} |
1074 |
> |
continue; |
1075 |
> |
} |
1076 |
> |
w.lastEventCount = ec; |
1077 |
|
} |
1078 |
+ |
if (rc < pc) |
1079 |
+ |
helpMaintainParallelism(); |
1080 |
+ |
break; |
1081 |
|
} |
1082 |
|
} |
1083 |
|
|
1182 |
|
* Actions on transition to TERMINATING |
1183 |
|
* |
1184 |
|
* Runs up to four passes through workers: (0) shutting down each |
1185 |
< |
* quietly (without waking up if parked) to quickly spread |
1186 |
< |
* notifications without unnecessary bouncing around event queues |
1187 |
< |
* etc (1) wake up and help cancel tasks (2) interrupt (3) mop up |
1188 |
< |
* races with interrupted workers |
1185 |
> |
* (without waking up if parked) to quickly spread notifications |
1186 |
> |
* without unnecessary bouncing around event queues etc (1) wake |
1187 |
> |
* up and help cancel tasks (2) interrupt (3) mop up races with |
1188 |
> |
* interrupted workers |
1189 |
|
*/ |
1190 |
|
private void startTerminating() { |
1191 |
|
cancelSubmissions(); |
1198 |
|
for (int i = 0; i < n; ++i) { |
1199 |
|
ForkJoinWorkerThread w = ws[i]; |
1200 |
|
if (w != null) { |
1201 |
< |
w.shutdown(true); |
1201 |
> |
w.shutdown(); |
1202 |
|
if (passes > 0 && !w.isTerminated()) { |
1203 |
|
w.cancelTasks(); |
1204 |
|
LockSupport.unpark(w); |
1346 |
|
this.workerLock = new ReentrantLock(); |
1347 |
|
this.termination = new Phaser(1); |
1348 |
|
this.poolNumber = poolNumberGenerator.incrementAndGet(); |
1240 |
– |
this.trimTime = System.nanoTime(); |
1349 |
|
} |
1350 |
|
|
1351 |
|
/** |
1374 |
|
throw new RejectedExecutionException(); |
1375 |
|
submissionQueue.offer(task); |
1376 |
|
advanceEventCount(); |
1377 |
< |
helpMaintainParallelism(); // start or wake up workers |
1377 |
> |
if (eventWaiters != 0L) |
1378 |
> |
releaseEventWaiters(); |
1379 |
> |
if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism) |
1380 |
> |
addWorkerIfBelowTarget(); |
1381 |
|
} |
1382 |
|
|
1383 |
|
/** |
1396 |
|
|
1397 |
|
/** |
1398 |
|
* Arranges for (asynchronous) execution of the given task. |
1288 |
– |
* If the caller is already engaged in a fork/join computation in |
1289 |
– |
* the current pool, this method is equivalent in effect to |
1290 |
– |
* {@link ForkJoinTask#fork}. |
1399 |
|
* |
1400 |
|
* @param task the task |
1401 |
|
* @throws NullPointerException if the task is null |
1424 |
|
|
1425 |
|
/** |
1426 |
|
* Submits a ForkJoinTask for execution. |
1319 |
– |
* If the caller is already engaged in a fork/join computation in |
1320 |
– |
* the current pool, this method is equivalent in effect to |
1321 |
– |
* {@link ForkJoinTask#fork}. |
1427 |
|
* |
1428 |
|
* @param task the task to submit |
1429 |
|
* @return the task |