ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.166 by jsr166, Sun Dec 30 00:03:36 2012 UTC vs.
Revision 1.173 by jsr166, Wed Jan 9 02:51:37 2013 UTC

# Line 440 | Line 440 | public class ForkJoinPool extends Abstra
440       * Common Pool
441       * ===========
442       *
443 <     * The static commonPool always exists after static
443 >     * The static common Pool always exists after static
444       * initialization.  Since it (or any other created pool) need
445       * never be used, we minimize initial construction overhead and
446       * footprint to the setup of about a dozen fields, with no nested
# Line 1074 | Line 1074 | public class ForkJoinPool extends Abstra
1074       * to paranoically avoid potential initialization circularities
1075       * as well as to simplify generated code.
1076       */
1077 <    static final ForkJoinPool commonPool;
1077 >    static final ForkJoinPool common;
1078  
1079      /**
1080 <     * Common pool parallelism. Must equal commonPool.parallelism.
1080 >     * Common pool parallelism. Must equal common.parallelism.
1081       */
1082 <    static final int commonPoolParallelism;
1082 >    static final int commonParallelism;
1083  
1084      /**
1085       * Sequence number for creating workerNamePrefix.
# Line 1661 | Line 1661 | public class ForkJoinPool extends Abstra
1661       * park awaiting signal, else lingering to help scan and signal.
1662       *
1663       * * If a non-empty queue discovered or left as a hint,
1664 <     * help wake up other workers before return
1664 >     * help wake up other workers before return.
1665       *
1666       * @param w the worker (via its WorkQueue)
1667       * @return a task or null if none found
# Line 1932 | Line 1932 | public class ForkJoinPool extends Abstra
1932       * @param task the task to join
1933       * @param mode if shared, exit upon completing any task
1934       * if all workers are active
1935     *
1935       */
1936      private int helpComplete(ForkJoinTask<?> task, int mode) {
1937          WorkQueue[] ws; WorkQueue q; int m, n, s, u;
# Line 2241 | Line 2240 | public class ForkJoinPool extends Abstra
2240       */
2241      private boolean tryTerminate(boolean now, boolean enable) {
2242          int ps;
2243 <        if (this == commonPool)                    // cannot shut down
2243 >        if (this == common)                    // cannot shut down
2244              return false;
2245          if ((ps = plock) >= 0) {                   // enable by setting plock
2246              if (!enable)
# Line 2332 | Line 2331 | public class ForkJoinPool extends Abstra
2331      static WorkQueue commonSubmitterQueue() {
2332          ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
2333          return ((z = submitters.get()) != null &&
2334 <                (p = commonPool) != null &&
2334 >                (p = common) != null &&
2335                  (ws = p.workQueues) != null &&
2336                  (m = ws.length - 1) >= 0) ?
2337              ws[m & z.seed & SQMASK] : null;
# Line 2346 | Line 2345 | public class ForkJoinPool extends Abstra
2345          ForkJoinTask<?>[] a;  int m, s;
2346          if (t != null &&
2347              (z = submitters.get()) != null &&
2348 <            (p = commonPool) != null &&
2348 >            (p = common) != null &&
2349              (ws = p.workQueues) != null &&
2350              (m = ws.length - 1) >= 0 &&
2351              (q = ws[m & z.seed & SQMASK]) != null &&
# Line 2423 | Line 2422 | public class ForkJoinPool extends Abstra
2422          ForkJoinTask<?>[] a;  int m, s, n;
2423          if (t != null &&
2424              (z = submitters.get()) != null &&
2425 <            (p = commonPool) != null &&
2425 >            (p = common) != null &&
2426              (ws = p.workQueues) != null &&
2427              (m = ws.length - 1) >= 0 &&
2428              (q = ws[m & z.seed & SQMASK]) != null &&
# Line 2452 | Line 2451 | public class ForkJoinPool extends Abstra
2451          }
2452      }
2453  
2455    /**
2456     * Restricted version of helpQuiescePool for external callers
2457     */
2458    static void externalHelpQuiescePool() {
2459        ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
2460        if ((p = commonPool) != null &&
2461            (q = p.findNonEmptyStealQueue(1)) != null &&
2462            (b = q.base) - q.top < 0 &&
2463            (t = q.pollAt(b)) != null) {
2464            if (q.base - q.top < 0)
2465                p.signalWork(q);
2466            t.doExec();
2467        }
2468    }
2469
2454      // Exported methods
2455  
2456      // Constructors
# Line 2566 | Line 2550 | public class ForkJoinPool extends Abstra
2550  
2551      /**
2552       * Returns the common pool instance. This pool is statically
2553 <     * constructed; its run state is unaffected by attempts to
2554 <     * {@link #shutdown} or {@link #shutdownNow}.
2553 >     * constructed; its run state is unaffected by attempts to {@link
2554 >     * #shutdown} or {@link #shutdownNow}. However this pool and any
2555 >     * ongoing processing are automatically terminated upon program
2556 >     * {@link System#exit}.  Any program that relies on asynchronous
2557 >     * task processing to complete before program termination should
2558 >     * invoke {@code commonPool().}{@link #awaitQuiescence}, before
2559 >     * exit.
2560       *
2561       * @return the common pool instance
2562 +     * @since 1.8
2563       */
2564      public static ForkJoinPool commonPool() {
2565 <        // assert commonPool != null : "static init error";
2566 <        return commonPool;
2565 >        // assert common != null : "static init error";
2566 >        return common;
2567      }
2568  
2569      // Execution methods
# Line 2752 | Line 2742 | public class ForkJoinPool extends Abstra
2742       * Returns the targeted parallelism level of the common pool.
2743       *
2744       * @return the targeted parallelism level of the common pool
2745 +     * @since 1.8
2746       */
2747      public static int getCommonPoolParallelism() {
2748 <        return commonPoolParallelism;
2748 >        return commonParallelism;
2749      }
2750  
2751      /**
# Line 3012 | Line 3003 | public class ForkJoinPool extends Abstra
3003       * Possibly initiates an orderly shutdown in which previously
3004       * submitted tasks are executed, but no new tasks will be
3005       * accepted. Invocation has no effect on execution state if this
3006 <     * is the {@link #commonPool}, and no additional effect if
3006 >     * is the {@link #commonPool()}, and no additional effect if
3007       * already shut down.  Tasks that are in the process of being
3008       * submitted concurrently during the course of this method may or
3009       * may not be rejected.
# Line 3030 | Line 3021 | public class ForkJoinPool extends Abstra
3021      /**
3022       * Possibly attempts to cancel and/or stop all tasks, and reject
3023       * all subsequently submitted tasks.  Invocation has no effect on
3024 <     * execution state if this is the {@link #commonPool}, and no
3024 >     * execution state if this is the {@link #commonPool()}, and no
3025       * additional effect if already shut down. Otherwise, tasks that
3026       * are in the process of being submitted or executed concurrently
3027       * during the course of this method may or may not be
# Line 3093 | Line 3084 | public class ForkJoinPool extends Abstra
3084      /**
3085       * Blocks until all tasks have completed execution after a
3086       * shutdown request, or the timeout occurs, or the current thread
3087 <     * is interrupted, whichever happens first. Note that the {@link
3088 <     * #commonPool()} never terminates until program shutdown so
3089 <     * this method will always time out.
3087 >     * is interrupted, whichever happens first. Because the {@link
3088 >     * #commonPool()} never terminates until program shutdown, when
3089 >     * applied to the common pool, this method is equivalent to {@link
3090 >     * #awaitQuiescence} but always returns {@code false}.
3091       *
3092       * @param timeout the maximum time to wait
3093       * @param unit the time unit of the timeout argument
# Line 3105 | Line 3097 | public class ForkJoinPool extends Abstra
3097       */
3098      public boolean awaitTermination(long timeout, TimeUnit unit)
3099          throws InterruptedException {
3100 +        if (Thread.interrupted())
3101 +            throw new InterruptedException();
3102 +        if (this == common) {
3103 +            awaitQuiescence(timeout, unit);
3104 +            return false;
3105 +        }
3106          long nanos = unit.toNanos(timeout);
3107          if (isTerminated())
3108              return true;
# Line 3124 | Line 3122 | public class ForkJoinPool extends Abstra
3122      }
3123  
3124      /**
3125 +     * If called by a ForkJoinTask operating in this pool, equivalent
3126 +     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3127 +     * waits and/or attempts to assist performing tasks until this
3128 +     * pool {@link #isQuiescent} or the indicated timeout elapses.
3129 +     *
3130 +     * @param timeout the maximum time to wait
3131 +     * @param unit the time unit of the timeout argument
3132 +     * @return {@code true} if quiescent; {@code false} if the
3133 +     * timeout elapsed.
3134 +     */
3135 +    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3136 +        long nanos = unit.toNanos(timeout);
3137 +        ForkJoinWorkerThread wt;
3138 +        Thread thread = Thread.currentThread();
3139 +        if ((thread instanceof ForkJoinWorkerThread) &&
3140 +            (wt = (ForkJoinWorkerThread)thread).pool == this) {
3141 +            helpQuiescePool(wt.workQueue);
3142 +            return true;
3143 +        }
3144 +        long startTime = System.nanoTime();
3145 +        WorkQueue[] ws;
3146 +        int r = 0, m;
3147 +        boolean found = true;
3148 +        while (!isQuiescent() && (ws = workQueues) != null &&
3149 +               (m = ws.length - 1) >= 0) {
3150 +            if (!found) {
3151 +                if ((System.nanoTime() - startTime) > nanos)
3152 +                    return false;
3153 +                Thread.yield(); // cannot block
3154 +            }
3155 +            found = false;
3156 +            for (int j = (m + 1) << 2; j >= 0; --j) {
3157 +                ForkJoinTask<?> t; WorkQueue q; int b;
3158 +                if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
3159 +                    found = true;
3160 +                    if ((t = q.pollAt(b)) != null) {
3161 +                        if (q.base - q.top < 0)
3162 +                            signalWork(q);
3163 +                        t.doExec();
3164 +                    }
3165 +                    break;
3166 +                }
3167 +            }
3168 +        }
3169 +        return true;
3170 +    }
3171 +
3172 +    /**
3173 +     * Waits and/or attempts to assist performing tasks indefinitely
3174 +     * until the {@link #commonPool()} {@link #isQuiescent}
3175 +     */
3176 +    static void quiesceCommonPool() {
3177 +        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3178 +    }
3179 +
3180 +    /**
3181       * Interface for extending managed parallelism for tasks running
3182       * in {@link ForkJoinPool}s.
3183       *
# Line 3339 | Line 3393 | public class ForkJoinPool extends Abstra
3393              par = Runtime.getRuntime().availableProcessors();
3394          if (par > MAX_CAP)
3395              par = MAX_CAP;
3396 <        commonPoolParallelism = par;
3396 >        commonParallelism = par;
3397          long np = (long)(-par); // precompute initial ctl value
3398          long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
3399  
3400 <        commonPool = new ForkJoinPool(par, ct, fac, handler);
3400 >        common = new ForkJoinPool(par, ct, fac, handler);
3401      }
3402  
3349
3403      /**
3404       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
3405       * Replace with a simple call to Unsafe.getUnsafe when integrating
# Line 3357 | Line 3410 | public class ForkJoinPool extends Abstra
3410      private static sun.misc.Unsafe getUnsafe() {
3411          try {
3412              return sun.misc.Unsafe.getUnsafe();
3413 <        } catch (SecurityException se) {
3414 <            try {
3415 <                return java.security.AccessController.doPrivileged
3416 <                    (new java.security
3417 <                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
3418 <                        public sun.misc.Unsafe run() throws Exception {
3419 <                            java.lang.reflect.Field f = sun.misc
3420 <                                .Unsafe.class.getDeclaredField("theUnsafe");
3421 <                            f.setAccessible(true);
3422 <                            return (sun.misc.Unsafe) f.get(null);
3423 <                        }});
3424 <            } catch (java.security.PrivilegedActionException e) {
3425 <                throw new RuntimeException("Could not initialize intrinsics",
3426 <                                           e.getCause());
3427 <            }
3413 >        } catch (SecurityException tryReflectionInstead) {}
3414 >        try {
3415 >            return java.security.AccessController.doPrivileged
3416 >            (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
3417 >                public sun.misc.Unsafe run() throws Exception {
3418 >                    Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
3419 >                    for (java.lang.reflect.Field f : k.getDeclaredFields()) {
3420 >                        f.setAccessible(true);
3421 >                        Object x = f.get(null);
3422 >                        if (k.isInstance(x))
3423 >                            return k.cast(x);
3424 >                    }
3425 >                    throw new NoSuchFieldError("the Unsafe");
3426 >                }});
3427 >        } catch (java.security.PrivilegedActionException e) {
3428 >            throw new RuntimeException("Could not initialize intrinsics",
3429 >                                       e.getCause());
3430          }
3431      }
3377
3432   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines