ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.9 by jsr166, Mon Jul 20 22:26:03 2009 UTC vs.
Revision 1.27 by dl, Sun Aug 2 11:54:31 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 < import java.io.Serializable;
9 < import java.util.*;
8 >
9   import java.util.concurrent.*;
10 < import java.util.concurrent.atomic.*;
11 < import sun.misc.Unsafe;
12 < import java.lang.reflect.*;
10 >
11 > import java.io.Serializable;
12 > import java.util.Collection;
13 > import java.util.Collections;
14 > import java.util.List;
15 > import java.util.Map;
16 > import java.util.WeakHashMap;
17  
18   /**
19 < * Abstract base class for tasks that run within a {@link
20 < * ForkJoinPool}.  A ForkJoinTask is a thread-like entity that is much
19 > * Abstract base class for tasks that run within a {@link ForkJoinPool}.
20 > * A {@code ForkJoinTask} is a thread-like entity that is much
21   * lighter weight than a normal thread.  Huge numbers of tasks and
22   * subtasks may be hosted by a small number of actual threads in a
23   * ForkJoinPool, at the price of some usage limitations.
# Line 22 | Line 25 | import java.lang.reflect.*;
25   * <p> A "main" ForkJoinTask begins execution when submitted to a
26   * {@link ForkJoinPool}. Once started, it will usually in turn start
27   * other subtasks.  As indicated by the name of this class, many
28 < * programs using ForkJoinTasks employ only methods {@code fork}
29 < * and {@code join}, or derivatives such as
30 < * {@code invokeAll}.  However, this class also provides a number
31 < * of other methods that can come into play in advanced usages, as
32 < * well as extension mechanics that allow support of new forms of
30 < * fork/join processing.
28 > * programs using ForkJoinTasks employ only methods {@code fork} and
29 > * {@code join}, or derivatives such as {@code invokeAll}.  However,
30 > * this class also provides a number of other methods that can come
31 > * into play in advanced usages, as well as extension mechanics that
32 > * allow support of new forms of fork/join processing.
33   *
34   * <p>A ForkJoinTask is a lightweight form of {@link Future}.  The
35   * efficiency of ForkJoinTasks stems from a set of restrictions (that
# Line 74 | Line 76 | import java.lang.reflect.*;
76   *
77   * <p> The ForkJoinTask class is not usually directly subclassed.
78   * Instead, you subclass one of the abstract classes that support a
79 < * particular style of fork/join processing.  Normally, a concrete
79 > * particular style of fork/join processing, typically {@link
80 > * RecursiveAction} for computations that do not return results, or
81 > * {@link RecursiveTask} for those that do.  Normally, a concrete
82   * ForkJoinTask subclass declares fields comprising its parameters,
83   * established in a constructor, and then defines a {@code compute}
84   * method that somehow uses the control methods supplied by this base
85   * class. While these methods have {@code public} access (to allow
86   * instances of different task subclasses to call each others
87   * methods), some of them may only be called from within other
88 < * ForkJoinTasks. Attempts to invoke them in other contexts result in
89 < * exceptions or errors possibly including ClassCastException.
88 > * ForkJoinTasks (as may be determined using method {@link
89 > * #inForkJoinPool}).  Attempts to invoke them in other contexts
90 > * result in exceptions or errors, possibly including
91 > * ClassCastException.
92   *
93   * <p>Most base support methods are {@code final} because their
94   * implementations are intrinsically tied to the underlying
95   * lightweight task scheduling framework, and so cannot be overridden.
96   * Developers creating new basic styles of fork/join processing should
97   * minimally implement {@code protected} methods
98 < * {@code exec}, {@code setRawResult}, and
99 < * {@code getRawResult}, while also introducing an abstract
98 > * {@link #exec}, {@link #setRawResult}, and
99 > * {@link #getRawResult}, while also introducing an abstract
100   * computational method that can be implemented in its subclasses,
101   * possibly relying on other {@code protected} methods provided
102   * by this class.
# Line 102 | Line 108 | import java.lang.reflect.*;
108   * parallelism cannot improve throughput. If too small, then memory
109   * and internal task maintenance overhead may overwhelm processing.
110   *
111 + * <p>This class provides {@code adapt} methods for {@link
112 + * java.lang.Runnable} and {@link java.util.concurrent.Callable}, that
113 + * may be of use when mixing execution of ForkJoinTasks with other
114 + * kinds of tasks. When all tasks are of this form, consider using a
115 + * pool in {@link ForkJoinPool#setAsyncMode}.
116 + *
117   * <p>ForkJoinTasks are {@code Serializable}, which enables them
118   * to be used in extensions such as remote execution frameworks. It is
119   * in general sensible to serialize tasks only before or after, but
120   * not during execution. Serialization is not relied on during
121   * execution itself.
122 + *
123 + * @since 1.7
124 + * @author Doug Lea
125   */
126   public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
127  
# Line 141 | Line 156 | public abstract class ForkJoinTask<V> im
156      /**
157       * Table of exceptions thrown by tasks, to enable reporting by
158       * callers. Because exceptions are rare, we don't directly keep
159 <     * them with task objects, but instead us a weak ref table.  Note
159 >     * them with task objects, but instead use a weak ref table.  Note
160       * that cancellation exceptions don't appear in the table, but are
161       * instead recorded as status values.
162 <     * Todo: Use ConcurrentReferenceHashMap
162 >     * TODO: Use ConcurrentReferenceHashMap
163       */
164      static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
165          Collections.synchronizedMap
# Line 153 | Line 168 | public abstract class ForkJoinTask<V> im
168      // within-package utilities
169  
170      /**
171 <     * Get current worker thread, or null if not a worker thread
171 >     * Gets current worker thread, or null if not a worker thread.
172       */
173      static ForkJoinWorkerThread getWorker() {
174          Thread t = Thread.currentThread();
175 <        return ((t instanceof ForkJoinWorkerThread)?
176 <                (ForkJoinWorkerThread)t : null);
175 >        return ((t instanceof ForkJoinWorkerThread) ?
176 >                (ForkJoinWorkerThread) t : null);
177      }
178  
179      final boolean casStatus(int cmp, int val) {
180 <        return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
180 >        return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
181      }
182  
183      /**
# Line 170 | Line 185 | public abstract class ForkJoinTask<V> im
185       */
186      static void rethrowException(Throwable ex) {
187          if (ex != null)
188 <            _unsafe.throwException(ex);
188 >            UNSAFE.throwException(ex);
189      }
190  
191      // Setting completion status
192  
193      /**
194 <     * Mark completion and wake up threads waiting to join this task.
194 >     * Marks completion and wakes up threads waiting to join this task.
195 >     *
196       * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
197       */
198      final void setCompletion(int completion) {
199          ForkJoinPool pool = getPool();
200          if (pool != null) {
201              int s; // Clear signal bits while setting completion status
202 <            do;while ((s = status) >= 0 && !casStatus(s, completion));
202 >            do {} while ((s = status) >= 0 && !casStatus(s, completion));
203  
204              if ((s & SIGNAL_MASK) != 0) {
205                  if ((s &= INTERNAL_SIGNAL_MASK) != 0)
206                      pool.updateRunningCount(s);
207 <                synchronized(this) { notifyAll(); }
207 >                synchronized (this) { notifyAll(); }
208              }
209          }
210          else
# Line 201 | Line 217 | public abstract class ForkJoinTask<V> im
217       */
218      private void externallySetCompletion(int completion) {
219          int s;
220 <        do;while ((s = status) >= 0 &&
221 <                  !casStatus(s, (s & SIGNAL_MASK) | completion));
222 <        synchronized(this) { notifyAll(); }
220 >        do {} while ((s = status) >= 0 &&
221 >                     !casStatus(s, (s & SIGNAL_MASK) | completion));
222 >        synchronized (this) { notifyAll(); }
223      }
224  
225      /**
226 <     * Sets status to indicate normal completion
226 >     * Sets status to indicate normal completion.
227       */
228      final void setNormalCompletion() {
229          // Try typical fast case -- single CAS, no signal, not already done.
230          // Manually expand casStatus to improve chances of inlining it
231 <        if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL))
231 >        if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
232              setCompletion(NORMAL);
233      }
234  
235      // internal waiting and notification
236  
237      /**
238 <     * Performs the actual monitor wait for awaitDone
238 >     * Performs the actual monitor wait for awaitDone.
239       */
240      private void doAwaitDone() {
241          // Minimize lock bias and in/de-flation effects by maximizing
242          // chances of waiting inside sync
243          try {
244              while (status >= 0)
245 <                synchronized(this) { if (status >= 0) wait(); }
245 >                synchronized (this) { if (status >= 0) wait(); }
246          } catch (InterruptedException ie) {
247              onInterruptedWait();
248          }
249      }
250  
251      /**
252 <     * Performs the actual monitor wait for awaitDone
252 >     * Performs the actual timed monitor wait for awaitDone.
253       */
254      private void doAwaitDone(long startTime, long nanos) {
255 <        synchronized(this) {
255 >        synchronized (this) {
256              try {
257                  while (status >= 0) {
258 <                    long nt = nanos - System.nanoTime() - startTime;
258 >                    long nt = nanos - (System.nanoTime() - startTime);
259                      if (nt <= 0)
260                          break;
261 <                    wait(nt / 1000000, (int)(nt % 1000000));
261 >                    wait(nt / 1000000, (int) (nt % 1000000));
262                  }
263              } catch (InterruptedException ie) {
264                  onInterruptedWait();
# Line 255 | Line 271 | public abstract class ForkJoinTask<V> im
271      /**
272       * Sets status to indicate there is joiner, then waits for join,
273       * surrounded with pool notifications.
274 +     *
275       * @return status upon exit
276       */
277 <    private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
278 <        ForkJoinPool pool = w == null? null : w.pool;
277 >    private int awaitDone(ForkJoinWorkerThread w,
278 >                          boolean maintainParallelism) {
279 >        ForkJoinPool pool = (w == null) ? null : w.pool;
280          int s;
281          while ((s = status) >= 0) {
282 <            if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
282 >            if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
283                  if (pool == null || !pool.preJoin(this, maintainParallelism))
284                      doAwaitDone();
285                  if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
# Line 274 | Line 292 | public abstract class ForkJoinTask<V> im
292  
293      /**
294       * Timed version of awaitDone
295 +     *
296       * @return status upon exit
297       */
298      private int awaitDone(ForkJoinWorkerThread w, long nanos) {
299 <        ForkJoinPool pool = w == null? null : w.pool;
299 >        ForkJoinPool pool = (w == null) ? null : w.pool;
300          int s;
301          while ((s = status) >= 0) {
302 <            if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
302 >            if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
303                  long startTime = System.nanoTime();
304                  if (pool == null || !pool.preJoin(this, false))
305                      doAwaitDone(startTime, nanos);
# Line 297 | Line 316 | public abstract class ForkJoinTask<V> im
316      }
317  
318      /**
319 <     * Notify pool that thread is unblocked. Called by signalled
319 >     * Notifies pool that thread is unblocked. Called by signalled
320       * threads when woken by non-FJ threads (which is atypical).
321       */
322      private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
323          int s;
324 <        do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
324 >        do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
325          if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
326              pool.updateRunningCount(s);
327      }
328  
329      /**
330 <     * Notify pool to adjust counts on cancelled or timed out wait
330 >     * Notifies pool to adjust counts on cancelled or timed out wait.
331       */
332      private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
333          if (pool != null) {
# Line 323 | Line 342 | public abstract class ForkJoinTask<V> im
342      }
343  
344      /**
345 <     * Handle interruptions during waits.
345 >     * Handles interruptions during waits.
346       */
347      private void onInterruptedWait() {
348          ForkJoinWorkerThread w = getWorker();
# Line 342 | Line 361 | public abstract class ForkJoinTask<V> im
361      }
362  
363      /**
364 <     * Throws the exception associated with status s;
364 >     * Throws the exception associated with status s.
365 >     *
366       * @throws the exception
367       */
368      private void reportException(int s) {
# Line 355 | Line 375 | public abstract class ForkJoinTask<V> im
375      }
376  
377      /**
378 <     * Returns result or throws exception using j.u.c.Future conventions
379 <     * Only call when isDone known to be true.
378 >     * Returns result or throws exception using j.u.c.Future conventions.
379 >     * Only call when {@code isDone} known to be true.
380       */
381      private V reportFutureResult()
382          throws ExecutionException, InterruptedException {
# Line 375 | Line 395 | public abstract class ForkJoinTask<V> im
395  
396      /**
397       * Returns result or throws exception using j.u.c.Future conventions
398 <     * with timeouts
398 >     * with timeouts.
399       */
400      private V reportTimedFutureResult()
401          throws InterruptedException, ExecutionException, TimeoutException {
# Line 396 | Line 416 | public abstract class ForkJoinTask<V> im
416  
417      /**
418       * Calls exec, recording completion, and rethrowing exception if
419 <     * encountered. Caller should normally check status before calling
419 >     * encountered. Caller should normally check status before calling.
420 >     *
421       * @return true if completed normally
422       */
423      private boolean tryExec() {
# Line 414 | Line 435 | public abstract class ForkJoinTask<V> im
435  
436      /**
437       * Main execution method used by worker threads. Invokes
438 <     * base computation unless already complete
438 >     * base computation unless already complete.
439       */
440      final void quietlyExec() {
441          if (status >= 0) {
442              try {
443                  if (!exec())
444                      return;
445 <            } catch(Throwable rex) {
445 >            } catch (Throwable rex) {
446                  setDoneExceptionally(rex);
447                  return;
448              }
# Line 430 | Line 451 | public abstract class ForkJoinTask<V> im
451      }
452  
453      /**
454 <     * Calls exec, recording but not rethrowing exception
455 <     * Caller should normally check status before calling
454 >     * Calls exec(), recording but not rethrowing exception.
455 >     * Caller should normally check status before calling.
456 >     *
457       * @return true if completed normally
458       */
459      private boolean tryQuietlyInvoke() {
# Line 447 | Line 469 | public abstract class ForkJoinTask<V> im
469      }
470  
471      /**
472 <     * Cancel, ignoring any exceptions it throws
472 >     * Cancels, ignoring any exceptions it throws.
473       */
474      final void cancelIgnoringExceptions() {
475          try {
476              cancel(false);
477 <        } catch(Throwable ignore) {
477 >        } catch (Throwable ignore) {
478          }
479      }
480  
# Line 464 | Line 486 | public abstract class ForkJoinTask<V> im
486          ForkJoinTask<?> t;
487          while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
488              t.quietlyExec();
489 <        return (s >= 0)? awaitDone(w, false) : s; // block if no work
489 >        return (s >= 0) ? awaitDone(w, false) : s; // block if no work
490      }
491  
492      // public methods
# Line 474 | Line 496 | public abstract class ForkJoinTask<V> im
496       * necessarily enforced, it is a usage error to fork a task more
497       * than once unless it has completed and been reinitialized.  This
498       * method may be invoked only from within ForkJoinTask
499 <     * computations. Attempts to invoke in other contexts result in
500 <     * exceptions or errors possibly including ClassCastException.
499 >     * computations (as may be determined using method {@link
500 >     * #inForkJoinPool}). Attempts to invoke in other contexts result
501 >     * in exceptions or errors, possibly including ClassCastException.
502 >     *
503 >     * @return {@code this}, to simplify usage.
504       */
505 <    public final void fork() {
506 <        ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
505 >    public final ForkJoinTask<V> fork() {
506 >        ((ForkJoinWorkerThread) Thread.currentThread())
507 >            .pushTask(this);
508 >        return this;
509      }
510  
511      /**
# Line 499 | Line 526 | public abstract class ForkJoinTask<V> im
526      /**
527       * Commences performing this task, awaits its completion if
528       * necessary, and return its result.
529 +     *
530       * @throws Throwable (a RuntimeException, Error, or unchecked
531 <     * exception) if the underlying computation did so.
531 >     * exception) if the underlying computation did so
532       * @return the computed result
533       */
534      public final V invoke() {
# Line 511 | Line 539 | public abstract class ForkJoinTask<V> im
539      }
540  
541      /**
542 <     * Forks both tasks, returning when {@code isDone} holds for
543 <     * both of them or an exception is encountered. This method may be
544 <     * invoked only from within ForkJoinTask computations. Attempts to
545 <     * invoke in other contexts result in exceptions or errors
542 >     * Forks the given tasks, returning when {@code isDone} holds for
543 >     * each task or an exception is encountered. This method may be
544 >     * invoked only from within ForkJoinTask computations (as may be
545 >     * determined using method {@link #inForkJoinPool}). Attempts to
546 >     * invoke in other contexts result in exceptions or errors,
547       * possibly including ClassCastException.
548 <     * @param t1 one task
549 <     * @param t2 the other task
550 <     * @throws NullPointerException if t1 or t2 are null
551 <     * @throws RuntimeException or Error if either task did so.
548 >     *
549 >     * @param t1 the first task
550 >     * @param t2 the second task
551 >     * @throws NullPointerException if any task is null
552 >     * @throws RuntimeException or Error if a task did so
553       */
554      public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
555          t2.fork();
# Line 528 | Line 558 | public abstract class ForkJoinTask<V> im
558      }
559  
560      /**
561 <     * Forks the given tasks, returning when {@code isDone} holds
562 <     * for all of them. If any task encounters an exception, others
563 <     * may be cancelled.  This method may be invoked only from within
564 <     * ForkJoinTask computations. Attempts to invoke in other contexts
565 <     * result in exceptions or errors possibly including ClassCastException.
566 <     * @param tasks the array of tasks
567 <     * @throws NullPointerException if tasks or any element are null.
568 <     * @throws RuntimeException or Error if any task did so.
561 >     * Forks the given tasks, returning when {@code isDone} holds for
562 >     * each task or an exception is encountered. If any task
563 >     * encounters an exception, others may be, but are not guaranteed
564 >     * to be, cancelled.  This method may be invoked only from within
565 >     * ForkJoinTask computations (as may be determined using method
566 >     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
567 >     * result in exceptions or errors, possibly including
568 >     * ClassCastException.
569 >     *
570 >     * Overloadings of this method exist for the special cases
571 >     * of one to four arguments.
572 >     *
573 >     * @param tasks the tasks
574 >     * @throws NullPointerException if tasks or any element are null
575 >     * @throws RuntimeException or Error if any task did so
576       */
577      public static void invokeAll(ForkJoinTask<?>... tasks) {
578          Throwable ex = null;
# Line 571 | Line 608 | public abstract class ForkJoinTask<V> im
608      }
609  
610      /**
611 <     * Forks all tasks in the collection, returning when
612 <     * {@code isDone} holds for all of them. If any task
613 <     * encounters an exception, others may be cancelled.  This method
614 <     * may be invoked only from within ForkJoinTask
615 <     * computations. Attempts to invoke in other contexts result in
616 <     * exceptions or errors possibly including ClassCastException.
611 >     * Forks all tasks in the collection, returning when {@code
612 >     * isDone} holds for each task or an exception is encountered. If
613 >     * any task encounters an exception, others may be, but are not
614 >     * guaranteed to be, cancelled.  This method may be invoked only
615 >     * from within ForkJoinTask computations (as may be determined
616 >     * using method {@link #inForkJoinPool}). Attempts to invoke in
617 >     * other contexts result in exceptions or errors, possibly
618 >     * including ClassCastException.
619 >     *
620       * @param tasks the collection of tasks
621 <     * @throws NullPointerException if tasks or any element are null.
622 <     * @throws RuntimeException or Error if any task did so.
623 <     */
624 <    public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
625 <        if (!(tasks instanceof List)) {
626 <            invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
627 <            return;
621 >     * @return the tasks argument, to simplify usage
622 >     * @throws NullPointerException if tasks or any element are null
623 >     * @throws RuntimeException or Error if any task did so
624 >     */
625 >    public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
626 >        if (!(tasks instanceof List<?>)) {
627 >            invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
628 >            return tasks;
629          }
630 +        @SuppressWarnings("unchecked")
631          List<? extends ForkJoinTask<?>> ts =
632 <            (List<? extends ForkJoinTask<?>>)tasks;
632 >            (List<? extends ForkJoinTask<?>>) tasks;
633          Throwable ex = null;
634          int last = ts.size() - 1;
635          for (int i = last; i >= 0; --i) {
# Line 618 | Line 660 | public abstract class ForkJoinTask<V> im
660          }
661          if (ex != null)
662              rethrowException(ex);
663 +        return tasks;
664      }
665  
666      /**
667 <     * Returns true if the computation performed by this task has
668 <     * completed (or has been cancelled).
669 <     * @return true if this computation has completed
667 >     * Returns {@code true} if the computation performed by this task
668 >     * has completed (or has been cancelled).
669 >     *
670 >     * @return {@code true} if this computation has completed
671       */
672      public final boolean isDone() {
673          return status < 0;
674      }
675  
676      /**
677 <     * Returns true if this task was cancelled.
678 <     * @return true if this task was cancelled
677 >     * Returns {@code true} if this task was cancelled.
678 >     *
679 >     * @return {@code true} if this task was cancelled
680       */
681      public final boolean isCancelled() {
682          return (status & COMPLETION_MASK) == CANCELLED;
# Line 640 | Line 685 | public abstract class ForkJoinTask<V> im
685      /**
686       * Asserts that the results of this task's computation will not be
687       * used. If a cancellation occurs before attempting to execute this
688 <     * task, then execution will be suppressed, {@code isCancelled}
689 <     * will report true, and {@code join} will result in a
688 >     * task, execution will be suppressed, {@link #isCancelled}
689 >     * will report true, and {@link #join} will result in a
690       * {@code CancellationException} being thrown. Otherwise, when
691       * cancellation races with completion, there are no guarantees
692 <     * about whether {@code isCancelled} will report true, whether
693 <     * {@code join} will return normally or via an exception, or
694 <     * whether these behaviors will remain consistent upon repeated
692 >     * about whether {@code isCancelled} will report {@code true},
693 >     * whether {@code join} will return normally or via an exception,
694 >     * or whether these behaviors will remain consistent upon repeated
695       * invocation.
696       *
697       * <p>This method may be overridden in subclasses, but if so, must
# Line 656 | Line 701 | public abstract class ForkJoinTask<V> im
701       * <p> This method is designed to be invoked by <em>other</em>
702       * tasks. To terminate the current task, you can just return or
703       * throw an unchecked exception from its computation method, or
704 <     * invoke {@code completeExceptionally}.
704 >     * invoke {@link #completeExceptionally}.
705       *
706       * @param mayInterruptIfRunning this value is ignored in the
707       * default implementation because tasks are not in general
708 <     * cancelled via interruption.
708 >     * cancelled via interruption
709       *
710 <     * @return true if this task is now cancelled
710 >     * @return {@code true} if this task is now cancelled
711       */
712      public boolean cancel(boolean mayInterruptIfRunning) {
713          setCompletion(CANCELLED);
# Line 670 | Line 715 | public abstract class ForkJoinTask<V> im
715      }
716  
717      /**
718 <     * Returns true if this task threw an exception or was cancelled
719 <     * @return true if this task threw an exception or was cancelled
718 >     * Returns {@code true} if this task threw an exception or was cancelled.
719 >     *
720 >     * @return {@code true} if this task threw an exception or was cancelled
721       */
722      public final boolean isCompletedAbnormally() {
723          return (status & COMPLETION_MASK) < NORMAL;
# Line 681 | Line 727 | public abstract class ForkJoinTask<V> im
727       * Returns the exception thrown by the base computation, or a
728       * CancellationException if cancelled, or null if none or if the
729       * method has not yet completed.
730 <     * @return the exception, or null if none
730 >     *
731 >     * @return the exception, or {@code null} if none
732       */
733      public final Throwable getException() {
734          int s = status & COMPLETION_MASK;
# Line 698 | Line 745 | public abstract class ForkJoinTask<V> im
745       * {@code join} and related operations. This method may be used
746       * to induce exceptions in asynchronous tasks, or to force
747       * completion of tasks that would not otherwise complete.  Its use
748 <     * in other situations is likely to be wrong.  This method is
748 >     * in other situations is discouraged.  This method is
749       * overridable, but overridden versions must invoke {@code super}
750       * implementation to maintain guarantees.
751       *
# Line 708 | Line 755 | public abstract class ForkJoinTask<V> im
755       */
756      public void completeExceptionally(Throwable ex) {
757          setDoneExceptionally((ex instanceof RuntimeException) ||
758 <                             (ex instanceof Error)? ex :
758 >                             (ex instanceof Error) ? ex :
759                               new RuntimeException(ex));
760      }
761  
# Line 718 | Line 765 | public abstract class ForkJoinTask<V> im
765       * operations. This method may be used to provide results for
766       * asynchronous tasks, or to provide alternative handling for
767       * tasks that would not otherwise complete normally. Its use in
768 <     * other situations is likely to be wrong. This method is
768 >     * other situations is discouraged. This method is
769       * overridable, but overridden versions must invoke {@code super}
770       * implementation to maintain guarantees.
771       *
772 <     * @param value the result value for this task.
772 >     * @param value the result value for this task
773       */
774      public void complete(V value) {
775          try {
776              setRawResult(value);
777 <        } catch(Throwable rex) {
777 >        } catch (Throwable rex) {
778              setDoneExceptionally(rex);
779              return;
780          }
# Line 743 | Line 790 | public abstract class ForkJoinTask<V> im
790  
791      public final V get(long timeout, TimeUnit unit)
792          throws InterruptedException, ExecutionException, TimeoutException {
793 +        long nanos = unit.toNanos(timeout);
794          ForkJoinWorkerThread w = getWorker();
795          if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
796 <            awaitDone(w, unit.toNanos(timeout));
796 >            awaitDone(w, nanos);
797          return reportTimedFutureResult();
798      }
799  
# Line 757 | Line 805 | public abstract class ForkJoinTask<V> im
805       * current task and that of any other task that might be executed
806       * while helping. (This usually holds for pure divide-and-conquer
807       * tasks). This method may be invoked only from within
808 <     * ForkJoinTask computations. Attempts to invoke in other contexts
809 <     * result in exceptions or errors possibly including ClassCastException.
808 >     * ForkJoinTask computations (as may be determined using method
809 >     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
810 >     * result in exceptions or errors, possibly including
811 >     * ClassCastException.
812 >     *
813       * @return the computed result
814       */
815      public final V helpJoin() {
816 <        ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
816 >        ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
817          if (status < 0 || !w.unpushTask(this) || !tryExec())
818              reportException(busyJoin(w));
819          return getRawResult();
# Line 771 | Line 822 | public abstract class ForkJoinTask<V> im
822      /**
823       * Possibly executes other tasks until this task is ready.  This
824       * method may be invoked only from within ForkJoinTask
825 <     * computations. Attempts to invoke in other contexts result in
826 <     * exceptions or errors possibly including ClassCastException.
825 >     * computations (as may be determined using method {@link
826 >     * #inForkJoinPool}). Attempts to invoke in other contexts result
827 >     * in exceptions or errors, possibly including ClassCastException.
828       */
829      public final void quietlyHelpJoin() {
830          if (status >= 0) {
831              ForkJoinWorkerThread w =
832 <                (ForkJoinWorkerThread)(Thread.currentThread());
832 >                (ForkJoinWorkerThread) Thread.currentThread();
833              if (!w.unpushTask(this) || !tryQuietlyInvoke())
834                  busyJoin(w);
835          }
# Line 813 | Line 865 | public abstract class ForkJoinTask<V> im
865       * Possibly executes tasks until the pool hosting the current task
866       * {@link ForkJoinPool#isQuiescent}. This method may be of use in
867       * designs in which many tasks are forked, but none are explicitly
868 <     * joined, instead executing them until all are processed.
868 >     * joined, instead executing them until all are processed.  This
869 >     * method may be invoked only from within ForkJoinTask
870 >     * computations (as may be determined using method {@link
871 >     * #inForkJoinPool}). Attempts to invoke in other contexts result
872 >     * in exceptions or errors, possibly including ClassCastException.
873       */
874      public static void helpQuiesce() {
875 <        ((ForkJoinWorkerThread)(Thread.currentThread())).
876 <            helpQuiescePool();
875 >        ((ForkJoinWorkerThread) Thread.currentThread())
876 >            .helpQuiescePool();
877      }
878  
879      /**
# Line 827 | Line 883 | public abstract class ForkJoinTask<V> im
883       * never been forked, or has been forked, then completed and all
884       * outstanding joins of this task have also completed. Effects
885       * under any other usage conditions are not guaranteed, and are
886 <     * almost surely wrong. This method may be useful when executing
886 >     * discouraged. This method may be useful when executing
887       * pre-constructed trees of subtasks in loops.
888       */
889      public void reinitialize() {
# Line 838 | Line 894 | public abstract class ForkJoinTask<V> im
894  
895      /**
896       * Returns the pool hosting the current task execution, or null
897 <     * if this task is executing outside of any pool.
898 <     * @return the pool, or null if none.
897 >     * if this task is executing outside of any ForkJoinPool.
898 >     *
899 >     * @see #inForkJoinPool
900 >     * @return the pool, or {@code null} if none
901       */
902      public static ForkJoinPool getPool() {
903          Thread t = Thread.currentThread();
904 <        return ((t instanceof ForkJoinWorkerThread)?
905 <                ((ForkJoinWorkerThread)t).pool : null);
904 >        return (t instanceof ForkJoinWorkerThread) ?
905 >            ((ForkJoinWorkerThread) t).pool : null;
906 >    }
907 >
908 >    /**
909 >     * Returns {@code true} if the current thread is executing as a
910 >     * ForkJoinPool computation.
911 >     *
912 >     * @return {@code true} if the current thread is executing as a
913 >     * ForkJoinPool computation, or false otherwise
914 >     */
915 >    public static boolean inForkJoinPool() {
916 >        return Thread.currentThread() instanceof ForkJoinWorkerThread;
917      }
918  
919      /**
# Line 854 | Line 923 | public abstract class ForkJoinTask<V> im
923       * another thread.  This method may be useful when arranging
924       * alternative local processing of tasks that could have been, but
925       * were not, stolen. This method may be invoked only from within
926 <     * ForkJoinTask computations. Attempts to invoke in other contexts
927 <     * result in exceptions or errors possibly including ClassCastException.
928 <     * @return true if unforked
926 >     * ForkJoinTask computations (as may be determined using method
927 >     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
928 >     * result in exceptions or errors, possibly including
929 >     * ClassCastException.
930 >     *
931 >     * @return {@code true} if unforked
932       */
933      public boolean tryUnfork() {
934 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
934 >        return ((ForkJoinWorkerThread) Thread.currentThread())
935 >            .unpushTask(this);
936      }
937  
938      /**
939       * Returns an estimate of the number of tasks that have been
940       * forked by the current worker thread but not yet executed. This
941       * value may be useful for heuristic decisions about whether to
942 <     * fork other tasks.
942 >     * fork other tasks.  This method may be invoked only from within
943 >     * ForkJoinTask computations (as may be determined using method
944 >     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
945 >     * result in exceptions or errors, possibly including
946 >     * ClassCastException.
947       * @return the number of tasks
948       */
949      public static int getQueuedTaskCount() {
950 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
951 <            getQueueSize();
950 >        return ((ForkJoinWorkerThread) Thread.currentThread())
951 >            .getQueueSize();
952      }
953  
954      /**
955 <     * Returns a estimate of how many more locally queued tasks are
955 >     * Returns an estimate of how many more locally queued tasks are
956       * held by the current worker thread than there are other worker
957       * threads that might steal them.  This value may be useful for
958       * heuristic decisions about whether to fork other tasks. In many
959       * usages of ForkJoinTasks, at steady state, each worker should
960       * aim to maintain a small constant surplus (for example, 3) of
961       * tasks, and to process computations locally if this threshold is
962 <     * exceeded.
962 >     * exceeded.  This method may be invoked only from within
963 >     * ForkJoinTask computations (as may be determined using method
964 >     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
965 >     * result in exceptions or errors, possibly including
966 >     * ClassCastException.  *
967       * @return the surplus number of tasks, which may be negative
968       */
969      public static int getSurplusQueuedTaskCount() {
970 <        return ((ForkJoinWorkerThread)(Thread.currentThread()))
970 >        return ((ForkJoinWorkerThread) Thread.currentThread())
971              .getEstimatedSurplusTaskCount();
972      }
973  
974      // Extension methods
975  
976      /**
977 <     * Returns the result that would be returned by {@code join},
978 <     * even if this task completed abnormally, or null if this task is
979 <     * not known to have been completed.  This method is designed to
980 <     * aid debugging, as well as to support extensions. Its use in any
981 <     * other context is discouraged.
977 >     * Returns the result that would be returned by {@link #join}, even
978 >     * if this task completed abnormally, or {@code null} if this task
979 >     * is not known to have been completed.  This method is designed
980 >     * to aid debugging, as well as to support extensions. Its use in
981 >     * any other context is discouraged.
982       *
983 <     * @return the result, or null if not completed.
983 >     * @return the result, or {@code null} if not completed
984       */
985      public abstract V getRawResult();
986  
# Line 918 | Line 999 | public abstract class ForkJoinTask<V> im
999       * called otherwise. The return value controls whether this task
1000       * is considered to be done normally. It may return false in
1001       * asynchronous actions that require explicit invocations of
1002 <     * {@code complete} to become joinable. It may throw exceptions
1002 >     * {@link #complete} to become joinable. It may throw exceptions
1003       * to indicate abnormal exit.
1004 <     * @return true if completed normally
1004 >     *
1005 >     * @return {@code true} if completed normally
1006       * @throws Error or RuntimeException if encountered during computation
1007       */
1008      protected abstract boolean exec();
1009  
1010      /**
1011 <     * Returns, but does not unschedule or execute, the task queued by
1012 <     * the current thread but not yet executed, if one is
1011 >     * Returns, but does not unschedule or execute, a task queued by
1012 >     * the current thread but not yet executed, if one is immediately
1013       * available. There is no guarantee that this task will actually
1014 <     * be polled or executed next.  This method is designed primarily
1015 <     * to support extensions, and is unlikely to be useful otherwise.
1016 <     * This method may be invoked only from within ForkJoinTask
1017 <     * computations. Attempts to invoke in other contexts result in
1018 <     * exceptions or errors possibly including ClassCastException.
1014 >     * be polled or executed next. Conversely, this method may return
1015 >     * null even if a task exists but cannot be accessed without
1016 >     * contention with other threads.  This method is designed
1017 >     * primarily to support extensions, and is unlikely to be useful
1018 >     * otherwise.  This method may be invoked only from within
1019 >     * ForkJoinTask computations (as may be determined using method
1020 >     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1021 >     * result in exceptions or errors, possibly including
1022 >     * ClassCastException.
1023       *
1024 <     * @return the next task, or null if none are available
1024 >     * @return the next task, or {@code null} if none are available
1025       */
1026      protected static ForkJoinTask<?> peekNextLocalTask() {
1027 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
1027 >        return ((ForkJoinWorkerThread) Thread.currentThread())
1028 >            .peekTask();
1029      }
1030  
1031      /**
# Line 946 | Line 1033 | public abstract class ForkJoinTask<V> im
1033       * queued by the current thread but not yet executed.  This method
1034       * is designed primarily to support extensions, and is unlikely to
1035       * be useful otherwise.  This method may be invoked only from
1036 <     * within ForkJoinTask computations. Attempts to invoke in other
1037 <     * contexts result in exceptions or errors possibly including
1036 >     * within ForkJoinTask computations (as may be determined using
1037 >     * method {@link #inForkJoinPool}). Attempts to invoke in other
1038 >     * contexts result in exceptions or errors, possibly including
1039       * ClassCastException.
1040       *
1041 <     * @return the next task, or null if none are available
1041 >     * @return the next task, or {@code null} if none are available
1042       */
1043      protected static ForkJoinTask<?> pollNextLocalTask() {
1044 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
1044 >        return ((ForkJoinWorkerThread) Thread.currentThread())
1045 >            .pollLocalTask();
1046      }
1047  
1048      /**
# Line 965 | Line 1054 | public abstract class ForkJoinTask<V> im
1054       * of the pool this task is operating in.  This method is designed
1055       * primarily to support extensions, and is unlikely to be useful
1056       * otherwise.  This method may be invoked only from within
1057 <     * ForkJoinTask computations. Attempts to invoke in other contexts
1058 <     * result in exceptions or errors possibly including
1057 >     * ForkJoinTask computations (as may be determined using method
1058 >     * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1059 >     * result in exceptions or errors, possibly including
1060       * ClassCastException.
1061       *
1062 <     * @return a task, or null if none are available
1062 >     * @return a task, or {@code null} if none are available
1063       */
1064      protected static ForkJoinTask<?> pollTask() {
1065 <        return ((ForkJoinWorkerThread)(Thread.currentThread())).
1066 <            pollTask();
1065 >        return ((ForkJoinWorkerThread) Thread.currentThread())
1066 >            .pollTask();
1067 >    }
1068 >
1069 >    /**
1070 >     * Adaptor for Runnables. This implements RunnableFuture
1071 >     * to be compliant with AbstractExecutorService constraints
1072 >     * when used in ForkJoinPool.
1073 >     */
1074 >    static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1075 >        implements RunnableFuture<T> {
1076 >        final Runnable runnable;
1077 >        final T resultOnCompletion;
1078 >        T result;
1079 >        AdaptedRunnable(Runnable runnable, T result) {
1080 >            if (runnable == null) throw new NullPointerException();
1081 >            this.runnable = runnable;
1082 >            this.resultOnCompletion = result;
1083 >        }
1084 >        public T getRawResult() { return result; }
1085 >        public void setRawResult(T v) { result = v; }
1086 >        public boolean exec() {
1087 >            runnable.run();
1088 >            result = resultOnCompletion;
1089 >            return true;
1090 >        }
1091 >        public void run() { invoke(); }
1092 >        private static final long serialVersionUID = 5232453952276885070L;
1093 >    }
1094 >
1095 >    /**
1096 >     * Adaptor for Callables
1097 >     */
1098 >    static final class AdaptedCallable<T> extends ForkJoinTask<T>
1099 >        implements RunnableFuture<T> {
1100 >        final Callable<? extends T> callable;
1101 >        T result;
1102 >        AdaptedCallable(Callable<? extends T> callable) {
1103 >            if (callable == null) throw new NullPointerException();
1104 >            this.callable = callable;
1105 >        }
1106 >        public T getRawResult() { return result; }
1107 >        public void setRawResult(T v) { result = v; }
1108 >        public boolean exec() {
1109 >            try {
1110 >                result = callable.call();
1111 >                return true;
1112 >            } catch (Error err) {
1113 >                throw err;
1114 >            } catch (RuntimeException rex) {
1115 >                throw rex;
1116 >            } catch (Exception ex) {
1117 >                throw new RuntimeException(ex);
1118 >            }
1119 >        }
1120 >        public void run() { invoke(); }
1121 >        private static final long serialVersionUID = 2838392045355241008L;
1122 >    }
1123 >
1124 >    /**
1125 >     * Returns a new ForkJoinTask that performs the {@code run}
1126 >     * method of the given Runnable as its action, and returns a null
1127 >     * result upon {@code join}.
1128 >     *
1129 >     * @param runnable the runnable action
1130 >     * @return the task
1131 >     */
1132 >    public static ForkJoinTask<?> adapt(Runnable runnable) {
1133 >        return new AdaptedRunnable<Void>(runnable, null);
1134 >    }
1135 >
1136 >    /**
1137 >     * Returns a new ForkJoinTask that performs the {@code run}
1138 >     * method of the given Runnable as its action, and returns the
1139 >     * given result upon {@code join}.
1140 >     *
1141 >     * @param runnable the runnable action
1142 >     * @param result the result upon completion
1143 >     * @return the task
1144 >     */
1145 >    public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1146 >        return new AdaptedRunnable<T>(runnable, result);
1147 >    }
1148 >
1149 >    /**
1150 >     * Returns a new ForkJoinTask that performs the {@code call}
1151 >     * method of the given Callable as its action, and returns its
1152 >     * result upon {@code join}, translating any checked
1153 >     * exceptions encountered into {@code RuntimeException}.
1154 >     *
1155 >     * @param callable the callable action
1156 >     * @return the task
1157 >     */
1158 >    public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1159 >        return new AdaptedCallable<T>(callable);
1160      }
1161  
1162      // Serialization support
# Line 984 | Line 1167 | public abstract class ForkJoinTask<V> im
1167       * Save the state to a stream.
1168       *
1169       * @serialData the current run status and the exception thrown
1170 <     * during execution, or null if none.
1170 >     * during execution, or {@code null} if none
1171       * @param s the stream
1172       */
1173      private void writeObject(java.io.ObjectOutputStream s)
# Line 995 | Line 1178 | public abstract class ForkJoinTask<V> im
1178  
1179      /**
1180       * Reconstitute the instance from a stream.
1181 +     *
1182       * @param s the stream
1183       */
1184      private void readObject(java.io.ObjectInputStream s)
# Line 1004 | Line 1188 | public abstract class ForkJoinTask<V> im
1188          status |= EXTERNAL_SIGNAL; // conservatively set external signal
1189          Object ex = s.readObject();
1190          if (ex != null)
1191 <            setDoneExceptionally((Throwable)ex);
1191 >            setDoneExceptionally((Throwable) ex);
1192      }
1193  
1194 <    // Temporary Unsafe mechanics for preliminary release
1195 <    private static Unsafe getUnsafe() throws Throwable {
1194 >    // Unsafe mechanics
1195 >
1196 >    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1197 >    private static final long statusOffset =
1198 >        objectFieldOffset("status", ForkJoinTask.class);
1199 >
1200 >    private static long objectFieldOffset(String field, Class<?> klazz) {
1201 >        try {
1202 >            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1203 >        } catch (NoSuchFieldException e) {
1204 >            // Convert Exception to corresponding Error
1205 >            NoSuchFieldError error = new NoSuchFieldError(field);
1206 >            error.initCause(e);
1207 >            throw error;
1208 >        }
1209 >    }
1210 >
1211 >    /**
1212 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
1213 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
1214 >     * into a jdk.
1215 >     *
1216 >     * @return a sun.misc.Unsafe
1217 >     */
1218 >    private static sun.misc.Unsafe getUnsafe() {
1219          try {
1220 <            return Unsafe.getUnsafe();
1220 >            return sun.misc.Unsafe.getUnsafe();
1221          } catch (SecurityException se) {
1222              try {
1223                  return java.security.AccessController.doPrivileged
1224 <                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
1225 <                        public Unsafe run() throws Exception {
1226 <                            return getUnsafePrivileged();
1224 >                    (new java.security
1225 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1226 >                        public sun.misc.Unsafe run() throws Exception {
1227 >                            java.lang.reflect.Field f = sun.misc
1228 >                                .Unsafe.class.getDeclaredField("theUnsafe");
1229 >                            f.setAccessible(true);
1230 >                            return (sun.misc.Unsafe) f.get(null);
1231                          }});
1232              } catch (java.security.PrivilegedActionException e) {
1233 <                throw e.getCause();
1233 >                throw new RuntimeException("Could not initialize intrinsics",
1234 >                                           e.getCause());
1235              }
1236          }
1237      }
1026
1027    private static Unsafe getUnsafePrivileged()
1028            throws NoSuchFieldException, IllegalAccessException {
1029        Field f = Unsafe.class.getDeclaredField("theUnsafe");
1030        f.setAccessible(true);
1031        return (Unsafe) f.get(null);
1032    }
1033
1034    private static long fieldOffset(String fieldName)
1035            throws NoSuchFieldException {
1036        return _unsafe.objectFieldOffset
1037            (ForkJoinTask.class.getDeclaredField(fieldName));
1038    }
1039
1040    static final Unsafe _unsafe;
1041    static final long statusOffset;
1042
1043    static {
1044        try {
1045            _unsafe = getUnsafe();
1046            statusOffset = fieldOffset("status");
1047        } catch (Throwable e) {
1048            throw new RuntimeException("Could not initialize intrinsics", e);
1049        }
1050    }
1051
1238   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines