--- jsr166/src/jsr166y/ForkJoinTask.java 2012/10/31 12:49:24 1.92 +++ jsr166/src/jsr166y/ForkJoinTask.java 2012/11/14 17:20:37 1.93 @@ -285,10 +285,9 @@ public abstract class ForkJoinTask im */ private int externalAwaitDone() { int s; + ForkJoinPool.externalHelpJoin(this); boolean interrupted = false; - if ((s = status) >= 0 && ForkJoinPool.tryUnsubmitFromCommonPool(this)) - s = doExec(); - while (s >= 0) { + while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { @@ -302,7 +301,6 @@ public abstract class ForkJoinTask im notifyAll(); } } - s = status; } if (interrupted) Thread.currentThread().interrupt(); @@ -313,12 +311,11 @@ public abstract class ForkJoinTask im * Blocks a non-worker-thread until completion or interruption. */ private int externalInterruptibleAwaitDone() throws InterruptedException { + int s; if (Thread.interrupted()) throw new InterruptedException(); - int s; - if ((s = status) >= 0 && ForkJoinPool.tryUnsubmitFromCommonPool(this)) - s = doExec(); - while (s >= 0) { + ForkJoinPool.externalHelpJoin(this); + while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) @@ -327,11 +324,11 @@ public abstract class ForkJoinTask im notifyAll(); } } - s = status; } return s; } + /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and @@ -655,7 +652,7 @@ public abstract class ForkJoinTask im if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else - ForkJoinPool.submitToCommonPool(this); + ForkJoinPool.commonPool.externalPush(this); return this; } @@ -991,16 +988,18 @@ public abstract class ForkJoinTask im ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; p = wt.pool; w = wt.workQueue; - s = p.helpJoinOnce(w, this); // no retries on failure + p.helpJoinOnce(w, this); // no retries on failure } + else + ForkJoinPool.externalHelpJoin(this); boolean canBlock = false; boolean interrupted = false; try { while ((s = status) >= 0) { - if (w != null && w.runState < 0) + if (w != null && w.qlock < 0) cancelIgnoringExceptions(this); else if (!canBlock) { - if (p == null || p.tryCompensate(this, null)) + if (p == null || p.tryCompensate()) canBlock = true; } else { @@ -1139,9 +1138,9 @@ public abstract class ForkJoinTask im */ public boolean tryUnfork() { Thread t; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : - ForkJoinPool.tryUnsubmitFromCommonPool(this); + return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : + ForkJoinPool.tryExternalUnpush(this)); } /** @@ -1153,10 +1152,12 @@ public abstract class ForkJoinTask im * @return the number of tasks */ public static int getQueuedTaskCount() { - Thread t; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.queueSize() : - ForkJoinPool.getEstimatedSubmitterQueueLength(); + Thread t; ForkJoinPool.WorkQueue q; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + q = ((ForkJoinWorkerThread)t).workQueue; + else + q = ForkJoinPool.commonSubmitterQueue(); + return (q == null) ? 0 : q.queueSize(); } /** @@ -1173,53 +1174,7 @@ public abstract class ForkJoinTask im * @return the surplus number of tasks, which may be negative */ public static int getSurplusQueuedTaskCount() { - /* - * The aim of this method is to return a cheap heuristic guide - * for task partitioning when programmers, frameworks, tools, - * or languages have little or no idea about task granularity. - * In essence by offering this method, we ask users only about - * tradeoffs in overhead vs expected throughput and its - * variance, rather than how finely to partition tasks. - * - * In a steady state strict (tree-structured) computation, - * each thread makes available for stealing enough tasks for - * other threads to remain active. Inductively, if all threads - * play by the same rules, each thread should make available - * only a constant number of tasks. - * - * The minimum useful constant is just 1. But using a value of - * 1 would require immediate replenishment upon each steal to - * maintain enough tasks, which is infeasible. Further, - * partitionings/granularities of offered tasks should - * minimize steal rates, which in general means that threads - * nearer the top of computation tree should generate more - * than those nearer the bottom. In perfect steady state, each - * thread is at approximately the same level of computation - * tree. However, producing extra tasks amortizes the - * uncertainty of progress and diffusion assumptions. - * - * So, users will want to use values larger, but not much - * larger than 1 to both smooth over transient shortages and - * hedge against uneven progress; as traded off against the - * cost of extra task overhead. We leave the user to pick a - * threshold value to compare with the results of this call to - * guide decisions, but recommend values such as 3. - * - * When all threads are active, it is on average OK to - * estimate surplus strictly locally. In steady-state, if one - * thread is maintaining say 2 surplus tasks, then so are - * others. So we can just use estimated queue length. - * However, this strategy alone leads to serious mis-estimates - * in some non-steady-state conditions (ramp-up, ramp-down, - * other stalls). We can detect many of these by further - * considering the number of "idle" threads, that are known to - * have zero queued tasks, so compensate by a factor of - * (#idle/#active) threads. - */ - Thread t; ForkJoinWorkerThread wt; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - (wt = (ForkJoinWorkerThread)t).workQueue.queueSize() - wt.pool.idlePerActive() : - 0; + return ForkJoinPool.getSurplusQueuedTaskCount(); } // Extension methods @@ -1263,21 +1218,22 @@ public abstract class ForkJoinTask im /** * Returns, but does not unschedule or execute, a task queued by * the current thread but not yet executed, if one is immediately - * available and the current thread is operating in a - * ForkJoinPool. There is no guarantee that this task will - * actually be polled or executed next. Conversely, this method - * may return null even if a task exists but cannot be accessed - * without contention with other threads. This method is designed + * available. There is no guarantee that this task will actually + * be polled or executed next. Conversely, this method may return + * null even if a task exists but cannot be accessed without + * contention with other threads. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * * @return the next task, or {@code null} if none are available */ protected static ForkJoinTask peekNextLocalTask() { - Thread t; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.peek() : - null; + Thread t; ForkJoinPool.WorkQueue q; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + q = ((ForkJoinWorkerThread)t).workQueue; + else + q = ForkJoinPool.commonSubmitterQueue(); + return (q == null) ? null : q.peek(); } /** @@ -1502,14 +1458,16 @@ public abstract class ForkJoinTask im // Unsafe mechanics private static final sun.misc.Unsafe U; private static final long STATUS; + static { exceptionTableLock = new ReentrantLock(); exceptionTableRefQueue = new ReferenceQueue(); exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY]; try { U = getUnsafe(); + Class k = ForkJoinTask.class; STATUS = U.objectFieldOffset - (ForkJoinTask.class.getDeclaredField("status")); + (k.getDeclaredField("status")); } catch (Exception e) { throw new Error(e); }