ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.31 by dl, Mon Apr 5 15:52:26 2010 UTC vs.
Revision 1.34 by dl, Fri Jun 4 14:37:54 2010 UTC

# Line 138 | Line 138 | public class ForkJoinWorkerThread extend
138  
139      /**
140       * Capacity of work-stealing queue array upon initialization.
141 <     * Must be a power of two. Initial size must be at least 2, but is
141 >     * Must be a power of two. Initial size must be at least 4, but is
142       * padded to minimize cache effects.
143       */
144      private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
# Line 358 | Line 358 | public class ForkJoinWorkerThread extend
358       * Find and execute tasks and check status while running
359       */
360      private void mainLoop() {
361 <        boolean ran = false; // true if ran task on previous step
361 >        boolean ran = false;      // true if ran task in last loop iter
362 >        boolean prevRan = false;  // true if ran on last or previous step
363          ForkJoinPool p = pool;
364          for (;;) {
365 <            p.preStep(this, ran);
365 >            p.preStep(this, prevRan);
366              if (runState != 0)
367                  return;
368              ForkJoinTask<?> t; // try to get and run stolen or submitted task
369 <            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
369 >            if ((t = scan()) != null || (t = pollSubmission()) != null) {
370                  t.tryExec();
371                  if (base != sp)
372                      runLocalTasks();
373 +                prevRan = ran = true;
374 +            }
375 +            else {
376 +                prevRan = ran;
377 +                ran = false;
378              }
379          }
380      }
# Line 447 | Line 453 | public class ForkJoinWorkerThread extend
453       * @param t the task. Caller must ensure non-null.
454       */
455      final void pushTask(ForkJoinTask<?> t) {
450        int s;
456          ForkJoinTask<?>[] q = queue;
457          int mask = q.length - 1; // implicit assert q != null
458 <        UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
459 <        if ((s -= base) <= 0)
460 <            pool.signalWork();
461 <        else if (s + 1 >= mask)
462 <            growQueue();
458 >        int s = sp++;            // ok to increment sp before slot write
459 >        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
460 >        if ((s -= base) == 0)
461 >            pool.signalWork();   // was empty
462 >        else if (s == mask)
463 >            growQueue();         // is full
464      }
465  
466      /**
# Line 504 | Line 510 | public class ForkJoinWorkerThread extend
510      /**
511       * Returns a popped task, or null if empty. Assumes active status.
512       * Called only by current thread. (Note: a specialization of this
513 <     * code appears in scanWhileJoining.)
513 >     * code appears in popWhileJoining.)
514       */
515      final ForkJoinTask<?> popTask() {
516          int s;
517 <        ForkJoinTask<?>[] q = queue;
518 <        if (q != null && (s = sp) != base) {
517 >        ForkJoinTask<?>[] q;
518 >        if (base != (s = sp) && (q = queue) != null) {
519              int i = (q.length - 1) & --s;
520              ForkJoinTask<?> t = q[i];
521              if (t != null && UNSAFE.compareAndSwapObject
# Line 522 | Line 528 | public class ForkJoinWorkerThread extend
528      }
529  
530      /**
531 <     * Specialized version of popTask to pop only if
532 <     * topmost element is the given task. Called only
533 <     * by current thread while active.
531 >     * Specialized version of popTask to pop only if topmost element
532 >     * is the given task. Called only by current thread while
533 >     * active.
534       *
535       * @param t the task. Caller must ensure non-null.
536       */
537      final boolean unpushTask(ForkJoinTask<?> t) {
538          int s;
539 <        ForkJoinTask<?>[] q = queue;
540 <        if (q != null && UNSAFE.compareAndSwapObject
541 <            (q, (((q.length - 1) & (s = sp - 1)) << qShift) + qBase, t, null)){
539 >        ForkJoinTask<?>[] q;
540 >        if (base != (s = sp) && (q = queue) != null &&
541 >            UNSAFE.compareAndSwapObject
542 >            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
543              sp = s;
544              return true;
545          }
# Line 610 | Line 617 | public class ForkJoinWorkerThread extend
617       */
618      private ForkJoinTask<?> scan() {
619          ForkJoinPool p = pool;
620 <        ForkJoinWorkerThread[] ws = p.workers;
621 <        int n = ws.length;            // upper bound of #workers
622 <        boolean canSteal = active;    // shadow active status
623 <        int r = seed;                 // extract seed once
624 <        int k = r;                    // index: random if j<0 else step
625 <        for (int j = -n; j < n; ++j) {
626 <            ForkJoinWorkerThread v = ws[k & (n - 1)];
627 <            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
628 <            if (v != null && v.base != v.sp) {
629 <                if (canSteal ||       // ensure active status
630 <                    (canSteal = active = p.tryIncrementActiveCount())) {
631 <                    int b, i;         // inlined specialization of deqTask
632 <                    ForkJoinTask<?> t;
620 >        ForkJoinWorkerThread[] ws;        // worker array
621 >        int n;                            // upper bound of #workers
622 >        if ((ws = p.workers) != null && (n = ws.length) > 1) {
623 >            boolean canSteal = active;    // shadow active status
624 >            int r = seed;                 // extract seed once
625 >            int mask = n - 1;
626 >            int j = -n;                   // loop counter
627 >            int k = r;                    // worker index, random if j < 0
628 >            for (;;) {
629 >                ForkJoinWorkerThread v = ws[k & mask];
630 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
631 >                if (v != null && v.base != v.sp) {
632 >                    int b, i;             // inline specialized deqTask
633                      ForkJoinTask<?>[] q;
634 <                    if ((b = v.base) != v.sp &&  // recheck
634 >                    ForkJoinTask<?> t;
635 >                    if ((canSteal ||      // ensure active status
636 >                         (canSteal = active = p.tryIncrementActiveCount())) &&
637                          (q = v.queue) != null &&
638 <                        (t = q[i = (q.length - 1) & b]) != null &&
638 >                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
639                          UNSAFE.compareAndSwapObject
640                          (q, (i << qShift) + qBase, t, null)) {
641                          v.base = b + 1;
# Line 634 | Line 643 | public class ForkJoinWorkerThread extend
643                          ++stealCount;
644                          return t;
645                      }
646 +                    j = -n;
647 +                    k = r;                // restart on contention
648                  }
649 <                j = -n;               // reset on contention
649 >                else if (++j <= 0)
650 >                    k = r;
651 >                else if (j <= n)
652 >                    k += (n >>> 1) | 1;
653 >                else
654 >                    break;
655              }
640            k = j >= 0? k + ((n >>> 1) | 1) : r;
656          }
657          return null;
658      }
# Line 877 | Line 892 | public class ForkJoinWorkerThread extend
892      }
893  
894      /**
895 <     * Returns a stolen task, if available, unless joinMe is done
895 >     * Executes or processes other tasks awaiting the given task
896 >     * @return task completion status
897 >     */
898 >    final int execWhileJoining(ForkJoinTask<?> joinMe) {
899 >        int s;
900 >        while ((s = joinMe.status) >= 0) {
901 >            ForkJoinTask<?> t = base != sp?
902 >                popWhileJoining(joinMe) :
903 >                scanWhileJoining(joinMe);
904 >            if (t != null)
905 >                t.tryExec();
906 >        }
907 >        return s;
908 >    }
909 >
910 >    /**
911 >     * Returns or stolen task, if available, unless joinMe is done
912       *
913       * This method is intrinsically nonmodular. To maintain the
914       * property that tasks are never stolen if the awaited task is
# Line 886 | Line 917 | public class ForkJoinWorkerThread extend
917       * to cancel a steal even after CASing slot to null, but before
918       * adjusting base index: If, after the CAS, we see that joinMe is
919       * ready, we can back out by placing the task back into the slot,
920 <     * without adjusting index. The scan loop is otherwise the same as
921 <     * in scan.
920 >     * without adjusting index. The loop is otherwise a variant of the
921 >     * one in scan().
922       *
892     * The outer loop cannot be allowed to run forever, because it
893     * could lead to a form of deadlock if all threads are executing
894     * this method. However, we must also be patient before giving up,
895     * to cope with GC stalls, transient high loads, etc. The loop
896     * terminates (causing caller to possibly block this thread and
897     * create a replacement) only after #workers clean sweeps during
898     * which all running threads are active.
923       */
924 <    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
901 <        int sweeps = 0;
924 >    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
925          int r = seed;
926          ForkJoinPool p = pool;
927 <        p.releaseWaiters(); // help other threads progress
928 <        while (joinMe.status >= 0) {
929 <            ForkJoinWorkerThread[] ws = p.workers;
930 <            int n = ws.length;
927 >        ForkJoinWorkerThread[] ws;
928 >        int n;
929 >        outer:while ((ws = p.workers) != null && (n = ws.length) > 1) {
930 >            int mask = n - 1;
931              int k = r;
932 <            for (int j = -n; j < n; ++j) {
933 <                ForkJoinWorkerThread v = ws[k & (n - 1)];
932 >            boolean contended = false; // to retry loop if deq contends
933 >            for (int j = -n; j <= n; ++j) {
934 >                if (joinMe.status < 0)
935 >                    break outer;
936 >                int b;
937 >                ForkJoinTask<?>[] q;
938 >                ForkJoinWorkerThread v = ws[k & mask];
939                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
940 <                if (v != null) {
941 <                    int b = v.base;
942 <                    ForkJoinTask<?>[] q;
943 <                    if (b != v.sp && (q = v.queue) != null) {
944 <                        int i = (q.length - 1) & b;
945 <                        ForkJoinTask<?> t = q[i];
946 <                        if (t != null) {
947 <                            if (joinMe.status < 0)
948 <                                return null;
949 <                            if (UNSAFE.compareAndSwapObject
922 <                                (q, (i << qShift) + qBase, t, null)) {
923 <                                if (joinMe.status < 0) {
924 <                                    writeSlot(q, i, t); // back out
925 <                                    return null;
926 <                                }
927 <                                v.base = b + 1;
928 <                                seed = r;
929 <                                ++stealCount;
930 <                                return t;
931 <                            }
940 >                if (v != null && (b=v.base) != v.sp && (q=v.queue) != null) {
941 >                    int i = (q.length - 1) & b;
942 >                    ForkJoinTask<?> t = q[i];
943 >                    if (t != null && UNSAFE.compareAndSwapObject
944 >                        (q, (i << qShift) + qBase, t, null)) {
945 >                        if (joinMe.status >= 0) {
946 >                            v.base = b + 1;
947 >                            seed = r;
948 >                            ++stealCount;
949 >                            return t;
950                          }
951 <                        sweeps = 0; // ensure rescan on contention
951 >                        UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
952 >                        break outer; // back out
953                      }
954 +                    contended = true;
955 +                }
956 +                k = j < 0 ? r : (k + ((n >>> 1) | 1));
957 +            }
958 +            if (!contended && p.tryAwaitBusyJoin(joinMe))
959 +                break;
960 +        }
961 +        return null;
962 +    }
963 +
964 +    /**
965 +     * Version of popTask with join checks surrounding extraction.
966 +     * Uses the same backout strategy as helpJoinTask. Note that
967 +     * we ignore locallyFifo flag for local tasks here since helping
968 +     * joins only make sense in LIFO mode.
969 +     *
970 +     * @return a popped task, if available, unless joinMe is done
971 +     */
972 +    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
973 +        int s;
974 +        ForkJoinTask<?>[] q;
975 +        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
976 +            int i = (q.length - 1) & --s;
977 +            ForkJoinTask<?> t = q[i];
978 +            if (t != null && UNSAFE.compareAndSwapObject
979 +                (q, (i << qShift) + qBase, t, null)) {
980 +                if (joinMe.status >= 0) {
981 +                    sp = s;
982 +                    return t;
983                  }
984 <                k = j >= 0? k + ((n >>> 1) | 1) : r;
985 <                if ((j & 7) == 0 && joinMe.status < 0) // periodically recheck
938 <                    return null;
984 >                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
985 >                break;  // back out
986              }
940            if ((sweeps = p.inactiveCount() == 0 ? sweeps + 1 : 0) > n)
941                return null;
987          }
988          return null;
989      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines