ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.82 by dl, Sun Oct 10 11:56:11 2010 UTC vs.
Revision 1.83 by dl, Sun Oct 24 19:37:26 2010 UTC

# Line 705 | Line 705 | public class ForkJoinPool extends Abstra
705       */
706      final void workerTerminated(ForkJoinWorkerThread w) {
707          forgetWorker(w);
708 <        decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
708 >        decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
709          while (w.stealCount != 0) // collect final count
710              tryAccumulateStealCount(w);
711          tryTerminate(false);
# Line 791 | Line 791 | public class ForkJoinPool extends Abstra
791              if (tryAccumulateStealCount(w)) { // transfer while idle
792                  boolean untimed = (w.nextWaiter != 0L ||
793                                     (workerCounts & RUNNING_COUNT_MASK) <= 1);
794 <                long startTime = untimed? 0 : System.nanoTime();
794 >                long startTime = untimed ? 0 : System.nanoTime();
795                  Thread.interrupted();         // clear/ignore interrupt
796                  if (eventCount != ec || w.isTerminating())
797                      break;                    // recheck after clear
# Line 1014 | Line 1014 | public class ForkJoinPool extends Abstra
1014       *
1015       * @param joinMe the task to join
1016       * @param worker the current worker thread
1017 +     * @param timed true if wait should time out
1018 +     * @param nanos timeout value if timed
1019       */
1020 <    final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
1020 >    final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
1021 >                         boolean timed, long nanos) {
1022 >        long startTime = timed? System.nanoTime() : 0L;
1023          int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1024          while (joinMe.status >= 0) {
1025              int wc;
1026 +            long nt = 0L;
1027              if (runState >= TERMINATING) {
1028                  joinMe.cancelIgnoringExceptions();
1029                  break;
# Line 1028 | Line 1033 | public class ForkJoinPool extends Abstra
1033                  break;
1034              else if (retries > 0)
1035                  --retries;
1036 +            else if (timed &&
1037 +                     (nt = nanos - (System.nanoTime() - startTime)) <= 0L)
1038 +                break;
1039              else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1040                       UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1041                                                wc, wc - ONE_RUNNING)) {
# Line 1036 | Line 1044 | public class ForkJoinPool extends Abstra
1044                         (h = eventWaiters) != 0L && // help release others
1045                         (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1046                      releaseEventWaiters();
1047 <                if (stat >= 0 &&
1048 <                    ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1049 <                     (stat =
1050 <                      joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1051 <                    helpMaintainParallelism(); // timeout or no running workers
1047 >                if (stat >= 0) {
1048 >                    if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
1049 >                        long ms; int ns;
1050 >                        if (!timed) {
1051 >                            ms = JOIN_TIMEOUT_MILLIS;
1052 >                            ns = 0;
1053 >                        }
1054 >                        else { // at most JOIN_TIMEOUT_MILLIS per wait
1055 >                            ms = nt / 1000000;
1056 >                            if (ms > JOIN_TIMEOUT_MILLIS) {
1057 >                                ms = JOIN_TIMEOUT_MILLIS;
1058 >                                ns = 0;
1059 >                            }
1060 >                            else
1061 >                                ns = (int) (nt % 1000000);
1062 >                        }
1063 >                        stat = joinMe.internalAwaitDone(ms, ns);
1064 >                    }
1065 >                    if (stat >= 0) // timeout or no running workers
1066 >                        helpMaintainParallelism();
1067 >                }
1068                  do {} while (!UNSAFE.compareAndSwapInt
1069                               (this, workerCountsOffset,
1070                                c = workerCounts, c + ONE_RUNNING));

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines