--- jsr166/src/jsr166y/ForkJoinPool.java 2009/08/04 12:41:27 1.45
+++ jsr166/src/jsr166y/ForkJoinPool.java 2009/12/05 11:39:03 1.52
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.Atomi
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* A {@code ForkJoinPool} provides the entry point for submissions
* from non-{@code ForkJoinTask}s, as well as management and
- * monitoring operations.
+ * monitoring operations.
*
*
A {@code ForkJoinPool} differs from other kinds of {@link
* ExecutorService} mainly by virtue of employing
@@ -82,9 +82,12 @@ import java.util.concurrent.atomic.Atomi
*
*
Implementation notes: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
- * pools with greater than the maximum result in
+ * pools with greater than the maximum number result in
* {@code IllegalArgumentException}.
*
+ *
This implementation rejects submitted tasks (that is, by throwing
+ * {@link RejectedExecutionException}) only when the pool is shut down.
+ *
* @since 1.7
* @author Doug Lea
*/
@@ -112,7 +115,7 @@ public class ForkJoinPool extends Abstra
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
- * @throws NullPointerException if pool is null
+ * @throws NullPointerException if the pool is null
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
@@ -384,7 +387,7 @@ public class ForkJoinPool extends Abstra
*
* @param parallelism the parallelism level
* @throws IllegalArgumentException if parallelism less than or
- * equal to zero
+ * equal to zero, or greater than implementation limit
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
@@ -400,7 +403,7 @@ public class ForkJoinPool extends Abstra
* thread factory.
*
* @param factory the factory for creating new threads
- * @throws NullPointerException if factory is null
+ * @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
@@ -417,8 +420,8 @@ public class ForkJoinPool extends Abstra
* @param parallelism the parallelism level
* @param factory the factory for creating new threads
* @throws IllegalArgumentException if parallelism less than or
- * equal to zero, or greater than implementation limit
- * @throws NullPointerException if factory is null
+ * equal to zero, or greater than implementation limit
+ * @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
@@ -523,14 +526,16 @@ public class ForkJoinPool extends Abstra
ws = workers;
if (ws == null) {
int ps = parallelism;
+ updateWorkerCount(ps);
ws = ensureWorkerArrayCapacity(ps);
for (int i = 0; i < ps; ++i) {
ForkJoinWorkerThread w = createWorker(i);
if (w != null) {
ws[i] = w;
w.start();
- updateWorkerCount(1);
}
+ else
+ updateWorkerCount(-1);
}
}
} finally {
@@ -594,8 +599,9 @@ public class ForkJoinPool extends Abstra
*
* @param task the task
* @return the task's result
- * @throws NullPointerException if task is null
- * @throws RejectedExecutionException if pool is shut down
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
*/
public T invoke(ForkJoinTask task) {
doSubmit(task);
@@ -606,8 +612,9 @@ public class ForkJoinPool extends Abstra
* Arranges for (asynchronous) execution of the given task.
*
* @param task the task
- * @throws NullPointerException if task is null
- * @throws RejectedExecutionException if pool is shut down
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
*/
public void execute(ForkJoinTask> task) {
doSubmit(task);
@@ -615,6 +622,11 @@ public class ForkJoinPool extends Abstra
// AbstractExecutorService methods
+ /**
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
public void execute(Runnable task) {
ForkJoinTask> job;
if (task instanceof ForkJoinTask>) // avoid re-wrap
@@ -624,18 +636,33 @@ public class ForkJoinPool extends Abstra
doSubmit(job);
}
+ /**
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
public ForkJoinTask submit(Callable task) {
ForkJoinTask job = ForkJoinTask.adapt(task);
doSubmit(job);
return job;
}
+ /**
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
public ForkJoinTask submit(Runnable task, T result) {
ForkJoinTask job = ForkJoinTask.adapt(task, result);
doSubmit(job);
return job;
}
+ /**
+ * @throws NullPointerException if the task is null
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ */
public ForkJoinTask> submit(Runnable task) {
ForkJoinTask> job;
if (task instanceof ForkJoinTask>) // avoid re-wrap
@@ -651,9 +678,9 @@ public class ForkJoinPool extends Abstra
*
* @param task the task to submit
* @return the task
+ * @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
- * @throws NullPointerException if the task is null
*/
public ForkJoinTask submit(ForkJoinTask task) {
doSubmit(task);
@@ -661,6 +688,10 @@ public class ForkJoinPool extends Abstra
}
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws RejectedExecutionException {@inheritDoc}
+ */
public List> invokeAll(Collection extends Callable> tasks) {
ArrayList> forkJoinTasks =
new ArrayList>(tasks.size());
@@ -770,10 +801,12 @@ public class ForkJoinPool extends Abstra
if (isProcessingTasks()) {
int p = this.parallelism;
this.parallelism = parallelism;
- if (parallelism > p)
- createAndStartAddedWorkers();
- else
- trimSpares();
+ if (workers != null) {
+ if (parallelism > p)
+ createAndStartAddedWorkers();
+ else
+ trimSpares();
+ }
}
} finally {
lock.unlock();
@@ -1077,7 +1110,7 @@ public class ForkJoinPool extends Abstra
}
private static String runStateToString(int rs) {
- switch(rs) {
+ switch (rs) {
case RUNNING: return "Running";
case SHUTDOWN: return "Shutting down";
case TERMINATING: return "Terminating";
@@ -1347,7 +1380,6 @@ public class ForkJoinPool extends Abstra
}
}
-
/*
* Nodes for event barrier to manage idle threads. Queue nodes
* are basic Treiber stack nodes, also used for spare stack.
@@ -1371,15 +1403,14 @@ public class ForkJoinPool extends Abstra
* handling: Method signalWork returns without advancing count if
* the queue appears to be empty. This would ordinarily result in
* races causing some queued waiters not to be woken up. To avoid
- * this, the first worker enqueued in method sync (see
- * syncIsReleasable) rescans for tasks after being enqueued, and
- * helps signal if any are found. This works well because the
- * worker has nothing better to do, and so might as well help
- * alleviate the overhead and contention on the threads actually
- * doing work. Also, since event counts increments on task
- * availability exist to maintain liveness (rather than to force
- * refreshes etc), it is OK for callers to exit early if
- * contending with another signaller.
+ * this, the first worker enqueued in method sync rescans for
+ * tasks after being enqueued, and helps signal if any are
+ * found. This works well because the worker has nothing better to
+ * do, and so might as well help alleviate the overhead and
+ * contention on the threads actually doing work. Also, since
+ * event counts increments on task availability exist to maintain
+ * liveness (rather than to force refreshes etc), it is OK for
+ * callers to exit early if contending with another signaller.
*/
static final class WaitQueueNode {
WaitQueueNode next; // only written before enqueued
@@ -1392,32 +1423,13 @@ public class ForkJoinPool extends Abstra
}
/**
- * Wakes up waiter, returning false if known to already
+ * Wakes up waiter, also clearing thread field
*/
- boolean signal() {
+ void signal() {
ForkJoinWorkerThread t = thread;
- if (t == null)
- return false;
- thread = null;
- LockSupport.unpark(t);
- return true;
- }
-
- /**
- * Awaits release on sync.
- */
- void awaitSyncRelease(ForkJoinPool p) {
- while (thread != null && !p.syncIsReleasable(this))
- LockSupport.park(this);
- }
-
- /**
- * Awaits resumption as spare.
- */
- void awaitSpareRelease() {
- while (thread != null) {
- if (!Thread.interrupted())
- LockSupport.park(this);
+ if (t != null) {
+ thread = null;
+ LockSupport.unpark(t);
}
}
}
@@ -1426,10 +1438,8 @@ public class ForkJoinPool extends Abstra
* Ensures that no thread is waiting for count to advance from the
* current value of eventCount read on entry to this method, by
* releasing waiting threads if necessary.
- *
- * @return the count
*/
- final long ensureSync() {
+ final void ensureSync() {
long c = eventCount;
WaitQueueNode q;
while ((q = syncStack) != null && q.count < c) {
@@ -1440,7 +1450,6 @@ public class ForkJoinPool extends Abstra
break;
}
}
- return c;
}
/**
@@ -1455,22 +1464,26 @@ public class ForkJoinPool extends Abstra
/**
* Signals threads waiting to poll a task. Because method sync
* rechecks availability, it is OK to only proceed if queue
- * appears to be non-empty, and OK to skip under contention to
- * increment count (since some other thread succeeded).
+ * appears to be non-empty, and OK if CAS to increment count
+ * fails (since some other thread succeeded).
*/
final void signalWork() {
- long c;
- WaitQueueNode q;
- if (syncStack != null &&
- casEventCount(c = eventCount, c+1) &&
- (((q = syncStack) != null && q.count <= c) &&
- (!casBarrierStack(q, q.next) || !q.signal())))
- ensureSync();
+ if (syncStack != null) {
+ long c = eventCount;
+ casEventCount(c, c+1);
+ WaitQueueNode q = syncStack;
+ if (q != null && q.count <= c) {
+ if (casBarrierStack(q, q.next))
+ q.signal();
+ else
+ ensureSync(); // awaken all on contention
+ }
+ }
}
/**
- * Waits until event count advances from last value held by
- * caller, or if excess threads, caller is resumed as spare, or
+ * Possibly blocks until event count advances from last value held
+ * by caller, or if excess threads, caller is resumed as spare, or
* caller or pool is terminating. Updates caller's event on exit.
*
* @param w the calling worker thread
@@ -1478,52 +1491,35 @@ public class ForkJoinPool extends Abstra
final void sync(ForkJoinWorkerThread w) {
updateStealCount(w); // Transfer w's count while it is idle
- while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
+ if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
long prev = w.lastEventCount;
WaitQueueNode node = null;
WaitQueueNode h;
- while (eventCount == prev &&
+ long c;
+ while ((c = eventCount) == prev &&
((h = syncStack) == null || h.count == prev)) {
if (node == null)
node = new WaitQueueNode(prev, w);
if (casBarrierStack(node.next = h, node)) {
- node.awaitSyncRelease(this);
+ if (!Thread.interrupted() &&
+ node.thread != null &&
+ eventCount == prev &&
+ (h != null || // cover signalWork race
+ (!ForkJoinWorkerThread.hasQueuedTasks(workers) &&
+ eventCount == prev)))
+ LockSupport.park(this);
+ c = eventCount;
+ if (node.thread != null) { // help signal if not unparked
+ node.thread = null;
+ if (c == prev)
+ casEventCount(prev, prev + 1);
+ }
break;
}
}
- long ec = ensureSync();
- if (ec != prev) {
- w.lastEventCount = ec;
- break;
- }
- }
- }
-
- /**
- * Returns {@code true} if worker waiting on sync can proceed:
- * - on signal (thread == null)
- * - on event count advance (winning race to notify vs signaller)
- * - on interrupt
- * - if the first queued node, we find work available
- * If node was not signalled and event count not advanced on exit,
- * then we also help advance event count.
- *
- * @return {@code true} if node can be released
- */
- final boolean syncIsReleasable(WaitQueueNode node) {
- long prev = node.count;
- if (!Thread.interrupted() && node.thread != null &&
- (node.next != null ||
- !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
- eventCount == prev)
- return false;
- if (node.thread != null) {
- node.thread = null;
- long ec = eventCount;
- if (prev <= ec) // help signal
- casEventCount(ec, ec+1);
+ w.lastEventCount = c;
+ ensureSync();
}
- return true;
}
/**
@@ -1531,12 +1527,12 @@ public class ForkJoinPool extends Abstra
* call to sync or this method, if so, updating caller's count.
*/
final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
- long lc = w.lastEventCount;
- long ec = ensureSync();
- if (ec == lc)
- return false;
- w.lastEventCount = ec;
- return true;
+ long wc = w.lastEventCount;
+ long c = eventCount;
+ if (wc != c)
+ w.lastEventCount = c;
+ ensureSync();
+ return wc != c || wc != eventCount;
}
// Parallelism maintenance
@@ -1578,7 +1574,6 @@ public class ForkJoinPool extends Abstra
while (spareStack == null || !tryResumeSpare(dec)) {
int counts = workerCounts;
if (dec || (dec = casWorkerCounts(counts, --counts))) {
- // CAS cheat
if (!needSpare(counts, maintainParallelism))
break;
if (joinMe.status < 0)
@@ -1703,19 +1698,25 @@ public class ForkJoinPool extends Abstra
*/
private boolean suspendIfSpare(ForkJoinWorkerThread w) {
WaitQueueNode node = null;
- int s;
- while (parallelism < runningCountOf(s = workerCounts)) {
+ for (;;) {
+ int s = workerCounts;
+ int rc = runningCountOf(s);
+ int tc = totalCountOf(s);
+ int ps = parallelism;
+ // use tc as bound if rc transiently out of sync
+ if (tc <= ps || rc <= ps)
+ return false; // not a spare
if (node == null)
node = new WaitQueueNode(0, w);
- if (casWorkerCounts(s, s-1)) { // representation-dependent
- // push onto stack
- do {} while (!casSpareStack(node.next = spareStack, node));
- // block until released by resumeSpare
- node.awaitSpareRelease();
- return true;
- }
+ if (casWorkerCounts(s, workerCountsFor(tc, rc - 1)))
+ break;
}
- return false;
+ // push onto stack
+ do {} while (!casSpareStack(node.next = spareStack, node));
+ // block until released by resumeSpare
+ while (!Thread.interrupted() && node.thread != null)
+ LockSupport.park(this);
+ return true;
}
/**