ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.58 by dl, Fri Jul 23 13:07:43 2010 UTC vs.
Revision 1.59 by dl, Fri Jul 23 14:09:17 2010 UTC

# Line 60 | Line 60 | import java.util.concurrent.CountDownLat
60   * Runnable}- or {@code Callable}- based activities as well.  However,
61   * tasks that are already executing in a pool should normally
62   * <em>NOT</em> use these pool execution methods, but instead use the
63 < * within-computation forms listed in the table.
63 > * within-computation forms listed in the table.
64   *
65   * <table BORDER CELLPADDING=3 CELLSPACING=1>
66   *  <tr>
# Line 84 | Line 84 | import java.util.concurrent.CountDownLat
84   *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
85   *  </tr>
86   * </table>
87 < *
87 > *
88   * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
89   * used for all parallel task execution in a program or subsystem.
90   * Otherwise, use would not usually outweigh the construction and
# Line 171 | Line 171 | public class ForkJoinPool extends Abstra
171       * ForkJoinWorkerThread.joinTask) interleave these options until
172       * successful.  Creating a new spare always succeeds, but also
173       * increases application footprint, so we try to avoid it, within
174 <     * reason.
174 >     * reason.
175       *
176       * The ManagedBlocker extension API can't use option (1) so uses a
177       * special version of (2) in method awaitBlocker.
# Line 539 | Line 539 | public class ForkJoinPool extends Abstra
539      final void incrementRunningCount() {
540          int c;
541          do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
542 <                                               c = workerCounts,
542 >                                               c = workerCounts,
543                                                 c + ONE_RUNNING));
544      }
545  
# Line 689 | Line 689 | public class ForkJoinPool extends Abstra
689       * parallelism maintenance
690       */
691      private void ensureEnoughWorkers() {
692 <        for (;;) {
692 >        while ((runState & TERMINATING) == 0) {
693              int pc = parallelism;
694              int wc = workerCounts;
695              int rc = wc & RUNNING_COUNT_MASK;
696              int tc = wc >>> TOTAL_COUNT_SHIFT;
697              if (tc < pc) {
698 <                if (runState == TERMINATING ||
699 <                    (UNSAFE.compareAndSwapInt
700 <                     (this, workerCountsOffset,
701 <                      wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
702 <                     addWorker() == null))
698 >                if (UNSAFE.compareAndSwapInt
699 >                    (this, workerCountsOffset,
700 >                     wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
701 >                    addWorker() == null)
702                      break;
703              }
704 <            else if (tc > pc && rc < pc &&
704 >            else if (tc > pc && rc < pc &&
705                       tc > (runState & ACTIVE_COUNT_MASK)) {
706                  ForkJoinWorkerThread spare = null;
707                  ForkJoinWorkerThread[] ws = workers;
708                  int nws = ws.length;
709 <                for (int i = 0; i < nws; ++i) {
709 >                for (int i = 0; i < nws; ++i) {
710                      ForkJoinWorkerThread w = ws[i];
711                      if (w != null && w.isSuspended()) {
712 <                        if ((workerCounts & RUNNING_COUNT_MASK) > pc ||
714 <                            runState == TERMINATING)
712 >                        if ((workerCounts & RUNNING_COUNT_MASK) > pc)
713                              return;
714                          if (w.tryResumeSpare())
715                              incrementRunningCount();
# Line 792 | Line 790 | public class ForkJoinPool extends Abstra
790       */
791      private void signalEvent() {
792          int c;
793 <        do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
793 >        do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
794                                                 c = eventCount, c+1));
795          releaseWaiters();
796      }
# Line 919 | Line 917 | public class ForkJoinPool extends Abstra
917       *
918       * We allow blocking if:
919       *
920 <     * 1. There would still be at least as many running threads as
920 >     * 1. There would still be at least as many running threads as
921       *    parallelism level if this thread blocks.
922       *
923       * 2. A spare is resumed to replace this worker. We tolerate
# Line 929 | Line 927 | public class ForkJoinPool extends Abstra
927       *    preStep().
928       *
929       * 3. After #spares repeated checks, there are no fewer than #spare
930 <     *    threads not running. We allow this slack to avoid hysteresis
931 <     *    and as a hedge against lag/uncertainty of running count
930 >     *    threads not running. We allow this slack to avoid hysteresis
931 >     *    and as a hedge against lag/uncertainty of running count
932       *    estimates when signalling or unblocking stalls.
933       *
934       * 4. All existing workers are busy (as rechecked via repeated
935       *    retries by caller) and a new spare is created.
936 <     *
936 >     *
937       * If none of the above hold, we try to escape out by
938       * re-incrementing count and returning to caller, which can retry
939       * later.
# Line 948 | Line 946 | public class ForkJoinPool extends Abstra
946       *   none of the blocking checks hold
947       */
948      final boolean tryAwaitJoin(ForkJoinTask<?> joinMe, int retries) {
949 <        if (joinMe.status < 0) // precheck to prime loop
949 >        if (joinMe.status < 0) // precheck for cancellation
950 >            return false;
951 >        if ((runState & TERMINATING) != 0) { // shutting down
952 >            joinMe.cancelIgnoringExceptions();
953              return false;
954 +        }
955 +
956          int pc = parallelism;
957          boolean running = true; // false when running count decremented
958          outer:for (;;) {
# Line 957 | Line 960 | public class ForkJoinPool extends Abstra
960              int rc = wc & RUNNING_COUNT_MASK;
961              int tc = wc >>> TOTAL_COUNT_SHIFT;
962              if (running) { // replace with spare or decrement count
963 <                if (rc <= pc && tc > pc &&
963 >                if (rc <= pc && tc > pc &&
964                      (retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) {
965                      ForkJoinWorkerThread[] ws = workers;
966                      int nws = ws.length;
# Line 979 | Line 982 | public class ForkJoinPool extends Abstra
982                  }
983                  if (retries < 0 || // < 0 means replacement check only
984                      rc == 0 || joinMe.status < 0 || workerCounts != wc ||
985 <                    !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
985 >                    !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
986                                                wc, wc - ONE_RUNNING))
987                      return false; // done or inconsistent or contended
988                  running = false;
# Line 993 | Line 996 | public class ForkJoinPool extends Abstra
996                  if (retries > sc) {
997                      if (rc > 0 && rc >= pc - sc) // allow slack
998                          break;
999 <                    if (tc < MAX_THREADS &&
1000 <                        tc == (runState & ACTIVE_COUNT_MASK) &&
999 >                    if (tc < MAX_THREADS &&
1000 >                        tc == (runState & ACTIVE_COUNT_MASK) &&
1001                          workerCounts == wc &&
1002                          UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
1003                                                   wc+(ONE_RUNNING|ONE_TOTAL))) {
# Line 1036 | Line 1039 | public class ForkJoinPool extends Abstra
1039              int wc = workerCounts;
1040              int rc = wc & RUNNING_COUNT_MASK;
1041              int tc = wc >>> TOTAL_COUNT_SHIFT;
1042 <            if (running) {
1043 <                if (rc <= pc && tc > pc &&
1042 >            if (running) {
1043 >                if (rc <= pc && tc > pc &&
1044                      (retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) {
1045                      ForkJoinWorkerThread[] ws = workers;
1046                      int nws = ws.length;
1047 <                    for (int i = 0; i < nws; ++i) {
1047 >                    for (int i = 0; i < nws; ++i) {
1048                          ForkJoinWorkerThread w = ws[i];
1049                          if (w != null) {
1050                              if (done = blocker.isReleasable())
# Line 1060 | Line 1063 | public class ForkJoinPool extends Abstra
1063                  if (done = blocker.isReleasable())
1064                      return;
1065                  if (rc == 0 || workerCounts != wc ||
1066 <                    !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1066 >                    !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1067                                                wc, wc - ONE_RUNNING))
1068                      continue;
1069                  running = false;
1070                  if (rc > pc)
1071                      break;
1072              }
1073 <            else {
1073 >            else {
1074                  if (rc >= pc || (done = blocker.isReleasable()))
1075                      break;
1076                  int sc = tc - pc + 1;
1077                  if (retries++ > sc) {
1078                      if (rc > 0 && rc >= pc - sc)
1079                          break;
1080 <                    if (tc < MAX_THREADS &&
1081 <                        tc == (runState & ACTIVE_COUNT_MASK) &&
1080 >                    if (tc < MAX_THREADS &&
1081 >                        tc == (runState & ACTIVE_COUNT_MASK) &&
1082                          workerCounts == wc &&
1083                          UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
1084                                                   wc+(ONE_RUNNING|ONE_TOTAL))) {
# Line 1086 | Line 1089 | public class ForkJoinPool extends Abstra
1089                  Thread.yield();
1090              }
1091          }
1092 <        
1092 >
1093          try {
1094              if (!done)
1095                  do {} while (!blocker.isReleasable() && !blocker.block());
# Line 1098 | Line 1101 | public class ForkJoinPool extends Abstra
1101                                c = workerCounts, c + ONE_RUNNING));
1102              }
1103          }
1104 <    }  
1104 >    }
1105  
1106      /**
1107       * Possibly initiates and/or completes termination.
# Line 1275 | Line 1278 | public class ForkJoinPool extends Abstra
1278       * use {@link java.lang.Runtime#availableProcessors}.
1279       * @param factory the factory for creating new threads. For default value,
1280       * use {@link #defaultForkJoinWorkerThreadFactory}.
1281 <     * @param handler the handler for internal worker threads that
1282 <     * terminate due to unrecoverable errors encountered while executing
1281 >     * @param handler the handler for internal worker threads that
1282 >     * terminate due to unrecoverable errors encountered while executing
1283       * tasks. For default value, use <code>null</code>.
1284 <     * @param asyncMode if true,
1284 >     * @param asyncMode if true,
1285       * establishes local first-in-first-out scheduling mode for forked
1286       * tasks that are never joined. This mode may be more appropriate
1287       * than default locally stack-based mode in applications in which
# Line 1292 | Line 1295 | public class ForkJoinPool extends Abstra
1295       *         because it does not hold {@link
1296       *         java.lang.RuntimePermission}{@code ("modifyThread")}
1297       */
1298 <    public ForkJoinPool(int parallelism,
1298 >    public ForkJoinPool(int parallelism,
1299                          ForkJoinWorkerThreadFactory factory,
1300                          Thread.UncaughtExceptionHandler handler,
1301                          boolean asyncMode) {
# Line 1345 | Line 1348 | public class ForkJoinPool extends Abstra
1348      /**
1349       * Performs the given task, returning its result upon completion.
1350       * If the caller is already engaged in a fork/join computation in
1351 <     * the current pool, this method is equivalent in effect to
1351 >     * the current pool, this method is equivalent in effect to
1352       * {@link ForkJoinTask#invoke}.
1353       *
1354       * @param task the task
# Line 1362 | Line 1365 | public class ForkJoinPool extends Abstra
1365      /**
1366       * Arranges for (asynchronous) execution of the given task.
1367       * If the caller is already engaged in a fork/join computation in
1368 <     * the current pool, this method is equivalent in effect to
1368 >     * the current pool, this method is equivalent in effect to
1369       * {@link ForkJoinTask#fork}.
1370       *
1371       * @param task the task
# Line 1393 | Line 1396 | public class ForkJoinPool extends Abstra
1396      /**
1397       * Submits a ForkJoinTask for execution.
1398       * If the caller is already engaged in a fork/join computation in
1399 <     * the current pool, this method is equivalent in effect to
1399 >     * the current pool, this method is equivalent in effect to
1400       * {@link ForkJoinTask#fork}.
1401       *
1402       * @param task the task to submit

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines