ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.208 by dl, Wed Jul 9 15:30:36 2014 UTC vs.
Revision 1.209 by dl, Thu Jul 10 16:00:59 2014 UTC

# Line 467 | Line 467 | public class ForkJoinPool extends Abstra
467       * relies on the active count bits of "ctl" maintaining consensus
468       * about quiescence. However, external submitters do not take part
469       * in this consensus.  So, tryTerminate sweeps through queues to
470 <     * ensure lack of in-flight submissions before triggering the
471 <     * "STOP" phase of termination.
470 >     * ensure lack of in-flight submissions and workers about to
471 >     * process them before triggering the "STOP" phase of
472 >     * termination.
473       *
474       * Joining Tasks
475       * =============
# Line 1083 | Line 1084 | public class ForkJoinPool extends Abstra
1084                                          U.putOrderedInt(this, QLOCK, 0);
1085                                          return t;
1086                                      }
1087 <                                    qlock = 0;
1087 >                                    U.compareAndSwapInt(this, QLOCK, 1, 0);
1088                                  }
1089                              }
1090                              else if (U.compareAndSwapObject(a, j, t, null)) {
# Line 1533 | Line 1534 | public class ForkJoinPool extends Abstra
1534              w.cancelAll();                            // cancel remaining tasks
1535              U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
1536          }
1537 <        if (!tryTerminate(false, false) && w != null && w.array != null) {
1537 >        for (;;) {                                    // possibly replace
1538              WorkQueue[] ws; int m, sp;
1539 <            while ((runState & STOP) == 0 &&
1540 <                   (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1541 <                if ((sp = (int)(c = ctl)) != 0) {     // wake up replacement
1542 <                    if (tryRelease(c, ws[sp & m], AC_UNIT))
1543 <                        break;
1544 <                }
1544 <                else if (ex != null && (c & ADD_WORKER) != 0L) {
1545 <                    tryAddWorker(c);                  // create replacement
1546 <                    break;
1547 <                }
1548 <                else
1539 >            if (tryTerminate(false, false) || w == null || w.array == null ||
1540 >                (runState & STOP) != 0 || (ws = workQueues) == null ||
1541 >                (m = ws.length - 1) < 0)              // already terminating
1542 >                break;
1543 >            if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
1544 >                if (tryRelease(c, ws[sp & m], AC_UNIT))
1545                      break;
1546              }
1547 +            else if (ex != null && (c & ADD_WORKER) != 0L) {
1548 +                tryAddWorker(c);                      // create replacement
1549 +                break;
1550 +            }
1551 +            else                                      // don't need replacement
1552 +                break;
1553          }
1554          if (ex == null)                               // help clean on way out
1555              ForkJoinTask.helpExpungeStaleExceptions();
# Line 2172 | Line 2174 | public class ForkJoinPool extends Abstra
2174          }
2175          if ((rs & STOP) == 0) {
2176              if (!now) {
2177 <                if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0)
2178 <                    return false;
2179 <                WorkQueue[] ws; WorkQueue w;      // check external submissions
2180 <                if ((ws = workQueues) != null) {
2181 <                    for (int i = 0; i < ws.length; ++i) {
2182 <                        if ((w = ws[i]) != null &&
2183 <                            (!w.isEmpty() ||
2184 <                             ((i & 1) != 0 && w.scanState >= 0))) {
2185 <                            signalWork(ws, w);
2186 <                            return false;
2177 >                for (int oldSum = 0, checkSum = 0;;) {
2178 >                    WorkQueue[] ws; WorkQueue w; int m, b;
2179 >                    if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0)
2180 >                        return false;             // not quiescent
2181 >                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
2182 >                        break;                    // scan for submissions
2183 >                    for (int i = 0; i <= m; ++i) {
2184 >                        if ((w = ws[i]) != null) {
2185 >                            if ((i & 1) == 0)
2186 >                                w.qlock = -1;     // disable external queue
2187 >                            else if (w.scanState >= 0)
2188 >                                return false;     // still active
2189 >                            if ((b = w.base) != w.top )
2190 >                                return false;
2191 >                            checkSum += b;
2192                          }
2193                      }
2194 +                    if (oldSum == (oldSum = checkSum))
2195 +                        break;                    // continue until stable
2196 +                    checkSum = 0;
2197                  }
2188                if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0)
2189                    return false;                 // recheck
2198              }
2199              rs = lockRunState();                  // enter STOP phase
2200 <            unlockRunState(rs, (rs & ~RSLOCK) | STOP);
2200 >            int ns = rs & STOP;
2201 >            if (ns != 0 || !now ||
2202 >                (int)(ctl >> AC_SHIFT) + (config & SMASK) <= 0)
2203 >                ns = STOP;                        // recheck under lock
2204 >            unlockRunState(rs, (rs & ~RSLOCK) | ns);
2205 >            if (ns == 0)
2206 >                return false;
2207          }
2208          for (int pass = 0; pass < 3; ++pass) {    // clobber other workers
2209              WorkQueue[] ws; int n;
# Line 2277 | Line 2291 | public class ForkJoinPool extends Abstra
2291                              submitted = true;
2292                          }
2293                      } finally {
2294 <                        q.qlock = 0;
2294 >                        U.compareAndSwapInt(q, QLOCK, 1, 0);
2295                      }
2296                      if (submitted) {
2297                          signalWork(ws, q);
# Line 2291 | Line 2305 | public class ForkJoinPool extends Abstra
2305                  q.hint = r;
2306                  q.config = k | SHARED_QUEUE;
2307                  rs = lockRunState();           // publish index
2308 <                if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
2308 >                if (rs > 0 &&  (ws = workQueues) != null &&
2309 >                    k < ws.length && ws[k] == null)
2310                      ws[k] = q;                 // else terminated
2311                  unlockRunState(rs, rs & ~RSLOCK);
2312              }
# Line 2328 | Line 2343 | public class ForkJoinPool extends Abstra
2343                      signalWork(ws, q);
2344                  return;
2345              }
2346 <            q.qlock = 0;
2346 >            U.compareAndSwapInt(q, QLOCK, 1, 0);
2347          }
2348          externalSubmit(task);
2349      }
# Line 2365 | Line 2380 | public class ForkJoinPool extends Abstra
2380                      U.putOrderedInt(w, QLOCK, 0);
2381                      return true;
2382                  }
2383 <                w.qlock = 0;
2383 >                U.compareAndSwapInt(w, QLOCK, 1, 0);
2384              }
2385          }
2386          return false;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines