ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.14 by dl, Mon Apr 5 16:05:09 2010 UTC vs.
Revision 1.15 by dl, Sun Apr 18 12:54:57 2010 UTC

# Line 976 | Line 976 | public class ForkJoinPool extends Abstra
976      }
977  
978      /**
979 +     * Unless there are not enough other running threads, adjusts
980 +     * counts for a a worker in performing helpJoin that cannot find
981 +     * any work, so that this worker can now block.
982 +     *
983 +     * @return true if worker may block
984 +     */
985 +    final boolean preBlockHelpingJoin(ForkJoinTask<?> joinMe) {
986 +        while (joinMe.status >= 0) {
987 +            releaseWaiters(); // help other threads progress
988 +
989 +            // if a spare exists, resume it to maintain parallelism level
990 +            if ((workerCounts & RUNNING_COUNT_MASK) <= parallelism) {
991 +                ForkJoinWorkerThread spare = null;
992 +                for (ForkJoinWorkerThread w : workers) {
993 +                    if (w != null && w.isSuspended()) {
994 +                        spare = w;
995 +                        break;
996 +                    }
997 +                }
998 +                if (joinMe.status < 0)
999 +                    break;
1000 +                if (spare != null) {
1001 +                    if (spare.tryUnsuspend()) {
1002 +                        boolean canBlock = true;
1003 +                        if (joinMe.requestSignal() < 0) {
1004 +                            canBlock = false; // already done
1005 +                            int c;
1006 +                            do {} while (!UNSAFE.compareAndSwapInt
1007 +                                         (this, workerCountsOffset,
1008 +                                          c = workerCounts, c + ONE_RUNNING));
1009 +                        }
1010 +                        LockSupport.unpark(spare);
1011 +                        return canBlock;
1012 +                    }
1013 +                    continue; // recheck -- another spare may exist
1014 +                }
1015 +            }
1016 +
1017 +            int wc = workerCounts; // reread to shorten CAS window
1018 +            int rc = wc & RUNNING_COUNT_MASK;
1019 +            if (rc <= 2) // keep this and at most one other thread alive
1020 +                break;
1021 +
1022 +            if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1023 +                                         wc, wc - ONE_RUNNING)) {
1024 +                if (joinMe.requestSignal() >= 0)
1025 +                    return true;
1026 +                int c;                        // back out
1027 +                do {} while (!UNSAFE.compareAndSwapInt
1028 +                             (this, workerCountsOffset,
1029 +                              c = workerCounts, c + ONE_RUNNING));
1030 +                break;
1031 +            }
1032 +        }
1033 +        return false;
1034 +    }
1035 +
1036 +    /**
1037       * Possibly initiates and/or completes termination.
1038       *
1039       * @param now if true, unconditionally terminate, else only
# Line 1071 | Line 1129 | public class ForkJoinPool extends Abstra
1129          return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
1130      }
1131  
1074    /**
1075     * Returns the approximate (non-atomic) difference between running
1076     * and active counts.
1077     */
1078    final int inactiveCount() {
1079        return (workerCounts & RUNNING_COUNT_MASK) -
1080            (runState & ACTIVE_COUNT_MASK);
1081    }
1082
1132      // Public and protected methods
1133  
1134      // Constructors
# Line 1303 | Line 1352 | public class ForkJoinPool extends Abstra
1352          invoke(new InvokeAll<T>(forkJoinTasks));
1353  
1354          @SuppressWarnings({"unchecked", "rawtypes"})
1355 <        List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1355 >            List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1356          return futures;
1357      }
1358  
# Line 1440 | Line 1489 | public class ForkJoinPool extends Abstra
1489       * pool. The given value should normally be greater than or equal
1490       * to the {@link #getParallelism parallelism} level. Setting this
1491       * value has no effect on current pool size. It controls
1492 <     * construction of new threads.
1492 >     * construction of new threads. The use of this method may cause
1493 >     * tasks that intrinsically require extra threads for dependent
1494 >     * computations to indefinitely stall. If you are instead trying
1495 >     * to minimize internal thread creation, consider setting {link
1496 >     * #setMaintainsParallelism} as false.
1497       *
1498       * @throws IllegalArgumentException if negative or greater than
1499       * internal implementation limit

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines