ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.165 by jsr166, Fri Feb 22 01:24:49 2013 UTC vs.
Revision 1.166 by dl, Mon Feb 25 17:59:40 2013 UTC

# Line 128 | Line 128 | import java.util.concurrent.TimeUnit;
128   * @since 1.7
129   * @author Doug Lea
130   */
131 + //@sun.misc.Contended
132   public class ForkJoinPool extends AbstractExecutorService {
133  
134      /*
# Line 614 | Line 615 | public class ForkJoinPool extends Abstra
615       * arrays are initialized by workers before use. Others are
616       * allocated on first use.
617       */
618 +    // disabled until compatible builds    @sun.misc.Contended
619      static final class WorkQueue {
620          /**
621           * Capacity of work-stealing queue array upon initialization.
# Line 945 | Line 947 | public class ForkJoinPool extends Abstra
947           */
948          final boolean pollAndExecCC(ForkJoinTask<?> root) {
949              ForkJoinTask<?>[] a; int b; Object o;
950 <            outer: while ((b = base) - top < 0 && (a = array) != null) {
950 >            outer: while (root.status >= 0 && (b = base) - top < 0 &&
951 >                          (a = array) != null) {
952                  long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
953                  if ((o = U.getObject(a, j)) == null ||
954                      !(o instanceof CountedCompleter))
# Line 1199 | Line 1202 | public class ForkJoinPool extends Abstra
1202      static final int SHARED_QUEUE        = -1;
1203  
1204      // bounds for #steps in scan loop -- must be power 2 minus 1
1205 <    private static final int MIN_SCAN    = 0x1ff;   // cover estimation slop
1205 >    private static final int MIN_SCAN    = 0x7ff;   // cover estimation slop
1206      private static final int MAX_SCAN    = 0x1ffff; // 4 * max workers
1207  
1208      // Instance fields
# Line 1897 | Line 1900 | public class ForkJoinPool extends Abstra
1900       * and run tasks within the target's computation.
1901       *
1902       * @param task the task to join
1900     * @param mode if shared, exit upon completing any task
1901     * if all workers are active
1903       */
1904 <    private int helpComplete(ForkJoinTask<?> task, int mode) {
1904 >    private int helpComplete(ForkJoinTask<?> task) {
1905          WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1906 <        if (task != null && (ws = workQueues) != null &&
1906 >        if (task != null && task.status >= 0 && (ws = workQueues) != null &&
1907              (m = ws.length - 1) >= 0) {
1908              for (int j = 1, origin = j;;) {
1909                  if ((s = task.status) < 0)
1910                      return s;
1911 <                if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1911 >                if ((q = ws[j & m]) != null && q.pollAndExecCC(task))
1912                      origin = j;
1912                    if (mode == SHARED_QUEUE &&
1913                        ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1914                        break;
1915                }
1913                  else if ((j = (j + 2) & m) == origin)
1914                      break;
1915              }
# Line 1988 | Line 1985 | public class ForkJoinPool extends Abstra
1985                  helpSignal(task, joiner.poolIndex);
1986                  if ((s = task.status) >= 0 &&
1987                      (task instanceof CountedCompleter))
1988 <                    s = helpComplete(task, LIFO_QUEUE);
1988 >                    s = helpComplete(task);
1989              }
1990              while (s >= 0 && (s = task.status) >= 0) {
1991                  if ((!joiner.isEmpty() ||           // try helping
# Line 2038 | Line 2035 | public class ForkJoinPool extends Abstra
2035                  helpSignal(task, joiner.poolIndex);
2036                  if ((s = task.status) >= 0 &&
2037                      (task instanceof CountedCompleter))
2038 <                    s = helpComplete(task, LIFO_QUEUE);
2038 >                    s = helpComplete(task);
2039              }
2040              if (s >= 0 && joiner.isEmpty()) {
2041                  do {} while (task.status >= 0 &&
# Line 2340 | Line 2337 | public class ForkJoinPool extends Abstra
2337       */
2338      private void externalHelpComplete(WorkQueue q, ForkJoinTask<?> root) {
2339          ForkJoinTask<?>[] a; int m;
2340 <        if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 &&
2341 <            root != null && root.status >= 0) {
2342 <            for (;;) {
2343 <                int s, u; Object o; CountedCompleter<?> task = null;
2344 <                if ((s = q.top) - q.base > 0) {
2340 >        if (root != null && q != null && (a = q.array) != null &&
2341 >            (m = (a.length - 1)) >= 0) {
2342 >            outer: for (;;) {
2343 >                int s, b, u; Object o;
2344 >                if (root.status < 0)
2345 >                    return;
2346 >                if ((s = q.top) - q.base > 0) { // try pop
2347                      long j = ((m & (s - 1)) << ASHIFT) + ABASE;
2348                      if ((o = U.getObject(a, j)) != null &&
2349                          (o instanceof CountedCompleter)) {
# Line 2355 | Line 2354 | public class ForkJoinPool extends Abstra
2354                                      if (q.array == a && q.top == s &&
2355                                          U.compareAndSwapObject(a, j, t, null)) {
2356                                          q.top = s - 1;
2357 <                                        task = t;
2357 >                                        q.qlock = 0;
2358 >                                        t.doExec();
2359                                      }
2360 <                                    q.qlock = 0;
2360 >                                    else
2361 >                                        q.qlock = 0;
2362                                  }
2363 <                                break;
2363 >                                continue outer;
2364                              }
2365                          } while ((r = r.completer) != null);
2366                      }
2367                  }
2368 <                if (task != null)
2369 <                    task.doExec();
2370 <                if (root.status < 0 ||
2371 <                    (config != 0 &&
2372 <                     ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)))
2373 <                    break;
2374 <               if (task == null) {
2375 <                    helpSignal(root, q.poolIndex);
2376 <                    if (root.status >= 0)
2377 <                        helpComplete(root, SHARED_QUEUE);
2378 <                    break;
2368 >                if ((b = q.base) - q.top < 0) { // try poll
2369 >                    if (root.status < 0)
2370 >                        return;
2371 >                    long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
2372 >                    if ((o = U.getObject(a, j)) == null ||
2373 >                        !(o instanceof CountedCompleter))
2374 >                        break;
2375 >                    CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
2376 >                    for (;;) {
2377 >                        if (r == root) {
2378 >                            if (q.base == b &&
2379 >                                U.compareAndSwapObject(a, j, t, null)) {
2380 >                                q.base = b + 1;
2381 >                                t.doExec();
2382 >                            }
2383 >                            break;
2384 >                        }
2385 >                        if ((r = r.completer) == null)
2386 >                            break outer;
2387 >                    }
2388                  }
2389 +                else
2390 +                    break;
2391              }
2392 +            helpComplete(root);
2393          }
2394      }
2395  
# Line 3371 | Line 3384 | public class ForkJoinPool extends Abstra
3384          }
3385  
3386          if (parallelism < 0)
3387 <            parallelism = Runtime.getRuntime().availableProcessors();
3387 >            parallelism = Runtime.getRuntime().availableProcessors() - 1;
3388          if (parallelism > MAX_CAP)
3389              parallelism = MAX_CAP;
3390          return new ForkJoinPool(parallelism, factory, handler, false,

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines