--- jsr166/src/jsr166y/ForkJoinPool.java 2009/01/12 17:16:18 1.4 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/07/20 23:07:43 1.10 @@ -27,7 +27,9 @@ import java.lang.reflect.*; * (eventually blocking if none exist). This makes them efficient when * most tasks spawn other subtasks (as do most ForkJoinTasks), as well * as the mixed execution of some plain Runnable- or Callable- based - * activities along with ForkJoinTasks. Otherwise, other + * activities along with ForkJoinTasks. When setting + * {@code setAsyncMode}, a ForkJoinPools may also be appropriate for + * use with fine-grained tasks that are never joined. Otherwise, other * ExecutorService implementations are typically more appropriate * choices. * @@ -36,18 +38,18 @@ import java.lang.reflect.*; * adding, suspending, or resuming threads, even if some tasks are * waiting to join others. However, no such adjustments are performed * in the face of blocked IO or other unmanaged synchronization. The - * nested ManagedBlocker interface enables extension of + * nested {@code ManagedBlocker} interface enables extension of * the kinds of synchronization accommodated. The target parallelism - * level may also be changed dynamically (setParallelism) - * and dynamically thread construction can be limited using methods - * setMaximumPoolSize and/or - * setMaintainsParallelism. + * level may also be changed dynamically ({@code setParallelism}) + * and thread construction can be limited using methods + * {@code setMaximumPoolSize} and/or + * {@code setMaintainsParallelism}. * *

In addition to execution and lifecycle control methods, this * class provides status check methods (for example - * getStealCount) that are intended to aid in developing, + * {@code getStealCount}) that are intended to aid in developing, * tuning, and monitoring fork/join applications. Also, method - * toString returns indications of pool state in a + * {@code toString} returns indications of pool state in a * convenient form for informal monitoring. * *

Implementation notes: This implementation restricts the @@ -131,11 +133,11 @@ public class ForkJoinPool extends Abstra new AtomicInteger(); /** - * Array holding all worker threads in the pool. Array size must - * be a power of two. Updates and replacements are protected by - * workerLock, but it is always kept in a consistent enough state - * to be randomly accessed without locking by workers performing - * work-stealing. + * Array holding all worker threads in the pool. Initialized upon + * first use. Array size must be a power of two. Updates and + * replacements are protected by workerLock, but it is always kept + * in a consistent enough state to be randomly accessed without + * locking by workers performing work-stealing. */ volatile ForkJoinWorkerThread[] workers; @@ -151,7 +153,7 @@ public class ForkJoinPool extends Abstra /** * The uncaught exception handler used when any worker - * abrupty terminates + * abruptly terminates */ private Thread.UncaughtExceptionHandler ueh; @@ -204,6 +206,11 @@ public class ForkJoinPool extends Abstra private volatile int parallelism; /** + * True if use local fifo, not default lifo, for local polling + */ + private volatile boolean locallyFifo; + + /** * Holds number of total (i.e., created and not yet terminated) * and running (i.e., not blocked on joins or other managed sync) * threads, packed into one int to ensure consistent snapshot when @@ -219,9 +226,9 @@ public class ForkJoinPool extends Abstra private static int workerCountsFor(int t, int r) { return (t << 16) + r; } /** - * Add delta (which may be negative) to running count. This must + * Adds delta (which may be negative) to running count. This must * be called before (with negative arg) and after (with positive) - * any managed synchronization (i.e., mainly, joins) + * any managed synchronization (i.e., mainly, joins). * @param delta the number to add */ final void updateRunningCount(int delta) { @@ -230,7 +237,7 @@ public class ForkJoinPool extends Abstra } /** - * Add delta (which may be negative) to both total and running + * Adds delta (which may be negative) to both total and running * count. This must be called upon creation and termination of * worker threads. * @param delta the number to add @@ -274,8 +281,8 @@ public class ForkJoinPool extends Abstra } /** - * Try decrementing active count; fail on contention. - * Possibly trigger termination on success + * Tries decrementing active count; fails on contention. + * Possibly triggers termination on success. * Called by workers when they can't find tasks. * @return true on success */ @@ -290,7 +297,7 @@ public class ForkJoinPool extends Abstra } /** - * Return true if argument represents zero active count and + * Returns true if argument represents zero active count and * nonzero runstate, which is the triggering condition for * terminating on shutdown. */ @@ -326,7 +333,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public ForkJoinPool() { this(Runtime.getRuntime().availableProcessors(), @@ -334,7 +341,7 @@ public class ForkJoinPool extends Abstra } /** - * Creates a ForkJoinPool with the indicated parellelism level + * Creates a ForkJoinPool with the indicated parallelism level * threads, and using the default ForkJoinWorkerThreadFactory, * @param parallelism the number of worker threads * @throws IllegalArgumentException if parallelism less than or @@ -342,7 +349,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory); @@ -357,7 +364,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public ForkJoinPool(ForkJoinWorkerThreadFactory factory) { this(Runtime.getRuntime().availableProcessors(), factory); @@ -374,7 +381,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) { if (parallelism <= 0 || parallelism > MAX_THREADS) @@ -391,7 +398,7 @@ public class ForkJoinPool extends Abstra this.termination = workerLock.newCondition(); this.stealCount = new AtomicLong(); this.submissionQueue = new LinkedTransferQueue>(); - createAndStartInitialWorkers(parallelism); + // worker array and workers are lazily constructed } /** @@ -405,6 +412,7 @@ public class ForkJoinPool extends Abstra if (w != null) { w.poolIndex = index; w.setDaemon(true); + w.setAsyncMode(locallyFifo); w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index); if (h != null) w.setUncaughtExceptionHandler(h); @@ -413,7 +421,7 @@ public class ForkJoinPool extends Abstra } /** - * Return a good size for worker array given pool size. + * Returns a good size for worker array given pool size. * Currently requires size to be a power of two. */ private static int arraySizeFor(int ps) { @@ -421,7 +429,8 @@ public class ForkJoinPool extends Abstra } /** - * Create or resize array if necessary to hold newLength + * Creates or resizes array if necessary to hold newLength. + * Call only under exclusion or lock. * @return the array */ private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) { @@ -439,35 +448,42 @@ public class ForkJoinPool extends Abstra */ private void tryShrinkWorkerArray() { ForkJoinWorkerThread[] ws = workers; - int len = ws.length; - int last = len - 1; - while (last >= 0 && ws[last] == null) - --last; - int newLength = arraySizeFor(last+1); - if (newLength < len) - workers = Arrays.copyOf(ws, newLength); + if (ws != null) { + int len = ws.length; + int last = len - 1; + while (last >= 0 && ws[last] == null) + --last; + int newLength = arraySizeFor(last+1); + if (newLength < len) + workers = Arrays.copyOf(ws, newLength); + } } /** - * Initial worker array and worker creation and startup. (This - * must be done under lock to avoid interference by some of the - * newly started threads while creating others.) + * Initialize workers if necessary */ - private void createAndStartInitialWorkers(int ps) { - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps); - for (int i = 0; i < ps; ++i) { - ForkJoinWorkerThread w = createWorker(i); - if (w != null) { - ws[i] = w; - w.start(); - updateWorkerCount(1); + final void ensureWorkerInitialization() { + ForkJoinWorkerThread[] ws = workers; + if (ws == null) { + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ws = workers; + if (ws == null) { + int ps = parallelism; + ws = ensureWorkerArrayCapacity(ps); + for (int i = 0; i < ps; ++i) { + ForkJoinWorkerThread w = createWorker(i); + if (w != null) { + ws[i] = w; + w.start(); + updateWorkerCount(1); + } + } } + } finally { + lock.unlock(); } - } finally { - lock.unlock(); } } @@ -513,6 +529,8 @@ public class ForkJoinPool extends Abstra private void doSubmit(ForkJoinTask task) { if (isShutdown()) throw new RejectedExecutionException(); + if (workers == null) + ensureWorkerInitialization(); submissionQueue.offer(task); signalIdleWorkers(); } @@ -671,7 +689,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public Thread.UncaughtExceptionHandler setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) { @@ -683,10 +701,12 @@ public class ForkJoinPool extends Abstra old = ueh; ueh = h; ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - w.setUncaughtExceptionHandler(h); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + w.setUncaughtExceptionHandler(h); + } } } finally { lock.unlock(); @@ -696,14 +716,14 @@ public class ForkJoinPool extends Abstra /** - * Sets the target paralleism level of this pool. + * Sets the target parallelism level of this pool. * @param parallelism the target parallelism * @throws IllegalArgumentException if parallelism less than or * equal to zero or greater than maximum size bounds. * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public void setParallelism(int parallelism) { checkPermission(); @@ -738,7 +758,7 @@ public class ForkJoinPool extends Abstra /** * Returns the number of worker threads that have started but not * yet terminated. This result returned by this method may differ - * from getParallelism when threads are created to + * from {@code getParallelism} when threads are created to * maintain parallelism when others are cooperatively blocked. * * @return the number of worker threads @@ -793,6 +813,42 @@ public class ForkJoinPool extends Abstra } /** + * Establishes local first-in-first-out scheduling mode for forked + * tasks that are never joined. This mode may be more appropriate + * than default locally stack-based mode in applications in which + * worker threads only process asynchronous tasks. This method is + * designed to be invoked only when pool is quiescent, and + * typically only before any tasks are submitted. The effects of + * invocations at other times may be unpredictable. + * + * @param async if true, use locally FIFO scheduling + * @return the previous mode. + */ + public boolean setAsyncMode(boolean async) { + boolean oldMode = locallyFifo; + locallyFifo = async; + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.setAsyncMode(async); + } + } + return oldMode; + } + + /** + * Returns true if this pool uses local first-in-first-out + * scheduling mode for forked tasks that are never joined. + * + * @return true if this pool uses async mode. + */ + public boolean getAsyncMode() { + return locallyFifo; + } + + /** * Returns an estimate of the number of worker threads that are * not blocked waiting to join tasks or for other managed * synchronization. @@ -874,10 +930,12 @@ public class ForkJoinPool extends Abstra public long getQueuedTaskCount() { long count = 0; ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - count += t.getQueueSize(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + count += t.getQueueSize(); + } } return count; } @@ -895,7 +953,7 @@ public class ForkJoinPool extends Abstra /** * Returns true if there are any tasks submitted to this pool * that have not yet begun executing. - * @return true if there are any queued submissions. + * @return {@code true} if there are any queued submissions. */ public boolean hasQueuedSubmissions() { return !submissionQueue.isEmpty(); @@ -912,6 +970,35 @@ public class ForkJoinPool extends Abstra } /** + * Removes all available unexecuted submitted and forked tasks + * from scheduling queues and adds them to the given collection, + * without altering their execution status. These may include + * artificially generated or wrapped tasks. This method is designed + * to be invoked only when the pool is known to be + * quiescent. Invocations at other times may not remove all + * tasks. A failure encountered while attempting to add elements + * to collection {@code c} may result in elements being in + * neither, either or both collections when the associated + * exception is thrown. The behavior of this operation is + * undefined if the specified collection is modified while the + * operation is in progress. + * @param c the collection to transfer elements into + * @return the number of elements transferred + */ + protected int drainTasksTo(Collection> c) { + int n = submissionQueue.drainTo(c); + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + n += w.drainTasksTo(c); + } + } + return n; + } + + /** * Returns a string identifying this pool, as well as its state, * including indications of run state, parallelism level, and * worker and task counts. @@ -958,7 +1045,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public void shutdown() { checkPermission(); @@ -972,13 +1059,15 @@ public class ForkJoinPool extends Abstra * waiting tasks. Tasks that are in the process of being * submitted or executed concurrently during the course of this * method may or may not be rejected. Unlike some other executors, - * this method cancels rather than collects non-executed tasks, - * so always returns an empty list. + * this method cancels rather than collects non-executed tasks + * upon termination, so always returns an empty list. However, you + * can use method {@code drainTasksTo} before invoking this + * method to transfer unexecuted tasks to another collection. * @return an empty list * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public List shutdownNow() { checkPermission(); @@ -987,28 +1076,28 @@ public class ForkJoinPool extends Abstra } /** - * Returns true if all tasks have completed following shut down. + * Returns {@code true} if all tasks have completed following shut down. * - * @return true if all tasks have completed following shut down + * @return {@code true} if all tasks have completed following shut down */ public boolean isTerminated() { return runStateOf(runControl) == TERMINATED; } /** - * Returns true if the process of termination has + * Returns {@code true} if the process of termination has * commenced but possibly not yet completed. * - * @return true if terminating + * @return {@code true} if terminating */ public boolean isTerminating() { return runStateOf(runControl) >= TERMINATING; } /** - * Returns true if this pool has been shut down. + * Returns {@code true} if this pool has been shut down. * - * @return true if this pool has been shut down + * @return {@code true} if this pool has been shut down */ public boolean isShutdown() { return runStateOf(runControl) >= SHUTDOWN; @@ -1021,8 +1110,8 @@ public class ForkJoinPool extends Abstra * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument - * @return true if this executor terminated and - * false if the timeout elapsed before termination + * @return {@code true} if this executor terminated and + * {@code false} if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ public boolean awaitTermination(long timeout, TimeUnit unit) @@ -1058,17 +1147,19 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - int idx = w.poolIndex; - if (idx >= 0 && idx < ws.length && ws[idx] == w) - ws[idx] = null; - if (totalCountOf(workerCounts) == 0) { - terminate(); // no-op if already terminating - transitionRunStateTo(TERMINATED); - termination.signalAll(); - } - else if (!isTerminating()) { - tryShrinkWorkerArray(); - tryResumeSpare(true); // allow replacement + if (ws != null) { + int idx = w.poolIndex; + if (idx >= 0 && idx < ws.length && ws[idx] == w) + ws[idx] = null; + if (totalCountOf(workerCounts) == 0) { + terminate(); // no-op if already terminating + transitionRunStateTo(TERMINATED); + termination.signalAll(); + } + else if (!isTerminating()) { + tryShrinkWorkerArray(); + tryResumeSpare(true); // allow replacement + } } } finally { lock.unlock(); @@ -1092,7 +1183,7 @@ public class ForkJoinPool extends Abstra } /** - * Possibly terminate when on shutdown state + * Possibly terminates when on shutdown state. */ private void terminateOnShutdown() { if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl)) @@ -1100,7 +1191,7 @@ public class ForkJoinPool extends Abstra } /** - * Clear out and cancel submissions + * Clears out and cancels submissions. */ private void cancelQueuedSubmissions() { ForkJoinTask task; @@ -1109,17 +1200,19 @@ public class ForkJoinPool extends Abstra } /** - * Clean out worker queues. + * Cleans out worker queues. */ private void cancelQueuedWorkerTasks() { final ReentrantLock lock = this.workerLock; lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.cancelTasks(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.cancelTasks(); + } } } finally { lock.unlock(); @@ -1127,18 +1220,20 @@ public class ForkJoinPool extends Abstra } /** - * Set each worker's status to terminating. Requires lock to avoid - * conflicts with add/remove + * Sets each worker's status to terminating. Requires lock to avoid + * conflicts with add/remove. */ private void stopAllWorkers() { final ReentrantLock lock = this.workerLock; lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.shutdownNow(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.shutdownNow(); + } } } finally { lock.unlock(); @@ -1146,7 +1241,7 @@ public class ForkJoinPool extends Abstra } /** - * Interrupt all unterminated workers. This is not required for + * Interrupts all unterminated workers. This is not required for * sake of internal control, but may help unstick user code during * shutdown. */ @@ -1155,12 +1250,14 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null && !t.isTerminated()) { - try { - t.interrupt(); - } catch (SecurityException ignore) { + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null && !t.isTerminated()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } } } } @@ -1214,7 +1311,7 @@ public class ForkJoinPool extends Abstra } /** - * Wake up waiter, returning false if known to already + * Wakes up waiter, returning false if known to already */ boolean signal() { ForkJoinWorkerThread t = thread; @@ -1226,7 +1323,7 @@ public class ForkJoinPool extends Abstra } /** - * Await release on sync + * Awaits release on sync. */ void awaitSyncRelease(ForkJoinPool p) { while (thread != null && !p.syncIsReleasable(this)) @@ -1234,7 +1331,7 @@ public class ForkJoinPool extends Abstra } /** - * Await resumption as spare + * Awaits resumption as spare. */ void awaitSpareRelease() { while (thread != null) { @@ -1274,7 +1371,7 @@ public class ForkJoinPool extends Abstra } /** - * Signal threads waiting to poll a task. Because method sync + * Signals threads waiting to poll a task. Because method sync * rechecks availability, it is OK to only proceed if queue * appears to be non-empty, and OK to skip under contention to * increment count (since some other thread succeeded). @@ -1302,7 +1399,7 @@ public class ForkJoinPool extends Abstra long prev = w.lastEventCount; WaitQueueNode node = null; WaitQueueNode h; - while (eventCount == prev && + while (eventCount == prev && ((h = syncStack) == null || h.count == prev)) { if (node == null) node = new WaitQueueNode(prev, w); @@ -1324,8 +1421,8 @@ public class ForkJoinPool extends Abstra * - on signal (thread == null) * - on event count advance (winning race to notify vs signaller) * - on Interrupt - * - if the first queued node, we find work available - * If node was not signalled and event count not advanced on exit, + * - if the first queued node, we find work available + * If node was not signalled and event count not advanced on exit, * then we also help advance event count. * @return true if node can be released */ @@ -1361,7 +1458,7 @@ public class ForkJoinPool extends Abstra // Parallelism maintenance /** - * Decrement running count; if too low, add spare. + * Decrements running count; if too low, adds spare. * * Conceptually, all we need to do here is add or resume a * spare thread when one is about to block (and remove or @@ -1381,7 +1478,7 @@ public class ForkJoinPool extends Abstra * only be suspended or removed when they are idle, not * immediately when they aren't needed. So adding threads will * raise parallelism level for longer than necessary. Also, - * FJ applications often enounter highly transient peaks when + * FJ applications often encounter highly transient peaks when * many threads are blocked joining, but for less time than it * takes to create or resume spares. * @@ -1446,13 +1543,13 @@ public class ForkJoinPool extends Abstra return (tc < maxPoolSize && (rc == 0 || totalSurplus < 0 || (maintainParallelism && - runningDeficit > totalSurplus && + runningDeficit > totalSurplus && ForkJoinWorkerThread.hasQueuedTasks(workers)))); } - + /** - * Add a spare worker if lock available and no more than the - * expected numbers of threads exist + * Adds a spare worker if lock available and no more than the + * expected numbers of threads exist. * @return true if successful */ private boolean tryAddSpare(int expectedCounts) { @@ -1485,7 +1582,7 @@ public class ForkJoinPool extends Abstra } /** - * Add the kth spare worker. On entry, pool coounts are already + * Adds the kth spare worker. On entry, pool counts are already * adjusted to reflect addition. */ private void createAndStartSpare(int k) { @@ -1507,11 +1604,11 @@ public class ForkJoinPool extends Abstra } /** - * Suspend calling thread w if there are excess threads. Called - * only from sync. Spares are enqueued in a Treiber stack - * using the same WaitQueueNodes as barriers. They are resumed - * mainly in preJoin, but are also woken on pool events that - * require all threads to check run state. + * Suspends calling thread w if there are excess threads. Called + * only from sync. Spares are enqueued in a Treiber stack using + * the same WaitQueueNodes as barriers. They are resumed mainly + * in preJoin, but are also woken on pool events that require all + * threads to check run state. * @param w the caller */ private boolean suspendIfSpare(ForkJoinWorkerThread w) { @@ -1532,7 +1629,7 @@ public class ForkJoinPool extends Abstra } /** - * Try to pop and resume a spare thread. + * Tries to pop and resume a spare thread. * @param updateCount if true, increment running count on success * @return true if successful */ @@ -1550,7 +1647,7 @@ public class ForkJoinPool extends Abstra } /** - * Pop and resume all spare threads. Same idea as ensureSync. + * Pops and resumes all spare threads. Same idea as ensureSync. * @return true if any spares released */ private boolean resumeAllSpares() { @@ -1568,7 +1665,7 @@ public class ForkJoinPool extends Abstra } /** - * Pop and shutdown excessive spare threads. Call only while + * Pops and shuts down excessive spare threads. Call only while * holding lock. This is not guaranteed to eliminate all excess * threads, only those suspended as spares, which are the ones * unlikely to be needed in the future. @@ -1593,8 +1690,8 @@ public class ForkJoinPool extends Abstra /** * Interface for extending managed parallelism for tasks running * in ForkJoinPools. A ManagedBlocker provides two methods. - * Method isReleasable must return true if blocking is not - * necessary. Method block blocks the current thread + * Method {@code isReleasable} must return true if blocking is not + * necessary. Method {@code block} blocks the current thread * if necessary (perhaps internally invoking isReleasable before * actually blocking.). *

For example, here is a ManagedBlocker based on a @@ -1622,7 +1719,7 @@ public class ForkJoinPool extends Abstra * @return true if no additional blocking is necessary (i.e., * if isReleasable would return true). * @throws InterruptedException if interrupted while waiting - * (the method is not required to do so, but is allowe to). + * (the method is not required to do so, but is allowed to). */ boolean block() throws InterruptedException; @@ -1637,7 +1734,7 @@ public class ForkJoinPool extends Abstra * is a ForkJoinWorkerThread, this method possibly arranges for a * spare thread to be activated if necessary to ensure parallelism * while the current thread is blocked. If - * maintainParallelism is true and the pool supports + * {@code maintainParallelism} is true and the pool supports * it ({@link #getMaintainsParallelism}), this method attempts to * maintain the pool's nominal parallelism. Otherwise if activates * a thread only if necessary to avoid complete starvation. This @@ -1696,6 +1793,34 @@ public class ForkJoinPool extends Abstra // Temporary Unsafe mechanics for preliminary release + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return _unsafe.objectFieldOffset + (ForkJoinPool.class.getDeclaredField(fieldName)); + } static final Unsafe _unsafe; static final long eventCountOffset; @@ -1706,24 +1831,13 @@ public class ForkJoinPool extends Abstra static { try { - if (ForkJoinPool.class.getClassLoader() != null) { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - _unsafe = (Unsafe)f.get(null); - } - else - _unsafe = Unsafe.getUnsafe(); - eventCountOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("eventCount")); - workerCountsOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("workerCounts")); - runControlOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("runControl")); - syncStackOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("syncStack")); - spareStackOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("spareStack")); - } catch (Exception e) { + _unsafe = getUnsafe(); + eventCountOffset = fieldOffset("eventCount"); + workerCountsOffset = fieldOffset("workerCounts"); + runControlOffset = fieldOffset("runControl"); + syncStackOffset = fieldOffset("syncStack"); + spareStackOffset = fieldOffset("spareStack"); + } catch (Throwable e) { throw new RuntimeException("Could not initialize intrinsics", e); } }