419 |
|
* explicitly take into account core affinities, loads, cache |
420 |
|
* localities, etc, However, they do exploit temporal locality |
421 |
|
* (which usually approximates these) by probabilistically |
422 |
< |
* preferring to re-poll from the same queue after a successful |
423 |
< |
* poll on average #workers times before trying others. |
424 |
< |
* Restricted forms of scanning occur in methods helpComplete and |
422 |
> |
* (bounded by POLL_LIMIT) preferring to re-poll from the same |
423 |
> |
* queue after a successful poll before trying others. Restricted |
424 |
> |
* forms of scanning occur in methods helpComplete and |
425 |
|
* findNonEmptyStealQueue, and take similar but simpler forms. |
426 |
|
* |
427 |
|
* Deactivation and waiting. Queuing encounters several intrinsic |
1275 |
|
private static final int DEFAULT_COMMON_MAX_SPARES = 256; |
1276 |
|
|
1277 |
|
/** |
1278 |
< |
* The maximum number of repolls or rescans per scan. |
1279 |
< |
* Must be a power of two minus 1. |
1278 |
> |
* The maximum number of extra repolls per scan, bounding |
1279 |
> |
* unfairness. Must be a power of two minus 1. |
1280 |
|
*/ |
1281 |
|
private static final int POLL_LIMIT = 255; |
1282 |
|
|
1794 |
|
* pseudorandom permutation. Upon finding a non-empty queue, makes |
1795 |
|
* on average #workers attempts to re-poll (fewer if contended) on |
1796 |
|
* the same queue before returning (impossible scanState value) 0 |
1797 |
< |
* to restart scan. Else returns after at least one full scan and |
1798 |
< |
* at most the given step limit. |
1797 |
> |
* to restart scan. Else returns after at least 1 and at most 32 |
1798 |
> |
* full scans. |
1799 |
|
* |
1800 |
|
* @param w the worker (via its WorkQueue) |
1801 |
< |
* @param limit scan/repoll limit as bitmask (0 if spare) |
1801 |
> |
* @param limit repoll limit as bitmask (0 if spare) |
1802 |
|
* @param step (circular) index increment per iteration (must be odd) |
1803 |
|
* @param r a random seed for origin index |
1804 |
|
* @return negative if should await signal |
1805 |
|
*/ |
1806 |
|
private int scan(WorkQueue w, int limit, int step, int r) { |
1807 |
|
int stat = 0, wl; WorkQueue[] ws; |
1808 |
< |
if ((ws = workQueues) != null && (wl = ws.length) > 0) { |
1808 |
> |
if ((ws = workQueues) != null && w != null && (wl = ws.length) > 0) { |
1809 |
|
for (int m = wl - 1, |
1810 |
< |
idx = m & r, // origin index |
1811 |
< |
nsteps = m | limit, // at least one full scan |
1812 |
< |
maxPolls = m & limit, // mean <= m/2 unless spare |
1810 |
> |
origin = m & r, idx = origin, |
1811 |
|
npolls = 0, |
1812 |
|
ss = w.scanState;;) { // negative if inactive |
1813 |
|
WorkQueue q; ForkJoinTask<?>[] a; int b, d, al; |
1833 |
|
else |
1834 |
|
break; // contention |
1835 |
|
} |
1836 |
< |
if (npolls > maxPolls) |
1836 |
> |
if (npolls > limit) |
1837 |
|
break; |
1838 |
|
} |
1839 |
|
else if (npolls != 0) // rescan |
1840 |
|
break; |
1841 |
< |
else if (--nsteps < 0) { // inactivate or wait |
1842 |
< |
if ((stat = ss) >= 0) |
1841 |
> |
else if ((idx = (idx + step) & m) == origin) { |
1842 |
> |
if (ss < 0) { // await signal |
1843 |
> |
stat = ss; |
1844 |
> |
break; |
1845 |
> |
} |
1846 |
> |
else if (r >= 0) { |
1847 |
|
inactivate(w, ss); |
1848 |
< |
break; |
1848 |
> |
break; |
1849 |
> |
} |
1850 |
> |
else |
1851 |
> |
r <<= 1; // at most 31 rescans |
1852 |
|
} |
1848 |
– |
else // circularly traverse |
1849 |
– |
idx = (idx + step) & m; |
1853 |
|
} |
1854 |
|
} |
1855 |
|
return stat; |