ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.98 by dl, Mon Mar 21 23:29:03 2011 UTC vs.
Revision 1.106 by jsr166, Fri Jul 1 01:15:06 2011 UTC

# Line 19 | Line 19 | import java.util.concurrent.Future;
19   import java.util.concurrent.RejectedExecutionException;
20   import java.util.concurrent.RunnableFuture;
21   import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
22   import java.util.concurrent.atomic.AtomicInteger;
23   import java.util.concurrent.locks.LockSupport;
24   import java.util.concurrent.locks.ReentrantLock;
# Line 102 | Line 101 | import java.util.concurrent.locks.Condit
101   * daemon} mode, there is typically no need to explicitly {@link
102   * #shutdown} such a pool upon program exit.
103   *
104 < * <pre>
104 > *  <pre> {@code
105   * static final ForkJoinPool mainPool = new ForkJoinPool();
106   * ...
107   * public void sort(long[] array) {
108   *   mainPool.invoke(new SortTask(array, 0, array.length));
109 < * }
111 < * </pre>
109 > * }}</pre>
110   *
111   * <p><b>Implementation notes</b>: This implementation restricts the
112   * maximum number of running threads to 32767. Attempts to create
# Line 292 | Line 290 | public class ForkJoinPool extends Abstra
290       * "terminate" status, cancels all unprocessed tasks, and wakes up
291       * all waiting workers.  Detecting whether termination should
292       * commence after a non-abrupt shutdown() call requires more work
293 <     * and bookkeeping. We need consensus about quiesence (i.e., that
293 >     * and bookkeeping. We need consensus about quiescence (i.e., that
294       * there is no more work) which is reflected in active counts so
295       * long as there are no current blockers, as well as possible
296       * re-evaluations during independent changes in blocking or
# Line 467 | Line 465 | public class ForkJoinPool extends Abstra
465      /**
466       * Main pool control -- a long packed with:
467       * AC: Number of active running workers minus target parallelism (16 bits)
468 <     * TC: Number of total workers minus target parallelism (16bits)
468 >     * TC: Number of total workers minus target parallelism (16 bits)
469       * ST: true if pool is terminating (1 bit)
470       * EC: the wait count of top waiting thread (15 bits)
471       * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
# Line 762 | Line 760 | public class ForkJoinPool extends Abstra
760  
761      /**
762       * Tries to enqueue worker w in wait queue and await change in
763 <     * worker's eventCount.  If the pool is quiescent, possibly
764 <     * terminates worker upon exit.  Otherwise, before blocking,
765 <     * rescans queues to avoid missed signals.  Upon finding work,
766 <     * releases at least one worker (which may be the current
767 <     * worker). Rescans restart upon detected staleness or failure to
768 <     * release due to contention. Note the unusual conventions about
769 <     * Thread.interrupt here and elsewhere: Because interrupts are
770 <     * used solely to alert threads to check termination, which is
771 <     * checked here anyway, we clear status (using Thread.interrupted)
772 <     * before any call to park, so that park does not immediately
773 <     * return due to status being set via some other unrelated call to
774 <     * interrupt in user code.
763 >     * worker's eventCount.  If the pool is quiescent and there is
764 >     * more than one worker, possibly terminates worker upon exit.
765 >     * Otherwise, before blocking, rescans queues to avoid missed
766 >     * signals.  Upon finding work, releases at least one worker
767 >     * (which may be the current worker). Rescans restart upon
768 >     * detected staleness or failure to release due to
769 >     * contention. Note the unusual conventions about Thread.interrupt
770 >     * here and elsewhere: Because interrupts are used solely to alert
771 >     * threads to check termination, which is checked here anyway, we
772 >     * clear status (using Thread.interrupted) before any call to
773 >     * park, so that park does not immediately return due to status
774 >     * being set via some other unrelated call to interrupt in user
775 >     * code.
776       *
777       * @param w the calling worker
778       * @param c the ctl value on entry
# Line 794 | Line 793 | public class ForkJoinPool extends Abstra
793              else if (w.eventCount != v)
794                  return true;                      // update next time
795          }
796 <        if (parallelism + (int)(nc >> AC_SHIFT) == 0 &&
796 >        if ((!shutdown || !tryTerminate(false)) &&
797 >            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
798              blockedCount == 0 && quiescerCount == 0)
799              idleAwaitWork(w, nc, c, v);           // quiescent
800          for (boolean rescanned = false;;) {
# Line 864 | Line 864 | public class ForkJoinPool extends Abstra
864                  w.parked = false;
865                  if (w.eventCount != v)
866                      break;
867 <                else if (System.nanoTime() - startTime < SHRINK_RATE)
867 >                else if (System.nanoTime() - startTime <
868 >                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
869                      Thread.interrupted();          // spurious wakeup
870                  else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
871                                                     currentCtl, prevCtl)) {
# Line 946 | Line 947 | public class ForkJoinPool extends Abstra
947              int pc = parallelism;
948              do {
949                  ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
950 <                int e, ac, tc, rc, i;
950 >                int e, ac, tc, i;
951                  long c = ctl;
952                  int u = (int)(c >>> 32);
953                  if ((e = (int)c) < 0) {
# Line 986 | Line 987 | public class ForkJoinPool extends Abstra
987      }
988  
989      /**
990 <     * Decrements blockedCount and increments active count
990 >     * Decrements blockedCount and increments active count.
991       */
992      private void postBlock() {
993          long c;
994          do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
995                                                  c = ctl, c + AC_UNIT));
996          int b;
997 <        do {} while(!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
998 <                                              b = blockedCount, b - 1));
997 >        do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
998 >                                               b = blockedCount, b - 1));
999      }
1000  
1001      /**
# Line 1004 | Line 1005 | public class ForkJoinPool extends Abstra
1005       * @param joinMe the task
1006       */
1007      final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
1007        int s;
1008          Thread.interrupted(); // clear interrupts before checking termination
1009          if (joinMe.status >= 0) {
1010              if (tryPreBlock()) {
# Line 1018 | Line 1018 | public class ForkJoinPool extends Abstra
1018  
1019      /**
1020       * Possibly blocks the given worker waiting for joinMe to
1021 <     * complete or timeout
1021 >     * complete or timeout.
1022       *
1023       * @param joinMe the task
1024       * @param millis the wait time for underlying Object.wait
# Line 1054 | Line 1054 | public class ForkJoinPool extends Abstra
1054      }
1055  
1056      /**
1057 <     * If necessary, compensates for blocker, and blocks
1057 >     * If necessary, compensates for blocker, and blocks.
1058       */
1059      private void awaitBlocker(ManagedBlocker blocker)
1060          throws InterruptedException {
# Line 1146 | Line 1146 | public class ForkJoinPool extends Abstra
1146                          ws[k] = w;
1147                          nextWorkerIndex = k + 1;
1148                          int m = g & SMASK;
1149 <                        g = k > m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1149 >                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1150                      }
1151                  } finally {
1152                      scanGuard = g;
# Line 1329 | Line 1329 | public class ForkJoinPool extends Abstra
1329       */
1330      final void addQuiescerCount(int delta) {
1331          int c;
1332 <        do {} while(!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1333 <                                              c = quiescerCount, c + delta));
1332 >        do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1333 >                                               c = quiescerCount, c + delta));
1334      }
1335  
1336      /**
# Line 1355 | Line 1355 | public class ForkJoinPool extends Abstra
1355      final int idlePerActive() {
1356          // Approximate at powers of two for small values, saturate past 4
1357          int p = parallelism;
1358 <        int a = p + (int)(ctl >> AC_SHIFT);
1359 <        return (a > (p >>>= 1) ? 0 :
1360 <                a > (p >>>= 1) ? 1 :
1361 <                a > (p >>>= 1) ? 2 :
1362 <                a > (p >>>= 1) ? 4 :
1363 <                8);
1358 >        int a = p + (int)(ctl >> AC_SHIFT);
1359 >        return (a > (p >>>= 1) ? 0 :
1360 >                a > (p >>>= 1) ? 1 :
1361 >                a > (p >>>= 1) ? 2 :
1362 >                a > (p >>>= 1) ? 4 :
1363 >                8);
1364      }
1365  
1366      // Exported methods
# Line 1683 | Line 1683 | public class ForkJoinPool extends Abstra
1683       */
1684      public int getRunningThreadCount() {
1685          int r = parallelism + (int)(ctl >> AC_SHIFT);
1686 <        return r <= 0? 0 : r; // suppress momentarily negative values
1686 >        return (r <= 0) ? 0 : r; // suppress momentarily negative values
1687      }
1688  
1689      /**
# Line 1695 | Line 1695 | public class ForkJoinPool extends Abstra
1695       */
1696      public int getActiveThreadCount() {
1697          int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
1698 <        return r <= 0? 0 : r; // suppress momentarily negative values
1698 >        return (r <= 0) ? 0 : r; // suppress momentarily negative values
1699      }
1700  
1701      /**
# Line 1850 | Line 1850 | public class ForkJoinPool extends Abstra
1850          int ac = rc + blockedCount;
1851          String level;
1852          if ((c & STOP_BIT) != 0)
1853 <            level = (tc == 0)? "Terminated" : "Terminating";
1853 >            level = (tc == 0) ? "Terminated" : "Terminating";
1854          else
1855 <            level = shutdown? "Shutting down" : "Running";
1855 >            level = shutdown ? "Shutting down" : "Running";
1856          return super.toString() +
1857              "[" + level +
1858              ", parallelism = " + pc +
# Line 2115 | Line 2115 | public class ForkJoinPool extends Abstra
2115          modifyThreadPermission = new RuntimePermission("modifyThread");
2116          defaultForkJoinWorkerThreadFactory =
2117              new DefaultForkJoinWorkerThreadFactory();
2118        int s;
2118          try {
2119              UNSAFE = getUnsafe();
2120 <            Class k = ForkJoinPool.class;
2120 >            Class<?> k = ForkJoinPool.class;
2121              ctlOffset = UNSAFE.objectFieldOffset
2122                  (k.getDeclaredField("ctl"));
2123              stealCountOffset = UNSAFE.objectFieldOffset
# Line 2131 | Line 2130 | public class ForkJoinPool extends Abstra
2130                  (k.getDeclaredField("scanGuard"));
2131              nextWorkerNumberOffset = UNSAFE.objectFieldOffset
2132                  (k.getDeclaredField("nextWorkerNumber"));
2134            Class a = ForkJoinTask[].class;
2135            ABASE = UNSAFE.arrayBaseOffset(a);
2136            s = UNSAFE.arrayIndexScale(a);
2133          } catch (Exception e) {
2134              throw new Error(e);
2135          }
2136 +        Class<?> a = ForkJoinTask[].class;
2137 +        ABASE = UNSAFE.arrayBaseOffset(a);
2138 +        int s = UNSAFE.arrayIndexScale(a);
2139          if ((s & (s-1)) != 0)
2140              throw new Error("data type scale not a power of two");
2141          ASHIFT = 31 - Integer.numberOfLeadingZeros(s);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines