ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.45 by jsr166, Sat Sep 4 00:08:04 2010 UTC vs.
Revision 1.59 by dl, Sun Nov 21 13:55:04 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
10
9   import java.util.Random;
10   import java.util.Collection;
11   import java.util.concurrent.locks.LockSupport;
12 + import java.util.concurrent.RejectedExecutionException;
13  
14   /**
15 < * A thread managed by a {@link ForkJoinPool}.  This class is
16 < * subclassable solely for the sake of adding functionality -- there
17 < * are no overridable methods dealing with scheduling or execution.
18 < * However, you can override initialization and termination methods
19 < * surrounding the main task processing loop.  If you do create such a
20 < * subclass, you will also need to supply a custom {@link
21 < * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
22 < * ForkJoinPool}.
15 > * A thread managed by a {@link ForkJoinPool}, which executes
16 > * {@link ForkJoinTask}s.
17 > * This class is subclassable solely for the sake of adding
18 > * functionality -- there are no overridable methods dealing with
19 > * scheduling or execution.  However, you can override initialization
20 > * and termination methods surrounding the main task processing loop.
21 > * If you do create such a subclass, you will also need to supply a
22 > * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
23 > * in a {@code ForkJoinPool}.
24   *
25   * @since 1.7
26   * @author Doug Lea
# Line 172 | Line 172 | public class ForkJoinWorkerThread extend
172  
173      /**
174       * Maximum work-stealing queue array size.  Must be less than or
175 <     * equal to 1 << 28 to ensure lack of index wraparound. (This
176 <     * is less than usual bounds, because we need leftshift by 3
177 <     * to be in int range).
175 >     * equal to 1 << (31 - width of array entry) to ensure lack of
176 >     * index wraparound. The value is set in the static block
177 >     * at the end of this file after obtaining width.
178       */
179 <    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;
179 >    private static final int MAXIMUM_QUEUE_CAPACITY;
180  
181      /**
182       * The pool this thread works in. Accessed directly by ForkJoinTask.
# Line 230 | Line 230 | public class ForkJoinWorkerThread extend
230      private static final int TRIMMED     = 0x08; // killed while suspended
231  
232      /**
233 <     * Number of steals, transferred and reset in pool callbacks pool
234 <     * when idle Accessed directly by pool.
233 >     * Number of steals. Directly accessed (and reset) by
234 >     * pool.tryAccumulateStealCount when idle.
235       */
236      int stealCount;
237  
# Line 286 | Line 286 | public class ForkJoinWorkerThread extend
286  
287      /**
288       * The task currently being joined, set only when actively trying
289 <     * to helpStealer. Written only by current thread, but read by
290 <     * others.
289 >     * to help other stealers in helpJoinTask. Written only by this
290 >     * thread, but read by others.
291       */
292      private volatile ForkJoinTask<?> currentJoin;
293  
294      /**
295       * The task most recently stolen from another worker (or
296 <     * submission queue).  Written only by current thread, but read by
296 >     * submission queue).  Written only by this thread, but read by
297       * others.
298       */
299      private volatile ForkJoinTask<?> currentSteal;
# Line 349 | Line 349 | public class ForkJoinWorkerThread extend
349      /**
350       * Initializes internal state after construction but before
351       * processing any tasks. If you override this method, you must
352 <     * invoke super.onStart() at the beginning of the method.
352 >     * invoke {@code super.onStart()} at the beginning of the method.
353       * Initialization requires care: Most fields must have legal
354       * default values, to ensure that attempted accesses from other
355       * threads work correctly even before this thread starts
# Line 399 | Line 399 | public class ForkJoinWorkerThread extend
399      /**
400       * This method is required to be public, but should never be
401       * called explicitly. It performs the main run loop to execute
402 <     * ForkJoinTasks.
402 >     * {@link ForkJoinTask}s.
403       */
404      public void run() {
405          Throwable exception = null;
# Line 447 | Line 447 | public class ForkJoinWorkerThread extend
447      }
448  
449      /**
450 <     * If a submission exists, try to activate and run it;
450 >     * If a submission exists, try to activate and run it.
451       *
452       * @return true if ran a task
453       */
454      private boolean tryExecSubmission() {
455          ForkJoinPool p = pool;
456 +        // This loop is needed in case attempt to activate fails, in
457 +        // which case we only retry if there still appears to be a
458 +        // submission.
459          while (p.hasQueuedSubmissions()) {
460              ForkJoinTask<?> t; int a;
461              if (active || // inline p.tryIncrementActiveCount
# Line 487 | Line 490 | public class ForkJoinWorkerThread extend
490  
491      /*
492       * Intrinsics-based atomic writes for queue slots. These are
493 <     * basically the same as methods in AtomicObjectArray, but
493 >     * basically the same as methods in AtomicReferenceArray, but
494       * specialized for (1) ForkJoinTask elements (2) requirement that
495       * nullness and bounds checks have already been performed by
496       * callers and (3) effective offsets are known not to overflow
497       * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
498       * need corresponding version for reads: plain array reads are OK
499 <     * because they protected by other volatile reads and are
499 >     * because they are protected by other volatile reads and are
500       * confirmed by CASes.
501       *
502       * Most uses don't actually call these methods, but instead contain
# Line 562 | Line 565 | public class ForkJoinWorkerThread extend
565  
566      /**
567       * Tries to take a task from the base of own queue. Assumes active
568 <     * status.  Called only by current thread.
568 >     * status.  Called only by this thread.
569       *
570       * @return a task, or null if none
571       */
# Line 585 | Line 588 | public class ForkJoinWorkerThread extend
588  
589      /**
590       * Returns a popped task, or null if empty. Assumes active status.
591 <     * Called only by current thread.
591 >     * Called only by this thread.
592       */
593      private ForkJoinTask<?> popTask() {
594          ForkJoinTask<?>[] q = queue;
# Line 598 | Line 601 | public class ForkJoinWorkerThread extend
601                  if (t == null)   // lost to stealer
602                      break;
603                  if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
604 +                    /*
605 +                     * Note: here and in related methods, as a
606 +                     * performance (not correctness) issue, we'd like
607 +                     * to encourage compiler not to arbitrarily
608 +                     * postpone setting sp after successful CAS.
609 +                     * Currently there is no intrinsic for arranging
610 +                     * this, but using Unsafe putOrderedInt may be a
611 +                     * preferable strategy on some compilers even
612 +                     * though its main effect is a pre-, not post-
613 +                     * fence. To simplify possible changes, the option
614 +                     * is left in comments next to the associated
615 +                     * assignments.
616 +                     */
617                      sp = s; // putOrderedInt may encourage more timely write
618                      // UNSAFE.putOrderedInt(this, spOffset, s);
619                      return t;
# Line 609 | Line 625 | public class ForkJoinWorkerThread extend
625  
626      /**
627       * Specialized version of popTask to pop only if topmost element
628 <     * is the given task. Called only by current thread while
613 <     * active.
628 >     * is the given task. Called only by this thread while active.
629       *
630       * @param t the task. Caller must ensure non-null.
631       */
# Line 748 | Line 763 | public class ForkJoinWorkerThread extend
763      // Run State management
764  
765      // status check methods used mainly by ForkJoinPool
766 <    final boolean isRunning()     { return runState == 0; }
767 <    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
768 <    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
769 <    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
770 <    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
766 >    final boolean isRunning()    { return runState == 0; }
767 >    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
768 >    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
769 >    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
770 >
771 >    final boolean isTerminating() {
772 >        if ((runState & TERMINATING) != 0)
773 >            return true;
774 >        if (pool.isAtLeastTerminating()) { // propagate pool state
775 >            shutdown();
776 >            return true;
777 >        }
778 >        return false;
779 >    }
780  
781      /**
782       * Sets state to TERMINATING. Does NOT unpark or interrupt
# Line 846 | Line 870 | public class ForkJoinWorkerThread extend
870       */
871      final void cancelTasks() {
872          ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
873 <        if (cj != null) {
850 <            currentJoin = null;
873 >        if (cj != null && cj.status >= 0) {
874              cj.cancelIgnoringExceptions();
875              try {
876                  this.interrupt(); // awaken wait
# Line 855 | Line 878 | public class ForkJoinWorkerThread extend
878              }
879          }
880          ForkJoinTask<?> cs = currentSteal;
881 <        if (cs != null) {
859 <            currentSteal = null;
881 >        if (cs != null && cs.status >= 0)
882              cs.cancelIgnoringExceptions();
861        }
883          while (base != sp) {
884              ForkJoinTask<?> t = deqTask();
885              if (t != null)
# Line 921 | Line 942 | public class ForkJoinWorkerThread extend
942       * Possibly runs some tasks and/or blocks, until task is done.
943       *
944       * @param joinMe the task to join
945 +     * @param timed true if use timed wait
946 +     * @param nanos wait time if timed
947       */
948 <    final void joinTask(ForkJoinTask<?> joinMe) {
948 >    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
949          // currentJoin only written by this thread; only need ordered store
950          ForkJoinTask<?> prevJoin = currentJoin;
951          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
952 <        if (sp != base)
930 <            localHelpJoinTask(joinMe);
931 <        if (joinMe.status >= 0)
932 <            pool.awaitJoin(joinMe, this);
952 >        pool.awaitJoin(joinMe, this, timed, nanos);
953          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
954      }
955  
956      /**
957 <     * Run tasks in local queue until given task is done.
958 <     *
959 <     * @param joinMe the task to join
960 <     */
961 <    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
942 <        int s;
943 <        ForkJoinTask<?>[] q;
944 <        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
945 <            int i = (q.length - 1) & --s;
946 <            long u = (i << qShift) + qBase; // raw offset
947 <            ForkJoinTask<?> t = q[i];
948 <            if (t == null)  // lost to a stealer
949 <                break;
950 <            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
951 <                /*
952 <                 * This recheck (and similarly in helpJoinTask)
953 <                 * handles cases where joinMe is independently
954 <                 * cancelled or forced even though there is other work
955 <                 * available. Back out of the pop by putting t back
956 <                 * into slot before we commit by writing sp.
957 <                 */
958 <                if (joinMe.status < 0) {
959 <                    UNSAFE.putObjectVolatile(q, u, t);
960 <                    break;
961 <                }
962 <                sp = s;
963 <                // UNSAFE.putOrderedInt(this, spOffset, s);
964 <                t.quietlyExec();
965 <            }
966 <        }
967 <    }
968 <
969 <    /**
970 <     * Unless terminating, tries to locate and help perform tasks for
971 <     * a stealer of the given task, or in turn one of its stealers.
972 <     * Traces currentSteal->currentJoin links looking for a thread
973 <     * working on a descendant of the given task and with a non-empty
974 <     * queue to steal back and execute tasks from.
957 >     * Tries to locate and help perform tasks for a stealer of the
958 >     * given task, or in turn one of its stealers.  Traces
959 >     * currentSteal->currentJoin links looking for a thread working on
960 >     * a descendant of the given task and with a non-empty queue to
961 >     * steal back and execute tasks from.
962       *
963       * The implementation is very branchy to cope with potential
964       * inconsistencies or loops encountering chains that are stale,
# Line 981 | Line 968 | public class ForkJoinWorkerThread extend
968       * don't work out.
969       *
970       * @param joinMe the task to join
971 <     */
972 <    final void helpJoinTask(ForkJoinTask<?> joinMe) {
973 <        ForkJoinWorkerThread[] ws;
974 <        int n;
975 <        if (joinMe.status < 0)                // already done
976 <            return;
977 <        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
978 <            joinMe.cancelIgnoringExceptions();
979 <            return;
980 <        }
981 <        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
982 <            return;                           // need at least 2 workers
983 <
984 <        ForkJoinTask<?> task = joinMe;        // base of chain
985 <        ForkJoinWorkerThread thread = this;   // thread with stolen task
986 <        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
987 <            // Try to find v, the stealer of task, by first using hint
988 <            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
989 <            if (v == null || v.currentSteal != task) {
990 <                for (int j = 0; ; ++j) {      // search array
991 <                    if (j < n) {
992 <                        ForkJoinTask<?> vs;
993 <                        if ((v = ws[j]) != null &&
994 <                            (vs = v.currentSteal) != null) {
995 <                            if (joinMe.status < 0 || task.status < 0)
996 <                                return;       // stale or done
997 <                            if (vs == task) {
998 <                                thread.stealHint = j;
999 <                                break;        // save hint for next time
971 >     * @param running if false, then must update pool count upon
972 >     *  running a task
973 >     * @return value of running on exit
974 >     */
975 >    final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
976 >        /*
977 >         * Initial checks to (1) abort if terminating; (2) clean out
978 >         * old cancelled tasks from local queue; (3) if joinMe is next
979 >         * task, run it; (4) omit scan if local queue nonempty (since
980 >         * it may contain non-descendents of joinMe).
981 >         */
982 >        ForkJoinPool p = pool;
983 >        for (;;) {
984 >            ForkJoinTask<?>[] q;
985 >            int s;
986 >            if (joinMe.status < 0)
987 >                return running;
988 >            else if ((runState & TERMINATING) != 0)
989 >                joinMe.cancelIgnoringExceptions();
990 >            else if ((s = sp) == base || (q = queue) == null)
991 >                break;                            // queue empty
992 >            else {
993 >                int i = (q.length - 1) & --s;
994 >                long u = (i << qShift) + qBase;   // raw offset
995 >                ForkJoinTask<?> t = q[i];
996 >                if (t == null)
997 >                    break;                        // lost to a stealer
998 >                else if (t != joinMe && t.status >= 0)
999 >                    return running;               // cannot safely help
1000 >                else if ((running ||
1001 >                          (running = p.tryIncrementRunningCount())) &&
1002 >                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
1003 >                    sp = s; // putOrderedInt may encourage more timely write
1004 >                    // UNSAFE.putOrderedInt(this, spOffset, s);
1005 >                    if (t.status >= 0)
1006 >                        t.quietlyExec();
1007 >                }
1008 >            }
1009 >        }
1010 >
1011 >        int n;                                    // worker array size
1012 >        ForkJoinWorkerThread[] ws = p.workers;
1013 >        if (ws != null && (n = ws.length) > 1) {  // need at least 2 workers
1014 >            ForkJoinTask<?> task = joinMe;        // base of chain
1015 >            ForkJoinWorkerThread thread = this;   // thread with stolen task
1016 >
1017 >            outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1018 >                // Try to find v, the stealer of task, by first using hint
1019 >                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1020 >                if (v == null || v.currentSteal != task) {
1021 >                    for (int j = 0; ; ++j) {      // search array
1022 >                        if (j < n) {
1023 >                            ForkJoinTask<?> vs;
1024 >                            if ((v = ws[j]) != null &&
1025 >                                (vs = v.currentSteal) != null) {
1026 >                                if (joinMe.status < 0)
1027 >                                    break outer;
1028 >                                if (vs == task) {
1029 >                                    if (task.status < 0)
1030 >                                        break outer; // stale
1031 >                                    thread.stealHint = j;
1032 >                                    break;        // save hint for next time
1033 >                                }
1034                              }
1035                          }
1036 +                        else
1037 +                            break outer;          // no stealer
1038                      }
1016                    else
1017                        return;               // no stealer
1039                  }
1040 <            }
1041 <            for (;;) { // Try to help v, using specialized form of deqTask
1042 <                if (joinMe.status < 0)
1043 <                    return;
1044 <                int b = v.base;
1045 <                ForkJoinTask<?>[] q = v.queue;
1046 <                if (b == v.sp || q == null)
1047 <                    break;
1048 <                int i = (q.length - 1) & b;
1049 <                long u = (i << qShift) + qBase;
1050 <                ForkJoinTask<?> t = q[i];
1051 <                int pid = poolIndex;
1052 <                ForkJoinTask<?> ps = currentSteal;
1053 <                if (task.status < 0)
1054 <                    return;                   // stale or done
1055 <                if (t != null && v.base == b++ &&
1056 <                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
1057 <                    if (joinMe.status < 0) {
1058 <                        UNSAFE.putObjectVolatile(q, u, t);
1059 <                        return;               // back out on cancel
1040 >
1041 >                // Try to help v, using specialized form of deqTask
1042 >                for (;;) {
1043 >                    if (joinMe.status < 0)
1044 >                        break outer;
1045 >                    int b = v.base;
1046 >                    ForkJoinTask<?>[] q = v.queue;
1047 >                    if (b == v.sp || q == null)
1048 >                        break;                    // empty
1049 >                    int i = (q.length - 1) & b;
1050 >                    long u = (i << qShift) + qBase;
1051 >                    ForkJoinTask<?> t = q[i];
1052 >                    if (task.status < 0)
1053 >                        break outer;              // stale
1054 >                    if (t != null &&
1055 >                        (running ||
1056 >                         (running = p.tryIncrementRunningCount())) &&
1057 >                        v.base == b++ &&
1058 >                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
1059 >                        if (t != joinMe && joinMe.status < 0) {
1060 >                            UNSAFE.putObjectVolatile(q, u, t);
1061 >                            break outer;          // joinMe cancelled; back out
1062 >                        }
1063 >                        v.base = b;
1064 >                        if (t.status >= 0) {
1065 >                            ForkJoinTask<?> ps = currentSteal;
1066 >                            int pid = poolIndex;
1067 >                            v.stealHint = pid;
1068 >                            UNSAFE.putOrderedObject(this,
1069 >                                                    currentStealOffset, t);
1070 >                            t.quietlyExec();
1071 >                            UNSAFE.putOrderedObject(this,
1072 >                                                    currentStealOffset, ps);
1073 >                        }
1074                      }
1040                    v.base = b;
1041                    v.stealHint = pid;
1042                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
1043                    t.quietlyExec();
1044                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1075                  }
1076 +
1077 +                // Try to descend to find v's stealer
1078 +                ForkJoinTask<?> next = v.currentJoin;
1079 +                if (task.status < 0 || next == null || next == task)
1080 +                    break;                       // stale, dead-end, or cyclic
1081 +                if ((runState & TERMINATING) != 0)
1082 +                    joinMe.cancelIgnoringExceptions();
1083 +                if (joinMe.status < 0)
1084 +                    break;
1085 +                task = next;
1086 +                thread = v;
1087              }
1047            // Try to descend to find v's stealer
1048            ForkJoinTask<?> next = v.currentJoin;
1049            if (task.status < 0 || next == null || next == task ||
1050                joinMe.status < 0)
1051                return;
1052            task = next;
1053            thread = v;
1088          }
1089 +        return running;
1090      }
1091  
1092      /**
1093 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1094       * Returns an estimate of the number of tasks, offset by a
1095       * function of number of idle workers.
1096       *
# Line 1157 | Line 1193 | public class ForkJoinWorkerThread extend
1193          if ((s & (s-1)) != 0)
1194              throw new Error("data type scale not a power of two");
1195          qShift = 31 - Integer.numberOfLeadingZeros(s);
1196 +        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1197      }
1198  
1199      private static long objectFieldOffset(String field, Class<?> klazz) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines