--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/09/20 20:42:37 1.51 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/11/17 15:36:39 1.55 @@ -6,11 +6,10 @@ package jsr166y; -import java.util.concurrent.*; - import java.util.Random; import java.util.Collection; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.RejectedExecutionException; /** * A thread managed by a {@link ForkJoinPool}. This class is @@ -750,10 +749,10 @@ public class ForkJoinWorkerThread extend // Run State management // status check methods used mainly by ForkJoinPool - final boolean isRunning() { return runState == 0; } - final boolean isTerminated() { return (runState & TERMINATED) != 0; } - final boolean isSuspended() { return (runState & SUSPENDED) != 0; } - final boolean isTrimmed() { return (runState & TRIMMED) != 0; } + final boolean isRunning() { return runState == 0; } + final boolean isTerminated() { return (runState & TERMINATED) != 0; } + final boolean isSuspended() { return (runState & SUSPENDED) != 0; } + final boolean isTrimmed() { return (runState & TRIMMED) != 0; } final boolean isTerminating() { if ((runState & TERMINATING) != 0) @@ -932,20 +931,23 @@ public class ForkJoinWorkerThread extend * Possibly runs some tasks and/or blocks, until task is done. * * @param joinMe the task to join + * @param timed true if use timed wait + * @param nanos wait time if timed */ - final void joinTask(ForkJoinTask joinMe) { + final void joinTask(ForkJoinTask joinMe, boolean timed, long nanos) { // currentJoin only written by this thread; only need ordered store ForkJoinTask prevJoin = currentJoin; UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); - if (sp != base) - localHelpJoinTask(joinMe); - if (joinMe.status >= 0) - pool.awaitJoin(joinMe, this); + if (isTerminating()) // cancel if shutting down + joinMe.cancelIgnoringExceptions(); + else + pool.awaitJoin(joinMe, this, timed, nanos); UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); } /** * Run tasks in local queue until given task is done. + * Not currently used because it complicates semantics. * * @param joinMe the task to join */ @@ -978,11 +980,11 @@ public class ForkJoinWorkerThread extend } /** - * Unless terminating, tries to locate and help perform tasks for - * a stealer of the given task, or in turn one of its stealers. - * Traces currentSteal->currentJoin links looking for a thread - * working on a descendant of the given task and with a non-empty - * queue to steal back and execute tasks from. + * Tries to locate and help perform tasks for a stealer of the + * given task, or in turn one of its stealers. Traces + * currentSteal->currentJoin links looking for a thread working on + * a descendant of the given task and with a non-empty queue to + * steal back and execute tasks from. * * The implementation is very branchy to cope with potential * inconsistencies or loops encountering chains that are stale, @@ -998,10 +1000,6 @@ public class ForkJoinWorkerThread extend int n; if (joinMe.status < 0) // already done return; - if ((runState & TERMINATING) != 0) { // cancel if shutting down - joinMe.cancelIgnoringExceptions(); - return; - } if ((ws = pool.workers) == null || (n = ws.length) <= 1) return; // need at least 2 workers @@ -1014,7 +1012,7 @@ public class ForkJoinWorkerThread extend for (int j = 0; ; ++j) { // search array if (j < n) { ForkJoinTask vs; - if ((v = ws[j]) != null && + if ((v = ws[j]) != null && v != this && (vs = v.currentSteal) != null) { if (joinMe.status < 0 || task.status < 0) return; // stale or done