
Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.92 by dl, Tue Feb 22 10:50:51 2011 UTC vs.
Revision 1.96 by dl, Mon Mar 7 14:10:48 2011 UTC

# Line 151 | Line 151 | public class ForkJoinPool extends Abstra
151       * Updates tend not to contend with each other except during
152       * bursts while submitted tasks begin or end.  In some cases when
153       * they do contend, threads can instead do something else
154 <     * (usually, scan for tesks) until contention subsides.
154 >     * (usually, scan for tasks) until contention subsides.
155       *
156       * To enable packing, we restrict maximum parallelism to (1<<15)-1
157       * (which is far in excess of normal operating range) to allow
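The "do something else until contention subsides" strategy described in this hunk amounts to treating a failed CAS as a cue to find other useful work instead of spinning. A minimal sketch of that pattern, with hypothetical names (ContendedCounter and scanForTasks are illustrative, not the pool's code):

    import java.util.concurrent.atomic.AtomicLong;

    class ContendedCounter {
        private final AtomicLong count = new AtomicLong();

        boolean tryIncrement() {                 // one attempt, no spinning
            long c = count.get();
            return count.compareAndSet(c, c + 1);
        }

        void incrementOrScan(Runnable scanForTasks) {
            while (!tryIncrement())
                scanForTasks.run();              // do something else, then retry
        }
    }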
# Line 195 | Line 195 | public class ForkJoinPool extends Abstra
195       * shutdown schemes.
196       *
197       * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
198 <     * let workers spin indefinitely scanning for tasks when none are
199 <     * can be immediately found, and we cannot start/resume workers
200 <     * unless there appear to be tasks available.  On the other hand,
201 <     * we must quickly prod them into action when new tasks are
202 <     * submitted or generated.  We park/unpark workers after placing
203 <     * in an event wait queue when they cannot find work. This "queue"
204 <     * is actually a simple Treiber stack, headed by the "id" field of
205 <     * ctl, plus a 15bit counter value to both wake up waiters (by
206 <     * advancing their count) and avoid ABA effects. Successors are
207 <     * held in worker field "nextWait".  Queuing deals with several
208 <     * intrinsic races, mainly that a task-producing thread can miss
209 <     * seeing (and signalling) another thread that gave up looking for
210 <     * work but has not yet entered the wait queue. We solve this by
211 <     * requiring a full sweep of all workers both before (in scan())
212 <     * and after (in awaitWork()) a newly waiting worker is added to
213 <     * the wait queue. During a rescan, the worker might release some
214 <     * other queued worker rather than itself, which has the same net
215 <     * effect.
198 >     * let workers spin indefinitely scanning for tasks when none can
199 >     * be found immediately, and we cannot start/resume workers unless
200 >     * there appear to be tasks available.  On the other hand, we must
201 >     * quickly prod them into action when new tasks are submitted or
202 >     * generated.  We park/unpark workers after placing in an event
203 >     * wait queue when they cannot find work. This "queue" is actually
204 >     * a simple Treiber stack, headed by the "id" field of ctl, plus a
205 >     * 15bit counter value to both wake up waiters (by advancing their
206 >     * count) and avoid ABA effects. Successors are held in worker
207 >     * field "nextWait".  Queuing deals with several intrinsic races,
208 >     * mainly that a task-producing thread can miss seeing (and
209 >     * signalling) another thread that gave up looking for work but
210 >     * has not yet entered the wait queue. We solve this by requiring
211 >     * a full sweep of all workers both before (in scan()) and after
212 >     * (in tryAwaitWork()) a newly waiting worker is added to the wait
213 >     * queue. During a rescan, the worker might release some other
214 >     * queued worker rather than itself, which has the same net
215 >     * effect. Because enqueued workers may actually be rescanning
216 >     * rather than waiting, we set and clear the "parked" field of
217 >     * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
218 >     * (Use of the parked field requires a secondary recheck to avoid
219 >     * missed signals.)
220       *
221       * Signalling.  We create or wake up workers only when there
222       * appears to be at least one task they might be able to find and
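The Treiber-stack wait queue described above can be sketched as follows. All names here are hypothetical: the real pool embeds the stack head in the "id" bits of ctl and the successor links in worker objects ("nextWait"), and it advances an event count on release rather than a plain pop counter.

    import java.util.concurrent.atomic.AtomicLong;

    class WaiterStack {
        static final long ID_MASK  = 0xFFFFL;      // low bits: id of top waiter
        static final long CTR_UNIT = 1L << 16;     // counter above the id bits
        final AtomicLong head = new AtomicLong();  // [counter | top id]; 0 = empty
        final int[] nextWait;                      // successor link per worker id

        WaiterStack(int maxId) { nextWait = new int[maxId + 1]; }

        boolean tryPush(int id) {                  // id must be in 1..maxId
            long h = head.get();
            nextWait[id] = (int) (h & ID_MASK);    // record successor first
            return head.compareAndSet(h, (h & ~ID_MASK) | id);
        }

        int tryPop() {                             // 0 if empty, -1 on contention
            long h = head.get();
            int id = (int) (h & ID_MASK);
            if (id == 0)
                return 0;
            long nh = ((h + CTR_UNIT) & ~ID_MASK) | nextWait[id]; // bump counter: no ABA
            return head.compareAndSet(h, nh) ? id : -1;
        }
    }

Advancing the counter on every pop is what prevents the ABA problem: a stale head value can never match the current one even if the same worker id returns to the top of the stack.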
# Line 229 | Line 233 | public class ForkJoinPool extends Abstra
233       * Trimming workers. To release resources after periods of lack of
234       * use, a worker starting to wait when the pool is quiescent will
235       * time out and terminate if the pool has remained quiescent for
236 <     * SHRINK_RATE nanosecs.
236 >     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
237 >     * terminating all workers after long periods of non-use.
238       *
239       * Submissions. External submissions are maintained in an
240       * array-based queue that is structured identically to
241 <     * ForkJoinWorkerThread queues (which see) except for the use of
242 <     * submissionLock in method addSubmission. Unlike worker queues,
243 <     * multiple external threads can add new submissions.
241 >     * ForkJoinWorkerThread queues except for the use of
242 >     * submissionLock in method addSubmission. Unlike the case for
243 >     * worker queues, multiple external threads can add new
244 >     * submissions, so adding requires a lock.
245       *
246       * Compensation. Beyond work-stealing support and lifecycle
247       * control, the main responsibility of this framework is to take
# Line 272 | Line 278 | public class ForkJoinPool extends Abstra
278       * if blocking would leave less than one active (non-waiting,
279       * non-blocked) worker. Additionally, to avoid some false alarms
280       * due to GC, lagging counters, system activity, etc, compensated
281 <     * blocking for joins is only attempted after a number of rechecks
282 <     * proportional to the current apparent deficit (where retries are
283 <     * interspersed with Thread.yield, for good citizenship).  The
284 <     * variable blockedCount, incremented before blocking and
285 <     * decremented after, is sometimes needed to distinguish cases of
286 <     * waiting for work vs blocking on joins or other managed sync,
287 <     * but both the cases are equivalent for most pool control, so we
288 <     * can update non-atomically. (Additionally, contention on
283 <     * blockedCount alleviates some contention on ctl).
281 >     * blocking for joins is only attempted after rechecks stabilize
282 >     * (retries are interspersed with Thread.yield, for good
283 >     * citizenship).  The variable blockedCount, incremented before
284 >     * blocking and decremented after, is sometimes needed to
285 >     * distinguish cases of waiting for work vs blocking on joins or
286 >     * other managed sync. Both cases are equivalent for most pool
287 >     * control, so we can update non-atomically. (Additionally,
288 >     * contention on blockedCount alleviates some contention on ctl).
289       *
290       * Shutdown and Termination. A call to shutdownNow atomically sets
291       * the ctl stop bit and then (non-atomically) sets each workers
# Line 478 | Line 483 | public class ForkJoinPool extends Abstra
483       * negative, there is at least one waiting worker, and when e is
484       * negative, the pool is terminating.  To deal with these possibly
485       * negative fields, we use casts in and out of "short" and/or
486 <     * signed shifts to maintain signedness.  Note: AC_SHIFT is
482 <     * redundantly declared in ForkJoinWorkerThread in order to
483 <     * integrate a surplus-threads check.
486 >     * signed shifts to maintain signedness.
487       */
488      volatile long ctl;
489  
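The signedness handling described above relies on arithmetic shifts and narrowing casts. A minimal, self-contained demonstration; the field positions (AC in the top 16 bits, TC in the next 16) are assumed here to match the layout this comment describes:

    class CtlDemo {
        static final int  AC_SHIFT = 48, TC_SHIFT = 32;   // assumed positions
        static final long AC_MASK  = 0xFFFFL << AC_SHIFT;
        static final long TC_MASK  = 0xFFFFL << TC_SHIFT;

        static int activeCount(long ctl) { return (int) (ctl >> AC_SHIFT); }    // signed shift
        static int totalCount(long ctl)  { return (short) (ctl >>> TC_SHIFT); } // short cast restores sign

        static long pack(int ac, int tc) {
            return (((long) ac << AC_SHIFT) & AC_MASK) | (((long) tc << TC_SHIFT) & TC_MASK);
        }

        public static void main(String[] args) {
            long ctl = pack(-2, -3);
            System.out.println(activeCount(ctl));  // prints -2
            System.out.println(totalCount(ctl));   // prints -3
        }
    }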
# Line 524 | Line 527 | public class ForkJoinPool extends Abstra
527  
528      /**
529       * Index (mod submission queue length) of next element to take
530 <     * from submission queue.
530 >     * from submission queue. Usage is identical to that for
531 >     * per-worker queues -- see ForkJoinWorkerThread internal
532 >     * documentation.
533       */
534      volatile int queueBase;
535  
536      /**
537       * Index (mod submission queue length) of next element to add
538 <     * in submission queue.
538 >     * in submission queue. Usage is identical to that for
539 >     * per-worker queues -- see ForkJoinWorkerThread internal
540 >     * documentation.
541       */
542      int queueTop;
543  
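The queueBase/queueTop convention referenced in these Javadoc fragments (indices taken modulo a power-of-two array length, with live elements in the half-open range [base, top)) can be sketched as a simple slot queue. The names are illustrative; the real code replaces the locking assumptions below with CAS and ordered stores:

    class SlotQueue<T> {
        final Object[] slots = new Object[8];    // length must be a power of two
        volatile int queueBase;                  // next element to take
        int queueTop;                            // next slot to fill

        boolean add(T t) {                       // sketch assumes caller holds a lock
            if (queueTop - queueBase >= slots.length)
                return false;                    // full; the real code grows the array
            slots[queueTop & (slots.length - 1)] = t;
            ++queueTop;
            return true;
        }

        @SuppressWarnings("unchecked")
        T poll() {                               // sketch assumes one taker at a time
            int b = queueBase;
            if (b == queueTop)
                return null;                     // empty
            int i = b & (slots.length - 1);
            T t = (T) slots[i];
            slots[i] = null;
            queueBase = b + 1;
            return t;
        }
    }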
# Line 568 | Line 575 | public class ForkJoinPool extends Abstra
575      private int nextWorkerIndex;
576  
577      /**
578 <     * SeqLock and index masking for for updates to workers array.
579 <     * Locked when SG_UNIT is set. Unlocking clears bit by adding
578 >     * SeqLock and index masking for updates to workers array.  Locked
579 >     * when SG_UNIT is set. Unlocking clears bit by adding
580       * SG_UNIT. Staleness of read-only operations can be checked by
581       * comparing scanGuard to value before the reads. The low 16 bits
582       * (i.e, anding with SMASK) hold (the smallest power of two
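The SeqLock protocol described above (lock by setting SG_UNIT, unlock by adding SG_UNIT, validate reads by comparing before/after values) can be sketched as follows. Names are hypothetical, and memory-ordering subtleties of real seqlocks are elided:

    import java.util.concurrent.atomic.AtomicInteger;

    class SeqGuard {
        static final int SG_UNIT = 1 << 16;      // lock bit above the index mask
        final AtomicInteger scanGuard = new AtomicInteger();
        int dataA, dataB;                        // guarded data

        void write(int a, int b) {
            int g;
            do { g = scanGuard.get(); }          // spin until unlocked, then lock
            while ((g & SG_UNIT) != 0 || !scanGuard.compareAndSet(g, g | SG_UNIT));
            dataA = a;
            dataB = b;
            scanGuard.addAndGet(SG_UNIT);        // clears the bit, advances sequence
        }

        long read() {                            // packed (a, b) snapshot
            for (;;) {
                int g = scanGuard.get();
                int a = dataA, b = dataB;
                if ((g & SG_UNIT) == 0 && scanGuard.get() == g)
                    return ((long) a << 32) | (b & 0xFFFFFFFFL); // reads were consistent
            }
        }
    }

Note how unlocking works: adding SG_UNIT to a value whose SG_UNIT bit is set clears that bit by carry, simultaneously advancing the high-order sequence bits that readers compare against.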
# Line 707 | Line 714 | public class ForkJoinPool extends Abstra
714       */
715      private boolean scan(ForkJoinWorkerThread w, int a) {
716          int g = scanGuard; // mask 0 avoids useless scans if only one active
717 <        int m = parallelism == 1 - a? 0 : g & SMASK;
717 >        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
718          ForkJoinWorkerThread[] ws = workers;
719          if (ws == null || ws.length <= m)         // staleness check
720              return false;
# Line 754 | Line 761 | public class ForkJoinPool extends Abstra
761      }
762  
763      /**
764 <     * Tries to enqueue worker in wait queue and await change in
765 <     * worker's eventCount.  Before blocking, rescans queues to avoid
766 <     * missed signals.  If the pool is quiescent, possibly terminates
767 <     * worker upon exit.
764 >     * Tries to enqueue worker w in wait queue and await change in
765 >     * worker's eventCount.  If the pool is quiescent, possibly
766 >     * terminates worker upon exit.  Otherwise, before blocking,
767 >     * rescans queues to avoid missed signals.  Upon finding work,
768 >     * releases at least one worker (which may be the current
769 >     * worker). Rescans restart upon detected staleness or failure to
770 >     * release due to contention. Note the unusual conventions about
771 >     * Thread.interrupt here and elsewhere: Because interrupts are
772 >     * used solely to alert threads to check termination, which is
773 >     * checked here anyway, we clear status (using Thread.interrupted)
774 >     * before any call to park, so that park does not immediately
775 >     * return due to status being set via some other unrelated call to
776 >     * interrupt in user code.
777       *
778       * @param w the calling worker
779       * @param c the ctl value on entry
# Line 765 | Line 781 | public class ForkJoinPool extends Abstra
781       */
782      private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
783          int v = w.eventCount;
784 <        w.nextWait = (int)c;                       // w's successor record
784 >        w.nextWait = (int)c;                      // w's successor record
785          long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
786          if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
787 <            long d = ctl; // return true if lost to a deq, to force rescan
787 >            long d = ctl; // return true if lost to a deq, to force scan
788              return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
789          }
790 <        if (parallelism + (int)(c >> AC_SHIFT) == 1 &&
790 >        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
791 >            long s = stealCount;
792 >            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
793 >                sc = w.stealCount = 0;
794 >            else if (w.eventCount != v)
795 >                return true;                      // update next time
796 >        }
797 >        if (parallelism + (int)(nc >> AC_SHIFT) == 0 &&
798              blockedCount == 0 && quiescerCount == 0)
799 <            idleAwaitWork(w, v);               // quiescent -- maybe shrink
800 <
778 <        boolean rescanned = false;
779 <        for (int sc;;) {
799 >            idleAwaitWork(w, nc, c, v);           // quiescent
800 >        for (boolean rescanned = false;;) {
801              if (w.eventCount != v)
802                  return true;
803 <            if ((sc = w.stealCount) != 0) {
783 <                long s = stealCount;               // accumulate stealCount
784 <                if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc))
785 <                    w.stealCount = 0;
786 <            }
787 <            else if (!rescanned) {
803 >            if (!rescanned) {
804                  int g = scanGuard, m = g & SMASK;
805                  ForkJoinWorkerThread[] ws = workers;
806                  if (ws != null && m < ws.length) {
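The stealCount accumulation added in this hunk follows a small pattern worth isolating: drain a thread-local tally into a shared counter by CAS, but abandon the attempt and rescan if a signal (an eventCount change) arrives mid-drain. A sketch with hypothetical names mirroring the loop above:

    import java.util.concurrent.atomic.AtomicLong;

    class StealDrain {
        final AtomicLong stealCount = new AtomicLong();

        static class Worker {
            int stealCount;                       // local tally, single writer
            volatile int eventCount;              // advanced by a releasing thread
        }

        // returns true if the worker was signalled and should rescan
        boolean drain(Worker w, int v) {
            for (int sc = w.stealCount; sc != 0; ) {
                long s = stealCount.get();
                if (stealCount.compareAndSet(s, s + sc))
                    sc = w.stealCount = 0;        // drained; proceed toward waiting
                else if (w.eventCount != v)
                    return true;                  // signalled; update next time
            }
            return false;
        }
    }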
# Line 821 | Line 837 | public class ForkJoinPool extends Abstra
837      }
838  
839      /**
840 <     * If pool is quiescent, checks for termination, and waits for
841 <     * event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl
842 <     * has not changed, terminates the worker. Upon its termination
843 <     * (see deregisterWorker), it may wake up another worker to
844 <     * possibly repeat this process.
840 >     * If inactivating worker w has caused pool to become
841 >     * quiescent, check for pool termination, and wait for event
842 >     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
843 >     * this case because quiescence reflects consensus about lack
844 >     * of work). On timeout, if ctl has not changed, terminate the
845 >     * worker. Upon its termination (see deregisterWorker), it may
846 >     * wake up another worker to possibly repeat this process.
847       *
848       * @param w the calling worker
849 <     * @param v the eventCount w must wait until changed
850 <     */
851 <    private void idleAwaitWork(ForkJoinWorkerThread w, int v) {
852 <        ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
853 <        if (shutdown)
854 <            tryTerminate(false);
855 <        long c = ctl;
856 <        long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) |
857 <                   (long)(w.nextWait & E_MASK)); // ctl value to release w
858 <        if (w.eventCount == v &&
859 <            parallelism + (int)(c >> AC_SHIFT) == 0 &&
860 <            blockedCount == 0 && quiescerCount == 0) {
843 <            long startTime = System.nanoTime();
844 <            Thread.interrupted();
845 <            if (w.eventCount == v) {
849 >     * @param currentCtl the ctl value after enqueuing w
850 >     * @param prevCtl the ctl value if w terminated
851 >     * @param v the eventCount w awaits change
852 >     */
853 >    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
854 >                               long prevCtl, int v) {
855 >        if (w.eventCount == v) {
856 >            if (shutdown)
857 >                tryTerminate(false);
858 >            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
859 >            while (ctl == currentCtl) {
860 >                long startTime = System.nanoTime();
861                  w.parked = true;
862 <                if (w.eventCount == v)
862 >                if (w.eventCount == v)             // must recheck
863                      LockSupport.parkNanos(this, SHRINK_RATE);
864                  w.parked = false;
865 <                if (w.eventCount == v && ctl == c &&
866 <                    System.nanoTime() - startTime >= SHRINK_RATE &&
867 <                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
868 <                    w.terminate = true;
869 <                    w.eventCount = ((int)c + EC_UNIT) & E_MASK;
865 >                if (w.eventCount != v)
866 >                    break;
867 >                else if (System.nanoTime() - startTime < SHRINK_RATE)
868 >                    Thread.interrupted();          // spurious wakeup
869 >                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
870 >                                                   currentCtl, prevCtl)) {
871 >                    w.terminate = true;            // restore previous
872 >                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
873 >                    break;
874                  }
875              }
876          }
# Line 887 | Line 906 | public class ForkJoinPool extends Abstra
906  
907      /**
908       * Creates or doubles submissionQueue array.
909 <     * Basically identical to ForkJoinWorkerThread version
909 >     * Basically identical to ForkJoinWorkerThread version.
910       */
911      private void growSubmissionQueue() {
912          ForkJoinTask<?>[] oldQ = submissionQueue;
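The create-or-double step named above re-places each live element at its index under the new modulus. A simplified sketch (hypothetical names and initial capacity; the real version also publishes slots with ordered stores):

    class GrowDemo {
        static final int INITIAL_CAPACITY = 8;    // assumed; the real constant differs
        Object[] queue;
        int queueBase, queueTop;                  // live slots: [base, top)

        void growQueue() {                        // sketch assumes caller holds the lock
            Object[] oldQ = queue;
            int size = (oldQ == null) ? INITIAL_CAPACITY : oldQ.length << 1;
            Object[] q = new Object[size];
            if (oldQ != null) {
                int oldMask = oldQ.length - 1, mask = size - 1;
                for (int b = queueBase; b != queueTop; ++b)
                    q[b & mask] = oldQ[b & oldMask];  // same index, new modulus
            }
            queue = q;
        }
    }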
# Line 992 | Line 1011 | public class ForkJoinPool extends Abstra
1011                  joinMe.tryAwaitDone(0L);
1012                  postBlock();
1013              }
1014 <            if ((ctl & STOP_BIT) != 0L)
1014 >            else if ((ctl & STOP_BIT) != 0L)
1015                  joinMe.cancelIgnoringExceptions();
1016          }
1017      }
# Line 1207 | Line 1226 | public class ForkJoinPool extends Abstra
1226                  if ((int)(c >> AC_SHIFT) != -parallelism)
1227                      return false;
1228                  if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
1229 <                    queueTop - queueBase > 0) {
1229 >                    queueBase != queueTop) {
1230                      if (ctl == c) // staleness check
1231                          return false;
1232                      continue;
# Line 1216 | Line 1235 | public class ForkJoinPool extends Abstra
1235              if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
1236                  startTerminating();
1237          }
1238 <        if ((short)(c >>> TC_SHIFT) == -parallelism) {
1239 <            submissionLock.lock();
1240 <            termination.signalAll();
1241 <            submissionLock.unlock();
1238 >        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
1239 >            final ReentrantLock lock = this.submissionLock;
1240 >            lock.lock();
1241 >            try {
1242 >                termination.signalAll();
1243 >            } finally {
1244 >                lock.unlock();
1245 >            }
1246          }
1247          return true;
1248      }
1249  
1250      /**
1251       * Runs up to three passes through workers: (0) Setting
1252 <     * termination status for each worker, followed by wakeups up
1253 <     * queued workers (1) helping cancel tasks (2) interrupting
1252 >     * termination status for each worker, followed by wakeups up to
1253 >     * queued workers; (1) helping cancel tasks; (2) interrupting
1254       * lagging threads (likely in external tasks, but possibly also
1255       * blocked in joins).  Each pass repeats previous steps because of
1256       * potential lagging thread creation.
# Line 1273 | Line 1296 | public class ForkJoinPool extends Abstra
1296  
1297      /**
1298       * Tries to set the termination status of waiting workers, and
1299 <     * then wake them up (after which they will terminate).
1299 >     * then wakes them up (after which they will terminate).
1300       */
1301      private void terminateWaiters() {
1302          ForkJoinWorkerThread[] ws = workers;
# Line 1729 | Line 1752 | public class ForkJoinPool extends Abstra
1752  
1753      /**
1754       * Returns an estimate of the number of tasks submitted to this
1755 <     * pool that have not yet begun executing.  This meThod may take
1755 >     * pool that have not yet begun executing.  This method may take
1756       * time proportional to the number of submissions.
1757       *
1758       * @return the number of queued submissions
# Line 1966 | Line 1989 | public class ForkJoinPool extends Abstra
1989       * {@code isReleasable} must return {@code true} if blocking is
1990       * not necessary. Method {@code block} blocks the current thread
1991       * if necessary (perhaps internally invoking {@code isReleasable}
1992 <     * before actually blocking). The unusual methods in this API
1993 <     * accommodate synchronizers that may, but don't usually, block
1994 <     * for long periods. Similarly, they allow more efficient internal
1995 <     * handling of cases in which additional workers may be, but
1996 <     * usually are not, needed to ensure sufficient parallelism.
1997 <     * Toward this end, implementations of method {@code isReleasable}
1998 <     * must be amenable to repeated invocation.
1992 >     * before actually blocking). These actions are performed by any
1993 >     * thread invoking {@link ForkJoinPool#managedBlock}.  The
1994 >     * unusual methods in this API accommodate synchronizers that may,
1995 >     * but don't usually, block for long periods. Similarly, they
1996 >     * allow more efficient internal handling of cases in which
1997 >     * additional workers may be, but usually are not, needed to
1998 >     * ensure sufficient parallelism.  Toward this end,
1999 >     * implementations of method {@code isReleasable} must be amenable
2000 >     * to repeated invocation.
2001       *
2002       * <p>For example, here is a ManagedBlocker based on a
2003       * ReentrantLock:

Diff Legend

- Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)