ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.2 by dl, Wed Jan 7 16:07:37 2009 UTC vs.
Revision 1.7 by jsr166, Mon Jul 20 21:45:06 2009 UTC

# Line 18 | Line 18 | import java.lang.reflect.*;
18   * lighter weight than a normal thread.  Huge numbers of tasks and
19   * subtasks may be hosted by a small number of actual threads in a
20   * ForkJoinPool, at the price of some usage limitations.
21 < *
21 > *
22   * <p> A "main" ForkJoinTask begins execution when submitted to a
23   * {@link ForkJoinPool}. Once started, it will usually in turn start
24   * other subtasks.  As indicated by the name of this class, many
# Line 28 | Line 28 | import java.lang.reflect.*;
28   * of other methods that can come into play in advanced usages, as
29   * well as extension mechanics that allow support of new forms of
30   * fork/join processing.
31 < *
31 > *
32   * <p>A ForkJoinTask is a lightweight form of {@link Future}.  The
33   * efficiency of ForkJoinTasks stems from a set of restrictions (that
34   * are only partially statically enforceable) reflecting their
# Line 82 | Line 82 | import java.lang.reflect.*;
82   * instances of different task subclasses to call each others
83   * methods), some of them may only be called from within other
84   * ForkJoinTasks. Attempts to invoke them in other contexts result in
85 < * exceptions or errors including ClassCastException.
85 > * exceptions or errors possibly including ClassCastException.
86   *
87   * <p>Most base support methods are <code>final</code> because their
88   * implementations are intrinsically tied to the underlying
# Line 257 | Line 257 | public abstract class ForkJoinTask<V> im
257       * surrounded with pool notifications.
258       * @return status upon exit
259       */
260 <    final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
260 >    private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
261          ForkJoinPool pool = w == null? null : w.pool;
262          int s;
263          while ((s = status) >= 0) {
# Line 276 | Line 276 | public abstract class ForkJoinTask<V> im
276       * Timed version of awaitDone
277       * @return status upon exit
278       */
279 <    final int awaitDone(ForkJoinWorkerThread w, long nanos) {
279 >    private int awaitDone(ForkJoinWorkerThread w, long nanos) {
280          ForkJoinPool pool = w == null? null : w.pool;
281          int s;
282          while ((s = status) >= 0) {
# Line 330 | Line 330 | public abstract class ForkJoinTask<V> im
330          if (w == null)
331              Thread.currentThread().interrupt(); // re-interrupt
332          else if (w.isTerminating())
333 <            cancelIgnoreExceptions();
333 >            cancelIgnoringExceptions();
334          // else if FJworker, ignore interrupt
335      }
336  
# Line 449 | Line 449 | public abstract class ForkJoinTask<V> im
449      /**
450       * Cancel, ignoring any exceptions it throws
451       */
452 <    final void cancelIgnoreExceptions() {
452 >    final void cancelIgnoringExceptions() {
453          try {
454              cancel(false);
455          } catch(Throwable ignore) {
456          }
457      }
458  
459 +    /**
460 +     * Main implementation of helpJoin
461 +     */
462 +    private int busyJoin(ForkJoinWorkerThread w) {
463 +        int s;
464 +        ForkJoinTask<?> t;
465 +        while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
466 +            t.quietlyExec();
467 +        return (s >= 0)? awaitDone(w, false) : s; // block if no work
468 +    }
469 +
470      // public methods
471  
472      /**
# Line 464 | Line 475 | public abstract class ForkJoinTask<V> im
475       * than once unless it has completed and been reinitialized.  This
476       * method may be invoked only from within ForkJoinTask
477       * computations. Attempts to invoke in other contexts result in
478 <     * exceptions or errors including ClassCastException.
478 >     * exceptions or errors possibly including ClassCastException.
479       */
480      public final void fork() {
481          ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
# Line 485 | Line 496 | public abstract class ForkJoinTask<V> im
496          return getRawResult();
497      }
498  
488    public final V get() throws InterruptedException, ExecutionException {
489        ForkJoinWorkerThread w = getWorker();
490        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
491            awaitDone(w, true);
492        return reportFutureResult();
493    }
494
495    public final V get(long timeout, TimeUnit unit)
496        throws InterruptedException, ExecutionException, TimeoutException {
497        ForkJoinWorkerThread w = getWorker();
498        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
499            awaitDone(w, unit.toNanos(timeout));
500        return reportTimedFutureResult();
501    }
502
499      /**
500       * Commences performing this task, awaits its completion if
501       * necessary, and return its result.
# Line 519 | Line 515 | public abstract class ForkJoinTask<V> im
515       * both of them or an exception is encountered. This method may be
516       * invoked only from within ForkJoinTask computations. Attempts to
517       * invoke in other contexts result in exceptions or errors
518 <     * including ClassCastException.
518 >     * possibly including ClassCastException.
519       * @param t1 one task
520       * @param t2 the other task
521       * @throws NullPointerException if t1 or t2 are null
# Line 536 | Line 532 | public abstract class ForkJoinTask<V> im
532       * for all of them. If any task encounters an exception, others
533       * may be cancelled.  This method may be invoked only from within
534       * ForkJoinTask computations. Attempts to invoke in other contexts
535 <     * result in exceptions or errors including ClassCastException.
535 >     * result in exceptions or errors possibly including ClassCastException.
536       * @param tasks the array of tasks
537       * @throws NullPointerException if tasks or any element are null.
538       * @throws RuntimeException or Error if any task did so.
# Line 580 | Line 576 | public abstract class ForkJoinTask<V> im
576       * encounters an exception, others may be cancelled.  This method
577       * may be invoked only from within ForkJoinTask
578       * computations. Attempts to invoke in other contexts resul!t in
579 <     * exceptions or errors including ClassCastException.
579 >     * exceptions or errors possibly including ClassCastException.
580       * @param tasks the collection of tasks
581       * @throws NullPointerException if tasks or any element are null.
582       * @throws RuntimeException or Error if any task did so.
# Line 642 | Line 638 | public abstract class ForkJoinTask<V> im
638      }
639  
640      /**
645     * Returns true if this task threw an exception or was cancelled
646     * @return true if this task threw an exception or was cancelled
647     */
648    public final boolean isCompletedAbnormally() {
649        return (status & COMPLETION_MASK) < NORMAL;
650    }
651
652    /**
653     * Returns the exception thrown by the base computation, or a
654     * CancellationException if cancelled, or null if none or if the
655     * method has not yet completed.
656     * @return the exception, or null if none
657     */
658    public final Throwable getException() {
659        int s = status & COMPLETION_MASK;
660        if (s >= NORMAL)
661            return null;
662        if (s == CANCELLED)
663            return new CancellationException();
664        return exceptionMap.get(this);
665    }
666
667    /**
641       * Asserts that the results of this task's computation will not be
642       * used. If a cancellation occurs before atempting to execute this
643       * task, then execution will be suppressed, <code>isCancelled</code>
# Line 697 | Line 670 | public abstract class ForkJoinTask<V> im
670      }
671  
672      /**
673 +     * Returns true if this task threw an exception or was cancelled
674 +     * @return true if this task threw an exception or was cancelled
675 +     */
676 +    public final boolean isCompletedAbnormally() {
677 +        return (status & COMPLETION_MASK) < NORMAL;
678 +    }
679 +
680 +    /**
681 +     * Returns the exception thrown by the base computation, or a
682 +     * CancellationException if cancelled, or null if none or if the
683 +     * method has not yet completed.
684 +     * @return the exception, or null if none
685 +     */
686 +    public final Throwable getException() {
687 +        int s = status & COMPLETION_MASK;
688 +        if (s >= NORMAL)
689 +            return null;
690 +        if (s == CANCELLED)
691 +            return new CancellationException();
692 +        return exceptionMap.get(this);
693 +    }
694 +
695 +    /**
696       * Completes this task abnormally, and if not already aborted or
697       * cancelled, causes it to throw the given exception upon
698       * <code>join</code> and related operations. This method may be used
# Line 738 | Line 734 | public abstract class ForkJoinTask<V> im
734          setNormalCompletion();
735      }
736  
737 +    public final V get() throws InterruptedException, ExecutionException {
738 +        ForkJoinWorkerThread w = getWorker();
739 +        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
740 +            awaitDone(w, true);
741 +        return reportFutureResult();
742 +    }
743 +
744 +    public final V get(long timeout, TimeUnit unit)
745 +        throws InterruptedException, ExecutionException, TimeoutException {
746 +        ForkJoinWorkerThread w = getWorker();
747 +        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
748 +            awaitDone(w, unit.toNanos(timeout));
749 +        return reportTimedFutureResult();
750 +    }
751 +
752      /**
753       * Possibly executes other tasks until this task is ready, then
754       * returns the result of the computation.  This method may be more
# Line 747 | Line 758 | public abstract class ForkJoinTask<V> im
758       * while helping. (This usually holds for pure divide-and-conquer
759       * tasks). This method may be invoked only from within
760       * ForkJoinTask computations. Attempts to invoke in other contexts
761 <     * resul!t in exceptions or errors including ClassCastException.
761 >     * resul!t in exceptions or errors possibly including ClassCastException.
762       * @return the computed result
763       */
764      public final V helpJoin() {
765          ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
766          if (status < 0 || !w.unpushTask(this) || !tryExec())
767 <            reportException(w.helpJoinTask(this));
767 >            reportException(busyJoin(w));
768          return getRawResult();
769      }
770  
# Line 761 | Line 772 | public abstract class ForkJoinTask<V> im
772       * Possibly executes other tasks until this task is ready.  This
773       * method may be invoked only from within ForkJoinTask
774       * computations. Attempts to invoke in other contexts resul!t in
775 <     * exceptions or errors including ClassCastException.
775 >     * exceptions or errors possibly including ClassCastException.
776       */
777      public final void quietlyHelpJoin() {
778          if (status >= 0) {
779              ForkJoinWorkerThread w =
780                  (ForkJoinWorkerThread)(Thread.currentThread());
781              if (!w.unpushTask(this) || !tryQuietlyInvoke())
782 <                w.helpJoinTask(this);
782 >                busyJoin(w);
783          }
784      }
785  
# Line 799 | Line 810 | public abstract class ForkJoinTask<V> im
810      }
811  
812      /**
813 +     * Possibly executes tasks until the pool hosting the current task
814 +     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
815 +     * designs in which many tasks are forked, but none are explicitly
816 +     * joined, instead executing them until all are processed.
817 +     */
818 +    public static void helpQuiesce() {
819 +        ((ForkJoinWorkerThread)(Thread.currentThread())).
820 +            helpQuiescePool();
821 +    }
822 +
823 +    /**
824       * Resets the internal bookkeeping state of this task, allowing a
825       * subsequent <code>fork</code>. This method allows repeated reuse of
826       * this task, but only if reuse occurs when this task has either
# Line 833 | Line 855 | public abstract class ForkJoinTask<V> im
855       * alternative local processing of tasks that could have been, but
856       * were not, stolen. This method may be invoked only from within
857       * ForkJoinTask computations. Attempts to invoke in other contexts
858 <     * result in exceptions or errors including ClassCastException.
858 >     * result in exceptions or errors possibly including ClassCastException.
859       * @return true if unforked
860       */
861      public boolean tryUnfork() {
# Line 841 | Line 863 | public abstract class ForkJoinTask<V> im
863      }
864  
865      /**
844     * Possibly executes tasks until the pool hosting the current task
845     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
846     * designs in which many tasks are forked, but none are explicitly
847     * joined, instead executing them until all are processed.
848     */
849    public static void helpQuiesce() {
850        ((ForkJoinWorkerThread)(Thread.currentThread())).
851            helpQuiescePool();
852    }
853
854    /**
866       * Returns an estimate of the number of tasks that have been
867       * forked by the current worker thread but not yet executed. This
868       * value may be useful for heuristic decisions about whether to
# Line 915 | Line 926 | public abstract class ForkJoinTask<V> im
926      protected abstract boolean exec();
927  
928      /**
929 <     * Returns, but does not unschedule or execute, the task most
930 <     * recently forked by the current thread but not yet executed, if
931 <     * one is available. There is no guarantee that this task will
932 <     * actually be polled or executed next.
933 <     * This method is designed primarily to support extensions,
934 <     * and is unlikely to be useful otherwise.
929 >     * Returns, but does not unschedule or execute, the task queued by
930 >     * the current thread but not yet executed, if one is
931 >     * available. There is no guarantee that this task will actually
932 >     * be polled or executed next.  This method is designed primarily
933 >     * to support extensions, and is unlikely to be useful otherwise.
934 >     * This method may be invoked only from within ForkJoinTask
935 >     * computations. Attempts to invoke in other contexts result in
936 >     * exceptions or errors possibly including ClassCastException.
937       *
938       * @return the next task, or null if none are available
939       */
# Line 929 | Line 942 | public abstract class ForkJoinTask<V> im
942      }
943  
944      /**
945 <     * Unschedules and returns, without executing, the task most
946 <     * recently forked by the current thread but not yet executed.
947 <     * This method is designed primarily to support extensions,
948 <     * and is unlikely to be useful otherwise.
945 >     * Unschedules and returns, without executing, the next task
946 >     * queued by the current thread but not yet executed.  This method
947 >     * is designed primarily to support extensions, and is unlikely to
948 >     * be useful otherwise.  This method may be invoked only from
949 >     * within ForkJoinTask computations. Attempts to invoke in other
950 >     * contexts result in exceptions or errors possibly including
951 >     * ClassCastException.
952       *
953       * @return the next task, or null if none are available
954       */
955      protected static ForkJoinTask<?> pollNextLocalTask() {
956 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
956 >        return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
957      }
958  
959      /**
960 <     * Unschedules and returns, without executing, the task most
961 <     * recently forked by the current thread but not yet executed, if
962 <     * one is available, or if not available, a task that was forked
963 <     * by some other thread, if available. Availability may be
964 <     * transient, so a <code>null</code> result does not necessarily
965 <     * imply quiecence of the pool this task is operating in.
966 <     * This method is designed primarily to support extensions,
967 <     * and is unlikely to be useful otherwise.
968 <     *
960 >     * Unschedules and returns, without executing, the next task
961 >     * queued by the current thread but not yet executed, if one is
962 >     * available, or if not available, a task that was forked by some
963 >     * other thread, if available. Availability may be transient, so a
964 >     * <code>null</code> result does not necessarily imply quiecence
965 >     * of the pool this task is operating in.  This method is designed
966 >     * primarily to support extensions, and is unlikely to be useful
967 >     * otherwise.  This method may be invoked only from within
968 >     * ForkJoinTask computations. Attempts to invoke in other contexts
969 >     * result in exceptions or errors possibly including
970 >     * ClassCastException.
971 >     *
972       * @return a task, or null if none are available
973       */
974      protected static ForkJoinTask<?> pollTask() {
975          return ((ForkJoinWorkerThread)(Thread.currentThread())).
976 <            getLocalOrStolenTask();
976 >            pollTask();
977      }
978  
979      // Serialization support
# Line 989 | Line 1008 | public abstract class ForkJoinTask<V> im
1008      }
1009  
1010      // Temporary Unsafe mechanics for preliminary release
1011 +    private static Unsafe getUnsafe() throws Throwable {
1012 +        try {
1013 +            return Unsafe.getUnsafe();
1014 +        } catch (SecurityException se) {
1015 +            try {
1016 +                return java.security.AccessController.doPrivileged
1017 +                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
1018 +                        public Unsafe run() throws Exception {
1019 +                            return getUnsafePrivileged();
1020 +                        }});
1021 +            } catch (java.security.PrivilegedActionException e) {
1022 +                throw e.getCause();
1023 +            }
1024 +        }
1025 +    }
1026 +
1027 +    private static Unsafe getUnsafePrivileged()
1028 +            throws NoSuchFieldException, IllegalAccessException {
1029 +        Field f = Unsafe.class.getDeclaredField("theUnsafe");
1030 +        f.setAccessible(true);
1031 +        return (Unsafe) f.get(null);
1032 +    }
1033 +
1034 +    private static long fieldOffset(String fieldName)
1035 +            throws NoSuchFieldException {
1036 +        return _unsafe.objectFieldOffset
1037 +            (ForkJoinTask.class.getDeclaredField(fieldName));
1038 +    }
1039  
1040      static final Unsafe _unsafe;
1041      static final long statusOffset;
1042  
1043      static {
1044          try {
1045 <            if (ForkJoinTask.class.getClassLoader() != null) {
1046 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
1047 <                f.setAccessible(true);
1048 <                _unsafe = (Unsafe)f.get(null);
1049 <            }
1003 <            else
1004 <                _unsafe = Unsafe.getUnsafe();
1005 <            statusOffset = _unsafe.objectFieldOffset
1006 <                (ForkJoinTask.class.getDeclaredField("status"));
1007 <        } catch (Exception ex) { throw new Error(ex); }
1045 >            _unsafe = getUnsafe();
1046 >            statusOffset = fieldOffset("status");
1047 >        } catch (Throwable e) {
1048 >            throw new RuntimeException("Could not initialize intrinsics", e);
1049 >        }
1050      }
1051  
1052   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines