ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.309 by jsr166, Sat Mar 26 15:18:22 2016 UTC vs.
Revision 1.310 by dl, Sat Apr 2 13:38:34 2016 UTC

# Line 1484 | Line 1484 | public class ForkJoinPool extends Abstra
1484          int t, n, sp;
1485          long c = ctl;
1486          WorkQueue[] ws = workQueues;
1487 <        if ((t = (short)(c >> TC_SHIFT)) >= 0) {
1487 >        if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
1488              if (ws == null || (n = ws.length) <= 0 || w == null)
1489                  return 0;                        // disabled
1490              else if ((sp = (int)c) != 0) {       // replace or release
# Line 2368 | Line 2368 | public class ForkJoinPool extends Abstra
2368          long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
2369                    (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2370          int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2371        int m = (parallelism < 1) ? 1 : parallelism;
2371          int n = (parallelism > 1) ? parallelism - 1 : 1;
2372          n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2373          n = (n + 1) << 1;
# Line 2380 | Line 2379 | public class ForkJoinPool extends Abstra
2379          this.saturate = null;
2380          this.keepAlive = DEFAULT_KEEPALIVE;
2381          this.bounds = b;
2382 <        this.mode = m;
2382 >        this.mode = parallelism;
2383          this.ctl = c;
2384      }
2385  
# Line 2557 | Line 2556 | public class ForkJoinPool extends Abstra
2556       * @return the targeted parallelism level of this pool
2557       */
2558      public int getParallelism() {
2559 <        return mode & SMASK;
2559 >        int par = mode & SMASK;
2560 >        return (par > 0) ? par : 1;
2561      }
2562  
2563      /**
# Line 2639 | Line 2639 | public class ForkJoinPool extends Abstra
2639          for (;;) {
2640              long c = ctl;
2641              int md = mode, pc = md & SMASK;
2642 <            int tc = pc + (short)(c >> TC_SHIFT);
2642 >            int tc = pc + (short)(c >>> TC_SHIFT);
2643              int rc = pc + (int)(c >> RC_SHIFT);
2644              if ((md & (STOP | TERMINATED)) != 0)
2645                  return true;
# Line 3129 | Line 3129 | public class ForkJoinPool extends Abstra
3129          }
3130      }
3131  
3132 +    /**
3133 +     * If the given executor is a ForkJoinPool, poll and execute
3134 +     * AsynchronousCompletionTasks from worker's queue until none are
3135 +     * available or blocker is released.
3136 +     */
3137 +    static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3138 +        if (blocker != null && (e instanceof ForkJoinPool)) {
3139 +            WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3140 +            ForkJoinPool p = (ForkJoinPool)e;
3141 +            Thread thread = Thread.currentThread();
3142 +            if (thread instanceof ForkJoinWorkerThread &&
3143 +                (wt = (ForkJoinWorkerThread)thread).pool == p)
3144 +                w = wt.workQueue;
3145 +            else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
3146 +                     (ws = p.workQueues) != null && (n = ws.length) > 0)
3147 +                w = ws[(n - 1) & r & SQMASK];
3148 +            else
3149 +                w = null;
3150 +            if (w != null) {
3151 +                for (;;) {
3152 +                    int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3153 +                    if ((a = w.array) != null && (d = b - s) < 0 &&
3154 +                        (al = a.length) > 0) {
3155 +                        int index = (al - 1) & b;
3156 +                        long offset = ((long)index << ASHIFT) + ABASE;
3157 +                        ForkJoinTask<?> t = (ForkJoinTask<?>)
3158 +                            U.getObjectVolatile(a, offset);
3159 +                        if (blocker.isReleasable())
3160 +                            break;
3161 +                        else if (b++ == w.base) {
3162 +                            if (t == null) {
3163 +                                if (d == -1)
3164 +                                    break;
3165 +                            }
3166 +                            else if (!(t instanceof CompletableFuture.
3167 +                                  AsynchronousCompletionTask))
3168 +                                break;
3169 +                            else if (U.compareAndSwapObject(a, offset,
3170 +                                                            t, null)) {
3171 +                                w.base = b;
3172 +                                t.doExec();
3173 +                            }
3174 +                        }
3175 +                    }
3176 +                    else
3177 +                        break;
3178 +                }
3179 +            }
3180 +        }
3181 +    }
3182 +
3183      // AbstractExecutorService overrides.  These rely on undocumented
3184      // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3185      // implement RunnableFuture.
# Line 3185 | Line 3236 | public class ForkJoinPool extends Abstra
3236                      public ForkJoinPool run() {
3237                          return new ForkJoinPool((byte)0); }});
3238  
3239 <        COMMON_PARALLELISM = common.mode & SMASK;
3239 >        COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
3240      }
3241  
3242      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines