ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java (file contents):
Revision 1.18 by dl, Wed Aug 11 18:45:45 2010 UTC vs.
Revision 1.19 by dl, Tue Aug 17 18:31:59 2010 UTC

# Line 162 | Line 162 | public class ForkJoinWorkerThread extend
162      private static final int MAX_HELP_DEPTH = 8;
163  
164      /**
165 <     * The wakeup interval (in nanoseconds) for the first worker
165 >     * The wakeup interval (in nanoseconds) for the oldest worker
166       * suspended as spare.  On each wakeup not signalled by a
167       * resumption, it may ask the pool to reduce the number of spares.
168       */
169 <    private static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L;
169 >    private static final long TRIM_RATE_NANOS =
170 >        5L * 1000L * 1000L * 1000L; // 5sec
171  
172      /**
173       * Capacity of work-stealing queue array upon initialization.
# Line 222 | Line 223 | public class ForkJoinWorkerThread extend
223       * Run state of this worker. In addition to the usual run levels,
224       * tracks if this worker is suspended as a spare, and if it was
225       * killed (trimmed) while suspended. However, "active" status is
226 <     * maintained separately.
226 >     * maintained separately and modified only in conjunction with
227 >     * CASes of the pool's runState (which are currently sadly manually
228 >     * inlined for performance.)
229       */
230      private volatile int runState;
231  
# Line 378 | Line 381 | public class ForkJoinWorkerThread extend
381       */
382      protected void onTermination(Throwable exception) {
383          try {
384 +            ForkJoinPool p = pool;
385 +            if (active) {
386 +                int a; // inline p.tryDecrementActiveCount
387 +                active = false;
388 +                do {} while(!UNSAFE.compareAndSwapInt
389 +                            (p, poolRunStateOffset, a = p.runState, a - 1));
390 +            }
391              cancelTasks();
382            while (active)              // force inactive
383                active = !pool.tryDecrementActiveCount();
392              setTerminated();
393 <            pool.workerTerminated(this);
393 >            p.workerTerminated(this);
394          } catch (Throwable ex) {        // Shouldn't ever happen
395              if (exception == null)      // but if so, at least rethrown
396                  exception = ex;
# Line 451 | Line 459 | public class ForkJoinWorkerThread extend
459      private boolean tryExecSubmission() {
460          ForkJoinPool p = pool;
461          while (p.hasQueuedSubmissions()) {
462 <            ForkJoinTask<?> t;
463 <            if (active || (active = p.tryIncrementActiveCount())) {
462 >            ForkJoinTask<?> t; int a;
463 >            if (active || // ugly/hacky: inline p.tryIncrementActiveCount
464 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
465 >                                                   a = p.runState, a + 1))) {
466                  if ((t = p.pollSubmission()) != null) {
467                      currentSteal = t;
468                      t.quietlyExec();
# Line 705 | Line 715 | public class ForkJoinWorkerThread extend
715                  ForkJoinWorkerThread v = ws[k & mask];
716                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
717                  if (v != null && v.base != v.sp) {
718 <                    ForkJoinTask<?>[] q; int b;
719 <                    if ((canSteal ||       // ensure active status
720 <                         (canSteal = active = p.tryIncrementActiveCount())) &&
718 >                    ForkJoinTask<?>[] q; int b, a;
719 >                    if ((canSteal ||      // Ugly/hacky: inline
720 >                         (canSteal = active =  // p.tryIncrementActiveCount
721 >                          UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
722 >                                                   a = p.runState, a + 1))) &&
723                          (q = v.queue) != null && (b = v.base) != v.sp) {
724                          int i = (q.length - 1) & b;
725                          long u = (i << qShift) + qBase; // raw offset
# Line 747 | Line 759 | public class ForkJoinWorkerThread extend
759      final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
760  
761      /**
762 <     * Sets state to TERMINATING, also, unless "quiet", unparking if
763 <     * not already terminated
752 <     *
753 <     * @param quiet don't unpark (used for faster status updates on
754 <     * pool termination)
762 >     * Sets state to TERMINATING. Does NOT unpark or interrupt
763 >     * to wake up if currently blocked.
764       */
765 <    final void shutdown(boolean quiet) {
765 >    final void shutdown() {
766          for (;;) {
767              int s = runState;
768              if ((s & (TERMINATING|TERMINATED)) != 0)
# Line 768 | Line 777 | public class ForkJoinWorkerThread extend
777                                                s | TERMINATING))
778                  break;
779          }
771        if (!quiet && (runState & TERMINATED) != 0)
772            LockSupport.unpark(this);
780      }
781  
782      /**
# Line 783 | Line 790 | public class ForkJoinWorkerThread extend
790      }
791  
792      /**
793 <     * If suspended, tries to set status to unsuspended and unparks.
793 >     * If suspended, tries to set status to unsuspended.
794       *
795       * @return true if successful
796       */
# Line 800 | Line 807 | public class ForkJoinWorkerThread extend
807      /**
808       * Sets suspended status and blocks as spare until resumed
809       * or shutdown.
803     * @returns true if still running on exit
810       */
811 <    final boolean suspendAsSpare() {
806 <        lastEventCount = 0;         // reset upon resume
811 >    final void suspendAsSpare() {
812          for (;;) {                  // set suspended unless terminating
813              int s = runState;
814              if ((s & TERMINATING) != 0) { // must kill
815                  if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
816                                               s | (TRIMMED | TERMINATING)))
817 <                    return false;
817 >                    return;
818              }
819              else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
820                                                s | SUSPENDED))
# Line 817 | Line 822 | public class ForkJoinWorkerThread extend
822          }
823          ForkJoinPool p = pool;
824          p.pushSpare(this);
825 +        lastEventCount = 0;         // reset upon resume
826          while ((runState & SUSPENDED) != 0) {
827 <            if (!p.tryAccumulateStealCount(this))
828 <                continue;
829 <            interrupted();          // clear/ignore interrupts
830 <            if ((runState & SUSPENDED) == 0)
825 <                break;
826 <            if (nextSpare != 0)     // untimed
827 <                LockSupport.park(this);
828 <            else {
829 <                long startTime = System.nanoTime();
830 <                LockSupport.parkNanos(this, TRIM_RATE_NANOS);
827 >            if (p.tryAccumulateStealCount(this)) {
828 >                boolean untimed = nextSpare != 0;
829 >                long startTime = untimed? 0 : System.nanoTime();
830 >                interrupted();          // clear/ignore interrupts
831                  if ((runState & SUSPENDED) == 0)
832                      break;
833 <                long now = System.nanoTime();
834 <                if (now - startTime >= TRIM_RATE_NANOS)
835 <                    pool.tryTrimSpare(now);
833 >                if (untimed)     // untimed
834 >                    LockSupport.park(this);
835 >                else {
836 >                    LockSupport.parkNanos(this, TRIM_RATE_NANOS);
837 >                    if ((runState & SUSPENDED) == 0)
838 >                        break;
839 >                    if (System.nanoTime() - startTime >= TRIM_RATE_NANOS)
840 >                        p.tryShutdownSpare();
841 >                }
842              }
843          }
838        return runState == 0;
844      }
845  
846      // Misc support methods for ForkJoinPool
# Line 900 | Line 905 | public class ForkJoinWorkerThread extend
905       * @return a task, if available
906       */
907      final ForkJoinTask<?> pollLocalTask() {
908 +        ForkJoinPool p = pool;
909          while (sp != base) {
910 <            if (active || (active = pool.tryIncrementActiveCount()))
910 >            int a; // inline p.tryIncrementActiveCount
911 >            if (active ||
912 >                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
913 >                                                   a = p.runState, a + 1)))
914                  return locallyFifo? locallyDeqTask() : popTask();
915          }
916          return null;
# Line 1114 | Line 1123 | public class ForkJoinWorkerThread extend
1123              }
1124              else {
1125                  ForkJoinPool p = pool;
1126 +                int a; // to inline CASes
1127                  if (active) {
1128 <                    if (!p.tryDecrementActiveCount())
1128 >                    if (!UNSAFE.compareAndSwapInt
1129 >                        (p, poolRunStateOffset, a = p.runState, a - 1))
1130                          continue;   // retry later
1131                      active = false; // inactivate
1132                  }
1133                  if (p.isQuiescent()) {
1134                      active = true; // re-activate
1135 <                    do {} while (!p.tryIncrementActiveCount());
1135 >                    do {} while(!UNSAFE.compareAndSwapInt
1136 >                                (p, poolRunStateOffset, a = p.runState, a+1));
1137                      return;
1138                  }
1139              }
1140          }
1141      }
1142  
1131
1143      // Unsafe mechanics
1144  
1145      private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
# Line 1142 | Line 1153 | public class ForkJoinWorkerThread extend
1153          objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1154      private static final long qBase =
1155          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1156 +    private static final long poolRunStateOffset = // to inline CAS
1157 +        objectFieldOffset("runState", ForkJoinPool.class);
1158  
1159      private static final int qShift;
1160  
# Line 1162 | Line 1175 | public class ForkJoinWorkerThread extend
1175              throw error;
1176          }
1177      }
1165
1178   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines