ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.47 by jsr166, Tue Sep 7 07:10:53 2010 UTC vs.
Revision 1.53 by dl, Sun Oct 24 19:37:26 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.Random;
10   import java.util.Collection;
11   import java.util.concurrent.locks.LockSupport;
12 + import java.util.concurrent.RejectedExecutionException;
13  
14   /**
15   * A thread managed by a {@link ForkJoinPool}.  This class is
# Line 490 | Line 489 | public class ForkJoinWorkerThread extend
489  
490      /*
491       * Intrinsics-based atomic writes for queue slots. These are
492 <     * basically the same as methods in AtomicObjectArray, but
492 >     * basically the same as methods in AtomicReferenceArray, but
493       * specialized for (1) ForkJoinTask elements (2) requirement that
494       * nullness and bounds checks have already been performed by
495       * callers and (3) effective offsets are known not to overflow
# Line 750 | Line 749 | public class ForkJoinWorkerThread extend
749      // Run State management
750  
751      // status check methods used mainly by ForkJoinPool
752 <    final boolean isRunning()     { return runState == 0; }
753 <    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
754 <    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
755 <    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
756 <    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
752 >    final boolean isRunning()    { return runState == 0; }
753 >    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
754 >    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
755 >    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
756 >
757 >    final boolean isTerminating() {
758 >        if ((runState & TERMINATING) != 0)
759 >            return true;
760 >        if (pool.isAtLeastTerminating()) { // propagate pool state
761 >            shutdown();
762 >            return true;
763 >        }
764 >        return false;
765 >    }
766  
767      /**
768       * Sets state to TERMINATING. Does NOT unpark or interrupt
# Line 923 | Line 931 | public class ForkJoinWorkerThread extend
931       * Possibly runs some tasks and/or blocks, until task is done.
932       *
933       * @param joinMe the task to join
934 +     * @param timed true if use timed wait
935 +     * @param nanos wait time if timed
936       */
937 <    final void joinTask(ForkJoinTask<?> joinMe) {
937 >    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
938          // currentJoin only written by this thread; only need ordered store
939          ForkJoinTask<?> prevJoin = currentJoin;
940          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
941 <        if (sp != base)
942 <            localHelpJoinTask(joinMe);
943 <        if (joinMe.status >= 0)
944 <            pool.awaitJoin(joinMe, this);
941 >        if (isTerminating())                // cancel if shutting down
942 >            joinMe.cancelIgnoringExceptions();
943 >        else {
944 >            if (sp != base)
945 >                localHelpJoinTask(joinMe);
946 >            if (joinMe.status >= 0)
947 >                pool.awaitJoin(joinMe, this, timed, nanos);
948 >        }
949          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
950      }
951  
# Line 969 | Line 983 | public class ForkJoinWorkerThread extend
983      }
984  
985      /**
986 <     * Unless terminating, tries to locate and help perform tasks for
987 <     * a stealer of the given task, or in turn one of its stealers.
988 <     * Traces currentSteal->currentJoin links looking for a thread
989 <     * working on a descendant of the given task and with a non-empty
990 <     * queue to steal back and execute tasks from.
986 >     * Tries to locate and help perform tasks for a stealer of the
987 >     * given task, or in turn one of its stealers.  Traces
988 >     * currentSteal->currentJoin links looking for a thread working on
989 >     * a descendant of the given task and with a non-empty queue to
990 >     * steal back and execute tasks from.
991       *
992       * The implementation is very branchy to cope with potential
993       * inconsistencies or loops encountering chains that are stale,
# Line 989 | Line 1003 | public class ForkJoinWorkerThread extend
1003          int n;
1004          if (joinMe.status < 0)                // already done
1005              return;
992        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
993            joinMe.cancelIgnoringExceptions();
994            return;
995        }
1006          if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1007              return;                           // need at least 2 workers
1008  
# Line 1057 | Line 1067 | public class ForkJoinWorkerThread extend
1067      }
1068  
1069      /**
1070 <     * Implements ForJoinTask.getSurplusQueuedTaskCount().
1070 >     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1071       * Returns an estimate of the number of tasks, offset by a
1072       * function of number of idle workers.
1073       *

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines