ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinWorkerThread.java (file contents):
Revision 1.16 by dl, Thu May 27 16:47:21 2010 UTC vs.
Revision 1.17 by dl, Wed Jul 7 20:41:24 2010 UTC

# Line 81 | Line 81 | public class ForkJoinWorkerThread extend
81       * by the ForkJoinPool).  This allows use in message-passing
82       * frameworks in which tasks are never joined.
83       *
84 <     * Efficient implementation of this approach currently relies on
84 >     * When a worker would otherwise be blocked waiting to join a
85 >     * task, it first tries a form of linear helping: Each worker
86 >     * records (in field stolen) the most recent task it stole
87 >     * from some other worker. Plus, it records (in field joining) the
88 >     * task it is currently actively joining. Method joinTask uses
89 >     * these markers to try to find a worker to help (i.e., steal back
90 >     * a task from and execute it) that could hasten completion of the
91 >     * actively joined task. In essence, the joiner executes a task
92 >     * that would be on its own local deque had the to-be-joined task
93 >     * not been stolen. This may be seen as a conservative variant of
94 >     * the approach in Wagner & Calder "Leapfrogging: a portable
95 >     * technique for implementing efficient futures" SIGPLAN Notices,
96 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
97 >     * in that: (1) We only maintain dependency links across workers
98 >     * upon steals, rather than maintain per-task bookkeeping.  This
99 >     * requires a linear scan of workers array to locate stealers,
100 >     * which isolates cost to when it is needed, rather than adding to
101 >     * per-task overhead.  (2) It is "shallow", ignoring nesting and
102 >     * potentially cyclic mutual steals.  (3) It is intentionally
103 >     * racy: field joining is updated only while actively joining,
104 >     * which means that we could miss links in the chain during
105 >     * long-lived tasks, GC stalls etc.  (4) We fall back to
106 >     * suspending the worker and if necessary replacing it with a
107 >     * spare (see ForkJoinPool.tryAwaitJoin).
108 >     *
109 >     * Efficient implementation of these algorithms currently relies on
110       * an uncomfortable amount of "Unsafe" mechanics. To maintain
111       * correct orderings, reads and writes of variable base require
112       * volatile ordering.  Variable sp does not require volatile
# Line 136 | Line 161 | public class ForkJoinWorkerThread extend
161  
162      /**
163       * Capacity of work-stealing queue array upon initialization.
164 <     * Must be a power of two. Initial size must be at least 2, but is
164 >     * Must be a power of two. Initial size must be at least 4, but is
165       * padded to minimize cache effects.
166       */
167      private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
# Line 155 | Line 180 | public class ForkJoinWorkerThread extend
180      final ForkJoinPool pool;
181  
182      /**
183 +     * The task most recently stolen from another worker
184 +     */
185 +    private volatile ForkJoinTask<?> stolen;
186 +
187 +    /**
188 +     * The task currently being joined, set only when actively
189 +     * trying to helpStealer.
190 +     */
191 +    private volatile ForkJoinTask<?> joining;
192 +
193 +    /**
194       * The work-stealing queue array. Size must be a power of two.
195       * Initialized in onStart, to improve memory locality.
196       */
# Line 194 | Line 230 | public class ForkJoinWorkerThread extend
230       * currently not exported but included because volatile write upon
231       * park also provides a workaround for a JVM bug.
232       */
233 <    private volatile int parkCount;
233 >    volatile int parkCount;
234  
235      /**
236       * Number of steals, transferred and reset in pool callbacks pool
# Line 219 | Line 255 | public class ForkJoinWorkerThread extend
255       * Shadows value from ForkJoinPool, which resets it if changed
256       * pool-wide.
257       */
258 <    private boolean locallyFifo;
259 <
258 >    private final boolean locallyFifo;
259 >    
260      /**
261       * Index of this worker in pool array. Set once by pool before
262       * running, and accessed directly by pool to locate this worker in
# Line 247 | Line 283 | public class ForkJoinWorkerThread extend
283       * @throws NullPointerException if pool is null
284       */
285      protected ForkJoinWorkerThread(ForkJoinPool pool) {
250        if (pool == null) throw new NullPointerException();
286          this.pool = pool;
287 +        this.locallyFifo = pool.locallyFifo;
288          // To avoid exposing construction details to subclasses,
289          // remaining initialization is in start() and onStart()
290      }
# Line 256 | Line 292 | public class ForkJoinWorkerThread extend
292      /**
293       * Performs additional initialization and starts this thread
294       */
295 <    final void start(int poolIndex, boolean locallyFifo,
260 <                     UncaughtExceptionHandler ueh) {
295 >    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
296          this.poolIndex = poolIndex;
262        this.locallyFifo = locallyFifo;
297          if (ueh != null)
298              setUncaughtExceptionHandler(ueh);
299          setDaemon(true);
# Line 303 | Line 337 | public class ForkJoinWorkerThread extend
337          int rs = seedGenerator.nextInt();
338          seed = rs == 0? 1 : rs; // seed must be nonzero
339  
340 <        // Allocate name string and queue array in this thread
340 >        // Allocate name string and arrays in this thread
341          String pid = Integer.toString(pool.getPoolNumber());
342          String wid = Integer.toString(poolIndex);
343          setName("ForkJoinPool-" + pid + "-worker-" + wid);
# Line 321 | Line 355 | public class ForkJoinWorkerThread extend
355       */
356      protected void onTermination(Throwable exception) {
357          try {
358 +            stolen = null;
359 +            joining = null;
360              cancelTasks();
361              setTerminated();
362              pool.workerTerminated(this);
# Line 356 | Line 392 | public class ForkJoinWorkerThread extend
392       * Find and execute tasks and check status while running
393       */
394      private void mainLoop() {
395 <        boolean ran = false; // true if ran task on previous step
395 >        boolean ran = false;      // true if ran task in last loop iter
396 >        boolean prevRan = false;  // true if ran on last or previous step
397          ForkJoinPool p = pool;
398          for (;;) {
399 <            p.preStep(this, ran);
399 >            p.preStep(this, prevRan);
400              if (runState != 0)
401                  return;
402              ForkJoinTask<?> t; // try to get and run stolen or submitted task
403 <            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
403 >            if ((t = scan()) != null || (t = pollSubmission()) != null) {
404                  t.tryExec();
405                  if (base != sp)
406                      runLocalTasks();
407 +                stolen = null;
408 +                prevRan = ran = true;
409 +            }
410 +            else {
411 +                prevRan = ran;
412 +                ran = false;
413              }
414          }
415      }
# Line 445 | Line 488 | public class ForkJoinWorkerThread extend
488       * @param t the task. Caller must ensure non-null.
489       */
490      final void pushTask(ForkJoinTask<?> t) {
448        int s;
491          ForkJoinTask<?>[] q = queue;
492          int mask = q.length - 1; // implicit assert q != null
493 <        UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
494 <        if ((s -= base) <= 0)
495 <            pool.signalWork();
496 <        else if (s + 1 >= mask)
497 <            growQueue();
493 >        int s = sp++;            // ok to increment sp before slot write
494 >        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
495 >        if ((s -= base) == 0)
496 >            pool.signalWork();   // was empty
497 >        else if (s == mask)
498 >            growQueue();         // is full
499      }
500  
501      /**
502       * Tries to take a task from the base of the queue, failing if
503       * empty or contended. Note: Specializations of this code appear
504 <     * in scan and scanWhileJoining.
504 >     * in locallyDeqTask and elsewhere.
505       *
506       * @return a task, or null if none or contended
507       */
# Line 468 | Line 511 | public class ForkJoinWorkerThread extend
511          int b, i;
512          if ((b = base) != sp &&
513              (q = queue) != null && // must read q after b
514 <            (t = q[i = (q.length - 1) & b]) != null &&
514 >            (t = q[i = (q.length - 1) & b]) != null && base == b &&
515              UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
516              base = b + 1;
517              return t;
# Line 488 | Line 531 | public class ForkJoinWorkerThread extend
531              ForkJoinTask<?> t;
532              int b, i;
533              while (sp != (b = base)) {
534 <                if ((t = q[i = (q.length - 1) & b]) != null &&
534 >                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
535                      UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
536                                                  t, null)) {
537                      base = b + 1;
# Line 621 | Line 664 | public class ForkJoinWorkerThread extend
664                  ForkJoinWorkerThread v = ws[k & mask];
665                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
666                  if (v != null && v.base != v.sp) {
667 <                    int b, i;             // inline specialized deqTask
668 <                    ForkJoinTask<?>[] q;
669 <                    ForkJoinTask<?> t;
670 <                    if ((canSteal ||      // ensure active status
671 <                         (canSteal = active = p.tryIncrementActiveCount())) &&
672 <                        (q = v.queue) != null &&
673 <                        (t = q[i = (q.length - 1) & (b = v.base)]) != null &&
674 <                        UNSAFE.compareAndSwapObject
675 <                        (q, (i << qShift) + qBase, t, null)) {
676 <                        v.base = b + 1;
677 <                        seed = r;
678 <                        ++stealCount;
679 <                        return t;
667 >                    if (canSteal ||       // ensure active status
668 >                        (canSteal = active = p.tryIncrementActiveCount())) {
669 >                        int b = v.base;   // inline specialized deqTask
670 >                        ForkJoinTask<?>[] q;
671 >                        if (b != v.sp && (q = v.queue) != null) {
672 >                            ForkJoinTask<?> t;
673 >                            int i = (q.length - 1) & b;
674 >                            long u = (i << qShift) + qBase; // raw offset
675 >                            if ((t = q[i]) != null && v.base == b &&
676 >                                UNSAFE.compareAndSwapObject(q, u, t, null)) {
677 >                                stolen = t;
678 >                                v.base = b + 1;
679 >                                seed = r;
680 >                                ++stealCount;
681 >                                return t;
682 >                            }
683 >                        }
684                      }
685                      j = -n;
686                      k = r;                // restart on contention
# Line 688 | Line 735 | public class ForkJoinWorkerThread extend
735      }
736  
737      /**
738 <     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent
738 >     * Instrumented version of park used by ForkJoinPool.awaitEvent
739       */
740      final void doPark() {
741          ++parkCount;
# Line 702 | Line 749 | public class ForkJoinWorkerThread extend
749       * @return true if successful
750       */
751      final boolean tryUnsuspend() {
752 <        int s;
753 <        return (((s = runState) & SUSPENDED) != 0 &&
754 <                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
755 <                                         s & ~SUSPENDED));
752 >        int s = runState;
753 >        if ((s & SUSPENDED) != 0)
754 >            return UNSAFE.compareAndSwapInt(this, runStateOffset, s,
755 >                                            s & ~SUSPENDED);
756 >        return false;
757      }
758  
759      /**
# Line 726 | Line 774 | public class ForkJoinWorkerThread extend
774                                                s | SUSPENDED))
775                  break;
776          }
777 +        boolean timed;
778 +        long nanos;
779 +        long startTime;
780 +        if (poolIndex < pool.parallelism) {
781 +            timed = false;
782 +            nanos = 0L;
783 +            startTime = 0L;
784 +        }
785 +        else {
786 +            timed = true;
787 +            nanos = SPARE_KEEPALIVE_NANOS;
788 +            startTime = System.nanoTime();
789 +        }
790 +        pool.accumulateStealCount(this);
791          lastEventCount = 0;      // reset upon resume
730        ForkJoinPool p = pool;
731        p.releaseWaiters();      // help others progress
732        p.accumulateStealCount(this);
792          interrupted();           // clear/ignore interrupts
734        if (poolIndex < p.getParallelism()) { // untimed wait
735            while ((runState & SUSPENDED) != 0)
736                doPark();
737            return true;
738        }
739        return timedSuspend();   // timed wait if apparently non-core
740    }
741
742    /**
743     * Blocks as spare until resumed or timed out
744     * @return false if trimmed
745     */
746    private boolean timedSuspend() {
747        long nanos = SPARE_KEEPALIVE_NANOS;
748        long startTime = System.nanoTime();
793          while ((runState & SUSPENDED) != 0) {
794              ++parkCount;
795 <            if ((nanos -= (System.nanoTime() - startTime)) > 0)
795 >            if (!timed)
796 >                LockSupport.park(this);
797 >            else if ((nanos -= (System.nanoTime() - startTime)) > 0)
798                  LockSupport.parkNanos(this, nanos);
799              else { // try to trim on timeout
800                  int s = runState;
# Line 772 | Line 818 | public class ForkJoinWorkerThread extend
818      }
819  
820      /**
775     * Set locallyFifo mode. Called only by ForkJoinPool
776     */
777    final void setAsyncMode(boolean async) {
778        locallyFifo = async;
779    }
780
781    /**
821       * Removes and cancels all tasks in queue.  Can be called from any
822       * thread.
823       */
# Line 810 | Line 849 | public class ForkJoinWorkerThread extend
849      // Support methods for ForkJoinTask
850  
851      /**
852 +     * Possibly runs some tasks and/or blocks, until task is done.
853 +     *
854 +     * @param joinMe the task to join
855 +     */
856 +    final void joinTask(ForkJoinTask<?> joinMe) {
857 +        ForkJoinTask<?> prevJoining = joining;
858 +        joining = joinMe;
859 +        while (joinMe.status >= 0) {
860 +            int s = sp;
861 +            if (s == base) {
862 +                nonlocalJoinTask(joinMe);
863 +                break;
864 +            }
865 +            // process local task
866 +            ForkJoinTask<?> t;
867 +            ForkJoinTask<?>[] q = queue;
868 +            int i = (q.length - 1) & --s;
869 +            long u = (i << qShift) + qBase; // raw offset
870 +            if ((t = q[i]) != null &&
871 +                UNSAFE.compareAndSwapObject(q, u, t, null)) {
872 +                /*
873 +                 * This recheck (and similarly in nonlocalJoinTask)
874 +                 * handles cases where joinMe is independently
875 +                 * cancelled or forced even though there is other work
876 +                 * available. Back out of the pop by putting t back
877 +                 * into slot before we commit by setting sp.
878 +                 */
879 +                if (joinMe.status < 0) {
880 +                    UNSAFE.putObjectVolatile(q, u, t);
881 +                    break;
882 +                }
883 +                sp = s;
884 +                t.tryExec();
885 +            }
886 +        }
887 +        joining = prevJoining;
888 +    }
889 +
890 +    /**
891 +     * Tries to locate and help perform tasks for a stealer of the
892 +     * given task (or in turn one of its stealers), blocking (via
893 +     * pool.tryAwaitJoin) upon failure to find work.  Traces
894 +     * stolen->joining links looking for a thread working on
895 +     * a descendant of the given task and with a non-empty queue to
896 +     * steal back and execute tasks from. Inhibits mutual steal chains
897 +     * and scans on outer joins upon nesting to avoid unbounded
898 +     * growth.  Restarts search upon encountering inconsistencies.
899 +     * Tries to block if two passes agree that there are no remaining
900 +     * targets.
901 +     *
902 +     * @param joinMe the task to join
903 +     */
904 +    private void nonlocalJoinTask(ForkJoinTask<?> joinMe) {
905 +        ForkJoinPool p = pool;
906 +        int scans = p.parallelism;       // give up if too many retries
907 +        ForkJoinTask<?> bottom = null;   // target seen when can't descend
908 +        restart: while (joinMe.status >= 0) {
909 +            ForkJoinTask<?> target = null;
910 +            ForkJoinTask<?> next = joinMe;
911 +            while (scans >= 0 && next != null) {
912 +                --scans;
913 +                target = next;
914 +                next = null;
915 +                ForkJoinWorkerThread v = null;
916 +                ForkJoinWorkerThread[] ws = p.workers;
917 +                int n = ws.length;
918 +                for (int j = 0; j < n; ++j) {
919 +                    ForkJoinWorkerThread w = ws[j];
920 +                    if (w != null && w.stolen == target) {
921 +                        v = w;
922 +                        break;
923 +                    }
924 +                }
925 +                if (v != null && v != this) {
926 +                    ForkJoinTask<?> prevStolen = stolen;
927 +                    int b;
928 +                    ForkJoinTask<?>[] q;
929 +                    while ((b = v.base) != v.sp && (q = v.queue) != null) {
930 +                        int i = (q.length - 1) & b;
931 +                        long u = (i << qShift) + qBase;
932 +                        ForkJoinTask<?> t = q[i];
933 +                        if (target.status < 0)
934 +                            continue restart;
935 +                        if (t != null && v.base == b &&
936 +                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
937 +                            if (joinMe.status < 0) {
938 +                                UNSAFE.putObjectVolatile(q, u, t);
939 +                                return; // back out
940 +                            }
941 +                            stolen = t;
942 +                            v.base = b + 1;
943 +                            t.tryExec();
944 +                            stolen = prevStolen;
945 +                        }
946 +                        if (joinMe.status < 0)
947 +                            return;
948 +                    }
949 +                    next = v.joining;
950 +                }
951 +                if (target.status < 0)
952 +                    continue restart;  // inconsistent
953 +                if (joinMe.status < 0)
954 +                    return;
955 +            }
956 +
957 +            if (bottom != target)
958 +                bottom = target;    // recheck landing spot
959 +            else if (p.tryAwaitJoin(joinMe) < 0)
960 +                return;             // successfully blocked
961 +            Thread.yield();         // tame spin in case too many active
962 +        }
963 +    }
964 +
965 +    /**
966       * Returns an estimate of the number of tasks, offset by a
967       * function of number of idle workers.
968       *
# Line 866 | Line 1019 | public class ForkJoinWorkerThread extend
1019       * @return a task, if available
1020       */
1021      final ForkJoinTask<?> pollLocalTask() {
1022 <        while (base != sp) {
1022 >        while (sp != base) {
1023              if (active || (active = pool.tryIncrementActiveCount()))
1024                  return locallyFifo? locallyDeqTask() : popTask();
1025          }
# Line 884 | Line 1037 | public class ForkJoinWorkerThread extend
1037      }
1038  
1039      /**
887     * Executes or processes other tasks awaiting the given task
888     * @return task completion status
889     */
890    final int execWhileJoining(ForkJoinTask<?> joinMe) {
891        int s;
892        while ((s = joinMe.status) >= 0) {
893            ForkJoinTask<?> t = base != sp?
894                popWhileJoining(joinMe) :
895                scanWhileJoining(joinMe);
896            if (t != null)
897                t.tryExec();
898        }
899        return s;
900    }
901
902    /**
903     * Returns or stolen task, if available, unless joinMe is done
904     *
905     * This method is intrinsically nonmodular. To maintain the
906     * property that tasks are never stolen if the awaited task is
907     * ready, we must interleave mechanics of scan with status
908     * checks. We rely here on the commit points of deq that allow us
909     * to cancel a steal even after CASing slot to null, but before
910     * adjusting base index: If, after the CAS, we see that joinMe is
911     * ready, we can back out by placing the task back into the slot,
912     * without adjusting index. The loop is otherwise a variant of the
913     * one in scan().
914     *
915     */
916    private ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
917        int r = seed;
918        ForkJoinPool p = pool;
919        ForkJoinWorkerThread[] ws;
920        int n;
921        outer:while ((ws = p.workers) != null && (n = ws.length) > 1) {
922            int mask = n - 1;
923            int k = r;
924            boolean contended = false; // to retry loop if deq contends
925            for (int j = -n; j <= n; ++j) {
926                if (joinMe.status < 0)
927                    break outer;
928                int b;
929                ForkJoinTask<?>[] q;
930                ForkJoinWorkerThread v = ws[k & mask];
931                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
932                if (v != null && (b=v.base) != v.sp && (q=v.queue) != null) {
933                    int i = (q.length - 1) & b;
934                    ForkJoinTask<?> t = q[i];
935                    if (t != null && UNSAFE.compareAndSwapObject
936                        (q, (i << qShift) + qBase, t, null)) {
937                        if (joinMe.status >= 0) {
938                            v.base = b + 1;
939                            seed = r;
940                            ++stealCount;
941                            return t;
942                        }
943                        UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t);
944                        break outer; // back out
945                    }
946                    contended = true;
947                }
948                k = j < 0 ? r : (k + ((n >>> 1) | 1));
949            }
950            if (!contended && p.tryAwaitBusyJoin(joinMe))
951                break;
952        }
953        return null;
954    }
955
956    /**
957     * Version of popTask with join checks surrounding extraction.
958     * Uses the same backout strategy as helpJoinTask. Note that
959     * we ignore locallyFifo flag for local tasks here since helping
960     * joins only make sense in LIFO mode.
961     *
962     * @return a popped task, if available, unless joinMe is done
963     */
964    private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) {
965        int s;
966        ForkJoinTask<?>[] q;
967        while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) {
968            int i = (q.length - 1) & --s;
969            ForkJoinTask<?> t = q[i];
970            if (t != null && UNSAFE.compareAndSwapObject
971                (q, (i << qShift) + qBase, t, null)) {
972                if (joinMe.status >= 0) {
973                    sp = s;
974                    return t;
975                }
976                UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
977                break;  // back out
978            }
979        }
980        return null;
981    }
982
983    /**
1040       * Runs tasks until {@code pool.isQuiescent()}.
1041       */
1042      final void helpQuiescePool() {
1043          for (;;) {
1044              ForkJoinTask<?> t = pollLocalTask();
1045 <            if (t != null || (t = scan()) != null)
1045 >            if (t != null || (t = scan()) != null) {
1046                  t.tryExec();
1047 +                stolen = null;
1048 +            }
1049              else {
1050                  ForkJoinPool p = pool;
1051                  if (active) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines