ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.167 by jsr166, Sun Dec 30 02:05:53 2012 UTC vs.
Revision 1.171 by jsr166, Thu Jan 3 19:21:21 2013 UTC

# Line 440 | Line 440 | public class ForkJoinPool extends Abstra
440       * Common Pool
441       * ===========
442       *
443 <     * The static commonPool always exists after static
443 >     * The static common Pool always exists after static
444       * initialization.  Since it (or any other created pool) need
445       * never be used, we minimize initial construction overhead and
446       * footprint to the setup of about a dozen fields, with no nested
# Line 1074 | Line 1074 | public class ForkJoinPool extends Abstra
1074       * to paranoically avoid potential initialization circularities
1075       * as well as to simplify generated code.
1076       */
1077 <    static final ForkJoinPool commonPool;
1077 >    static final ForkJoinPool common;
1078  
1079      /**
1080 <     * Common pool parallelism. Must equal commonPool.parallelism.
1080 >     * Common pool parallelism. Must equal common.parallelism.
1081       */
1082 <    static final int commonPoolParallelism;
1082 >    static final int commonParallelism;
1083  
1084      /**
1085       * Sequence number for creating workerNamePrefix.
# Line 1932 | Line 1932 | public class ForkJoinPool extends Abstra
1932       * @param task the task to join
1933       * @param mode if shared, exit upon completing any task
1934       * if all workers are active
1935     *
1935       */
1936      private int helpComplete(ForkJoinTask<?> task, int mode) {
1937          WorkQueue[] ws; WorkQueue q; int m, n, s, u;
# Line 2241 | Line 2240 | public class ForkJoinPool extends Abstra
2240       */
2241      private boolean tryTerminate(boolean now, boolean enable) {
2242          int ps;
2243 <        if (this == commonPool)                    // cannot shut down
2243 >        if (this == common)                    // cannot shut down
2244              return false;
2245          if ((ps = plock) >= 0) {                   // enable by setting plock
2246              if (!enable)
# Line 2332 | Line 2331 | public class ForkJoinPool extends Abstra
2331      static WorkQueue commonSubmitterQueue() {
2332          ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
2333          return ((z = submitters.get()) != null &&
2334 <                (p = commonPool) != null &&
2334 >                (p = common) != null &&
2335                  (ws = p.workQueues) != null &&
2336                  (m = ws.length - 1) >= 0) ?
2337              ws[m & z.seed & SQMASK] : null;
# Line 2346 | Line 2345 | public class ForkJoinPool extends Abstra
2345          ForkJoinTask<?>[] a;  int m, s;
2346          if (t != null &&
2347              (z = submitters.get()) != null &&
2348 <            (p = commonPool) != null &&
2348 >            (p = common) != null &&
2349              (ws = p.workQueues) != null &&
2350              (m = ws.length - 1) >= 0 &&
2351              (q = ws[m & z.seed & SQMASK]) != null &&
# Line 2423 | Line 2422 | public class ForkJoinPool extends Abstra
2422          ForkJoinTask<?>[] a;  int m, s, n;
2423          if (t != null &&
2424              (z = submitters.get()) != null &&
2425 <            (p = commonPool) != null &&
2425 >            (p = common) != null &&
2426              (ws = p.workQueues) != null &&
2427              (m = ws.length - 1) >= 0 &&
2428              (q = ws[m & z.seed & SQMASK]) != null &&
# Line 2452 | Line 2451 | public class ForkJoinPool extends Abstra
2451          }
2452      }
2453  
2455    /**
2456     * Restricted version of helpQuiescePool for external callers
2457     */
2458    static void externalHelpQuiescePool() {
2459        ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
2460        if ((p = commonPool) != null &&
2461            (q = p.findNonEmptyStealQueue(1)) != null &&
2462            (b = q.base) - q.top < 0 &&
2463            (t = q.pollAt(b)) != null) {
2464            if (q.base - q.top < 0)
2465                p.signalWork(q);
2466            t.doExec();
2467        }
2468    }
2469
2454      // Exported methods
2455  
2456      // Constructors
# Line 2566 | Line 2550 | public class ForkJoinPool extends Abstra
2550  
2551      /**
2552       * Returns the common pool instance. This pool is statically
2553 <     * constructed; its run state is unaffected by attempts to
2554 <     * {@link #shutdown} or {@link #shutdownNow}.
2553 >     * constructed; its run state is unaffected by attempts to {@link
2554 >     * #shutdown} or {@link #shutdownNow}. However this pool and any
2555 >     * ongoing processing are automatically terminated upon program
2556 >     * {@link System#exit}.  Any program that relies on asynchronous
2557 >     * task processing to complete before program termination should
2558 >     * invoke {@code commonPool().}{@link #awaitQuiescence}, before
2559 >     * exit.
2560       *
2561       * @return the common pool instance
2562       */
2563      public static ForkJoinPool commonPool() {
2564 <        // assert commonPool != null : "static init error";
2565 <        return commonPool;
2564 >        // assert common != null : "static init error";
2565 >        return common;
2566      }
2567  
2568      // Execution methods
# Line 2754 | Line 2743 | public class ForkJoinPool extends Abstra
2743       * @return the targeted parallelism level of the common pool
2744       */
2745      public static int getCommonPoolParallelism() {
2746 <        return commonPoolParallelism;
2746 >        return commonParallelism;
2747      }
2748  
2749      /**
# Line 3012 | Line 3001 | public class ForkJoinPool extends Abstra
3001       * Possibly initiates an orderly shutdown in which previously
3002       * submitted tasks are executed, but no new tasks will be
3003       * accepted. Invocation has no effect on execution state if this
3004 <     * is the {@link #commonPool}, and no additional effect if
3004 >     * is the {@link #commonPool()}, and no additional effect if
3005       * already shut down.  Tasks that are in the process of being
3006       * submitted concurrently during the course of this method may or
3007       * may not be rejected.
# Line 3030 | Line 3019 | public class ForkJoinPool extends Abstra
3019      /**
3020       * Possibly attempts to cancel and/or stop all tasks, and reject
3021       * all subsequently submitted tasks.  Invocation has no effect on
3022 <     * execution state if this is the {@link #commonPool}, and no
3022 >     * execution state if this is the {@link #commonPool()}, and no
3023       * additional effect if already shut down. Otherwise, tasks that
3024       * are in the process of being submitted or executed concurrently
3025       * during the course of this method may or may not be
# Line 3093 | Line 3082 | public class ForkJoinPool extends Abstra
3082      /**
3083       * Blocks until all tasks have completed execution after a
3084       * shutdown request, or the timeout occurs, or the current thread
3085 <     * is interrupted, whichever happens first. Note that the {@link
3086 <     * #commonPool()} never terminates until program shutdown so
3087 <     * this method will always time out.
3085 >     * is interrupted, whichever happens first. Because the {@link
3086 >     * #commonPool()} never terminates until program shutdown, when
3087 >     * applied to the common pool, this method is equivalent to {@link
3088 >     * #awaitQuiescence} but always returns {@code false}.
3089       *
3090       * @param timeout the maximum time to wait
3091       * @param unit the time unit of the timeout argument
# Line 3105 | Line 3095 | public class ForkJoinPool extends Abstra
3095       */
3096      public boolean awaitTermination(long timeout, TimeUnit unit)
3097          throws InterruptedException {
3098 +        if (Thread.interrupted())
3099 +            throw new InterruptedException();
3100 +        if (this == common) {
3101 +            awaitQuiescence(timeout, unit);
3102 +            return false;
3103 +        }
3104          long nanos = unit.toNanos(timeout);
3105          if (isTerminated())
3106              return true;
# Line 3124 | Line 3120 | public class ForkJoinPool extends Abstra
3120      }
3121  
3122      /**
3123 +     * If called by a ForkJoinTask operating in this pool, equivalent
3124 +     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3125 +     * waits and/or attempts to assist performing tasks until this
3126 +     * pool {@link #isQuiescent} or the indicated timeout elapses.
3127 +     *
3128 +     * @param timeout the maximum time to wait
3129 +     * @param unit the time unit of the timeout argument
3130 +     * @return {@code true} if quiescent; {@code false} if the
3131 +     * timeout elapsed.
3132 +     */
3133 +    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3134 +        long nanos = unit.toNanos(timeout);
3135 +        ForkJoinWorkerThread wt;
3136 +        Thread thread = Thread.currentThread();
3137 +        if ((thread instanceof ForkJoinWorkerThread) &&
3138 +            (wt = (ForkJoinWorkerThread)thread).pool == this) {
3139 +            helpQuiescePool(wt.workQueue);
3140 +            return true;
3141 +        }
3142 +        long startTime = System.nanoTime();
3143 +        WorkQueue[] ws;
3144 +        int r = 0, m;
3145 +        boolean found = true;
3146 +        while (!isQuiescent() && (ws = workQueues) != null &&
3147 +               (m = ws.length - 1) >= 0) {
3148 +            if (!found) {
3149 +                if ((System.nanoTime() - startTime) > nanos)
3150 +                    return false;
3151 +                Thread.yield(); // cannot block
3152 +            }
3153 +            found = false;
3154 +            for (int j = (m + 1) << 2; j >= 0; --j) {
3155 +                ForkJoinTask<?> t; WorkQueue q; int b;
3156 +                if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
3157 +                    found = true;
3158 +                    if ((t = q.pollAt(b)) != null) {
3159 +                        if (q.base - q.top < 0)
3160 +                            signalWork(q);
3161 +                        t.doExec();
3162 +                    }
3163 +                    break;
3164 +                }
3165 +            }
3166 +        }
3167 +        return true;
3168 +    }
3169 +
3170 +    /**
3171 +     * Waits and/or attempts to assist performing tasks indefinitely
3172 +     * until the {@link #commonPool()} {@link #isQuiescent}
3173 +     */
3174 +    static void quiesceCommonPool() {
3175 +        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3176 +    }
3177 +
3178 +    /**
3179       * Interface for extending managed parallelism for tasks running
3180       * in {@link ForkJoinPool}s.
3181       *
# Line 3339 | Line 3391 | public class ForkJoinPool extends Abstra
3391              par = Runtime.getRuntime().availableProcessors();
3392          if (par > MAX_CAP)
3393              par = MAX_CAP;
3394 <        commonPoolParallelism = par;
3394 >        commonParallelism = par;
3395          long np = (long)(-par); // precompute initial ctl value
3396          long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
3397  
3398 <        commonPool = new ForkJoinPool(par, ct, fac, handler);
3398 >        common = new ForkJoinPool(par, ct, fac, handler);
3399      }
3400  
3349
3401      /**
3402       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
3403       * Replace with a simple call to Unsafe.getUnsafe when integrating

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines