ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.2 by dl, Wed Jan 7 16:07:37 2009 UTC vs.
Revision 1.5 by jsr166, Thu Mar 19 05:10:42 2009 UTC

# Line 18 | Line 18 | import java.lang.reflect.*;
18   * lighter weight than a normal thread.  Huge numbers of tasks and
19   * subtasks may be hosted by a small number of actual threads in a
20   * ForkJoinPool, at the price of some usage limitations.
21 < *
21 > *
22   * <p> A "main" ForkJoinTask begins execution when submitted to a
23   * {@link ForkJoinPool}. Once started, it will usually in turn start
24   * other subtasks.  As indicated by the name of this class, many
# Line 28 | Line 28 | import java.lang.reflect.*;
28   * of other methods that can come into play in advanced usages, as
29   * well as extension mechanics that allow support of new forms of
30   * fork/join processing.
31 < *
31 > *
32   * <p>A ForkJoinTask is a lightweight form of {@link Future}.  The
33   * efficiency of ForkJoinTasks stems from a set of restrictions (that
34   * are only partially statically enforceable) reflecting their
# Line 82 | Line 82 | import java.lang.reflect.*;
82   * instances of different task subclasses to call each others
83   * methods), some of them may only be called from within other
84   * ForkJoinTasks. Attempts to invoke them in other contexts result in
85 < * exceptions or errors including ClassCastException.
85 > * exceptions or errors possibly including ClassCastException.
86   *
87   * <p>Most base support methods are <code>final</code> because their
88   * implementations are intrinsically tied to the underlying
# Line 257 | Line 257 | public abstract class ForkJoinTask<V> im
257       * surrounded with pool notifications.
258       * @return status upon exit
259       */
260 <    final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
260 >    private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
261          ForkJoinPool pool = w == null? null : w.pool;
262          int s;
263          while ((s = status) >= 0) {
# Line 276 | Line 276 | public abstract class ForkJoinTask<V> im
276       * Timed version of awaitDone
277       * @return status upon exit
278       */
279 <    final int awaitDone(ForkJoinWorkerThread w, long nanos) {
279 >    private int awaitDone(ForkJoinWorkerThread w, long nanos) {
280          ForkJoinPool pool = w == null? null : w.pool;
281          int s;
282          while ((s = status) >= 0) {
# Line 330 | Line 330 | public abstract class ForkJoinTask<V> im
330          if (w == null)
331              Thread.currentThread().interrupt(); // re-interrupt
332          else if (w.isTerminating())
333 <            cancelIgnoreExceptions();
333 >            cancelIgnoringExceptions();
334          // else if FJworker, ignore interrupt
335      }
336  
# Line 449 | Line 449 | public abstract class ForkJoinTask<V> im
449      /**
450       * Cancel, ignoring any exceptions it throws
451       */
452 <    final void cancelIgnoreExceptions() {
452 >    final void cancelIgnoringExceptions() {
453          try {
454              cancel(false);
455          } catch(Throwable ignore) {
456          }
457      }
458  
459 +    /**
460 +     * Main implementation of helpJoin
461 +     */
462 +    private int busyJoin(ForkJoinWorkerThread w) {
463 +        int s;
464 +        ForkJoinTask<?> t;
465 +        while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
466 +            t.quietlyExec();
467 +        return (s >= 0)? awaitDone(w, false) : s; // block if no work
468 +    }
469 +
470      // public methods
471  
472      /**
# Line 464 | Line 475 | public abstract class ForkJoinTask<V> im
475       * than once unless it has completed and been reinitialized.  This
476       * method may be invoked only from within ForkJoinTask
477       * computations. Attempts to invoke in other contexts result in
478 <     * exceptions or errors including ClassCastException.
478 >     * exceptions or errors possibly including ClassCastException.
479       */
480      public final void fork() {
481          ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
# Line 485 | Line 496 | public abstract class ForkJoinTask<V> im
496          return getRawResult();
497      }
498  
488    public final V get() throws InterruptedException, ExecutionException {
489        ForkJoinWorkerThread w = getWorker();
490        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
491            awaitDone(w, true);
492        return reportFutureResult();
493    }
494
495    public final V get(long timeout, TimeUnit unit)
496        throws InterruptedException, ExecutionException, TimeoutException {
497        ForkJoinWorkerThread w = getWorker();
498        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
499            awaitDone(w, unit.toNanos(timeout));
500        return reportTimedFutureResult();
501    }
502
499      /**
500       * Commences performing this task, awaits its completion if
501       * necessary, and return its result.
# Line 519 | Line 515 | public abstract class ForkJoinTask<V> im
515       * both of them or an exception is encountered. This method may be
516       * invoked only from within ForkJoinTask computations. Attempts to
517       * invoke in other contexts result in exceptions or errors
518 <     * including ClassCastException.
518 >     * possibly including ClassCastException.
519       * @param t1 one task
520       * @param t2 the other task
521       * @throws NullPointerException if t1 or t2 are null
# Line 536 | Line 532 | public abstract class ForkJoinTask<V> im
532       * for all of them. If any task encounters an exception, others
533       * may be cancelled.  This method may be invoked only from within
534       * ForkJoinTask computations. Attempts to invoke in other contexts
535 <     * result in exceptions or errors including ClassCastException.
535 >     * result in exceptions or errors possibly including ClassCastException.
536       * @param tasks the array of tasks
537       * @throws NullPointerException if tasks or any element are null.
538       * @throws RuntimeException or Error if any task did so.
# Line 580 | Line 576 | public abstract class ForkJoinTask<V> im
576       * encounters an exception, others may be cancelled.  This method
577       * may be invoked only from within ForkJoinTask
578       * computations. Attempts to invoke in other contexts resul!t in
579 <     * exceptions or errors including ClassCastException.
579 >     * exceptions or errors possibly including ClassCastException.
580       * @param tasks the collection of tasks
581       * @throws NullPointerException if tasks or any element are null.
582       * @throws RuntimeException or Error if any task did so.
# Line 642 | Line 638 | public abstract class ForkJoinTask<V> im
638      }
639  
640      /**
645     * Returns true if this task threw an exception or was cancelled
646     * @return true if this task threw an exception or was cancelled
647     */
648    public final boolean isCompletedAbnormally() {
649        return (status & COMPLETION_MASK) < NORMAL;
650    }
651
652    /**
653     * Returns the exception thrown by the base computation, or a
654     * CancellationException if cancelled, or null if none or if the
655     * method has not yet completed.
656     * @return the exception, or null if none
657     */
658    public final Throwable getException() {
659        int s = status & COMPLETION_MASK;
660        if (s >= NORMAL)
661            return null;
662        if (s == CANCELLED)
663            return new CancellationException();
664        return exceptionMap.get(this);
665    }
666
667    /**
641       * Asserts that the results of this task's computation will not be
642       * used. If a cancellation occurs before atempting to execute this
643       * task, then execution will be suppressed, <code>isCancelled</code>
# Line 697 | Line 670 | public abstract class ForkJoinTask<V> im
670      }
671  
672      /**
673 +     * Returns true if this task threw an exception or was cancelled
674 +     * @return true if this task threw an exception or was cancelled
675 +     */
676 +    public final boolean isCompletedAbnormally() {
677 +        return (status & COMPLETION_MASK) < NORMAL;
678 +    }
679 +
680 +    /**
681 +     * Returns the exception thrown by the base computation, or a
682 +     * CancellationException if cancelled, or null if none or if the
683 +     * method has not yet completed.
684 +     * @return the exception, or null if none
685 +     */
686 +    public final Throwable getException() {
687 +        int s = status & COMPLETION_MASK;
688 +        if (s >= NORMAL)
689 +            return null;
690 +        if (s == CANCELLED)
691 +            return new CancellationException();
692 +        return exceptionMap.get(this);
693 +    }
694 +
695 +    /**
696       * Completes this task abnormally, and if not already aborted or
697       * cancelled, causes it to throw the given exception upon
698       * <code>join</code> and related operations. This method may be used
# Line 738 | Line 734 | public abstract class ForkJoinTask<V> im
734          setNormalCompletion();
735      }
736  
737 +    public final V get() throws InterruptedException, ExecutionException {
738 +        ForkJoinWorkerThread w = getWorker();
739 +        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
740 +            awaitDone(w, true);
741 +        return reportFutureResult();
742 +    }
743 +
744 +    public final V get(long timeout, TimeUnit unit)
745 +        throws InterruptedException, ExecutionException, TimeoutException {
746 +        ForkJoinWorkerThread w = getWorker();
747 +        if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
748 +            awaitDone(w, unit.toNanos(timeout));
749 +        return reportTimedFutureResult();
750 +    }
751 +
752      /**
753       * Possibly executes other tasks until this task is ready, then
754       * returns the result of the computation.  This method may be more
# Line 747 | Line 758 | public abstract class ForkJoinTask<V> im
758       * while helping. (This usually holds for pure divide-and-conquer
759       * tasks). This method may be invoked only from within
760       * ForkJoinTask computations. Attempts to invoke in other contexts
761 <     * resul!t in exceptions or errors including ClassCastException.
761 >     * resul!t in exceptions or errors possibly including ClassCastException.
762       * @return the computed result
763       */
764      public final V helpJoin() {
765          ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
766          if (status < 0 || !w.unpushTask(this) || !tryExec())
767 <            reportException(w.helpJoinTask(this));
767 >            reportException(busyJoin(w));
768          return getRawResult();
769      }
770  
# Line 761 | Line 772 | public abstract class ForkJoinTask<V> im
772       * Possibly executes other tasks until this task is ready.  This
773       * method may be invoked only from within ForkJoinTask
774       * computations. Attempts to invoke in other contexts resul!t in
775 <     * exceptions or errors including ClassCastException.
775 >     * exceptions or errors possibly including ClassCastException.
776       */
777      public final void quietlyHelpJoin() {
778          if (status >= 0) {
779              ForkJoinWorkerThread w =
780                  (ForkJoinWorkerThread)(Thread.currentThread());
781              if (!w.unpushTask(this) || !tryQuietlyInvoke())
782 <                w.helpJoinTask(this);
782 >                busyJoin(w);
783          }
784      }
785  
# Line 799 | Line 810 | public abstract class ForkJoinTask<V> im
810      }
811  
812      /**
813 +     * Possibly executes tasks until the pool hosting the current task
814 +     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
815 +     * designs in which many tasks are forked, but none are explicitly
816 +     * joined, instead executing them until all are processed.
817 +     */
818 +    public static void helpQuiesce() {
819 +        ((ForkJoinWorkerThread)(Thread.currentThread())).
820 +            helpQuiescePool();
821 +    }
822 +
823 +    /**
824       * Resets the internal bookkeeping state of this task, allowing a
825       * subsequent <code>fork</code>. This method allows repeated reuse of
826       * this task, but only if reuse occurs when this task has either
# Line 833 | Line 855 | public abstract class ForkJoinTask<V> im
855       * alternative local processing of tasks that could have been, but
856       * were not, stolen. This method may be invoked only from within
857       * ForkJoinTask computations. Attempts to invoke in other contexts
858 <     * result in exceptions or errors including ClassCastException.
858 >     * result in exceptions or errors possibly including ClassCastException.
859       * @return true if unforked
860       */
861      public boolean tryUnfork() {
# Line 841 | Line 863 | public abstract class ForkJoinTask<V> im
863      }
864  
865      /**
844     * Possibly executes tasks until the pool hosting the current task
845     * {@link ForkJoinPool#isQuiescent}. This method may be of use in
846     * designs in which many tasks are forked, but none are explicitly
847     * joined, instead executing them until all are processed.
848     */
849    public static void helpQuiesce() {
850        ((ForkJoinWorkerThread)(Thread.currentThread())).
851            helpQuiescePool();
852    }
853
854    /**
866       * Returns an estimate of the number of tasks that have been
867       * forked by the current worker thread but not yet executed. This
868       * value may be useful for heuristic decisions about whether to
# Line 921 | Line 932 | public abstract class ForkJoinTask<V> im
932       * actually be polled or executed next.
933       * This method is designed primarily to support extensions,
934       * and is unlikely to be useful otherwise.
935 +     * This method may be invoked only from within
936 +     * ForkJoinTask computations. Attempts to invoke in other contexts
937 +     * result in exceptions or errors possibly including ClassCastException.
938       *
939       * @return the next task, or null if none are available
940       */
# Line 933 | Line 947 | public abstract class ForkJoinTask<V> im
947       * recently forked by the current thread but not yet executed.
948       * This method is designed primarily to support extensions,
949       * and is unlikely to be useful otherwise.
950 +     * This method may be invoked only from within
951 +     * ForkJoinTask computations. Attempts to invoke in other contexts
952 +     * result in exceptions or errors possibly including ClassCastException.
953       *
954       * @return the next task, or null if none are available
955       */
# Line 946 | Line 963 | public abstract class ForkJoinTask<V> im
963       * one is available, or if not available, a task that was forked
964       * by some other thread, if available. Availability may be
965       * transient, so a <code>null</code> result does not necessarily
966 <     * imply quiecence of the pool this task is operating in.
966 >     * imply quiecence of the pool this task is operating in.
967       * This method is designed primarily to support extensions,
968       * and is unlikely to be useful otherwise.
969 <     *
969 >     * This method may be invoked only from within
970 >     * ForkJoinTask computations. Attempts to invoke in other contexts
971 >     * result in exceptions or errors possibly including ClassCastException.
972 >     *
973       * @return a task, or null if none are available
974       */
975      protected static ForkJoinTask<?> pollTask() {
976          return ((ForkJoinWorkerThread)(Thread.currentThread())).
977 <            getLocalOrStolenTask();
977 >            pollTask();
978      }
979  
980      // Serialization support
# Line 989 | Line 1009 | public abstract class ForkJoinTask<V> im
1009      }
1010  
1011      // Temporary Unsafe mechanics for preliminary release
1012 +    private static Unsafe getUnsafe() throws Throwable {
1013 +        try {
1014 +            return Unsafe.getUnsafe();
1015 +        } catch (SecurityException se) {
1016 +            try {
1017 +                return java.security.AccessController.doPrivileged
1018 +                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
1019 +                        public Unsafe run() throws Exception {
1020 +                            return getUnsafePrivileged();
1021 +                        }});
1022 +            } catch (java.security.PrivilegedActionException e) {
1023 +                throw e.getCause();
1024 +            }
1025 +        }
1026 +    }
1027 +
1028 +    private static Unsafe getUnsafePrivileged()
1029 +            throws NoSuchFieldException, IllegalAccessException {
1030 +        Field f = Unsafe.class.getDeclaredField("theUnsafe");
1031 +        f.setAccessible(true);
1032 +        return (Unsafe) f.get(null);
1033 +    }
1034 +
1035 +    private static long fieldOffset(String fieldName)
1036 +            throws NoSuchFieldException {
1037 +        return _unsafe.objectFieldOffset
1038 +            (ForkJoinTask.class.getDeclaredField(fieldName));
1039 +    }
1040  
1041      static final Unsafe _unsafe;
1042      static final long statusOffset;
1043  
1044      static {
1045          try {
1046 <            if (ForkJoinTask.class.getClassLoader() != null) {
1047 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
1048 <                f.setAccessible(true);
1049 <                _unsafe = (Unsafe)f.get(null);
1050 <            }
1003 <            else
1004 <                _unsafe = Unsafe.getUnsafe();
1005 <            statusOffset = _unsafe.objectFieldOffset
1006 <                (ForkJoinTask.class.getDeclaredField("status"));
1007 <        } catch (Exception ex) { throw new Error(ex); }
1046 >            _unsafe = getUnsafe();
1047 >            statusOffset = fieldOffset("status");
1048 >        } catch (Throwable e) {
1049 >            throw new RuntimeException("Could not initialize intrinsics", e);
1050 >        }
1051      }
1052  
1053   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines