--- jsr166/src/jsr166y/ForkJoinPool.java 2009/01/07 19:12:36 1.3 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/07/22 20:55:22 1.15 @@ -27,7 +27,9 @@ import java.lang.reflect.*; * (eventually blocking if none exist). This makes them efficient when * most tasks spawn other subtasks (as do most ForkJoinTasks), as well * as the mixed execution of some plain Runnable- or Callable- based - * activities along with ForkJoinTasks. Otherwise, other + * activities along with ForkJoinTasks. When setting + * setAsyncMode, a ForkJoinPools may also be appropriate for + * use with fine-grained tasks that are never joined. Otherwise, other * ExecutorService implementations are typically more appropriate * choices. * @@ -39,7 +41,7 @@ import java.lang.reflect.*; * nested ManagedBlocker interface enables extension of * the kinds of synchronization accommodated. The target parallelism * level may also be changed dynamically (setParallelism) - * and dynamically thread construction can be limited using methods + * and thread construction can be limited using methods * setMaximumPoolSize and/or * setMaintainsParallelism. * @@ -131,11 +133,11 @@ public class ForkJoinPool extends Abstra new AtomicInteger(); /** - * Array holding all worker threads in the pool. Array size must - * be a power of two. Updates and replacements are protected by - * workerLock, but it is always kept in a consistent enough state - * to be randomly accessed without locking by workers performing - * work-stealing. + * Array holding all worker threads in the pool. Initialized upon + * first use. Array size must be a power of two. Updates and + * replacements are protected by workerLock, but it is always kept + * in a consistent enough state to be randomly accessed without + * locking by workers performing work-stealing. */ volatile ForkJoinWorkerThread[] workers; @@ -181,7 +183,7 @@ public class ForkJoinPool extends Abstra /** * Head of Treiber stack for barrier sync. See below for explanation */ - private volatile WaitQueueNode barrierStack; + private volatile WaitQueueNode syncStack; /** * The count for event barrier @@ -204,13 +206,18 @@ public class ForkJoinPool extends Abstra private volatile int parallelism; /** + * True if use local fifo, not default lifo, for local polling + */ + private volatile boolean locallyFifo; + + /** * Holds number of total (i.e., created and not yet terminated) * and running (i.e., not blocked on joins or other managed sync) * threads, packed into one int to ensure consistent snapshot when * making decisions about creating and suspending spare * threads. Updated only by CAS. Note: CASes in - * updateRunningCount and preJoin running active count is in low - * word, so need to be modified if this changes + * updateRunningCount and preJoin assume that running active count + * is in low word, so need to be modified if this changes */ private volatile int workerCounts; @@ -264,23 +271,29 @@ public class ForkJoinPool extends Abstra private static int runControlFor(int r, int a) { return (r << 16) + a; } /** - * Increment active count. Called by workers before/during - * executing tasks. + * Try incrementing active count; fail on contention. Called by + * workers before/during executing tasks. + * @return true on success; */ - final void incrementActiveCount() { - int c; - do;while (!casRunControl(c = runControl, c+1)); + final boolean tryIncrementActiveCount() { + int c = runControl; + return casRunControl(c, c+1); } /** - * Decrement active count; possibly trigger termination. + * Try decrementing active count; fail on contention. + * Possibly trigger termination on success * Called by workers when they can't find tasks. + * @return true on success */ - final void decrementActiveCount() { - int c, nextc; - do;while (!casRunControl(c = runControl, nextc = c-1)); + final boolean tryDecrementActiveCount() { + int c = runControl; + int nextc = c - 1; + if (!casRunControl(c, nextc)) + return false; if (canTerminateOnShutdown(nextc)) terminateOnShutdown(); + return true; } /** @@ -385,7 +398,7 @@ public class ForkJoinPool extends Abstra this.termination = workerLock.newCondition(); this.stealCount = new AtomicLong(); this.submissionQueue = new LinkedTransferQueue>(); - createAndStartInitialWorkers(parallelism); + // worker array and workers are lazily constructed } /** @@ -399,6 +412,7 @@ public class ForkJoinPool extends Abstra if (w != null) { w.poolIndex = index; w.setDaemon(true); + w.setAsyncMode(locallyFifo); w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index); if (h != null) w.setUncaughtExceptionHandler(h); @@ -415,7 +429,8 @@ public class ForkJoinPool extends Abstra } /** - * Create or resize array if necessary to hold newLength + * Create or resize array if necessary to hold newLength. + * Call only under exclusion * @return the array */ private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) { @@ -433,35 +448,42 @@ public class ForkJoinPool extends Abstra */ private void tryShrinkWorkerArray() { ForkJoinWorkerThread[] ws = workers; - int len = ws.length; - int last = len - 1; - while (last >= 0 && ws[last] == null) - --last; - int newLength = arraySizeFor(last+1); - if (newLength < len) - workers = Arrays.copyOf(ws, newLength); + if (ws != null) { + int len = ws.length; + int last = len - 1; + while (last >= 0 && ws[last] == null) + --last; + int newLength = arraySizeFor(last+1); + if (newLength < len) + workers = Arrays.copyOf(ws, newLength); + } } /** - * Initial worker array and worker creation and startup. (This - * must be done under lock to avoid interference by some of the - * newly started threads while creating others.) + * Initialize workers if necessary */ - private void createAndStartInitialWorkers(int ps) { - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps); - for (int i = 0; i < ps; ++i) { - ForkJoinWorkerThread w = createWorker(i); - if (w != null) { - ws[i] = w; - w.start(); - updateWorkerCount(1); + final void ensureWorkerInitialization() { + ForkJoinWorkerThread[] ws = workers; + if (ws == null) { + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ws = workers; + if (ws == null) { + int ps = parallelism; + ws = ensureWorkerArrayCapacity(ps); + for (int i = 0; i < ps; ++i) { + ForkJoinWorkerThread w = createWorker(i); + if (w != null) { + ws[i] = w; + w.start(); + updateWorkerCount(1); + } + } } + } finally { + lock.unlock(); } - } finally { - lock.unlock(); } } @@ -507,8 +529,10 @@ public class ForkJoinPool extends Abstra private void doSubmit(ForkJoinTask task) { if (isShutdown()) throw new RejectedExecutionException(); + if (workers == null) + ensureWorkerInitialization(); submissionQueue.offer(task); - signalIdleWorkers(true); + signalIdleWorkers(); } /** @@ -677,10 +701,12 @@ public class ForkJoinPool extends Abstra old = ueh; ueh = h; ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - w.setUncaughtExceptionHandler(h); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + w.setUncaughtExceptionHandler(h); + } } } finally { lock.unlock(); @@ -717,7 +743,7 @@ public class ForkJoinPool extends Abstra } finally { lock.unlock(); } - signalIdleWorkers(false); + signalIdleWorkers(); } /** @@ -787,6 +813,42 @@ public class ForkJoinPool extends Abstra } /** + * Establishes local first-in-first-out scheduling mode for forked + * tasks that are never joined. This mode may be more appropriate + * than default locally stack-based mode in applications in which + * worker threads only process asynchronous tasks. This method is + * designed to be invoked only when pool is quiescent, and + * typically only before any tasks are submitted. The effects of + * invocations at ather times may be unpredictable. + * + * @param async if true, use locally FIFO scheduling + * @return the previous mode. + */ + public boolean setAsyncMode(boolean async) { + boolean oldMode = locallyFifo; + locallyFifo = async; + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.setAsyncMode(async); + } + } + return oldMode; + } + + /** + * Returns true if this pool uses local first-in-first-out + * scheduling mode for forked tasks that are never joined. + * + * @return true if this pool uses async mode. + */ + public boolean getAsyncMode() { + return locallyFifo; + } + + /** * Returns an estimate of the number of worker threads that are * not blocked waiting to join tasks or for other managed * synchronization. @@ -868,10 +930,12 @@ public class ForkJoinPool extends Abstra public long getQueuedTaskCount() { long count = 0; ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - count += t.getQueueSize(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + count += t.getQueueSize(); + } } return count; } @@ -906,6 +970,35 @@ public class ForkJoinPool extends Abstra } /** + * Removes all available unexecuted submitted and forked tasks + * from scheduling queues and adds them to the given collection, + * without altering their execution status. These may include + * artifically generated or wrapped tasks. This method id designed + * to be invoked only when the pool is known to be + * quiescent. Invocations at other times may not remove all + * tasks. A failure encountered while attempting to add elements + * to collection c may result in elements being in + * neither, either or both collections when the associated + * exception is thrown. The behavior of this operation is + * undefined if the specified collection is modified while the + * operation is in progress. + * @param c the collection to transfer elements into + * @return the number of elements transferred + */ + protected int drainTasksTo(Collection> c) { + int n = submissionQueue.drainTo(c); + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + n += w.drainTasksTo(c); + } + } + return n; + } + + /** * Returns a string identifying this pool, as well as its state, * including indications of run state, parallelism level, and * worker and task counts. @@ -966,8 +1059,10 @@ public class ForkJoinPool extends Abstra * waiting tasks. Tasks that are in the process of being * submitted or executed concurrently during the course of this * method may or may not be rejected. Unlike some other executors, - * this method cancels rather than collects non-executed tasks, - * so always returns an empty list. + * this method cancels rather than collects non-executed tasks + * upon termination, so always returns an empty list. However, you + * can use method drainTasksTo before invoking this + * method to transfer unexecuted tasks to another collection. * @return an empty list * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads @@ -1052,22 +1147,24 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - int idx = w.poolIndex; - if (idx >= 0 && idx < ws.length && ws[idx] == w) - ws[idx] = null; - if (totalCountOf(workerCounts) == 0) { - terminate(); // no-op if already terminating - transitionRunStateTo(TERMINATED); - termination.signalAll(); - } - else if (!isTerminating()) { - tryShrinkWorkerArray(); - tryResumeSpare(true); // allow replacement + if (ws != null) { + int idx = w.poolIndex; + if (idx >= 0 && idx < ws.length && ws[idx] == w) + ws[idx] = null; + if (totalCountOf(workerCounts) == 0) { + terminate(); // no-op if already terminating + transitionRunStateTo(TERMINATED); + termination.signalAll(); + } + else if (!isTerminating()) { + tryShrinkWorkerArray(); + tryResumeSpare(true); // allow replacement + } } } finally { lock.unlock(); } - signalIdleWorkers(false); + signalIdleWorkers(); } /** @@ -1077,11 +1174,11 @@ public class ForkJoinPool extends Abstra if (transitionRunStateTo(TERMINATING)) { stopAllWorkers(); resumeAllSpares(); - signalIdleWorkers(true); + signalIdleWorkers(); cancelQueuedSubmissions(); cancelQueuedWorkerTasks(); interruptUnterminatedWorkers(); - signalIdleWorkers(true); // resignal after interrupt + signalIdleWorkers(); // resignal after interrupt } } @@ -1110,10 +1207,12 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.cancelTasks(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.cancelTasks(); + } } } finally { lock.unlock(); @@ -1129,10 +1228,12 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.shutdownNow(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.shutdownNow(); + } } } finally { lock.unlock(); @@ -1149,12 +1250,14 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null && !t.isTerminated()) { - try { - t.interrupt(); - } catch (SecurityException ignore) { + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null && !t.isTerminated()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } } } } @@ -1165,75 +1268,92 @@ public class ForkJoinPool extends Abstra /* - * Nodes for event barrier to manage idle threads. + * Nodes for event barrier to manage idle threads. Queue nodes + * are basic Treiber stack nodes, also used for spare stack. * * The event barrier has an event count and a wait queue (actually * a Treiber stack). Workers are enabled to look for work when - * the eventCount is incremented. If they fail to find some, - * they may wait for next count. Synchronization events occur only - * in enough contexts to maintain overall liveness: + * the eventCount is incremented. If they fail to find work, they + * may wait for next count. Upon release, threads help others wake + * up. + * + * Synchronization events occur only in enough contexts to + * maintain overall liveness: * * - Submission of a new task to the pool - * - Creation or termination of a worker + * - Resizes or other changes to the workers array * - pool termination * - A worker pushing a task on an empty queue * - * The last case (pushing a task) occurs often enough, and is - * heavy enough compared to simple stack pushes to require some - * special handling: Method signalNonEmptyWorkerQueue returns - * without advancing count if the queue appears to be empty. This - * would ordinarily result in races causing some queued waiters - * not to be woken up. To avoid this, a worker in sync - * rescans for tasks after being enqueued if it was the first to - * enqueue, and aborts the wait if finding one, also helping to - * signal others. This works well because the worker has nothing - * better to do anyway, and so might as well help alleviate the - * overhead and contention on the threads actually doing work. - * - * Queue nodes are basic Treiber stack nodes, also used for spare - * stack. + * The case of pushing a task occurs often enough, and is heavy + * enough compared to simple stack pushes, to require special + * handling: Method signalWork returns without advancing count if + * the queue appears to be empty. This would ordinarily result in + * races causing some queued waiters not to be woken up. To avoid + * this, the first worker enqueued in method sync (see + * syncIsReleasable) rescans for tasks after being enqueued, and + * helps signal if any are found. This works well because the + * worker has nothing better to do, and so might as well help + * alleviate the overhead and contention on the threads actually + * doing work. Also, since event counts increments on task + * availability exist to maintain liveness (rather than to force + * refreshes etc), it is OK for callers to exit early if + * contending with another signaller. */ static final class WaitQueueNode { WaitQueueNode next; // only written before enqueued volatile ForkJoinWorkerThread thread; // nulled to cancel wait final long count; // unused for spare stack - WaitQueueNode(ForkJoinWorkerThread w, long c) { + + WaitQueueNode(long c, ForkJoinWorkerThread w) { count = c; thread = w; } - final boolean signal() { + + /** + * Wake up waiter, returning false if known to already + */ + boolean signal() { ForkJoinWorkerThread t = thread; + if (t == null) + return false; thread = null; - if (t != null) { - LockSupport.unpark(t); - return true; + LockSupport.unpark(t); + return true; + } + + /** + * Await release on sync + */ + void awaitSyncRelease(ForkJoinPool p) { + while (thread != null && !p.syncIsReleasable(this)) + LockSupport.park(this); + } + + /** + * Await resumption as spare + */ + void awaitSpareRelease() { + while (thread != null) { + if (!Thread.interrupted()) + LockSupport.park(this); } - return false; } } /** - * Release at least one thread waiting for event count to advance, - * if one exists. If initial attempt fails, release all threads. - * @param all if false, at first try to only release one thread - * @return current event + * Ensures that no thread is waiting for count to advance from the + * current value of eventCount read on entry to this method, by + * releasing waiting threads if necessary. + * @return the count */ - private long releaseIdleWorkers(boolean all) { - long c; - for (;;) { - WaitQueueNode q = barrierStack; - c = eventCount; - long qc; - if (q == null || (qc = q.count) >= c) - break; - if (!all) { - if (casBarrierStack(q, q.next) && q.signal()) - break; - all = true; - } - else if (casBarrierStack(q, null)) { + final long ensureSync() { + long c = eventCount; + WaitQueueNode q; + while ((q = syncStack) != null && q.count < c) { + if (casBarrierStack(q, null)) { do { - q.signal(); + q.signal(); } while ((q = q.next) != null); break; } @@ -1242,82 +1362,97 @@ public class ForkJoinPool extends Abstra } /** - * Returns current barrier event count - * @return current barrier event count + * Increments event count and releases waiting threads. */ - final long getEventCount() { - long ec = eventCount; - releaseIdleWorkers(true); // release to ensure accurate result - return ec; - } - - /** - * Increment event count and release at least one waiting thread, - * if one exists (released threads will in turn wake up others). - * @param all if true, try to wake up all - */ - final void signalIdleWorkers(boolean all) { + private void signalIdleWorkers() { long c; do;while (!casEventCount(c = eventCount, c+1)); - releaseIdleWorkers(all); + ensureSync(); } /** - * Wake up threads waiting to steal a task. Because method - * sync rechecks availability, it is OK to only proceed if - * queue appears to be non-empty. + * Signal threads waiting to poll a task. Because method sync + * rechecks availability, it is OK to only proceed if queue + * appears to be non-empty, and OK to skip under contention to + * increment count (since some other thread succeeded). */ - final void signalNonEmptyWorkerQueue() { - // If CAS fails another signaller must have succeeded + final void signalWork() { long c; - if (barrierStack != null && casEventCount(c = eventCount, c+1)) - releaseIdleWorkers(false); + WaitQueueNode q; + if (syncStack != null && + casEventCount(c = eventCount, c+1) && + (((q = syncStack) != null && q.count <= c) && + (!casBarrierStack(q, q.next) || !q.signal()))) + ensureSync(); } /** - * Waits until event count advances from count, or some thread is - * waiting on a previous count, or there is stealable work - * available. Help wake up others on release. + * Waits until event count advances from last value held by + * caller, or if excess threads, caller is resumed as spare, or + * caller or pool is terminating. Updates caller's event on exit. * @param w the calling worker thread - * @param prev previous value returned by sync (or 0) - * @return current event count */ - final long sync(ForkJoinWorkerThread w, long prev) { - updateStealCount(w); + final void sync(ForkJoinWorkerThread w) { + updateStealCount(w); // Transfer w's count while it is idle - while (!w.isShutdown() && !isTerminating() && - (parallelism >= runningCountOf(workerCounts) || - !suspendIfSpare(w))) { // prefer suspend to waiting here + while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) { + long prev = w.lastEventCount; WaitQueueNode node = null; - boolean queued = false; - for (;;) { - if (!queued) { - if (eventCount != prev) - break; - WaitQueueNode h = barrierStack; - if (h != null && h.count != prev) - break; // release below and maybe retry - if (node == null) - node = new WaitQueueNode(w, prev); - queued = casBarrierStack(node.next = h, node); - } - else if (Thread.interrupted() || - node.thread == null || - (node.next == null && w.prescan()) || - eventCount != prev) { - node.thread = null; - if (eventCount == prev) // help trigger - casEventCount(prev, prev+1); + WaitQueueNode h; + while (eventCount == prev && + ((h = syncStack) == null || h.count == prev)) { + if (node == null) + node = new WaitQueueNode(prev, w); + if (casBarrierStack(node.next = h, node)) { + node.awaitSyncRelease(this); break; } - else - LockSupport.park(this); } + long ec = ensureSync(); + if (ec != prev) { + w.lastEventCount = ec; + break; + } + } + } + + /** + * Returns true if worker waiting on sync can proceed: + * - on signal (thread == null) + * - on event count advance (winning race to notify vs signaller) + * - on Interrupt + * - if the first queued node, we find work available + * If node was not signalled and event count not advanced on exit, + * then we also help advance event count. + * @return true if node can be released + */ + final boolean syncIsReleasable(WaitQueueNode node) { + long prev = node.count; + if (!Thread.interrupted() && node.thread != null && + (node.next != null || + !ForkJoinWorkerThread.hasQueuedTasks(workers)) && + eventCount == prev) + return false; + if (node.thread != null) { + node.thread = null; long ec = eventCount; - if (releaseIdleWorkers(false) != prev) - return ec; + if (prev <= ec) // help signal + casEventCount(ec, ec+1); } - return prev; // return old count if aborted + return true; + } + + /** + * Returns true if a new sync event occurred since last call to + * sync or this method, if so, updating caller's count. + */ + final boolean hasNewSyncEvent(ForkJoinWorkerThread w) { + long lc = w.lastEventCount; + long ec = ensureSync(); + if (ec == lc) + return false; + w.lastEventCount = ec; + return true; } // Parallelism maintenance @@ -1372,7 +1507,8 @@ public class ForkJoinPool extends Abstra /** * Same idea as preJoin */ - final boolean preBlock(ManagedBlocker blocker, boolean maintainParallelism){ + final boolean preBlock(ManagedBlocker blocker, + boolean maintainParallelism) { maintainParallelism &= maintainsParallelism; boolean dec = false; while (spareStack == null || !tryResumeSpare(dec)) { @@ -1408,26 +1544,8 @@ public class ForkJoinPool extends Abstra return (tc < maxPoolSize && (rc == 0 || totalSurplus < 0 || (maintainParallelism && - runningDeficit > totalSurplus && mayHaveQueuedWork()))); - } - - /** - * Returns true if at least one worker queue appears to be - * nonempty. This is expensive but not often called. It is not - * critical that this be accurate, but if not, more or fewer - * running threads than desired might be maintained. - */ - private boolean mayHaveQueuedWork() { - ForkJoinWorkerThread[] ws = workers; - int len = ws.length; - ForkJoinWorkerThread v; - for (int i = 0; i < len; ++i) { - if ((v = ws[i]) != null && v.getRawQueueSize() > 0) { - releaseIdleWorkers(false); // help wake up stragglers - return true; - } - } - return false; + runningDeficit > totalSurplus && + ForkJoinWorkerThread.hasQueuedTasks(workers)))); } /** @@ -1483,7 +1601,7 @@ public class ForkJoinPool extends Abstra } else updateWorkerCount(-1); // adjust on failure - signalIdleWorkers(false); + signalIdleWorkers(); } /** @@ -1499,17 +1617,12 @@ public class ForkJoinPool extends Abstra int s; while (parallelism < runningCountOf(s = workerCounts)) { if (node == null) - node = new WaitQueueNode(w, 0); + node = new WaitQueueNode(0, w); if (casWorkerCounts(s, s-1)) { // representation-dependent // push onto stack do;while (!casSpareStack(node.next = spareStack, node)); - // block until released by resumeSpare - while (node.thread != null) { - if (!Thread.interrupted()) - LockSupport.park(this); - } - w.activate(); // help warm up + node.awaitSpareRelease(); return true; } } @@ -1535,8 +1648,7 @@ public class ForkJoinPool extends Abstra } /** - * Pop and resume all spare threads. Same idea as - * releaseIdleWorkers. + * Pop and resume all spare threads. Same idea as ensureSync. * @return true if any spares released */ private boolean resumeAllSpares() { @@ -1577,16 +1689,6 @@ public class ForkJoinPool extends Abstra } /** - * Returns approximate number of spares, just for diagnostics. - */ - private int countSpares() { - int sum = 0; - for (WaitQueueNode q = spareStack; q != null; q = q.next) - ++sum; - return sum; - } - - /** * Interface for extending managed parallelism for tasks running * in ForkJoinPools. A ManagedBlocker provides two methods. * Method isReleasable must return true if blocking is not @@ -1692,34 +1794,51 @@ public class ForkJoinPool extends Abstra // Temporary Unsafe mechanics for preliminary release + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return _unsafe.objectFieldOffset + (ForkJoinPool.class.getDeclaredField(fieldName)); + } static final Unsafe _unsafe; static final long eventCountOffset; static final long workerCountsOffset; static final long runControlOffset; - static final long barrierStackOffset; + static final long syncStackOffset; static final long spareStackOffset; static { try { - if (ForkJoinPool.class.getClassLoader() != null) { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - _unsafe = (Unsafe)f.get(null); - } - else - _unsafe = Unsafe.getUnsafe(); - eventCountOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("eventCount")); - workerCountsOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("workerCounts")); - runControlOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("runControl")); - barrierStackOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("barrierStack")); - spareStackOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("spareStack")); - } catch (Exception e) { + _unsafe = getUnsafe(); + eventCountOffset = fieldOffset("eventCount"); + workerCountsOffset = fieldOffset("workerCounts"); + runControlOffset = fieldOffset("runControl"); + syncStackOffset = fieldOffset("syncStack"); + spareStackOffset = fieldOffset("spareStack"); + } catch (Throwable e) { throw new RuntimeException("Could not initialize intrinsics", e); } } @@ -1737,6 +1856,6 @@ public class ForkJoinPool extends Abstra return _unsafe.compareAndSwapObject(this, spareStackOffset, cmp, val); } private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) { - return _unsafe.compareAndSwapObject(this, barrierStackOffset, cmp, val); + return _unsafe.compareAndSwapObject(this, syncStackOffset, cmp, val); } }