ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.117 by jsr166, Tue Sep 26 03:42:11 2017 UTC vs.
Revision 1.118 by dl, Sat Dec 2 16:42:41 2017 UTC

# Line 190 | Line 190 | public abstract class ForkJoinTask<V> im
190       * methods in a way that flows well in javadocs.
191       */
192  
193 <    /*
193 >    /**
194       * The status field holds run control status bits packed into a
195 <     * single int to minimize footprint and to ensure atomicity (via
196 <     * CAS).  Status is initially zero, and takes on nonnegative
197 <     * values until completed, upon which status (anded with
198 <     * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
199 <     * undergoing blocking waits by other threads have the SIGNAL bit
200 <     * set.  Completion of a stolen task with SIGNAL set awakens any
201 <     * waiters via notifyAll. Even though suboptimal for some
202 <     * purposes, we use basic builtin wait/notify to take advantage of
203 <     * "monitor inflation" in JVMs that we would otherwise need to
204 <     * emulate to avoid adding further per-task bookkeeping overhead.
205 <     * We want these monitors to be "fat", i.e., not use biasing or
206 <     * thin-lock techniques, so use some odd coding idioms that tend
207 <     * to avoid them, mainly by arranging that every synchronized
208 <     * block performs a wait, notifyAll or both.
195 >     * single int to ensure atomicity.  Status is initially zero, and
196 >     * takes on nonnegative values until completed, upon which it
197 >     * holds (negative bit) DONE, possibly with ABNORMAL (cancelled or
198 >     * exceptional) and THROWN (in which case an exception has been
199 >     * stored). Tasks undergoing blocking waits by other threads have
200 >     * the SIGNAL bit set.  Completion of a task with SIGNAL set
201 >     * awakens any waiters via notifyAll. (Waiters also help signal
202 >     * others upon completion.)
203       *
204       * These control bits occupy only (some of) the upper half (16
205       * bits) of status field. The lower bits are used for user-defined
206       * tags.
207       */
214
215    /** The run status of this task */
208      volatile int status; // accessed directly by pool and workers
209 <    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
210 <    static final int NORMAL      = 0xf0000000;  // must be negative
211 <    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
212 <    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
213 <    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
214 <    static final int SMASK       = 0x0000ffff;  // short bits for tags
209 >
210 >    private static final int DONE     = 1 << 31; // must be negative
211 >    private static final int ABNORMAL = 1 << 18; // set atomically with DONE
212 >    private static final int THROWN   = 1 << 17; // set atomically with ABNORMAL
213 >    private static final int SIGNAL   = 1 << 16; // true if joiner waiting
214 >    private static final int SMASK    = 0xffff;  // short bits for tags
215 >
216 >    static boolean isExceptionalStatus(int s) {  // needed by subclasses
217 >        return (s & THROWN) != 0;
218 >    }
219  
220      /**
221 <     * Marks completion and wakes up threads waiting to join this
226 <     * task.
221 >     * Sets DONE status and wakes up threads waiting to join this task.
222       *
223 <     * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
229 <     * @return completion status on exit
223 >     * @return status on exit
224       */
225 <    private int setCompletion(int completion) {
226 <        for (int s;;) {
225 >    private int setDone() {
226 >        int s;
227 >        if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0)
228 >            synchronized (this) { notifyAll(); }
229 >        return s | DONE;
230 >    }
231 >
232 >    /**
233 >     * Marks cancelled or exceptional completion unless already done.
234 >     *
235 >     * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
236 >     * @return status on exit
237 >     */
238 >    private int abnormalCompletion(int completion) {
239 >        for (int s, ns;;) {
240              if ((s = status) < 0)
241                  return s;
242 <            if (STATUS.compareAndSet(this, s, s | completion)) {
243 <                if ((s >>> 16) != 0)
242 >            else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
243 >                if ((s & SIGNAL) != 0)
244                      synchronized (this) { notifyAll(); }
245 <                return completion;
245 >                return ns;
246              }
247          }
248      }
# Line 253 | Line 260 | public abstract class ForkJoinTask<V> im
260              try {
261                  completed = exec();
262              } catch (Throwable rex) {
263 <                return setExceptionalCompletion(rex);
263 >                completed = false;
264 >                s = setExceptionalCompletion(rex);
265              }
266              if (completed)
267 <                s = setCompletion(NORMAL);
267 >                s = setDone();
268          }
269          return s;
270      }
# Line 268 | Line 276 | public abstract class ForkJoinTask<V> im
276       * @param timeout using Object.wait conventions.
277       */
278      final void internalWait(long timeout) {
279 <        int s;
272 <        if ((s = status) >= 0 && // force completer to issue notify
273 <            STATUS.compareAndSet(this, s, s | SIGNAL)) {
279 >        if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
280              synchronized (this) {
281                  if (status >= 0)
282                      try { wait(timeout); } catch (InterruptedException ie) { }
# Line 285 | Line 291 | public abstract class ForkJoinTask<V> im
291       * @return status upon completion
292       */
293      private int externalAwaitDone() {
294 <        int s = ((this instanceof CountedCompleter) ? // try helping
295 <                 ForkJoinPool.common.externalHelpComplete(
290 <                     (CountedCompleter<?>)this, 0) :
291 <                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
292 <        if (s >= 0 && (s = status) >= 0) {
294 >        int s = tryExternalHelp();
295 >        if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
296              boolean interrupted = false;
297 <            do {
298 <                if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
299 <                    synchronized (this) {
300 <                        if (status >= 0) {
301 <                            try {
302 <                                wait(0L);
303 <                            } catch (InterruptedException ie) {
301 <                                interrupted = true;
302 <                            }
297 >            synchronized (this) {
298 >                for (;;) {
299 >                    if ((s = status) >= 0) {
300 >                        try {
301 >                            wait(0L);
302 >                        } catch (InterruptedException ie) {
303 >                            interrupted = true;
304                          }
305 <                        else
306 <                            notifyAll();
305 >                    }
306 >                    else {
307 >                        notifyAll();
308 >                        break;
309                      }
310                  }
311 <            } while ((s = status) >= 0);
311 >            }
312              if (interrupted)
313                  Thread.currentThread().interrupt();
314          }
# Line 316 | Line 319 | public abstract class ForkJoinTask<V> im
319       * Blocks a non-worker-thread until completion or interruption.
320       */
321      private int externalInterruptibleAwaitDone() throws InterruptedException {
322 <        int s;
323 <        if (Thread.interrupted())
324 <            throw new InterruptedException();
325 <        if ((s = status) >= 0 &&
326 <            (s = ((this instanceof CountedCompleter) ?
327 <                  ForkJoinPool.common.externalHelpComplete(
328 <                      (CountedCompleter<?>)this, 0) :
329 <                  ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
330 <                  0)) >= 0) {
328 <            while ((s = status) >= 0) {
329 <                if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
330 <                    synchronized (this) {
331 <                        if (status >= 0)
332 <                            wait(0L);
333 <                        else
334 <                            notifyAll();
322 >        int s = tryExternalHelp();
323 >        if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
324 >            synchronized (this) {
325 >                for (;;) {
326 >                    if ((s = status) >= 0)
327 >                        wait(0L);
328 >                    else {
329 >                        notifyAll();
330 >                        break;
331                      }
332                  }
333              }
334          }
335 +        else if (Thread.interrupted())
336 +            throw new InterruptedException();
337          return s;
338      }
339  
340      /**
341 +     * Tries to help with tasks allowed for external callers.
342 +     *
343 +     * @return current status
344 +     */
345 +    private int tryExternalHelp() {
346 +        int s;
347 +        return ((s = status) < 0 ? s:
348 +                (this instanceof CountedCompleter) ?
349 +                ForkJoinPool.common.externalHelpComplete(
350 +                    (CountedCompleter<?>)this, 0) :
351 +                ForkJoinPool.common.tryExternalUnpush(this) ?
352 +                doExec() : 0);
353 +    }
354 +
355 +    /**
356       * Implementation for join, get, quietlyJoin. Directly handles
357       * only cases of already-completed, external wait, and
358       * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
# Line 446 | Line 459 | public abstract class ForkJoinTask<V> im
459              } finally {
460                  lock.unlock();
461              }
462 <            s = setCompletion(EXCEPTIONAL);
462 >            s = abnormalCompletion(DONE | ABNORMAL | THROWN);
463          }
464          return s;
465      }
# Line 458 | Line 471 | public abstract class ForkJoinTask<V> im
471       */
472      private int setExceptionalCompletion(Throwable ex) {
473          int s = recordExceptionalCompletion(ex);
474 <        if ((s & DONE_MASK) == EXCEPTIONAL)
474 >        if ((s & THROWN) != 0)
475              internalPropagateException(ex);
476          return s;
477      }
# Line 633 | Line 646 | public abstract class ForkJoinTask<V> im
646       * Throws exception, if any, associated with the given status.
647       */
648      private void reportException(int s) {
649 <        if (s == CANCELLED)
650 <            throw new CancellationException();
638 <        if (s == EXCEPTIONAL)
639 <            rethrow(getThrowableException());
649 >        rethrow((s & THROWN) != 0 ? getThrowableException() :
650 >                new CancellationException());
651      }
652  
653      // public methods
# Line 678 | Line 689 | public abstract class ForkJoinTask<V> im
689       */
690      public final V join() {
691          int s;
692 <        if ((s = doJoin() & DONE_MASK) != NORMAL)
692 >        if (((s = doJoin()) & ABNORMAL) != 0)
693              reportException(s);
694          return getRawResult();
695      }
# Line 693 | Line 704 | public abstract class ForkJoinTask<V> im
704       */
705      public final V invoke() {
706          int s;
707 <        if ((s = doInvoke() & DONE_MASK) != NORMAL)
707 >        if (((s = doInvoke()) & ABNORMAL) != 0)
708              reportException(s);
709          return getRawResult();
710      }
# Line 718 | Line 729 | public abstract class ForkJoinTask<V> im
729      public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
730          int s1, s2;
731          t2.fork();
732 <        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
732 >        if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
733              t1.reportException(s1);
734 <        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
734 >        if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
735              t2.reportException(s2);
736      }
737  
# Line 750 | Line 761 | public abstract class ForkJoinTask<V> im
761              }
762              else if (i != 0)
763                  t.fork();
764 <            else if (t.doInvoke() < NORMAL && ex == null)
764 >            else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
765                  ex = t.getException();
766          }
767          for (int i = 1; i <= last; ++i) {
# Line 758 | Line 769 | public abstract class ForkJoinTask<V> im
769              if (t != null) {
770                  if (ex != null)
771                      t.cancel(false);
772 <                else if (t.doJoin() < NORMAL)
772 >                else if ((t.doJoin() & ABNORMAL) != 0)
773                      ex = t.getException();
774              }
775          }
# Line 802 | Line 813 | public abstract class ForkJoinTask<V> im
813              }
814              else if (i != 0)
815                  t.fork();
816 <            else if (t.doInvoke() < NORMAL && ex == null)
816 >            else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
817                  ex = t.getException();
818          }
819          for (int i = 1; i <= last; ++i) {
# Line 810 | Line 821 | public abstract class ForkJoinTask<V> im
821              if (t != null) {
822                  if (ex != null)
823                      t.cancel(false);
824 <                else if (t.doJoin() < NORMAL)
824 >                else if ((t.doJoin() & ABNORMAL) != 0)
825                      ex = t.getException();
826              }
827          }
# Line 847 | Line 858 | public abstract class ForkJoinTask<V> im
858       * @return {@code true} if this task is now cancelled
859       */
860      public boolean cancel(boolean mayInterruptIfRunning) {
861 <        return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
861 >        int s = abnormalCompletion(DONE | ABNORMAL);
862 >        return (s & (ABNORMAL | THROWN)) == ABNORMAL;
863      }
864  
865      public final boolean isDone() {
# Line 855 | Line 867 | public abstract class ForkJoinTask<V> im
867      }
868  
869      public final boolean isCancelled() {
870 <        return (status & DONE_MASK) == CANCELLED;
870 >        return (status & (ABNORMAL | THROWN)) == ABNORMAL;
871      }
872  
873      /**
# Line 864 | Line 876 | public abstract class ForkJoinTask<V> im
876       * @return {@code true} if this task threw an exception or was cancelled
877       */
878      public final boolean isCompletedAbnormally() {
879 <        return status < NORMAL;
879 >        return (status & ABNORMAL) != 0;
880      }
881  
882      /**
# Line 875 | Line 887 | public abstract class ForkJoinTask<V> im
887       * exception and was not cancelled
888       */
889      public final boolean isCompletedNormally() {
890 <        return (status & DONE_MASK) == NORMAL;
890 >        return (status & (DONE | ABNORMAL)) == DONE;
891      }
892  
893      /**
# Line 886 | Line 898 | public abstract class ForkJoinTask<V> im
898       * @return the exception, or {@code null} if none
899       */
900      public final Throwable getException() {
901 <        int s = status & DONE_MASK;
902 <        return ((s >= NORMAL)    ? null :
903 <                (s == CANCELLED) ? new CancellationException() :
901 >        int s = status;
902 >        return ((s & ABNORMAL) == 0 ? null :
903 >                (s & THROWN)   == 0 ? new CancellationException() :
904                  getThrowableException());
905      }
906  
# Line 932 | Line 944 | public abstract class ForkJoinTask<V> im
944              setExceptionalCompletion(rex);
945              return;
946          }
947 <        setCompletion(NORMAL);
947 >        setDone();
948      }
949  
950      /**
# Line 944 | Line 956 | public abstract class ForkJoinTask<V> im
956       * @since 1.8
957       */
958      public final void quietlyComplete() {
959 <        setCompletion(NORMAL);
959 >        setDone();
960      }
961  
962      /**
# Line 961 | Line 973 | public abstract class ForkJoinTask<V> im
973      public final V get() throws InterruptedException, ExecutionException {
974          int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
975              doJoin() : externalInterruptibleAwaitDone();
976 <        if ((s &= DONE_MASK) == CANCELLED)
965 <            throw new CancellationException();
966 <        if (s == EXCEPTIONAL)
976 >        if ((s & THROWN) != 0)
977              throw new ExecutionException(getThrowableException());
978 <        return getRawResult();
978 >        else if ((s & ABNORMAL) != 0)
979 >            throw new CancellationException();
980 >        else
981 >            return getRawResult();
982      }
983  
984      /**
# Line 1005 | Line 1018 | public abstract class ForkJoinTask<V> im
1018                  while ((s = status) >= 0 &&
1019                         (ns = deadline - System.nanoTime()) > 0L) {
1020                      if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
1021 <                        STATUS.compareAndSet(this, s, s | SIGNAL)) {
1021 >                        (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
1022                          synchronized (this) {
1023                              if (status >= 0)
1024                                  wait(ms); // OK to throw InterruptedException
# Line 1017 | Line 1030 | public abstract class ForkJoinTask<V> im
1030              }
1031          }
1032          if (s >= 0)
1033 <            s = status;
1034 <        if ((s &= DONE_MASK) != NORMAL) {
1022 <            if (s == CANCELLED)
1023 <                throw new CancellationException();
1024 <            if (s != EXCEPTIONAL)
1025 <                throw new TimeoutException();
1033 >            throw new TimeoutException();
1034 >        else if ((s & THROWN) != 0)
1035              throw new ExecutionException(getThrowableException());
1036 <        }
1037 <        return getRawResult();
1036 >        else if ((s & ABNORMAL) != 0)
1037 >            throw new CancellationException();
1038 >        else
1039 >            return getRawResult();
1040      }
1041  
1042      /**
# Line 1081 | Line 1092 | public abstract class ForkJoinTask<V> im
1092       * setRawResult(null)}.
1093       */
1094      public void reinitialize() {
1095 <        if ((status & DONE_MASK) == EXCEPTIONAL)
1095 >        if ((status & THROWN) != 0)
1096              clearExceptionalCompletion();
1097          else
1098              status = 0;
# Line 1298 | Line 1309 | public abstract class ForkJoinTask<V> im
1309       */
1310      public final short setForkJoinTaskTag(short newValue) {
1311          for (int s;;) {
1312 <            if (STATUS.compareAndSet(this, s = status,
1313 <                                     (s & ~SMASK) | (newValue & SMASK)))
1312 >            if (STATUS.weakCompareAndSet(this, s = status,
1313 >                                         (s & ~SMASK) | (newValue & SMASK)))
1314                  return (short)s;
1315          }
1316      }
# Line 1322 | Line 1333 | public abstract class ForkJoinTask<V> im
1333          for (int s;;) {
1334              if ((short)(s = status) != expect)
1335                  return false;
1336 <            if (STATUS.compareAndSet(this, s,
1337 <                                     (s & ~SMASK) | (update & SMASK)))
1336 >            if (STATUS.weakCompareAndSet(this, s,
1337 >                                         (s & ~SMASK) | (update & SMASK)))
1338                  return true;
1339          }
1340      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines