
Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.35 by dl, Wed Jul 7 19:52:32 2010 UTC vs.
Revision 1.36 by dl, Fri Jul 23 13:07:43 2010 UTC

# Line 85 | Line 85 | public class ForkJoinWorkerThread extend
85       *
86       * When a worker would otherwise be blocked waiting to join a
87       * task, it first tries a form of linear helping: Each worker
88 <     * records (in field stolen) the most recent task it stole
89 <     * from some other worker. Plus, it records (in field joining) the
90 <     * task it is currently actively joining. Method joinTask uses
88 >     * records (in field currentSteal) the most recent task it stole
89 >     * from some other worker. Plus, it records (in field currentJoin)
90 >     * the task it is currently actively joining. Method joinTask uses
91       * these markers to try to find a worker to help (i.e., steal back
92       * a task from and execute it) that could hasten completion of the
93       * actively joined task. In essence, the joiner executes a task
# Line 98 | Line 98 | public class ForkJoinWorkerThread extend
98       * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
99       * in that: (1) We only maintain dependency links across workers
100       * upon steals, rather than maintain per-task bookkeeping.  This
101 <     * requires a linear scan of workers array to locate stealers,
102 <     * which isolates cost to when it is needed, rather than adding to
101 >     * may require a linear scan of workers array to locate stealers,
102 >     * but usually doesn't because stealers leave hints (that may
103 >     * become stale/wrong) of where to locate them. This
104 >     * isolates cost to when it is needed, rather than adding to
105       * per-task overhead.  (2) It is "shallow", ignoring nesting and
106       * potentially cyclic mutual steals.  (3) It is intentionally
107 <     * racy: field joining is updated only while actively joining,
107 >     * racy: field currentJoin is updated only while actively joining,
108       * which means that we could miss links in the chain during
109 <     * long-lived tasks, GC stalls etc.  (4) We fall back to
109 >     * long-lived tasks, GC stalls etc.  (4) We bound the number of
110 >     * attempts to find work (see MAX_HELP_DEPTH) and fall back to
111       * suspending the worker and if necessary replacing it with a
112       * spare (see ForkJoinPool.tryAwaitJoin).
113       *
114 <     * Efficient implementation of these algorithms currently relies on
115 <     * an uncomfortable amount of "Unsafe" mechanics. To maintain
114 >     * Efficient implementation of these algorithms currently relies
115 >     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
116       * correct orderings, reads and writes of variable base require
117       * volatile ordering.  Variable sp does not require volatile
118       * writes but still needs store-ordering, which we accomplish by
119       * pre-incrementing sp before filling the slot with an ordered
120       * store.  (Pre-incrementing also enables backouts used in
121 <     * scanWhileJoining.)  Because they are protected by volatile base
122 <     * reads, reads of the queue array and its slots by other threads
123 <     * do not need volatile load semantics, but writes (in push)
124 <     * require store order and CASes (in pop and deq) require
125 <     * (volatile) CAS semantics.  (Michael, Saraswat, and Vechev's
126 <     * algorithm has similar properties, but without support for
127 <     * nulling slots.)  Since these combinations aren't supported
128 <     * using ordinary volatiles, the only way to accomplish these
129 <     * efficiently is to use direct Unsafe calls. (Using external
130 <     * AtomicIntegers and AtomicReferenceArrays for the indices and
131 <     * array is significantly slower because of memory locality and
132 <     * indirection effects.)
121 >     * joinTask.)  Because they are protected by volatile base reads,
122 >     * reads of the queue array and its slots by other threads do not
123 >     * need volatile load semantics, but writes (in push) require
124 >     * store order and CASes (in pop and deq) require (volatile) CAS
125 >     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
126 >     * similar properties, but without support for nulling slots.)
127 >     * Since these combinations aren't supported using ordinary
128 >     * volatiles, the only way to accomplish these efficiently is to
129 >     * use direct Unsafe calls. (Using external AtomicIntegers and
130 >     * AtomicReferenceArrays for the indices and array is
131 >     * significantly slower because of memory locality and indirection
132 >     * effects.)
133       *
134       * Further, performance on most platforms is very sensitive to
135       * placement and sizing of the (resizable) queue array.  Even
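
The linear-helping scheme described in the class comment above is easier to follow stripped of the Unsafe mechanics and staleness rechecks that the real helpJoinTask (further down in this diff) needs. The following self-contained sketch is illustrative only (the Worker/Task stand-ins and the plain ArrayDeque queue are assumptions made for brevity, not the classes in this file), but it shows the currentSteal -> currentJoin walk, the stealHint shortcut, and the MAX_HELP_DEPTH bound:

    // Toy model of the linear-helping walk; not the jsr166y implementation.
    final class HelpingSketch {
        static final int MAX_HELP_DEPTH = 8;              // mirrors the new constant

        static final class Task { volatile int status; }  // >= 0 means not yet done

        static final class Worker {
            volatile Task currentSteal;   // last task taken from another worker
            volatile Task currentJoin;    // task this worker is actively joining
            int stealHint;                // index of a recent stealer (may be stale)
            final java.util.ArrayDeque<Runnable> queue =
                new java.util.ArrayDeque<Runnable>();
        }

        /** Walks currentSteal -> currentJoin links, helping each stealer found. */
        static void helpJoin(Worker self, Task joinMe, Worker[] ws) {
            Task task = joinMe;        // base of the dependency chain
            Worker thread = self;      // worker whose stolen task we follow
            for (int d = 0; d < MAX_HELP_DEPTH && joinMe.status >= 0; ++d) {
                Worker v = ws[thread.stealHint & (ws.length - 1)];  // try hint first
                if (v == null || v.currentSteal != task) {          // hint stale: scan
                    v = null;
                    for (int j = 0; j < ws.length; ++j) {
                        Worker w = ws[j];
                        if (w != null && w.currentSteal == task) {
                            thread.stealHint = j;                   // remember for next time
                            v = w;
                            break;
                        }
                    }
                    if (v == null)
                        return;                                     // no known stealer
                }
                // Run the stealer's queued work (a real deque needs the
                // ordering protocol described in the class comment).
                Runnable r;
                while (joinMe.status >= 0 && (r = v.queue.pollFirst()) != null)
                    r.run();
                Task next = v.currentJoin;       // descend one link in the chain
                if (next == null)
                    return;
                task = next;
                thread = v;
            }
        }
    }

The real method additionally rechecks task.status to restart on stale chains, backs a steal out if joinMe completes in the meantime, and records the helped task in its own currentSteal while running it, as the helpJoinTask diff below shows.
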
# Line 162 | Line 165 | public class ForkJoinWorkerThread extend
165          5L * 1000L * 1000L * 1000L; // 5 secs
166  
167      /**
168 +     * The maximum currentSteal->currentJoin link depth allowed in helpJoinTask.
169 +     * Depths for legitimate chains are unbounded, but we use a fixed
170 +     * constant to avoid (otherwise unchecked) cycles and bound
171 +     * staleness of traversal parameters at the expense of sometimes
172 +     * blocking when we could be helping.
173 +     */
174 +    private static final int MAX_HELP_DEPTH = 8;
175 +
176 +    /**
177       * Capacity of work-stealing queue array upon initialization.
178       * Must be a power of two. Initial size must be at least 4, but is
179       * padded to minimize cache effects.
# Line 182 | Line 194 | public class ForkJoinWorkerThread extend
194      final ForkJoinPool pool;
195  
196      /**
185     * The task most recently stolen from another worker
186     */
187    private volatile ForkJoinTask<?> stolen;
188
189    /**
190     * The task currently being joined, set only when actively
191     * trying to helpStealer.
192     */
193    private volatile ForkJoinTask<?> joining;
194
195    /**
197       * The work-stealing queue array. Size must be a power of two.
198       * Initialized in onStart, to improve memory locality.
199       */
200      private ForkJoinTask<?>[] queue;
201 <
201 >    
202      /**
203       * Index (mod queue.length) of least valid queue slot, which is
204       * always the next position to steal from if nonempty.
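
The ordering rules spelled out in the class comment (volatile reads and writes of base, a pre-incremented sp followed by a store-ordered slot write in push, and full CAS semantics when thieves null out slots) can be modeled without Unsafe. The sketch below only illustrates which operation needs which ordering: AtomicReferenceArray.lazySet and compareAndSet stand in for putOrderedObject and compareAndSwapObject, which is exactly the indirection the comment notes is too slow for the real class. Array growth and the owner-side pop are omitted.

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Ordering sketch of the owner push / thief deq protocol; illustrative only.
    final class DequeOrderingSketch<T> {
        private final AtomicReferenceArray<T> slots;  // stand-in for the queue array
        private volatile int base;                    // like field base: volatile
        private int sp;                               // like field sp: owner-only, plain

        DequeOrderingSketch(int capacity) {           // capacity must be a power of two
            slots = new AtomicReferenceArray<T>(capacity);
        }

        /** Owner-only push: pre-increment sp, then publish the slot with an ordered store. */
        void push(T task) {
            int s = sp++;                                   // pre-increment enables backouts
            slots.lazySet(s & (slots.length() - 1), task);  // store-ordered, not full volatile
        }

        /** One thief-side attempt to take the oldest task; null means empty or retry. */
        T deq() {
            int b = base;                                   // volatile read orders the slot read
            if (b != sp) {                                  // sp may lag; only delays a steal
                int i = b & (slots.length() - 1);
                T t = slots.get(i);
                if (t != null && base == b                  // recheck base after reading the slot
                    && slots.compareAndSet(i, t, null)) {   // CAS requires full semantics
                    base = b + 1;                           // volatile write publishes consumption
                    return t;
                }
            }
            return null;                                    // empty, lost a race, or mid-push
        }
    }

In the real code the thief's read of the queue slot does not even need volatile load semantics, because it is ordered by the preceding volatile read of base, as the class comment explains.
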
# Line 214 | Line 215 | public class ForkJoinWorkerThread extend
215      private int sp;
216  
217      /**
218 +     * The index of most recent stealer, used as a hint to avoid
219 +     * traversal in method helpJoinTask. This is only a hint because a
220 +     * worker might have had multiple steals and this only holds one
221 +     * of them (usually the most current). Declared non-volatile,
222 +     * relying on other prevailing sync to keep reasonably current.
223 +     */
224 +    private int stealHint;
225 +
226 +    /**
227       * Run state of this worker. In addition to the usual run levels,
228       * tracks if this worker is suspended as a spare, and if it was
229       * killed (trimmed) while suspended. However, "active" status is
# Line 246 | Line 256 | public class ForkJoinWorkerThread extend
256       */
257      private int seed;
258  
259 +
260      /**
261       * Activity status. When true, this worker is considered active.
262       * Accessed directly by pool.  Must be false upon construction.
# Line 279 | Line 290 | public class ForkJoinWorkerThread extend
290      volatile long nextWaiter;
291  
292      /**
293 +     * The task currently being joined, set only when actively trying
294 +     * to helpStealer. Written only by current thread, but read by
295 +     * others.
296 +     */
297 +    private volatile ForkJoinTask<?> currentJoin;
298 +    
299 +    /**
300 +     * The task most recently stolen from another worker (or
301 +     * submission queue).  Not volatile because always read/written in
302 +     * presence of related volatiles in those cases where it matters.
303 +     */
304 +    private ForkJoinTask<?> currentSteal;
305 +
306 +    /**
307       * Creates a ForkJoinWorkerThread operating in the given pool.
308       *
309       * @param pool the pool this thread works in
# Line 357 | Line 382 | public class ForkJoinWorkerThread extend
382       */
383      protected void onTermination(Throwable exception) {
384          try {
360            stolen = null;
361            joining = null;
385              cancelTasks();
386              setTerminated();
387              pool.workerTerminated(this);
# Line 394 | Line 417 | public class ForkJoinWorkerThread extend
417       * Find and execute tasks and check status while running
418       */
419      private void mainLoop() {
420 <        boolean ran = false;      // true if ran task in last loop iter
398 <        boolean prevRan = false;  // true if ran on last or previous step
420 >        int emptyScans = 0; // consecutive times failed to find work
421          ForkJoinPool p = pool;
422          for (;;) {
423 <            p.preStep(this, prevRan);
423 >            p.preStep(this, emptyScans);
424              if (runState != 0)
425                  return;
426              ForkJoinTask<?> t; // try to get and run stolen or submitted task
# Line 406 | Line 428 | public class ForkJoinWorkerThread extend
428                  t.tryExec();
429                  if (base != sp)
430                      runLocalTasks();
431 <                stolen = null;
432 <                prevRan = ran = true;
411 <            }
412 <            else {
413 <                prevRan = ran;
414 <                ran = false;
431 >                currentSteal = null;
432 >                emptyScans = 0;
433              }
434 +            else
435 +                ++emptyScans;
436          }
437      }
438  
# Line 440 | Line 460 | public class ForkJoinWorkerThread extend
460          while (p.hasQueuedSubmissions()) {
461              if (active || (active = p.tryIncrementActiveCount())) {
462                  ForkJoinTask<?> t = p.pollSubmission();
463 <                return t != null ? t : scan(); // if missed, rescan
463 >                if (t != null) {
464 >                    currentSteal = t;
465 >                    return t;
466 >                }
467 >                return scan(); // if missed, rescan
468              }
469          }
470          return null;
# Line 546 | Line 570 | public class ForkJoinWorkerThread extend
570  
571      /**
572       * Returns a popped task, or null if empty. Assumes active status.
573 <     * Called only by current thread. (Note: a specialization of this
550 <     * code appears in popWhileJoining.)
573 >     * Called only by current thread.
574       */
575      final ForkJoinTask<?> popTask() {
576          int s;
# Line 676 | Line 699 | public class ForkJoinWorkerThread extend
699                              long u = (i << qShift) + qBase; // raw offset
700                              if ((t = q[i]) != null && v.base == b &&
701                                  UNSAFE.compareAndSwapObject(q, u, t, null)) {
702 <                                stolen = t;
702 >                                currentSteal = t;
703 >                                v.stealHint = poolIndex;
704                                  v.base = b + 1;
705                                  seed = r;
706                                  ++stealCount;
# Line 745 | Line 769 | public class ForkJoinWorkerThread extend
769      }
770  
771      /**
772 <     * If suspended, tries to set status to unsuspended.
749 <     * Caller must unpark to actually resume
772 >     * If suspended, tries to set status to unsuspended and unparks.
773       *
774       * @return true if successful
775       */
776 <    final boolean tryUnsuspend() {
776 >    final boolean tryResumeSpare() {
777          int s = runState;
778 <        if ((s & SUSPENDED) != 0)
779 <            return UNSAFE.compareAndSwapInt(this, runStateOffset, s,
780 <                                            s & ~SUSPENDED);
778 >        if ((s & SUSPENDED) != 0 &&
779 >            UNSAFE.compareAndSwapInt(this, runStateOffset, s,
780 >                                     s & ~SUSPENDED)) {
781 >            LockSupport.unpark(this);
782 >            return true;
783 >        }
784          return false;
785      }
786  
# Line 824 | Line 850 | public class ForkJoinWorkerThread extend
850       * thread.
851       */
852      final void cancelTasks() {
853 +        ForkJoinTask<?> cj = currentJoin; // try to kill live tasks
854 +        if (cj != null) {
855 +            currentJoin = null;
856 +            cj.cancelIgnoringExceptions();
857 +        }
858 +        ForkJoinTask<?> cs = currentSteal;
859 +        if (cs != null) {
860 +            currentSteal = null;
861 +            cs.cancelIgnoringExceptions();
862 +        }
863          while (base != sp) {
864              ForkJoinTask<?> t = deqTask();
865              if (t != null)
# Line 851 | Line 887 | public class ForkJoinWorkerThread extend
887      // Support methods for ForkJoinTask
888  
889      /**
890 +     * Gets and removes a local task.
891 +     *
892 +     * @return a task, if available
893 +     */
894 +    final ForkJoinTask<?> pollLocalTask() {
895 +        while (sp != base) {
896 +            if (active || (active = pool.tryIncrementActiveCount()))
897 +                return locallyFifo? locallyDeqTask() : popTask();
898 +        }
899 +        return null;
900 +    }
901 +
902 +    /**
903 +     * Gets and removes a local or stolen task.
904 +     *
905 +     * @return a task, if available
906 +     */
907 +    final ForkJoinTask<?> pollTask() {
908 +        ForkJoinTask<?> t;
909 +        return (t = pollLocalTask()) != null ? t : scan();
910 +    }
911 +
912 +    /**
913       * Possibly runs some tasks and/or blocks, until task is done.
914 +     * The main body is basically a big spinloop, alternating between
915 +     * calls to helpJoinTask and pool.tryAwaitJoin with increased
916 +     * patience parameters until either the task is done without
917 +     * waiting, or we have, if necessary, created or resumed a
918 +     * replacement for this thread while it blocks.
919       *
920       * @param joinMe the task to join
921 +     * @return task status on exit
922       */
923 <    final void joinTask(ForkJoinTask<?> joinMe) {
924 <        ForkJoinTask<?> prevJoining = joining;
925 <        joining = joinMe;
926 <        while (joinMe.status >= 0) {
927 <            int s = sp;
928 <            if (s == base) {
929 <                nonlocalJoinTask(joinMe);
930 <                break;
931 <            }
932 <            // process local task
923 >    final int joinTask(ForkJoinTask<?> joinMe) {
924 >        int stat;
925 >        ForkJoinTask<?> prevJoin = currentJoin;
926 >        currentJoin = joinMe;
927 >        if ((stat = joinMe.status) >= 0 &&
928 >            (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) {
929 >            ForkJoinPool p = pool;
930 >            int helpRetries = 2;     // initial patience values
931 >            int awaitRetries = -1;   // -1 is sentinel for replace-check only
932 >            do {
933 >                helpJoinTask(joinMe, helpRetries);
934 >                if ((stat = joinMe.status) < 0)
935 >                    break;
936 >                boolean busy = p.tryAwaitJoin(joinMe, awaitRetries);
937 >                if ((stat = joinMe.status) < 0)
938 >                    break;
939 >                if (awaitRetries == -1)
940 >                    awaitRetries = 0;
941 >                else if (busy)
942 >                    ++awaitRetries;
943 >                if (helpRetries < p.parallelism)
944 >                    helpRetries <<= 1;
945 >                Thread.yield(); // tame unbounded loop
946 >            } while (joinMe.status >= 0);
947 >        }
948 >        currentJoin = prevJoin;
949 >        return stat;
950 >    }
951 >
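
For context, the join path reworked above is what any fork/join computation exercises: every join() of a possibly stolen subtask can land in joinTask, which first drains local work (localHelpJoinTask below), then tries helpJoinTask, and only then falls back to pool.tryAwaitJoin with growing patience. A minimal caller-side example follows; FibDemo is purely illustrative (it assumes the jsr166y classes are on the classpath) and is not part of this file.

    import jsr166y.ForkJoinPool;
    import jsr166y.RecursiveTask;

    // Each right.join() below may enter ForkJoinWorkerThread.joinTask when the
    // forked subtask has been stolen by another worker.
    public class FibDemo {
        static final class Fib extends RecursiveTask<Integer> {
            final int n;
            Fib(int n) { this.n = n; }
            protected Integer compute() {
                if (n <= 1)
                    return n;
                Fib left = new Fib(n - 1);
                Fib right = new Fib(n - 2);
                right.fork();                          // may be stolen by another worker
                return left.compute() + right.join();  // join may help the stealer
            }
        }
        public static void main(String[] args) {
            ForkJoinPool pool = new ForkJoinPool();
            System.out.println(pool.invoke(new Fib(25)));  // prints 75025
        }
    }
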
952 >    /**
953 >     * Run tasks in local queue until given task is done.
954 >     *
955 >     * @param joinMe the task to join
956 >     * @return task status on exit
957 >     */
958 >    private int localHelpJoinTask(ForkJoinTask<?> joinMe) {
959 >        int stat, s;
960 >        ForkJoinTask<?>[] q;
961 >        while ((stat = joinMe.status) >= 0 &&
962 >               base != (s = sp) && (q = queue) != null) {
963              ForkJoinTask<?> t;
869            ForkJoinTask<?>[] q = queue;
964              int i = (q.length - 1) & --s;
965              long u = (i << qShift) + qBase; // raw offset
966              if ((t = q[i]) != null &&
967                  UNSAFE.compareAndSwapObject(q, u, t, null)) {
968                  /*
969 <                 * This recheck (and similarly in nonlocalJoinTask)
969 >                 * This recheck (and similarly in helpJoinTask)
970                   * handles cases where joinMe is independently
971                   * cancelled or forced even though there is other work
972                   * available. Back out of the pop by putting t back
973 <                 * into slot before we commit by setting sp.
973 >                 * into slot before we commit by writing sp.
974                   */
975 <                if (joinMe.status < 0) {
975 >                if ((stat = joinMe.status) < 0) {
976                      UNSAFE.putObjectVolatile(q, u, t);
977                      break;
978                  }
# Line 886 | Line 980 | public class ForkJoinWorkerThread extend
980                  t.tryExec();
981              }
982          }
983 <        joining = prevJoining;
983 >        return stat;
984      }
985  
986      /**
987       * Tries to locate and help perform tasks for a stealer of the
988 <     * given task (or in turn one of its stealers), blocking (via
989 <     * pool.tryAwaitJoin) upon failure to find work.  Traces
896 <     * stolen->joining links looking for a thread working on
988 >     * given task, or in turn one of its stealers.  Traces
989 >     * currentSteal->currentJoin links looking for a thread working on
990       * a descendant of the given task and with a non-empty queue to
991 <     * steal back and execute tasks from. Inhibits mutual steal chains
992 <     * and scans on outer joins upon nesting to avoid unbounded
993 <     * growth.  Restarts search upon encountering inconsistencies.
994 <     * Tries to block if two passes agree that there are no remaining
995 <     * targets.
991 >     * steal back and execute tasks from. Restarts search upon
992 >     * encountering chains that are stale, unknown, or of length
993 >     * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles.
994 >     *
995 >     * The implementation is very branchy to cope with the restart
996 >     * cases.  Returns void, not task status (which must be reread by
997 >     * caller anyway) to slightly simplify control paths.
998       *
999       * @param joinMe the task to join
1000       */
1001 <    private void nonlocalJoinTask(ForkJoinTask<?> joinMe) {
1002 <        ForkJoinPool p = pool;
1003 <        int scans = p.parallelism;       // give up if too many retries
1004 <        ForkJoinTask<?> bottom = null;   // target seen when can't descend
1005 <        restart: while (joinMe.status >= 0) {
1006 <            ForkJoinTask<?> target = null;
1007 <            ForkJoinTask<?> next = joinMe;
1008 <            while (scans >= 0 && next != null) {
1009 <                --scans;
1010 <                target = next;
1011 <                next = null;
1012 <                ForkJoinWorkerThread v = null;
1013 <                ForkJoinWorkerThread[] ws = p.workers;
1014 <                int n = ws.length;
1015 <                for (int j = 0; j < n; ++j) {
1016 <                    ForkJoinWorkerThread w = ws[j];
1017 <                    if (w != null && w.stolen == target) {
1018 <                        v = w;
1019 <                        break;
1001 >    final void helpJoinTask(ForkJoinTask<?> joinMe, int retries) {
1002 >        ForkJoinWorkerThread[] ws = pool.workers;
1003 >        int n;
1004 >        if (ws == null || (n = ws.length) <= 1)
1005 >            return;                   // need at least 2 workers
1006 >
1007 >        restart:while (joinMe.status >= 0 && --retries >= 0) {
1008 >            ForkJoinTask<?> task = joinMe;        // base of chain
1009 >            ForkJoinWorkerThread thread = this;   // thread with stolen task
1010 >            for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
1011 >                // Try to find v, the stealer of task, by first using hint
1012 >                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1013 >                if (v == null || v.currentSteal != task) {
1014 >                    for (int j = 0; ; ++j) {      // search array
1015 >                        if (task.status < 0 || j == n)
1016 >                            continue restart;     // stale or no stealer
1017 >                        if ((v = ws[j]) != null && v.currentSteal == task) {
1018 >                            thread.stealHint = j; // save for next time
1019 >                            break;
1020 >                        }
1021                      }
1022                  }
1023 <                if (v != null && v != this) {
1024 <                    ForkJoinTask<?> prevStolen = stolen;
1025 <                    int b;
1026 <                    ForkJoinTask<?>[] q;
1027 <                    while ((b = v.base) != v.sp && (q = v.queue) != null) {
1028 <                        int i = (q.length - 1) & b;
1029 <                        long u = (i << qShift) + qBase;
1030 <                        ForkJoinTask<?> t = q[i];
1031 <                        if (target.status < 0)
1032 <                            continue restart;
1033 <                        if (t != null && v.base == b &&
1034 <                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
1035 <                            if (joinMe.status < 0) {
1023 >                // Try to help v, using specialized form of deqTask
1024 >                int b;
1025 >                ForkJoinTask<?>[] q;
1026 >                while ((b = v.base) != v.sp && (q = v.queue) != null) {
1027 >                    int i = (q.length - 1) & b;
1028 >                    long u = (i << qShift) + qBase;
1029 >                    ForkJoinTask<?> t = q[i];
1030 >                    if (task.status < 0)          // stale
1031 >                        continue restart;
1032 >                    if (v.base == b) {            // recheck after reading t
1033 >                        if (t == null)            // producer stalled
1034 >                            continue restart;     // retry via restart
1035 >                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1036 >                            if (joinMe.status < 0) {
1037                                  UNSAFE.putObjectVolatile(q, u, t);
1038 <                                return; // back out
1038 >                                return;           // back out on cancel
1039                              }
1040 <                            stolen = t;
1040 >                            ForkJoinTask<?> prevSteal = currentSteal;
1041 >                            currentSteal = t;
1042 >                            v.stealHint = poolIndex;
1043                              v.base = b + 1;
1044                              t.tryExec();
1045 <                            stolen = prevStolen;
1045 >                            currentSteal = prevSteal;
1046                          }
948                        if (joinMe.status < 0)
949                            return;
1047                      }
1048 <                    next = v.joining;
1048 >                    if (joinMe.status < 0)
1049 >                        return;
1050                  }
1051 <                if (target.status < 0)
1052 <                    continue restart;  // inconsistent
1053 <                if (joinMe.status < 0)
1051 >                // Try to descend to find v's stealer
1052 >                ForkJoinTask<?> next = v.currentJoin;
1053 >                if (next == null || task.status < 0)
1054 >                    continue restart;             // no descendent or stale
1055 >                if (joinMe.status < 0)
1056                      return;
1057 +                task = next;
1058 +                thread = v;
1059              }
958
959            if (bottom != target)
960                bottom = target;    // recheck landing spot
961            else if (p.tryAwaitJoin(joinMe) < 0)
962                return;             // successfully blocked
963            Thread.yield();         // tame spin in case too many active
1060          }
1061      }
1062  
# Line 1016 | Line 1112 | public class ForkJoinWorkerThread extend
1112      }
1113  
1114      /**
1019     * Gets and removes a local task.
1020     *
1021     * @return a task, if available
1022     */
1023    final ForkJoinTask<?> pollLocalTask() {
1024        while (sp != base) {
1025            if (active || (active = pool.tryIncrementActiveCount()))
1026                return locallyFifo? locallyDeqTask() : popTask();
1027        }
1028        return null;
1029    }
1030
1031    /**
1032     * Gets and removes a local or stolen task.
1033     *
1034     * @return a task, if available
1035     */
1036    final ForkJoinTask<?> pollTask() {
1037        ForkJoinTask<?> t;
1038        return (t = pollLocalTask()) != null ? t : scan();
1039    }
1040
1041    /**
1115       * Runs tasks until {@code pool.isQuiescent()}.
1116       */
1117      final void helpQuiescePool() {
# Line 1046 | Line 1119 | public class ForkJoinWorkerThread extend
1119              ForkJoinTask<?> t = pollLocalTask();
1120              if (t != null || (t = scan()) != null) {
1121                  t.tryExec();
1122 <                stolen = null;
1122 >                currentSteal = null;
1123              }
1124              else {
1125                  ForkJoinPool p = pool;
# Line 1070 | Line 1143 | public class ForkJoinWorkerThread extend
1143          objectFieldOffset("runState", ForkJoinWorkerThread.class);
1144      private static final long qBase =
1145          UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1146 +    private static final long threadStatusOffset =
1147 +        objectFieldOffset("threadStatus", Thread.class);
1148      private static final int qShift;
1149  
1150      static {

Diff Legend

(no marker)  Removed lines (present only in revision 1.35)
+            Added lines (present only in revision 1.36)
<            Changed lines as they appear in revision 1.35
>            Changed lines as they appear in revision 1.36