ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java (file contents):
Revision 1.19 by dl, Tue Aug 17 18:31:59 2010 UTC vs.
Revision 1.20 by dl, Sun Aug 29 23:35:07 2010 UTC

# Line 108 | Line 108 | public class ForkJoinWorkerThread extend
108       * idea).  (4) We bound the number of attempts to find work (see
109       * MAX_HELP_DEPTH) and fall back to suspending the worker and if
110       * necessary replacing it with a spare (see
111 <     * ForkJoinPool.tryAwaitJoin).
111 >     * ForkJoinPool.awaitJoin).
112       *
113       * Efficient implementation of these algorithms currently relies
114       * on an uncomfortable amount of "Unsafe" mechanics. To maintain
# Line 162 | Line 162 | public class ForkJoinWorkerThread extend
162      private static final int MAX_HELP_DEPTH = 8;
163  
164      /**
165     * The wakeup interval (in nanoseconds) for the oldest worker
166     * suspended as spare.  On each wakeup not signalled by a
167     * resumption, it may ask the pool to reduce the number of spares.
168     */
169    private static final long TRIM_RATE_NANOS =
170        5L * 1000L * 1000L * 1000L; // 5sec
171
172    /**
165       * Capacity of work-stealing queue array upon initialization.
166       * Must be a power of two. Initial size must be at least 4, but is
167       * padded to minimize cache effects.
# Line 224 | Line 216 | public class ForkJoinWorkerThread extend
216       * tracks if this worker is suspended as a spare, and if it was
217       * killed (trimmed) while suspended. However, "active" status is
218       * maintained separately and modified only in conjunction with
219 <     * CASes of the pool's runState (which are currently sadly manually
220 <     * inlined for performance.)
219 >     * CASes of the pool's runState (which are currently sadly
220 >     * manually inlined for performance.)  Accessed directly by pool
221 >     * to simplify checks for normal (zero) status.
222       */
223 <    private volatile int runState;
223 >    volatile int runState;
224  
225      private static final int TERMINATING = 0x01;
226      private static final int TERMINATED  = 0x02;
# Line 272 | Line 265 | public class ForkJoinWorkerThread extend
265      int lastEventCount;
266  
267      /**
268 <     * Encoded index and event count of next event waiter. Used only
269 <     * by ForkJoinPool for managing event waiters.
268 >     * Encoded index and event count of next event waiter. Accessed
269 >     * only by ForkJoinPool for managing event waiters.
270       */
271      volatile long nextWaiter;
272  
273      /**
274 <     * Number of times this thread suspended as spare
274 >     * Number of times this thread suspended as spare. Accessed only
275 >     * by pool.
276       */
277      int spareCount;
278  
279      /**
280 <     * Encoded index and count of next spare waiter. Used only
280 >     * Encoded index and count of next spare waiter. Accessed only
281       * by ForkJoinPool for managing spares.
282       */
283      volatile int nextSpare;
# Line 297 | Line 291 | public class ForkJoinWorkerThread extend
291  
292      /**
293       * The task most recently stolen from another worker (or
294 <     * submission queue).  Not volatile because always read/written in
295 <     * presence of related volatiles in those cases where it matters.
294 >     * submission queue).  Written only by current thread, but read by
295 >     * others.
296       */
297 <    private ForkJoinTask<?> currentSteal;
297 >    private volatile ForkJoinTask<?> currentSteal;
298  
299      /**
300       * Creates a ForkJoinWorkerThread operating in the given pool.
# Line 423 | Line 417 | public class ForkJoinWorkerThread extend
417       * Find and execute tasks and check status while running
418       */
419      private void mainLoop() {
420 <        int misses = 0; // track consecutive times failed to find work; max 2
420 >        boolean ran = false; // true if ran a task on last step
421          ForkJoinPool p = pool;
422          for (;;) {
423 <            p.preStep(this, misses);
423 >            p.preStep(this, ran);
424              if (runState != 0)
425                  break;
426 <            misses = ((tryExecSteal() || tryExecSubmission()) ? 0 :
433 <                      (misses < 2 ? misses + 1 : 2));
426 >            ran = tryExecSteal() || tryExecSubmission();
427          }
428      }
429  
# Line 441 | Line 434 | public class ForkJoinWorkerThread extend
434       */
435      private boolean tryExecSteal() {
436          ForkJoinTask<?> t;
437 <        if ((t  = scan()) != null) {
437 >        if ((t = scan()) != null) {
438              t.quietlyExec();
439 <            currentSteal = null;
439 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
440              if (sp != base)
441                  execLocalTasks();
442              return true;
# Line 460 | Line 453 | public class ForkJoinWorkerThread extend
453          ForkJoinPool p = pool;
454          while (p.hasQueuedSubmissions()) {
455              ForkJoinTask<?> t; int a;
456 <            if (active || // ugly/hacky: inline p.tryIncrementActiveCount
456 >            if (active || // inline p.tryIncrementActiveCount
457                  (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
458                                                     a = p.runState, a + 1))) {
459                  if ((t = p.pollSubmission()) != null) {
460 <                    currentSteal = t;
460 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
461                      t.quietlyExec();
462 <                    currentSteal = null;
462 >                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
463                      if (sp != base)
464                          execLocalTasks();
465                      return true;
# Line 625 | Line 618 | public class ForkJoinWorkerThread extend
618          if ((s = sp) != base && q != null &&
619              UNSAFE.compareAndSwapObject
620              (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
621 <            sp = s;
621 >            sp = s; // putOrderedInt may encourage more timely write
622              // UNSAFE.putOrderedInt(this, spOffset, s);
623              return true;
624          }
# Line 714 | Line 707 | public class ForkJoinWorkerThread extend
707              for (;;) {
708                  ForkJoinWorkerThread v = ws[k & mask];
709                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
710 <                if (v != null && v.base != v.sp) {
711 <                    ForkJoinTask<?>[] q; int b, a;
712 <                    if ((canSteal ||      // Ugly/hacky: inline
713 <                         (canSteal = active =  // p.tryIncrementActiveCount
714 <                          UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
715 <                                                   a = p.runState, a + 1))) &&
716 <                        (q = v.queue) != null && (b = v.base) != v.sp) {
717 <                        int i = (q.length - 1) & b;
718 <                        long u = (i << qShift) + qBase; // raw offset
719 <                        ForkJoinTask<?> t = q[i];
720 <                        if (v.base == b && t != null &&
710 >                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
711 >                if (v != null && (b = v.base) != v.sp &&
712 >                    (q = v.queue) != null) {
713 >                    int i = (q.length - 1) & b;
714 >                    long u = (i << qShift) + qBase; // raw offset
715 >                    int pid = poolIndex;
716 >                    if ((t = q[i]) != null) {
717 >                        if (!canSteal &&  // inline p.tryIncrementActiveCount
718 >                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
719 >                                                     a = p.runState, a + 1))
720 >                            canSteal = active = true;
721 >                        if (canSteal && v.base == b++ &&
722                              UNSAFE.compareAndSwapObject(q, u, t, null)) {
723 <                            int pid = poolIndex;
730 <                            currentSteal = t;
723 >                            v.base = b;
724                              v.stealHint = pid;
725 <                            v.base = b + 1;
725 >                            UNSAFE.putOrderedObject(this,
726 >                                                    currentStealOffset, t);
727                              seed = r;
728                              ++stealCount;
729                              return t;
# Line 760 | Line 754 | public class ForkJoinWorkerThread extend
754  
755      /**
756       * Sets state to TERMINATING. Does NOT unpark or interrupt
757 <     * to wake up if currently blocked.
757 >     * to wake up if currently blocked. Callers must do so if desired.
758       */
759      final void shutdown() {
760          for (;;) {
# Line 791 | Line 785 | public class ForkJoinWorkerThread extend
785  
786      /**
787       * If suspended, tries to set status to unsuspended.
788 +     * Does NOT wake up if blocked.
789       *
790       * @return true if successful
791       */
# Line 822 | Line 817 | public class ForkJoinWorkerThread extend
817          }
818          ForkJoinPool p = pool;
819          p.pushSpare(this);
825        lastEventCount = 0;         // reset upon resume
820          while ((runState & SUSPENDED) != 0) {
821              if (p.tryAccumulateStealCount(this)) {
828                boolean untimed = nextSpare != 0;
829                long startTime = untimed? 0 : System.nanoTime();
822                  interrupted();          // clear/ignore interrupts
823                  if ((runState & SUSPENDED) == 0)
824                      break;
825 <                if (untimed)     // untimed
834 <                    LockSupport.park(this);
835 <                else {
836 <                    LockSupport.parkNanos(this, TRIM_RATE_NANOS);
837 <                    if ((runState & SUSPENDED) == 0)
838 <                        break;
839 <                    if (System.nanoTime() - startTime >= TRIM_RATE_NANOS)
840 <                        p.tryShutdownSpare();
841 <                }
825 >                LockSupport.park(this);
826              }
827          }
828      }
# Line 925 | Line 909 | public class ForkJoinWorkerThread extend
909          ForkJoinTask<?> t = pollLocalTask();
910          if (t == null) {
911              t = scan();
912 <            currentSteal = null; // cannot retain/track/help
912 >            // cannot retain/track/help steal
913 >            UNSAFE.putOrderedObject(this, currentStealOffset, null);
914          }
915          return t;
916      }
# Line 980 | Line 965 | public class ForkJoinWorkerThread extend
965      }
966  
967      /**
968 <     * Tries to locate and help perform tasks for a stealer of the
969 <     * given task, or in turn one of its stealers.  Traces
970 <     * currentSteal->currentJoin links looking for a thread working on
971 <     * a descendant of the given task and with a non-empty queue to
972 <     * steal back and execute tasks from.
968 >     * Unless terminating, tries to locate and help perform tasks for
969 >     * a stealer of the given task, or in turn one of its stealers.
970 >     * Traces currentSteal->currentJoin links looking for a thread
971 >     * working on a descendant of the given task and with a non-empty
972 >     * queue to steal back and execute tasks from.
973       *
974 <     * The implementation is very branchy to cope with the potential
974 >     * The implementation is very branchy to cope with potential
975       * inconsistencies or loops encountering chains that are stale,
976       * unknown, or of length greater than MAX_HELP_DEPTH links.  All
977       * of these cases are dealt with by just returning back to the
# Line 996 | Line 981 | public class ForkJoinWorkerThread extend
981       * @param joinMe the task to join
982       */
983      final void helpJoinTask(ForkJoinTask<?> joinMe) {
984 <        ForkJoinWorkerThread[] ws = pool.workers;
985 <        int n; // need at least 2 workers
986 <        if (ws != null && (n = ws.length) > 1 && joinMe.status >= 0) {
987 <            ForkJoinTask<?> task = joinMe;        // base of chain
988 <            ForkJoinWorkerThread thread = this;   // thread with stolen task
989 <            for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
990 <                // Try to find v, the stealer of task, by first using hint
991 <                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
992 <                if (v == null || v.currentSteal != task) {
993 <                    for (int j = 0; ; ++j) {      // search array
994 <                        if (j < n) {
995 <                            if ((v = ws[j]) != null) {
996 <                                if (task.status < 0)
997 <                                    return;       // stale or done
998 <                                if (v.currentSteal == task) {
999 <                                    thread.stealHint = j;
1000 <                                    break;        // save hint for next time
1001 <                                }
984 >        ForkJoinWorkerThread[] ws;
985 >        int n;
986 >        if (joinMe.status < 0)                // already done
987 >            return;
988 >        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
989 >            joinMe.cancelIgnoringExceptions();
990 >            return;
991 >        }
992 >        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
993 >            return;                           // need at least 2 workers
994 >
995 >        ForkJoinTask<?> task = joinMe;        // base of chain
996 >        ForkJoinWorkerThread thread = this;   // thread with stolen task
997 >        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
998 >            // Try to find v, the stealer of task, by first using hint
999 >            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1000 >            if (v == null || v.currentSteal != task) {
1001 >                for (int j = 0; ; ++j) {      // search array
1002 >                    if (j < n) {
1003 >                        ForkJoinTask<?> vs;
1004 >                        if ((v = ws[j]) != null &&
1005 >                            (vs = v.currentSteal) != null) {
1006 >                            if (joinMe.status < 0 || task.status < 0)
1007 >                                return;       // stale or done
1008 >                            if (vs == task) {
1009 >                                thread.stealHint = j;
1010 >                                break;        // save hint for next time
1011                              }
1012                          }
1019                        else
1020                            return;               // no stealer
1013                      }
1014 +                    else
1015 +                        return;               // no stealer
1016                  }
1017 <                // Try to help v, using specialized form of deqTask
1018 <                int b;
1019 <                ForkJoinTask<?>[] q;
1020 <                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1021 <                    int i = (q.length - 1) & b;
1022 <                    long u = (i << qShift) + qBase;
1023 <                    ForkJoinTask<?> t = q[i];
1024 <                    if (task.status < 0)
1025 <                        return;                   // stale or done
1026 <                    if (v.base == b) {
1027 <                        if (t == null)
1028 <                            return;               // producer stalled
1029 <                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1030 <                            if (joinMe.status < 0) {
1031 <                                UNSAFE.putObjectVolatile(q, u, t);
1032 <                                return;           // back out on cancel
1033 <                            }
1034 <                            int pid = poolIndex;
1035 <                            ForkJoinTask<?> prevSteal = currentSteal;
1036 <                            currentSteal = t;
1043 <                            v.stealHint = pid;
1044 <                            v.base = b + 1;
1045 <                            t.quietlyExec();
1046 <                            currentSteal = prevSteal;
1047 <                        }
1017 >            }
1018 >            for (;;) { // Try to help v, using specialized form of deqTask
1019 >                if (joinMe.status < 0)
1020 >                    return;
1021 >                int b = v.base;
1022 >                ForkJoinTask<?>[] q = v.queue;
1023 >                if (b == v.sp || q == null)
1024 >                    break;
1025 >                int i = (q.length - 1) & b;
1026 >                long u = (i << qShift) + qBase;
1027 >                ForkJoinTask<?> t = q[i];
1028 >                int pid = poolIndex;
1029 >                ForkJoinTask<?> ps = currentSteal;
1030 >                if (task.status < 0)
1031 >                    return;                   // stale or done
1032 >                if (t != null && v.base == b++ &&
1033 >                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
1034 >                    if (joinMe.status < 0) {
1035 >                        UNSAFE.putObjectVolatile(q, u, t);
1036 >                        return;               // back out on cancel
1037                      }
1038 <                    if (joinMe.status < 0)
1039 <                        return;
1038 >                    v.base = b;
1039 >                    v.stealHint = pid;
1040 >                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
1041 >                    t.quietlyExec();
1042 >                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1043                  }
1052                // Try to descend to find v's stealer
1053                ForkJoinTask<?> next = v.currentJoin;
1054                if (task.status < 0 || next == null || next == task ||
1055                    joinMe.status < 0)
1056                    return;
1057                task = next;
1058                thread = v;
1044              }
1045 +            // Try to descend to find v's stealer
1046 +            ForkJoinTask<?> next = v.currentJoin;
1047 +            if (task.status < 0 || next == null || next == task ||
1048 +                joinMe.status < 0)
1049 +                return;
1050 +            task = next;
1051 +            thread = v;
1052          }
1053      }
1054  
# Line 1115 | Line 1107 | public class ForkJoinWorkerThread extend
1107       * Runs tasks until {@code pool.isQuiescent()}.
1108       */
1109      final void helpQuiescePool() {
1110 +        ForkJoinTask<?> ps = currentSteal; // to restore below
1111          for (;;) {
1112              ForkJoinTask<?> t = pollLocalTask();
1113 <            if (t != null || (t = scan()) != null) {
1113 >            if (t != null || (t = scan()) != null)
1114                  t.quietlyExec();
1122                currentSteal = null;
1123            }
1115              else {
1116                  ForkJoinPool p = pool;
1117                  int a; // to inline CASes
# Line 1129 | Line 1120 | public class ForkJoinWorkerThread extend
1120                          (p, poolRunStateOffset, a = p.runState, a - 1))
1121                          continue;   // retry later
1122                      active = false; // inactivate
1123 +                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1124                  }
1125                  if (p.isQuiescent()) {
1126                      active = true; // re-activate

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines