ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.128 by dl, Sat Feb 1 20:20:17 2020 UTC vs.
Revision 1.129 by dl, Sun Feb 2 23:44:12 2020 UTC

# Line 513 | Line 513 | public abstract class ForkJoinTask<V> im
513      static final void cancelIgnoringExceptions(Future<?> t) {
514          if (t != null) {
515              try {
516 <                t.cancel(false);
516 >                t.cancel(true);
517              } catch (Throwable ignore) {
518              }
519          }
# Line 583 | Line 583 | public abstract class ForkJoinTask<V> im
583      }
584  
585      /**
586 +     * Throws exception for (timed or untimed) get, wrapping if
587 +     * necessary in an ExecutionException.
588 +     */
589 +    private void reportExceptionForGet(int s) {
590 +        Throwable ex = null;
591 +        if (s == ABNORMAL)
592 +            ex = new InterruptedException();
593 +        else if (s >= 0)
594 +            ex = new TimeoutException();
595 +        else if ((s & THROWN) != 0 && (ex = getThrowableException()) != null)
596 +            ex = new ExecutionException(ex);
597 +        ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
598 +    }
599 +
600 +    /**
601       * A version of "sneaky throw" to relay exceptions in other
602       * contexts.
603       */
# Line 952 | Line 967 | public abstract class ForkJoinTask<V> im
967       * member of a ForkJoinPool and was interrupted while waiting
968       */
969      public final V get() throws InterruptedException, ExecutionException {
970 <        int s; Throwable ex;
971 <        if ((s = status) >= 0 && (s = awaitGet(false, 0L)) >= 0)
972 <            throw new InterruptedException();
973 <        else if ((s & ABNORMAL) == 0)
974 <            return getRawResult();
975 <        else if ((s & THROWN) == 0 || (ex = getThrowableException()) == null)
961 <            throw new CancellationException();
962 <        else
963 <            throw new ExecutionException(ex);
970 >        int s;
971 >        if ((s = status) >= 0)
972 >            s = awaitGet(false, 0L);
973 >        if ((s & ABNORMAL) != 0)
974 >            reportExceptionForGet(s);
975 >        return getRawResult();
976      }
977  
978      /**
# Line 979 | Line 991 | public abstract class ForkJoinTask<V> im
991       */
992      public final V get(long timeout, TimeUnit unit)
993          throws InterruptedException, ExecutionException, TimeoutException {
994 <        int s; Throwable ex;
994 >        int s;
995          long nanos = unit.toNanos(timeout);
996 <        if ((s = status) >= 0 && (s = awaitGet(true, nanos)) >= 0) {
997 <            if (s == ABNORMAL)
998 <                throw new InterruptedException();
999 <            else
1000 <                throw new TimeoutException();
989 <        }
990 <        else if ((s & ABNORMAL) == 0)
991 <            return getRawResult();
992 <        else if ((s & THROWN) == 0 || (ex = getThrowableException()) == null)
993 <            throw new CancellationException();
994 <        else
995 <            throw new ExecutionException(ex);
996 >        if ((s = status) >= 0)
997 >            s = awaitGet(true, nanos);
998 >        if (s >= 0 || (s & ABNORMAL) != 0)
999 >            reportExceptionForGet(s);
1000 >        return getRawResult();
1001      }
1002  
1003      /**
# Line 1399 | Line 1404 | public abstract class ForkJoinTask<V> im
1404          private static final long serialVersionUID = 2838392045355241008L;
1405      }
1406  
1407 +    static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T>
1408 +        implements RunnableFuture<T> {
1409 +        @SuppressWarnings("serial") // Conditionally serializable
1410 +        final Callable<? extends T> callable;
1411 +        @SuppressWarnings("serial") // Conditionally serializable
1412 +        transient volatile Thread runner;
1413 +        T result;
1414 +        AdaptedInterruptibleCallable(Callable<? extends T> callable) {
1415 +            if (callable == null) throw new NullPointerException();
1416 +            this.callable = callable;
1417 +        }
1418 +        public final T getRawResult() { return result; }
1419 +        public final void setRawResult(T v) { result = v; }
1420 +        public final boolean exec() {
1421 +            Thread.interrupted();
1422 +            runner = Thread.currentThread();
1423 +            try {
1424 +                result = callable.call();
1425 +                return true;
1426 +            } catch (RuntimeException rex) {
1427 +                throw rex;
1428 +            } catch (Exception ex) {
1429 +                throw new RuntimeException(ex);
1430 +            } finally {
1431 +                runner = null;
1432 +                Thread.interrupted();
1433 +            }
1434 +        }
1435 +        public final void run() { invoke(); }
1436 +        public final boolean cancel(boolean mayInterruptIfRunning) {
1437 +            Thread t;
1438 +            boolean stat = super.cancel(false);
1439 +            if (mayInterruptIfRunning && (t = runner) != null) {
1440 +                try {
1441 +                    t.interrupt();
1442 +                } catch (Throwable ignore) {
1443 +                }
1444 +            }
1445 +            return stat;
1446 +        }
1447 +        public String toString() {
1448 +            return super.toString() + "[Wrapped task = " + callable + "]";
1449 +        }
1450 +        private static final long serialVersionUID = 2838392045355241008L;
1451 +    }
1452 +
1453      /**
1454       * Returns a new {@code ForkJoinTask} that performs the {@code run}
1455       * method of the given {@code Runnable} as its action, and returns
# Line 1439 | Line 1490 | public abstract class ForkJoinTask<V> im
1490          return new AdaptedCallable<T>(callable);
1491      }
1492  
1493 +    /**
1494 +     * Returns a new {@code ForkJoinTask} that performs the {@code
1495 +     * call} method of the given {@code Callable} as its action, and
1496 +     * returns its result upon {@link #join}, translating any checked
1497 +     * exceptions encountered into {@code
1498 +     * RuntimeException}. Additionally, invocations of {@code cancel}
1499 +     * with {@code mayInterruptIfRunning true} will attempt to
1500 +     * interrupt the thread performing the task.
1501 +     *
1502 +     * @param callable the callable action
1503 +     * @param <T> the type of the callable's result
1504 +     * @return the task
1505 +     *
1506 +     * @since 1.15
1507 +     */
1508 +    public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
1509 +        return new AdaptedInterruptibleCallable<T>(callable);
1510 +    }
1511 +
1512      // Serialization support
1513  
1514      private static final long serialVersionUID = -7721805057305804111L;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines