ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.69 by dl, Mon Nov 22 12:24:34 2010 UTC vs.
Revision 1.73 by jsr166, Sun Nov 28 21:21:03 2010 UTC

# Line 134 | Line 134 | import java.util.concurrent.TimeoutExcep
134   * computation. Large tasks should be split into smaller subtasks,
135   * usually via recursive decomposition. As a very rough rule of thumb,
136   * a task should perform more than 100 and less than 10000 basic
137 < * computational steps. If tasks are too big, then parallelism cannot
138 < * improve throughput. If too small, then memory and internal task
139 < * maintenance overhead may overwhelm processing.
137 > * computational steps, and should avoid indefinite looping. If tasks
138 > * are too big, then parallelism cannot improve throughput. If too
139 > * small, then memory and internal task maintenance overhead may
140 > * overwhelm processing.
141   *
142   * <p>This class provides {@code adapt} methods for {@link Runnable}
143   * and {@link Callable}, that may be of use when mixing execution of
# Line 233 | Line 234 | public abstract class ForkJoinTask<V> im
234      }
235  
236      /**
236     * Blocks a worker thread until completion. Called only by
237     * pool. Currently unused -- pool-based waits use timeout
238     * version below.
239     */
240    final void internalAwaitDone() {
241        int s;         // the odd construction reduces lock bias effects
242        while ((s = status) >= 0) {
243            try {
244                synchronized (this) {
245                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
246                        wait();
247                }
248            } catch (InterruptedException ie) {
249                cancelIfTerminating();
250            }
251        }
252    }
253
254    /**
237       * Blocks a worker thread until completed or timed out.  Called
238       * only by pool.
257     *
258     * @return status on exit
239       */
240 <    final int internalAwaitDone(long millis, int nanos) {
241 <        int s;
242 <        if ((s = status) >= 0) {
243 <            try {
240 >    final void internalAwaitDone(long millis, int nanos) {
241 >        int s = status;
242 >        if ((s == 0 &&
243 >             UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
244 >            s > 0)  {
245 >            try {     // the odd construction reduces lock bias effects
246                  synchronized (this) {
247 <                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
247 >                    if (status > 0)
248                          wait(millis, nanos);
249 +                    else
250 +                        notifyAll();
251                  }
252              } catch (InterruptedException ie) {
253                  cancelIfTerminating();
254              }
271            s = status;
255          }
273        return s;
256      }
257  
258      /**
259       * Blocks a non-worker-thread until completion.
260       */
261      private void externalAwaitDone() {
262 <        int s;
263 <        while ((s = status) >= 0) {
262 >        if (status >= 0) {
263 >            boolean interrupted = false;
264              synchronized (this) {
265 <                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
266 <                    boolean interrupted = false;
267 <                    while (status >= 0) {
265 >                for (;;) {
266 >                    int s = status;
267 >                    if (s == 0)
268 >                        UNSAFE.compareAndSwapInt(this, statusOffset,
269 >                                                 0, SIGNAL);
270 >                    else if (s < 0) {
271 >                        notifyAll();
272 >                        break;
273 >                    }
274 >                    else {
275                          try {
276                              wait();
277                          } catch (InterruptedException ie) {
278                              interrupted = true;
279                          }
280                      }
281 <                    if (interrupted)
282 <                        Thread.currentThread().interrupt();
283 <                    break;
281 >                }
282 >            }
283 >            if (interrupted)
284 >                Thread.currentThread().interrupt();
285 >        }
286 >    }
287 >
288 >    /**
289 >     * Blocks a non-worker-thread until completion or interruption or timeout.
290 >     */
291 >    private void externalInterruptibleAwaitDone(boolean timed, long nanos)
292 >        throws InterruptedException {
293 >        if (Thread.interrupted())
294 >            throw new InterruptedException();
295 >        if (status >= 0) {
296 >            long startTime = timed ? System.nanoTime() : 0L;
297 >            synchronized (this) {
298 >                for (;;) {
299 >                    long nt;
300 >                    int s = status;
301 >                    if (s == 0)
302 >                        UNSAFE.compareAndSwapInt(this, statusOffset,
303 >                                                 0, SIGNAL);
304 >                    else if (s < 0) {
305 >                        notifyAll();
306 >                        break;
307 >                    }
308 >                    else if (!timed)
309 >                        wait();
310 >                    else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
311 >                        wait(nt / 1000000, (int)(nt % 1000000));
312 >                    else
313 >                        break;
314                  }
315              }
316          }
# Line 326 | Line 345 | public abstract class ForkJoinTask<V> im
345       * #isDone} returning {@code true}.
346       *
347       * <p>This method may be invoked only from within {@code
348 <     * ForkJoinTask} computations (as may be determined using method
348 >     * ForkJoinPool} computations (as may be determined using method
349       * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
350       * result in exceptions or errors, possibly including {@code
351       * ClassCastException}.
# Line 678 | Line 697 | public abstract class ForkJoinTask<V> im
697       * member of a ForkJoinPool and was interrupted while waiting
698       */
699      public final V get() throws InterruptedException, ExecutionException {
700 <        int s;
701 <        if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
700 >        Thread t = Thread.currentThread();
701 >        if (t instanceof ForkJoinWorkerThread)
702              quietlyJoin();
703 <            s = status;
704 <        }
705 <        else {
706 <            while ((s = status) >= 0) {
688 <                synchronized (this) { // interruptible form of awaitDone
689 <                    if (UNSAFE.compareAndSwapInt(this, statusOffset,
690 <                                                 s, SIGNAL)) {
691 <                        while (status >= 0)
692 <                            wait();
693 <                    }
694 <                }
695 <            }
696 <        }
697 <        if (s < NORMAL) {
703 >        else
704 >            externalInterruptibleAwaitDone(false, 0L);
705 >        int s = status;
706 >        if (s != NORMAL) {
707              Throwable ex;
708              if (s == CANCELLED)
709                  throw new CancellationException();
# Line 721 | Line 730 | public abstract class ForkJoinTask<V> im
730      public final V get(long timeout, TimeUnit unit)
731          throws InterruptedException, ExecutionException, TimeoutException {
732          long nanos = unit.toNanos(timeout);
733 <        if (status >= 0) {
734 <            Thread t = Thread.currentThread();
735 <            if (t instanceof ForkJoinWorkerThread) {
736 <                ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
737 <                boolean completed = false; // timed variant of quietlyJoin
738 <                if (w.unpushTask(this)) {
739 <                    try {
731 <                        completed = exec();
732 <                    } catch (Throwable rex) {
733 <                        setExceptionalCompletion(rex);
734 <                    }
735 <                }
736 <                if (completed)
737 <                    setCompletion(NORMAL);
738 <                else if (status >= 0)
739 <                    w.joinTask(this, true, nanos);
740 <            }
741 <            else if (Thread.interrupted())
742 <                throw new InterruptedException();
743 <            else {
744 <                long startTime = System.nanoTime();
745 <                int s; long nt;
746 <                while ((s = status) >= 0 &&
747 <                       (nt = nanos - (System.nanoTime() - startTime)) > 0) {
748 <                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,
749 <                                                 SIGNAL)) {
750 <                        long ms = nt / 1000000;
751 <                        int ns = (int) (nt % 1000000);
752 <                        synchronized (this) {
753 <                            if (status >= 0)
754 <                                wait(ms, ns); // exit on IE throw
755 <                        }
756 <                    }
757 <                }
758 <            }
759 <        }
760 <        int es = status;
761 <        if (es != NORMAL) {
733 >        Thread t = Thread.currentThread();
734 >        if (t instanceof ForkJoinWorkerThread)
735 >            ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
736 >        else
737 >            externalInterruptibleAwaitDone(true, nanos);
738 >        int s = status;
739 >        if (s != NORMAL) {
740              Throwable ex;
741 <            if (es == CANCELLED)
741 >            if (s == CANCELLED)
742                  throw new CancellationException();
743 <            if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
743 >            if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
744                  throw new ExecutionException(ex);
745              throw new TimeoutException();
746          }
# Line 875 | Line 853 | public abstract class ForkJoinTask<V> im
853      }
854  
855      /**
856 <     * Returns {@code true} if the current thread is executing as a
857 <     * ForkJoinPool computation.
856 >     * Returns {@code true} if the current thread is a {@link
857 >     * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
858       *
859 <     * @return {@code true} if the current thread is executing as a
860 <     * ForkJoinPool computation, or false otherwise
859 >     * @return {@code true} if the current thread is a {@link
860 >     * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
861 >     * or {@code false} otherwise
862       */
863      public static boolean inForkJoinPool() {
864          return Thread.currentThread() instanceof ForkJoinWorkerThread;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines