754 |
|
} |
755 |
|
|
756 |
|
/** |
757 |
< |
* Tries to enqueue worker in wait queue and await change in |
758 |
< |
* worker's eventCount. Before blocking, rescans queues to avoid |
759 |
< |
* missed signals. If the pool is quiescent, possibly terminates |
760 |
< |
* worker upon exit. |
757 |
> |
* Tries to enqueue worker w in wait queue and await change in |
758 |
> |
* worker's eventCount. If the pool is quiescent, possibly |
759 |
> |
* terminates worker upon exit. Otherwise, before blocking, |
760 |
> |
* rescans queues to avoid missed signals. Upon finding work, |
761 |
> |
* releases at least one worker (which may be the current |
762 |
> |
* worker). Rescans restart upon detected staleness or failure to |
763 |
> |
* release due to contention. |
764 |
|
* |
765 |
|
* @param w the calling worker |
766 |
|
* @param c the ctl value on entry |
768 |
|
*/ |
769 |
|
private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) { |
770 |
|
int v = w.eventCount; |
771 |
< |
w.nextWait = (int)c; // w's successor record |
771 |
> |
w.nextWait = (int)c; // w's successor record |
772 |
|
long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); |
773 |
|
if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { |
774 |
< |
long d = ctl; // return true if lost to a deq, to force rescan |
774 |
> |
long d = ctl; // return true if lost to a deq, to force scan |
775 |
|
return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L; |
776 |
|
} |
777 |
< |
if (parallelism + (int)(c >> AC_SHIFT) == 1 && |
777 |
> |
for (int sc = w.stealCount; sc != 0;) { // accumulate stealCount |
778 |
> |
long s = stealCount; |
779 |
> |
if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc)) |
780 |
> |
sc = w.stealCount = 0; |
781 |
> |
else if (w.eventCount != v) |
782 |
> |
return true; // update next time |
783 |
> |
} |
784 |
> |
if (parallelism + (int)(nc >> AC_SHIFT) == 0 && |
785 |
|
blockedCount == 0 && quiescerCount == 0) |
786 |
< |
idleAwaitWork(w, v); // quiescent -- maybe shrink |
787 |
< |
|
778 |
< |
boolean rescanned = false; |
779 |
< |
for (int sc;;) { |
786 |
> |
idleAwaitWork(w, nc, c, v); // quiescent |
787 |
> |
for (boolean rescanned = false;;) { |
788 |
|
if (w.eventCount != v) |
789 |
|
return true; |
790 |
< |
if ((sc = w.stealCount) != 0) { |
783 |
< |
long s = stealCount; // accumulate stealCount |
784 |
< |
if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc)) |
785 |
< |
w.stealCount = 0; |
786 |
< |
} |
787 |
< |
else if (!rescanned) { |
790 |
> |
if (!rescanned) { |
791 |
|
int g = scanGuard, m = g & SMASK; |
792 |
|
ForkJoinWorkerThread[] ws = workers; |
793 |
|
if (ws != null && m < ws.length) { |
824 |
|
} |
825 |
|
|
826 |
|
/** |
827 |
< |
* If pool is quiescent, checks for termination, and waits for |
828 |
< |
* event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl |
829 |
< |
* has not changed, terminates the worker. Upon its termination |
830 |
< |
* (see deregisterWorker), it may wake up another worker to |
831 |
< |
* possibly repeat this process. |
827 |
> |
* If inactivating worker w has caused pool to become |
828 |
> |
* quiescent, check for pool termination, and wait for event |
829 |
> |
* for up to SHRINK_RATE nanosecs (rescans are unnecessary in |
830 |
> |
* this case because quiescence reflects consensus about lack |
831 |
> |
* of work). On timeout, if ctl has not changed, terminate the |
832 |
> |
* worker. Upon its termination (see deregisterWorker), it may |
833 |
> |
* wake up another worker to possibly repeat this process. |
834 |
|
* |
835 |
|
* @param w the calling worker |
836 |
< |
* @param v the eventCount w must wait until changed |
837 |
< |
*/ |
838 |
< |
private void idleAwaitWork(ForkJoinWorkerThread w, int v) { |
839 |
< |
ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs |
840 |
< |
if (shutdown) |
841 |
< |
tryTerminate(false); |
842 |
< |
long c = ctl; |
843 |
< |
long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) | |
844 |
< |
(long)(w.nextWait & E_MASK)); // ctl value to release w |
845 |
< |
if (w.eventCount == v && |
846 |
< |
parallelism + (int)(c >> AC_SHIFT) == 0 && |
847 |
< |
blockedCount == 0 && quiescerCount == 0) { |
843 |
< |
long startTime = System.nanoTime(); |
844 |
< |
Thread.interrupted(); |
845 |
< |
if (w.eventCount == v) { |
836 |
> |
* @param currentCtl the ctl value after enqueuing w |
837 |
> |
* @param prevCtl the ctl value if w terminated |
838 |
> |
* @param v the eventCount w awaits change |
839 |
> |
*/ |
840 |
> |
private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl, |
841 |
> |
long prevCtl, int v) { |
842 |
> |
if (w.eventCount == v) { |
843 |
> |
if (shutdown) |
844 |
> |
tryTerminate(false); |
845 |
> |
ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs |
846 |
> |
while (ctl == currentCtl) { |
847 |
> |
long startTime = System.nanoTime(); |
848 |
|
w.parked = true; |
849 |
< |
if (w.eventCount == v) |
849 |
> |
if (w.eventCount == v) // must recheck |
850 |
|
LockSupport.parkNanos(this, SHRINK_RATE); |
851 |
|
w.parked = false; |
852 |
< |
if (w.eventCount == v && ctl == c && |
853 |
< |
System.nanoTime() - startTime >= SHRINK_RATE && |
854 |
< |
UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { |
855 |
< |
w.terminate = true; |
856 |
< |
w.eventCount = ((int)c + EC_UNIT) & E_MASK; |
852 |
> |
if (w.eventCount != v) |
853 |
> |
break; |
854 |
> |
else if (System.nanoTime() - startTime < SHRINK_RATE) |
855 |
> |
Thread.interrupted(); // spurious wakeup |
856 |
> |
else if (UNSAFE.compareAndSwapLong(this, ctlOffset, |
857 |
> |
currentCtl, prevCtl)) { |
858 |
> |
w.terminate = true; // restore previous |
859 |
> |
w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK; |
860 |
> |
break; |
861 |
|
} |
862 |
|
} |
863 |
|
} |
1972 |
|
* {@code isReleasable} must return {@code true} if blocking is |
1973 |
|
* not necessary. Method {@code block} blocks the current thread |
1974 |
|
* if necessary (perhaps internally invoking {@code isReleasable} |
1975 |
< |
* before actually blocking). The unusual methods in this API |
1976 |
< |
* accommodate synchronizers that may, but don't usually, block |
1977 |
< |
* for long periods. Similarly, they allow more efficient internal |
1978 |
< |
* handling of cases in which additional workers may be, but |
1979 |
< |
* usually are not, needed to ensure sufficient parallelism. |
1980 |
< |
* Toward this end, implementations of method {@code isReleasable} |
1981 |
< |
* must be amenable to repeated invocation. |
1975 |
> |
* before actually blocking). These actions are performed by any |
1976 |
> |
* thread invoking {@link ForkJoinPool#managedBlock}. The |
1977 |
> |
* unusual methods in this API accommodate synchronizers that may, |
1978 |
> |
* but don't usually, block for long periods. Similarly, they |
1979 |
> |
* allow more efficient internal handling of cases in which |
1980 |
> |
* additional workers may be, but usually are not, needed to |
1981 |
> |
* ensure sufficient parallelism. Toward this end, |
1982 |
> |
* implementations of method {@code isReleasable} must be amenable |
1983 |
> |
* to repeated invocation. |
1984 |
|
* |
1985 |
|
* <p>For example, here is a ManagedBlocker based on a |
1986 |
|
* ReentrantLock: |