ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.38 by dl, Fri Jul 23 16:49:11 2010 UTC vs.
Revision 1.39 by dl, Sat Jul 24 20:28:18 2010 UTC

# Line 97 | Line 97 | public class ForkJoinWorkerThread extend
97       * technique for implementing efficient futures" SIGPLAN Notices,
98       * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99       * in that: (1) We only maintain dependency links across workers
100 <     * upon steals, rather than maintain per-task bookkeeping.  This
101 <     * may require a linear scan of workers array to locate stealers,
102 <     * but usually doesn't because stealers leave hints (that may
103 <     * become stale/wrong) of where to locate the kathem. This
104 <     * isolates cost to when it is needed, rather than adding to
105 <     * per-task overhead.  (2) It is "shallow", ignoring nesting and
106 <     * potentially cyclic mutual steals.  (3) It is intentionally
107 <     * racy: field currentJoin is updated only while actively joining,
108 <     * which means that we could miss links in the chain during
109 <     * long-lived tasks, GC stalls etc.  (4) We bound the number of
110 <     * attempts to find work (see MAX_HELP_DEPTH) and fall back to
111 <     * suspending the worker and if necessary replacing it with a
112 <     * spare (see ForkJoinPool.tryAwaitJoin).
100 >     * upon steals, rather than use per-task bookkeeping.  This may
101 >     * require a linear scan of workers array to locate stealers, but
102 >     * usually doesn't because stealers leave hints (that may become
103 >     * stale/wrong) of where to locate them. This isolates cost to
104 >     * when it is needed, rather than adding to per-task overhead.
105 >     * (2) It is "shallow", ignoring nesting and potentially cyclic
106 >     * mutual steals.  (3) It is intentionally racy: field currentJoin
107 >     * is updated only while actively joining, which means that we
108 >     * miss links in the chain during long-lived tasks, GC stalls etc
109 >     * (which is OK since blocking in such cases is usually a good
110 >     * idea).  (4) We bound the number of attempts to find work (see
111 >     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
112 >     * necessary replacing it with a spare (see
113 >     * ForkJoinPool.tryAwaitJoin).
114       *
115       * Efficient implementation of these algorithms currently relies
116       * on an uncomfortable amount of "Unsafe" mechanics. To maintain
# Line 256 | Line 257 | public class ForkJoinWorkerThread extend
257       */
258      private int seed;
259  
259
260      /**
261       * Activity status. When true, this worker is considered active.
262       * Accessed directly by pool.  Must be false upon construction.
# Line 265 | Line 265 | public class ForkJoinWorkerThread extend
265  
266      /**
267       * True if use local fifo, not default lifo, for local polling.
268 <     * Shadows value from ForkJoinPool, which resets it if changed
269 <     * pool-wide.
268 >     * Shadows value from ForkJoinPool.
269       */
270      private final boolean locallyFifo;
271  
# Line 321 | Line 320 | public class ForkJoinWorkerThread extend
320       */
321      final void start(int poolIndex, UncaughtExceptionHandler ueh) {
322          this.poolIndex = poolIndex;
323 +        setDaemon(true);
324          if (ueh != null)
325              setUncaughtExceptionHandler(ueh);
326        setDaemon(true);
326          start();
327      }
328  
# Line 761 | Line 760 | public class ForkJoinWorkerThread extend
760      }
761  
762      /**
763 <     * Instrumented version of park used by ForkJoinPool.awaitEvent
763 >     * Instrumented version of park used by ForkJoinPool.eventSync
764       */
765      final void doPark() {
766          ++parkCount;
# Line 802 | Line 801 | public class ForkJoinWorkerThread extend
801                                                s | SUSPENDED))
802                  break;
803          }
804 +        int pc = pool.parallelism;
805 +        pool.accumulateStealCount(this);
806          boolean timed;
807          long nanos;
808          long startTime;
809 <        if (poolIndex < pool.parallelism) {
809 >        if (poolIndex < pc) { // untimed wait for core threads
810              timed = false;
811              nanos = 0L;
812              startTime = 0L;
813          }
814 <        else {
814 >        else {                // timed wait for added threads
815              timed = true;
816              nanos = SPARE_KEEPALIVE_NANOS;
817              startTime = System.nanoTime();
818          }
818        pool.accumulateStealCount(this);
819          lastEventCount = 0;      // reset upon resume
820          interrupted();           // clear/ignore interrupts
821          while ((runState & SUSPENDED) != 0) {
# Line 905 | Line 905 | public class ForkJoinWorkerThread extend
905       * @return a task, if available
906       */
907      final ForkJoinTask<?> pollTask() {
908 <        ForkJoinTask<?> t;
909 <        return (t = pollLocalTask()) != null ? t : scan();
908 >        ForkJoinTask<?> t = pollLocalTask();
909 >        if (t == null) {
910 >            t = scan();
911 >            currentSteal = null; // cannot retain/track
912 >        }
913 >        return t;
914      }
915  
916      /**
# Line 920 | Line 924 | public class ForkJoinWorkerThread extend
924       * @param joinMe the task to join
925       * @return task status on exit
926       */
927 <    final int joinTask(ForkJoinTask<?> joinMe) {
927 >     final int joinTask(ForkJoinTask<?> joinMe) {
928          int stat;
929          ForkJoinTask<?> prevJoin = currentJoin;
930 <        currentJoin = joinMe;
930 >        // Only written by this thread; only need ordered store
931 >        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
932          if ((stat = joinMe.status) >= 0 &&
933              (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) {
934 <            ForkJoinPool p = pool;
935 <            int helpRetries = 2;     // initial patience values
931 <            int awaitRetries = -1;   // -1 is sentinel for replace-check only
932 <            do {
933 <                helpJoinTask(joinMe, helpRetries);
934 >            for (int retries = 0; ; ++retries) {
935 >                helpJoinTask(joinMe, retries);
936                  if ((stat = joinMe.status) < 0)
937                      break;
938 <                boolean busy = p.tryAwaitJoin(joinMe, awaitRetries);
938 >                pool.tryAwaitJoin(joinMe, retries);
939                  if ((stat = joinMe.status) < 0)
940                      break;
939                if (awaitRetries == -1)
940                    awaitRetries = 0;
941                else if (busy)
942                    ++awaitRetries;
943                if (helpRetries < p.parallelism)
944                    helpRetries <<= 1;
941                  Thread.yield(); // tame unbounded loop
942 <            } while (joinMe.status >= 0);
942 >            }
943          }
944 <        currentJoin = prevJoin;
944 >        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
945          return stat;
946      }
947  
# Line 997 | Line 993 | public class ForkJoinWorkerThread extend
993       * caller anyway) to slightly simplify control paths.
994       *
995       * @param joinMe the task to join
996 +     * @param rescans the number of times to recheck for work
997       */
998 <    final void helpJoinTask(ForkJoinTask<?> joinMe, int retries) {
998 >    private void helpJoinTask(ForkJoinTask<?> joinMe, int rescans) {
999          ForkJoinWorkerThread[] ws = pool.workers;
1000          int n;
1001          if (ws == null || (n = ws.length) <= 1)
1002              return;                   // need at least 2 workers
1003 <
1007 <        restart:while (joinMe.status >= 0 && --retries >= 0) {
1003 >        restart:while (rescans-- >= 0 && joinMe.status >= 0) {
1004              ForkJoinTask<?> task = joinMe;        // base of chain
1005              ForkJoinWorkerThread thread = this;   // thread with stolen task
1006              for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
# Line 1029 | Line 1025 | public class ForkJoinWorkerThread extend
1025                      ForkJoinTask<?> t = q[i];
1026                      if (task.status < 0)          // stale
1027                          continue restart;
1028 <                    if (v.base == b) {            // recheck after reading t
1029 <                        if (t == null)            // producer stalled
1030 <                            continue restart;     // retry via restart
1035 <                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1028 >                    if (t != null) {
1029 >                        if (v.base == b &&
1030 >                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
1031                              if (joinMe.status < 0) {
1032                                  UNSAFE.putObjectVolatile(q, u, t);
1033                                  return;           // back out on cancel
# Line 1045 | Line 1040 | public class ForkJoinWorkerThread extend
1040                              currentSteal = prevSteal;
1041                          }
1042                      }
1043 +                    else if (v.base == b)          // producer stalled
1044 +                        continue restart;          // retry via restart
1045                      if (joinMe.status < 0)
1046                          return;
1047                  }
1048                  // Try to descend to find v's stealer
1049                  ForkJoinTask<?> next = v.currentJoin;
1050 <                if (next == null || task.status < 0)
1050 >                if (next == null || next == task || task.status < 0)
1051                      continue restart;             // no descendent or stale
1052                  if (joinMe.status < 0)
1053                      return;
# Line 1141 | Line 1138 | public class ForkJoinWorkerThread extend
1138      private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1139      private static final long runStateOffset =
1140          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1141 +    private static final long currentJoinOffset =
1142 +        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
1143      private static final long qBase =
1144          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1145      private static final int qShift;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines