root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.91 by dl, Tue Feb 22 00:39:31 2011 UTC vs.
Revision 1.100 by dl, Fri Apr 1 20:20:37 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
# Line 151 | Line 151 | public class ForkJoinPool extends Abstra
151       * Updates tend not to contend with each other except during
152       * bursts while submitted tasks begin or end.  In some cases when
153       * they do contend, threads can instead do something else
154 <     * (usually, scan for tesks) until contention subsides.
154 >     * (usually, scan for tasks) until contention subsides.
155       *
156       * To enable packing, we restrict maximum parallelism to (1<<15)-1
157       * (which is far in excess of normal operating range) to allow
# Line 195 | Line 195 | public class ForkJoinPool extends Abstra
195       * shutdown schemes.
196       *
197       * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
198 <     * let workers spin indefinitely scanning for tasks when none are
199 <     * can be immediately found, and we cannot start/resume workers
200 <     * unless there appear to be tasks available.  On the other hand,
201 <     * we must quickly prod them into action when new tasks are
202 <     * submitted or generated.  We park/unpark workers after placing
203 <     * in an event wait queue when they cannot find work. This "queue"
204 <     * is actually a simple Treiber stack, headed by the "id" field of
205 <     * ctl, plus a 15bit counter value to both wake up waiters (by
206 <     * advancing their count) and avoid ABA effects. Successors are
207 <     * held in worker field "nextWait".  Queuing deals with several
208 <     * intrinsic races, mainly that a task-producing thread can miss
209 <     * seeing (and signalling) another thread that gave up looking for
210 <     * work but has not yet entered the wait queue. We solve this by
211 <     * requiring a full sweep of all workers both before (in scan())
212 <     * and after (in awaitWork()) a newly waiting worker is added to
213 <     * the wait queue. During a rescan, the worker might release some
214 <     * other queued worker rather than itself, which has the same net
215 <     * effect.
198 >     * let workers spin indefinitely scanning for tasks when none can
199 >     * be found immediately, and we cannot start/resume workers unless
200 >     * there appear to be tasks available.  On the other hand, we must
201 >     * quickly prod them into action when new tasks are submitted or
202 >     * generated.  We park/unpark workers after placing in an event
203 >     * wait queue when they cannot find work. This "queue" is actually
204 >     * a simple Treiber stack, headed by the "id" field of ctl, plus a
205 >     * 15bit counter value to both wake up waiters (by advancing their
206 >     * count) and avoid ABA effects. Successors are held in worker
207 >     * field "nextWait".  Queuing deals with several intrinsic races,
208 >     * mainly that a task-producing thread can miss seeing (and
209 >     * signalling) another thread that gave up looking for work but
210 >     * has not yet entered the wait queue. We solve this by requiring
211 >     * a full sweep of all workers both before (in scan()) and after
212 >     * (in tryAwaitWork()) a newly waiting worker is added to the wait
213 >     * queue. During a rescan, the worker might release some other
214 >     * queued worker rather than itself, which has the same net
215 >     * effect. Because enqueued workers may actually be rescanning
216 >     * rather than waiting, we set and clear the "parked" field of
217 >     * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
218 >     * (Use of the parked field requires a secondary recheck to avoid
219 >     * missed signals.)
220       *
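As a sketch of the Treiber-stack protocol just described (illustrative only: in the real pool the stack head and the anti-ABA counter are packed into the "id" bits of ctl itself and ids index the workers array; the standalone AtomicLong and 16-bit layout here are stand-ins):

    import java.util.concurrent.atomic.AtomicLong;

    // Treiber-stack wait queue sketch: the low 16 bits of "head" hold the
    // id of the top waiter (0 means empty); the bits above hold a counter
    // advanced on each release to wake waiters and avoid ABA.
    class EventWaitStack {
        final AtomicLong head = new AtomicLong();
        final int[] nextWait = new int[1 << 16];  // per-worker successor links

        void enqueue(int id) {                    // id must be nonzero
            for (;;) {
                long h = head.get();
                nextWait[id] = (int)(h & 0xFFFF); // successor record
                if (head.compareAndSet(h, (h & ~0xFFFFL) | id))
                    return;
            }
        }

        int release() {                           // id to unpark, or 0 if empty
            for (;;) {
                long h = head.get();
                int id = (int)(h & 0xFFFF);
                if (id == 0)
                    return 0;
                long nh = (((h >>> 16) + 1) << 16) | (nextWait[id] & 0xFFFF);
                if (head.compareAndSet(h, nh))    // pop and advance counter
                    return id;
            }
        }
    }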
221       * Signalling.  We create or wake up workers only when there
222       * appears to be at least one task they might be able to find and
# Line 229 | Line 233 | public class ForkJoinPool extends Abstra
233       * Trimming workers. To release resources after periods of lack of
234       * use, a worker starting to wait when the pool is quiescent will
235       * time out and terminate if the pool has remained quiescent for
236 <     * SHRINK_RATE nanosecs.
236 >     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
237 >     * terminating all workers after long periods of non-use.
238       *
239       * Submissions. External submissions are maintained in an
240       * array-based queue that is structured identically to
241 <     * ForkJoinWorkerThread queues (which see) except for the use of
242 <     * submissionLock in method addSubmission. Unlike worker queues,
243 <     * multiple external threads can add new submissions.
241 >     * ForkJoinWorkerThread queues except for the use of
242 >     * submissionLock in method addSubmission. Unlike the case for
243 >     * worker queues, multiple external threads can add new
244 >     * submissions, so adding requires a lock.
245       *
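A minimal sketch of the locked add path, assuming a power-of-two array and the queueBase/queueTop conventions documented further down; growth and worker signalling are omitted, so this is not the real addSubmission:

    import java.util.concurrent.locks.ReentrantLock;

    // Array-based submission queue sketch: takers read queueBase, locked
    // adders advance queueTop; indices are taken mod the array length.
    class SubmissionAddSketch {
        final ReentrantLock submissionLock = new ReentrantLock();
        Runnable[] submissionQueue = new Runnable[64]; // power of two
        volatile int queueBase;                        // next index to take
        int queueTop;                                  // next index to add

        void addSubmission(Runnable task) {
            submissionLock.lock();        // multiple external threads may add
            try {
                if (queueTop - queueBase < submissionQueue.length)
                    submissionQueue[queueTop++ & (submissionQueue.length - 1)] = task;
                // the real code grows the array here instead of dropping
            } finally {
                submissionLock.unlock();
            }
        }
    }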
246       * Compensation. Beyond work-stealing support and lifecycle
247       * control, the main responsibility of this framework is to take
# Line 272 | Line 278 | public class ForkJoinPool extends Abstra
278       * if blocking would leave less than one active (non-waiting,
279       * non-blocked) worker. Additionally, to avoid some false alarms
280       * due to GC, lagging counters, system activity, etc, compensated
281 <     * blocking for joins is only attempted after a number of rechecks
282 <     * proportional to the current apparent deficit (where retries are
283 <     * interspersed with Thread.yield, for good citizenship).  The
284 <     * variable blockedCount, incremented before blocking and
285 <     * decremented after, is sometimes needed to distinguish cases of
286 <     * waiting for work vs blocking on joins or other managed sync,
287 <     * but both the cases are equivalent for most pool control, so we
288 <     * can update non-atomically. (Additionally, contention on
283 <     * blockedCount alleviates some contention on ctl).
281 >     * blocking for joins is only attempted after rechecks stabilize
282 >     * (retries are interspersed with Thread.yield, for good
283 >     * citizenship).  The variable blockedCount, incremented before
284 >     * blocking and decremented after, is sometimes needed to
285 >     * distinguish cases of waiting for work vs blocking on joins or
286 >     * other managed sync. Both cases are equivalent for most pool
287 >     * control, so we can update non-atomically. (Additionally,
288 >     * contention on blockedCount alleviates some contention on ctl).
289       *
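The "recheck until stable" idea can be sketched as follows; activeCount() is a hypothetical stand-in for reading the AC field of ctl, and the stability window is arbitrary:

    import java.util.concurrent.atomic.AtomicInteger;

    // Compensated-blocking sketch: only conclude that blocking would leave
    // too few active workers after repeated observations agree, yielding
    // between rechecks; bracket the actual block with blockedCount updates.
    class CompensationSketch {
        final AtomicInteger blockedCount = new AtomicInteger();

        void blockOnJoin(Runnable actuallyBlock) {
            int seen = activeCount();
            for (int stable = 0; stable < 8; ) {   // wait for consensus
                Thread.yield();                    // good citizenship
                int now = activeCount();
                if (now == seen)
                    ++stable;
                else { seen = now; stable = 0; }   // still changing
            }
            blockedCount.incrementAndGet();        // entering managed block
            try {
                actuallyBlock.run();
            } finally {
                blockedCount.decrementAndGet();    // leaving managed block
            }
        }

        int activeCount() { return 0; }            // hypothetical AC read
    }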
290       * Shutdown and Termination. A call to shutdownNow atomically sets
291       * the ctl stop bit and then (non-atomically) sets each workers
# Line 478 | Line 483 | public class ForkJoinPool extends Abstra
483       * negative, there is at least one waiting worker, and when e is
484       * negative, the pool is terminating.  To deal with these possibly
485       * negative fields, we use casts in and out of "short" and/or
486 <     * signed shifts to maintain signedness.  Note: AC_SHIFT is
482 <     * redundantly declared in ForkJoinWorkerThread in order to
483 <     * integrate a surplus-threads check.
486 >     * signed shifts to maintain signedness.
487       */
488      volatile long ctl;
489  
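A sketch of the signed-field unpacking this comment describes; the shift constants mirror the layout sketched above (16-bit fields, counts in the high shorts) but should be treated as illustrative:

    // ctl field extraction sketch: AC and TC may be negative (stored offset
    // by -parallelism), so a signed shift or a cast through short is used;
    // the low bits hold the wait-stack "id".
    class CtlFieldsSketch {
        static final int AC_SHIFT = 48, TC_SHIFT = 32;

        static int activeCount(long ctl) { return (int)(ctl >> AC_SHIFT); }    // signed shift
        static int totalCount(long ctl)  { return (short)(ctl >>> TC_SHIFT); } // short keeps sign
        static int waiterId(long ctl)    { return (int)ctl & 0xFFFF; }         // low "id" bits

        public static void main(String[] args) {
            long ctl = (-2L << AC_SHIFT) | (3L << TC_SHIFT) | 7L; // AC=-2, TC=3, id=7
            System.out.println(activeCount(ctl) + " " + totalCount(ctl)
                               + " " + waiterId(ctl));            // prints: -2 3 7
        }
    }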
# Line 524 | Line 527 | public class ForkJoinPool extends Abstra
527  
528      /**
529       * Index (mod submission queue length) of next element to take
530 <     * from submission queue.
530 >     * from submission queue. Usage is identical to that for
531 >     * per-worker queues -- see ForkJoinWorkerThread internal
532 >     * documentation.
533       */
534      volatile int queueBase;
535  
536      /**
537       * Index (mod submission queue length) of next element to add
538 <     * in submission queue.
538 >     * in submission queue. Usage is identical to that for
539 >     * per-worker queues -- see ForkJoinWorkerThread internal
540 >     * documentation.
541       */
542      int queueTop;
543  
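For contrast with the locked add side, the take side implied by these two indices can be sketched with a CAS that claims the slot before queueBase advances (simplified; the real code uses Unsafe array operations and extra staleness checks):

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Poll sketch: claim the element by nulling its slot, then publish the
    // new base. Competing pollers that lose the CAS simply retry.
    class SubmissionPollSketch {
        final AtomicReferenceArray<Runnable> queue = new AtomicReferenceArray<>(64);
        volatile int queueBase;   // next element to take
        int queueTop;             // next element to add (guarded by the lock)

        Runnable pollSubmission() {
            for (int b; (b = queueBase) != queueTop; ) {
                int i = b & (queue.length() - 1);
                Runnable t = queue.get(i);
                if (t != null && queue.compareAndSet(i, t, null)) {
                    queueBase = b + 1;    // advance only after claiming
                    return t;
                }
            }
            return null;                  // empty
        }
    }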
# Line 568 | Line 575 | public class ForkJoinPool extends Abstra
575      private int nextWorkerIndex;
576  
577      /**
578 <     * SeqLock and index masking for for updates to workers array.
579 <     * Locked when SG_UNIT is set. Unlocking clears bit by adding
578 >     * SeqLock and index masking for updates to workers array.  Locked
579 >     * when SG_UNIT is set. Unlocking clears bit by adding
580       * SG_UNIT. Staleness of read-only operations can be checked by
581       * comparing scanGuard to value before the reads. The low 16 bits
582       * (i.e, anding with SMASK) hold (the smallest power of two
# Line 707 | Line 714 | public class ForkJoinPool extends Abstra
714       */
715      private boolean scan(ForkJoinWorkerThread w, int a) {
716          int g = scanGuard; // mask 0 avoids useless scans if only one active
717 <        int m = parallelism == 1 - a? 0 : g & SMASK;
717 >        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
718          ForkJoinWorkerThread[] ws = workers;
719          if (ws == null || ws.length <= m)         // staleness check
720              return false;
# Line 754 | Line 761 | public class ForkJoinPool extends Abstra
761      }
762  
763      /**
764 <     * Tries to enqueue worker in wait queue and await change in
765 <     * worker's eventCount.  Before blocking, rescans queues to avoid
766 <     * missed signals.  If the pool is quiescent, possibly terminates
767 <     * worker upon exit.
764 >     * Tries to enqueue worker w in wait queue and await change in
765 >     * worker's eventCount.  If the pool is quiescent and there is
766 >     * more than one worker, possibly terminates worker upon exit.
767 >     * Otherwise, before blocking, rescans queues to avoid missed
768 >     * signals.  Upon finding work, releases at least one worker
769 >     * (which may be the current worker). Rescans restart upon
770 >     * detected staleness or failure to release due to
771 >     * contention. Note the unusual conventions about Thread.interrupt
772 >     * here and elsewhere: Because interrupts are used solely to alert
773 >     * threads to check termination, which is checked here anyway, we
774 >     * clear status (using Thread.interrupted) before any call to
775 >     * park, so that park does not immediately return due to status
776 >     * being set via some other unrelated call to interrupt in user
777 >     * code.
778       *
779       * @param w the calling worker
780       * @param c the ctl value on entry
# Line 765 | Line 782 | public class ForkJoinPool extends Abstra
782       */
783      private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
784          int v = w.eventCount;
785 <        w.nextWait = (int)c;                       // w's successor record
785 >        w.nextWait = (int)c;                      // w's successor record
786          long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
787          if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
788 <            long d = ctl; // return true if lost to a deq, to force rescan
788 >            long d = ctl; // return true if lost to a deq, to force scan
789              return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
790          }
791 <        boolean rescanned = false;
792 <        for (int sc;;) {
791 >        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
792 >            long s = stealCount;
793 >            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
794 >                sc = w.stealCount = 0;
795 >            else if (w.eventCount != v)
796 >                return true;                      // update next time
797 >        }
798 >        if ((int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
799 >            blockedCount == 0 && quiescerCount == 0)
800 >            idleAwaitWork(w, nc, c, v);           // quiescent
801 >        for (boolean rescanned = false;;) {
802              if (w.eventCount != v)
803                  return true;
804 <            if ((sc = w.stealCount) != 0) {
779 <                long s = stealCount;               // accumulate stealCount
780 <                if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc))
781 <                    w.stealCount = 0;
782 <            }
783 <            else if (!rescanned) {
804 >            if (!rescanned) {
805                  int g = scanGuard, m = g & SMASK;
806                  ForkJoinWorkerThread[] ws = workers;
807                  if (ws != null && m < ws.length) {
# Line 804 | Line 825 | public class ForkJoinPool extends Abstra
825                  else
826                      Thread.interrupted();          // clear before park
827              }
807            else if (parallelism + (int)(ctl >> AC_SHIFT) == 0 &&
808                     blockedCount == 0 && quiescerCount == 0)
809                idleAwaitWork(w, v);               // quiescent -- maybe shrink
828              else {
829                  w.parked = true;                   // must recheck
830                  if (w.eventCount != v) {
# Line 820 | Line 838 | public class ForkJoinPool extends Abstra
838      }
839  
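The parked-flag handshake used above, in isolation (the eventCount advance is simplified to a non-atomic increment on the assumption of a single signaller in this sketch):

    import java.util.concurrent.locks.LockSupport;

    // Park protocol sketch: clear interrupt status first (interrupts only
    // mean "recheck termination" here), set parked, then recheck the event
    // count before parking so a concurrent wakeup cannot be missed.
    class ParkProtocolSketch {
        final Thread owner;
        volatile boolean parked;
        volatile int eventCount;

        ParkProtocolSketch(Thread owner) { this.owner = owner; }

        void awaitEvent(int v) {                 // called by owner
            while (eventCount == v) {
                Thread.interrupted();            // clear status before park
                parked = true;                   // secondary recheck follows
                if (eventCount == v)
                    LockSupport.park(this);
                parked = false;
            }
        }

        void wakeup() {                          // called by a signaller
            ++eventCount;                        // advance count first
            if (parked)                          // skip unpark for rescanners
                LockSupport.unpark(owner);
        }
    }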
840      /**
841 <     * If pool is quiescent, checks for termination, and waits for
842 <     * event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl
843 <     * has not changed, terminates the worker. Upon its termination
844 <     * (see deregisterWorker), it may wake up another worker to
845 <     * possibly repeat this process.
841 >     * If inactivating worker w has caused pool to become
842 >     * quiescent, check for pool termination, and wait for event
843 >     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
844 >     * this case because quiescence reflects consensus about lack
845 >     * of work). On timeout, if ctl has not changed, terminate the
846 >     * worker. Upon its termination (see deregisterWorker), it may
847 >     * wake up another worker to possibly repeat this process.
848       *
849       * @param w the calling worker
850 <     * @param v the eventCount w must wait until changed
851 <     */
852 <    private void idleAwaitWork(ForkJoinWorkerThread w, int v) {
853 <        ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
854 <        if (shutdown)
855 <            tryTerminate(false);
856 <        long c = ctl;
857 <        long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) |
858 <                   (long)(w.nextWait & E_MASK)); // ctl value to release w
859 <        if (w.eventCount == v &&
860 <            parallelism + (int)(c >> AC_SHIFT) == 0 &&
861 <            blockedCount == 0 && quiescerCount == 0) {
842 <            long startTime = System.nanoTime();
843 <            Thread.interrupted();
844 <            if (w.eventCount == v) {
850 >     * @param currentCtl the ctl value after enqueuing w
851 >     * @param prevCtl the ctl value if w terminated
852 >     * @param v the eventCount w awaits change
853 >     */
854 >    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
855 >                               long prevCtl, int v) {
856 >        if (w.eventCount == v) {
857 >            if (shutdown)
858 >                tryTerminate(false);
859 >            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
860 >            while (ctl == currentCtl) {
861 >                long startTime = System.nanoTime();
862                  w.parked = true;
863 <                if (w.eventCount == v)
863 >                if (w.eventCount == v)             // must recheck
864                      LockSupport.parkNanos(this, SHRINK_RATE);
865                  w.parked = false;
866 <                if (w.eventCount == v && ctl == c &&
867 <                    System.nanoTime() - startTime >= SHRINK_RATE &&
868 <                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
869 <                    w.terminate = true;
870 <                    w.eventCount = ((int)c + EC_UNIT) & E_MASK;
866 >                if (w.eventCount != v)
867 >                    break;
868 >                else if (System.nanoTime() - startTime <
869 >                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
870 >                    Thread.interrupted();          // spurious wakeup
871 >                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
872 >                                                   currentCtl, prevCtl)) {
873 >                    w.terminate = true;            // restore previous
874 >                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
875 >                    break;
876                  }
877              }
878          }
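One detail worth pulling out of the loop above: parkNanos may return early, so timeout is judged against the deadline with roughly 10% slop before the worker is allowed to terminate. The SHRINK_RATE value here is illustrative:

    // Timeout test sketch: treat a wakeup as a real SHRINK_RATE timeout only
    // when nearly the whole interval has elapsed; earlier returns count as
    // spurious wakeups and the worker parks again.
    class ShrinkTimeoutSketch {
        static final long SHRINK_RATE = 4L * 1000L * 1000L * 1000L; // ~4 sec (illustrative)

        static boolean timedOut(long startTime) {
            long elapsed = System.nanoTime() - startTime;
            return elapsed >= SHRINK_RATE - (SHRINK_RATE / 10);     // timing slop
        }
    }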
# Line 886 | Line 908 | public class ForkJoinPool extends Abstra
908  
909      /**
910       * Creates or doubles submissionQueue array.
911 <     * Basically identical to ForkJoinWorkerThread version
911 >     * Basically identical to ForkJoinWorkerThread version.
912       */
913      private void growSubmissionQueue() {
914          ForkJoinTask<?>[] oldQ = submissionQueue;
# Line 991 | Line 1013 | public class ForkJoinPool extends Abstra
1013                  joinMe.tryAwaitDone(0L);
1014                  postBlock();
1015              }
1016 <            if ((ctl & STOP_BIT) != 0L)
1016 >            else if ((ctl & STOP_BIT) != 0L)
1017                  joinMe.cancelIgnoringExceptions();
1018          }
1019      }
# Line 1126 | Line 1148 | public class ForkJoinPool extends Abstra
1148                          ws[k] = w;
1149                          nextWorkerIndex = k + 1;
1150                          int m = g & SMASK;
1151 <                        g = k >= m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1151 >                        g = k > m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1152                      }
1153                  } finally {
1154                      scanGuard = g;
# Line 1206 | Line 1228 | public class ForkJoinPool extends Abstra
1228                  if ((int)(c >> AC_SHIFT) != -parallelism)
1229                      return false;
1230                  if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
1231 <                    queueTop - queueBase > 0) {
1231 >                    queueBase != queueTop) {
1232                      if (ctl == c) // staleness check
1233                          return false;
1234                      continue;
# Line 1215 | Line 1237 | public class ForkJoinPool extends Abstra
1237              if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
1238                  startTerminating();
1239          }
1240 <        if ((short)(c >>> TC_SHIFT) == -parallelism) {
1241 <            submissionLock.lock();
1242 <            termination.signalAll();
1243 <            submissionLock.unlock();
1240 >        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
1241 >            final ReentrantLock lock = this.submissionLock;
1242 >            lock.lock();
1243 >            try {
1244 >                termination.signalAll();
1245 >            } finally {
1246 >                lock.unlock();
1247 >            }
1248          }
1249          return true;
1250      }
1251  
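The signalAll above is what releases callers blocked in awaitTermination; typical client usage (standard ExecutorService idiom, nothing specific to this change) looks like:

    import java.util.concurrent.TimeUnit;
    import jsr166y.ForkJoinPool;

    class ShutdownUsage {
        static void shutdownAndWait(ForkJoinPool pool) throws InterruptedException {
            pool.shutdown();                                  // no new submissions
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) // released by signalAll
                pool.shutdownNow();                           // cancel lagging tasks
        }
    }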
1252      /**
1253       * Runs up to three passes through workers: (0) Setting
1254 <     * termination status for each worker, followed by wakeups up
1255 <     * queued workers (1) helping cancel tasks (2) interrupting
1254 >     * termination status for each worker, followed by wakeups up to
1255 >     * queued workers; (1) helping cancel tasks; (2) interrupting
1256       * lagging threads (likely in external tasks, but possibly also
1257       * blocked in joins).  Each pass repeats previous steps because of
1258       * potential lagging thread creation.
# Line 1272 | Line 1298 | public class ForkJoinPool extends Abstra
1298  
1299      /**
1300       * Tries to set the termination status of waiting workers, and
1301 <     * then wake them up (after which they will terminate).
1301 >     * then wakes them up (after which they will terminate).
1302       */
1303      private void terminateWaiters() {
1304          ForkJoinWorkerThread[] ws = workers;
# Line 1728 | Line 1754 | public class ForkJoinPool extends Abstra
1754  
1755      /**
1756       * Returns an estimate of the number of tasks submitted to this
1757 <     * pool that have not yet begun executing.  This meThod may take
1757 >     * pool that have not yet begun executing.  This method may take
1758       * time proportional to the number of submissions.
1759       *
1760       * @return the number of queued submissions
# Line 1965 | Line 1991 | public class ForkJoinPool extends Abstra
1991       * {@code isReleasable} must return {@code true} if blocking is
1992       * not necessary. Method {@code block} blocks the current thread
1993       * if necessary (perhaps internally invoking {@code isReleasable}
1994 <     * before actually blocking). The unusual methods in this API
1995 <     * accommodate synchronizers that may, but don't usually, block
1996 <     * for long periods. Similarly, they allow more efficient internal
1997 <     * handling of cases in which additional workers may be, but
1998 <     * usually are not, needed to ensure sufficient parallelism.
1999 <     * Toward this end, implementations of method {@code isReleasable}
2000 <     * must be amenable to repeated invocation.
1994 >     * before actually blocking). These actions are performed by any
1995 >     * thread invoking {@link ForkJoinPool#managedBlock}.  The
1996 >     * unusual methods in this API accommodate synchronizers that may,
1997 >     * but don't usually, block for long periods. Similarly, they
1998 >     * allow more efficient internal handling of cases in which
1999 >     * additional workers may be, but usually are not, needed to
2000 >     * ensure sufficient parallelism.  Toward this end,
2001 >     * implementations of method {@code isReleasable} must be amenable
2002 >     * to repeated invocation.
2003       *
2004       * <p>For example, here is a ManagedBlocker based on a
2005       * ReentrantLock:
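The ReentrantLock example itself lies past the end of this hunk; the version in the published javadoc reads essentially as follows (reproduced here for convenience; verify against the actual revision):

    import java.util.concurrent.locks.ReentrantLock;
    import jsr166y.ForkJoinPool.ManagedBlocker;

    class ManagedLocker implements ManagedBlocker {
        final ReentrantLock lock;
        boolean hasLock = false;
        ManagedLocker(ReentrantLock lock) { this.lock = lock; }
        public boolean block() {
            if (!hasLock)
                lock.lock();
            return true;
        }
        public boolean isReleasable() {
            return hasLock || (hasLock = lock.tryLock());
        }
    }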

Diff Legend

  Removed lines
+ Added lines
< Changed lines (revision 1.91)
> Changed lines (revision 1.100)