ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.92 by dl, Wed Oct 31 12:49:24 2012 UTC vs.
Revision 1.93 by dl, Wed Nov 14 17:20:37 2012 UTC

# Line 285 | Line 285 | public abstract class ForkJoinTask<V> im
285       */
286      private int externalAwaitDone() {
287          int s;
288 +        ForkJoinPool.externalHelpJoin(this);
289          boolean interrupted = false;
290 <        if ((s = status) >= 0 && ForkJoinPool.tryUnsubmitFromCommonPool(this))
290 <            s = doExec();
291 <        while (s >= 0) {
290 >        while ((s = status) >= 0) {
291              if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
292                  synchronized (this) {
293                      if (status >= 0) {
# Line 302 | Line 301 | public abstract class ForkJoinTask<V> im
301                          notifyAll();
302                  }
303              }
305            s = status;
304          }
305          if (interrupted)
306              Thread.currentThread().interrupt();
# Line 313 | Line 311 | public abstract class ForkJoinTask<V> im
311       * Blocks a non-worker-thread until completion or interruption.
312       */
313      private int externalInterruptibleAwaitDone() throws InterruptedException {
314 +        int s;
315          if (Thread.interrupted())
316              throw new InterruptedException();
317 <        int s;
318 <        if ((s = status) >= 0 && ForkJoinPool.tryUnsubmitFromCommonPool(this))
320 <            s = doExec();
321 <        while (s >= 0) {
317 >        ForkJoinPool.externalHelpJoin(this);
318 >        while ((s = status) >= 0) {
319              if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
320                  synchronized (this) {
321                      if (status >= 0)
# Line 327 | Line 324 | public abstract class ForkJoinTask<V> im
324                          notifyAll();
325                  }
326              }
330            s = status;
327          }
328          return s;
329      }
330  
331 +
332      /**
333       * Implementation for join, get, quietlyJoin. Directly handles
334       * only cases of already-completed, external wait, and
# Line 655 | Line 652 | public abstract class ForkJoinTask<V> im
652          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
653              ((ForkJoinWorkerThread)t).workQueue.push(this);
654          else
655 <            ForkJoinPool.submitToCommonPool(this);
655 >            ForkJoinPool.commonPool.externalPush(this);
656          return this;
657      }
658  
# Line 991 | Line 988 | public abstract class ForkJoinTask<V> im
988                  ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
989                  p = wt.pool;
990                  w = wt.workQueue;
991 <                s = p.helpJoinOnce(w, this); // no retries on failure
991 >                p.helpJoinOnce(w, this); // no retries on failure
992              }
993 +            else
994 +                ForkJoinPool.externalHelpJoin(this);
995              boolean canBlock = false;
996              boolean interrupted = false;
997              try {
998                  while ((s = status) >= 0) {
999 <                    if (w != null && w.runState < 0)
999 >                    if (w != null && w.qlock < 0)
1000                          cancelIgnoringExceptions(this);
1001                      else if (!canBlock) {
1002 <                        if (p == null || p.tryCompensate(this, null))
1002 >                        if (p == null || p.tryCompensate())
1003                              canBlock = true;
1004                      }
1005                      else {
# Line 1139 | Line 1138 | public abstract class ForkJoinTask<V> im
1138       */
1139      public boolean tryUnfork() {
1140          Thread t;
1141 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1142 <            ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
1143 <            ForkJoinPool.tryUnsubmitFromCommonPool(this);
1141 >        return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1142 >                ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
1143 >                ForkJoinPool.tryExternalUnpush(this));
1144      }
1145  
1146      /**
# Line 1153 | Line 1152 | public abstract class ForkJoinTask<V> im
1152       * @return the number of tasks
1153       */
1154      public static int getQueuedTaskCount() {
1155 <        Thread t;
1156 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1157 <            ((ForkJoinWorkerThread)t).workQueue.queueSize() :
1158 <            ForkJoinPool.getEstimatedSubmitterQueueLength();
1155 >        Thread t; ForkJoinPool.WorkQueue q;
1156 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1157 >            q = ((ForkJoinWorkerThread)t).workQueue;
1158 >        else
1159 >            q = ForkJoinPool.commonSubmitterQueue();
1160 >        return (q == null) ? 0 : q.queueSize();
1161      }
1162  
1163      /**
# Line 1173 | Line 1174 | public abstract class ForkJoinTask<V> im
1174       * @return the surplus number of tasks, which may be negative
1175       */
1176      public static int getSurplusQueuedTaskCount() {
1177 <        /*
1177 <         * The aim of this method is to return a cheap heuristic guide
1178 <         * for task partitioning when programmers, frameworks, tools,
1179 <         * or languages have little or no idea about task granularity.
1180 <         * In essence by offering this method, we ask users only about
1181 <         * tradeoffs in overhead vs expected throughput and its
1182 <         * variance, rather than how finely to partition tasks.
1183 <         *
1184 <         * In a steady state strict (tree-structured) computation,
1185 <         * each thread makes available for stealing enough tasks for
1186 <         * other threads to remain active. Inductively, if all threads
1187 <         * play by the same rules, each thread should make available
1188 <         * only a constant number of tasks.
1189 <         *
1190 <         * The minimum useful constant is just 1. But using a value of
1191 <         * 1 would require immediate replenishment upon each steal to
1192 <         * maintain enough tasks, which is infeasible.  Further,
1193 <         * partitionings/granularities of offered tasks should
1194 <         * minimize steal rates, which in general means that threads
1195 <         * nearer the top of computation tree should generate more
1196 <         * than those nearer the bottom. In perfect steady state, each
1197 <         * thread is at approximately the same level of computation
1198 <         * tree. However, producing extra tasks amortizes the
1199 <         * uncertainty of progress and diffusion assumptions.
1200 <         *
1201 <         * So, users will want to use values larger, but not much
1202 <         * larger than 1 to both smooth over transient shortages and
1203 <         * hedge against uneven progress; as traded off against the
1204 <         * cost of extra task overhead. We leave the user to pick a
1205 <         * threshold value to compare with the results of this call to
1206 <         * guide decisions, but recommend values such as 3.
1207 <         *
1208 <         * When all threads are active, it is on average OK to
1209 <         * estimate surplus strictly locally. In steady-state, if one
1210 <         * thread is maintaining say 2 surplus tasks, then so are
1211 <         * others. So we can just use estimated queue length.
1212 <         * However, this strategy alone leads to serious mis-estimates
1213 <         * in some non-steady-state conditions (ramp-up, ramp-down,
1214 <         * other stalls). We can detect many of these by further
1215 <         * considering the number of "idle" threads, that are known to
1216 <         * have zero queued tasks, so compensate by a factor of
1217 <         * (#idle/#active) threads.
1218 <         */
1219 <        Thread t; ForkJoinWorkerThread wt;
1220 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1221 <            (wt = (ForkJoinWorkerThread)t).workQueue.queueSize() - wt.pool.idlePerActive() :
1222 <            0;
1177 >        return ForkJoinPool.getSurplusQueuedTaskCount();
1178      }
1179  
1180      // Extension methods
# Line 1263 | Line 1218 | public abstract class ForkJoinTask<V> im
1218      /**
1219       * Returns, but does not unschedule or execute, a task queued by
1220       * the current thread but not yet executed, if one is immediately
1221 <     * available and the current thread is operating in a
1222 <     * ForkJoinPool. There is no guarantee that this task will
1223 <     * actually be polled or executed next. Conversely, this method
1224 <     * may return null even if a task exists but cannot be accessed
1270 <     * without contention with other threads.  This method is designed
1221 >     * available. There is no guarantee that this task will actually
1222 >     * be polled or executed next. Conversely, this method may return
1223 >     * null even if a task exists but cannot be accessed without
1224 >     * contention with other threads.  This method is designed
1225       * primarily to support extensions, and is unlikely to be useful
1226       * otherwise.
1227       *
1228       * @return the next task, or {@code null} if none are available
1229       */
1230      protected static ForkJoinTask<?> peekNextLocalTask() {
1231 <        Thread t;
1232 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1233 <            ((ForkJoinWorkerThread)t).workQueue.peek() :
1234 <            null;
1231 >        Thread t; ForkJoinPool.WorkQueue q;
1232 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1233 >            q = ((ForkJoinWorkerThread)t).workQueue;
1234 >        else
1235 >            q = ForkJoinPool.commonSubmitterQueue();
1236 >        return (q == null) ? null : q.peek();
1237      }
1238  
1239      /**
# Line 1502 | Line 1458 | public abstract class ForkJoinTask<V> im
1458      // Unsafe mechanics
1459      private static final sun.misc.Unsafe U;
1460      private static final long STATUS;
1461 +
1462      static {
1463          exceptionTableLock = new ReentrantLock();
1464          exceptionTableRefQueue = new ReferenceQueue<Object>();
1465          exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
1466          try {
1467              U = getUnsafe();
1468 +            Class<?> k = ForkJoinTask.class;
1469              STATUS = U.objectFieldOffset
1470 <                (ForkJoinTask.class.getDeclaredField("status"));
1470 >                (k.getDeclaredField("status"));
1471          } catch (Exception e) {
1472              throw new Error(e);
1473          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines