ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jdk8/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.1 by jsr166, Sat Mar 26 06:22:50 2016 UTC vs.
Revision 1.2 by dl, Sat Apr 2 13:38:38 2016 UTC

# Line 1496 | Line 1496 | public class ForkJoinPool extends Abstra
1496          int t, n, sp;
1497          long c = ctl;
1498          WorkQueue[] ws = workQueues;
1499 <        if ((t = (short)(c >> TC_SHIFT)) >= 0) {
1499 >        if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
1500              if (ws == null || (n = ws.length) <= 0 || w == null)
1501                  return 0;                        // disabled
1502              else if ((sp = (int)c) != 0) {       // replace or release
# Line 2380 | Line 2380 | public class ForkJoinPool extends Abstra
2380          long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
2381                    (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2382          int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2383        int m = (parallelism < 1) ? 1 : parallelism;
2383          int n = (parallelism > 1) ? parallelism - 1 : 1;
2384          n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2385          n = (n + 1) << 1;
# Line 2392 | Line 2391 | public class ForkJoinPool extends Abstra
2391          this.saturate = null;
2392          this.keepAlive = DEFAULT_KEEPALIVE;
2393          this.bounds = b;
2394 <        this.mode = m;
2394 >        this.mode = parallelism;
2395          this.ctl = c;
2396      }
2397  
# Line 2569 | Line 2568 | public class ForkJoinPool extends Abstra
2568       * @return the targeted parallelism level of this pool
2569       */
2570      public int getParallelism() {
2571 <        return mode & SMASK;
2571 >        int par = mode & SMASK;
2572 >        return (par > 0) ? par : 1;
2573      }
2574  
2575      /**
# Line 2651 | Line 2651 | public class ForkJoinPool extends Abstra
2651          for (;;) {
2652              long c = ctl;
2653              int md = mode, pc = md & SMASK;
2654 <            int tc = pc + (short)(c >> TC_SHIFT);
2654 >            int tc = pc + (short)(c >>> TC_SHIFT);
2655              int rc = pc + (int)(c >> RC_SHIFT);
2656              if ((md & (STOP | TERMINATED)) != 0)
2657                  return true;
# Line 3141 | Line 3141 | public class ForkJoinPool extends Abstra
3141          }
3142      }
3143  
3144 +    /**
3145 +     * If the given executor is a ForkJoinPool, poll and execute
3146 +     * AsynchronousCompletionTasks from worker's queue until none are
3147 +     * available or blocker is released.
3148 +     */
3149 +    static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3150 +        if (blocker != null && (e instanceof ForkJoinPool)) {
3151 +            WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3152 +            ForkJoinPool p = (ForkJoinPool)e;
3153 +            Thread thread = Thread.currentThread();
3154 +            if (thread instanceof ForkJoinWorkerThread &&
3155 +                (wt = (ForkJoinWorkerThread)thread).pool == p)
3156 +                w = wt.workQueue;
3157 +            else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
3158 +                     (ws = p.workQueues) != null && (n = ws.length) > 0)
3159 +                w = ws[(n - 1) & r & SQMASK];
3160 +            else
3161 +                w = null;
3162 +            if (w != null) {
3163 +                for (;;) {
3164 +                    int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3165 +                    if ((a = w.array) != null && (d = b - s) < 0 &&
3166 +                        (al = a.length) > 0) {
3167 +                        int index = (al - 1) & b;
3168 +                        long offset = ((long)index << ASHIFT) + ABASE;
3169 +                        ForkJoinTask<?> t = (ForkJoinTask<?>)
3170 +                            U.getObjectVolatile(a, offset);
3171 +                        if (blocker.isReleasable())
3172 +                            break;
3173 +                        else if (b++ == w.base) {
3174 +                            if (t == null) {
3175 +                                if (d == -1)
3176 +                                    break;
3177 +                            }
3178 +                            else if (!(t instanceof CompletableFuture.
3179 +                                  AsynchronousCompletionTask))
3180 +                                break;
3181 +                            else if (U.compareAndSwapObject(a, offset,
3182 +                                                            t, null)) {
3183 +                                w.base = b;
3184 +                                t.doExec();
3185 +                            }
3186 +                        }
3187 +                    }
3188 +                    else
3189 +                        break;
3190 +                }
3191 +            }
3192 +        }
3193 +    }
3194 +
3195      // AbstractExecutorService overrides.  These rely on undocumented
3196      // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3197      // implement RunnableFuture.
# Line 3197 | Line 3248 | public class ForkJoinPool extends Abstra
3248                      public ForkJoinPool run() {
3249                          return new ForkJoinPool((byte)0); }});
3250  
3251 <        COMMON_PARALLELISM = common.mode & SMASK;
3251 >        COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
3252      }
3253  
3254      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines