ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.42 by dl, Sun Aug 29 23:34:46 2010 UTC vs.
Revision 1.52 by dl, Sun Oct 10 11:56:11 2010 UTC

# Line 172 | Line 172 | public class ForkJoinWorkerThread extend
172  
173      /**
174       * Maximum work-stealing queue array size.  Must be less than or
175 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
176 <     * is less than usual bounds, because we need leftshift by 3
177 <     * to be in int range).
175 >     * equal to 1 << (31 - width of array entry) to ensure lack of
176 >     * index wraparound. The value is set in the static block
177 >     * at the end of this file after obtaining width.
178       */
179 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
179 >    private static final int MAXIMUM_QUEUE_CAPACITY;
180  
181      /**
182       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 230 | Line 230 | public class ForkJoinWorkerThread extend
230      private static final int TRIMMED     = 0x08; // killed while suspended
231  
232      /**
233 <     * Number of steals, transferred and reset in pool callbacks pool
234 <     * when idle Accessed directly by pool.
233 >     * Number of steals. Directly accessed (and reset) by
234 >     * pool.tryAccumulateStealCount when idle.
235       */
236      int stealCount;
237  
# Line 286 | Line 286 | public class ForkJoinWorkerThread extend
286  
287      /**
288       * The task currently being joined, set only when actively trying
289 <     * to helpStealer. Written only by current thread, but read by
290 <     * others.
289 >     * to help other stealers in helpJoinTask. Written only by this
290 >     * thread, but read by others.
291       */
292      private volatile ForkJoinTask<?> currentJoin;
293  
294      /**
295       * The task most recently stolen from another worker (or
296 <     * submission queue).  Written only by current thread, but read by
296 >     * submission queue).  Written only by this thread, but read by
297       * others.
298       */
299      private volatile ForkJoinTask<?> currentSteal;
# Line 313 | Line 313 | public class ForkJoinWorkerThread extend
313      }
314  
315      /**
316 <     * Performs additional initialization and starts this thread
316 >     * Performs additional initialization and starts this thread.
317       */
318      final void start(int poolIndex, UncaughtExceptionHandler ueh) {
319          this.poolIndex = poolIndex;
# Line 349 | Line 349 | public class ForkJoinWorkerThread extend
349      /**
350       * Initializes internal state after construction but before
351       * processing any tasks. If you override this method, you must
352 <     * invoke super.onStart() at the beginning of the method.
352 >     * invoke @code{super.onStart()} at the beginning of the method.
353       * Initialization requires care: Most fields must have legal
354       * default values, to ensure that attempted accesses from other
355       * threads work correctly even before this thread starts
# Line 381 | Line 381 | public class ForkJoinWorkerThread extend
381              if (active) {
382                  int a; // inline p.tryDecrementActiveCount
383                  active = false;
384 <                do {} while(!UNSAFE.compareAndSwapInt
385 <                            (p, poolRunStateOffset, a = p.runState, a - 1));
384 >                do {} while (!UNSAFE.compareAndSwapInt
385 >                             (p, poolRunStateOffset, a = p.runState, a - 1));
386              }
387              cancelTasks();
388              setTerminated();
# Line 416 | Line 416 | public class ForkJoinWorkerThread extend
416      // helpers for run()
417  
418      /**
419 <     * Find and execute tasks and check status while running
419 >     * Finds and executes tasks, and checks status while running.
420       */
421      private void mainLoop() {
422          boolean ran = false; // true if ran a task on last step
# Line 430 | Line 430 | public class ForkJoinWorkerThread extend
430      }
431  
432      /**
433 <     * Try to steal a task and execute it
433 >     * Tries to steal a task and execute it.
434       *
435       * @return true if ran a task
436       */
# Line 447 | Line 447 | public class ForkJoinWorkerThread extend
447      }
448  
449      /**
450 <     * If a submission exists, try to activate and run it;
450 >     * If a submission exists, try to activate and run it.
451       *
452       * @return true if ran a task
453       */
454      private boolean tryExecSubmission() {
455          ForkJoinPool p = pool;
456 +        // This loop is needed in case attempt to activate fails, in
457 +        // which case we only retry if there still appears to be a
458 +        // submission.
459          while (p.hasQueuedSubmissions()) {
460              ForkJoinTask<?> t; int a;
461              if (active || // inline p.tryIncrementActiveCount
# Line 477 | Line 480 | public class ForkJoinWorkerThread extend
480       */
481      private void execLocalTasks() {
482          while (runState == 0) {
483 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
483 >            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
484              if (t != null)
485                  t.quietlyExec();
486              else if (sp == base)
# Line 487 | Line 490 | public class ForkJoinWorkerThread extend
490  
491      /*
492       * Intrinsics-based atomic writes for queue slots. These are
493 <     * basically the same as methods in AtomicObjectArray, but
493 >     * basically the same as methods in AtomicReferenceArray, but
494       * specialized for (1) ForkJoinTask elements (2) requirement that
495       * nullness and bounds checks have already been performed by
496       * callers and (3) effective offsets are known not to overflow
497       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
498       * need corresponding version for reads: plain array reads are OK
499 <     * because they protected by other volatile reads and are
499 >     * because they are protected by other volatile reads and are
500       * confirmed by CASes.
501       *
502       * Most uses don't actually call these methods, but instead contain
# Line 517 | Line 520 | public class ForkJoinWorkerThread extend
520       * range. This method is used only during resets and backouts.
521       */
522      private static final void writeSlot(ForkJoinTask<?>[] q, int i,
523 <                                              ForkJoinTask<?> t) {
523 >                                        ForkJoinTask<?> t) {
524          UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
525      }
526  
# Line 562 | Line 565 | public class ForkJoinWorkerThread extend
565  
566      /**
567       * Tries to take a task from the base of own queue. Assumes active
568 <     * status.  Called only by current thread.
568 >     * status.  Called only by this thread.
569       *
570       * @return a task, or null if none
571       */
# Line 585 | Line 588 | public class ForkJoinWorkerThread extend
588  
589      /**
590       * Returns a popped task, or null if empty. Assumes active status.
591 <     * Called only by current thread.
591 >     * Called only by this thread.
592       */
593      private ForkJoinTask<?> popTask() {
594          ForkJoinTask<?>[] q = queue;
# Line 609 | Line 612 | public class ForkJoinWorkerThread extend
612  
613      /**
614       * Specialized version of popTask to pop only if topmost element
615 <     * is the given task. Called only by current thread while
613 <     * active.
615 >     * is the given task. Called only by this thread while active.
616       *
617       * @param t the task. Caller must ensure non-null.
618       */
# Line 628 | Line 630 | public class ForkJoinWorkerThread extend
630      }
631  
632      /**
633 <     * Returns next task or null if empty or contended
633 >     * Returns next task, or null if empty or contended.
634       */
635      final ForkJoinTask<?> peekTask() {
636          ForkJoinTask<?>[] q = queue;
# Line 670 | Line 672 | public class ForkJoinWorkerThread extend
672       * Computes next value for random victim probe in scan().  Scans
673       * don't require a very high quality generator, but also not a
674       * crummy one.  Marsaglia xor-shift is cheap and works well enough.
675 <     * Note: This is manually inlined in scan()
675 >     * Note: This is manually inlined in scan().
676       */
677      private static final int xorShift(int r) {
678          r ^= r << 13;
# Line 749 | Line 751 | public class ForkJoinWorkerThread extend
751  
752      // status check methods used mainly by ForkJoinPool
753      final boolean isRunning()     { return runState == 0; }
752    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
754      final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
755      final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
756      final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
757  
758 +    final boolean isTerminating() {
759 +        if ((runState & TERMINATING) != 0)
760 +            return true;
761 +        if (pool.isAtLeastTerminating()) { // propagate pool state
762 +            shutdown();
763 +            return true;
764 +        }
765 +        return false;
766 +    }
767 +
768      /**
769       * Sets state to TERMINATING. Does NOT unpark or interrupt
770       * to wake up if currently blocked. Callers must do so if desired.
# Line 776 | Line 787 | public class ForkJoinWorkerThread extend
787      }
788  
789      /**
790 <     * Sets state to TERMINATED. Called only by onTermination()
790 >     * Sets state to TERMINATED. Called only by onTermination().
791       */
792      private void setTerminated() {
793          int s;
# Line 897 | Line 908 | public class ForkJoinWorkerThread extend
908              if (active ||
909                  (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
910                                                     a = p.runState, a + 1)))
911 <                return locallyFifo? locallyDeqTask() : popTask();
911 >                return locallyFifo ? locallyDeqTask() : popTask();
912          }
913          return null;
914      }
# Line 926 | Line 937 | public class ForkJoinWorkerThread extend
937          // currentJoin only written by this thread; only need ordered store
938          ForkJoinTask<?> prevJoin = currentJoin;
939          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
940 <        if (sp != base)
941 <            localHelpJoinTask(joinMe);
942 <        if (joinMe.status >= 0)
943 <            pool.awaitJoin(joinMe, this);
940 >        if (isTerminating())                // cancel if shutting down
941 >            joinMe.cancelIgnoringExceptions();
942 >        else {
943 >            if (sp != base)
944 >                localHelpJoinTask(joinMe);
945 >            if (joinMe.status >= 0)
946 >                pool.awaitJoin(joinMe, this);
947 >        }
948          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
949      }
950  
# Line 967 | Line 982 | public class ForkJoinWorkerThread extend
982      }
983  
984      /**
985 <     * Unless terminating, tries to locate and help perform tasks for
986 <     * a stealer of the given task, or in turn one of its stealers.
987 <     * Traces currentSteal->currentJoin links looking for a thread
988 <     * working on a descendant of the given task and with a non-empty
989 <     * queue to steal back and execute tasks from.
985 >     * Tries to locate and help perform tasks for a stealer of the
986 >     * given task, or in turn one of its stealers.  Traces
987 >     * currentSteal->currentJoin links looking for a thread working on
988 >     * a descendant of the given task and with a non-empty queue to
989 >     * steal back and execute tasks from.
990       *
991       * The implementation is very branchy to cope with potential
992       * inconsistencies or loops encountering chains that are stale,
# Line 987 | Line 1002 | public class ForkJoinWorkerThread extend
1002          int n;
1003          if (joinMe.status < 0)                // already done
1004              return;
990        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
991            joinMe.cancelIgnoringExceptions();
992            return;
993        }
1005          if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1006              return;                           // need at least 2 workers
1007  
# Line 1055 | Line 1066 | public class ForkJoinWorkerThread extend
1066      }
1067  
1068      /**
1069 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1070       * Returns an estimate of the number of tasks, offset by a
1071       * function of number of idle workers.
1072       *
# Line 1126 | Line 1138 | public class ForkJoinWorkerThread extend
1138                  }
1139                  if (p.isQuiescent()) {
1140                      active = true; // re-activate
1141 <                    do {} while(!UNSAFE.compareAndSwapInt
1142 <                                (p, poolRunStateOffset, a = p.runState, a+1));
1141 >                    do {} while (!UNSAFE.compareAndSwapInt
1142 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1143                      return;
1144                  }
1145              }
# Line 1157 | Line 1169 | public class ForkJoinWorkerThread extend
1169          if ((s & (s-1)) != 0)
1170              throw new Error("data type scale not a power of two");
1171          qShift = 31 - Integer.numberOfLeadingZeros(s);
1172 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1173      }
1174  
1175      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines