ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.32 by dl, Sun Apr 18 12:51:18 2010 UTC vs.
Revision 1.33 by dl, Thu May 27 16:46:49 2010 UTC

# Line 504 | Line 504 | public class ForkJoinWorkerThread extend
504      /**
505       * Returns a popped task, or null if empty. Assumes active status.
506       * Called only by current thread. (Note: a specialization of this
507 <     * code appears in scanWhileJoining.)
507 >     * code appears in popWhileJoining.)
508       */
509      final ForkJoinTask<?> popTask() {
510          int s;
511 <        ForkJoinTask<?>[] q = queue;
512 <        if (q != null && (s = sp) != base) {
511 >        ForkJoinTask<?>[] q;
512 >        if (base != (s = sp) && (q = queue) != null) {
513              int i = (q.length - 1) & --s;
514              ForkJoinTask<?> t = q[i];
515              if (t != null && UNSAFE.compareAndSwapObject
# Line 522 | Line 522 | public class ForkJoinWorkerThread extend
522      }
523  
524      /**
525 <     * Specialized version of popTask to pop only if
526 <     * topmost element is the given task. Called only
527 <     * by current thread while active.
525 >     * Specialized version of popTask to pop only if topmost element
526 >     * is the given task. Called only by current thread while
527 >     * active.
528       *
529       * @param t the task. Caller must ensure non-null.
530       */
531      final boolean unpushTask(ForkJoinTask<?> t) {
532          int s;
533 <        ForkJoinTask<?>[] q = queue;
534 <        if (q != null && UNSAFE.compareAndSwapObject
535 <            (q, (((q.length - 1) & (s = sp - 1)) << qShift) + qBase, t, null)){
533 >        ForkJoinTask<?>[] q;
534 >        if (base != (s = sp) && (q = queue) != null &&
535 >            UNSAFE.compareAndSwapObject
536 >            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
537              sp = s;
538              return true;
539          }
# Line 610 | Line 611 | public class ForkJoinWorkerThread extend
611       */
612      private ForkJoinTask<?> scan() {
613          ForkJoinPool p = pool;
614 <        ForkJoinWorkerThread[] ws = p.workers;
615 <        int n = ws.length;            // upper bound of #workers
616 <        boolean canSteal = active;    // shadow active status
617 <        int r = seed;                 // extract seed once
618 <        int k = r;                    // index: random if j<0 else step
619 <        for (int j = -n; j < n; ++j) {
620 <            ForkJoinWorkerThread v = ws[k & (n - 1)];
621 <            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
622 <            if (v != null && v.base != v.sp) {
623 <                if (canSteal ||       // ensure active status
624 <                    (canSteal = active = p.tryIncrementActiveCount())) {
625 <                    int b, i;         // inlined specialization of deqTask
626 <                    ForkJoinTask<?> t;
614 >        ForkJoinWorkerThread[] ws;        // worker array
615 >        int n;                            // upper bound of #workers
616 >        if ((ws = p.workers) != null && (n = ws.length) > 1) {
617 >            boolean canSteal = active;    // shadow active status
618 >            int r = seed;                 // extract seed once
619 >            int mask = n - 1;
620 >            int j = -n;                   // loop counter
621 >            int k = r;                    // worker index, random if j < 0
622 >            for (;;) {
623 >                ForkJoinWorkerThread v = ws[k & mask];
624 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
625 >                if (v != null && v.base != v.sp) {
626 >                    int b, i;             // inline specialized deqTask
627                      ForkJoinTask<?>[] q;
628 <                    if ((b = v.base) != v.sp &&  // recheck
628 >                    ForkJoinTask<?> t;
629 >                    if ((canSteal ||      // ensure active status
630 >                         (canSteal = active = p.tryIncrementActiveCount())) &&
631                          (q = v.queue) != null &&
632 <                        (t = q[i = (q.length - 1) & b]) != null &&
632 >                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
633                          UNSAFE.compareAndSwapObject
634                          (q, (i << qShift) + qBase, t, null)) {
635                          v.base = b + 1;
# Line 634 | Line 637 | public class ForkJoinWorkerThread extend
637                          ++stealCount;
638                          return t;
639                      }
640 +                    j = -n;
641 +                    k = r;                // restart on contention
642                  }
643 <                j = -n;           // reset on contention
643 >                else if (++j <= 0)
644 >                    k = r;
645 >                else if (j <= n)
646 >                    k += (n >>> 1) | 1;
647 >                else
648 >                    break;
649              }
640            k = j >= 0? k + ((n >>> 1) | 1) : r;
650          }
651          return null;
652      }
# Line 877 | Line 886 | public class ForkJoinWorkerThread extend
886      }
887  
888      /**
889 <     * Returns a popped or stolen task, if available, unless joinMe is done
889 >     * Executes or processes other tasks awaiting the given task
890 >     * @return task completion status
891 >     */
892 >    final int execWhileJoining(ForkJoinTask<?> joinMe) {
893 >        int s;
894 >        while ((s = joinMe.status) >= 0) {
895 >            ForkJoinTask<?> t = base != sp?
896 >                popWhileJoining(joinMe) :
897 >                scanWhileJoining(joinMe);
898 >            if (t != null)
899 >                t.tryExec();
900 >        }
901 >        return s;
902 >    }
903 >
904 >    /**
905 >     * Returns or stolen task, if available, unless joinMe is done
906       *
907       * This method is intrinsically nonmodular. To maintain the
908       * property that tasks are never stolen if the awaited task is
# Line 886 | Line 911 | public class ForkJoinWorkerThread extend
911       * to cancel a steal even after CASing slot to null, but before
912       * adjusting base index: If, after the CAS, we see that joinMe is
913       * ready, we can back out by placing the task back into the slot,
914 <     * without adjusting index. The scan loop is otherwise the same as
915 <     * in scan.
914 >     * without adjusting index. The loop is otherwise a variant of the
915 >     * one in scan().
916       *
917       */
918 <    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
919 <        ForkJoinTask<?> popped; // prefer local tasks
920 <        if (base != sp && (popped = popWhileJoining(joinMe)) != null)
921 <            return popped;
922 <        if (joinMe.status >= 0) {
923 <            ForkJoinPool p = pool;
924 <            ForkJoinWorkerThread[] ws = p.workers;
900 <            int n = ws.length;
901 <            int r = seed;
918 >    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
919 >        int r = seed;
920 >        ForkJoinPool p = pool;
921 >        ForkJoinWorkerThread[] ws;
922 >        int n;
923 >        outer:while ((ws = p.workers) != null && (n = ws.length) > 1) {
924 >            int mask = n - 1;
925              int k = r;
926 <            for (int j = -n; j < n && joinMe.status >= 0; ++j) {
927 <                ForkJoinWorkerThread v = ws[k & (n - 1)];
926 >            boolean contended = false; // to retry loop if deq contends
927 >            for (int j = -n; j <= n; ++j) {
928 >                if (joinMe.status < 0)
929 >                    break outer;
930 >                int b;
931 >                ForkJoinTask<?>[] q;
932 >                ForkJoinWorkerThread v = ws[k & mask];
933                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
934 <                if (v != null) {
935 <                    int b = v.base;
936 <                    ForkJoinTask<?>[] q;
937 <                    if (b != v.sp && (q = v.queue) != null) {
938 <                        int i = (q.length - 1) & b;
939 <                        ForkJoinTask<?> t = q[i];
940 <                        if (t != null && UNSAFE.compareAndSwapObject
941 <                            (q, (i << qShift) + qBase, t, null)) {
942 <                            if (joinMe.status >= 0) {
943 <                                v.base = b + 1;
916 <                                seed = r;
917 <                                ++stealCount;
918 <                                return t;
919 <                            }
920 <                            UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
921 <                            break; // back out
934 >                if (v != null && (b=v.base) != v.sp && (q=v.queue) != null) {
935 >                    int i = (q.length - 1) & b;
936 >                    ForkJoinTask<?> t = q[i];
937 >                    if (t != null && UNSAFE.compareAndSwapObject
938 >                        (q, (i << qShift) + qBase, t, null)) {
939 >                        if (joinMe.status >= 0) {
940 >                            v.base = b + 1;
941 >                            seed = r;
942 >                            ++stealCount;
943 >                            return t;
944                          }
945 <                        j = -n;
945 >                        UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
946 >                        break outer; // back out
947                      }
948 +                    contended = true;
949                  }
950 <                k = j >= 0? k + ((n >>> 1) | 1) : r;
950 >                k = j < 0 ? r : (k + ((n >>> 1) | 1));
951              }
952 +            if (!contended && p.tryAwaitBusyJoin(joinMe))
953 +                break;
954          }
955          return null;
956      }
957  
958      /**
959       * Version of popTask with join checks surrounding extraction.
960 <     * Uses the same backout strategy as scanWhileJoining. Note that
960 >     * Uses the same backout strategy as helpJoinTask. Note that
961       * we ignore locallyFifo flag for local tasks here since helping
962       * joins only make sense in LIFO mode.
963       *

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines