ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java (file contents):
Revision 1.14 by dl, Mon Apr 5 16:05:09 2010 UTC vs.
Revision 1.15 by dl, Sun Apr 18 12:54:57 2010 UTC

# Line 633 | Line 633 | public class ForkJoinWorkerThread extend
633                          return t;
634                      }
635                  }
636 <                j = -n;               // reset on contention
636 >                j = -n;           // reset on contention
637              }
638              k = j >= 0? k + ((n >>> 1) | 1) : r;
639          }
# Line 875 | Line 875 | public class ForkJoinWorkerThread extend
875      }
876  
877      /**
878 <     * Returns a stolen task, if available, unless joinMe is done
878 >     * Returns a popped or stolen task, if available, unless joinMe is done
879       *
880       * This method is intrinsically nonmodular. To maintain the
881       * property that tasks are never stolen if the awaited task is
# Line 887 | Line 887 | public class ForkJoinWorkerThread extend
887       * without adjusting index. The scan loop is otherwise the same as
888       * in scan.
889       *
890     * The outer loop cannot be allowed to run forever, because it
891     * could lead to a form of deadlock if all threads are executing
892     * this method. However, we must also be patient before giving up,
893     * to cope with GC stalls, transient high loads, etc. The loop
894     * terminates (causing caller to possibly block this thread and
895     * create a replacement) only after #workers clean sweeps during
896     * which all running threads are active.
890       */
891      final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
892 <        int sweeps = 0;
893 <        int r = seed;
894 <        ForkJoinPool p = pool;
895 <        p.releaseWaiters(); // help other threads progress
896 <        while (joinMe.status >= 0) {
892 >        ForkJoinTask<?> popped; // prefer local tasks
893 >        if (base != sp && (popped = popWhileJoining(joinMe)) != null)
894 >            return popped;
895 >        if (joinMe.status >= 0) {
896 >            ForkJoinPool p = pool;
897              ForkJoinWorkerThread[] ws = p.workers;
898              int n = ws.length;
899 +            int r = seed;
900              int k = r;
901 <            for (int j = -n; j < n; ++j) {
901 >            for (int j = -n; j < n && joinMe.status >= 0; ++j) {
902                  ForkJoinWorkerThread v = ws[k & (n - 1)];
903                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
904                  if (v != null) {
# Line 913 | Line 907 | public class ForkJoinWorkerThread extend
907                      if (b != v.sp && (q = v.queue) != null) {
908                          int i = (q.length - 1) & b;
909                          ForkJoinTask<?> t = q[i];
910 <                        if (t != null) {
911 <                            if (joinMe.status < 0)
912 <                                return null;
919 <                            if (UNSAFE.compareAndSwapObject
920 <                                (q, (i << qShift) + qBase, t, null)) {
921 <                                if (joinMe.status < 0) {
922 <                                    writeSlot(q, i, t); // back out
923 <                                    return null;
924 <                                }
910 >                        if (t != null && UNSAFE.compareAndSwapObject
911 >                            (q, (i << qShift) + qBase, t, null)) {
912 >                            if (joinMe.status >= 0) {
913                                  v.base = b + 1;
914                                  seed = r;
915                                  ++stealCount;
916                                  return t;
917                              }
918 +                            UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
919 +                            break; // back out
920                          }
921 <                        sweeps = 0; // ensure rescan on contention
921 >                        j = -n;
922                      }
923                  }
924                  k = j >= 0? k + ((n >>> 1) | 1) : r;
935                if ((j & 7) == 0 && joinMe.status < 0) // periodically recheck
936                    return null;
925              }
926 <            if ((sweeps = p.inactiveCount() == 0 ? sweeps + 1 : 0) > n)
927 <                return null;
926 >        }
927 >        return null;
928 >    }
929 >
930 >    /**
931 >     * Version of popTask with join checks surrounding extraction.
932 >     * Uses the same backout strategy as scanWhileJoining. Note that
933 >     * we ignore locallyFifo flag for local tasks here since helping
934 >     * joins only make sense in LIFO mode.
935 >     *
936 >     * @return a popped task, if available, unless joinMe is done
937 >     */
938 >    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
939 >        int s;
940 >        ForkJoinTask<?>[] q;
941 >        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
942 >            int i = (q.length - 1) & --s;
943 >            ForkJoinTask<?> t = q[i];
944 >            if (t != null && UNSAFE.compareAndSwapObject
945 >                (q, (i << qShift) + qBase, t, null)) {
946 >                if (joinMe.status >= 0) {
947 >                    sp = s;
948 >                    return t;
949 >                }
950 >                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
951 >                break;  // back out
952 >            }
953          }
954          return null;
955      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines