ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.94 by dl, Tue Mar 1 10:59:04 2011 UTC vs.
Revision 1.102 by jsr166, Thu Apr 14 01:17:58 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
# Line 151 | Line 151 | public class ForkJoinPool extends Abstra
151       * Updates tend not to contend with each other except during
152       * bursts while submitted tasks begin or end.  In some cases when
153       * they do contend, threads can instead do something else
154 <     * (usually, scan for tesks) until contention subsides.
154 >     * (usually, scan for tasks) until contention subsides.
155       *
156       * To enable packing, we restrict maximum parallelism to (1<<15)-1
157       * (which is far in excess of normal operating range) to allow
# Line 195 | Line 195 | public class ForkJoinPool extends Abstra
195       * shutdown schemes.
196       *
197       * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
198 <     * let workers spin indefinitely scanning for tasks when none are
199 <     * can be immediately found, and we cannot start/resume workers
200 <     * unless there appear to be tasks available.  On the other hand,
201 <     * we must quickly prod them into action when new tasks are
202 <     * submitted or generated.  We park/unpark workers after placing
203 <     * in an event wait queue when they cannot find work. This "queue"
204 <     * is actually a simple Treiber stack, headed by the "id" field of
205 <     * ctl, plus a 15bit counter value to both wake up waiters (by
206 <     * advancing their count) and avoid ABA effects. Successors are
207 <     * held in worker field "nextWait".  Queuing deals with several
208 <     * intrinsic races, mainly that a task-producing thread can miss
209 <     * seeing (and signalling) another thread that gave up looking for
210 <     * work but has not yet entered the wait queue. We solve this by
211 <     * requiring a full sweep of all workers both before (in scan())
212 <     * and after (in awaitWork()) a newly waiting worker is added to
213 <     * the wait queue. During a rescan, the worker might release some
214 <     * other queued worker rather than itself, which has the same net
215 <     * effect.
198 >     * let workers spin indefinitely scanning for tasks when none can
199 >     * be found immediately, and we cannot start/resume workers unless
200 >     * there appear to be tasks available.  On the other hand, we must
201 >     * quickly prod them into action when new tasks are submitted or
202 >     * generated.  We park/unpark workers after placing in an event
203 >     * wait queue when they cannot find work. This "queue" is actually
204 >     * a simple Treiber stack, headed by the "id" field of ctl, plus a
205 >     * 15bit counter value to both wake up waiters (by advancing their
206 >     * count) and avoid ABA effects. Successors are held in worker
207 >     * field "nextWait".  Queuing deals with several intrinsic races,
208 >     * mainly that a task-producing thread can miss seeing (and
209 >     * signalling) another thread that gave up looking for work but
210 >     * has not yet entered the wait queue. We solve this by requiring
211 >     * a full sweep of all workers both before (in scan()) and after
212 >     * (in tryAwaitWork()) a newly waiting worker is added to the wait
213 >     * queue. During a rescan, the worker might release some other
214 >     * queued worker rather than itself, which has the same net
215 >     * effect. Because enqueued workers may actually be rescanning
216 >     * rather than waiting, we set and clear the "parked" field of
217 >     * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
218 >     * (Use of the parked field requires a secondary recheck to avoid
219 >     * missed signals.)
220       *
221       * Signalling.  We create or wake up workers only when there
222       * appears to be at least one task they might be able to find and
# Line 229 | Line 233 | public class ForkJoinPool extends Abstra
233       * Trimming workers. To release resources after periods of lack of
234       * use, a worker starting to wait when the pool is quiescent will
235       * time out and terminate if the pool has remained quiescent for
236 <     * SHRINK_RATE nanosecs.
236 >     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
237 >     * terminating all workers after long periods of non-use.
238       *
239       * Submissions. External submissions are maintained in an
240       * array-based queue that is structured identically to
241 <     * ForkJoinWorkerThread queues (which see) except for the use of
242 <     * submissionLock in method addSubmission. Unlike worker queues,
243 <     * multiple external threads can add new submissions.
241 >     * ForkJoinWorkerThread queues except for the use of
242 >     * submissionLock in method addSubmission. Unlike the case for
243 >     * worker queues, multiple external threads can add new
244 >     * submissions, so adding requires a lock.
245       *
246       * Compensation. Beyond work-stealing support and lifecycle
247       * control, the main responsibility of this framework is to take
# Line 272 | Line 278 | public class ForkJoinPool extends Abstra
278       * if blocking would leave less than one active (non-waiting,
279       * non-blocked) worker. Additionally, to avoid some false alarms
280       * due to GC, lagging counters, system activity, etc, compensated
281 <     * blocking for joins is only attempted after a number of rechecks
282 <     * proportional to the current apparent deficit (where retries are
283 <     * interspersed with Thread.yield, for good citizenship).  The
284 <     * variable blockedCount, incremented before blocking and
285 <     * decremented after, is sometimes needed to distinguish cases of
286 <     * waiting for work vs blocking on joins or other managed sync,
287 <     * but both the cases are equivalent for most pool control, so we
288 <     * can update non-atomically. (Additionally, contention on
283 <     * blockedCount alleviates some contention on ctl).
281 >     * blocking for joins is only attempted after rechecks stabilize
282 >     * (retries are interspersed with Thread.yield, for good
283 >     * citizenship).  The variable blockedCount, incremented before
284 >     * blocking and decremented after, is sometimes needed to
285 >     * distinguish cases of waiting for work vs blocking on joins or
286 >     * other managed sync. Both cases are equivalent for most pool
287 >     * control, so we can update non-atomically. (Additionally,
288 >     * contention on blockedCount alleviates some contention on ctl).
289       *
290       * Shutdown and Termination. A call to shutdownNow atomically sets
291       * the ctl stop bit and then (non-atomically) sets each workers
# Line 522 | Line 527 | public class ForkJoinPool extends Abstra
527  
528      /**
529       * Index (mod submission queue length) of next element to take
530 <     * from submission queue.
530 >     * from submission queue. Usage is identical to that for
531 >     * per-worker queues -- see ForkJoinWorkerThread internal
532 >     * documentation.
533       */
534      volatile int queueBase;
535  
536      /**
537       * Index (mod submission queue length) of next element to add
538 <     * in submission queue.
538 >     * in submission queue. Usage is identical to that for
539 >     * per-worker queues -- see ForkJoinWorkerThread internal
540 >     * documentation.
541       */
542      int queueTop;
543  
# Line 566 | Line 575 | public class ForkJoinPool extends Abstra
575      private int nextWorkerIndex;
576  
577      /**
578 <     * SeqLock and index masking for for updates to workers array.
579 <     * Locked when SG_UNIT is set. Unlocking clears bit by adding
578 >     * SeqLock and index masking for updates to workers array.  Locked
579 >     * when SG_UNIT is set. Unlocking clears bit by adding
580       * SG_UNIT. Staleness of read-only operations can be checked by
581       * comparing scanGuard to value before the reads. The low 16 bits
582       * (i.e, anding with SMASK) hold (the smallest power of two
# Line 705 | Line 714 | public class ForkJoinPool extends Abstra
714       */
715      private boolean scan(ForkJoinWorkerThread w, int a) {
716          int g = scanGuard; // mask 0 avoids useless scans if only one active
717 <        int m = parallelism == 1 - a? 0 : g & SMASK;
717 >        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
718          ForkJoinWorkerThread[] ws = workers;
719          if (ws == null || ws.length <= m)         // staleness check
720              return false;
# Line 753 | Line 762 | public class ForkJoinPool extends Abstra
762  
763      /**
764       * Tries to enqueue worker w in wait queue and await change in
765 <     * worker's eventCount.  If the pool is quiescent, possibly
766 <     * terminates worker upon exit.  Otherwise, before blocking,
767 <     * rescans queues to avoid missed signals.  Upon finding work,
768 <     * releases at least one worker (which may be the current
769 <     * worker). Rescans restart upon detected staleness or failure to
770 <     * release due to contention.
765 >     * worker's eventCount.  If the pool is quiescent and there is
766 >     * more than one worker, possibly terminates worker upon exit.
767 >     * Otherwise, before blocking, rescans queues to avoid missed
768 >     * signals.  Upon finding work, releases at least one worker
769 >     * (which may be the current worker). Rescans restart upon
770 >     * detected staleness or failure to release due to
771 >     * contention. Note the unusual conventions about Thread.interrupt
772 >     * here and elsewhere: Because interrupts are used solely to alert
773 >     * threads to check termination, which is checked here anyway, we
774 >     * clear status (using Thread.interrupted) before any call to
775 >     * park, so that park does not immediately return due to status
776 >     * being set via some other unrelated call to interrupt in user
777 >     * code.
778       *
779       * @param w the calling worker
780       * @param c the ctl value on entry
# Line 779 | Line 795 | public class ForkJoinPool extends Abstra
795              else if (w.eventCount != v)
796                  return true;                      // update next time
797          }
798 <        if (parallelism + (int)(nc >> AC_SHIFT) == 0 &&
798 >        if ((!shutdown || !tryTerminate(false)) &&
799 >            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
800              blockedCount == 0 && quiescerCount == 0)
801              idleAwaitWork(w, nc, c, v);           // quiescent
802          for (boolean rescanned = false;;) {
# Line 849 | Line 866 | public class ForkJoinPool extends Abstra
866                  w.parked = false;
867                  if (w.eventCount != v)
868                      break;
869 <                else if (System.nanoTime() - startTime < SHRINK_RATE)
869 >                else if (System.nanoTime() - startTime <
870 >                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
871                      Thread.interrupted();          // spurious wakeup
872                  else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
873                                                     currentCtl, prevCtl)) {
# Line 891 | Line 909 | public class ForkJoinPool extends Abstra
909  
910      /**
911       * Creates or doubles submissionQueue array.
912 <     * Basically identical to ForkJoinWorkerThread version
912 >     * Basically identical to ForkJoinWorkerThread version.
913       */
914      private void growSubmissionQueue() {
915          ForkJoinTask<?>[] oldQ = submissionQueue;
# Line 978 | Line 996 | public class ForkJoinPool extends Abstra
996          do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
997                                                  c = ctl, c + AC_UNIT));
998          int b;
999 <        do {} while(!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
1000 <                                              b = blockedCount, b - 1));
999 >        do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
1000 >                                               b = blockedCount, b - 1));
1001      }
1002  
1003      /**
# Line 996 | Line 1014 | public class ForkJoinPool extends Abstra
1014                  joinMe.tryAwaitDone(0L);
1015                  postBlock();
1016              }
1017 <            if ((ctl & STOP_BIT) != 0L)
1017 >            else if ((ctl & STOP_BIT) != 0L)
1018                  joinMe.cancelIgnoringExceptions();
1019          }
1020      }
# Line 1131 | Line 1149 | public class ForkJoinPool extends Abstra
1149                          ws[k] = w;
1150                          nextWorkerIndex = k + 1;
1151                          int m = g & SMASK;
1152 <                        g = k >= m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1152 >                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1153                      }
1154                  } finally {
1155                      scanGuard = g;
# Line 1211 | Line 1229 | public class ForkJoinPool extends Abstra
1229                  if ((int)(c >> AC_SHIFT) != -parallelism)
1230                      return false;
1231                  if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
1232 <                    queueTop - queueBase > 0) {
1232 >                    queueBase != queueTop) {
1233                      if (ctl == c) // staleness check
1234                          return false;
1235                      continue;
# Line 1220 | Line 1238 | public class ForkJoinPool extends Abstra
1238              if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
1239                  startTerminating();
1240          }
1241 <        if ((short)(c >>> TC_SHIFT) == -parallelism) {
1242 <            submissionLock.lock();
1243 <            termination.signalAll();
1244 <            submissionLock.unlock();
1241 >        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
1242 >            final ReentrantLock lock = this.submissionLock;
1243 >            lock.lock();
1244 >            try {
1245 >                termination.signalAll();
1246 >            } finally {
1247 >                lock.unlock();
1248 >            }
1249          }
1250          return true;
1251      }
1252  
1253      /**
1254       * Runs up to three passes through workers: (0) Setting
1255 <     * termination status for each worker, followed by wakeups up
1256 <     * queued workers (1) helping cancel tasks (2) interrupting
1255 >     * termination status for each worker, followed by wakeups up to
1256 >     * queued workers; (1) helping cancel tasks; (2) interrupting
1257       * lagging threads (likely in external tasks, but possibly also
1258       * blocked in joins).  Each pass repeats previous steps because of
1259       * potential lagging thread creation.
# Line 1277 | Line 1299 | public class ForkJoinPool extends Abstra
1299  
1300      /**
1301       * Tries to set the termination status of waiting workers, and
1302 <     * then wake them up (after which they will terminate).
1302 >     * then wakes them up (after which they will terminate).
1303       */
1304      private void terminateWaiters() {
1305          ForkJoinWorkerThread[] ws = workers;
# Line 1310 | Line 1332 | public class ForkJoinPool extends Abstra
1332       */
1333      final void addQuiescerCount(int delta) {
1334          int c;
1335 <        do {} while(!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1336 <                                              c = quiescerCount, c + delta));
1335 >        do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1336 >                                               c = quiescerCount, c + delta));
1337      }
1338  
1339      /**
# Line 1336 | Line 1358 | public class ForkJoinPool extends Abstra
1358      final int idlePerActive() {
1359          // Approximate at powers of two for small values, saturate past 4
1360          int p = parallelism;
1361 <        int a = p + (int)(ctl >> AC_SHIFT);
1362 <        return (a > (p >>>= 1) ? 0 :
1363 <                a > (p >>>= 1) ? 1 :
1364 <                a > (p >>>= 1) ? 2 :
1365 <                a > (p >>>= 1) ? 4 :
1366 <                8);
1361 >        int a = p + (int)(ctl >> AC_SHIFT);
1362 >        return (a > (p >>>= 1) ? 0 :
1363 >                a > (p >>>= 1) ? 1 :
1364 >                a > (p >>>= 1) ? 2 :
1365 >                a > (p >>>= 1) ? 4 :
1366 >                8);
1367      }
1368  
1369      // Exported methods
# Line 1664 | Line 1686 | public class ForkJoinPool extends Abstra
1686       */
1687      public int getRunningThreadCount() {
1688          int r = parallelism + (int)(ctl >> AC_SHIFT);
1689 <        return r <= 0? 0 : r; // suppress momentarily negative values
1689 >        return (r <= 0) ? 0 : r; // suppress momentarily negative values
1690      }
1691  
1692      /**
# Line 1676 | Line 1698 | public class ForkJoinPool extends Abstra
1698       */
1699      public int getActiveThreadCount() {
1700          int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
1701 <        return r <= 0? 0 : r; // suppress momentarily negative values
1701 >        return (r <= 0) ? 0 : r; // suppress momentarily negative values
1702      }
1703  
1704      /**
# Line 1831 | Line 1853 | public class ForkJoinPool extends Abstra
1853          int ac = rc + blockedCount;
1854          String level;
1855          if ((c & STOP_BIT) != 0)
1856 <            level = (tc == 0)? "Terminated" : "Terminating";
1856 >            level = (tc == 0) ? "Terminated" : "Terminating";
1857          else
1858 <            level = shutdown? "Shutting down" : "Running";
1858 >            level = shutdown ? "Shutting down" : "Running";
1859          return super.toString() +
1860              "[" + level +
1861              ", parallelism = " + pc +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines