ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.40 by dl, Wed Aug 11 18:45:12 2010 UTC vs.
Revision 1.41 by dl, Tue Aug 17 18:30:33 2010 UTC

# Line 164 | Line 164 | public class ForkJoinWorkerThread extend
164      private static final int MAX_HELP_DEPTH = 8;
165  
166      /**
167 <     * The wakeup interval (in nanoseconds) for the first worker
167 >     * The wakeup interval (in nanoseconds) for the oldest worker
168       * suspended as spare.  On each wakeup not signalled by a
169       * resumption, it may ask the pool to reduce the number of spares.
170       */
171 <    private static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L;
171 >    private static final long TRIM_RATE_NANOS =
172 >        5L * 1000L * 1000L * 1000L; // 5sec
173  
174      /**
175       * Capacity of work-stealing queue array upon initialization.
# Line 224 | Line 225 | public class ForkJoinWorkerThread extend
225       * Run state of this worker. In addition to the usual run levels,
226       * tracks if this worker is suspended as a spare, and if it was
227       * killed (trimmed) while suspended. However, "active" status is
228 <     * maintained separately.
228 >     * maintained separately and modified only in conjunction with
229 >     * CASes of the pool's runState (which are currently sadly manually
230 >     * inlined for performance.)
231       */
232      private volatile int runState;
233  
# Line 380 | Line 383 | public class ForkJoinWorkerThread extend
383       */
384      protected void onTermination(Throwable exception) {
385          try {
386 +            ForkJoinPool p = pool;
387 +            if (active) {
388 +                int a; // inline p.tryDecrementActiveCount
389 +                active = false;
390 +                do {} while(!UNSAFE.compareAndSwapInt
391 +                            (p, poolRunStateOffset, a = p.runState, a - 1));
392 +            }
393              cancelTasks();
384            while (active)              // force inactive
385                active = !pool.tryDecrementActiveCount();
394              setTerminated();
395 <            pool.workerTerminated(this);
395 >            p.workerTerminated(this);
396          } catch (Throwable ex) {        // Shouldn't ever happen
397              if (exception == null)      // but if so, at least rethrown
398                  exception = ex;
# Line 453 | Line 461 | public class ForkJoinWorkerThread extend
461      private boolean tryExecSubmission() {
462          ForkJoinPool p = pool;
463          while (p.hasQueuedSubmissions()) {
464 <            ForkJoinTask<?> t;
465 <            if (active || (active = p.tryIncrementActiveCount())) {
464 >            ForkJoinTask<?> t; int a;
465 >            if (active || // ugly/hacky: inline p.tryIncrementActiveCount
466 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
467 >                                                   a = p.runState, a + 1))) {
468                  if ((t = p.pollSubmission()) != null) {
469                      currentSteal = t;
470                      t.quietlyExec();
# Line 707 | Line 717 | public class ForkJoinWorkerThread extend
717                  ForkJoinWorkerThread v = ws[k & mask];
718                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
719                  if (v != null && v.base != v.sp) {
720 <                    ForkJoinTask<?>[] q; int b;
721 <                    if ((canSteal ||       // ensure active status
722 <                         (canSteal = active = p.tryIncrementActiveCount())) &&
720 >                    ForkJoinTask<?>[] q; int b, a;
721 >                    if ((canSteal ||      // Ugly/hacky: inline
722 >                         (canSteal = active =  // p.tryIncrementActiveCount
723 >                          UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
724 >                                                   a = p.runState, a + 1))) &&
725                          (q = v.queue) != null && (b = v.base) != v.sp) {
726                          int i = (q.length - 1) & b;
727                          long u = (i << qShift) + qBase; // raw offset
# Line 749 | Line 761 | public class ForkJoinWorkerThread extend
761      final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
762  
763      /**
764 <     * Sets state to TERMINATING, also, unless "quiet", unparking if
765 <     * not already terminated
754 <     *
755 <     * @param quiet don't unpark (used for faster status updates on
756 <     * pool termination)
764 >     * Sets state to TERMINATING. Does NOT unpark or interrupt
765 >     * to wake up if currently blocked.
766       */
767 <    final void shutdown(boolean quiet) {
767 >    final void shutdown() {
768          for (;;) {
769              int s = runState;
770              if ((s & (TERMINATING|TERMINATED)) != 0)
# Line 770 | Line 779 | public class ForkJoinWorkerThread extend
779                                                s | TERMINATING))
780                  break;
781          }
773        if (!quiet && (runState & TERMINATED) != 0)
774            LockSupport.unpark(this);
782      }
783  
784      /**
# Line 785 | Line 792 | public class ForkJoinWorkerThread extend
792      }
793  
794      /**
795 <     * If suspended, tries to set status to unsuspended and unparks.
795 >     * If suspended, tries to set status to unsuspended.
796       *
797       * @return true if successful
798       */
# Line 802 | Line 809 | public class ForkJoinWorkerThread extend
809      /**
810       * Sets suspended status and blocks as spare until resumed
811       * or shutdown.
805     * @returns true if still running on exit
812       */
813 <    final boolean suspendAsSpare() {
808 <        lastEventCount = 0;         // reset upon resume
813 >    final void suspendAsSpare() {
814          for (;;) {                  // set suspended unless terminating
815              int s = runState;
816              if ((s & TERMINATING) != 0) { // must kill
817                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
818                                               s | (TRIMMED | TERMINATING)))
819 <                    return false;
819 >                    return;
820              }
821              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
822                                                s | SUSPENDED))
# Line 819 | Line 824 | public class ForkJoinWorkerThread extend
824          }
825          ForkJoinPool p = pool;
826          p.pushSpare(this);
827 +        lastEventCount = 0;         // reset upon resume
828          while ((runState & SUSPENDED) != 0) {
829 <            if (!p.tryAccumulateStealCount(this))
830 <                continue;
831 <            interrupted();          // clear/ignore interrupts
832 <            if ((runState & SUSPENDED) == 0)
827 <                break;
828 <            if (nextSpare != 0)     // untimed
829 <                LockSupport.park(this);
830 <            else {
831 <                long startTime = System.nanoTime();
832 <                LockSupport.parkNanos(this, TRIM_RATE_NANOS);
829 >            if (p.tryAccumulateStealCount(this)) {
830 >                boolean untimed = nextSpare != 0;
831 >                long startTime = untimed? 0 : System.nanoTime();
832 >                interrupted();          // clear/ignore interrupts
833                  if ((runState & SUSPENDED) == 0)
834                      break;
835 <                long now = System.nanoTime();
836 <                if (now - startTime >= TRIM_RATE_NANOS)
837 <                    pool.tryTrimSpare(now);
835 >                if (untimed)     // untimed
836 >                    LockSupport.park(this);
837 >                else {
838 >                    LockSupport.parkNanos(this, TRIM_RATE_NANOS);
839 >                    if ((runState & SUSPENDED) == 0)
840 >                        break;
841 >                    if (System.nanoTime() - startTime >= TRIM_RATE_NANOS)
842 >                        p.tryShutdownSpare();
843 >                }
844              }
845          }
840        return runState == 0;
846      }
847  
848      // Misc support methods for ForkJoinPool
# Line 902 | Line 907 | public class ForkJoinWorkerThread extend
907       * @return a task, if available
908       */
909      final ForkJoinTask<?> pollLocalTask() {
910 +        ForkJoinPool p = pool;
911          while (sp != base) {
912 <            if (active || (active = pool.tryIncrementActiveCount()))
912 >            int a; // inline p.tryIncrementActiveCount
913 >            if (active ||
914 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
915 >                                                   a = p.runState, a + 1)))
916                  return locallyFifo? locallyDeqTask() : popTask();
917          }
918          return null;
# Line 1116 | Line 1125 | public class ForkJoinWorkerThread extend
1125              }
1126              else {
1127                  ForkJoinPool p = pool;
1128 +                int a; // to inline CASes
1129                  if (active) {
1130 <                    if (!p.tryDecrementActiveCount())
1130 >                    if (!UNSAFE.compareAndSwapInt
1131 >                        (p, poolRunStateOffset, a = p.runState, a - 1))
1132                          continue;   // retry later
1133                      active = false; // inactivate
1134                  }
1135                  if (p.isQuiescent()) {
1136                      active = true; // re-activate
1137 <                    do {} while (!p.tryIncrementActiveCount());
1137 >                    do {} while(!UNSAFE.compareAndSwapInt
1138 >                                (p, poolRunStateOffset, a = p.runState, a+1));
1139                      return;
1140                  }
1141              }
# Line 1143 | Line 1155 | public class ForkJoinWorkerThread extend
1155          objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1156      private static final long qBase =
1157          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1158 +    private static final long poolRunStateOffset = // to inline CAS
1159 +        objectFieldOffset("runState", ForkJoinPool.class);
1160  
1161      private static final int qShift;
1162  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines