ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.151 by dl, Sat Feb 6 17:33:47 2021 UTC vs.
Revision 1.152 by dl, Sun Feb 7 15:01:38 2021 UTC

# Line 267 | Line 267 | public abstract class ForkJoinTask<V> im
267      }
268  
269      /**
270     * Possibly blocks until task is done or interrupted or timed out.
271     *
272     * @param interruptible true if wait can be cancelled by interrupt
273     * @param deadline if non-zero use timed waits and possibly timeout
274     * @param pool current pool if known
275     * @param uncompensate if true uncompensate after unblocking
276     * @return status on exit, or ABNORMAL if interrupted while waiting
277     */
278    private int awaitDone(boolean interruptible, long deadline,
279                          ForkJoinPool pool, boolean uncompensate) {
280        int s;
281        boolean interrupted = false, queued = false, parked = false, fail = false;
282        Aux node = null;
283        while ((s = status) >= 0) {
284            Aux a; long ns;
285            if (fail || (fail = (pool != null && pool.mode < 0)))
286                casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
287            else if (parked && Thread.interrupted()) {
288                if (interruptible) {
289                    s = ABNORMAL;
290                    break;
291                }
292                interrupted = true;
293            }
294            else if (queued) {
295                if (deadline != 0L) {
296                    if ((ns = deadline - System.nanoTime()) <= 0L)
297                        break;
298                    LockSupport.parkNanos(ns);
299                }
300                else
301                    LockSupport.park();
302                parked = true;
303            }
304            else if (node != null) {
305                if ((a = aux) != null && a.ex != null)
306                    Thread.onSpinWait();     // exception in progress
307                else if (queued = casAux(node.next = a, node))
308                    LockSupport.setCurrentBlocker(this);
309            }
310            else {
311                try {
312                    node = new Aux(Thread.currentThread(), null);
313                } catch (Throwable ex) {     // cannot create
314                    fail = true;
315                }
316            }
317        }
318        if (pool != null && uncompensate)
319            pool.uncompensate();
320
321        if (queued) {
322            LockSupport.setCurrentBlocker(null);
323            if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
324                outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
325                    for (Aux trail = null;;) {
326                        Aux next = a.next;
327                        if (a == node) {
328                            if (trail != null)
329                                trail.casNext(trail, next);
330                            else if (casAux(a, next))
331                                break outer; // cannot be re-encountered
332                            break;           // restart
333                        } else {
334                            trail = a;
335                            if ((a = next) == null)
336                                break outer;
337                        }
338                    }
339                }
340            }
341            else {
342                signalWaiters();             // help clean or signal
343                if (interrupted)
344                    Thread.currentThread().interrupt();
345            }
346        }
347        return s;
348    }
349
350    /**
270       * Sets DONE status and wakes up threads waiting to join this task.
271       * @return status on exit
272       */
# Line 437 | Line 356 | public abstract class ForkJoinTask<V> im
356       * Helps and/or waits for completion from join, get, or invoke;
357       * called from either internal or external threads.
358       *
359 <     * @param submittedPool if nonnull, known externally submitted pool
359 >     * @param pool if nonnull, known submitted pool, else assumes current pool
360       * @param ran true if task known to have been exec'd
361       * @param interruptible true if park interruptibly when external
362       * @param timed true if use timed wait
363       * @param nanos if timed, timeout value
364       * @return ABNORMAL if interrupted, else status on exit
365       */
366 <    private int awaitJoin(ForkJoinPool submittedPool, boolean ran,
366 >    private int awaitDone(ForkJoinPool pool, boolean ran,
367                            boolean interruptible, boolean timed,
368                            long nanos) {
369 <        boolean internal; ForkJoinPool p, hostPool;
370 <        ForkJoinPool.WorkQueue q; int s;
371 <        Thread t; ForkJoinWorkerThread wt;
372 <        if (internal = ((t = Thread.currentThread())
373 <                        instanceof ForkJoinWorkerThread)) {
374 <            p = (wt = (ForkJoinWorkerThread)t).pool;
375 <            q = wt.workQueue;
376 <            if (submittedPool == null)
377 <                submittedPool = p;
459 <            else if (submittedPool != p)
460 <                internal = false;
369 >        ForkJoinPool p; boolean internal; int s; Thread t;
370 >        ForkJoinPool.WorkQueue q = null;
371 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
372 >            ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
373 >            p = wt.pool;
374 >            if (pool == null)
375 >                pool = p;
376 >            if (internal = (pool == p))
377 >                q = wt.workQueue;
378          }
379          else {
380 +            internal = false;
381              p = ForkJoinPool.common;
382 <            q = ForkJoinPool.commonQueue();
383 <            if (interruptible && Thread.interrupted())
384 <                return ABNORMAL;
382 >            if (pool == null)
383 >                pool = p;
384 >            if (pool == p && p != null)
385 >                q = p.externalQueue();
386          }
387 +        if (interruptible && Thread.interrupted())
388 +            return ABNORMAL;
389          if ((s = status) < 0)
390              return s;
391          long deadline = 0L;
# Line 475 | Line 396 | public abstract class ForkJoinTask<V> im
396                  deadline = 1L;
397          }
398          boolean uncompensate = false;
399 <        // try helping unless timed, external, and pool has workers
400 <        if (q != null && p != null &&
401 <            (internal || !timed || (p.mode & SMASK) == 0)) {
402 <            if (this instanceof CountedCompleter)
403 <                s = p.helpComplete(this, q, internal);
404 <            else if (!ran &&
405 <                     (!internal && q.externalTryUnpush(this)) ||
406 <                     q.tryRemove(this, internal))
407 <                s = doExec();
408 <            else
409 <                s = status;
410 <            if (s < 0)
411 <                return s;
491 <            else if (internal) {
492 <                if ((s = p.helpJoin(this, q)) < 0)
399 >        if (q != null && p != null) {  // try helping
400 >            // help even in timed mode if pool has no parallelism
401 >            boolean canHelp = !timed || (p.mode & SMASK) == 0;
402 >            if (canHelp) {
403 >                if ((this instanceof CountedCompleter) &&
404 >                    (s = p.helpComplete(this, q, internal)) < 0)
405 >                    return s;
406 >                if (!ran && ((!internal && q.externalTryUnpush(this)) ||
407 >                             q.tryRemove(this, internal)) && (s = doExec()) < 0)
408 >                    return s;
409 >            }
410 >            if (internal) {
411 >                if ((s = p.helpJoin(this, q, canHelp)) < 0)
412                      return s;
413                  if (s == UNCOMPENSATE)
414                      uncompensate = true;
496                interruptible = false;
415              }
416          }
417 <        return awaitDone(interruptible, deadline, submittedPool, uncompensate);
417 >        // block until done or cancelled wait
418 >        boolean interrupted = false, queued = false;
419 >        boolean parked = false, fail = false;
420 >        Aux node = null;
421 >        while ((s = status) >= 0) {
422 >            Aux a; long ns;
423 >            if (fail || (fail = (pool != null && pool.mode < 0)))
424 >                casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
425 >            else if (parked && Thread.interrupted()) {
426 >                if (interruptible) {
427 >                    s = ABNORMAL;
428 >                    break;
429 >                }
430 >                interrupted = true;
431 >            }
432 >            else if (queued) {
433 >                if (deadline != 0L) {
434 >                    if ((ns = deadline - System.nanoTime()) <= 0L)
435 >                        break;
436 >                    LockSupport.parkNanos(ns);
437 >                }
438 >                else
439 >                    LockSupport.park();
440 >                parked = true;
441 >            }
442 >            else if (node != null) {
443 >                if ((a = aux) != null && a.ex != null)
444 >                    Thread.onSpinWait();     // exception in progress
445 >                else if (queued = casAux(node.next = a, node))
446 >                    LockSupport.setCurrentBlocker(this);
447 >            }
448 >            else {
449 >                try {
450 >                    node = new Aux(Thread.currentThread(), null);
451 >                } catch (Throwable ex) {     // cannot create
452 >                    fail = true;
453 >                }
454 >            }
455 >        }
456 >        if (pool != null && uncompensate)
457 >            pool.uncompensate();
458 >
459 >        if (queued) {
460 >            LockSupport.setCurrentBlocker(null);
461 >            if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
462 >                outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
463 >                    for (Aux trail = null;;) {
464 >                        Aux next = a.next;
465 >                        if (a == node) {
466 >                            if (trail != null)
467 >                                trail.casNext(trail, next);
468 >                            else if (casAux(a, next))
469 >                                break outer; // cannot be re-encountered
470 >                            break;           // restart
471 >                        } else {
472 >                            trail = a;
473 >                            if ((a = next) == null)
474 >                                break outer;
475 >                        }
476 >                    }
477 >                }
478 >            }
479 >            else {
480 >                signalWaiters();             // help clean or signal
481 >                if (interrupted)
482 >                    Thread.currentThread().interrupt();
483 >            }
484 >        }
485 >        return s;
486      }
487  
488      /**
# Line 652 | Line 638 | public abstract class ForkJoinTask<V> im
638      public final V join() {
639          int s;
640          if ((s = status) >= 0)
641 <            s = awaitJoin(null, false, false, false, 0L);
641 >            s = awaitDone(null, false, false, false, 0L);
642          if ((s & ABNORMAL) != 0)
643              reportException(s);
644          return getRawResult();
# Line 669 | Line 655 | public abstract class ForkJoinTask<V> im
655      public final V invoke() {
656          int s;
657          if ((s = doExec()) >= 0)
658 <            s = awaitJoin(null, true, false, false, 0L);
658 >            s = awaitDone(null, true, false, false, 0L);
659          if ((s & ABNORMAL) != 0)
660              reportException(s);
661          return getRawResult();
# Line 698 | Line 684 | public abstract class ForkJoinTask<V> im
684              throw new NullPointerException();
685          t2.fork();
686          if ((s1 = t1.doExec()) >= 0)
687 <            s1 = t1.awaitJoin(null, true, false, false, 0L);
687 >            s1 = t1.awaitDone(null, true, false, false, 0L);
688          if ((s1 & ABNORMAL) != 0) {
689              cancelIgnoringExceptions(t2);
690              t1.reportException(s1);
691          }
692 <        else if (((s2 = t2.awaitJoin(null, false, false, false, 0L)) & ABNORMAL) != 0)
692 >        else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0)
693              t2.reportException(s2);
694      }
695  
# Line 734 | Line 720 | public abstract class ForkJoinTask<V> im
720              if (i == 0) {
721                  int s;
722                  if ((s = t.doExec()) >= 0)
723 <                    s = t.awaitJoin(null, true, false, false, 0L);
723 >                    s = t.awaitDone(null, true, false, false, 0L);
724                  if ((s & ABNORMAL) != 0)
725                      ex = t.getException(s);
726                  break;
# Line 747 | Line 733 | public abstract class ForkJoinTask<V> im
733                  if ((t = tasks[i]) != null) {
734                      int s;
735                      if ((s = t.status) >= 0)
736 <                        s = t.awaitJoin(null, false, false, false, 0L);
736 >                        s = t.awaitDone(null, false, false, false, 0L);
737                      if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
738                          break;
739                  }
# Line 797 | Line 783 | public abstract class ForkJoinTask<V> im
783              if (i == 0) {
784                  int s;
785                  if ((s = t.doExec()) >= 0)
786 <                    s = t.awaitJoin(null, true, false, false, 0L);
786 >                    s = t.awaitDone(null, true, false, false, 0L);
787                  if ((s & ABNORMAL) != 0)
788                      ex = t.getException(s);
789                  break;
# Line 810 | Line 796 | public abstract class ForkJoinTask<V> im
796                  if ((t = ts.get(i)) != null) {
797                      int s;
798                      if ((s = t.status) >= 0)
799 <                        s = t.awaitJoin(null, false, false, false, 0L);
799 >                        s = t.awaitDone(null, false, false, false, 0L);
800                      if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
801                          break;
802                  }
# Line 961 | Line 947 | public abstract class ForkJoinTask<V> im
947       * member of a ForkJoinPool and was interrupted while waiting
948       */
949      public final V get() throws InterruptedException, ExecutionException {
950 <        int s = awaitJoin(null, false, true, false, 0L);
950 >        int s = awaitDone(null, false, true, false, 0L);
951          if ((s & ABNORMAL) != 0)
952              reportExecutionException(s);
953          return getRawResult();
# Line 984 | Line 970 | public abstract class ForkJoinTask<V> im
970      public final V get(long timeout, TimeUnit unit)
971          throws InterruptedException, ExecutionException, TimeoutException {
972          long nanos = unit.toNanos(timeout);
973 <        int s = awaitJoin(null, false, true, true, nanos);
973 >        int s = awaitDone(null, false, true, true, nanos);
974          if (s >= 0 || (s & ABNORMAL) != 0)
975              reportExecutionException(s);
976          return getRawResult();
# Line 998 | Line 984 | public abstract class ForkJoinTask<V> im
984       */
985      public final void quietlyJoin() {
986          if (status >= 0)
987 <            awaitJoin(null, false, false, false, 0L);
987 >            awaitDone(null, false, false, false, 0L);
988      }
989  
990  
# Line 1009 | Line 995 | public abstract class ForkJoinTask<V> im
995       */
996      public final void quietlyInvoke() {
997          if (doExec() >= 0)
998 <            awaitJoin(null, true, false, false, 0L);
998 >            awaitDone(null, true, false, false, 0L);
999      }
1000  
1001      // Versions of join/get for pool.invoke* methods that use external,
1002      // possibly-non-commonPool submits
1003  
1004 +    final void awaitPoolInvoke(ForkJoinPool pool) {
1005 +        awaitDone(pool, false, false, false, 0L);
1006 +    }
1007 +    final void awaitPoolInvoke(ForkJoinPool pool, long nanos) {
1008 +        awaitDone(pool, false, true, true, nanos);
1009 +    }
1010      final V joinForPoolInvoke(ForkJoinPool pool) {
1011 <        int s;
1020 <        if ((s = status) >= 0)
1021 <            s = awaitJoin(pool, false, false, false, 0L);
1011 >        int s = awaitDone(pool, false, false, false, 0L);
1012          if ((s & ABNORMAL) != 0)
1013              reportException(s);
1014          return getRawResult();
1015      }
1026    final void tryJoinForPoolInvoke(ForkJoinPool pool) {
1027        if (status >= 0)
1028            awaitJoin(pool, false, false, false, 0L);
1029    }
1016      final V getForPoolInvoke(ForkJoinPool pool)
1017          throws InterruptedException, ExecutionException {
1018 <        int s = awaitJoin(pool, false, true, false, 0L);
1018 >        int s = awaitDone(pool, false, true, false, 0L);
1019          if ((s & ABNORMAL) != 0)
1020              reportExecutionException(s);
1021          return getRawResult();
1022      }
1037
1023      final V getForPoolInvoke(ForkJoinPool pool, long nanos)
1024          throws InterruptedException, ExecutionException, TimeoutException {
1025 <        int s = awaitJoin(pool, false, true, true, nanos);
1025 >        int s = awaitDone(pool, false, true, true, nanos);
1026          if (s >= 0 || (s & ABNORMAL) != 0)
1027              reportExecutionException(s);
1028          return getRawResult();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines