root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.94 by dl, Tue Mar 1 10:59:04 2011 UTC vs.
Revision 1.99 by dl, Wed Mar 23 11:27:43 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
# Line 151 | Line 151 | public class ForkJoinPool extends Abstra
151       * Updates tend not to contend with each other except during
152       * bursts while submitted tasks begin or end.  In some cases when
153       * they do contend, threads can instead do something else
154 <     * (usually, scan for tesks) until contention subsides.
154 >     * (usually, scan for tasks) until contention subsides.
155       *
156       * To enable packing, we restrict maximum parallelism to (1<<15)-1
157       * (which is far in excess of normal operating range) to allow
# Line 195 | Line 195 | public class ForkJoinPool extends Abstra
195       * shutdown schemes.
196       *
197       * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
198 <     * let workers spin indefinitely scanning for tasks when none are
199 <     * can be immediately found, and we cannot start/resume workers
200 <     * unless there appear to be tasks available.  On the other hand,
201 <     * we must quickly prod them into action when new tasks are
202 <     * submitted or generated.  We park/unpark workers after placing
203 <     * in an event wait queue when they cannot find work. This "queue"
204 <     * is actually a simple Treiber stack, headed by the "id" field of
205 <     * ctl, plus a 15bit counter value to both wake up waiters (by
206 <     * advancing their count) and avoid ABA effects. Successors are
207 <     * held in worker field "nextWait".  Queuing deals with several
208 <     * intrinsic races, mainly that a task-producing thread can miss
209 <     * seeing (and signalling) another thread that gave up looking for
210 <     * work but has not yet entered the wait queue. We solve this by
211 <     * requiring a full sweep of all workers both before (in scan())
212 <     * and after (in awaitWork()) a newly waiting worker is added to
213 <     * the wait queue. During a rescan, the worker might release some
214 <     * other queued worker rather than itself, which has the same net
215 <     * effect.
198 >     * let workers spin indefinitely scanning for tasks when none can
199 >     * be found immediately, and we cannot start/resume workers unless
200 >     * there appear to be tasks available.  On the other hand, we must
201 >     * quickly prod them into action when new tasks are submitted or
202 >     * generated.  We park/unpark workers after placing in an event
203 >     * wait queue when they cannot find work. This "queue" is actually
204 >     * a simple Treiber stack, headed by the "id" field of ctl, plus a
205 >     * 15bit counter value to both wake up waiters (by advancing their
206 >     * count) and avoid ABA effects. Successors are held in worker
207 >     * field "nextWait".  Queuing deals with several intrinsic races,
208 >     * mainly that a task-producing thread can miss seeing (and
209 >     * signalling) another thread that gave up looking for work but
210 >     * has not yet entered the wait queue. We solve this by requiring
211 >     * a full sweep of all workers both before (in scan()) and after
212 >     * (in tryAwaitWork()) a newly waiting worker is added to the wait
213 >     * queue. During a rescan, the worker might release some other
214 >     * queued worker rather than itself, which has the same net
215 >     * effect. Because enqueued workers may actually be rescanning
216 >     * rather than waiting, we set and clear the "parked" field of
217 >     * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
218 >     * (Use of the parked field requires a secondary recheck to avoid
219 >     * missed signals.)
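The Treiber-stack-with-counter idea in this hunk can be sketched generically. ForkJoinPool packs the head "id" and a 15-bit wakeup counter into its ctl long; the sketch below instead uses AtomicStampedReference, whose stamp plays the same anti-ABA role. The class and method names are illustrative, not jsr166 code.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

// Minimal Treiber stack sketch. The stamp advances on every successful
// CAS, like the 15-bit counter described above, so a head that is
// popped and pushed back cannot be mistaken for an unchanged head (ABA).
class TreiberStack<T> {
    private static final class Node<T> {
        final T item;
        Node<T> next;
        Node(T item) { this.item = item; }
    }

    private final AtomicStampedReference<Node<T>> head =
        new AtomicStampedReference<>(null, 0);

    void push(T item) {
        Node<T> n = new Node<>(item);
        int[] stamp = new int[1];
        for (;;) {
            Node<T> h = head.get(stamp);
            n.next = h;
            if (head.compareAndSet(h, n, stamp[0], stamp[0] + 1))
                return;
        }
    }

    T pop() {
        int[] stamp = new int[1];
        for (;;) {
            Node<T> h = head.get(stamp);
            if (h == null)
                return null;               // empty stack
            if (head.compareAndSet(h, h.next, stamp[0], stamp[0] + 1))
                return h.item;
        }
    }
}
```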
220       *
221       * Signalling.  We create or wake up workers only when there
222       * appears to be at least one task they might be able to find and
# Line 229 | Line 233 | public class ForkJoinPool extends Abstra
233       * Trimming workers. To release resources after periods of lack of
234       * use, a worker starting to wait when the pool is quiescent will
235       * time out and terminate if the pool has remained quiescent for
236 <     * SHRINK_RATE nanosecs.
236 >     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
237 >     * terminating all workers after long periods of non-use.
238       *
239       * Submissions. External submissions are maintained in an
240       * array-based queue that is structured identically to
241 <     * ForkJoinWorkerThread queues (which see) except for the use of
242 <     * submissionLock in method addSubmission. Unlike worker queues,
243 <     * multiple external threads can add new submissions.
241 >     * ForkJoinWorkerThread queues except for the use of
242 >     * submissionLock in method addSubmission. Unlike the case for
243 >     * worker queues, multiple external threads can add new
244 >     * submissions, so adding requires a lock.
245       *
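A rough sketch of the submission-queue shape described above, with locked adds (multiple external submitters) and array doubling analogous to growSubmissionQueue. All names, the initial capacity, and the single-consumer poll are illustrative assumptions, not the actual jsr166 fields.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: an array-based FIFO where multiple threads may
// add, so adds take submissionLock; capacity is kept a power of two so
// slots can be computed by masking, as in the per-worker queues.
class SubmissionQueueSketch<T> {
    private final ReentrantLock submissionLock = new ReentrantLock();
    private Object[] queue = new Object[8];
    private volatile int queueBase;   // next index to take (mod length)
    private int queueTop;             // next index to add (mod length)

    void addSubmission(T task) {
        final ReentrantLock lock = this.submissionLock;
        lock.lock();
        try {
            if (queueTop - queueBase == queue.length)
                growQueue();                          // double when full
            queue[queueTop & (queue.length - 1)] = task;
            queueTop++;
        } finally {
            lock.unlock();
        }
    }

    private void growQueue() {        // analogous to growSubmissionQueue
        Object[] oldQ = queue, newQ = new Object[oldQ.length << 1];
        for (int i = queueBase; i < queueTop; ++i)
            newQ[i & (newQ.length - 1)] = oldQ[i & (oldQ.length - 1)];
        queue = newQ;
    }

    @SuppressWarnings("unchecked")
    T poll() {                        // simplified single-consumer take
        final ReentrantLock lock = this.submissionLock;
        lock.lock();
        try {
            if (queueBase == queueTop)
                return null;
            int i = queueBase & (queue.length - 1);
            T t = (T) queue[i];
            queue[i] = null;
            queueBase++;
            return t;
        } finally {
            lock.unlock();
        }
    }
}
```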
246       * Compensation. Beyond work-stealing support and lifecycle
247       * control, the main responsibility of this framework is to take
# Line 272 | Line 278 | public class ForkJoinPool extends Abstra
278       * if blocking would leave less than one active (non-waiting,
279       * non-blocked) worker. Additionally, to avoid some false alarms
280       * due to GC, lagging counters, system activity, etc, compensated
281 <     * blocking for joins is only attempted after a number of rechecks
282 <     * proportional to the current apparent deficit (where retries are
283 <     * interspersed with Thread.yield, for good citizenship).  The
284 <     * variable blockedCount, incremented before blocking and
285 <     * decremented after, is sometimes needed to distinguish cases of
286 <     * waiting for work vs blocking on joins or other managed sync,
287 <     * but both the cases are equivalent for most pool control, so we
288 <     * can update non-atomically. (Additionally, contention on
283 <     * blockedCount alleviates some contention on ctl).
281 >     * blocking for joins is only attempted after rechecks stabilize
282 >     * (retries are interspersed with Thread.yield, for good
283 >     * citizenship).  The variable blockedCount, incremented before
284 >     * blocking and decremented after, is sometimes needed to
285 >     * distinguish cases of waiting for work vs blocking on joins or
286 >     * other managed sync. Both cases are equivalent for most pool
287 >     * control, so we can update non-atomically. (Additionally,
288 >     * contention on blockedCount alleviates some contention on ctl).
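The "recheck until readings stabilize, yielding between rechecks" pattern above can be sketched generically; the retry bound and names below are illustrative assumptions, not ForkJoinPool's actual logic.

```java
import java.util.function.BooleanSupplier;

// Generic sketch: before taking an expensive compensation step,
// re-read state several times (yielding between reads) so transient
// GC or scheduling blips do not trigger false alarms.
class StableCheck {
    /** Returns true only if cond stays true across `retries` rechecks. */
    static boolean stably(BooleanSupplier cond, int retries) {
        for (int i = 0; i < retries; ++i) {
            if (!cond.getAsBoolean())
                return false;   // state changed; no compensation needed
            Thread.yield();     // good citizenship between rechecks
        }
        return true;            // readings stabilized; safe to compensate
    }
}
```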
289       *
290       * Shutdown and Termination. A call to shutdownNow atomically sets
291       * the ctl stop bit and then (non-atomically) sets each workers
# Line 522 | Line 527 | public class ForkJoinPool extends Abstra
527  
528      /**
529       * Index (mod submission queue length) of next element to take
530 <     * from submission queue.
530 >     * from submission queue. Usage is identical to that for
531 >     * per-worker queues -- see ForkJoinWorkerThread internal
532 >     * documentation.
533       */
534      volatile int queueBase;
535  
536      /**
537       * Index (mod submission queue length) of next element to add
538 <     * in submission queue.
538 >     * in submission queue. Usage is identical to that for
539 >     * per-worker queues -- see ForkJoinWorkerThread internal
540 >     * documentation.
541       */
542      int queueTop;
543  
# Line 566 | Line 575 | public class ForkJoinPool extends Abstra
575      private int nextWorkerIndex;
576  
577      /**
578 <     * SeqLock and index masking for for updates to workers array.
579 <     * Locked when SG_UNIT is set. Unlocking clears bit by adding
578 >     * SeqLock and index masking for updates to workers array.  Locked
579 >     * when SG_UNIT is set. Unlocking clears bit by adding
580       * SG_UNIT. Staleness of read-only operations can be checked by
581       * comparing scanGuard to value before the reads. The low 16 bits
582       * (i.e, anding with SMASK) hold (the smallest power of two
# Line 705 | Line 714 | public class ForkJoinPool extends Abstra
714       */
715      private boolean scan(ForkJoinWorkerThread w, int a) {
716          int g = scanGuard; // mask 0 avoids useless scans if only one active
717 <        int m = parallelism == 1 - a? 0 : g & SMASK;
717 >        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
718          ForkJoinWorkerThread[] ws = workers;
719          if (ws == null || ws.length <= m)         // staleness check
720              return false;
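The scanGuard staleness check seen in this hunk follows the classic seqlock read pattern: read the guard, read the data, re-read the guard, and retry if the guard was odd or changed. A minimal single-writer sketch (illustrative names; a production seqlock also needs explicit memory-ordering fences):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Seqlock read-validation sketch. ForkJoinPool's scanGuard works
// similarly: a writer adds SG_UNIT to lock and again to unlock, so a
// reader seeing the same even value before and after its reads knows
// they did not race with an update.
class SeqLockSketch {
    private final AtomicInteger guard = new AtomicInteger(); // even = unlocked
    private int x, y;                                        // guarded data

    void write(int a, int b) {        // single writer assumed
        guard.incrementAndGet();      // odd: locked
        x = a;
        y = b;
        guard.incrementAndGet();      // even again: unlocked
    }

    int[] read() {
        for (;;) {
            int g = guard.get();
            if ((g & 1) != 0)
                continue;             // writer active; retry
            int a = x, b = y;
            if (guard.get() == g)     // staleness check, as in scan()
                return new int[]{a, b};
        }
    }
}
```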
# Line 758 | Line 767 | public class ForkJoinPool extends Abstra
767       * rescans queues to avoid missed signals.  Upon finding work,
768       * releases at least one worker (which may be the current
769       * worker). Rescans restart upon detected staleness or failure to
770 <     * release due to contention.
770 >     * release due to contention. Note the unusual conventions about
771 >     * Thread.interrupt here and elsewhere: Because interrupts are
772 >     * used solely to alert threads to check termination, which is
773 >     * checked here anyway, we clear status (using Thread.interrupted)
774 >     * before any call to park, so that park does not immediately
775 >     * return due to status being set via some other unrelated call to
776 >     * interrupt in user code.
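The interrupt convention in this hunk can be shown in isolation: clear any pending interrupt status (which in this design only means "check termination") before parking, so a stale interrupt set by unrelated user code cannot make park return immediately. The termination flag and method below are illustrative, not jsr166 code.

```java
import java.util.concurrent.locks.LockSupport;

// Sketch of the park convention described above. Interrupts are used
// solely as a termination alert, so status is cleared (with
// Thread.interrupted) before parking.
class ParkSketch {
    static volatile boolean terminating;

    /** Parks for about `nanos`, ignoring stale interrupts; false if terminating. */
    static boolean quietPark(long nanos) {
        if (terminating)
            return false;              // the only thing interrupts signal
        Thread.interrupted();          // clear stale status before park
        long deadline = System.nanoTime() + nanos;
        while (System.nanoTime() < deadline)       // tolerate spurious wakeups
            LockSupport.parkNanos(deadline - System.nanoTime());
        return true;
    }
}
```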
777       *
778       * @param w the calling worker
779       * @param c the ctl value on entry
# Line 849 | Line 864 | public class ForkJoinPool extends Abstra
864                  w.parked = false;
865                  if (w.eventCount != v)
866                      break;
867 <                else if (System.nanoTime() - startTime < SHRINK_RATE)
867 >                else if (System.nanoTime() - startTime <
868 >                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
869                      Thread.interrupted();          // spurious wakeup
870                  else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
871                                                     currentCtl, prevCtl)) {
# Line 891 | Line 907 | public class ForkJoinPool extends Abstra
907  
908      /**
909       * Creates or doubles submissionQueue array.
910 <     * Basically identical to ForkJoinWorkerThread version
910 >     * Basically identical to ForkJoinWorkerThread version.
911       */
912      private void growSubmissionQueue() {
913          ForkJoinTask<?>[] oldQ = submissionQueue;
# Line 996 | Line 1012 | public class ForkJoinPool extends Abstra
1012                  joinMe.tryAwaitDone(0L);
1013                  postBlock();
1014              }
1015 <            if ((ctl & STOP_BIT) != 0L)
1015 >            else if ((ctl & STOP_BIT) != 0L)
1016                  joinMe.cancelIgnoringExceptions();
1017          }
1018      }
# Line 1131 | Line 1147 | public class ForkJoinPool extends Abstra
1147                          ws[k] = w;
1148                          nextWorkerIndex = k + 1;
1149                          int m = g & SMASK;
1150 <                        g = k >= m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1150 >                        g = k > m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1151                      }
1152                  } finally {
1153                      scanGuard = g;
# Line 1211 | Line 1227 | public class ForkJoinPool extends Abstra
1227                  if ((int)(c >> AC_SHIFT) != -parallelism)
1228                      return false;
1229                  if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
1230 <                    queueTop - queueBase > 0) {
1230 >                    queueBase != queueTop) {
1231                      if (ctl == c) // staleness check
1232                          return false;
1233                      continue;
# Line 1220 | Line 1236 | public class ForkJoinPool extends Abstra
1236              if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
1237                  startTerminating();
1238          }
1239 <        if ((short)(c >>> TC_SHIFT) == -parallelism) {
1240 <            submissionLock.lock();
1241 <            termination.signalAll();
1242 <            submissionLock.unlock();
1239 >        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
1240 >            final ReentrantLock lock = this.submissionLock;
1241 >            lock.lock();
1242 >            try {
1243 >                termination.signalAll();
1244 >            } finally {
1245 >                lock.unlock();
1246 >            }
1247          }
1248          return true;
1249      }
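The fixed hunk above wraps termination.signalAll in lock/try/finally. The matching await side follows the standard condition-variable pattern: recheck the predicate in a loop around awaitNanos. A self-contained sketch with illustrative names (not the actual awaitTermination):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Lock/condition pairing sketch: the signalling side holds the lock in
// try/finally (as the corrected hunk does); the awaiting side loops on
// its predicate to guard against spurious wakeups.
class TerminationSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition termination = lock.newCondition();
    private boolean terminated;

    void signalTerminated() {
        final ReentrantLock l = this.lock;
        l.lock();
        try {
            terminated = true;
            termination.signalAll();
        } finally {
            l.unlock();          // always released, even on exception
        }
    }

    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock l = this.lock;
        l.lock();
        try {
            while (!terminated) {            // recheck predicate in a loop
                if (nanos <= 0L)
                    return false;            // timed out
                nanos = termination.awaitNanos(nanos);
            }
            return true;
        } finally {
            l.unlock();
        }
    }
}
```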
1250  
1251      /**
1252       * Runs up to three passes through workers: (0) Setting
1253 <     * termination status for each worker, followed by wakeups up
1254 <     * queued workers (1) helping cancel tasks (2) interrupting
1253 >     * termination status for each worker, followed by wakeups up to
1254 >     * queued workers; (1) helping cancel tasks; (2) interrupting
1255       * lagging threads (likely in external tasks, but possibly also
1256       * blocked in joins).  Each pass repeats previous steps because of
1257       * potential lagging thread creation.
# Line 1277 | Line 1297 | public class ForkJoinPool extends Abstra
1297  
1298      /**
1299       * Tries to set the termination status of waiting workers, and
1300 <     * then wake them up (after which they will terminate).
1300 >     * then wakes them up (after which they will terminate).
1301       */
1302      private void terminateWaiters() {
1303          ForkJoinWorkerThread[] ws = workers;

Diff Legend

- : removed lines
+ : added lines
< : changed lines (old revision)
> : changed lines (new revision)