ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java (file contents):
Revision 1.37 by jsr166, Thu Nov 18 00:39:15 2010 UTC vs.
Revision 1.38 by dl, Thu Nov 18 00:53:24 2010 UTC

# Line 940 | Line 940 | public class ForkJoinWorkerThread extend
940          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
941          if (isTerminating())                // cancel if shutting down
942              joinMe.cancelIgnoringExceptions();
943 <        else
944 <            pool.awaitJoin(joinMe, this, timed, nanos);
943 >        else {
944 >            if (sp != base)
945 >                localHelpJoinTask(joinMe);
946 >            if (joinMe.status >= 0)
947 >                pool.awaitJoin(joinMe, this, timed, nanos);
948 >        }
949          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
950      }
951  
952      /**
953 +     * Run tasks in local queue until given task is done.
954 +     * Not currently used because it complicates semantics.
955 +     *
956 +     * @param joinMe the task to join
957 +     */
958 +    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
959 +        int s;
960 +        ForkJoinTask<?>[] q;
961 +        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
962 +            int i = (q.length - 1) & --s;
963 +            long u = (i << qShift) + qBase; // raw offset
964 +            ForkJoinTask<?> t = q[i];
965 +            if (t == null)  // lost to a stealer
966 +                break;
967 +            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
968 +                /*
969 +                 * This recheck (and similarly in helpJoinTask)
970 +                 * handles cases where joinMe is independently
971 +                 * cancelled or forced even though there is other work
972 +                 * available. Back out of the pop by putting t back
973 +                 * into slot before we commit by writing sp.
974 +                 */
975 +                if (joinMe.status < 0) {
976 +                    UNSAFE.putObjectVolatile(q, u, t);
977 +                    break;
978 +                }
979 +                sp = s;
980 +                // UNSAFE.putOrderedInt(this, spOffset, s);
981 +                t.quietlyExec();
982 +            }
983 +        }
984 +    }
985 +
986 +    /**
987       * Tries to locate and help perform tasks for a stealer of the
988       * given task, or in turn one of its stealers.  Traces
989       * currentSteal->currentJoin links looking for a thread working on
# Line 978 | Line 1016 | public class ForkJoinWorkerThread extend
1016                  for (int j = 0; ; ++j) {      // search array
1017                      if (j < n) {
1018                          ForkJoinTask<?> vs;
1019 <                        if ((v = ws[j]) != null && v != this &&
1019 >                        if ((v = ws[j]) != null &&
1020 >                            (v != this || base == sp) &&
1021                              (vs = v.currentSteal) != null) {
1022                              if (joinMe.status < 0 || task.status < 0)
1023                                  return;       // stale or done

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines