ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.51 by jsr166, Mon Sep 20 20:42:37 2010 UTC vs.
Revision 1.58 by jsr166, Sun Nov 21 08:18:19 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.Random;
10   import java.util.Collection;
11   import java.util.concurrent.locks.LockSupport;
12 + import java.util.concurrent.RejectedExecutionException;
13  
14   /**
15   * A thread managed by a {@link ForkJoinPool}.  This class is
# Line 349 | Line 348 | public class ForkJoinWorkerThread extend
348      /**
349       * Initializes internal state after construction but before
350       * processing any tasks. If you override this method, you must
351 <     * invoke @code{super.onStart()} at the beginning of the method.
351 >     * invoke {@code super.onStart()} at the beginning of the method.
352       * Initialization requires care: Most fields must have legal
353       * default values, to ensure that attempted accesses from other
354       * threads work correctly even before this thread starts
# Line 750 | Line 749 | public class ForkJoinWorkerThread extend
749      // Run State management
750  
751      // status check methods used mainly by ForkJoinPool
752 <    final boolean isRunning()     { return runState == 0; }
753 <    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
754 <    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
755 <    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
752 >    final boolean isRunning()    { return runState == 0; }
753 >    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
754 >    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
755 >    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
756  
757      final boolean isTerminating() {
758          if ((runState & TERMINATING) != 0)
# Line 932 | Line 931 | public class ForkJoinWorkerThread extend
931       * Possibly runs some tasks and/or blocks, until task is done.
932       *
933       * @param joinMe the task to join
934 +     * @param timed true if use timed wait
935 +     * @param nanos wait time if timed
936       */
937 <    final void joinTask(ForkJoinTask<?> joinMe) {
937 >    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
938          // currentJoin only written by this thread; only need ordered store
939          ForkJoinTask<?> prevJoin = currentJoin;
940          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
941 <        if (sp != base)
942 <            localHelpJoinTask(joinMe);
943 <        if (joinMe.status >= 0)
944 <            pool.awaitJoin(joinMe, this);
941 >        if (isTerminating())                // cancel if shutting down
942 >            joinMe.cancelIgnoringExceptions();
943 >        else {
944 >            if (sp != base)
945 >                localHelpJoinTask(joinMe);
946 >            if (joinMe.status >= 0)
947 >                pool.awaitJoin(joinMe, this, timed, nanos);
948 >        }
949          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
950      }
951  
952      /**
953       * Run tasks in local queue until given task is done.
954 +     * Not currently used because it complicates semantics.
955       *
956       * @param joinMe the task to join
957       */
# Line 978 | Line 984 | public class ForkJoinWorkerThread extend
984      }
985  
986      /**
987 <     * Unless terminating, tries to locate and help perform tasks for
988 <     * a stealer of the given task, or in turn one of its stealers.
989 <     * Traces currentSteal->currentJoin links looking for a thread
990 <     * working on a descendant of the given task and with a non-empty
991 <     * queue to steal back and execute tasks from.
987 >     * Tries to locate and help perform tasks for a stealer of the
988 >     * given task, or in turn one of its stealers.  Traces
989 >     * currentSteal->currentJoin links looking for a thread working on
990 >     * a descendant of the given task and with a non-empty queue to
991 >     * steal back and execute tasks from.
992       *
993       * The implementation is very branchy to cope with potential
994       * inconsistencies or loops encountering chains that are stale,
# Line 998 | Line 1004 | public class ForkJoinWorkerThread extend
1004          int n;
1005          if (joinMe.status < 0)                // already done
1006              return;
1001        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
1002            joinMe.cancelIgnoringExceptions();
1003            return;
1004        }
1007          if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1008              return;                           // need at least 2 workers
1009  
# Line 1015 | Line 1017 | public class ForkJoinWorkerThread extend
1017                      if (j < n) {
1018                          ForkJoinTask<?> vs;
1019                          if ((v = ws[j]) != null &&
1020 +                            (v != this || base == sp) &&
1021                              (vs = v.currentSteal) != null) {
1022                              if (joinMe.status < 0 || task.status < 0)
1023                                  return;       // stale or done

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines