ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.91 by dl, Tue Feb 22 00:39:31 2011 UTC vs.
Revision 1.107 by jsr166, Fri Jul 1 03:09:02 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
# Line 19 | Line 19 | import java.util.concurrent.Future;
19   import java.util.concurrent.RejectedExecutionException;
20   import java.util.concurrent.RunnableFuture;
21   import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
22   import java.util.concurrent.atomic.AtomicInteger;
23   import java.util.concurrent.locks.LockSupport;
24   import java.util.concurrent.locks.ReentrantLock;
# Line 102 | Line 101 | import java.util.concurrent.locks.Condit
101   * daemon} mode, there is typically no need to explicitly {@link
102   * #shutdown} such a pool upon program exit.
103   *
104 < * <pre>
104 > *  <pre> {@code
105   * static final ForkJoinPool mainPool = new ForkJoinPool();
106   * ...
107   * public void sort(long[] array) {
108   *   mainPool.invoke(new SortTask(array, 0, array.length));
109 < * }
111 < * </pre>
109 > * }}</pre>
110   *
111   * <p><b>Implementation notes</b>: This implementation restricts the
112   * maximum number of running threads to 32767. Attempts to create
# Line 151 | Line 149 | public class ForkJoinPool extends Abstra
149       * Updates tend not to contend with each other except during
150       * bursts while submitted tasks begin or end.  In some cases when
151       * they do contend, threads can instead do something else
152 <     * (usually, scan for tesks) until contention subsides.
152 >     * (usually, scan for tasks) until contention subsides.
153       *
154       * To enable packing, we restrict maximum parallelism to (1<<15)-1
155       * (which is far in excess of normal operating range) to allow
# Line 195 | Line 193 | public class ForkJoinPool extends Abstra
193       * shutdown schemes.
194       *
195       * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
196 <     * let workers spin indefinitely scanning for tasks when none are
197 <     * can be immediately found, and we cannot start/resume workers
198 <     * unless there appear to be tasks available.  On the other hand,
199 <     * we must quickly prod them into action when new tasks are
200 <     * submitted or generated.  We park/unpark workers after placing
201 <     * in an event wait queue when they cannot find work. This "queue"
202 <     * is actually a simple Treiber stack, headed by the "id" field of
203 <     * ctl, plus a 15bit counter value to both wake up waiters (by
204 <     * advancing their count) and avoid ABA effects. Successors are
205 <     * held in worker field "nextWait".  Queuing deals with several
206 <     * intrinsic races, mainly that a task-producing thread can miss
207 <     * seeing (and signalling) another thread that gave up looking for
208 <     * work but has not yet entered the wait queue. We solve this by
209 <     * requiring a full sweep of all workers both before (in scan())
210 <     * and after (in awaitWork()) a newly waiting worker is added to
211 <     * the wait queue. During a rescan, the worker might release some
212 <     * other queued worker rather than itself, which has the same net
213 <     * effect.
196 >     * let workers spin indefinitely scanning for tasks when none can
197 >     * be found immediately, and we cannot start/resume workers unless
198 >     * there appear to be tasks available.  On the other hand, we must
199 >     * quickly prod them into action when new tasks are submitted or
200 >     * generated.  We park/unpark workers after placing in an event
201 >     * wait queue when they cannot find work. This "queue" is actually
202 >     * a simple Treiber stack, headed by the "id" field of ctl, plus a
203 >     * 15bit counter value to both wake up waiters (by advancing their
204 >     * count) and avoid ABA effects. Successors are held in worker
205 >     * field "nextWait".  Queuing deals with several intrinsic races,
206 >     * mainly that a task-producing thread can miss seeing (and
207 >     * signalling) another thread that gave up looking for work but
208 >     * has not yet entered the wait queue. We solve this by requiring
209 >     * a full sweep of all workers both before (in scan()) and after
210 >     * (in tryAwaitWork()) a newly waiting worker is added to the wait
211 >     * queue. During a rescan, the worker might release some other
212 >     * queued worker rather than itself, which has the same net
213 >     * effect. Because enqueued workers may actually be rescanning
214 >     * rather than waiting, we set and clear the "parked" field of
215 >     * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
216 >     * (Use of the parked field requires a secondary recheck to avoid
217 >     * missed signals.)
218       *
219       * Signalling.  We create or wake up workers only when there
220       * appears to be at least one task they might be able to find and
# Line 229 | Line 231 | public class ForkJoinPool extends Abstra
231       * Trimming workers. To release resources after periods of lack of
232       * use, a worker starting to wait when the pool is quiescent will
233       * time out and terminate if the pool has remained quiescent for
234 <     * SHRINK_RATE nanosecs.
234 >     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
235 >     * terminating all workers after long periods of non-use.
236       *
237       * Submissions. External submissions are maintained in an
238       * array-based queue that is structured identically to
239 <     * ForkJoinWorkerThread queues (which see) except for the use of
240 <     * submissionLock in method addSubmission. Unlike worker queues,
241 <     * multiple external threads can add new submissions.
239 >     * ForkJoinWorkerThread queues except for the use of
240 >     * submissionLock in method addSubmission. Unlike the case for
241 >     * worker queues, multiple external threads can add new
242 >     * submissions, so adding requires a lock.
243       *
244       * Compensation. Beyond work-stealing support and lifecycle
245       * control, the main responsibility of this framework is to take
# Line 272 | Line 276 | public class ForkJoinPool extends Abstra
276       * if blocking would leave less than one active (non-waiting,
277       * non-blocked) worker. Additionally, to avoid some false alarms
278       * due to GC, lagging counters, system activity, etc, compensated
279 <     * blocking for joins is only attempted after a number of rechecks
280 <     * proportional to the current apparent deficit (where retries are
281 <     * interspersed with Thread.yield, for good citizenship).  The
282 <     * variable blockedCount, incremented before blocking and
283 <     * decremented after, is sometimes needed to distinguish cases of
284 <     * waiting for work vs blocking on joins or other managed sync,
285 <     * but both the cases are equivalent for most pool control, so we
286 <     * can update non-atomically. (Additionally, contention on
283 <     * blockedCount alleviates some contention on ctl).
279 >     * blocking for joins is only attempted after rechecks stabilize
280 >     * (retries are interspersed with Thread.yield, for good
281 >     * citizenship).  The variable blockedCount, incremented before
282 >     * blocking and decremented after, is sometimes needed to
283 >     * distinguish cases of waiting for work vs blocking on joins or
284 >     * other managed sync. Both cases are equivalent for most pool
285 >     * control, so we can update non-atomically. (Additionally,
286 >     * contention on blockedCount alleviates some contention on ctl).
287       *
288       * Shutdown and Termination. A call to shutdownNow atomically sets
289       * the ctl stop bit and then (non-atomically) sets each workers
290       * "terminate" status, cancels all unprocessed tasks, and wakes up
291       * all waiting workers.  Detecting whether termination should
292       * commence after a non-abrupt shutdown() call requires more work
293 <     * and bookkeeping. We need consensus about quiesence (i.e., that
293 >     * and bookkeeping. We need consensus about quiescence (i.e., that
294       * there is no more work) which is reflected in active counts so
295       * long as there are no current blockers, as well as possible
296       * re-evaluations during independent changes in blocking or
# Line 462 | Line 465 | public class ForkJoinPool extends Abstra
465      /**
466       * Main pool control -- a long packed with:
467       * AC: Number of active running workers minus target parallelism (16 bits)
468 <     * TC: Number of total workers minus target parallelism (16bits)
468 >     * TC: Number of total workers minus target parallelism (16 bits)
469       * ST: true if pool is terminating (1 bit)
470       * EC: the wait count of top waiting thread (15 bits)
471       * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
# Line 478 | Line 481 | public class ForkJoinPool extends Abstra
481       * negative, there is at least one waiting worker, and when e is
482       * negative, the pool is terminating.  To deal with these possibly
483       * negative fields, we use casts in and out of "short" and/or
484 <     * signed shifts to maintain signedness.  Note: AC_SHIFT is
482 <     * redundantly declared in ForkJoinWorkerThread in order to
483 <     * integrate a surplus-threads check.
484 >     * signed shifts to maintain signedness.
485       */
486      volatile long ctl;
487  
# Line 524 | Line 525 | public class ForkJoinPool extends Abstra
525  
526      /**
527       * Index (mod submission queue length) of next element to take
528 <     * from submission queue.
528 >     * from submission queue. Usage is identical to that for
529 >     * per-worker queues -- see ForkJoinWorkerThread internal
530 >     * documentation.
531       */
532      volatile int queueBase;
533  
534      /**
535       * Index (mod submission queue length) of next element to add
536 <     * in submission queue.
536 >     * in submission queue. Usage is identical to that for
537 >     * per-worker queues -- see ForkJoinWorkerThread internal
538 >     * documentation.
539       */
540      int queueTop;
541  
# Line 540 | Line 545 | public class ForkJoinPool extends Abstra
545      volatile boolean shutdown;
546  
547      /**
548 <     * True if use local fifo, not default lifo, for local polling
549 <     * Read by, and replicated by ForkJoinWorkerThreads
548 >     * True if use local fifo, not default lifo, for local polling.
549 >     * Read by, and replicated by ForkJoinWorkerThreads.
550       */
551      final boolean locallyFifo;
552  
# Line 568 | Line 573 | public class ForkJoinPool extends Abstra
573      private int nextWorkerIndex;
574  
575      /**
576 <     * SeqLock and index masking for for updates to workers array.
577 <     * Locked when SG_UNIT is set. Unlocking clears bit by adding
576 >     * SeqLock and index masking for updates to workers array.  Locked
577 >     * when SG_UNIT is set. Unlocking clears bit by adding
578       * SG_UNIT. Staleness of read-only operations can be checked by
579       * comparing scanGuard to value before the reads. The low 16 bits
580       * (i.e, anding with SMASK) hold (the smallest power of two
# Line 707 | Line 712 | public class ForkJoinPool extends Abstra
712       */
713      private boolean scan(ForkJoinWorkerThread w, int a) {
714          int g = scanGuard; // mask 0 avoids useless scans if only one active
715 <        int m = parallelism == 1 - a? 0 : g & SMASK;
715 >        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
716          ForkJoinWorkerThread[] ws = workers;
717          if (ws == null || ws.length <= m)         // staleness check
718              return false;
# Line 754 | Line 759 | public class ForkJoinPool extends Abstra
759      }
760  
761      /**
762 <     * Tries to enqueue worker in wait queue and await change in
763 <     * worker's eventCount.  Before blocking, rescans queues to avoid
764 <     * missed signals.  If the pool is quiescent, possibly terminates
765 <     * worker upon exit.
762 >     * Tries to enqueue worker w in wait queue and await change in
763 >     * worker's eventCount.  If the pool is quiescent and there is
764 >     * more than one worker, possibly terminates worker upon exit.
765 >     * Otherwise, before blocking, rescans queues to avoid missed
766 >     * signals.  Upon finding work, releases at least one worker
767 >     * (which may be the current worker). Rescans restart upon
768 >     * detected staleness or failure to release due to
769 >     * contention. Note the unusual conventions about Thread.interrupt
770 >     * here and elsewhere: Because interrupts are used solely to alert
771 >     * threads to check termination, which is checked here anyway, we
772 >     * clear status (using Thread.interrupted) before any call to
773 >     * park, so that park does not immediately return due to status
774 >     * being set via some other unrelated call to interrupt in user
775 >     * code.
776       *
777       * @param w the calling worker
778       * @param c the ctl value on entry
# Line 765 | Line 780 | public class ForkJoinPool extends Abstra
780       */
781      private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
782          int v = w.eventCount;
783 <        w.nextWait = (int)c;                       // w's successor record
783 >        w.nextWait = (int)c;                      // w's successor record
784          long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
785          if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
786 <            long d = ctl; // return true if lost to a deq, to force rescan
786 >            long d = ctl; // return true if lost to a deq, to force scan
787              return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
788          }
789 <        boolean rescanned = false;
790 <        for (int sc;;) {
789 >        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
790 >            long s = stealCount;
791 >            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
792 >                sc = w.stealCount = 0;
793 >            else if (w.eventCount != v)
794 >                return true;                      // update next time
795 >        }
796 >        if ((!shutdown || !tryTerminate(false)) &&
797 >            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
798 >            blockedCount == 0 && quiescerCount == 0)
799 >            idleAwaitWork(w, nc, c, v);           // quiescent
800 >        for (boolean rescanned = false;;) {
801              if (w.eventCount != v)
802                  return true;
803 <            if ((sc = w.stealCount) != 0) {
779 <                long s = stealCount;               // accumulate stealCount
780 <                if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc))
781 <                    w.stealCount = 0;
782 <            }
783 <            else if (!rescanned) {
803 >            if (!rescanned) {
804                  int g = scanGuard, m = g & SMASK;
805                  ForkJoinWorkerThread[] ws = workers;
806                  if (ws != null && m < ws.length) {
# Line 804 | Line 824 | public class ForkJoinPool extends Abstra
824                  else
825                      Thread.interrupted();          // clear before park
826              }
807            else if (parallelism + (int)(ctl >> AC_SHIFT) == 0 &&
808                     blockedCount == 0 && quiescerCount == 0)
809                idleAwaitWork(w, v);               // quiescent -- maybe shrink
827              else {
828                  w.parked = true;                   // must recheck
829                  if (w.eventCount != v) {
# Line 820 | Line 837 | public class ForkJoinPool extends Abstra
837      }
838  
839      /**
840 <     * If pool is quiescent, checks for termination, and waits for
841 <     * event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl
842 <     * has not changed, terminates the worker. Upon its termination
843 <     * (see deregisterWorker), it may wake up another worker to
844 <     * possibly repeat this process.
840 >     * If inactivating worker w has caused pool to become
841 >     * quiescent, check for pool termination, and wait for event
842 >     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
843 >     * this case because quiescence reflects consensus about lack
844 >     * of work). On timeout, if ctl has not changed, terminate the
845 >     * worker. Upon its termination (see deregisterWorker), it may
846 >     * wake up another worker to possibly repeat this process.
847       *
848       * @param w the calling worker
849 <     * @param v the eventCount w must wait until changed
850 <     */
851 <    private void idleAwaitWork(ForkJoinWorkerThread w, int v) {
852 <        ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
853 <        if (shutdown)
854 <            tryTerminate(false);
855 <        long c = ctl;
856 <        long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) |
857 <                   (long)(w.nextWait & E_MASK)); // ctl value to release w
858 <        if (w.eventCount == v &&
859 <            parallelism + (int)(c >> AC_SHIFT) == 0 &&
860 <            blockedCount == 0 && quiescerCount == 0) {
842 <            long startTime = System.nanoTime();
843 <            Thread.interrupted();
844 <            if (w.eventCount == v) {
849 >     * @param currentCtl the ctl value after enqueuing w
850 >     * @param prevCtl the ctl value if w terminated
851 >     * @param v the eventCount w awaits change
852 >     */
853 >    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
854 >                               long prevCtl, int v) {
855 >        if (w.eventCount == v) {
856 >            if (shutdown)
857 >                tryTerminate(false);
858 >            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
859 >            while (ctl == currentCtl) {
860 >                long startTime = System.nanoTime();
861                  w.parked = true;
862 <                if (w.eventCount == v)
862 >                if (w.eventCount == v)             // must recheck
863                      LockSupport.parkNanos(this, SHRINK_RATE);
864                  w.parked = false;
865 <                if (w.eventCount == v && ctl == c &&
866 <                    System.nanoTime() - startTime >= SHRINK_RATE &&
867 <                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
868 <                    w.terminate = true;
869 <                    w.eventCount = ((int)c + EC_UNIT) & E_MASK;
865 >                if (w.eventCount != v)
866 >                    break;
867 >                else if (System.nanoTime() - startTime <
868 >                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
869 >                    Thread.interrupted();          // spurious wakeup
870 >                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
871 >                                                   currentCtl, prevCtl)) {
872 >                    w.terminate = true;            // restore previous
873 >                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
874 >                    break;
875                  }
876              }
877          }
# Line 886 | Line 907 | public class ForkJoinPool extends Abstra
907  
908      /**
909       * Creates or doubles submissionQueue array.
910 <     * Basically identical to ForkJoinWorkerThread version
910 >     * Basically identical to ForkJoinWorkerThread version.
911       */
912      private void growSubmissionQueue() {
913          ForkJoinTask<?>[] oldQ = submissionQueue;
# Line 926 | Line 947 | public class ForkJoinPool extends Abstra
947              int pc = parallelism;
948              do {
949                  ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
950 <                int e, ac, tc, rc, i;
950 >                int e, ac, tc, i;
951                  long c = ctl;
952                  int u = (int)(c >>> 32);
953                  if ((e = (int)c) < 0) {
# Line 966 | Line 987 | public class ForkJoinPool extends Abstra
987      }
988  
989      /**
990 <     * Decrements blockedCount and increments active count
990 >     * Decrements blockedCount and increments active count.
991       */
992      private void postBlock() {
993          long c;
994          do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
995                                                  c = ctl, c + AC_UNIT));
996          int b;
997 <        do {} while(!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
998 <                                              b = blockedCount, b - 1));
997 >        do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
998 >                                               b = blockedCount, b - 1));
999      }
1000  
1001      /**
# Line 984 | Line 1005 | public class ForkJoinPool extends Abstra
1005       * @param joinMe the task
1006       */
1007      final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
987        int s;
1008          Thread.interrupted(); // clear interrupts before checking termination
1009          if (joinMe.status >= 0) {
1010              if (tryPreBlock()) {
1011                  joinMe.tryAwaitDone(0L);
1012                  postBlock();
1013              }
1014 <            if ((ctl & STOP_BIT) != 0L)
1014 >            else if ((ctl & STOP_BIT) != 0L)
1015                  joinMe.cancelIgnoringExceptions();
1016          }
1017      }
1018  
1019      /**
1020       * Possibly blocks the given worker waiting for joinMe to
1021 <     * complete or timeout
1021 >     * complete or timeout.
1022       *
1023       * @param joinMe the task
1024       * @param millis the wait time for underlying Object.wait
# Line 1034 | Line 1054 | public class ForkJoinPool extends Abstra
1054      }
1055  
1056      /**
1057 <     * If necessary, compensates for blocker, and blocks
1057 >     * If necessary, compensates for blocker, and blocks.
1058       */
1059      private void awaitBlocker(ManagedBlocker blocker)
1060          throws InterruptedException {
# Line 1126 | Line 1146 | public class ForkJoinPool extends Abstra
1146                          ws[k] = w;
1147                          nextWorkerIndex = k + 1;
1148                          int m = g & SMASK;
1149 <                        g = k >= m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1149 >                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1150                      }
1151                  } finally {
1152                      scanGuard = g;
# Line 1206 | Line 1226 | public class ForkJoinPool extends Abstra
1226                  if ((int)(c >> AC_SHIFT) != -parallelism)
1227                      return false;
1228                  if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
1229 <                    queueTop - queueBase > 0) {
1229 >                    queueBase != queueTop) {
1230                      if (ctl == c) // staleness check
1231                          return false;
1232                      continue;
# Line 1215 | Line 1235 | public class ForkJoinPool extends Abstra
1235              if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
1236                  startTerminating();
1237          }
1238 <        if ((short)(c >>> TC_SHIFT) == -parallelism) {
1239 <            submissionLock.lock();
1240 <            termination.signalAll();
1241 <            submissionLock.unlock();
1238 >        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
1239 >            final ReentrantLock lock = this.submissionLock;
1240 >            lock.lock();
1241 >            try {
1242 >                termination.signalAll();
1243 >            } finally {
1244 >                lock.unlock();
1245 >            }
1246          }
1247          return true;
1248      }
1249  
1250      /**
1251       * Runs up to three passes through workers: (0) Setting
1252 <     * termination status for each worker, followed by wakeups up
1253 <     * queued workers (1) helping cancel tasks (2) interrupting
1252 >     * termination status for each worker, followed by wakeups up to
1253 >     * queued workers; (1) helping cancel tasks; (2) interrupting
1254       * lagging threads (likely in external tasks, but possibly also
1255       * blocked in joins).  Each pass repeats previous steps because of
1256       * potential lagging thread creation.
# Line 1272 | Line 1296 | public class ForkJoinPool extends Abstra
1296  
1297      /**
1298       * Tries to set the termination status of waiting workers, and
1299 <     * then wake them up (after which they will terminate).
1299 >     * then wakes them up (after which they will terminate).
1300       */
1301      private void terminateWaiters() {
1302          ForkJoinWorkerThread[] ws = workers;
# Line 1297 | Line 1321 | public class ForkJoinPool extends Abstra
1321      // misc ForkJoinWorkerThread support
1322  
1323      /**
1324 <     * Increment or decrement quiescerCount. Needed only to prevent
1324 >     * Increments or decrements quiescerCount. Needed only to prevent
1325       * triggering shutdown if a worker is transiently inactive while
1326       * checking quiescence.
1327       *
# Line 1305 | Line 1329 | public class ForkJoinPool extends Abstra
1329       */
1330      final void addQuiescerCount(int delta) {
1331          int c;
1332 <        do {} while(!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1333 <                                              c = quiescerCount, c + delta));
1332 >        do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1333 >                                               c = quiescerCount, c + delta));
1334      }
1335  
1336      /**
1337 <     * Directly increment or decrement active count without
1338 <     * queuing. This method is used to transiently assert inactivation
1339 <     * while checking quiescence.
1337 >     * Directly increments or decrements active count without queuing.
1338 >     * This method is used to transiently assert inactivation while
1339 >     * checking quiescence.
1340       *
1341       * @param delta 1 for increment, -1 for decrement
1342       */
# Line 1331 | Line 1355 | public class ForkJoinPool extends Abstra
1355      final int idlePerActive() {
1356          // Approximate at powers of two for small values, saturate past 4
1357          int p = parallelism;
1358 <        int a = p + (int)(ctl >> AC_SHIFT);
1359 <        return (a > (p >>>= 1) ? 0 :
1360 <                a > (p >>>= 1) ? 1 :
1361 <                a > (p >>>= 1) ? 2 :
1362 <                a > (p >>>= 1) ? 4 :
1363 <                8);
1358 >        int a = p + (int)(ctl >> AC_SHIFT);
1359 >        return (a > (p >>>= 1) ? 0 :
1360 >                a > (p >>>= 1) ? 1 :
1361 >                a > (p >>>= 1) ? 2 :
1362 >                a > (p >>>= 1) ? 4 :
1363 >                8);
1364      }
1365  
1366      // Exported methods
# Line 1659 | Line 1683 | public class ForkJoinPool extends Abstra
1683       */
1684      public int getRunningThreadCount() {
1685          int r = parallelism + (int)(ctl >> AC_SHIFT);
1686 <        return r <= 0? 0 : r; // suppress momentarily negative values
1686 >        return (r <= 0) ? 0 : r; // suppress momentarily negative values
1687      }
1688  
1689      /**
# Line 1671 | Line 1695 | public class ForkJoinPool extends Abstra
1695       */
1696      public int getActiveThreadCount() {
1697          int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
1698 <        return r <= 0? 0 : r; // suppress momentarily negative values
1698 >        return (r <= 0) ? 0 : r; // suppress momentarily negative values
1699      }
1700  
1701      /**
# Line 1728 | Line 1752 | public class ForkJoinPool extends Abstra
1752  
1753      /**
1754       * Returns an estimate of the number of tasks submitted to this
1755 <     * pool that have not yet begun executing.  This meThod may take
1755 >     * pool that have not yet begun executing.  This method may take
1756       * time proportional to the number of submissions.
1757       *
1758       * @return the number of queued submissions
# Line 1826 | Line 1850 | public class ForkJoinPool extends Abstra
1850          int ac = rc + blockedCount;
1851          String level;
1852          if ((c & STOP_BIT) != 0)
1853 <            level = (tc == 0)? "Terminated" : "Terminating";
1853 >            level = (tc == 0) ? "Terminated" : "Terminating";
1854          else
1855 <            level = shutdown? "Shutting down" : "Running";
1855 >            level = shutdown ? "Shutting down" : "Running";
1856          return super.toString() +
1857              "[" + level +
1858              ", parallelism = " + pc +
# Line 1965 | Line 1989 | public class ForkJoinPool extends Abstra
1989       * {@code isReleasable} must return {@code true} if blocking is
1990       * not necessary. Method {@code block} blocks the current thread
1991       * if necessary (perhaps internally invoking {@code isReleasable}
1992 <     * before actually blocking). The unusual methods in this API
1993 <     * accommodate synchronizers that may, but don't usually, block
1994 <     * for long periods. Similarly, they allow more efficient internal
1995 <     * handling of cases in which additional workers may be, but
1996 <     * usually are not, needed to ensure sufficient parallelism.
1997 <     * Toward this end, implementations of method {@code isReleasable}
1998 <     * must be amenable to repeated invocation.
1992 >     * before actually blocking). These actions are performed by any
1993 >     * thread invoking {@link ForkJoinPool#managedBlock}.  The
1994 >     * unusual methods in this API accommodate synchronizers that may,
1995 >     * but don't usually, block for long periods. Similarly, they
1996 >     * allow more efficient internal handling of cases in which
1997 >     * additional workers may be, but usually are not, needed to
1998 >     * ensure sufficient parallelism.  Toward this end,
1999 >     * implementations of method {@code isReleasable} must be amenable
2000 >     * to repeated invocation.
2001       *
2002       * <p>For example, here is a ManagedBlocker based on a
2003       * ReentrantLock:
# Line 2089 | Line 2115 | public class ForkJoinPool extends Abstra
2115          modifyThreadPermission = new RuntimePermission("modifyThread");
2116          defaultForkJoinWorkerThreadFactory =
2117              new DefaultForkJoinWorkerThreadFactory();
2092        int s;
2118          try {
2119              UNSAFE = getUnsafe();
2120 <            Class k = ForkJoinPool.class;
2120 >            Class<?> k = ForkJoinPool.class;
2121              ctlOffset = UNSAFE.objectFieldOffset
2122                  (k.getDeclaredField("ctl"));
2123              stealCountOffset = UNSAFE.objectFieldOffset
# Line 2105 | Line 2130 | public class ForkJoinPool extends Abstra
2130                  (k.getDeclaredField("scanGuard"));
2131              nextWorkerNumberOffset = UNSAFE.objectFieldOffset
2132                  (k.getDeclaredField("nextWorkerNumber"));
2108            Class a = ForkJoinTask[].class;
2109            ABASE = UNSAFE.arrayBaseOffset(a);
2110            s = UNSAFE.arrayIndexScale(a);
2133          } catch (Exception e) {
2134              throw new Error(e);
2135          }
2136 +        Class<?> a = ForkJoinTask[].class;
2137 +        ABASE = UNSAFE.arrayBaseOffset(a);
2138 +        int s = UNSAFE.arrayIndexScale(a);
2139          if ((s & (s-1)) != 0)
2140              throw new Error("data type scale not a power of two");
2141          ASHIFT = 31 - Integer.numberOfLeadingZeros(s);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines