ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.91 by dl, Sun Oct 28 22:36:01 2012 UTC vs.
Revision 1.94 by dl, Mon Nov 19 18:12:42 2012 UTC

# Line 33 | Line 33 | import java.lang.reflect.Constructor;
33   * <p>A "main" {@code ForkJoinTask} begins execution when it is
34   * explicitly submitted to a {@link ForkJoinPool}, or, if not already
35   * engaged in a ForkJoin computation, commenced in the {@link
36 < * ForkJoinPool#commonPool} via {@link #fork}, {@link #invoke}, or
36 > * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
37   * related methods.  Once started, it will usually in turn start other
38   * subtasks.  As indicated by the name of this class, many programs
39   * using {@code ForkJoinTask} employ only methods {@link #fork} and
# Line 285 | Line 285 | public abstract class ForkJoinTask<V> im
285       */
286      private int externalAwaitDone() {
287          int s;
288 +        ForkJoinPool.externalHelpJoin(this);
289          boolean interrupted = false;
290 <        if ((s = status) >= 0 && ForkJoinPool.tryUnsubmitFromCommonPool(this))
290 <            s = doExec();
291 <        while (s >= 0) {
290 >        while ((s = status) >= 0) {
291              if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
292                  synchronized (this) {
293                      if (status >= 0) {
# Line 302 | Line 301 | public abstract class ForkJoinTask<V> im
301                          notifyAll();
302                  }
303              }
305            s = status;
304          }
305          if (interrupted)
306              Thread.currentThread().interrupt();
# Line 313 | Line 311 | public abstract class ForkJoinTask<V> im
311       * Blocks a non-worker-thread until completion or interruption.
312       */
313      private int externalInterruptibleAwaitDone() throws InterruptedException {
314 +        int s;
315          if (Thread.interrupted())
316              throw new InterruptedException();
317 <        int s;
318 <        if ((s = status) >= 0 && ForkJoinPool.tryUnsubmitFromCommonPool(this))
320 <            s = doExec();
321 <        while (s >= 0) {
317 >        ForkJoinPool.externalHelpJoin(this);
318 >        while ((s = status) >= 0) {
319              if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
320                  synchronized (this) {
321                      if (status >= 0)
# Line 327 | Line 324 | public abstract class ForkJoinTask<V> im
324                          notifyAll();
325                  }
326              }
330            s = status;
327          }
328          return s;
329      }
330  
331 +
332      /**
333       * Implementation for join, get, quietlyJoin. Directly handles
334       * only cases of already-completed, external wait, and
# Line 601 | Line 598 | public abstract class ForkJoinTask<V> im
598      }
599  
600      /**
601 +     * A version of "sneaky throw" to relay exceptions
602 +     */
603 +    static void rethrow(final Throwable ex) {
604 +        if (ex != null) {
605 +            if (ex instanceof Error)
606 +                throw (Error)ex;
607 +            if (ex instanceof RuntimeException)
608 +                throw (RuntimeException)ex;
609 +            throw uncheckedThrowable(ex, RuntimeException.class);
610 +        }
611 +    }
612 +
613 +    /**
614 +     * The sneaky part of sneaky throw, relying on generics
615 +     * limitations to evade compiler complaints about rethrowing
616 +     * unchecked exceptions
617 +     */
618 +    @SuppressWarnings("unchecked") static <T extends Throwable>
619 +        T uncheckedThrowable(final Throwable t, final Class<T> c) {
620 +        return (T)t; // rely on vacuous cast
621 +    }
622 +
623 +    /**
624       * Throws exception, if any, associated with the given status.
625       */
626      private void reportException(int s) {
627 <        Throwable ex = ((s == CANCELLED) ?  new CancellationException() :
628 <                        (s == EXCEPTIONAL) ? getThrowableException() :
629 <                        null);
630 <        if (ex != null)
611 <            U.throwException(ex);
627 >        if (s == CANCELLED)
628 >            throw new CancellationException();
629 >        if (s == EXCEPTIONAL)
630 >            rethrow(getThrowableException());
631      }
632  
633      // public methods
# Line 616 | Line 635 | public abstract class ForkJoinTask<V> im
635      /**
636       * Arranges to asynchronously execute this task in the pool the
637       * current task is running in, if applicable, or using the {@link
638 <     * ForkJoinPool#commonPool} if not {@link #inForkJoinPool}.  While
638 >     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
639       * it is not necessarily enforced, it is a usage error to fork a
640       * task more than once unless it has completed and been
641       * reinitialized.  Subsequent modifications to the state of this
# Line 633 | Line 652 | public abstract class ForkJoinTask<V> im
652          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
653              ((ForkJoinWorkerThread)t).workQueue.push(this);
654          else
655 <            ForkJoinPool.submitToCommonPool(this);
655 >            ForkJoinPool.commonPool.externalPush(this);
656          return this;
657      }
658  
# Line 735 | Line 754 | public abstract class ForkJoinTask<V> im
754              }
755          }
756          if (ex != null)
757 <            U.throwException(ex);
757 >            rethrow(ex);
758      }
759  
760      /**
# Line 786 | Line 805 | public abstract class ForkJoinTask<V> im
805              }
806          }
807          if (ex != null)
808 <            U.throwException(ex);
808 >            rethrow(ex);
809          return tasks;
810      }
811  
# Line 969 | Line 988 | public abstract class ForkJoinTask<V> im
988                  ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
989                  p = wt.pool;
990                  w = wt.workQueue;
991 <                s = p.helpJoinOnce(w, this); // no retries on failure
991 >                p.helpJoinOnce(w, this); // no retries on failure
992              }
993 +            else
994 +                ForkJoinPool.externalHelpJoin(this);
995              boolean canBlock = false;
996              boolean interrupted = false;
997              try {
998                  while ((s = status) >= 0) {
999 <                    if (w != null && w.runState < 0)
999 >                    if (w != null && w.qlock < 0)
1000                          cancelIgnoringExceptions(this);
1001                      else if (!canBlock) {
1002 <                        if (p == null || p.tryCompensate(this, null))
1002 >                        if (p == null || p.tryCompensate())
1003                              canBlock = true;
1004                      }
1005                      else {
# Line 1117 | Line 1138 | public abstract class ForkJoinTask<V> im
1138       */
1139      public boolean tryUnfork() {
1140          Thread t;
1141 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1142 <            ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
1143 <            ForkJoinPool.tryUnsubmitFromCommonPool(this);
1141 >        return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1142 >                ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
1143 >                ForkJoinPool.tryExternalUnpush(this));
1144      }
1145  
1146      /**
# Line 1131 | Line 1152 | public abstract class ForkJoinTask<V> im
1152       * @return the number of tasks
1153       */
1154      public static int getQueuedTaskCount() {
1155 <        Thread t;
1156 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1157 <            ((ForkJoinWorkerThread)t).workQueue.queueSize() :
1158 <            ForkJoinPool.getEstimatedSubmitterQueueLength();
1155 >        Thread t; ForkJoinPool.WorkQueue q;
1156 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1157 >            q = ((ForkJoinWorkerThread)t).workQueue;
1158 >        else
1159 >            q = ForkJoinPool.commonSubmitterQueue();
1160 >        return (q == null) ? 0 : q.queueSize();
1161      }
1162  
1163      /**
# Line 1151 | Line 1174 | public abstract class ForkJoinTask<V> im
1174       * @return the surplus number of tasks, which may be negative
1175       */
1176      public static int getSurplusQueuedTaskCount() {
1177 <        /*
1155 <         * The aim of this method is to return a cheap heuristic guide
1156 <         * for task partitioning when programmers, frameworks, tools,
1157 <         * or languages have little or no idea about task granularity.
1158 <         * In essence by offering this method, we ask users only about
1159 <         * tradeoffs in overhead vs expected throughput and its
1160 <         * variance, rather than how finely to partition tasks.
1161 <         *
1162 <         * In a steady state strict (tree-structured) computation,
1163 <         * each thread makes available for stealing enough tasks for
1164 <         * other threads to remain active. Inductively, if all threads
1165 <         * play by the same rules, each thread should make available
1166 <         * only a constant number of tasks.
1167 <         *
1168 <         * The minimum useful constant is just 1. But using a value of
1169 <         * 1 would require immediate replenishment upon each steal to
1170 <         * maintain enough tasks, which is infeasible.  Further,
1171 <         * partitionings/granularities of offered tasks should
1172 <         * minimize steal rates, which in general means that threads
1173 <         * nearer the top of computation tree should generate more
1174 <         * than those nearer the bottom. In perfect steady state, each
1175 <         * thread is at approximately the same level of computation
1176 <         * tree. However, producing extra tasks amortizes the
1177 <         * uncertainty of progress and diffusion assumptions.
1178 <         *
1179 <         * So, users will want to use values larger, but not much
1180 <         * larger than 1 to both smooth over transient shortages and
1181 <         * hedge against uneven progress; as traded off against the
1182 <         * cost of extra task overhead. We leave the user to pick a
1183 <         * threshold value to compare with the results of this call to
1184 <         * guide decisions, but recommend values such as 3.
1185 <         *
1186 <         * When all threads are active, it is on average OK to
1187 <         * estimate surplus strictly locally. In steady-state, if one
1188 <         * thread is maintaining say 2 surplus tasks, then so are
1189 <         * others. So we can just use estimated queue length.
1190 <         * However, this strategy alone leads to serious mis-estimates
1191 <         * in some non-steady-state conditions (ramp-up, ramp-down,
1192 <         * other stalls). We can detect many of these by further
1193 <         * considering the number of "idle" threads, that are known to
1194 <         * have zero queued tasks, so compensate by a factor of
1195 <         * (#idle/#active) threads.
1196 <         */
1197 <        Thread t; ForkJoinWorkerThread wt;
1198 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1199 <            (wt = (ForkJoinWorkerThread)t).workQueue.queueSize() - wt.pool.idlePerActive() :
1200 <            0;
1177 >        return ForkJoinPool.getSurplusQueuedTaskCount();
1178      }
1179  
1180      // Extension methods
# Line 1241 | Line 1218 | public abstract class ForkJoinTask<V> im
1218      /**
1219       * Returns, but does not unschedule or execute, a task queued by
1220       * the current thread but not yet executed, if one is immediately
1221 <     * available and the current thread is operating in a
1222 <     * ForkJoinPool. There is no guarantee that this task will
1223 <     * actually be polled or executed next. Conversely, this method
1224 <     * may return null even if a task exists but cannot be accessed
1248 <     * without contention with other threads.  This method is designed
1221 >     * available. There is no guarantee that this task will actually
1222 >     * be polled or executed next. Conversely, this method may return
1223 >     * null even if a task exists but cannot be accessed without
1224 >     * contention with other threads.  This method is designed
1225       * primarily to support extensions, and is unlikely to be useful
1226       * otherwise.
1227       *
1228       * @return the next task, or {@code null} if none are available
1229       */
1230      protected static ForkJoinTask<?> peekNextLocalTask() {
1231 <        Thread t;
1232 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1233 <            ((ForkJoinWorkerThread)t).workQueue.peek() :
1234 <            null;
1231 >        Thread t; ForkJoinPool.WorkQueue q;
1232 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1233 >            q = ((ForkJoinWorkerThread)t).workQueue;
1234 >        else
1235 >            q = ForkJoinPool.commonSubmitterQueue();
1236 >        return (q == null) ? null : q.peek();
1237      }
1238  
1239      /**
# Line 1480 | Line 1458 | public abstract class ForkJoinTask<V> im
1458      // Unsafe mechanics
1459      private static final sun.misc.Unsafe U;
1460      private static final long STATUS;
1461 +
1462      static {
1463          exceptionTableLock = new ReentrantLock();
1464          exceptionTableRefQueue = new ReferenceQueue<Object>();
1465          exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
1466          try {
1467              U = getUnsafe();
1468 +            Class<?> k = ForkJoinTask.class;
1469              STATUS = U.objectFieldOffset
1470 <                (ForkJoinTask.class.getDeclaredField("status"));
1470 >                (k.getDeclaredField("status"));
1471          } catch (Exception e) {
1472              throw new Error(e);
1473          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines