ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java (file contents):
Revision 1.15 by dl, Sun Apr 18 12:54:57 2010 UTC vs.
Revision 1.16 by dl, Thu May 27 16:47:21 2010 UTC

# Line 502 | Line 502 | public class ForkJoinWorkerThread extend
502      /**
503       * Returns a popped task, or null if empty. Assumes active status.
504       * Called only by current thread. (Note: a specialization of this
505 <     * code appears in scanWhileJoining.)
505 >     * code appears in popWhileJoining.)
506       */
507      final ForkJoinTask<?> popTask() {
508          int s;
509 <        ForkJoinTask<?>[] q = queue;
510 <        if (q != null && (s = sp) != base) {
509 >        ForkJoinTask<?>[] q;
510 >        if (base != (s = sp) && (q = queue) != null) {
511              int i = (q.length - 1) & --s;
512              ForkJoinTask<?> t = q[i];
513              if (t != null && UNSAFE.compareAndSwapObject
# Line 520 | Line 520 | public class ForkJoinWorkerThread extend
520      }
521  
522      /**
523 <     * Specialized version of popTask to pop only if
524 <     * topmost element is the given task. Called only
525 <     * by current thread while active.
523 >     * Specialized version of popTask to pop only if topmost element
524 >     * is the given task. Called only by current thread while
525 >     * active.
526       *
527       * @param t the task. Caller must ensure non-null.
528       */
529      final boolean unpushTask(ForkJoinTask<?> t) {
530          int s;
531 <        ForkJoinTask<?>[] q = queue;
532 <        if (q != null && UNSAFE.compareAndSwapObject
533 <            (q, (((q.length - 1) & (s = sp - 1)) << qShift) + qBase, t, null)){
531 >        ForkJoinTask<?>[] q;
532 >        if (base != (s = sp) && (q = queue) != null &&
533 >            UNSAFE.compareAndSwapObject
534 >            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
535              sp = s;
536              return true;
537          }
# Line 608 | Line 609 | public class ForkJoinWorkerThread extend
609       */
610      private ForkJoinTask<?> scan() {
611          ForkJoinPool p = pool;
612 <        ForkJoinWorkerThread[] ws = p.workers;
613 <        int n = ws.length;            // upper bound of #workers
614 <        boolean canSteal = active;    // shadow active status
615 <        int r = seed;                 // extract seed once
616 <        int k = r;                    // index: random if j<0 else step
617 <        for (int j = -n; j < n; ++j) {
618 <            ForkJoinWorkerThread v = ws[k & (n - 1)];
619 <            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
620 <            if (v != null && v.base != v.sp) {
621 <                if (canSteal ||       // ensure active status
622 <                    (canSteal = active = p.tryIncrementActiveCount())) {
623 <                    int b, i;         // inlined specialization of deqTask
624 <                    ForkJoinTask<?> t;
612 >        ForkJoinWorkerThread[] ws;        // worker array
613 >        int n;                            // upper bound of #workers
614 >        if ((ws = p.workers) != null && (n = ws.length) > 1) {
615 >            boolean canSteal = active;    // shadow active status
616 >            int r = seed;                 // extract seed once
617 >            int mask = n - 1;
618 >            int j = -n;                   // loop counter
619 >            int k = r;                    // worker index, random if j < 0
620 >            for (;;) {
621 >                ForkJoinWorkerThread v = ws[k & mask];
622 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
623 >                if (v != null && v.base != v.sp) {
624 >                    int b, i;             // inline specialized deqTask
625                      ForkJoinTask<?>[] q;
626 <                    if ((b = v.base) != v.sp &&  // recheck
626 >                    ForkJoinTask<?> t;
627 >                    if ((canSteal ||      // ensure active status
628 >                         (canSteal = active = p.tryIncrementActiveCount())) &&
629                          (q = v.queue) != null &&
630 <                        (t = q[i = (q.length - 1) & b]) != null &&
630 >                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
631                          UNSAFE.compareAndSwapObject
632                          (q, (i << qShift) + qBase, t, null)) {
633                          v.base = b + 1;
# Line 632 | Line 635 | public class ForkJoinWorkerThread extend
635                          ++stealCount;
636                          return t;
637                      }
638 +                    j = -n;
639 +                    k = r;                // restart on contention
640                  }
641 <                j = -n;           // reset on contention
641 >                else if (++j <= 0)
642 >                    k = r;
643 >                else if (j <= n)
644 >                    k += (n >>> 1) | 1;
645 >                else
646 >                    break;
647              }
638            k = j >= 0? k + ((n >>> 1) | 1) : r;
648          }
649          return null;
650      }
# Line 875 | Line 884 | public class ForkJoinWorkerThread extend
884      }
885  
886      /**
887 <     * Returns a popped or stolen task, if available, unless joinMe is done
887 >     * Executes or processes other tasks awaiting the given task
888 >     * @return task completion status
889 >     */
890 >    final int execWhileJoining(ForkJoinTask<?> joinMe) {
891 >        int s;
892 >        while ((s = joinMe.status) >= 0) {
893 >            ForkJoinTask<?> t = base != sp?
894 >                popWhileJoining(joinMe) :
895 >                scanWhileJoining(joinMe);
896 >            if (t != null)
897 >                t.tryExec();
898 >        }
899 >        return s;
900 >    }
901 >
902 >    /**
903 >     * Returns or stolen task, if available, unless joinMe is done
904       *
905       * This method is intrinsically nonmodular. To maintain the
906       * property that tasks are never stolen if the awaited task is
# Line 884 | Line 909 | public class ForkJoinWorkerThread extend
909       * to cancel a steal even after CASing slot to null, but before
910       * adjusting base index: If, after the CAS, we see that joinMe is
911       * ready, we can back out by placing the task back into the slot,
912 <     * without adjusting index. The scan loop is otherwise the same as
913 <     * in scan.
912 >     * without adjusting index. The loop is otherwise a variant of the
913 >     * one in scan().
914       *
915       */
916 <    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
917 <        ForkJoinTask<?> popped; // prefer local tasks
918 <        if (base != sp && (popped = popWhileJoining(joinMe)) != null)
919 <            return popped;
920 <        if (joinMe.status >= 0) {
921 <            ForkJoinPool p = pool;
922 <            ForkJoinWorkerThread[] ws = p.workers;
898 <            int n = ws.length;
899 <            int r = seed;
916 >    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
917 >        int r = seed;
918 >        ForkJoinPool p = pool;
919 >        ForkJoinWorkerThread[] ws;
920 >        int n;
921 >        outer:while ((ws = p.workers) != null && (n = ws.length) > 1) {
922 >            int mask = n - 1;
923              int k = r;
924 <            for (int j = -n; j < n && joinMe.status >= 0; ++j) {
925 <                ForkJoinWorkerThread v = ws[k & (n - 1)];
924 >            boolean contended = false; // to retry loop if deq contends
925 >            for (int j = -n; j <= n; ++j) {
926 >                if (joinMe.status < 0)
927 >                    break outer;
928 >                int b;
929 >                ForkJoinTask<?>[] q;
930 >                ForkJoinWorkerThread v = ws[k & mask];
931                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
932 <                if (v != null) {
933 <                    int b = v.base;
934 <                    ForkJoinTask<?>[] q;
935 <                    if (b != v.sp && (q = v.queue) != null) {
936 <                        int i = (q.length - 1) & b;
937 <                        ForkJoinTask<?> t = q[i];
938 <                        if (t != null && UNSAFE.compareAndSwapObject
939 <                            (q, (i << qShift) + qBase, t, null)) {
940 <                            if (joinMe.status >= 0) {
941 <                                v.base = b + 1;
914 <                                seed = r;
915 <                                ++stealCount;
916 <                                return t;
917 <                            }
918 <                            UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
919 <                            break; // back out
932 >                if (v != null && (b=v.base) != v.sp && (q=v.queue) != null) {
933 >                    int i = (q.length - 1) & b;
934 >                    ForkJoinTask<?> t = q[i];
935 >                    if (t != null && UNSAFE.compareAndSwapObject
936 >                        (q, (i << qShift) + qBase, t, null)) {
937 >                        if (joinMe.status >= 0) {
938 >                            v.base = b + 1;
939 >                            seed = r;
940 >                            ++stealCount;
941 >                            return t;
942                          }
943 <                        j = -n;
943 >                        UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
944 >                        break outer; // back out
945                      }
946 +                    contended = true;
947                  }
948 <                k = j >= 0? k + ((n >>> 1) | 1) : r;
948 >                k = j < 0 ? r : (k + ((n >>> 1) | 1));
949              }
950 +            if (!contended && p.tryAwaitBusyJoin(joinMe))
951 +                break;
952          }
953          return null;
954      }
955  
956      /**
957       * Version of popTask with join checks surrounding extraction.
958 <     * Uses the same backout strategy as scanWhileJoining. Note that
958 >     * Uses the same backout strategy as helpJoinTask. Note that
959       * we ignore locallyFifo flag for local tasks here since helping
960       * joins only make sense in LIFO mode.
961       *

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines