55 |
|
* minimize other blocking synchronization apart from joining other |
56 |
|
* tasks or using synchronizers such as Phasers that are advertised to |
57 |
|
* cooperate with fork/join scheduling. Subdividable tasks should also |
58 |
< |
* not perform blocking IO, and should ideally access variables that |
58 |
> |
* not perform blocking I/O, and should ideally access variables that |
59 |
|
* are completely independent of those accessed by other running |
60 |
|
* tasks. These guidelines are loosely enforced by not permitting |
61 |
|
* checked exceptions such as {@code IOExceptions} to be |
73 |
|
* <p>It is possible to define and use ForkJoinTasks that may block, |
74 |
|
* but doing do requires three further considerations: (1) Completion |
75 |
|
* of few if any <em>other</em> tasks should be dependent on a task |
76 |
< |
* that blocks on external synchronization or IO. Event-style async |
76 |
> |
* that blocks on external synchronization or I/O. Event-style async |
77 |
|
* tasks that are never joined (for example, those subclassing {@link |
78 |
|
* CountedCompleter}) often fall into this category. (2) To minimize |
79 |
|
* resource impact, tasks should be small; ideally performing only the |
134 |
|
* (DAG). Otherwise, executions may encounter a form of deadlock as |
135 |
|
* tasks cyclically wait for each other. However, this framework |
136 |
|
* supports other methods and techniques (for example the use of |
137 |
< |
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that |
137 |
> |
* {@link java.util.concurrent.Phaser Phaser}, {@link #helpQuiesce}, and {@link #complete}) that |
138 |
|
* may be of use in constructing custom subclasses for problems that |
139 |
< |
* are not statically structured as DAGs. To support such usages a |
139 |
> |
* are not statically structured as DAGs. To support such usages, a |
140 |
|
* ForkJoinTask may be atomically <em>tagged</em> with a {@code short} |
141 |
|
* value using {@link #setForkJoinTaskTag} or {@link |
142 |
|
* #compareAndSetForkJoinTaskTag} and checked using {@link |
285 |
|
*/ |
286 |
|
private int externalAwaitDone() { |
287 |
|
int s; |
288 |
< |
ForkJoinPool.externalHelpJoin(this); |
289 |
< |
boolean interrupted = false; |
290 |
< |
while ((s = status) >= 0) { |
291 |
< |
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
292 |
< |
synchronized (this) { |
293 |
< |
if (status >= 0) { |
294 |
< |
try { |
295 |
< |
wait(); |
296 |
< |
} catch (InterruptedException ie) { |
297 |
< |
interrupted = true; |
288 |
> |
ForkJoinPool cp = ForkJoinPool.common; |
289 |
> |
if ((s = status) >= 0) { |
290 |
> |
if (cp != null) { |
291 |
> |
if (this instanceof CountedCompleter) |
292 |
> |
s = cp.externalHelpComplete((CountedCompleter<?>)this); |
293 |
> |
else if (cp.tryExternalUnpush(this)) |
294 |
> |
s = doExec(); |
295 |
> |
} |
296 |
> |
if (s >= 0 && (s = status) >= 0) { |
297 |
> |
boolean interrupted = false; |
298 |
> |
do { |
299 |
> |
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
300 |
> |
synchronized (this) { |
301 |
> |
if (status >= 0) { |
302 |
> |
try { |
303 |
> |
wait(); |
304 |
> |
} catch (InterruptedException ie) { |
305 |
> |
interrupted = true; |
306 |
> |
} |
307 |
> |
} |
308 |
> |
else |
309 |
> |
notifyAll(); |
310 |
|
} |
311 |
|
} |
312 |
< |
else |
313 |
< |
notifyAll(); |
314 |
< |
} |
312 |
> |
} while ((s = status) >= 0); |
313 |
> |
if (interrupted) |
314 |
> |
Thread.currentThread().interrupt(); |
315 |
|
} |
316 |
|
} |
305 |
– |
if (interrupted) |
306 |
– |
Thread.currentThread().interrupt(); |
317 |
|
return s; |
318 |
|
} |
319 |
|
|
322 |
|
*/ |
323 |
|
private int externalInterruptibleAwaitDone() throws InterruptedException { |
324 |
|
int s; |
325 |
+ |
ForkJoinPool cp = ForkJoinPool.common; |
326 |
|
if (Thread.interrupted()) |
327 |
|
throw new InterruptedException(); |
328 |
< |
ForkJoinPool.externalHelpJoin(this); |
328 |
> |
if ((s = status) >= 0 && cp != null) { |
329 |
> |
if (this instanceof CountedCompleter) |
330 |
> |
cp.externalHelpComplete((CountedCompleter<?>)this); |
331 |
> |
else if (cp.tryExternalUnpush(this)) |
332 |
> |
doExec(); |
333 |
> |
} |
334 |
|
while ((s = status) >= 0) { |
335 |
|
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
336 |
|
synchronized (this) { |
411 |
|
final Throwable ex; |
412 |
|
ExceptionNode next; |
413 |
|
final long thrower; // use id not ref to avoid weak cycles |
414 |
+ |
final int hashCode; // store task hashCode before weak ref disappears |
415 |
|
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { |
416 |
|
super(task, exceptionTableRefQueue); |
417 |
|
this.ex = ex; |
418 |
|
this.next = next; |
419 |
|
this.thrower = Thread.currentThread().getId(); |
420 |
+ |
this.hashCode = System.identityHashCode(task); |
421 |
|
} |
422 |
|
} |
423 |
|
|
453 |
|
} |
454 |
|
|
455 |
|
/** |
456 |
< |
* Records exception and possibly propagates |
456 |
> |
* Records exception and possibly propagates. |
457 |
|
* |
458 |
|
* @return status on exit |
459 |
|
*/ |
486 |
|
} |
487 |
|
|
488 |
|
/** |
489 |
< |
* Removes exception node and clears status |
489 |
> |
* Removes exception node and clears status. |
490 |
|
*/ |
491 |
|
private void clearExceptionalCompletion() { |
492 |
|
int h = System.identityHashCode(this); |
576 |
|
/** |
577 |
|
* Poll stale refs and remove them. Call only while holding lock. |
578 |
|
*/ |
579 |
+ |
/** |
580 |
+ |
* Poll stale refs and remove them. Call only while holding lock. |
581 |
+ |
*/ |
582 |
|
private static void expungeStaleExceptions() { |
583 |
|
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { |
584 |
|
if (x instanceof ExceptionNode) { |
585 |
< |
ForkJoinTask<?> key = ((ExceptionNode)x).get(); |
585 |
> |
int hashCode = ((ExceptionNode)x).hashCode; |
586 |
|
ExceptionNode[] t = exceptionTable; |
587 |
< |
int i = System.identityHashCode(key) & (t.length - 1); |
587 |
> |
int i = hashCode & (t.length - 1); |
588 |
|
ExceptionNode e = t[i]; |
589 |
|
ExceptionNode pred = null; |
590 |
|
while (e != null) { |
621 |
|
/** |
622 |
|
* A version of "sneaky throw" to relay exceptions |
623 |
|
*/ |
624 |
< |
static void rethrow(final Throwable ex) { |
625 |
< |
if (ex != null) { |
626 |
< |
if (ex instanceof Error) |
606 |
< |
throw (Error)ex; |
607 |
< |
if (ex instanceof RuntimeException) |
608 |
< |
throw (RuntimeException)ex; |
609 |
< |
throw uncheckedThrowable(ex, RuntimeException.class); |
610 |
< |
} |
624 |
> |
static void rethrow(Throwable ex) { |
625 |
> |
if (ex != null) |
626 |
> |
ForkJoinTask.<RuntimeException>uncheckedThrow(ex); |
627 |
|
} |
628 |
|
|
629 |
|
/** |
632 |
|
* unchecked exceptions |
633 |
|
*/ |
634 |
|
@SuppressWarnings("unchecked") static <T extends Throwable> |
635 |
< |
T uncheckedThrowable(final Throwable t, final Class<T> c) { |
636 |
< |
return (T)t; // rely on vacuous cast |
635 |
> |
void uncheckedThrow(Throwable t) throws T { |
636 |
> |
throw (T)t; // rely on vacuous cast |
637 |
|
} |
638 |
|
|
639 |
|
/** |
668 |
|
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) |
669 |
|
((ForkJoinWorkerThread)t).workQueue.push(this); |
670 |
|
else |
671 |
< |
ForkJoinPool.commonPool.externalPush(this); |
671 |
> |
ForkJoinPool.common.externalPush(this); |
672 |
|
return this; |
673 |
|
} |
674 |
|
|
787 |
|
* unprocessed. |
788 |
|
* |
789 |
|
* @param tasks the collection of tasks |
790 |
+ |
* @param <T> the type of the values returned from the tasks |
791 |
|
* @return the tasks argument, to simplify usage |
792 |
|
* @throws NullPointerException if tasks or any element are null |
793 |
|
*/ |
845 |
|
* <p>This method is designed to be invoked by <em>other</em> |
846 |
|
* tasks. To terminate the current task, you can just return or |
847 |
|
* throw an unchecked exception from its computation method, or |
848 |
< |
* invoke {@link #completeExceptionally}. |
848 |
> |
* invoke {@link #completeExceptionally(Throwable)}. |
849 |
|
* |
850 |
|
* @param mayInterruptIfRunning this value has no effect in the |
851 |
|
* default implementation because interrupts are not used to |
995 |
|
if (Thread.interrupted()) |
996 |
|
throw new InterruptedException(); |
997 |
|
// Messy in part because we measure in nanosecs, but wait in millisecs |
998 |
< |
int s; long ns, ms; |
999 |
< |
if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) { |
998 |
> |
int s; long ms; |
999 |
> |
long ns = unit.toNanos(timeout); |
1000 |
> |
ForkJoinPool cp; |
1001 |
> |
if ((s = status) >= 0 && ns > 0L) { |
1002 |
|
long deadline = System.nanoTime() + ns; |
1003 |
|
ForkJoinPool p = null; |
1004 |
|
ForkJoinPool.WorkQueue w = null; |
1009 |
|
w = wt.workQueue; |
1010 |
|
p.helpJoinOnce(w, this); // no retries on failure |
1011 |
|
} |
1012 |
< |
else |
1013 |
< |
ForkJoinPool.externalHelpJoin(this); |
1012 |
> |
else if ((cp = ForkJoinPool.common) != null) { |
1013 |
> |
if (this instanceof CountedCompleter) |
1014 |
> |
cp.externalHelpComplete((CountedCompleter<?>)this); |
1015 |
> |
else if (cp.tryExternalUnpush(this)) |
1016 |
> |
doExec(); |
1017 |
> |
} |
1018 |
|
boolean canBlock = false; |
1019 |
|
boolean interrupted = false; |
1020 |
|
try { |
1022 |
|
if (w != null && w.qlock < 0) |
1023 |
|
cancelIgnoringExceptions(this); |
1024 |
|
else if (!canBlock) { |
1025 |
< |
if (p == null || p.tryCompensate()) |
1025 |
> |
if (p == null || p.tryCompensate(p.ctl)) |
1026 |
|
canBlock = true; |
1027 |
|
} |
1028 |
|
else { |
1098 |
|
wt.pool.helpQuiescePool(wt.workQueue); |
1099 |
|
} |
1100 |
|
else |
1101 |
< |
ForkJoinPool.externalHelpQuiescePool(); |
1101 |
> |
ForkJoinPool.quiesceCommonPool(); |
1102 |
|
} |
1103 |
|
|
1104 |
|
/** |
1163 |
|
Thread t; |
1164 |
|
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? |
1165 |
|
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : |
1166 |
< |
ForkJoinPool.tryExternalUnpush(this)); |
1166 |
> |
ForkJoinPool.common.tryExternalUnpush(this)); |
1167 |
|
} |
1168 |
|
|
1169 |
|
/** |
1332 |
|
* |
1333 |
|
* @param e the expected tag value |
1334 |
|
* @param tag the new tag value |
1335 |
< |
* @return true if successful; i.e., the current value was |
1335 |
> |
* @return {@code true} if successful; i.e., the current value was |
1336 |
|
* equal to e and is now tag. |
1337 |
|
* @since 1.8 |
1338 |
|
*/ |
1347 |
|
} |
1348 |
|
|
1349 |
|
/** |
1350 |
< |
* Adaptor for Runnables. This implements RunnableFuture |
1350 |
> |
* Adapter for Runnables. This implements RunnableFuture |
1351 |
|
* to be compliant with AbstractExecutorService constraints |
1352 |
|
* when used in ForkJoinPool. |
1353 |
|
*/ |
1368 |
|
} |
1369 |
|
|
1370 |
|
/** |
1371 |
< |
* Adaptor for Runnables without results |
1371 |
> |
* Adapter for Runnables without results |
1372 |
|
*/ |
1373 |
|
static final class AdaptedRunnableAction extends ForkJoinTask<Void> |
1374 |
|
implements RunnableFuture<Void> { |
1385 |
|
} |
1386 |
|
|
1387 |
|
/** |
1388 |
< |
* Adaptor for Callables |
1388 |
> |
* Adapter for Runnables in which failure forces worker exception |
1389 |
> |
*/ |
1390 |
> |
static final class RunnableExecuteAction extends ForkJoinTask<Void> { |
1391 |
> |
final Runnable runnable; |
1392 |
> |
RunnableExecuteAction(Runnable runnable) { |
1393 |
> |
if (runnable == null) throw new NullPointerException(); |
1394 |
> |
this.runnable = runnable; |
1395 |
> |
} |
1396 |
> |
public final Void getRawResult() { return null; } |
1397 |
> |
public final void setRawResult(Void v) { } |
1398 |
> |
public final boolean exec() { runnable.run(); return true; } |
1399 |
> |
void internalPropagateException(Throwable ex) { |
1400 |
> |
rethrow(ex); // rethrow outside exec() catches. |
1401 |
> |
} |
1402 |
> |
private static final long serialVersionUID = 5232453952276885070L; |
1403 |
> |
} |
1404 |
> |
|
1405 |
> |
/** |
1406 |
> |
* Adapter for Callables |
1407 |
|
*/ |
1408 |
|
static final class AdaptedCallable<T> extends ForkJoinTask<T> |
1409 |
|
implements RunnableFuture<T> { |
1450 |
|
* |
1451 |
|
* @param runnable the runnable action |
1452 |
|
* @param result the result upon completion |
1453 |
+ |
* @param <T> the type of the result |
1454 |
|
* @return the task |
1455 |
|
*/ |
1456 |
|
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) { |
1464 |
|
* encountered into {@code RuntimeException}. |
1465 |
|
* |
1466 |
|
* @param callable the callable action |
1467 |
+ |
* @param <T> the type of the callable's result |
1468 |
|
* @return the task |
1469 |
|
*/ |
1470 |
|
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) { |
1478 |
|
/** |
1479 |
|
* Saves this task to a stream (that is, serializes it). |
1480 |
|
* |
1481 |
+ |
* @param s the stream |
1482 |
+ |
* @throws java.io.IOException if an I/O error occurs |
1483 |
|
* @serialData the current run status and the exception thrown |
1484 |
|
* during execution, or {@code null} if none |
1485 |
|
*/ |
1491 |
|
|
1492 |
|
/** |
1493 |
|
* Reconstitutes this task from a stream (that is, deserializes it). |
1494 |
+ |
* @param s the stream |
1495 |
+ |
* @throws ClassNotFoundException if the class of a serialized object |
1496 |
+ |
* could not be found |
1497 |
+ |
* @throws java.io.IOException if an I/O error occurs |
1498 |
|
*/ |
1499 |
|
private void readObject(java.io.ObjectInputStream s) |
1500 |
|
throws java.io.IOException, ClassNotFoundException { |
1532 |
|
private static sun.misc.Unsafe getUnsafe() { |
1533 |
|
try { |
1534 |
|
return sun.misc.Unsafe.getUnsafe(); |
1535 |
< |
} catch (SecurityException se) { |
1536 |
< |
try { |
1537 |
< |
return java.security.AccessController.doPrivileged |
1538 |
< |
(new java.security |
1539 |
< |
.PrivilegedExceptionAction<sun.misc.Unsafe>() { |
1540 |
< |
public sun.misc.Unsafe run() throws Exception { |
1541 |
< |
java.lang.reflect.Field f = sun.misc |
1542 |
< |
.Unsafe.class.getDeclaredField("theUnsafe"); |
1543 |
< |
f.setAccessible(true); |
1544 |
< |
return (sun.misc.Unsafe) f.get(null); |
1545 |
< |
}}); |
1546 |
< |
} catch (java.security.PrivilegedActionException e) { |
1547 |
< |
throw new RuntimeException("Could not initialize intrinsics", |
1548 |
< |
e.getCause()); |
1549 |
< |
} |
1535 |
> |
} catch (SecurityException tryReflectionInstead) {} |
1536 |
> |
try { |
1537 |
> |
return java.security.AccessController.doPrivileged |
1538 |
> |
(new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() { |
1539 |
> |
public sun.misc.Unsafe run() throws Exception { |
1540 |
> |
Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; |
1541 |
> |
for (java.lang.reflect.Field f : k.getDeclaredFields()) { |
1542 |
> |
f.setAccessible(true); |
1543 |
> |
Object x = f.get(null); |
1544 |
> |
if (k.isInstance(x)) |
1545 |
> |
return k.cast(x); |
1546 |
> |
} |
1547 |
> |
throw new NoSuchFieldError("the Unsafe"); |
1548 |
> |
}}); |
1549 |
> |
} catch (java.security.PrivilegedActionException e) { |
1550 |
> |
throw new RuntimeException("Could not initialize intrinsics", |
1551 |
> |
e.getCause()); |
1552 |
|
} |
1553 |
|
} |
1554 |
|
} |