ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.31 by dl, Mon Apr 5 15:52:26 2010 UTC vs.
Revision 1.33 by dl, Thu May 27 16:46:49 2010 UTC

# Line 504 | Line 504 | public class ForkJoinWorkerThread extend
504      /**
505       * Returns a popped task, or null if empty. Assumes active status.
506       * Called only by current thread. (Note: a specialization of this
507 <     * code appears in scanWhileJoining.)
507 >     * code appears in popWhileJoining.)
508       */
509      final ForkJoinTask<?> popTask() {
510          int s;
511 <        ForkJoinTask<?>[] q = queue;
512 <        if (q != null && (s = sp) != base) {
511 >        ForkJoinTask<?>[] q;
512 >        if (base != (s = sp) && (q = queue) != null) {
513              int i = (q.length - 1) & --s;
514              ForkJoinTask<?> t = q[i];
515              if (t != null && UNSAFE.compareAndSwapObject
# Line 522 | Line 522 | public class ForkJoinWorkerThread extend
522      }
523  
524      /**
525 <     * Specialized version of popTask to pop only if
526 <     * topmost element is the given task. Called only
527 <     * by current thread while active.
525 >     * Specialized version of popTask to pop only if topmost element
526 >     * is the given task. Called only by current thread while
527 >     * active.
528       *
529       * @param t the task. Caller must ensure non-null.
530       */
531      final boolean unpushTask(ForkJoinTask<?> t) {
532          int s;
533 <        ForkJoinTask<?>[] q = queue;
534 <        if (q != null && UNSAFE.compareAndSwapObject
535 <            (q, (((q.length - 1) & (s = sp - 1)) << qShift) + qBase, t, null)){
533 >        ForkJoinTask<?>[] q;
534 >        if (base != (s = sp) && (q = queue) != null &&
535 >            UNSAFE.compareAndSwapObject
536 >            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
537              sp = s;
538              return true;
539          }
# Line 610 | Line 611 | public class ForkJoinWorkerThread extend
611       */
612      private ForkJoinTask<?> scan() {
613          ForkJoinPool p = pool;
614 <        ForkJoinWorkerThread[] ws = p.workers;
615 <        int n = ws.length;            // upper bound of #workers
616 <        boolean canSteal = active;    // shadow active status
617 <        int r = seed;                 // extract seed once
618 <        int k = r;                    // index: random if j<0 else step
619 <        for (int j = -n; j < n; ++j) {
620 <            ForkJoinWorkerThread v = ws[k & (n - 1)];
621 <            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
622 <            if (v != null && v.base != v.sp) {
623 <                if (canSteal ||       // ensure active status
624 <                    (canSteal = active = p.tryIncrementActiveCount())) {
625 <                    int b, i;         // inlined specialization of deqTask
626 <                    ForkJoinTask<?> t;
614 >        ForkJoinWorkerThread[] ws;        // worker array
615 >        int n;                            // upper bound of #workers
616 >        if ((ws = p.workers) != null && (n = ws.length) > 1) {
617 >            boolean canSteal = active;    // shadow active status
618 >            int r = seed;                 // extract seed once
619 >            int mask = n - 1;
620 >            int j = -n;                   // loop counter
621 >            int k = r;                    // worker index, random if j < 0
622 >            for (;;) {
623 >                ForkJoinWorkerThread v = ws[k & mask];
624 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
625 >                if (v != null && v.base != v.sp) {
626 >                    int b, i;             // inline specialized deqTask
627                      ForkJoinTask<?>[] q;
628 <                    if ((b = v.base) != v.sp &&  // recheck
628 >                    ForkJoinTask<?> t;
629 >                    if ((canSteal ||      // ensure active status
630 >                         (canSteal = active = p.tryIncrementActiveCount())) &&
631                          (q = v.queue) != null &&
632 <                        (t = q[i = (q.length - 1) & b]) != null &&
632 >                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
633                          UNSAFE.compareAndSwapObject
634                          (q, (i << qShift) + qBase, t, null)) {
635                          v.base = b + 1;
# Line 634 | Line 637 | public class ForkJoinWorkerThread extend
637                          ++stealCount;
638                          return t;
639                      }
640 +                    j = -n;
641 +                    k = r;                // restart on contention
642                  }
643 <                j = -n;               // reset on contention
643 >                else if (++j <= 0)
644 >                    k = r;
645 >                else if (j <= n)
646 >                    k += (n >>> 1) | 1;
647 >                else
648 >                    break;
649              }
640            k = j >= 0? k + ((n >>> 1) | 1) : r;
650          }
651          return null;
652      }
# Line 877 | Line 886 | public class ForkJoinWorkerThread extend
886      }
887  
888      /**
889 <     * Returns a stolen task, if available, unless joinMe is done
889 >     * Executes or processes other tasks awaiting the given task
890 >     * @return task completion status
891 >     */
892 >    final int execWhileJoining(ForkJoinTask<?> joinMe) {
893 >        int s;
894 >        while ((s = joinMe.status) >= 0) {
895 >            ForkJoinTask<?> t = base != sp?
896 >                popWhileJoining(joinMe) :
897 >                scanWhileJoining(joinMe);
898 >            if (t != null)
899 >                t.tryExec();
900 >        }
901 >        return s;
902 >    }
903 >
904 >    /**
905 >     * Returns or stolen task, if available, unless joinMe is done
906       *
907       * This method is intrinsically nonmodular. To maintain the
908       * property that tasks are never stolen if the awaited task is
# Line 886 | Line 911 | public class ForkJoinWorkerThread extend
911       * to cancel a steal even after CASing slot to null, but before
912       * adjusting base index: If, after the CAS, we see that joinMe is
913       * ready, we can back out by placing the task back into the slot,
914 <     * without adjusting index. The scan loop is otherwise the same as
915 <     * in scan.
914 >     * without adjusting index. The loop is otherwise a variant of the
915 >     * one in scan().
916       *
892     * The outer loop cannot be allowed to run forever, because it
893     * could lead to a form of deadlock if all threads are executing
894     * this method. However, we must also be patient before giving up,
895     * to cope with GC stalls, transient high loads, etc. The loop
896     * terminates (causing caller to possibly block this thread and
897     * create a replacement) only after #workers clean sweeps during
898     * which all running threads are active.
917       */
918 <    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
901 <        int sweeps = 0;
918 >    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
919          int r = seed;
920          ForkJoinPool p = pool;
921 <        p.releaseWaiters(); // help other threads progress
922 <        while (joinMe.status >= 0) {
923 <            ForkJoinWorkerThread[] ws = p.workers;
924 <            int n = ws.length;
921 >        ForkJoinWorkerThread[] ws;
922 >        int n;
923 >        outer:while ((ws = p.workers) != null && (n = ws.length) > 1) {
924 >            int mask = n - 1;
925              int k = r;
926 <            for (int j = -n; j < n; ++j) {
927 <                ForkJoinWorkerThread v = ws[k & (n - 1)];
926 >            boolean contended = false; // to retry loop if deq contends
927 >            for (int j = -n; j <= n; ++j) {
928 >                if (joinMe.status < 0)
929 >                    break outer;
930 >                int b;
931 >                ForkJoinTask<?>[] q;
932 >                ForkJoinWorkerThread v = ws[k & mask];
933                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
934 <                if (v != null) {
935 <                    int b = v.base;
936 <                    ForkJoinTask<?>[] q;
937 <                    if (b != v.sp && (q = v.queue) != null) {
938 <                        int i = (q.length - 1) & b;
939 <                        ForkJoinTask<?> t = q[i];
940 <                        if (t != null) {
941 <                            if (joinMe.status < 0)
942 <                                return null;
943 <                            if (UNSAFE.compareAndSwapObject
922 <                                (q, (i << qShift) + qBase, t, null)) {
923 <                                if (joinMe.status < 0) {
924 <                                    writeSlot(q, i, t); // back out
925 <                                    return null;
926 <                                }
927 <                                v.base = b + 1;
928 <                                seed = r;
929 <                                ++stealCount;
930 <                                return t;
931 <                            }
934 >                if (v != null && (b=v.base) != v.sp && (q=v.queue) != null) {
935 >                    int i = (q.length - 1) & b;
936 >                    ForkJoinTask<?> t = q[i];
937 >                    if (t != null && UNSAFE.compareAndSwapObject
938 >                        (q, (i << qShift) + qBase, t, null)) {
939 >                        if (joinMe.status >= 0) {
940 >                            v.base = b + 1;
941 >                            seed = r;
942 >                            ++stealCount;
943 >                            return t;
944                          }
945 <                        sweeps = 0; // ensure rescan on contention
945 >                        UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
946 >                        break outer; // back out
947                      }
948 +                    contended = true;
949 +                }
950 +                k = j < 0 ? r : (k + ((n >>> 1) | 1));
951 +            }
952 +            if (!contended && p.tryAwaitBusyJoin(joinMe))
953 +                break;
954 +        }
955 +        return null;
956 +    }
957 +
958 +    /**
959 +     * Version of popTask with join checks surrounding extraction.
960 +     * Uses the same backout strategy as helpJoinTask. Note that
961 +     * we ignore locallyFifo flag for local tasks here since helping
962 +     * joins only make sense in LIFO mode.
963 +     *
964 +     * @return a popped task, if available, unless joinMe is done
965 +     */
966 +    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
967 +        int s;
968 +        ForkJoinTask<?>[] q;
969 +        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
970 +            int i = (q.length - 1) & --s;
971 +            ForkJoinTask<?> t = q[i];
972 +            if (t != null && UNSAFE.compareAndSwapObject
973 +                (q, (i << qShift) + qBase, t, null)) {
974 +                if (joinMe.status >= 0) {
975 +                    sp = s;
976 +                    return t;
977                  }
978 <                k = j >= 0? k + ((n >>> 1) | 1) : r;
979 <                if ((j & 7) == 0 && joinMe.status < 0) // periodically recheck
938 <                    return null;
978 >                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
979 >                break;  // back out
980              }
940            if ((sweeps = p.inactiveCount() == 0 ? sweeps + 1 : 0) > n)
941                return null;
981          }
982          return null;
983      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines