ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.34 by dl, Fri Jun 4 14:37:54 2010 UTC vs.
Revision 1.35 by dl, Wed Jul 7 19:52:32 2010 UTC

# Line 83 | Line 83 | public class ForkJoinWorkerThread extend
83       * by the ForkJoinPool).  This allows use in message-passing
84       * frameworks in which tasks are never joined.
85       *
86 <     * Efficient implementation of this approach currently relies on
86 >     * When a worker would otherwise be blocked waiting to join a
87 >     * task, it first tries a form of linear helping: Each worker
88 >     * records (in field stolen) the most recent task it stole
89 >     * from some other worker. Plus, it records (in field joining) the
90 >     * task it is currently actively joining. Method joinTask uses
91 >     * these markers to try to find a worker to help (i.e., steal back
92 >     * a task from and execute it) that could hasten completion of the
93 >     * actively joined task. In essence, the joiner executes a task
94 >     * that would be on its own local deque had the to-be-joined task
95 >     * not been stolen. This may be seen as a conservative variant of
96 >     * the approach in Wagner & Calder "Leapfrogging: a portable
97 >     * technique for implementing efficient futures" SIGPLAN Notices,
98 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99 >     * in that: (1) We only maintain dependency links across workers
100 >     * upon steals, rather than maintain per-task bookkeeping.  This
101 >     * requires a linear scan of workers array to locate stealers,
102 >     * which isolates cost to when it is needed, rather than adding to
103 >     * per-task overhead.  (2) It is "shallow", ignoring nesting and
104 >     * potentially cyclic mutual steals.  (3) It is intentionally
105 >     * racy: field joining is updated only while actively joining,
106 >     * which means that we could miss links in the chain during
107 >     * long-lived tasks, GC stalls etc.  (4) We fall back to
108 >     * suspending the worker and if necessary replacing it with a
109 >     * spare (see ForkJoinPool.tryAwaitJoin).
110 >     *
111 >     * Efficient implementation of these algorithms currently relies on
112       * an uncomfortable amount of "Unsafe" mechanics. To maintain
113       * correct orderings, reads and writes of variable base require
114       * volatile ordering.  Variable sp does not require volatile
# Line 157 | Line 182 | public class ForkJoinWorkerThread extend
182      final ForkJoinPool pool;
183  
184      /**
185 +     * The task most recently stolen from another worker
186 +     */
187 +    private volatile ForkJoinTask<?> stolen;
188 +
189 +    /**
190 +     * The task currently being joined, set only when actively
191 +     * trying to helpStealer.
192 +     */
193 +    private volatile ForkJoinTask<?> joining;
194 +
195 +    /**
196       * The work-stealing queue array. Size must be a power of two.
197       * Initialized in onStart, to improve memory locality.
198       */
# Line 196 | Line 232 | public class ForkJoinWorkerThread extend
232       * currently not exported but included because volatile write upon
233       * park also provides a workaround for a JVM bug.
234       */
235 <    private volatile int parkCount;
235 >    volatile int parkCount;
236  
237      /**
238       * Number of steals, transferred and reset in pool callbacks pool
# Line 221 | Line 257 | public class ForkJoinWorkerThread extend
257       * Shadows value from ForkJoinPool, which resets it if changed
258       * pool-wide.
259       */
260 <    private boolean locallyFifo;
261 <
260 >    private final boolean locallyFifo;
261 >    
262      /**
263       * Index of this worker in pool array. Set once by pool before
264       * running, and accessed directly by pool to locate this worker in
# Line 249 | Line 285 | public class ForkJoinWorkerThread extend
285       * @throws NullPointerException if pool is null
286       */
287      protected ForkJoinWorkerThread(ForkJoinPool pool) {
252        if (pool == null) throw new NullPointerException();
288          this.pool = pool;
289 +        this.locallyFifo = pool.locallyFifo;
290          // To avoid exposing construction details to subclasses,
291          // remaining initialization is in start() and onStart()
292      }
# Line 258 | Line 294 | public class ForkJoinWorkerThread extend
294      /**
295       * Performs additional initialization and starts this thread
296       */
297 <    final void start(int poolIndex, boolean locallyFifo,
262 <                     UncaughtExceptionHandler ueh) {
297 >    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
298          this.poolIndex = poolIndex;
264        this.locallyFifo = locallyFifo;
299          if (ueh != null)
300              setUncaughtExceptionHandler(ueh);
301          setDaemon(true);
# Line 305 | Line 339 | public class ForkJoinWorkerThread extend
339          int rs = seedGenerator.nextInt();
340          seed = rs == 0? 1 : rs; // seed must be nonzero
341  
342 <        // Allocate name string and queue array in this thread
342 >        // Allocate name string and arrays in this thread
343          String pid = Integer.toString(pool.getPoolNumber());
344          String wid = Integer.toString(poolIndex);
345          setName("ForkJoinPool-" + pid + "-worker-" + wid);
# Line 323 | Line 357 | public class ForkJoinWorkerThread extend
357       */
358      protected void onTermination(Throwable exception) {
359          try {
360 +            stolen = null;
361 +            joining = null;
362              cancelTasks();
363              setTerminated();
364              pool.workerTerminated(this);
# Line 370 | Line 406 | public class ForkJoinWorkerThread extend
406                  t.tryExec();
407                  if (base != sp)
408                      runLocalTasks();
409 +                stolen = null;
410                  prevRan = ran = true;
411              }
412              else {
# Line 466 | Line 503 | public class ForkJoinWorkerThread extend
503      /**
504       * Tries to take a task from the base of the queue, failing if
505       * empty or contended. Note: Specializations of this code appear
506 <     * in scan and scanWhileJoining.
506 >     * in locallyDeqTask and elsewhere.
507       *
508       * @return a task, or null if none or contended
509       */
# Line 476 | Line 513 | public class ForkJoinWorkerThread extend
513          int b, i;
514          if ((b = base) != sp &&
515              (q = queue) != null && // must read q after b
516 <            (t = q[i = (q.length - 1) & b]) != null &&
516 >            (t = q[i = (q.length - 1) & b]) != null && base == b &&
517              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
518              base = b + 1;
519              return t;
# Line 496 | Line 533 | public class ForkJoinWorkerThread extend
533              ForkJoinTask<?> t;
534              int b, i;
535              while (sp != (b = base)) {
536 <                if ((t = q[i = (q.length - 1) & b]) != null &&
536 >                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
537                      UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
538                                                  t, null)) {
539                      base = b + 1;
# Line 629 | Line 666 | public class ForkJoinWorkerThread extend
666                  ForkJoinWorkerThread v = ws[k & mask];
667                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
668                  if (v != null && v.base != v.sp) {
669 <                    int b, i;             // inline specialized deqTask
670 <                    ForkJoinTask<?>[] q;
671 <                    ForkJoinTask<?> t;
672 <                    if ((canSteal ||      // ensure active status
673 <                         (canSteal = active = p.tryIncrementActiveCount())) &&
674 <                        (q = v.queue) != null &&
675 <                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
676 <                        UNSAFE.compareAndSwapObject
677 <                        (q, (i << qShift) + qBase, t, null)) {
678 <                        v.base = b + 1;
679 <                        seed = r;
680 <                        ++stealCount;
681 <                        return t;
669 >                    if (canSteal ||       // ensure active status
670 >                        (canSteal = active = p.tryIncrementActiveCount())) {
671 >                        int b = v.base;   // inline specialized deqTask
672 >                        ForkJoinTask<?>[] q;
673 >                        if (b != v.sp && (q = v.queue) != null) {
674 >                            ForkJoinTask<?> t;
675 >                            int i = (q.length - 1) & b;
676 >                            long u = (i << qShift) + qBase; // raw offset
677 >                            if ((t = q[i]) != null && v.base == b &&
678 >                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
679 >                                stolen = t;
680 >                                v.base = b + 1;
681 >                                seed = r;
682 >                                ++stealCount;
683 >                                return t;
684 >                            }
685 >                        }
686                      }
687                      j = -n;
688                      k = r;                // restart on contention
# Line 696 | Line 737 | public class ForkJoinWorkerThread extend
737      }
738  
739      /**
740 <     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent
740 >     * Instrumented version of park used by ForkJoinPool.awaitEvent
741       */
742      final void doPark() {
743          ++parkCount;
# Line 710 | Line 751 | public class ForkJoinWorkerThread extend
751       * @return true if successful
752       */
753      final boolean tryUnsuspend() {
754 <        int s;
755 <        return (((s = runState) & SUSPENDED) != 0 &&
756 <                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
757 <                                         s & ~SUSPENDED));
754 >        int s = runState;
755 >        if ((s & SUSPENDED) != 0)
756 >            return UNSAFE.compareAndSwapInt(this, runStateOffset, s,
757 >                                            s & ~SUSPENDED);
758 >        return false;
759      }
760  
761      /**
# Line 734 | Line 776 | public class ForkJoinWorkerThread extend
776                                                s | SUSPENDED))
777                  break;
778          }
779 +        boolean timed;
780 +        long nanos;
781 +        long startTime;
782 +        if (poolIndex < pool.parallelism) {
783 +            timed = false;
784 +            nanos = 0L;
785 +            startTime = 0L;
786 +        }
787 +        else {
788 +            timed = true;
789 +            nanos = SPARE_KEEPALIVE_NANOS;
790 +            startTime = System.nanoTime();
791 +        }
792 +        pool.accumulateStealCount(this);
793          lastEventCount = 0;      // reset upon resume
738        ForkJoinPool p = pool;
739        p.releaseWaiters();      // help others progress
740        p.accumulateStealCount(this);
794          interrupted();           // clear/ignore interrupts
742        if (poolIndex < p.getParallelism()) { // untimed wait
743            while ((runState & SUSPENDED) != 0)
744                doPark();
745            return true;
746        }
747        return timedSuspend();   // timed wait if apparently non-core
748    }
749
750    /**
751     * Blocks as spare until resumed or timed out
752     * @return false if trimmed
753     */
754    private boolean timedSuspend() {
755        long nanos = SPARE_KEEPALIVE_NANOS;
756        long startTime = System.nanoTime();
795          while ((runState & SUSPENDED) != 0) {
796              ++parkCount;
797 <            if ((nanos -= (System.nanoTime() - startTime)) > 0)
797 >            if (!timed)
798 >                LockSupport.park(this);
799 >            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
800                  LockSupport.parkNanos(this, nanos);
801              else { // try to trim on timeout
802                  int s = runState;
# Line 780 | Line 820 | public class ForkJoinWorkerThread extend
820      }
821  
822      /**
783     * Set locallyFifo mode. Called only by ForkJoinPool
784     */
785    final void setAsyncMode(boolean async) {
786        locallyFifo = async;
787    }
788
789    /**
823       * Removes and cancels all tasks in queue.  Can be called from any
824       * thread.
825       */
# Line 818 | Line 851 | public class ForkJoinWorkerThread extend
851      // Support methods for ForkJoinTask
852  
853      /**
854 +     * Possibly runs some tasks and/or blocks, until task is done.
855 +     *
856 +     * @param joinMe the task to join
857 +     */
858 +    final void joinTask(ForkJoinTask<?> joinMe) {
859 +        ForkJoinTask<?> prevJoining = joining;
860 +        joining = joinMe;
861 +        while (joinMe.status >= 0) {
862 +            int s = sp;
863 +            if (s == base) {
864 +                nonlocalJoinTask(joinMe);
865 +                break;
866 +            }
867 +            // process local task
868 +            ForkJoinTask<?> t;
869 +            ForkJoinTask<?>[] q = queue;
870 +            int i = (q.length - 1) & --s;
871 +            long u = (i << qShift) + qBase; // raw offset
872 +            if ((t = q[i]) != null &&
873 +                UNSAFE.compareAndSwapObject(q, u, t, null)) {
874 +                /*
875 +                 * This recheck (and similarly in nonlocalJoinTask)
876 +                 * handles cases where joinMe is independently
877 +                 * cancelled or forced even though there is other work
878 +                 * available. Back out of the pop by putting t back
879 +                 * into slot before we commit by setting sp.
880 +                 */
881 +                if (joinMe.status < 0) {
882 +                    UNSAFE.putObjectVolatile(q, u, t);
883 +                    break;
884 +                }
885 +                sp = s;
886 +                t.tryExec();
887 +            }
888 +        }
889 +        joining = prevJoining;
890 +    }
891 +
892 +    /**
893 +     * Tries to locate and help perform tasks for a stealer of the
894 +     * given task (or in turn one of its stealers), blocking (via
895 +     * pool.tryAwaitJoin) upon failure to find work.  Traces
896 +     * stolen->joining links looking for a thread working on
897 +     * a descendant of the given task and with a non-empty queue to
898 +     * steal back and execute tasks from. Inhibits mutual steal chains
899 +     * and scans on outer joins upon nesting to avoid unbounded
900 +     * growth.  Restarts search upon encountering inconsistencies.
901 +     * Tries to block if two passes agree that there are no remaining
902 +     * targets.
903 +     *
904 +     * @param joinMe the task to join
905 +     */
906 +    private void nonlocalJoinTask(ForkJoinTask<?> joinMe) {
907 +        ForkJoinPool p = pool;
908 +        int scans = p.parallelism;       // give up if too many retries
909 +        ForkJoinTask<?> bottom = null;   // target seen when can't descend
910 +        restart: while (joinMe.status >= 0) {
911 +            ForkJoinTask<?> target = null;
912 +            ForkJoinTask<?> next = joinMe;
913 +            while (scans >= 0 && next != null) {
914 +                --scans;
915 +                target = next;
916 +                next = null;
917 +                ForkJoinWorkerThread v = null;
918 +                ForkJoinWorkerThread[] ws = p.workers;
919 +                int n = ws.length;
920 +                for (int j = 0; j < n; ++j) {
921 +                    ForkJoinWorkerThread w = ws[j];
922 +                    if (w != null && w.stolen == target) {
923 +                        v = w;
924 +                        break;
925 +                    }
926 +                }
927 +                if (v != null && v != this) {
928 +                    ForkJoinTask<?> prevStolen = stolen;
929 +                    int b;
930 +                    ForkJoinTask<?>[] q;
931 +                    while ((b = v.base) != v.sp && (q = v.queue) != null) {
932 +                        int i = (q.length - 1) & b;
933 +                        long u = (i << qShift) + qBase;
934 +                        ForkJoinTask<?> t = q[i];
935 +                        if (target.status < 0)
936 +                            continue restart;
937 +                        if (t != null && v.base == b &&
938 +                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
939 +                            if (joinMe.status < 0) {
940 +                                UNSAFE.putObjectVolatile(q, u, t);
941 +                                return; // back out
942 +                            }
943 +                            stolen = t;
944 +                            v.base = b + 1;
945 +                            t.tryExec();
946 +                            stolen = prevStolen;
947 +                        }
948 +                        if (joinMe.status < 0)
949 +                            return;
950 +                    }
951 +                    next = v.joining;
952 +                }
953 +                if (target.status < 0)
954 +                    continue restart;  // inconsistent
955 +                if (joinMe.status < 0)
956 +                    return;
957 +            }
958 +
959 +            if (bottom != target)
960 +                bottom = target;    // recheck landing spot
961 +            else if (p.tryAwaitJoin(joinMe) < 0)
962 +                return;             // successfully blocked
963 +            Thread.yield();         // tame spin in case too many active
964 +        }
965 +    }
966 +
967 +    /**
968       * Returns an estimate of the number of tasks, offset by a
969       * function of number of idle workers.
970       *
# Line 874 | Line 1021 | public class ForkJoinWorkerThread extend
1021       * @return a task, if available
1022       */
1023      final ForkJoinTask<?> pollLocalTask() {
1024 <        while (base != sp) {
1024 >        while (sp != base) {
1025              if (active || (active = pool.tryIncrementActiveCount()))
1026                  return locallyFifo? locallyDeqTask() : popTask();
1027          }
# Line 892 | Line 1039 | public class ForkJoinWorkerThread extend
1039      }
1040  
1041      /**
895     * Executes or processes other tasks awaiting the given task
896     * @return task completion status
897     */
898    final int execWhileJoining(ForkJoinTask<?> joinMe) {
899        int s;
900        while ((s = joinMe.status) >= 0) {
901            ForkJoinTask<?> t = base != sp?
902                popWhileJoining(joinMe) :
903                scanWhileJoining(joinMe);
904            if (t != null)
905                t.tryExec();
906        }
907        return s;
908    }
909
910    /**
911     * Returns or stolen task, if available, unless joinMe is done
912     *
913     * This method is intrinsically nonmodular. To maintain the
914     * property that tasks are never stolen if the awaited task is
915     * ready, we must interleave mechanics of scan with status
916     * checks. We rely here on the commit points of deq that allow us
917     * to cancel a steal even after CASing slot to null, but before
918     * adjusting base index: If, after the CAS, we see that joinMe is
919     * ready, we can back out by placing the task back into the slot,
920     * without adjusting index. The loop is otherwise a variant of the
921     * one in scan().
922     *
923     */
924    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
925        int r = seed;
926        ForkJoinPool p = pool;
927        ForkJoinWorkerThread[] ws;
928        int n;
929        outer:while ((ws = p.workers) != null && (n = ws.length) > 1) {
930            int mask = n - 1;
931            int k = r;
932            boolean contended = false; // to retry loop if deq contends
933            for (int j = -n; j <= n; ++j) {
934                if (joinMe.status < 0)
935                    break outer;
936                int b;
937                ForkJoinTask<?>[] q;
938                ForkJoinWorkerThread v = ws[k & mask];
939                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
940                if (v != null && (b=v.base) != v.sp && (q=v.queue) != null) {
941                    int i = (q.length - 1) & b;
942                    ForkJoinTask<?> t = q[i];
943                    if (t != null && UNSAFE.compareAndSwapObject
944                        (q, (i << qShift) + qBase, t, null)) {
945                        if (joinMe.status >= 0) {
946                            v.base = b + 1;
947                            seed = r;
948                            ++stealCount;
949                            return t;
950                        }
951                        UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
952                        break outer; // back out
953                    }
954                    contended = true;
955                }
956                k = j < 0 ? r : (k + ((n >>> 1) | 1));
957            }
958            if (!contended && p.tryAwaitBusyJoin(joinMe))
959                break;
960        }
961        return null;
962    }
963
964    /**
965     * Version of popTask with join checks surrounding extraction.
966     * Uses the same backout strategy as helpJoinTask. Note that
967     * we ignore locallyFifo flag for local tasks here since helping
968     * joins only make sense in LIFO mode.
969     *
970     * @return a popped task, if available, unless joinMe is done
971     */
972    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
973        int s;
974        ForkJoinTask<?>[] q;
975        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
976            int i = (q.length - 1) & --s;
977            ForkJoinTask<?> t = q[i];
978            if (t != null && UNSAFE.compareAndSwapObject
979                (q, (i << qShift) + qBase, t, null)) {
980                if (joinMe.status >= 0) {
981                    sp = s;
982                    return t;
983                }
984                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
985                break;  // back out
986            }
987        }
988        return null;
989    }
990
991    /**
1042       * Runs tasks until {@code pool.isQuiescent()}.
1043       */
1044      final void helpQuiescePool() {
1045          for (;;) {
1046              ForkJoinTask<?> t = pollLocalTask();
1047 <            if (t != null || (t = scan()) != null)
1047 >            if (t != null || (t = scan()) != null) {
1048                  t.tryExec();
1049 +                stolen = null;
1050 +            }
1051              else {
1052                  ForkJoinPool p = pool;
1053                  if (active) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines