309 |
|
* has not yet entered the wait queue. We solve this by requiring |
310 |
|
* a full sweep of all workers (via repeated calls to method |
311 |
|
* scan()) both before and after a newly waiting worker is added |
312 |
< |
* to the wait queue. During a rescan, the worker might release |
313 |
< |
* some other queued worker rather than itself, which has the same |
314 |
< |
* net effect. Because enqueued workers may actually be rescanning |
315 |
< |
* rather than waiting, we set and clear the "parker" field of |
316 |
< |
* WorkQueues to reduce unnecessary calls to unpark. (This |
317 |
< |
* requires a secondary recheck to avoid missed signals.) Note |
318 |
< |
* the unusual conventions about Thread.interrupts surrounding |
319 |
< |
* parking and other blocking: Because interrupts are used solely |
320 |
< |
* to alert threads to check termination, which is checked anyway |
321 |
< |
* upon blocking, we clear status (using Thread.interrupted) |
322 |
< |
* before any call to park, so that park does not immediately |
323 |
< |
* return due to status being set via some other unrelated call to |
324 |
< |
* interrupt in user code. |
312 |
> |
* to the wait queue. Because enqueued workers may actually be |
313 |
> |
* rescanning rather than waiting, we set and clear the "parker" |
314 |
> |
* field of WorkQueues to reduce unnecessary calls to unpark. |
315 |
> |
* (This requires a secondary recheck to avoid missed signals.) |
316 |
> |
* Note the unusual conventions about Thread.interrupts |
317 |
> |
* surrounding parking and other blocking: Because interrupts are |
318 |
> |
* used solely to alert threads to check termination, which is |
319 |
> |
* checked anyway upon blocking, we clear status (using |
320 |
> |
* Thread.interrupted) before any call to park, so that park does |
321 |
> |
* not immediately return due to status being set via some other |
322 |
> |
* unrelated call to interrupt in user code. |
323 |
|
* |
324 |
|
* Signalling. We create or wake up workers only when there |
325 |
|
* appears to be at least one task they might be able to find and |
326 |
< |
* execute. |
326 |
> |
* execute. When a submission is added or another worker adds a |
327 |
> |
* task to a queue that has fewer than two tasks, they signal |
328 |
> |
* waiting workers (or trigger creation of new ones if fewer than |
329 |
> |
* the given parallelism level -- signalWork). These primary |
330 |
> |
* signals are buttressed by others whenever other threads remove |
331 |
> |
* a task from a queue a notice that there are other tasks there |
332 |
> |
* as well. So in general, pools will be over-signalled. On most |
333 |
> |
* platforms, signalling (unpark) overhead time is noticeably |
334 |
> |
* long, and the time between signalling a thread and it actually |
335 |
> |
* making progress can be very noticeably long, so it is worth |
336 |
> |
* offloading these delays from critical paths as much as |
337 |
> |
* possible. |
338 |
|
* |
339 |
|
* Trimming workers. To release resources after periods of lack of |
340 |
|
* use, a worker starting to wait when the pool is quiescent will |
689 |
|
*/ |
690 |
|
final void push(ForkJoinTask<?> task) { |
691 |
|
ForkJoinTask<?>[] a; ForkJoinPool p; |
692 |
< |
int s = top, m; |
692 |
> |
int s = top, m, n; |
693 |
|
if ((a = array) != null) { // ignore if queue removed |
694 |
< |
int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE; |
694 |
> |
long j = (((m = a.length - 1) & s) << ASHIFT) + ABASE; |
695 |
|
U.putOrderedObject(a, j, task); |
696 |
< |
if ((top = s + 1) - base >= m) |
696 |
> |
if ((n = (top = s + 1) - base) <= 2) { |
697 |
> |
if ((p = pool) != null) |
698 |
> |
p.signalWork(this); |
699 |
> |
} |
700 |
> |
else if (n >= m) |
701 |
|
growArray(); |
689 |
– |
else if ((p = pool) != null) |
690 |
– |
p.signalWork(this); |
702 |
|
} |
703 |
|
} |
704 |
|
|
756 |
|
* appear in ForkJoinPool methods scan and tryHelpStealer. |
757 |
|
*/ |
758 |
|
final ForkJoinTask<?> pollAt(int b) { |
759 |
< |
ForkJoinTask<?> t; ForkJoinTask<?>[] a; |
759 |
> |
ForkJoinTask<?> t; ForkJoinTask<?>[] a; ForkJoinPool p; |
760 |
|
if ((a = array) != null) { |
761 |
|
int j = (((a.length - 1) & b) << ASHIFT) + ABASE; |
762 |
|
if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null && |
763 |
|
base == b && |
764 |
|
U.compareAndSwapObject(a, j, t, null)) { |
765 |
|
U.putOrderedInt(this, QBASE, b + 1); |
766 |
+ |
if (top - b > 1 && (p = pool) != null) |
767 |
+ |
p.signalWork(this); |
768 |
|
return t; |
769 |
|
} |
770 |
|
} |
775 |
|
* Takes next task, if one exists, in FIFO order. |
776 |
|
*/ |
777 |
|
final ForkJoinTask<?> poll() { |
778 |
< |
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; |
778 |
> |
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; ForkJoinPool p; |
779 |
|
while ((b = base) - top < 0 && (a = array) != null) { |
780 |
|
int j = (((a.length - 1) & b) << ASHIFT) + ABASE; |
781 |
|
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); |
783 |
|
if (base == b && |
784 |
|
U.compareAndSwapObject(a, j, t, null)) { |
785 |
|
U.putOrderedInt(this, QBASE, b + 1); |
786 |
+ |
if (top - b > 1 && (p = pool) != null) |
787 |
+ |
p.signalWork(this); |
788 |
|
return t; |
789 |
|
} |
790 |
|
} |
935 |
|
* Polls for and executes the given task or any other task in |
936 |
|
* its CountedCompleter computation. |
937 |
|
*/ |
938 |
< |
final boolean pollAndExecCC(ForkJoinTask<?> root) { |
938 |
> |
final boolean pollAndExecCC(ForkJoinTask<?> root, ForkJoinPool spool) { |
939 |
|
ForkJoinTask<?>[] a; int b; Object o; |
940 |
|
outer: while (root.status >= 0 && (b = base) - top < 0 && |
941 |
|
(a = array) != null) { |
948 |
|
if (base == b && |
949 |
|
U.compareAndSwapObject(a, j, t, null)) { |
950 |
|
U.putOrderedInt(this, QBASE, b + 1); |
951 |
+ |
if (spool != null && top - b > 1) |
952 |
+ |
spool.signalWork(this); |
953 |
|
t.doExec(); |
954 |
|
return true; |
955 |
|
} |
1423 |
|
(ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && |
1424 |
|
(q = ws[m & z & SQMASK]) != null && |
1425 |
|
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock |
1426 |
< |
int s = q.top, am; |
1427 |
< |
if ((a = q.array) != null && (am = a.length - 1) > s - q.base) { |
1426 |
> |
int s, am, n; |
1427 |
> |
if ((a = q.array) != null && |
1428 |
> |
(am = a.length - 1) > (n = (s = q.top) - q.base)) { |
1429 |
|
int j = ((am & s) << ASHIFT) + ABASE; |
1430 |
|
U.putOrderedObject(a, j, task); |
1431 |
|
q.top = s + 1; // push on to deque |
1432 |
|
q.qlock = 0; |
1433 |
< |
signalWork(q); |
1433 |
> |
if (n <= 1) |
1434 |
> |
signalWork(q); |
1435 |
|
return; |
1436 |
|
} |
1437 |
|
q.qlock = 0; |
1492 |
|
try { // locked version of push |
1493 |
|
if ((a != null && a.length > s + 1 - q.base) || |
1494 |
|
(a = q.growArray()) != null) { // must presize |
1495 |
< |
int j = (((a.length - 1) & s) << ASHIFT) + ABASE; |
1495 |
> |
long j = (((a.length - 1) & s) << ASHIFT) + ABASE; |
1496 |
|
U.putOrderedObject(a, j, task); |
1497 |
|
q.top = s + 1; |
1498 |
|
submitted = true; |
1541 |
|
* @param q if non-null, the queue holding tasks to be processed |
1542 |
|
*/ |
1543 |
|
final void signalWork(WorkQueue q) { |
1544 |
< |
long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p; |
1545 |
< |
if ((u = (int)((c = ctl) >>> 32)) < 0) { |
1546 |
< |
if ((e = (int)c) > 0) { |
1547 |
< |
if ((ws = workQueues) != null && ws.length > (i = e & SMASK) && |
1548 |
< |
(w = ws[i]) != null) { |
1530 |
< |
long nc = (((long)(w.nextWait & E_MASK)) | |
1531 |
< |
((long)(u + UAC_UNIT) << 32)); |
1532 |
< |
if (w.eventCount == (e | INT_SIGN) && |
1533 |
< |
U.compareAndSwapLong(this, CTL, c, nc)) { |
1534 |
< |
w.eventCount = (e + E_SEQ) & E_MASK; |
1535 |
< |
if ((p = w.parker) != null) |
1536 |
< |
U.unpark(p); |
1537 |
< |
} |
1538 |
< |
else |
1539 |
< |
retrySignalWork(q); |
1540 |
< |
} |
1541 |
< |
} |
1542 |
< |
else if ((short)u < 0) |
1543 |
< |
tryAddWorker(); |
1544 |
< |
} |
1545 |
< |
} |
1546 |
< |
|
1547 |
< |
/** |
1548 |
< |
* Fallback version of signalWork, triggered if release fails |
1549 |
< |
* and the calling queue is non-empty; |
1550 |
< |
*/ |
1551 |
< |
final void retrySignalWork(WorkQueue q) { |
1552 |
< |
long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p; |
1553 |
< |
while ((q != null && !q.isEmpty()) && |
1554 |
< |
(u = (int)((c = ctl) >>> 32)) < 0) { |
1555 |
< |
if ((e = (int)c) > 0) { |
1556 |
< |
if ((ws = workQueues) != null && ws.length > (i = e & SMASK) && |
1557 |
< |
(w = ws[i]) != null) { |
1558 |
< |
long nc = (((long)(w.nextWait & E_MASK)) | |
1559 |
< |
((long)(u + UAC_UNIT) << 32)); |
1560 |
< |
if (w.eventCount == (e | INT_SIGN) && |
1561 |
< |
U.compareAndSwapLong(this, CTL, c, nc)) { |
1562 |
< |
w.eventCount = (e + E_SEQ) & E_MASK; |
1563 |
< |
if ((p = w.parker) != null) |
1564 |
< |
U.unpark(p); |
1565 |
< |
break; |
1566 |
< |
} |
1567 |
< |
} |
1568 |
< |
else |
1569 |
< |
break; |
1570 |
< |
} |
1571 |
< |
else { |
1544 |
> |
for (;;) { |
1545 |
> |
long c; int e, u, i; WorkQueue[] ws; WorkQueue w; Thread p; |
1546 |
> |
if ((u = (int)((c = ctl) >>> 32)) >= 0) |
1547 |
> |
break; |
1548 |
> |
if ((e = (int)c) <= 0) { |
1549 |
|
if ((short)u < 0) |
1550 |
|
tryAddWorker(); |
1551 |
|
break; |
1552 |
|
} |
1553 |
+ |
if ((ws = workQueues) == null || ws.length <= (i = e & SMASK) || |
1554 |
+ |
(w = ws[i]) == null) |
1555 |
+ |
break; |
1556 |
+ |
int wec = w.eventCount; |
1557 |
+ |
long nc = (((long)(w.nextWait & E_MASK)) | |
1558 |
+ |
((long)(u + UAC_UNIT) << 32)); |
1559 |
+ |
if (wec == (e | INT_SIGN) && |
1560 |
+ |
U.compareAndSwapLong(this, CTL, c, nc)) { |
1561 |
+ |
w.eventCount = (e + E_SEQ) & E_MASK; |
1562 |
+ |
if ((p = w.parker) != null) |
1563 |
+ |
U.unpark(p); |
1564 |
+ |
break; |
1565 |
+ |
} |
1566 |
+ |
if (q == null || q.base - q.top >= 0) // quit if empty |
1567 |
+ |
break; |
1568 |
|
} |
1569 |
|
} |
1570 |
|
|
1592 |
|
* The scan terminates upon either finding a non-empty queue, or |
1593 |
|
* completing the sweep. If the worker is not inactivated, it |
1594 |
|
* takes and returns a task from this queue. Otherwise, if not |
1595 |
< |
* activated, it signals workers (that may include itself) and |
1596 |
< |
* returns so caller can retry. Also returns for true if the |
1597 |
< |
* worker array may have changed during an empty scan. On failure |
1598 |
< |
* to find a task, we take one of the following actions, after |
1599 |
< |
* which the caller will retry calling this method unless |
1600 |
< |
* terminated. |
1609 |
< |
* |
1610 |
< |
* * If pool is terminating, terminate the worker. |
1611 |
< |
* |
1612 |
< |
* * If not already enqueued, try to inactivate and enqueue the |
1613 |
< |
* worker on wait queue. Or, if inactivating has caused the pool |
1614 |
< |
* to be quiescent, relay to idleAwaitWork to possibly shrink |
1615 |
< |
* pool. |
1616 |
< |
* |
1617 |
< |
* * If already enqueued and none of the above apply, possibly |
1618 |
< |
* park awaiting signal, else lingering to help scan and signal. |
1619 |
< |
* |
1620 |
< |
* * If a non-empty queue discovered or left as a hint, |
1621 |
< |
* help wake up other workers before return. |
1595 |
> |
* activated, it tries to activate itself or some other worker by |
1596 |
> |
* signalling. Also indicates retry if the worker array may have |
1597 |
> |
* changed during an empty scan. On failure to find a task, if |
1598 |
> |
* not already enqueued, tries to inactivate and enqueue the |
1599 |
> |
* worker on wait queue and then rescan. Otherwise relays to one |
1600 |
> |
* of the actions in onEmptyScan. |
1601 |
|
* |
1602 |
|
* @param w the worker (via its WorkQueue) |
1603 |
|
* @return a task or null if none found |
1604 |
|
*/ |
1605 |
|
private final ForkJoinTask<?> scan(WorkQueue w) { |
1606 |
|
WorkQueue[] ws; int m; |
1607 |
< |
int ps = plock; // read plock before ws |
1607 |
> |
int ps = plock; // read plock before ws |
1608 |
|
if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { |
1609 |
< |
int ec = w.eventCount; // ec is negative if inactive |
1609 |
> |
int ec = w.eventCount; // ec negative if inactive |
1610 |
|
int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; |
1611 |
< |
int j = (m << 1) | (ec < 0 ? MIN_RESCANS : 1); |
1612 |
< |
do { |
1613 |
< |
WorkQueue q; int b; |
1614 |
< |
if ((q = ws[(r - j) & m]) != null && |
1615 |
< |
(b = q.base) - q.top < 0) { // probably nonempty |
1616 |
< |
ForkJoinTask<?>[] a = q.array; |
1617 |
< |
if ((ec >= 0 || (ec = w.eventCount) >= 0) && a != null) { |
1618 |
< |
long i = (((a.length - 1) & b) << ASHIFT) + ABASE; |
1619 |
< |
ForkJoinTask<?> t = |
1620 |
< |
(ForkJoinTask<?>)U.getObjectVolatile(a, i); |
1621 |
< |
if (q.base == b && t != null && |
1622 |
< |
U.compareAndSwapObject(a, i, t, null)) { |
1623 |
< |
U.putOrderedInt(q, QBASE, b + 1); |
1624 |
< |
return t; // taken |
1646 |
< |
} |
1611 |
> |
for (int j = (m << 1) | (ec < 0 ? MIN_RESCANS : 1);;) { |
1612 |
> |
WorkQueue q; int b, s; ForkJoinTask<?>[] a; |
1613 |
> |
if ((q = ws[(r - j) & m]) != null && // probably nonempty |
1614 |
> |
(b = q.base) - (s = q.top) < 0 && (a = q.array) != null) { |
1615 |
> |
long i = (((a.length - 1) & b) << ASHIFT) + ABASE; |
1616 |
> |
ForkJoinTask<?> t = |
1617 |
> |
(ForkJoinTask<?>)U.getObjectVolatile(a, i); |
1618 |
> |
if ((ec >= 0 || (ec = w.eventCount) >= 0) && |
1619 |
> |
q.base == b && t != null && |
1620 |
> |
U.compareAndSwapObject(a, i, t, null)) { |
1621 |
> |
U.putOrderedInt(q, QBASE, b + 1); |
1622 |
> |
if (q.top - b > 1) |
1623 |
> |
signalWork(q); |
1624 |
> |
return t; |
1625 |
|
} |
1626 |
< |
if (j < m) // must restart to revisit |
1626 |
> |
if (--j < m) { // restart to revisit |
1627 |
> |
if (ec < 0) // help activate |
1628 |
> |
signalWork(q); |
1629 |
|
break; |
1630 |
+ |
} |
1631 |
+ |
} |
1632 |
+ |
else if (--j < 0) { |
1633 |
+ |
long c = ctl; |
1634 |
+ |
long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); |
1635 |
+ |
int e = (int)c; |
1636 |
+ |
if (plock == ps) { |
1637 |
+ |
if (e >= 0 && ec >= 0) { |
1638 |
+ |
w.nextWait = e; // try to enqueue/inactivate |
1639 |
+ |
w.eventCount = ec | INT_SIGN; |
1640 |
+ |
if (!U.compareAndSwapLong(this, CTL, c, nc)) |
1641 |
+ |
w.eventCount = ec; // unmark on CAS failure |
1642 |
+ |
} |
1643 |
+ |
else |
1644 |
+ |
onEmptyScan(w, c); |
1645 |
+ |
} |
1646 |
+ |
break; |
1647 |
|
} |
1651 |
– |
} while (--j >= 0); |
1652 |
– |
|
1653 |
– |
int e, ns; long c, sc; |
1654 |
– |
if (j >= 0 || plock != ps) { // incomplete scan |
1655 |
– |
if (w.eventCount < 0) // help activate for next time |
1656 |
– |
signalWork(null); |
1657 |
– |
} |
1658 |
– |
else if ((e = (int)(c = ctl)) < 0) |
1659 |
– |
w.qlock = -1; // pool is terminating |
1660 |
– |
else if (ec >= 0) { // try to enqueue/inactivate |
1661 |
– |
long nc = (((long)ec | |
1662 |
– |
((c - AC_UNIT) & (AC_MASK|TC_MASK)))); |
1663 |
– |
w.nextWait = e; // link and mark inactive |
1664 |
– |
w.eventCount = ec | INT_SIGN; |
1665 |
– |
if (!U.compareAndSwapLong(this, CTL, c, nc)) |
1666 |
– |
w.eventCount = ec; // unmark on CAS failure |
1667 |
– |
else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) |
1668 |
– |
idleAwaitWork(w, nc, c); |
1669 |
– |
} |
1670 |
– |
else if ((ns = w.nsteals) != 0) { |
1671 |
– |
if (U.compareAndSwapLong(this, STEALCOUNT, |
1672 |
– |
sc = stealCount, sc + ns)) |
1673 |
– |
w.nsteals = 0; // collect steals and rescan |
1674 |
– |
} |
1675 |
– |
else if (w.eventCount < 0 && ctl == c) { |
1676 |
– |
Thread wt = Thread.currentThread(); |
1677 |
– |
Thread.interrupted(); // clear status |
1678 |
– |
U.putObject(wt, PARKBLOCKER, this); |
1679 |
– |
w.parker = wt; // emulate LockSupport.park |
1680 |
– |
if (w.eventCount < 0 && ctl == c)// recheck |
1681 |
– |
U.park(false, 0L); // block |
1682 |
– |
w.parker = null; |
1683 |
– |
U.putObject(wt, PARKBLOCKER, null); |
1648 |
|
} |
1649 |
|
} |
1650 |
|
return null; |
1651 |
|
} |
1652 |
|
|
1653 |
|
/** |
1654 |
< |
* If inactivating worker w has caused the pool to become |
1654 |
> |
* A continuation of scan(), possibly blocking or terminating |
1655 |
> |
* worker w. ALso, if inactivating w has caused the pool to become |
1656 |
|
* quiescent, checks for pool termination, and, so long as this is |
1657 |
|
* not the only worker, waits for event for up to a given |
1658 |
|
* duration. On timeout, if ctl has not changed, terminates the |
1660 |
|
* repeat this process. |
1661 |
|
* |
1662 |
|
* @param w the calling worker |
1663 |
< |
* @param currentCtl the ctl value triggering possible quiescence |
1699 |
< |
* @param prevCtl the ctl value to restore if thread is terminated |
1663 |
> |
* @param c the ctl value triggering possible quiescence |
1664 |
|
*/ |
1665 |
< |
private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { |
1666 |
< |
if (w != null && w.eventCount < 0 && !tryTerminate(false, false) && |
1667 |
< |
(int)prevCtl != 0 && ctl == currentCtl) { |
1668 |
< |
int ns = w.nsteals; |
1669 |
< |
if (ns != 0) { |
1670 |
< |
w.nsteals = 0; |
1671 |
< |
long sc; |
1672 |
< |
do {} while (!U.compareAndSwapLong(this, STEALCOUNT, |
1673 |
< |
sc = stealCount, sc + ns)); |
1674 |
< |
} |
1675 |
< |
int dc = -(short)(currentCtl >>> TC_SHIFT); |
1676 |
< |
long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; |
1677 |
< |
long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; |
1678 |
< |
Thread wt = Thread.currentThread(); |
1679 |
< |
int spins = MIN_RESCANS; // poll before blocking |
1680 |
< |
while (ctl == currentCtl) { |
1681 |
< |
if (spins >= 0) { |
1682 |
< |
if (w.nextSeed() < 0) |
1683 |
< |
--spins; |
1684 |
< |
} |
1685 |
< |
else { |
1686 |
< |
Thread.interrupted(); // timed variant of version in scan() |
1665 |
> |
private final void onEmptyScan(WorkQueue w, long c) { |
1666 |
> |
int ec, ns, e, d; |
1667 |
> |
if (w != null && ctl == c) { |
1668 |
> |
if ((e = (int)c) < 0) |
1669 |
> |
w.qlock = -1; // pool is terminating |
1670 |
> |
else if ((ec = w.eventCount) < 0 && |
1671 |
> |
((d = (int)(c >> AC_SHIFT) + (config & SMASK)) != 0 || |
1672 |
> |
!tryTerminate(false, false))) { |
1673 |
> |
long pc = 0L, parkTime = 0L, deadline = 0L; |
1674 |
> |
if (d == 0 && ec == (e | INT_SIGN) && |
1675 |
> |
(pc = (((long)(w.nextWait & E_MASK)) | |
1676 |
> |
((long)(((int)(c >>> 32)) + UAC_UNIT) << 32))) != 0) { |
1677 |
> |
int dc = -(short)(c >>> TC_SHIFT); |
1678 |
> |
parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT: |
1679 |
> |
(dc + 1) * IDLE_TIMEOUT); |
1680 |
> |
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; |
1681 |
> |
} |
1682 |
> |
if ((ns = w.nsteals) != 0) { |
1683 |
> |
long sc = stealCount; |
1684 |
> |
if (U.compareAndSwapLong(this, STEALCOUNT, sc, sc + ns)) |
1685 |
> |
w.nsteals = 0; // collect and rescan |
1686 |
> |
} |
1687 |
> |
else if (w.eventCount < 0 && ctl == c) { |
1688 |
> |
Thread wt = Thread.currentThread(); |
1689 |
> |
Thread.interrupted(); // clear status |
1690 |
|
U.putObject(wt, PARKBLOCKER, this); |
1691 |
< |
w.parker = wt; |
1692 |
< |
if (ctl == currentCtl) |
1693 |
< |
U.park(false, parkTime); |
1691 |
> |
w.parker = wt; // emulate LockSupport.park |
1692 |
> |
if (w.eventCount < 0 && ctl == c) // recheck |
1693 |
> |
U.park(false, parkTime); // block |
1694 |
|
w.parker = null; |
1695 |
|
U.putObject(wt, PARKBLOCKER, null); |
1696 |
< |
if (ctl != currentCtl) |
1697 |
< |
break; |
1698 |
< |
if (deadline - System.nanoTime() <= 0L && |
1732 |
< |
U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { |
1696 |
> |
if (parkTime != 0L && w.eventCount < 0 && ctl == c && |
1697 |
> |
deadline - System.nanoTime() <= 0L && |
1698 |
> |
U.compareAndSwapLong(this, CTL, c, pc)) { |
1699 |
|
w.eventCount = (w.eventCount + E_SEQ) | E_MASK; |
1700 |
|
w.qlock = -1; // shrink |
1735 |
– |
break; |
1701 |
|
} |
1702 |
|
} |
1703 |
|
} |
1765 |
|
if (t != null && v.base == b && |
1766 |
|
U.compareAndSwapObject(a, i, t, null)) { |
1767 |
|
U.putOrderedInt(v, QBASE, b + 1); |
1768 |
+ |
if (v.top - b > 1) |
1769 |
+ |
signalWork(v); |
1770 |
|
joiner.runSubtask(t); |
1771 |
|
} |
1772 |
|
else if (v.base == b && ++steps == MAX_HELP) |
1798 |
|
* |
1799 |
|
* @param task the task to join |
1800 |
|
*/ |
1801 |
< |
private int helpComplete(ForkJoinTask<?> task) { |
1801 |
> |
private int helpComplete(ForkJoinTask<?> task, ForkJoinPool spool) { |
1802 |
|
WorkQueue[] ws; int m; |
1803 |
|
if (task != null && (ws = workQueues) != null && |
1804 |
|
(m = ws.length - 1) >= 0) { |
1805 |
< |
int scans = m | MIN_RESCANS; |
1839 |
< |
for (int j = 1, k = scans;;) { |
1805 |
> |
for (int j = 1, k = m;;) { |
1806 |
|
WorkQueue q; int s; |
1807 |
|
if ((s = task.status) < 0) |
1808 |
|
return s; |
1809 |
< |
if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) |
1810 |
< |
k = scans; |
1809 |
> |
if ((q = ws[j & m]) != null && q.pollAndExecCC(task, spool)) |
1810 |
> |
k = m; |
1811 |
|
else if (--k >= 0) |
1812 |
|
j += 2; |
1813 |
|
else |
1883 |
|
joiner.tryRemoveAndExec(task)); // process local tasks |
1884 |
|
if (s >= 0 && (s = task.status) >= 0 && |
1885 |
|
(task instanceof CountedCompleter)) |
1886 |
< |
s = helpComplete(task); |
1886 |
> |
s = helpComplete(task, this); |
1887 |
|
while (s >= 0 && (s = task.status) >= 0) { |
1888 |
|
if ((!joiner.isEmpty() || // try helping |
1889 |
|
(s = tryHelpStealer(joiner, task)) == 0) && |
1902 |
|
} |
1903 |
|
} |
1904 |
|
// reactivate |
1905 |
< |
if (false) { // possible hotspot bug? |
1906 |
< |
U.getAndAddLong(this, CTL, AC_UNIT); |
1907 |
< |
} |
1942 |
< |
else { |
1943 |
< |
long c; |
1944 |
< |
do {} while (!U.compareAndSwapLong |
1945 |
< |
(this, CTL, c = ctl, c + AC_UNIT)); |
1946 |
< |
} |
1905 |
> |
long c; |
1906 |
> |
do {} while (!U.compareAndSwapLong |
1907 |
> |
(this, CTL, c = ctl, c + AC_UNIT)); |
1908 |
|
} |
1909 |
|
} |
1910 |
|
} |
1930 |
|
joiner.tryRemoveAndExec(task)); |
1931 |
|
if (s >= 0 && (s = task.status) >= 0 && |
1932 |
|
(task instanceof CountedCompleter)) |
1933 |
< |
s = helpComplete(task); |
1933 |
> |
s = helpComplete(task, this); |
1934 |
|
if (s >= 0 && joiner.isEmpty()) { |
1935 |
|
do {} while (task.status >= 0 && |
1936 |
|
tryHelpStealer(joiner, task) > 0); |
2118 |
|
for (int i = 0; i < ws.length; ++i) { |
2119 |
|
if ((w = ws[i]) != null) { |
2120 |
|
if (!w.isEmpty()) { // signal unprocessed tasks |
2121 |
< |
signalWork(null); |
2121 |
> |
signalWork(w); |
2122 |
|
return false; |
2123 |
|
} |
2124 |
|
if ((i & 1) != 0 && w.eventCount >= 0) |
2275 |
|
break; |
2276 |
|
} |
2277 |
|
if (root.status >= 0) |
2278 |
< |
helpComplete(root); |
2278 |
> |
helpComplete(root, null); |
2279 |
|
} |
2280 |
|
} |
2281 |
|
|