ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.31 by dl, Mon Apr 5 15:52:26 2010 UTC vs.
Revision 1.32 by dl, Sun Apr 18 12:51:18 2010 UTC

# Line 635 | Line 635 | public class ForkJoinWorkerThread extend
635                          return t;
636                      }
637                  }
638 <                j = -n;               // reset on contention
638 >                j = -n;           // reset on contention
639              }
640              k = j >= 0? k + ((n >>> 1) | 1) : r;
641          }
# Line 877 | Line 877 | public class ForkJoinWorkerThread extend
877      }
878  
879      /**
880 <     * Returns a stolen task, if available, unless joinMe is done
880 >     * Returns a popped or stolen task, if available, unless joinMe is done
881       *
882       * This method is intrinsically nonmodular. To maintain the
883       * property that tasks are never stolen if the awaited task is
# Line 889 | Line 889 | public class ForkJoinWorkerThread extend
889       * without adjusting index. The scan loop is otherwise the same as
890       * in scan.
891       *
892     * The outer loop cannot be allowed to run forever, because it
893     * could lead to a form of deadlock if all threads are executing
894     * this method. However, we must also be patient before giving up,
895     * to cope with GC stalls, transient high loads, etc. The loop
896     * terminates (causing caller to possibly block this thread and
897     * create a replacement) only after #workers clean sweeps during
898     * which all running threads are active.
892       */
893      final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
894 <        int sweeps = 0;
895 <        int r = seed;
896 <        ForkJoinPool p = pool;
897 <        p.releaseWaiters(); // help other threads progress
898 <        while (joinMe.status >= 0) {
894 >        ForkJoinTask<?> popped; // prefer local tasks
895 >        if (base != sp && (popped = popWhileJoining(joinMe)) != null)
896 >            return popped;
897 >        if (joinMe.status >= 0) {
898 >            ForkJoinPool p = pool;
899              ForkJoinWorkerThread[] ws = p.workers;
900              int n = ws.length;
901 +            int r = seed;
902              int k = r;
903 <            for (int j = -n; j < n; ++j) {
903 >            for (int j = -n; j < n && joinMe.status >= 0; ++j) {
904                  ForkJoinWorkerThread v = ws[k & (n - 1)];
905                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
906                  if (v != null) {
# Line 915 | Line 909 | public class ForkJoinWorkerThread extend
909                      if (b != v.sp && (q = v.queue) != null) {
910                          int i = (q.length - 1) & b;
911                          ForkJoinTask<?> t = q[i];
912 <                        if (t != null) {
913 <                            if (joinMe.status < 0)
914 <                                return null;
921 <                            if (UNSAFE.compareAndSwapObject
922 <                                (q, (i << qShift) + qBase, t, null)) {
923 <                                if (joinMe.status < 0) {
924 <                                    writeSlot(q, i, t); // back out
925 <                                    return null;
926 <                                }
912 >                        if (t != null && UNSAFE.compareAndSwapObject
913 >                            (q, (i << qShift) + qBase, t, null)) {
914 >                            if (joinMe.status >= 0) {
915                                  v.base = b + 1;
916                                  seed = r;
917                                  ++stealCount;
918                                  return t;
919                              }
920 +                            UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
921 +                            break; // back out
922                          }
923 <                        sweeps = 0; // ensure rescan on contention
923 >                        j = -n;
924                      }
925                  }
926                  k = j >= 0? k + ((n >>> 1) | 1) : r;
937                if ((j & 7) == 0 && joinMe.status < 0) // periodically recheck
938                    return null;
927              }
928 <            if ((sweeps = p.inactiveCount() == 0 ? sweeps + 1 : 0) > n)
929 <                return null;
928 >        }
929 >        return null;
930 >    }
931 >
932 >    /**
933 >     * Version of popTask with join checks surrounding extraction.
934 >     * Uses the same backout strategy as scanWhileJoining. Note that
935 >     * we ignore locallyFifo flag for local tasks here since helping
936 >     * joins only make sense in LIFO mode.
937 >     *
938 >     * @return a popped task, if available, unless joinMe is done
939 >     */
940 >    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
941 >        int s;
942 >        ForkJoinTask<?>[] q;
943 >        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
944 >            int i = (q.length - 1) & --s;
945 >            ForkJoinTask<?> t = q[i];
946 >            if (t != null && UNSAFE.compareAndSwapObject
947 >                (q, (i << qShift) + qBase, t, null)) {
948 >                if (joinMe.status >= 0) {
949 >                    sp = s;
950 >                    return t;
951 >                }
952 >                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
953 >                break;  // back out
954 >            }
955          }
956          return null;
957      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines