ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.167 by jsr166, Sun Dec 30 02:05:53 2012 UTC vs.
Revision 1.168 by dl, Tue Jan 1 15:10:39 2013 UTC

# Line 440 | Line 440 | public class ForkJoinPool extends Abstra
440       * Common Pool
441       * ===========
442       *
443 <     * The static commonPool always exists after static
443 >     * The static common Pool always exists after static
444       * initialization.  Since it (or any other created pool) need
445       * never be used, we minimize initial construction overhead and
446       * footprint to the setup of about a dozen fields, with no nested
# Line 1074 | Line 1074 | public class ForkJoinPool extends Abstra
1074       * to paranoically avoid potential initialization circularities
1075       * as well as to simplify generated code.
1076       */
1077 <    static final ForkJoinPool commonPool;
1077 >    static final ForkJoinPool common;
1078  
1079      /**
1080 <     * Common pool parallelism. Must equal commonPool.parallelism.
1080 >     * Common pool parallelism. Must equal common.parallelism.
1081       */
1082 <    static final int commonPoolParallelism;
1082 >    static final int commonParallelism;
1083  
1084      /**
1085       * Sequence number for creating workerNamePrefix.
# Line 2241 | Line 2241 | public class ForkJoinPool extends Abstra
2241       */
2242      private boolean tryTerminate(boolean now, boolean enable) {
2243          int ps;
2244 <        if (this == commonPool)                    // cannot shut down
2244 >        if (this == common)                    // cannot shut down
2245              return false;
2246          if ((ps = plock) >= 0) {                   // enable by setting plock
2247              if (!enable)
# Line 2332 | Line 2332 | public class ForkJoinPool extends Abstra
2332      static WorkQueue commonSubmitterQueue() {
2333          ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
2334          return ((z = submitters.get()) != null &&
2335 <                (p = commonPool) != null &&
2335 >                (p = common) != null &&
2336                  (ws = p.workQueues) != null &&
2337                  (m = ws.length - 1) >= 0) ?
2338              ws[m & z.seed & SQMASK] : null;
# Line 2346 | Line 2346 | public class ForkJoinPool extends Abstra
2346          ForkJoinTask<?>[] a;  int m, s;
2347          if (t != null &&
2348              (z = submitters.get()) != null &&
2349 <            (p = commonPool) != null &&
2349 >            (p = common) != null &&
2350              (ws = p.workQueues) != null &&
2351              (m = ws.length - 1) >= 0 &&
2352              (q = ws[m & z.seed & SQMASK]) != null &&
# Line 2423 | Line 2423 | public class ForkJoinPool extends Abstra
2423          ForkJoinTask<?>[] a;  int m, s, n;
2424          if (t != null &&
2425              (z = submitters.get()) != null &&
2426 <            (p = commonPool) != null &&
2426 >            (p = common) != null &&
2427              (ws = p.workQueues) != null &&
2428              (m = ws.length - 1) >= 0 &&
2429              (q = ws[m & z.seed & SQMASK]) != null &&
# Line 2452 | Line 2452 | public class ForkJoinPool extends Abstra
2452          }
2453      }
2454  
2455    /**
2456     * Restricted version of helpQuiescePool for external callers
2457     */
2458    static void externalHelpQuiescePool() {
2459        ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
2460        if ((p = commonPool) != null &&
2461            (q = p.findNonEmptyStealQueue(1)) != null &&
2462            (b = q.base) - q.top < 0 &&
2463            (t = q.pollAt(b)) != null) {
2464            if (q.base - q.top < 0)
2465                p.signalWork(q);
2466            t.doExec();
2467        }
2468    }
2469
2455      // Exported methods
2456  
2457      // Constructors
# Line 2566 | Line 2551 | public class ForkJoinPool extends Abstra
2551  
2552      /**
2553       * Returns the common pool instance. This pool is statically
2554 <     * constructed; its run state is unaffected by attempts to
2555 <     * {@link #shutdown} or {@link #shutdownNow}.
2554 >     * constructed; its run state is unaffected by attempts to {@link
2555 >     * #shutdown} or {@link #shutdownNow}. However this pool and any
2556 >     * ongoing processing are automatically terminated upon program
2557 >     * {@link System#exit}.  Any program that relies on asynchronous
2558 >     * task processing to complete before program termination should
2559 >     * invoke {@link #quiesceCommonPool}, or the timeout-based {@code
2560 >     * commonPool().}{@link #awaitQuiescence}, before exit.
2561       *
2562       * @return the common pool instance
2563       */
2564      public static ForkJoinPool commonPool() {
2565 <        // assert commonPool != null : "static init error";
2566 <        return commonPool;
2565 >        // assert common != null : "static init error";
2566 >        return common;
2567      }
2568  
2569      // Execution methods
# Line 2754 | Line 2744 | public class ForkJoinPool extends Abstra
2744       * @return the targeted parallelism level of the common pool
2745       */
2746      public static int getCommonPoolParallelism() {
2747 <        return commonPoolParallelism;
2747 >        return commonParallelism;
2748      }
2749  
2750      /**
# Line 3093 | Line 3083 | public class ForkJoinPool extends Abstra
3083      /**
3084       * Blocks until all tasks have completed execution after a
3085       * shutdown request, or the timeout occurs, or the current thread
3086 <     * is interrupted, whichever happens first. Note that the {@link
3087 <     * #commonPool()} never terminates until program shutdown so
3088 <     * this method will always time out.
3086 >     * is interrupted, whichever happens first. Because the {@link
3087 >     * #commonPool()} never terminates until program shutdown, when
3088 >     * applied to the common pool, this method is equivalent to {@link
3089 >     * #awaitQuiescence} but always returns {@code false}.
3090       *
3091       * @param timeout the maximum time to wait
3092       * @param unit the time unit of the timeout argument
# Line 3105 | Line 3096 | public class ForkJoinPool extends Abstra
3096       */
3097      public boolean awaitTermination(long timeout, TimeUnit unit)
3098          throws InterruptedException {
3099 +        if (Thread.interrupted())
3100 +            throw new InterruptedException();
3101 +        if (this == common) {
3102 +            awaitQuiescence(timeout, unit);
3103 +            return false;
3104 +        }
3105          long nanos = unit.toNanos(timeout);
3106          if (isTerminated())
3107              return true;
# Line 3124 | Line 3121 | public class ForkJoinPool extends Abstra
3121      }
3122  
3123      /**
3124 +     * If called by a ForkJoinTask operating in this pool, equivalent
3125 +     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3126 +     * waits and/or attempts to assist performing tasks until this
3127 +     * pool {@link #isQuiescent} or the indicated timeout elapses.
3128 +     *
3129 +     * @param timeout the maximum time to wait
3130 +     * @param unit the time unit of the timeout argument
3131 +     * @return {@code true} if quiescent; {@code false} if the
3132 +     * timeout elapsed.
3133 +     */
3134 +    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3135 +        long nanos = unit.toNanos(timeout);
3136 +        ForkJoinWorkerThread wt;
3137 +        Thread thread = Thread.currentThread();
3138 +        if ((thread instanceof ForkJoinWorkerThread) &&
3139 +            (wt = (ForkJoinWorkerThread)thread).pool == this) {
3140 +            helpQuiescePool(wt.workQueue);
3141 +            return true;
3142 +        }
3143 +        long startTime = System.nanoTime();
3144 +        WorkQueue[] ws;
3145 +        int r = 0, m;
3146 +        boolean found = true;
3147 +        while (!isQuiescent() && (ws = workQueues) != null &&
3148 +               (m = ws.length - 1) >= 0) {
3149 +            if (!found) {
3150 +                if ((System.nanoTime() - startTime) > nanos)
3151 +                    return false;
3152 +                Thread.yield(); // cannot block
3153 +            }
3154 +            found = false;
3155 +            for (int j = (m + 1) << 2; j >= 0; --j) {
3156 +                ForkJoinTask<?> t; WorkQueue q; int b;
3157 +                if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
3158 +                    found = true;
3159 +                    if ((t = q.pollAt(b)) != null) {
3160 +                        if (q.base - q.top < 0)
3161 +                            signalWork(q);
3162 +                        t.doExec();
3163 +                    }
3164 +                    break;
3165 +                }
3166 +            }
3167 +        }
3168 +        return true;
3169 +    }
3170 +
3171 +    /**
3172 +     * Waits and/or attempts to assist performing tasks indefinitely
3173 +     * until the {@link #commonPool()} {@link #isQuiescent}
3174 +     */
3175 +    public static void quiesceCommonPool() {
3176 +        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3177 +    }
3178 +
3179 +    /**
3180       * Interface for extending managed parallelism for tasks running
3181       * in {@link ForkJoinPool}s.
3182       *
# Line 3339 | Line 3392 | public class ForkJoinPool extends Abstra
3392              par = Runtime.getRuntime().availableProcessors();
3393          if (par > MAX_CAP)
3394              par = MAX_CAP;
3395 <        commonPoolParallelism = par;
3395 >        commonParallelism = par;
3396          long np = (long)(-par); // precompute initial ctl value
3397          long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
3398  
3399 <        commonPool = new ForkJoinPool(par, ct, fac, handler);
3399 >        common = new ForkJoinPool(par, ct, fac, handler);
3400      }
3401  
3349
3402      /**
3403       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
3404       * Replace with a simple call to Unsafe.getUnsafe when integrating

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines