ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.63 by dl, Sat Sep 18 12:10:21 2010 UTC vs.
Revision 1.73 by jsr166, Sun Nov 28 21:21:03 2010 UTC

# Line 6 | Line 6
6  
7   package jsr166y;
8  
9 import java.util.concurrent.*;
9   import java.io.Serializable;
10   import java.util.Collection;
11   import java.util.Collections;
# Line 14 | Line 13 | import java.util.List;
13   import java.util.RandomAccess;
14   import java.util.Map;
15   import java.util.WeakHashMap;
16 + import java.util.concurrent.Callable;
17 + import java.util.concurrent.CancellationException;
18 + import java.util.concurrent.ExecutionException;
19 + import java.util.concurrent.Executor;
20 + import java.util.concurrent.ExecutorService;
21 + import java.util.concurrent.Future;
22 + import java.util.concurrent.RejectedExecutionException;
23 + import java.util.concurrent.RunnableFuture;
24 + import java.util.concurrent.TimeUnit;
25 + import java.util.concurrent.TimeoutException;
26  
27   /**
28   * Abstract base class for tasks that run within a {@link ForkJoinPool}.
# Line 101 | Line 110 | import java.util.WeakHashMap;
110   * result in exceptions or errors, possibly including
111   * {@code ClassCastException}.
112   *
113 + * <p>Method {@link #join} and its variants are appropriate for use
114 + * only when completion dependencies are acyclic; that is, the
115 + * parallel computation can be described as a directed acyclic graph
116 + * (DAG). Otherwise, executions may encounter a form of deadlock as
117 + * tasks cyclically wait for each other.  However, this framework
118 + * supports other methods and techniques (for example the use of
119 + * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
120 + * may be of use in constructing custom subclasses for problems that
121 + * are not statically structured as DAGs.
122 + *
123   * <p>Most base support methods are {@code final}, to prevent
124   * overriding of implementations that are intrinsically tied to the
125   * underlying lightweight task scheduling framework.  Developers
# Line 115 | Line 134 | import java.util.WeakHashMap;
134   * computation. Large tasks should be split into smaller subtasks,
135   * usually via recursive decomposition. As a very rough rule of thumb,
136   * a task should perform more than 100 and less than 10000 basic
137 < * computational steps. If tasks are too big, then parallelism cannot
138 < * improve throughput. If too small, then memory and internal task
139 < * maintenance overhead may overwhelm processing.
137 > * computational steps, and should avoid indefinite looping. If tasks
138 > * are too big, then parallelism cannot improve throughput. If too
139 > * small, then memory and internal task maintenance overhead may
140 > * overwhelm processing.
141   *
142   * <p>This class provides {@code adapt} methods for {@link Runnable}
143   * and {@link Callable}, that may be of use when mixing execution of
# Line 214 | Line 234 | public abstract class ForkJoinTask<V> im
234      }
235  
236      /**
217     * Blocks a worker thread until completion. Called only by
218     * pool. Currently unused -- pool-based waits use timeout
219     * version below.
220     */
221    final void internalAwaitDone() {
222        int s;         // the odd construction reduces lock bias effects
223        while ((s = status) >= 0) {
224            try {
225                synchronized (this) {
226                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
227                        wait();
228                }
229            } catch (InterruptedException ie) {
230                cancelIfTerminating();
231            }
232        }
233    }
234
235    /**
237       * Blocks a worker thread until completed or timed out.  Called
238       * only by pool.
238     *
239     * @return status on exit
239       */
240 <    final int internalAwaitDone(long millis) {
241 <        int s;
242 <        if ((s = status) >= 0) {
243 <            try {
240 >    final void internalAwaitDone(long millis, int nanos) {
241 >        int s = status;
242 >        if ((s == 0 &&
243 >             UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
244 >            s > 0)  {
245 >            try {     // the odd construction reduces lock bias effects
246                  synchronized (this) {
247 <                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
248 <                        wait(millis, 0);
247 >                    if (status > 0)
248 >                        wait(millis, nanos);
249 >                    else
250 >                        notifyAll();
251                  }
252              } catch (InterruptedException ie) {
253                  cancelIfTerminating();
254              }
252            s = status;
255          }
254        return s;
256      }
257  
258      /**
259       * Blocks a non-worker-thread until completion.
260       */
261      private void externalAwaitDone() {
262 <        int s;
263 <        while ((s = status) >= 0) {
262 >        if (status >= 0) {
263 >            boolean interrupted = false;
264              synchronized (this) {
265 <                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
266 <                    boolean interrupted = false;
267 <                    while (status >= 0) {
265 >                for (;;) {
266 >                    int s = status;
267 >                    if (s == 0)
268 >                        UNSAFE.compareAndSwapInt(this, statusOffset,
269 >                                                 0, SIGNAL);
270 >                    else if (s < 0) {
271 >                        notifyAll();
272 >                        break;
273 >                    }
274 >                    else {
275                          try {
276                              wait();
277                          } catch (InterruptedException ie) {
278                              interrupted = true;
279                          }
280                      }
281 <                    if (interrupted)
282 <                        Thread.currentThread().interrupt();
283 <                    break;
281 >                }
282 >            }
283 >            if (interrupted)
284 >                Thread.currentThread().interrupt();
285 >        }
286 >    }
287 >
288 >    /**
289 >     * Blocks a non-worker-thread until completion or interruption or timeout.
290 >     */
291 >    private void externalInterruptibleAwaitDone(boolean timed, long nanos)
292 >        throws InterruptedException {
293 >        if (Thread.interrupted())
294 >            throw new InterruptedException();
295 >        if (status >= 0) {
296 >            long startTime = timed ? System.nanoTime() : 0L;
297 >            synchronized (this) {
298 >                for (;;) {
299 >                    long nt;
300 >                    int s = status;
301 >                    if (s == 0)
302 >                        UNSAFE.compareAndSwapInt(this, statusOffset,
303 >                                                 0, SIGNAL);
304 >                    else if (s < 0) {
305 >                        notifyAll();
306 >                        break;
307 >                    }
308 >                    else if (!timed)
309 >                        wait();
310 >                    else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
311 >                        wait(nt / 1000000, (int)(nt % 1000000));
312 >                    else
313 >                        break;
314                  }
315              }
316          }
# Line 307 | Line 345 | public abstract class ForkJoinTask<V> im
345       * #isDone} returning {@code true}.
346       *
347       * <p>This method may be invoked only from within {@code
348 <     * ForkJoinTask} computations (as may be determined using method
348 >     * ForkJoinPool} computations (as may be determined using method
349       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
350       * result in exceptions or errors, possibly including {@code
351       * ClassCastException}.
# Line 321 | Line 359 | public abstract class ForkJoinTask<V> im
359      }
360  
361      /**
362 <     * Returns the result of the computation when it {@link #isDone is done}.
363 <     * This method differs from {@link #get()} in that
362 >     * Returns the result of the computation when it {@link #isDone is
363 >     * done}.  This method differs from {@link #get()} in that
364       * abnormal completion results in {@code RuntimeException} or
365 <     * {@code Error}, not {@code ExecutionException}.
365 >     * {@code Error}, not {@code ExecutionException}, and that
366 >     * interrupts of the calling thread do <em>not</em> cause the
367 >     * method to abruptly return by throwing {@code
368 >     * InterruptedException}.
369       *
370       * @return the computed result
371       */
# Line 366 | Line 407 | public abstract class ForkJoinTask<V> im
407       * unprocessed.
408       *
409       * <p>This method may be invoked only from within {@code
410 <     * ForkJoinTask} computations (as may be determined using method
410 >     * ForkJoinPool} computations (as may be determined using method
411       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
412       * result in exceptions or errors, possibly including {@code
413       * ClassCastException}.
# Line 394 | Line 435 | public abstract class ForkJoinTask<V> im
435       * normally or exceptionally, or left unprocessed.
436       *
437       * <p>This method may be invoked only from within {@code
438 <     * ForkJoinTask} computations (as may be determined using method
438 >     * ForkJoinPool} computations (as may be determined using method
439       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
440       * result in exceptions or errors, possibly including {@code
441       * ClassCastException}.
# Line 449 | Line 490 | public abstract class ForkJoinTask<V> im
490       * unprocessed.
491       *
492       * <p>This method may be invoked only from within {@code
493 <     * ForkJoinTask} computations (as may be determined using method
493 >     * ForkJoinPool} computations (as may be determined using method
494       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
495       * result in exceptions or errors, possibly including {@code
496       * ClassCastException}.
# Line 501 | Line 542 | public abstract class ForkJoinTask<V> im
542  
543      /**
544       * Attempts to cancel execution of this task. This attempt will
545 <     * fail if the task has already completed, has already been
546 <     * cancelled, or could not be cancelled for some other reason. If
547 <     * successful, and this task has not started when cancel is
548 <     * called, execution of this task is suppressed, {@link
549 <     * #isCancelled} will report true, and {@link #join} will result
550 <     * in a {@code CancellationException} being thrown.
545 >     * fail if the task has already completed or could not be
546 >     * cancelled for some other reason. If successful, and this task
547 >     * has not started when {@code cancel} is called, execution of
548 >     * this task is suppressed. After this method returns
549 >     * successfully, unless there is an intervening call to {@link
550 >     * #reinitialize}, subsequent calls to {@link #isCancelled},
551 >     * {@link #isDone}, and {@code cancel} will return {@code true}
552 >     * and calls to {@link #join} and related methods will result in
553 >     * {@code CancellationException}.
554       *
555       * <p>This method may be overridden in subclasses, but if so, must
556 <     * still ensure that these minimal properties hold. In particular,
557 <     * the {@code cancel} method itself must not throw exceptions.
556 >     * still ensure that these properties hold. In particular, the
557 >     * {@code cancel} method itself must not throw exceptions.
558       *
559       * <p>This method is designed to be invoked by <em>other</em>
560       * tasks. To terminate the current task, you can just return or
561       * throw an unchecked exception from its computation method, or
562       * invoke {@link #completeExceptionally}.
563       *
564 <     * @param mayInterruptIfRunning this value is ignored in the
565 <     * default implementation because tasks are not
566 <     * cancelled via interruption
564 >     * @param mayInterruptIfRunning this value has no effect in the
565 >     * default implementation because interrupts are not used to
566 >     * control cancellation.
567       *
568       * @return {@code true} if this task is now cancelled
569       */
# Line 653 | Line 697 | public abstract class ForkJoinTask<V> im
697       * member of a ForkJoinPool and was interrupted while waiting
698       */
699      public final V get() throws InterruptedException, ExecutionException {
700 <        int s;
701 <        if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
700 >        Thread t = Thread.currentThread();
701 >        if (t instanceof ForkJoinWorkerThread)
702              quietlyJoin();
703 <            s = status;
704 <        }
705 <        else {
706 <            while ((s = status) >= 0) {
663 <                synchronized (this) { // interruptible form of awaitDone
664 <                    if (UNSAFE.compareAndSwapInt(this, statusOffset,
665 <                                                 s, SIGNAL)) {
666 <                        while (status >= 0)
667 <                            wait();
668 <                    }
669 <                }
670 <            }
671 <        }
672 <        if (s < NORMAL) {
703 >        else
704 >            externalInterruptibleAwaitDone(false, 0L);
705 >        int s = status;
706 >        if (s != NORMAL) {
707              Throwable ex;
708              if (s == CANCELLED)
709                  throw new CancellationException();
# Line 695 | Line 729 | public abstract class ForkJoinTask<V> im
729       */
730      public final V get(long timeout, TimeUnit unit)
731          throws InterruptedException, ExecutionException, TimeoutException {
732 +        long nanos = unit.toNanos(timeout);
733          Thread t = Thread.currentThread();
734 <        ForkJoinPool pool;
735 <        if (t instanceof ForkJoinWorkerThread) {
701 <            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
702 <            if (status >= 0 && w.unpushTask(this))
703 <                quietlyExec();
704 <            pool = w.pool;
705 <        }
734 >        if (t instanceof ForkJoinWorkerThread)
735 >            ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
736          else
737 <            pool = null;
738 <        /*
739 <         * Timed wait loop intermixes cases for FJ (pool != null) and
710 <         * non FJ threads. For FJ, decrement pool count but don't try
711 <         * for replacement; increment count on completion. For non-FJ,
712 <         * deal with interrupts. This is messy, but a little less so
713 <         * than is splitting the FJ and nonFJ cases.
714 <         */
715 <        boolean interrupted = false;
716 <        boolean dec = false; // true if pool count decremented
717 <        long nanos = unit.toNanos(timeout);
718 <        for (;;) {
719 <            if (pool == null && Thread.interrupted()) {
720 <                interrupted = true;
721 <                break;
722 <            }
723 <            int s = status;
724 <            if (s < 0)
725 <                break;
726 <            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
727 <                long startTime = System.nanoTime();
728 <                long nt; // wait time
729 <                while (status >= 0 &&
730 <                       (nt = nanos - (System.nanoTime() - startTime)) > 0) {
731 <                    if (pool != null && !dec)
732 <                        dec = pool.tryDecrementRunningCount();
733 <                    else {
734 <                        long ms = nt / 1000000;
735 <                        int ns = (int) (nt % 1000000);
736 <                        try {
737 <                            synchronized (this) {
738 <                                if (status >= 0)
739 <                                    wait(ms, ns);
740 <                            }
741 <                        } catch (InterruptedException ie) {
742 <                            if (pool != null)
743 <                                cancelIfTerminating();
744 <                            else {
745 <                                interrupted = true;
746 <                                break;
747 <                            }
748 <                        }
749 <                    }
750 <                }
751 <                break;
752 <            }
753 <        }
754 <        if (pool != null && dec)
755 <            pool.incrementRunningCount();
756 <        if (interrupted)
757 <            throw new InterruptedException();
758 <        int es = status;
759 <        if (es != NORMAL) {
737 >            externalInterruptibleAwaitDone(true, nanos);
738 >        int s = status;
739 >        if (s != NORMAL) {
740              Throwable ex;
741 <            if (es == CANCELLED)
741 >            if (s == CANCELLED)
742                  throw new CancellationException();
743 <            if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
743 >            if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
744                  throw new ExecutionException(ex);
745              throw new TimeoutException();
746          }
# Line 791 | Line 771 | public abstract class ForkJoinTask<V> im
771                          return;
772                      }
773                  }
774 <                w.joinTask(this);
774 >                w.joinTask(this, false, 0L);
775              }
776          }
777          else
# Line 827 | Line 807 | public abstract class ForkJoinTask<V> im
807       * processed.
808       *
809       * <p>This method may be invoked only from within {@code
810 <     * ForkJoinTask} computations (as may be determined using method
810 >     * ForkJoinPool} computations (as may be determined using method
811       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
812       * result in exceptions or errors, possibly including {@code
813       * ClassCastException}.
# Line 846 | Line 826 | public abstract class ForkJoinTask<V> im
826       * under any other usage conditions are not guaranteed.
827       * This method may be useful when executing
828       * pre-constructed trees of subtasks in loops.
829 +     *
830 +     * <p>Upon completion of this method, {@code isDone()} reports
831 +     * {@code false}, and {@code getException()} reports {@code
832 +     * null}. However, the value returned by {@code getRawResult} is
833 +     * unaffected. To clear this value, you can invoke {@code
834 +     * setRawResult(null)}.
835       */
836      public void reinitialize() {
837          if (status == EXCEPTIONAL)
# Line 867 | Line 853 | public abstract class ForkJoinTask<V> im
853      }
854  
855      /**
856 <     * Returns {@code true} if the current thread is executing as a
857 <     * ForkJoinPool computation.
856 >     * Returns {@code true} if the current thread is a {@link
857 >     * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
858       *
859 <     * @return {@code true} if the current thread is executing as a
860 <     * ForkJoinPool computation, or false otherwise
859 >     * @return {@code true} if the current thread is a {@link
860 >     * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
861 >     * or {@code false} otherwise
862       */
863      public static boolean inForkJoinPool() {
864          return Thread.currentThread() instanceof ForkJoinWorkerThread;
# Line 886 | Line 873 | public abstract class ForkJoinTask<V> im
873       * were not, stolen.
874       *
875       * <p>This method may be invoked only from within {@code
876 <     * ForkJoinTask} computations (as may be determined using method
876 >     * ForkJoinPool} computations (as may be determined using method
877       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
878       * result in exceptions or errors, possibly including {@code
879       * ClassCastException}.
# Line 905 | Line 892 | public abstract class ForkJoinTask<V> im
892       * fork other tasks.
893       *
894       * <p>This method may be invoked only from within {@code
895 <     * ForkJoinTask} computations (as may be determined using method
895 >     * ForkJoinPool} computations (as may be determined using method
896       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
897       * result in exceptions or errors, possibly including {@code
898       * ClassCastException}.
# Line 928 | Line 915 | public abstract class ForkJoinTask<V> im
915       * exceeded.
916       *
917       * <p>This method may be invoked only from within {@code
918 <     * ForkJoinTask} computations (as may be determined using method
918 >     * ForkJoinPool} computations (as may be determined using method
919       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
920       * result in exceptions or errors, possibly including {@code
921       * ClassCastException}.
# Line 986 | Line 973 | public abstract class ForkJoinTask<V> im
973       * otherwise.
974       *
975       * <p>This method may be invoked only from within {@code
976 <     * ForkJoinTask} computations (as may be determined using method
976 >     * ForkJoinPool} computations (as may be determined using method
977       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
978       * result in exceptions or errors, possibly including {@code
979       * ClassCastException}.
# Line 1005 | Line 992 | public abstract class ForkJoinTask<V> im
992       * be useful otherwise.
993       *
994       * <p>This method may be invoked only from within {@code
995 <     * ForkJoinTask} computations (as may be determined using method
995 >     * ForkJoinPool} computations (as may be determined using method
996       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
997       * result in exceptions or errors, possibly including {@code
998       * ClassCastException}.
# Line 1028 | Line 1015 | public abstract class ForkJoinTask<V> im
1015       * otherwise.
1016       *
1017       * <p>This method may be invoked only from within {@code
1018 <     * ForkJoinTask} computations (as may be determined using method
1018 >     * ForkJoinPool} computations (as may be determined using method
1019       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
1020       * result in exceptions or errors, possibly including {@code
1021       * ClassCastException}.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines