ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinWorkerThread.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinWorkerThread.java (file contents):
Revision 1.1 by dl, Tue Jan 6 14:30:31 2009 UTC vs.
Revision 1.2 by dl, Wed Jan 7 16:07:37 2009 UTC

# Line 13 | Line 13 | import sun.misc.Unsafe;
13   import java.lang.reflect.*;
14  
15   /**
16 < * A thread that is internally managed by a ForkJoinPool to execute
17 < * ForkJoinTasks. This class additionally provides public
18 < * <tt>static</tt> methods accessing some basic scheduling and
19 < * execution mechanics for the <em>current</em>
20 < * ForkJoinWorkerThread. These methods may be invoked only from within
21 < * other ForkJoinTask computations. Attempts to invoke in other
22 < * contexts result in exceptions or errors including
23 < * ClassCastException.  These methods enable construction of
24 < * special-purpose task classes, as well as specialized idioms
25 < * occasionally useful in ForkJoinTask processing.
26 < *
27 < * <p>The form of supported static methods reflects the fact that
28 < * worker threads may access and process tasks obtained in any of
29 < * three ways. In preference order: <em>Local</em> tasks are processed
30 < * in LIFO (newest first) order. <em>Stolen</em> tasks are obtained
31 < * from other threads in FIFO (oldest first) order, only if there are
32 < * no local tasks to run.  <em>Submissions</em> form a FIFO queue
33 < * common to the entire pool, and are started only if no other
34 < * work is available.
35 < *
36 < * <p> This class is subclassable solely for the sake of adding
37 < * functionality -- there are no overridable methods dealing with
38 < * scheduling or execution. However, you can override initialization
39 < * and termination cleanup methods surrounding the main task
40 < * processing loop.  If you do create such a subclass, you will also
41 < * need to supply a custom ForkJoinWorkerThreadFactory to use it in a
42 < * ForkJoinPool.
16 > * A thread managed by a {@link ForkJoinPool}.  This class is
17 > * subclassable solely for the sake of adding functionality -- there
18 > * are no overridable methods dealing with scheduling or
19 > * execution. However, you can override initialization and termination
20 > * cleanup methods surrounding the main task processing loop.  If you
21 > * do create such a subclass, you will also need to supply a custom
22 > * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
23 > *
24 > * <p>This class also provides methods for generating per-thread
25 > * random numbers, with the same properties as {@link
26 > * java.util.Random} but with each generator isolated from those of
27 > * other threads.
28   */
29   public class ForkJoinWorkerThread extends Thread {
30      /*
# Line 239 | Line 224 | public class ForkJoinWorkerThread extend
224          // remaining initialization deferred to onStart
225      }
226  
227 +    // public access methods
228 +
229 +    /**
230 +     * Returns the pool hosting the current task execution.
231 +     * @return the pool
232 +     */
233 +    public static ForkJoinPool getPool() {
234 +        return ((ForkJoinWorkerThread)(Thread.currentThread())).pool;
235 +    }
236 +
237 +    /**
238 +     * Returns the index number of the current worker thread in its
239 +     * pool.  The returned value ranges from zero to the maximum
240 +     * number of threads (minus one) that have ever been created in
241 +     * the pool.  This method may be useful for applications that
242 +     * track status or collect results on a per-worker basis.
243 +     * @return the index number.
244 +     */
245 +    public static int getPoolIndex() {
246 +        return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex;
247 +    }
248 +
249      //  Access methods used by Pool
250  
251      /**
# Line 366 | Line 373 | public class ForkJoinWorkerThread extend
373      /**
374       * Returns next task to pop.
375       */
376 <    private ForkJoinTask<?> peekTask() {
376 >    final ForkJoinTask<?> peekTask() {
377          ForkJoinTask<?>[] q = queue;
378          return q == null? null : q[(sp - 1) & (q.length - 1)];
379      }
# Line 659 | Line 666 | public class ForkJoinWorkerThread extend
666          }
667      }
668  
669 +    // Support for ForkJoinTask methods
670 +
671      /**
672       * Implements ForkJoinTask.helpJoin
673       */
# Line 681 | Line 690 | public class ForkJoinWorkerThread extend
690          return s;
691      }
692  
684    // Support for public static and/or ForkJoinTask methods
685
686    /**
687     * Returns an estimate of the number of tasks in the queue.
688     */
689    final int getQueueSize() {
690        int b = base;
691        int n = sp - b;
692        return n <= 0? 0 : n; // suppress momentarily negative values
693    }
694
695    /**
696     * Runs one popped task, if available
697     * @return true if ran a task
698     */
699    private boolean runLocalTask() {
700        ForkJoinTask<?> t = popTask();
701        if (t == null)
702            return false;
703        t.quietlyExec();
704        return true;
705    }
706
693      /**
694       * Pops or steals a task
695       * @return task, or null if none available
696       */
697 <    private ForkJoinTask<?> getLocalOrStolenTask() {
697 >    final ForkJoinTask<?> getLocalOrStolenTask() {
698          ForkJoinTask<?> t = popTask();
699          return t != null? t : scan(null, false);
700      }
701  
702      /**
717     * Runs a popped or stolen task, if available
718     * @return true if ran a task
719     */
720    private boolean runLocalOrStolenTask() {
721        ForkJoinTask<?> t = getLocalOrStolenTask();
722        if (t == null)
723            return false;
724        t.quietlyExec();
725        return true;
726    }
727
728    /**
703       * Runs tasks until pool isQuiescent
704       */
705      final void helpQuiescePool() {
732        activate();
706          for (;;) {
707 <            if (!runLocalOrStolenTask()) {
707 >            ForkJoinTask<?> t = getLocalOrStolenTask();
708 >            if (t != null) {
709 >                activate();
710 >                t.quietlyExec();
711 >            }
712 >            else {
713                  inactivate();
714                  if (pool.isQuiescent()) {
715                      activate(); // re-activate on exit
# Line 742 | Line 720 | public class ForkJoinWorkerThread extend
720      }
721  
722      /**
723 <     * Returns an estimate of the number of tasks, offset by a
746 <     * function of number of idle workers.
747 <     */
748 <    final int getEstimatedSurplusTaskCount() {
749 <        return (sp - base) - (pool.getIdleThreadCount() >>> 1);
750 <    }
751 <
752 <    // Public methods on current thread
753 <
754 <    /**
755 <     * Returns the pool hosting the current task execution.
756 <     * @return the pool
757 <     */
758 <    public static ForkJoinPool getPool() {
759 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).pool;
760 <    }
761 <
762 <    /**
763 <     * Returns the index number of the current worker thread in its
764 <     * pool.  The returned value ranges from zero to the maximum
765 <     * number of threads (minus one) that have ever been created in
766 <     * the pool.  This method may be useful for applications that
767 <     * track status or collect results per-worker rather than
768 <     * per-task.
769 <     * @return the index number.
770 <     */
771 <    public static int getPoolIndex() {
772 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex;
773 <    }
774 <
775 <    /**
776 <     * Returns an estimate of the number of tasks waiting to be run by
777 <     * the current worker thread. This value may be useful for
778 <     * heuristic decisions about whether to fork other tasks.
779 <     * @return the number of tasks
780 <     */
781 <    public static int getLocalQueueSize() {
782 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
783 <            getQueueSize();
784 <    }
785 <
786 <    /**
787 <     * Returns, but does not remove or execute, the next task locally
788 <     * queued for execution by the current worker thread. There is no
789 <     * guarantee that this task will be the next one actually returned
790 <     * or executed from other polling or execution methods.
791 <     * @return the next task or null if none
792 <     */
793 <    public static ForkJoinTask<?> peekLocalTask() {
794 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
795 <    }
796 <
797 <    /**
798 <     * Removes and returns, without executing, the next task queued
799 <     * for execution in the current worker thread's local queue.
800 <     * @return the next task to execute, or null if none
801 <     */
802 <    public static ForkJoinTask<?> pollLocalTask() {
803 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
804 <    }
805 <
806 <    /**
807 <     * Execute the next task locally queued by the current worker, if
808 <     * one is available.
809 <     * @return true if a task was run; a false return indicates
810 <     * that no task was available.
811 <     */
812 <    public static boolean executeLocalTask() {
813 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
814 <            runLocalTask();
815 <    }
816 <
817 <    /**
818 <     * Removes and returns, without executing, the next task queued
819 <     * for execution in the current worker thread's local queue or if
820 <     * none, a task stolen from another worker, if one is available.
821 <     * A null return does not necessarily imply that all tasks are
822 <     * completed, only that there are currently none available.
823 <     * @return the next task to execute, or null if none
723 >     * Returns an estimate of the number of tasks in the queue.
724       */
725 <    public static ForkJoinTask<?> pollTask() {
726 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
727 <            getLocalOrStolenTask();
725 >    final int getQueueSize() {
726 >        int b = base;
727 >        int n = sp - b;
728 >        return n <= 0? 0 : n; // suppress momentarily negative values
729      }
730  
731      /**
732 <     * Helps this program complete by processing a local or stolen
733 <     * task, if one is available.  This method may be useful when
833 <     * several tasks are forked, and only one of them must be joined,
834 <     * as in:
835 <     *
836 <     * <pre>
837 <     *   while (!t1.isDone() &amp;&amp; !t2.isDone())
838 <     *     ForkJoinWorkerThread.executeTask();
839 <     * </pre>
840 <     *
841 <     * @return true if a task was run; a false return indicates
842 <     * that no task was available.
732 >     * Returns an estimate of the number of tasks, offset by a
733 >     * function of number of idle workers.
734       */
735 <    public static boolean executeTask() {
736 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
846 <            runLocalOrStolenTask();
735 >    final int getEstimatedSurplusTaskCount() {
736 >        return (sp - base) - (pool.getIdleThreadCount() >>> 1);
737      }
738  
739      // Per-worker exported random numbers

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines