ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.41 by dl, Tue Aug 17 18:30:33 2010 UTC vs.
Revision 1.42 by dl, Sun Aug 29 23:34:46 2010 UTC

# Line 110 | Line 110 | public class ForkJoinWorkerThread extend
110       * idea).  (4) We bound the number of attempts to find work (see
111       * MAX_HELP_DEPTH) and fall back to suspending the worker and if
112       * necessary replacing it with a spare (see
113 <     * ForkJoinPool.tryAwaitJoin).
113 >     * ForkJoinPool.awaitJoin).
114       *
115       * Efficient implementation of these algorithms currently relies
116       * on an uncomfortable amount of "Unsafe" mechanics. To maintain
# Line 164 | Line 164 | public class ForkJoinWorkerThread extend
164      private static final int MAX_HELP_DEPTH = 8;
165  
166      /**
167     * The wakeup interval (in nanoseconds) for the oldest worker
168     * suspended as spare.  On each wakeup not signalled by a
169     * resumption, it may ask the pool to reduce the number of spares.
170     */
171    private static final long TRIM_RATE_NANOS =
172        5L * 1000L * 1000L * 1000L; // 5sec
173
174    /**
167       * Capacity of work-stealing queue array upon initialization.
168       * Must be a power of two. Initial size must be at least 4, but is
169       * padded to minimize cache effects.
# Line 226 | Line 218 | public class ForkJoinWorkerThread extend
218       * tracks if this worker is suspended as a spare, and if it was
219       * killed (trimmed) while suspended. However, "active" status is
220       * maintained separately and modified only in conjunction with
221 <     * CASes of the pool's runState (which are currently sadly manually
222 <     * inlined for performance.)
221 >     * CASes of the pool's runState (which are currently sadly
222 >     * manually inlined for performance.)  Accessed directly by pool
223 >     * to simplify checks for normal (zero) status.
224       */
225 <    private volatile int runState;
225 >    volatile int runState;
226  
227      private static final int TERMINATING = 0x01;
228      private static final int TERMINATED  = 0x02;
# Line 274 | Line 267 | public class ForkJoinWorkerThread extend
267      int lastEventCount;
268  
269      /**
270 <     * Encoded index and event count of next event waiter. Used only
271 <     * by ForkJoinPool for managing event waiters.
270 >     * Encoded index and event count of next event waiter. Accessed
271 >     * only by ForkJoinPool for managing event waiters.
272       */
273      volatile long nextWaiter;
274  
275      /**
276 <     * Number of times this thread suspended as spare
276 >     * Number of times this thread suspended as spare. Accessed only
277 >     * by pool.
278       */
279      int spareCount;
280  
281      /**
282 <     * Encoded index and count of next spare waiter. Used only
282 >     * Encoded index and count of next spare waiter. Accessed only
283       * by ForkJoinPool for managing spares.
284       */
285      volatile int nextSpare;
# Line 299 | Line 293 | public class ForkJoinWorkerThread extend
293  
294      /**
295       * The task most recently stolen from another worker (or
296 <     * submission queue).  Not volatile because always read/written in
297 <     * presence of related volatiles in those cases where it matters.
296 >     * submission queue).  Written only by current thread, but read by
297 >     * others.
298       */
299 <    private ForkJoinTask<?> currentSteal;
299 >    private volatile ForkJoinTask<?> currentSteal;
300  
301      /**
302       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 425 | Line 419 | public class ForkJoinWorkerThread extend
419       * Find and execute tasks and check status while running
420       */
421      private void mainLoop() {
422 <        int misses = 0; // track consecutive times failed to find work; max 2
422 >        boolean ran = false; // true if ran a task on last step
423          ForkJoinPool p = pool;
424          for (;;) {
425 <            p.preStep(this, misses);
425 >            p.preStep(this, ran);
426              if (runState != 0)
427                  break;
428 <            misses = ((tryExecSteal() || tryExecSubmission()) ? 0 :
435 <                      (misses < 2 ? misses + 1 : 2));
428 >            ran = tryExecSteal() || tryExecSubmission();
429          }
430      }
431  
# Line 443 | Line 436 | public class ForkJoinWorkerThread extend
436       */
437      private boolean tryExecSteal() {
438          ForkJoinTask<?> t;
439 <        if ((t  = scan()) != null) {
439 >        if ((t = scan()) != null) {
440              t.quietlyExec();
441 <            currentSteal = null;
441 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
442              if (sp != base)
443                  execLocalTasks();
444              return true;
# Line 462 | Line 455 | public class ForkJoinWorkerThread extend
455          ForkJoinPool p = pool;
456          while (p.hasQueuedSubmissions()) {
457              ForkJoinTask<?> t; int a;
458 <            if (active || // ugly/hacky: inline p.tryIncrementActiveCount
458 >            if (active || // inline p.tryIncrementActiveCount
459                  (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
460                                                     a = p.runState, a + 1))) {
461                  if ((t = p.pollSubmission()) != null) {
462 <                    currentSteal = t;
462 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
463                      t.quietlyExec();
464 <                    currentSteal = null;
464 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
465                      if (sp != base)
466                          execLocalTasks();
467                      return true;
# Line 627 | Line 620 | public class ForkJoinWorkerThread extend
620          if ((s = sp) != base && q != null &&
621              UNSAFE.compareAndSwapObject
622              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
623 <            sp = s;
623 >            sp = s; // putOrderedInt may encourage more timely write
624              // UNSAFE.putOrderedInt(this, spOffset, s);
625              return true;
626          }
# Line 716 | Line 709 | public class ForkJoinWorkerThread extend
709              for (;;) {
710                  ForkJoinWorkerThread v = ws[k & mask];
711                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
712 <                if (v != null && v.base != v.sp) {
713 <                    ForkJoinTask<?>[] q; int b, a;
714 <                    if ((canSteal ||      // Ugly/hacky: inline
715 <                         (canSteal = active =  // p.tryIncrementActiveCount
716 <                          UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
717 <                                                   a = p.runState, a + 1))) &&
718 <                        (q = v.queue) != null && (b = v.base) != v.sp) {
719 <                        int i = (q.length - 1) & b;
720 <                        long u = (i << qShift) + qBase; // raw offset
721 <                        ForkJoinTask<?> t = q[i];
722 <                        if (v.base == b && t != null &&
712 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
713 >                if (v != null && (b = v.base) != v.sp &&
714 >                    (q = v.queue) != null) {
715 >                    int i = (q.length - 1) & b;
716 >                    long u = (i << qShift) + qBase; // raw offset
717 >                    int pid = poolIndex;
718 >                    if ((t = q[i]) != null) {
719 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
720 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
721 >                                                     a = p.runState, a + 1))
722 >                            canSteal = active = true;
723 >                        if (canSteal && v.base == b++ &&
724                              UNSAFE.compareAndSwapObject(q, u, t, null)) {
725 <                            int pid = poolIndex;
732 <                            currentSteal = t;
725 >                            v.base = b;
726                              v.stealHint = pid;
727 <                            v.base = b + 1;
727 >                            UNSAFE.putOrderedObject(this,
728 >                                                    currentStealOffset, t);
729                              seed = r;
730                              ++stealCount;
731                              return t;
# Line 762 | Line 756 | public class ForkJoinWorkerThread extend
756  
757      /**
758       * Sets state to TERMINATING. Does NOT unpark or interrupt
759 <     * to wake up if currently blocked.
759 >     * to wake up if currently blocked. Callers must do so if desired.
760       */
761      final void shutdown() {
762          for (;;) {
# Line 793 | Line 787 | public class ForkJoinWorkerThread extend
787  
788      /**
789       * If suspended, tries to set status to unsuspended.
790 +     * Does NOT wake up if blocked.
791       *
792       * @return true if successful
793       */
# Line 824 | Line 819 | public class ForkJoinWorkerThread extend
819          }
820          ForkJoinPool p = pool;
821          p.pushSpare(this);
827        lastEventCount = 0;         // reset upon resume
822          while ((runState & SUSPENDED) != 0) {
823              if (p.tryAccumulateStealCount(this)) {
830                boolean untimed = nextSpare != 0;
831                long startTime = untimed? 0 : System.nanoTime();
824                  interrupted();          // clear/ignore interrupts
825                  if ((runState & SUSPENDED) == 0)
826                      break;
827 <                if (untimed)     // untimed
836 <                    LockSupport.park(this);
837 <                else {
838 <                    LockSupport.parkNanos(this, TRIM_RATE_NANOS);
839 <                    if ((runState & SUSPENDED) == 0)
840 <                        break;
841 <                    if (System.nanoTime() - startTime >= TRIM_RATE_NANOS)
842 <                        p.tryShutdownSpare();
843 <                }
827 >                LockSupport.park(this);
828              }
829          }
830      }
# Line 927 | Line 911 | public class ForkJoinWorkerThread extend
911          ForkJoinTask<?> t = pollLocalTask();
912          if (t == null) {
913              t = scan();
914 <            currentSteal = null; // cannot retain/track/help
914 >            // cannot retain/track/help steal
915 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
916          }
917          return t;
918      }
# Line 982 | Line 967 | public class ForkJoinWorkerThread extend
967      }
968  
969      /**
970 <     * Tries to locate and help perform tasks for a stealer of the
971 <     * given task, or in turn one of its stealers.  Traces
972 <     * currentSteal->currentJoin links looking for a thread working on
973 <     * a descendant of the given task and with a non-empty queue to
974 <     * steal back and execute tasks from.
970 >     * Unless terminating, tries to locate and help perform tasks for
971 >     * a stealer of the given task, or in turn one of its stealers.
972 >     * Traces currentSteal->currentJoin links looking for a thread
973 >     * working on a descendant of the given task and with a non-empty
974 >     * queue to steal back and execute tasks from.
975       *
976 <     * The implementation is very branchy to cope with the potential
976 >     * The implementation is very branchy to cope with potential
977       * inconsistencies or loops encountering chains that are stale,
978       * unknown, or of length greater than MAX_HELP_DEPTH links.  All
979       * of these cases are dealt with by just returning back to the
# Line 998 | Line 983 | public class ForkJoinWorkerThread extend
983       * @param joinMe the task to join
984       */
985      final void helpJoinTask(ForkJoinTask<?> joinMe) {
986 <        ForkJoinWorkerThread[] ws = pool.workers;
987 <        int n; // need at least 2 workers
988 <        if (ws != null && (n = ws.length) > 1 && joinMe.status >= 0) {
989 <            ForkJoinTask<?> task = joinMe;        // base of chain
990 <            ForkJoinWorkerThread thread = this;   // thread with stolen task
991 <            for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
992 <                // Try to find v, the stealer of task, by first using hint
993 <                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
994 <                if (v == null || v.currentSteal != task) {
995 <                    for (int j = 0; ; ++j) {      // search array
996 <                        if (j < n) {
997 <                            if ((v = ws[j]) != null) {
998 <                                if (task.status < 0)
999 <                                    return;       // stale or done
1000 <                                if (v.currentSteal == task) {
1001 <                                    thread.stealHint = j;
1002 <                                    break;        // save hint for next time
1003 <                                }
986 >        ForkJoinWorkerThread[] ws;
987 >        int n;
988 >        if (joinMe.status < 0)                // already done
989 >            return;
990 >        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
991 >            joinMe.cancelIgnoringExceptions();
992 >            return;
993 >        }
994 >        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
995 >            return;                           // need at least 2 workers
996 >
997 >        ForkJoinTask<?> task = joinMe;        // base of chain
998 >        ForkJoinWorkerThread thread = this;   // thread with stolen task
999 >        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1000 >            // Try to find v, the stealer of task, by first using hint
1001 >            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1002 >            if (v == null || v.currentSteal != task) {
1003 >                for (int j = 0; ; ++j) {      // search array
1004 >                    if (j < n) {
1005 >                        ForkJoinTask<?> vs;
1006 >                        if ((v = ws[j]) != null &&
1007 >                            (vs = v.currentSteal) != null) {
1008 >                            if (joinMe.status < 0 || task.status < 0)
1009 >                                return;       // stale or done
1010 >                            if (vs == task) {
1011 >                                thread.stealHint = j;
1012 >                                break;        // save hint for next time
1013                              }
1014                          }
1021                        else
1022                            return;               // no stealer
1015                      }
1016 +                    else
1017 +                        return;               // no stealer
1018                  }
1019 <                // Try to help v, using specialized form of deqTask
1020 <                int b;
1021 <                ForkJoinTask<?>[] q;
1022 <                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1023 <                    int i = (q.length - 1) & b;
1024 <                    long u = (i << qShift) + qBase;
1025 <                    ForkJoinTask<?> t = q[i];
1026 <                    if (task.status < 0)
1027 <                        return;                   // stale or done
1028 <                    if (v.base == b) {
1029 <                        if (t == null)
1030 <                            return;               // producer stalled
1031 <                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1032 <                            if (joinMe.status < 0) {
1033 <                                UNSAFE.putObjectVolatile(q, u, t);
1034 <                                return;           // back out on cancel
1035 <                            }
1036 <                            int pid = poolIndex;
1037 <                            ForkJoinTask<?> prevSteal = currentSteal;
1038 <                            currentSteal = t;
1045 <                            v.stealHint = pid;
1046 <                            v.base = b + 1;
1047 <                            t.quietlyExec();
1048 <                            currentSteal = prevSteal;
1049 <                        }
1019 >            }
1020 >            for (;;) { // Try to help v, using specialized form of deqTask
1021 >                if (joinMe.status < 0)
1022 >                    return;
1023 >                int b = v.base;
1024 >                ForkJoinTask<?>[] q = v.queue;
1025 >                if (b == v.sp || q == null)
1026 >                    break;
1027 >                int i = (q.length - 1) & b;
1028 >                long u = (i << qShift) + qBase;
1029 >                ForkJoinTask<?> t = q[i];
1030 >                int pid = poolIndex;
1031 >                ForkJoinTask<?> ps = currentSteal;
1032 >                if (task.status < 0)
1033 >                    return;                   // stale or done
1034 >                if (t != null && v.base == b++ &&
1035 >                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
1036 >                    if (joinMe.status < 0) {
1037 >                        UNSAFE.putObjectVolatile(q, u, t);
1038 >                        return;               // back out on cancel
1039                      }
1040 <                    if (joinMe.status < 0)
1041 <                        return;
1040 >                    v.base = b;
1041 >                    v.stealHint = pid;
1042 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
1043 >                    t.quietlyExec();
1044 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1045                  }
1054                // Try to descend to find v's stealer
1055                ForkJoinTask<?> next = v.currentJoin;
1056                if (task.status < 0 || next == null || next == task ||
1057                    joinMe.status < 0)
1058                    return;
1059                task = next;
1060                thread = v;
1046              }
1047 +            // Try to descend to find v's stealer
1048 +            ForkJoinTask<?> next = v.currentJoin;
1049 +            if (task.status < 0 || next == null || next == task ||
1050 +                joinMe.status < 0)
1051 +                return;
1052 +            task = next;
1053 +            thread = v;
1054          }
1055      }
1056  
# Line 1117 | Line 1109 | public class ForkJoinWorkerThread extend
1109       * Runs tasks until {@code pool.isQuiescent()}.
1110       */
1111      final void helpQuiescePool() {
1112 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1113          for (;;) {
1114              ForkJoinTask<?> t = pollLocalTask();
1115 <            if (t != null || (t = scan()) != null) {
1115 >            if (t != null || (t = scan()) != null)
1116                  t.quietlyExec();
1124                currentSteal = null;
1125            }
1117              else {
1118                  ForkJoinPool p = pool;
1119                  int a; // to inline CASes
# Line 1131 | Line 1122 | public class ForkJoinWorkerThread extend
1122                          (p, poolRunStateOffset, a = p.runState, a - 1))
1123                          continue;   // retry later
1124                      active = false; // inactivate
1125 +                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1126                  }
1127                  if (p.isQuiescent()) {
1128                      active = true; // re-activate

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines