ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.92 by dl, Tue Feb 22 10:50:51 2011 UTC vs.
Revision 1.93 by dl, Wed Feb 23 12:48:43 2011 UTC

# Line 754 | Line 754 | public class ForkJoinPool extends Abstra
754      }
755  
756      /**
757 <     * Tries to enqueue worker in wait queue and await change in
758 <     * worker's eventCount.  Before blocking, rescans queues to avoid
759 <     * missed signals.  If the pool is quiescent, possibly terminates
760 <     * worker upon exit.
757 >     * Tries to enqueue worker w in wait queue and await change in
758 >     * worker's eventCount.  If the pool is quiescent, possibly
759 >     * terminates worker upon exit.  Otherwise, before blocking,
760 >     * rescans queues to avoid missed signals.  Upon finding work,
761 >     * releases at least one worker (which may be the current
762 >     * worker). Rescans restart upon detected staleness or failure to
763 >     * release due to contention.
764       *
765       * @param w the calling worker
766       * @param c the ctl value on entry
# Line 765 | Line 768 | public class ForkJoinPool extends Abstra
768       */
769      private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
770          int v = w.eventCount;
771 <        w.nextWait = (int)c;                       // w's successor record
771 >        w.nextWait = (int)c;                      // w's successor record
772          long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
773          if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
774 <            long d = ctl; // return true if lost to a deq, to force rescan
774 >            long d = ctl; // return true if lost to a deq, to force scan
775              return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
776          }
777 <        if (parallelism + (int)(c >> AC_SHIFT) == 1 &&
777 >        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
778 >            long s = stealCount;
779 >            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
780 >                sc = w.stealCount = 0;
781 >            else if (w.eventCount != v)
782 >                return true;                      // update next time
783 >        }
784 >        if (parallelism + (int)(nc >> AC_SHIFT) == 0 &&
785              blockedCount == 0 && quiescerCount == 0)
786 <            idleAwaitWork(w, v);               // quiescent -- maybe shrink
787 <
778 <        boolean rescanned = false;
779 <        for (int sc;;) {
786 >            idleAwaitWork(w, nc, c, v);           // quiescent
787 >        for (boolean rescanned = false;;) {
788              if (w.eventCount != v)
789                  return true;
790 <            if ((sc = w.stealCount) != 0) {
783 <                long s = stealCount;               // accumulate stealCount
784 <                if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc))
785 <                    w.stealCount = 0;
786 <            }
787 <            else if (!rescanned) {
790 >            if (!rescanned) {
791                  int g = scanGuard, m = g & SMASK;
792                  ForkJoinWorkerThread[] ws = workers;
793                  if (ws != null && m < ws.length) {
# Line 821 | Line 824 | public class ForkJoinPool extends Abstra
824      }
825  
826      /**
827 <     * If pool is quiescent, checks for termination, and waits for
828 <     * event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl
829 <     * has not changed, terminates the worker. Upon its termination
830 <     * (see deregisterWorker), it may wake up another worker to
831 <     * possibly repeat this process.
827 >     * If inactivating worker w has caused pool to become
828 >     * quiescent, check for pool termination, and wait for event
829 >     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
830 >     * this case because quiescence reflects consensus about lack
831 >     * of work). On timeout, if ctl has not changed, terminate the
832 >     * worker. Upon its termination (see deregisterWorker), it may
833 >     * wake up another worker to possibly repeat this process.
834       *
835       * @param w the calling worker
836 <     * @param v the eventCount w must wait until changed
837 <     */
838 <    private void idleAwaitWork(ForkJoinWorkerThread w, int v) {
839 <        ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
840 <        if (shutdown)
841 <            tryTerminate(false);
842 <        long c = ctl;
843 <        long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) |
844 <                   (long)(w.nextWait & E_MASK)); // ctl value to release w
845 <        if (w.eventCount == v &&
846 <            parallelism + (int)(c >> AC_SHIFT) == 0 &&
847 <            blockedCount == 0 && quiescerCount == 0) {
843 <            long startTime = System.nanoTime();
844 <            Thread.interrupted();
845 <            if (w.eventCount == v) {
836 >     * @param currentCtl the ctl value after enqueuing w
837 >     * @param prevCtl the ctl value if w terminated
838 >     * @param v the eventCount w awaits change
839 >     */
840 >    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
841 >                               long prevCtl, int v) {
842 >        if (w.eventCount == v) {
843 >            if (shutdown)
844 >                tryTerminate(false);
845 >            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
846 >            while (ctl == currentCtl) {
847 >                long startTime = System.nanoTime();
848                  w.parked = true;
849 <                if (w.eventCount == v)
849 >                if (w.eventCount == v)             // must recheck
850                      LockSupport.parkNanos(this, SHRINK_RATE);
851                  w.parked = false;
852 <                if (w.eventCount == v && ctl == c &&
853 <                    System.nanoTime() - startTime >= SHRINK_RATE &&
854 <                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
855 <                    w.terminate = true;
856 <                    w.eventCount = ((int)c + EC_UNIT) & E_MASK;
852 >                if (w.eventCount != v)
853 >                    break;
854 >                else if (System.nanoTime() - startTime < SHRINK_RATE)
855 >                    Thread.interrupted();          // spurious wakeup
856 >                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
857 >                                                   currentCtl, prevCtl)) {
858 >                    w.terminate = true;            // restore previous
859 >                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
860 >                    break;
861                  }
862              }
863          }
# Line 1966 | Line 1972 | public class ForkJoinPool extends Abstra
1972       * {@code isReleasable} must return {@code true} if blocking is
1973       * not necessary. Method {@code block} blocks the current thread
1974       * if necessary (perhaps internally invoking {@code isReleasable}
1975 <     * before actually blocking). The unusual methods in this API
1976 <     * accommodate synchronizers that may, but don't usually, block
1977 <     * for long periods. Similarly, they allow more efficient internal
1978 <     * handling of cases in which additional workers may be, but
1979 <     * usually are not, needed to ensure sufficient parallelism.
1980 <     * Toward this end, implementations of method {@code isReleasable}
1981 <     * must be amenable to repeated invocation.
1975 >     * before actually blocking). These actions are performed by any
1976 >     * thread invoking {@link ForkJoinPool#managedBlock}.  The
1977 >     * unusual methods in this API accommodate synchronizers that may,
1978 >     * but don't usually, block for long periods. Similarly, they
1979 >     * allow more efficient internal handling of cases in which
1980 >     * additional workers may be, but usually are not, needed to
1981 >     * ensure sufficient parallelism.  Toward this end,
1982 >     * implementations of method {@code isReleasable} must be amenable
1983 >     * to repeated invocation.
1984       *
1985       * <p>For example, here is a ManagedBlocker based on a
1986       * ReentrantLock:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines