ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.53 by dl, Mon Apr 5 15:52:26 2010 UTC vs.
Revision 1.54 by dl, Sun Apr 18 12:51:18 2010 UTC

# Line 978 | Line 978 | public class ForkJoinPool extends Abstra
978      }
979  
980      /**
981 +     * Unless there are not enough other running threads, adjusts
982 +     * counts for a a worker in performing helpJoin that cannot find
983 +     * any work, so that this worker can now block.
984 +     *
985 +     * @return true if worker may block
986 +     */
987 +    final boolean preBlockHelpingJoin(ForkJoinTask<?> joinMe) {
988 +        while (joinMe.status >= 0) {
989 +            releaseWaiters(); // help other threads progress
990 +
991 +            // if a spare exists, resume it to maintain parallelism level
992 +            if ((workerCounts & RUNNING_COUNT_MASK) <= parallelism) {
993 +                ForkJoinWorkerThread spare = null;
994 +                for (ForkJoinWorkerThread w : workers) {
995 +                    if (w != null && w.isSuspended()) {
996 +                        spare = w;
997 +                        break;
998 +                    }
999 +                }
1000 +                if (joinMe.status < 0)
1001 +                    break;
1002 +                if (spare != null) {
1003 +                    if (spare.tryUnsuspend()) {
1004 +                        boolean canBlock = true;
1005 +                        if (joinMe.requestSignal() < 0) {
1006 +                            canBlock = false; // already done
1007 +                            int c;
1008 +                            do {} while (!UNSAFE.compareAndSwapInt
1009 +                                         (this, workerCountsOffset,
1010 +                                          c = workerCounts, c + ONE_RUNNING));
1011 +                        }
1012 +                        LockSupport.unpark(spare);
1013 +                        return canBlock;
1014 +                    }
1015 +                    continue; // recheck -- another spare may exist
1016 +                }
1017 +            }
1018 +
1019 +            int wc = workerCounts; // reread to shorten CAS window
1020 +            int rc = wc & RUNNING_COUNT_MASK;
1021 +            if (rc <= 2) // keep this and at most one other thread alive
1022 +                break;
1023 +
1024 +            if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1025 +                                         wc, wc - ONE_RUNNING)) {
1026 +                if (joinMe.requestSignal() >= 0)
1027 +                    return true;
1028 +                int c;                        // back out
1029 +                do {} while (!UNSAFE.compareAndSwapInt
1030 +                             (this, workerCountsOffset,
1031 +                              c = workerCounts, c + ONE_RUNNING));
1032 +                break;
1033 +            }
1034 +        }
1035 +        return false;
1036 +    }
1037 +
1038 +    /**
1039       * Possibly initiates and/or completes termination.
1040       *
1041       * @param now if true, unconditionally terminate, else only
# Line 1073 | Line 1131 | public class ForkJoinPool extends Abstra
1131          return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
1132      }
1133  
1076    /**
1077     * Returns the approximate (non-atomic) difference between running
1078     * and active counts.
1079     */
1080    final int inactiveCount() {
1081        return (workerCounts & RUNNING_COUNT_MASK) -
1082            (runState & ACTIVE_COUNT_MASK);
1083    }
1084
1134      // Public and protected methods
1135  
1136      // Constructors
# Line 1305 | Line 1354 | public class ForkJoinPool extends Abstra
1354          invoke(new InvokeAll<T>(forkJoinTasks));
1355  
1356          @SuppressWarnings({"unchecked", "rawtypes"})
1357 <        List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1357 >            List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1358          return futures;
1359      }
1360  
# Line 1442 | Line 1491 | public class ForkJoinPool extends Abstra
1491       * pool. The given value should normally be greater than or equal
1492       * to the {@link #getParallelism parallelism} level. Setting this
1493       * value has no effect on current pool size. It controls
1494 <     * construction of new threads.
1494 >     * construction of new threads. The use of this method may cause
1495 >     * tasks that intrinsically require extra threads for dependent
1496 >     * computations to indefinitely stall. If you are instead trying
1497 >     * to minimize internal thread creation, consider setting {link
1498 >     * #setMaintainsParallelism} as false.
1499       *
1500       * @throws IllegalArgumentException if negative or greater than
1501       * internal implementation limit

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines