ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.42 by dl, Sun Aug 29 23:34:46 2010 UTC vs.
Revision 1.56 by jsr166, Thu Nov 18 00:39:15 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.Random;
10   import java.util.Collection;
11   import java.util.concurrent.locks.LockSupport;
12 + import java.util.concurrent.RejectedExecutionException;
13  
14   /**
15   * A thread managed by a {@link ForkJoinPool}.  This class is
# Line 172 | Line 171 | public class ForkJoinWorkerThread extend
171  
172      /**
173       * Maximum work-stealing queue array size.  Must be less than or
174 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
175 <     * is less than usual bounds, because we need leftshift by 3
176 <     * to be in int range).
174 >     * equal to 1 << (31 - width of array entry) to ensure lack of
175 >     * index wraparound. The value is set in the static block
176 >     * at the end of this file after obtaining width.
177       */
178 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
178 >    private static final int MAXIMUM_QUEUE_CAPACITY;
179  
180      /**
181       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 230 | Line 229 | public class ForkJoinWorkerThread extend
229      private static final int TRIMMED     = 0x08; // killed while suspended
230  
231      /**
232 <     * Number of steals, transferred and reset in pool callbacks pool
233 <     * when idle Accessed directly by pool.
232 >     * Number of steals. Directly accessed (and reset) by
233 >     * pool.tryAccumulateStealCount when idle.
234       */
235      int stealCount;
236  
# Line 286 | Line 285 | public class ForkJoinWorkerThread extend
285  
286      /**
287       * The task currently being joined, set only when actively trying
288 <     * to helpStealer. Written only by current thread, but read by
289 <     * others.
288 >     * to help other stealers in helpJoinTask. Written only by this
289 >     * thread, but read by others.
290       */
291      private volatile ForkJoinTask<?> currentJoin;
292  
293      /**
294       * The task most recently stolen from another worker (or
295 <     * submission queue).  Written only by current thread, but read by
295 >     * submission queue).  Written only by this thread, but read by
296       * others.
297       */
298      private volatile ForkJoinTask<?> currentSteal;
# Line 313 | Line 312 | public class ForkJoinWorkerThread extend
312      }
313  
314      /**
315 <     * Performs additional initialization and starts this thread
315 >     * Performs additional initialization and starts this thread.
316       */
317      final void start(int poolIndex, UncaughtExceptionHandler ueh) {
318          this.poolIndex = poolIndex;
# Line 349 | Line 348 | public class ForkJoinWorkerThread extend
348      /**
349       * Initializes internal state after construction but before
350       * processing any tasks. If you override this method, you must
351 <     * invoke super.onStart() at the beginning of the method.
351 >     * invoke @code{super.onStart()} at the beginning of the method.
352       * Initialization requires care: Most fields must have legal
353       * default values, to ensure that attempted accesses from other
354       * threads work correctly even before this thread starts
# Line 381 | Line 380 | public class ForkJoinWorkerThread extend
380              if (active) {
381                  int a; // inline p.tryDecrementActiveCount
382                  active = false;
383 <                do {} while(!UNSAFE.compareAndSwapInt
384 <                            (p, poolRunStateOffset, a = p.runState, a - 1));
383 >                do {} while (!UNSAFE.compareAndSwapInt
384 >                             (p, poolRunStateOffset, a = p.runState, a - 1));
385              }
386              cancelTasks();
387              setTerminated();
# Line 416 | Line 415 | public class ForkJoinWorkerThread extend
415      // helpers for run()
416  
417      /**
418 <     * Find and execute tasks and check status while running
418 >     * Finds and executes tasks, and checks status while running.
419       */
420      private void mainLoop() {
421          boolean ran = false; // true if ran a task on last step
# Line 430 | Line 429 | public class ForkJoinWorkerThread extend
429      }
430  
431      /**
432 <     * Try to steal a task and execute it
432 >     * Tries to steal a task and execute it.
433       *
434       * @return true if ran a task
435       */
# Line 447 | Line 446 | public class ForkJoinWorkerThread extend
446      }
447  
448      /**
449 <     * If a submission exists, try to activate and run it;
449 >     * If a submission exists, try to activate and run it.
450       *
451       * @return true if ran a task
452       */
453      private boolean tryExecSubmission() {
454          ForkJoinPool p = pool;
455 +        // This loop is needed in case attempt to activate fails, in
456 +        // which case we only retry if there still appears to be a
457 +        // submission.
458          while (p.hasQueuedSubmissions()) {
459              ForkJoinTask<?> t; int a;
460              if (active || // inline p.tryIncrementActiveCount
# Line 477 | Line 479 | public class ForkJoinWorkerThread extend
479       */
480      private void execLocalTasks() {
481          while (runState == 0) {
482 <            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
482 >            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
483              if (t != null)
484                  t.quietlyExec();
485              else if (sp == base)
# Line 487 | Line 489 | public class ForkJoinWorkerThread extend
489  
490      /*
491       * Intrinsics-based atomic writes for queue slots. These are
492 <     * basically the same as methods in AtomicObjectArray, but
492 >     * basically the same as methods in AtomicReferenceArray, but
493       * specialized for (1) ForkJoinTask elements (2) requirement that
494       * nullness and bounds checks have already been performed by
495       * callers and (3) effective offsets are known not to overflow
496       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
497       * need corresponding version for reads: plain array reads are OK
498 <     * because they protected by other volatile reads and are
498 >     * because they are protected by other volatile reads and are
499       * confirmed by CASes.
500       *
501       * Most uses don't actually call these methods, but instead contain
# Line 517 | Line 519 | public class ForkJoinWorkerThread extend
519       * range. This method is used only during resets and backouts.
520       */
521      private static final void writeSlot(ForkJoinTask<?>[] q, int i,
522 <                                              ForkJoinTask<?> t) {
522 >                                        ForkJoinTask<?> t) {
523          UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
524      }
525  
# Line 562 | Line 564 | public class ForkJoinWorkerThread extend
564  
565      /**
566       * Tries to take a task from the base of own queue. Assumes active
567 <     * status.  Called only by current thread.
567 >     * status.  Called only by this thread.
568       *
569       * @return a task, or null if none
570       */
# Line 585 | Line 587 | public class ForkJoinWorkerThread extend
587  
588      /**
589       * Returns a popped task, or null if empty. Assumes active status.
590 <     * Called only by current thread.
590 >     * Called only by this thread.
591       */
592      private ForkJoinTask<?> popTask() {
593          ForkJoinTask<?>[] q = queue;
# Line 609 | Line 611 | public class ForkJoinWorkerThread extend
611  
612      /**
613       * Specialized version of popTask to pop only if topmost element
614 <     * is the given task. Called only by current thread while
613 <     * active.
614 >     * is the given task. Called only by this thread while active.
615       *
616       * @param t the task. Caller must ensure non-null.
617       */
# Line 628 | Line 629 | public class ForkJoinWorkerThread extend
629      }
630  
631      /**
632 <     * Returns next task or null if empty or contended
632 >     * Returns next task, or null if empty or contended.
633       */
634      final ForkJoinTask<?> peekTask() {
635          ForkJoinTask<?>[] q = queue;
# Line 670 | Line 671 | public class ForkJoinWorkerThread extend
671       * Computes next value for random victim probe in scan().  Scans
672       * don't require a very high quality generator, but also not a
673       * crummy one.  Marsaglia xor-shift is cheap and works well enough.
674 <     * Note: This is manually inlined in scan()
674 >     * Note: This is manually inlined in scan().
675       */
676      private static final int xorShift(int r) {
677          r ^= r << 13;
# Line 748 | Line 749 | public class ForkJoinWorkerThread extend
749      // Run State management
750  
751      // status check methods used mainly by ForkJoinPool
752 <    final boolean isRunning()     { return runState == 0; }
753 <    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
754 <    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
755 <    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
756 <    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
752 >    final boolean isRunning()    { return runState == 0; }
753 >    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
754 >    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
755 >    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
756 >
757 >    final boolean isTerminating() {
758 >        if ((runState & TERMINATING) != 0)
759 >            return true;
760 >        if (pool.isAtLeastTerminating()) { // propagate pool state
761 >            shutdown();
762 >            return true;
763 >        }
764 >        return false;
765 >    }
766  
767      /**
768       * Sets state to TERMINATING. Does NOT unpark or interrupt
# Line 776 | Line 786 | public class ForkJoinWorkerThread extend
786      }
787  
788      /**
789 <     * Sets state to TERMINATED. Called only by onTermination()
789 >     * Sets state to TERMINATED. Called only by onTermination().
790       */
791      private void setTerminated() {
792          int s;
# Line 897 | Line 907 | public class ForkJoinWorkerThread extend
907              if (active ||
908                  (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
909                                                     a = p.runState, a + 1)))
910 <                return locallyFifo? locallyDeqTask() : popTask();
910 >                return locallyFifo ? locallyDeqTask() : popTask();
911          }
912          return null;
913      }
# Line 921 | Line 931 | public class ForkJoinWorkerThread extend
931       * Possibly runs some tasks and/or blocks, until task is done.
932       *
933       * @param joinMe the task to join
934 +     * @param timed true if use timed wait
935 +     * @param nanos wait time if timed
936       */
937 <    final void joinTask(ForkJoinTask<?> joinMe) {
937 >    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
938          // currentJoin only written by this thread; only need ordered store
939          ForkJoinTask<?> prevJoin = currentJoin;
940          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
941 <        if (sp != base)
942 <            localHelpJoinTask(joinMe);
943 <        if (joinMe.status >= 0)
944 <            pool.awaitJoin(joinMe, this);
941 >        if (isTerminating())                // cancel if shutting down
942 >            joinMe.cancelIgnoringExceptions();
943 >        else
944 >            pool.awaitJoin(joinMe, this, timed, nanos);
945          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
946      }
947  
948      /**
949       * Run tasks in local queue until given task is done.
950 +     * Not currently used because it complicates semantics.
951       *
952       * @param joinMe the task to join
953       */
# Line 967 | Line 980 | public class ForkJoinWorkerThread extend
980      }
981  
982      /**
983 <     * Unless terminating, tries to locate and help perform tasks for
984 <     * a stealer of the given task, or in turn one of its stealers.
985 <     * Traces currentSteal->currentJoin links looking for a thread
986 <     * working on a descendant of the given task and with a non-empty
987 <     * queue to steal back and execute tasks from.
983 >     * Tries to locate and help perform tasks for a stealer of the
984 >     * given task, or in turn one of its stealers.  Traces
985 >     * currentSteal->currentJoin links looking for a thread working on
986 >     * a descendant of the given task and with a non-empty queue to
987 >     * steal back and execute tasks from.
988       *
989       * The implementation is very branchy to cope with potential
990       * inconsistencies or loops encountering chains that are stale,
# Line 987 | Line 1000 | public class ForkJoinWorkerThread extend
1000          int n;
1001          if (joinMe.status < 0)                // already done
1002              return;
990        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
991            joinMe.cancelIgnoringExceptions();
992            return;
993        }
1003          if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1004              return;                           // need at least 2 workers
1005  
# Line 1003 | Line 1012 | public class ForkJoinWorkerThread extend
1012                  for (int j = 0; ; ++j) {      // search array
1013                      if (j < n) {
1014                          ForkJoinTask<?> vs;
1015 <                        if ((v = ws[j]) != null &&
1015 >                        if ((v = ws[j]) != null && v != this &&
1016                              (vs = v.currentSteal) != null) {
1017                              if (joinMe.status < 0 || task.status < 0)
1018                                  return;       // stale or done
# Line 1055 | Line 1064 | public class ForkJoinWorkerThread extend
1064      }
1065  
1066      /**
1067 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1068       * Returns an estimate of the number of tasks, offset by a
1069       * function of number of idle workers.
1070       *
# Line 1126 | Line 1136 | public class ForkJoinWorkerThread extend
1136                  }
1137                  if (p.isQuiescent()) {
1138                      active = true; // re-activate
1139 <                    do {} while(!UNSAFE.compareAndSwapInt
1140 <                                (p, poolRunStateOffset, a = p.runState, a+1));
1139 >                    do {} while (!UNSAFE.compareAndSwapInt
1140 >                                 (p, poolRunStateOffset, a = p.runState, a+1));
1141                      return;
1142                  }
1143              }
# Line 1157 | Line 1167 | public class ForkJoinWorkerThread extend
1167          if ((s & (s-1)) != 0)
1168              throw new Error("data type scale not a power of two");
1169          qShift = 31 - Integer.numberOfLeadingZeros(s);
1170 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1171      }
1172  
1173      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines