227 |
|
* atomicity. Status is initially zero, and takes on nonnegative |
228 |
|
* values until completed, upon which it holds (sign bit) DONE, |
229 |
|
* possibly with ABNORMAL (cancelled or exceptional) and THROWN |
230 |
< |
* (in which case an exception has been stored). These control |
231 |
< |
* bits occupy only (some of) the upper half (16 bits) of status |
232 |
< |
* field. The lower bits are used for user-defined tags. |
230 |
> |
* (in which case an exception has been stored). A value of |
231 |
> |
* ABNORMAL without DONE signifies an interrupted wait. These |
232 |
> |
* control bits occupy only (some of) the upper half (16 bits) of |
233 |
> |
* status field. The lower bits are used for user-defined tags. |
234 |
|
*/ |
235 |
|
private static final int DONE = 1 << 31; // must be negative |
236 |
< |
private static final int ABNORMAL = 1 << 16; // set atomically with DONE |
237 |
< |
private static final int THROWN = 1 << 17; // set atomically with ABNORMAL |
236 |
> |
private static final int ABNORMAL = 1 << 16; |
237 |
> |
private static final int THROWN = 1 << 17; |
238 |
|
private static final int SMASK = 0xffff; // short bits for tags |
239 |
|
// sentinels can be any positive upper half value: |
239 |
– |
private static final int INTRPT = 1 << 16; // awaitDone interrupt return |
240 |
|
static final int ADJUST = 1 << 16; // uncompensate after block |
241 |
|
|
242 |
|
// Fields |
243 |
|
volatile int status; // accessed directly by pool and workers |
244 |
|
private transient volatile Aux aux; // either waiters or thrown Exception |
245 |
+ |
|
246 |
|
// Support for atomic operations |
247 |
|
private static final VarHandle STATUS; |
248 |
|
private static final VarHandle AUX; |
274 |
|
* |
275 |
|
* @param interruptible true if wait can be cancelled by interrupt |
276 |
|
* @param deadline if non-zero use timed waits and possibly timeout |
277 |
< |
* @param pool if nonull, pool to uncompensate when unblocking |
278 |
< |
* @return status on exit, or INTRPT if interrupted while waiting |
277 |
> |
* @param adjust if true, uncompensate pool after unblocking |
278 |
> |
* @param pool if nonull, current pool (possibly comonPool if unknown) |
279 |
> |
* @return status on exit, or ABNORMAL if interrupted while waiting |
280 |
|
*/ |
281 |
< |
private int awaitDone(boolean interruptible, long deadline, |
281 |
> |
private int awaitDone(boolean interruptible, long deadline, boolean adjust, |
282 |
|
ForkJoinPool pool) { |
283 |
< |
int s; |
283 |
> |
int s; Aux node = null; boolean interrupted = false, queued = false; |
284 |
> |
long nanos = 0L; |
285 |
|
try { |
286 |
< |
Aux node = null; boolean interrupted = false, queued = false; |
284 |
< |
for (;;) { |
285 |
< |
Aux a; long nanos; |
286 |
> |
for (Aux a;;) { |
287 |
|
if ((s = status) < 0) |
288 |
|
break; |
289 |
|
else if (node == null) |
294 |
|
else if (queued = casAux(node.next = a, node)) |
295 |
|
LockSupport.setCurrentBlocker(this); |
296 |
|
} |
297 |
< |
else { |
298 |
< |
if (deadline == 0L) |
299 |
< |
LockSupport.park(); |
299 |
< |
else if ((nanos = deadline - System.nanoTime()) > 0L) |
300 |
< |
LockSupport.parkNanos(nanos); |
301 |
< |
else { |
302 |
< |
s = 0; // timeout |
303 |
< |
break; |
304 |
< |
} |
305 |
< |
if ((interrupted |= Thread.interrupted()) && interruptible) { |
306 |
< |
s = INTRPT; |
297 |
> |
else if (Thread.interrupted()) { |
298 |
> |
if (interruptible) { |
299 |
> |
s = ABNORMAL; |
300 |
|
break; |
301 |
|
} |
302 |
+ |
interrupted = true; |
303 |
|
} |
304 |
+ |
else if (pool != null && pool.isStopping()) |
305 |
+ |
casStatus(s, s | (DONE | ABNORMAL)); // help cancel |
306 |
+ |
else if (deadline != 0L && |
307 |
+ |
(nanos = deadline - System.nanoTime()) <= 0L) |
308 |
+ |
break; // timeout |
309 |
+ |
else if ((s = status) < 0) |
310 |
+ |
break; // recheck |
311 |
+ |
else if (nanos > 0L) |
312 |
+ |
LockSupport.parkNanos(nanos); |
313 |
+ |
else |
314 |
+ |
LockSupport.park(); |
315 |
|
} |
316 |
< |
if (queued) { |
317 |
< |
LockSupport.setCurrentBlocker(null); |
318 |
< |
if (s >= 0) { // try to unsplice after cancellation |
319 |
< |
outer: for (Aux a; (a = aux) != null && a.ex == null; ) { |
320 |
< |
for (Aux trail = null;;) { |
321 |
< |
Aux next = a.next; |
322 |
< |
if (a == node) { |
323 |
< |
if (trail != null) |
324 |
< |
trail.casNext(trail, next); |
325 |
< |
else if (casAux(a, next)) |
326 |
< |
break outer; // cannot be re-encountered |
327 |
< |
break; // restart |
328 |
< |
} else { |
329 |
< |
trail = a; |
330 |
< |
if ((a = next) == null) |
331 |
< |
break outer; |
332 |
< |
} |
316 |
> |
} finally { |
317 |
> |
if (adjust && pool != null) |
318 |
> |
pool.uncompensate(); |
319 |
> |
} |
320 |
> |
if (queued) { |
321 |
> |
LockSupport.setCurrentBlocker(null); |
322 |
> |
if (s >= 0) { // try to unsplice after cancellation |
323 |
> |
outer: for (Aux a; (a = aux) != null && a.ex == null; ) { |
324 |
> |
for (Aux trail = null;;) { |
325 |
> |
Aux next = a.next; |
326 |
> |
if (a == node) { |
327 |
> |
if (trail != null) |
328 |
> |
trail.casNext(trail, next); |
329 |
> |
else if (casAux(a, next)) |
330 |
> |
break outer; // cannot be re-encountered |
331 |
> |
break; // restart |
332 |
> |
} else { |
333 |
> |
trail = a; |
334 |
> |
if ((a = next) == null) |
335 |
> |
break outer; |
336 |
|
} |
337 |
|
} |
338 |
|
} |
331 |
– |
else { |
332 |
– |
signalWaiters(); // help clean or signal |
333 |
– |
if (interrupted) |
334 |
– |
Thread.currentThread().interrupt(); |
335 |
– |
} |
339 |
|
} |
340 |
< |
} finally { // for sake of OOME on node construction |
341 |
< |
if (pool != null) |
342 |
< |
pool.uncompensate(); |
340 |
> |
else { |
341 |
> |
signalWaiters(); // help clean or signal |
342 |
> |
if (interrupted) |
343 |
> |
Thread.currentThread().interrupt(); |
344 |
> |
} |
345 |
|
} |
346 |
|
return s; |
347 |
|
} |
428 |
|
} |
429 |
|
|
430 |
|
/** |
431 |
< |
* Helps and/or waits for completion. |
431 |
> |
* Helps and/or waits for completion from join, or async invoke if ran true. |
432 |
|
* |
433 |
< |
* @param ran true if task known to be invoked |
434 |
< |
* @param interruptible true if wait can be cancelled by interrupt |
430 |
< |
* @param deadline if non-zero use timed waits and possibly timeout |
431 |
< |
* @return status on exit, or INTRPT if interruptible and interrupted |
433 |
> |
* @param ran true if task known to have been exec'd |
434 |
> |
* @return status on exit |
435 |
|
*/ |
436 |
< |
private int helpOrWait(boolean ran, boolean interruptible, long deadline) { |
437 |
< |
boolean cc = (this instanceof CountedCompleter); |
436 |
> |
private int awaitJoin(boolean ran) { |
437 |
> |
boolean adjust = false, owned; |
438 |
|
Thread t; ForkJoinWorkerThread wt; |
439 |
< |
ForkJoinPool.WorkQueue q; ForkJoinPool p; boolean owned; int s; |
439 |
> |
ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; |
440 |
|
if (owned = ((t = Thread.currentThread()) |
441 |
|
instanceof ForkJoinWorkerThread)) { |
442 |
|
p = (wt = (ForkJoinWorkerThread)t).pool; |
446 |
|
p = ForkJoinPool.common; |
447 |
|
q = ForkJoinPool.commonQueue(); |
448 |
|
} |
449 |
< |
if (p == null || q == null) |
450 |
< |
s = 0; |
451 |
< |
else if (cc) |
452 |
< |
s = p.helpComplete(this, q, owned); |
453 |
< |
else if (ran || !q.tryRemove(this, owned) || (s = doExec()) >= 0) |
454 |
< |
s = owned ? p.helpJoin(this, q) : 0; |
455 |
< |
return (s < 0) ? s : awaitDone(interruptible, deadline, |
456 |
< |
(s == ADJUST) ? p : null); |
449 |
> |
if (q != null && p != null) { |
450 |
> |
if ((this instanceof CountedCompleter) ? |
451 |
> |
(s = p.helpComplete(this, q, owned)) < 0 : |
452 |
> |
(!ran && q.tryRemove(this, owned) && (s = doExec()) < 0)) |
453 |
> |
return s; |
454 |
> |
else if (owned) { |
455 |
> |
if ((s = p.helpJoin(this, q)) < 0) |
456 |
> |
return s; |
457 |
> |
else if (s == ADJUST) |
458 |
> |
adjust = true; |
459 |
> |
} |
460 |
> |
} |
461 |
> |
return awaitDone(false, 0L, adjust, p); |
462 |
> |
} |
463 |
> |
|
464 |
> |
/** |
465 |
> |
* Helps and/or waits for completion from get. |
466 |
> |
* |
467 |
> |
* @param timed if true use timed wait |
468 |
> |
* @param nanos wait time |
469 |
> |
* @return status on exit, or ABNORMAL if interruptible and interrupted |
470 |
> |
*/ |
471 |
> |
private int awaitGet(boolean timed, long nanos) { |
472 |
> |
boolean adjust = false, owned; |
473 |
> |
Thread t; ForkJoinWorkerThread wt; |
474 |
> |
ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; long deadline; |
475 |
> |
if (owned = ((t = Thread.currentThread()) |
476 |
> |
instanceof ForkJoinWorkerThread)) { |
477 |
> |
p = (wt = (ForkJoinWorkerThread)t).pool; |
478 |
> |
q = wt.workQueue; |
479 |
> |
} |
480 |
> |
else if (!Thread.interrupted()) { |
481 |
> |
p = ForkJoinPool.common; |
482 |
> |
q = ForkJoinPool.commonQueue(); |
483 |
> |
} |
484 |
> |
else |
485 |
> |
return ABNORMAL; |
486 |
> |
if (!timed) |
487 |
> |
deadline = 0L; |
488 |
> |
else if (nanos <= 0L) |
489 |
> |
return 0; |
490 |
> |
else if ((deadline = nanos + System.nanoTime()) == 0L) |
491 |
> |
deadline = 1L; |
492 |
> |
if (q != null && p != null) { |
493 |
> |
if ((!timed || p.isSaturated()) && |
494 |
> |
((this instanceof CountedCompleter) ? |
495 |
> |
(s = p.helpComplete(this, q, owned)) < 0 : |
496 |
> |
(q.tryRemove(this, owned) && (s = doExec()) < 0))) |
497 |
> |
return s; |
498 |
> |
else if (owned) { |
499 |
> |
if ((s = p.helpJoin(this, q)) < 0) |
500 |
> |
return s; |
501 |
> |
else if (s == ADJUST) |
502 |
> |
adjust = true; |
503 |
> |
} |
504 |
> |
} |
505 |
> |
return awaitDone(!owned, deadline, adjust, p); |
506 |
|
} |
507 |
|
|
508 |
|
/** |
509 |
< |
* Cancels, ignoring any exceptions thrown by cancel. Used during |
510 |
< |
* worker and pool shutdown. Cancel is spec'ed not to throw any |
511 |
< |
* exceptions, but if it does anyway, we have no recourse during |
460 |
< |
* shutdown, so guard against this case. |
509 |
> |
* Cancels, ignoring any exceptions thrown by cancel. Cancel is |
510 |
> |
* spec'ed not to throw any exceptions, but if it does anyway, we |
511 |
> |
* have no recourse, so guard against this case. |
512 |
|
*/ |
513 |
< |
static final void cancelIgnoringExceptions(ForkJoinTask<?> t) { |
513 |
> |
static final void cancelIgnoringExceptions(Future<?> t) { |
514 |
|
if (t != null) { |
515 |
|
try { |
516 |
|
t.cancel(false); |
563 |
|
} |
564 |
|
|
565 |
|
/** |
566 |
+ |
* Returns exception associated with the given status, or null if none. |
567 |
+ |
*/ |
568 |
+ |
private Throwable getException(int s) { |
569 |
+ |
Throwable ex = null; |
570 |
+ |
if ((s & ABNORMAL) != 0 && |
571 |
+ |
((s & THROWN) == 0 || (ex = getThrowableException()) == null)) |
572 |
+ |
ex = new CancellationException(); |
573 |
+ |
return ex; |
574 |
+ |
} |
575 |
+ |
|
576 |
+ |
/** |
577 |
|
* Throws exception associated with the given status, or |
578 |
|
* CancellationException if none recorded. |
579 |
|
*/ |
643 |
|
public final V join() { |
644 |
|
int s; |
645 |
|
if ((s = status) >= 0) |
646 |
< |
s = helpOrWait(false, false, 0L); |
646 |
> |
s = awaitJoin(false); |
647 |
|
if ((s & ABNORMAL) != 0) |
648 |
|
reportException(s); |
649 |
|
return getRawResult(); |
660 |
|
public final V invoke() { |
661 |
|
int s; |
662 |
|
if ((s = doExec()) >= 0) |
663 |
< |
s = helpOrWait(false, true, 0L); |
663 |
> |
s = awaitJoin(true); |
664 |
|
if ((s & ABNORMAL) != 0) |
665 |
|
reportException(s); |
666 |
|
return getRawResult(); |
689 |
|
throw new NullPointerException(); |
690 |
|
t2.fork(); |
691 |
|
if ((s1 = t1.doExec()) >= 0) |
692 |
< |
s1 = t1.helpOrWait(true, false, 0L); |
693 |
< |
if ((s1 & ABNORMAL) != 0) |
692 |
> |
s1 = t1.awaitJoin(true); |
693 |
> |
if ((s1 & ABNORMAL) != 0) { |
694 |
> |
cancelIgnoringExceptions(t2); |
695 |
|
t1.reportException(s1); |
696 |
< |
if ((s2 = t2.status) >= 0) |
697 |
< |
s2 = t2.helpOrWait(false, false, 0L); |
698 |
< |
if ((s2 & ABNORMAL) != 0) |
699 |
< |
t2.reportException(s2); |
696 |
> |
} |
697 |
> |
else { |
698 |
> |
if ((s2 = t2.status) >= 0) |
699 |
> |
s2 = t2.awaitJoin(false); |
700 |
> |
if ((s2 & ABNORMAL) != 0) |
701 |
> |
t2.reportException(s2); |
702 |
> |
} |
703 |
|
} |
704 |
|
|
705 |
|
/** |
728 |
|
} |
729 |
|
if (i == 0) { |
730 |
|
if ((s = t.doExec()) >= 0) |
731 |
< |
s = t.helpOrWait(true, false, 0L); |
731 |
> |
s = t.awaitJoin(true); |
732 |
|
if ((s & ABNORMAL) != 0) |
733 |
< |
ex = t.getException(); |
733 |
> |
ex = t.getException(s); |
734 |
|
break; |
735 |
|
} |
736 |
|
t.fork(); |
740 |
|
ForkJoinTask<?> t; |
741 |
|
if ((t = tasks[i]) != null) { |
742 |
|
if ((s = t.status) >= 0) |
743 |
< |
s = t.helpOrWait(false, false, 0L); |
744 |
< |
if ((s & ABNORMAL) != 0) { |
679 |
< |
ex = t.getException(); |
743 |
> |
s = t.awaitJoin(false); |
744 |
> |
if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) |
745 |
|
break; |
681 |
– |
} |
746 |
|
} |
747 |
|
} |
748 |
|
} |
749 |
< |
if (ex != null) |
749 |
> |
if (ex != null) { |
750 |
> |
for (int i = 1, s; i <= last; ++i) |
751 |
> |
cancelIgnoringExceptions(tasks[i]); |
752 |
|
rethrow(ex); |
753 |
+ |
} |
754 |
|
} |
755 |
|
|
756 |
|
/** |
789 |
|
} |
790 |
|
if (i == 0) { |
791 |
|
if ((s = t.doExec()) >= 0) |
792 |
< |
s = t.helpOrWait(true, false, 0L); |
792 |
> |
s = t.awaitJoin(true); |
793 |
|
if ((s & ABNORMAL) != 0) |
794 |
< |
ex = t.getException(); |
794 |
> |
ex = t.getException(s); |
795 |
|
break; |
796 |
|
} |
797 |
|
t.fork(); |
801 |
|
ForkJoinTask<?> t; |
802 |
|
if ((t = ts.get(i)) != null) { |
803 |
|
if ((s = t.status) >= 0) |
804 |
< |
s = t.helpOrWait(false, false, 0L); |
805 |
< |
if ((s & ABNORMAL) != 0) { |
739 |
< |
ex = t.getException(); |
804 |
> |
s = t.awaitJoin(false); |
805 |
> |
if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) |
806 |
|
break; |
741 |
– |
} |
807 |
|
} |
808 |
|
} |
809 |
|
} |
810 |
< |
if (ex != null) |
810 |
> |
if (ex != null) { |
811 |
> |
for (int i = 1, s; i <= last; ++i) |
812 |
> |
cancelIgnoringExceptions(ts.get(i)); |
813 |
|
rethrow(ex); |
814 |
+ |
} |
815 |
|
return tasks; |
816 |
|
} |
817 |
|
|
882 |
|
* @return the exception, or {@code null} if none |
883 |
|
*/ |
884 |
|
public final Throwable getException() { |
885 |
< |
int s = status; |
818 |
< |
return ((s & ABNORMAL) == 0 ? null : |
819 |
< |
(s & THROWN) == 0 ? new CancellationException() : |
820 |
< |
getThrowableException()); |
885 |
> |
return getException(status); |
886 |
|
} |
887 |
|
|
888 |
|
/** |
952 |
|
* member of a ForkJoinPool and was interrupted while waiting |
953 |
|
*/ |
954 |
|
public final V get() throws InterruptedException, ExecutionException { |
955 |
< |
int s; |
956 |
< |
if (Thread.interrupted()) |
892 |
< |
s = INTRPT; |
893 |
< |
else if ((s = status) >= 0) |
894 |
< |
s = helpOrWait(false, true, 0L); |
895 |
< |
if (s == INTRPT) |
955 |
> |
int s; Throwable ex; |
956 |
> |
if ((s = status) >= 0 && (s = awaitGet(false, 0L)) >= 0) |
957 |
|
throw new InterruptedException(); |
958 |
< |
else if ((s & THROWN) != 0) |
959 |
< |
throw new ExecutionException(getThrowableException()); |
960 |
< |
else if ((s & ABNORMAL) != 0) |
958 |
> |
else if ((s & ABNORMAL) == 0) |
959 |
> |
return getRawResult(); |
960 |
> |
else if ((s & THROWN) == 0 || (ex = getThrowableException()) == null) |
961 |
|
throw new CancellationException(); |
962 |
|
else |
963 |
< |
return getRawResult(); |
963 |
> |
throw new ExecutionException(ex); |
964 |
|
} |
965 |
|
|
966 |
|
/** |
979 |
|
*/ |
980 |
|
public final V get(long timeout, TimeUnit unit) |
981 |
|
throws InterruptedException, ExecutionException, TimeoutException { |
982 |
+ |
int s; Throwable ex; |
983 |
|
long nanos = unit.toNanos(timeout); |
984 |
< |
int s; |
985 |
< |
if (Thread.interrupted()) |
986 |
< |
s = INTRPT; |
987 |
< |
else if ((s = status) >= 0 && nanos > 0L) { |
988 |
< |
long d = nanos + System.nanoTime(); |
927 |
< |
s = helpOrWait(false, true, (d == 0L) ? 1L : d); // avoid 0 |
984 |
> |
if ((s = status) >= 0 && (s = awaitGet(true, nanos)) >= 0) { |
985 |
> |
if (s == ABNORMAL) |
986 |
> |
throw new InterruptedException(); |
987 |
> |
else |
988 |
> |
throw new TimeoutException(); |
989 |
|
} |
990 |
< |
|
991 |
< |
if (s == INTRPT) |
992 |
< |
throw new InterruptedException(); |
932 |
< |
else if (s >= 0) |
933 |
< |
throw new TimeoutException(); |
934 |
< |
else if ((s & THROWN) != 0) |
935 |
< |
throw new ExecutionException(getThrowableException()); |
936 |
< |
else if ((s & ABNORMAL) != 0) |
990 |
> |
else if ((s & ABNORMAL) == 0) |
991 |
> |
return getRawResult(); |
992 |
> |
else if ((s & THROWN) == 0 || (ex = getThrowableException()) == null) |
993 |
|
throw new CancellationException(); |
994 |
|
else |
995 |
< |
return getRawResult(); |
995 |
> |
throw new ExecutionException(ex); |
996 |
|
} |
997 |
|
|
998 |
|
/** |
1003 |
|
*/ |
1004 |
|
public final void quietlyJoin() { |
1005 |
|
if (status >= 0) |
1006 |
< |
helpOrWait(false, false, 0L); |
1006 |
> |
awaitJoin(false); |
1007 |
|
} |
1008 |
|
|
1009 |
|
/** |
1013 |
|
*/ |
1014 |
|
public final void quietlyInvoke() { |
1015 |
|
if (doExec() >= 0) |
1016 |
< |
helpOrWait(true, false, 0L); |
1016 |
> |
awaitJoin(true); |
1017 |
|
} |
1018 |
|
|
1019 |
|
/** |
1027 |
|
Thread t; ForkJoinWorkerThread w; ForkJoinPool p; |
1028 |
|
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && |
1029 |
|
(p = (w = (ForkJoinWorkerThread)t).pool) != null) |
1030 |
< |
p.helpQuiescePool(w.workQueue); |
1030 |
> |
p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false); |
1031 |
|
else |
1032 |
< |
ForkJoinPool.quiesceCommonPool(); |
1032 |
> |
ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false); |
1033 |
|
} |
1034 |
|
|
1035 |
|
/** |