ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.91 by dl, Tue Feb 22 00:39:31 2011 UTC vs.
Revision 1.94 by dl, Tue Mar 1 10:59:04 2011 UTC

# Line 478 | Line 478 | public class ForkJoinPool extends Abstra
478       * negative, there is at least one waiting worker, and when e is
479       * negative, the pool is terminating.  To deal with these possibly
480       * negative fields, we use casts in and out of "short" and/or
481 <     * signed shifts to maintain signedness.  Note: AC_SHIFT is
482 <     * redundantly declared in ForkJoinWorkerThread in order to
483 <     * integrate a surplus-threads check.
481 >     * signed shifts to maintain signedness.
482       */
483      volatile long ctl;
484  
# Line 754 | Line 752 | public class ForkJoinPool extends Abstra
752      }
753  
754      /**
755 <     * Tries to enqueue worker in wait queue and await change in
756 <     * worker's eventCount.  Before blocking, rescans queues to avoid
757 <     * missed signals.  If the pool is quiescent, possibly terminates
758 <     * worker upon exit.
755 >     * Tries to enqueue worker w in wait queue and await change in
756 >     * worker's eventCount.  If the pool is quiescent, possibly
757 >     * terminates worker upon exit.  Otherwise, before blocking,
758 >     * rescans queues to avoid missed signals.  Upon finding work,
759 >     * releases at least one worker (which may be the current
760 >     * worker). Rescans restart upon detected staleness or failure to
761 >     * release due to contention.
762       *
763       * @param w the calling worker
764       * @param c the ctl value on entry
# Line 765 | Line 766 | public class ForkJoinPool extends Abstra
766       */
767      private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
768          int v = w.eventCount;
769 <        w.nextWait = (int)c;                       // w's successor record
769 >        w.nextWait = (int)c;                      // w's successor record
770          long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
771          if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
772 <            long d = ctl; // return true if lost to a deq, to force rescan
772 >            long d = ctl; // return true if lost to a deq, to force scan
773              return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
774          }
775 <        boolean rescanned = false;
776 <        for (int sc;;) {
775 >        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
776 >            long s = stealCount;
777 >            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
778 >                sc = w.stealCount = 0;
779 >            else if (w.eventCount != v)
780 >                return true;                      // update next time
781 >        }
782 >        if (parallelism + (int)(nc >> AC_SHIFT) == 0 &&
783 >            blockedCount == 0 && quiescerCount == 0)
784 >            idleAwaitWork(w, nc, c, v);           // quiescent
785 >        for (boolean rescanned = false;;) {
786              if (w.eventCount != v)
787                  return true;
788 <            if ((sc = w.stealCount) != 0) {
779 <                long s = stealCount;               // accumulate stealCount
780 <                if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc))
781 <                    w.stealCount = 0;
782 <            }
783 <            else if (!rescanned) {
788 >            if (!rescanned) {
789                  int g = scanGuard, m = g & SMASK;
790                  ForkJoinWorkerThread[] ws = workers;
791                  if (ws != null && m < ws.length) {
# Line 804 | Line 809 | public class ForkJoinPool extends Abstra
809                  else
810                      Thread.interrupted();          // clear before park
811              }
807            else if (parallelism + (int)(ctl >> AC_SHIFT) == 0 &&
808                     blockedCount == 0 && quiescerCount == 0)
809                idleAwaitWork(w, v);               // quiescent -- maybe shrink
812              else {
813                  w.parked = true;                   // must recheck
814                  if (w.eventCount != v) {
# Line 820 | Line 822 | public class ForkJoinPool extends Abstra
822      }
823  
824      /**
825 <     * If pool is quiescent, checks for termination, and waits for
826 <     * event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl
827 <     * has not changed, terminates the worker. Upon its termination
828 <     * (see deregisterWorker), it may wake up another worker to
829 <     * possibly repeat this process.
825 >     * If inactivating worker w has caused pool to become
826 >     * quiescent, check for pool termination, and wait for event
827 >     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
828 >     * this case because quiescence reflects consensus about lack
829 >     * of work). On timeout, if ctl has not changed, terminate the
830 >     * worker. Upon its termination (see deregisterWorker), it may
831 >     * wake up another worker to possibly repeat this process.
832       *
833       * @param w the calling worker
834 <     * @param v the eventCount w must wait until changed
835 <     */
836 <    private void idleAwaitWork(ForkJoinWorkerThread w, int v) {
837 <        ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
838 <        if (shutdown)
839 <            tryTerminate(false);
840 <        long c = ctl;
841 <        long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) |
842 <                   (long)(w.nextWait & E_MASK)); // ctl value to release w
843 <        if (w.eventCount == v &&
844 <            parallelism + (int)(c >> AC_SHIFT) == 0 &&
845 <            blockedCount == 0 && quiescerCount == 0) {
842 <            long startTime = System.nanoTime();
843 <            Thread.interrupted();
844 <            if (w.eventCount == v) {
834 >     * @param currentCtl the ctl value after enqueuing w
835 >     * @param prevCtl the ctl value if w terminated
836 >     * @param v the eventCount w awaits change
837 >     */
838 >    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
839 >                               long prevCtl, int v) {
840 >        if (w.eventCount == v) {
841 >            if (shutdown)
842 >                tryTerminate(false);
843 >            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
844 >            while (ctl == currentCtl) {
845 >                long startTime = System.nanoTime();
846                  w.parked = true;
847 <                if (w.eventCount == v)
847 >                if (w.eventCount == v)             // must recheck
848                      LockSupport.parkNanos(this, SHRINK_RATE);
849                  w.parked = false;
850 <                if (w.eventCount == v && ctl == c &&
851 <                    System.nanoTime() - startTime >= SHRINK_RATE &&
852 <                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
853 <                    w.terminate = true;
854 <                    w.eventCount = ((int)c + EC_UNIT) & E_MASK;
850 >                if (w.eventCount != v)
851 >                    break;
852 >                else if (System.nanoTime() - startTime < SHRINK_RATE)
853 >                    Thread.interrupted();          // spurious wakeup
854 >                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
855 >                                                   currentCtl, prevCtl)) {
856 >                    w.terminate = true;            // restore previous
857 >                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
858 >                    break;
859                  }
860              }
861          }
# Line 1728 | Line 1733 | public class ForkJoinPool extends Abstra
1733  
1734      /**
1735       * Returns an estimate of the number of tasks submitted to this
1736 <     * pool that have not yet begun executing.  This meThod may take
1736 >     * pool that have not yet begun executing.  This method may take
1737       * time proportional to the number of submissions.
1738       *
1739       * @return the number of queued submissions
# Line 1965 | Line 1970 | public class ForkJoinPool extends Abstra
1970       * {@code isReleasable} must return {@code true} if blocking is
1971       * not necessary. Method {@code block} blocks the current thread
1972       * if necessary (perhaps internally invoking {@code isReleasable}
1973 <     * before actually blocking). The unusual methods in this API
1974 <     * accommodate synchronizers that may, but don't usually, block
1975 <     * for long periods. Similarly, they allow more efficient internal
1976 <     * handling of cases in which additional workers may be, but
1977 <     * usually are not, needed to ensure sufficient parallelism.
1978 <     * Toward this end, implementations of method {@code isReleasable}
1979 <     * must be amenable to repeated invocation.
1973 >     * before actually blocking). These actions are performed by any
1974 >     * thread invoking {@link ForkJoinPool#managedBlock}.  The
1975 >     * unusual methods in this API accommodate synchronizers that may,
1976 >     * but don't usually, block for long periods. Similarly, they
1977 >     * allow more efficient internal handling of cases in which
1978 >     * additional workers may be, but usually are not, needed to
1979 >     * ensure sufficient parallelism.  Toward this end,
1980 >     * implementations of method {@code isReleasable} must be amenable
1981 >     * to repeated invocation.
1982       *
1983       * <p>For example, here is a ManagedBlocker based on a
1984       * ReentrantLock:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines