134 |
|
* (DAG). Otherwise, executions may encounter a form of deadlock as |
135 |
|
* tasks cyclically wait for each other. However, this framework |
136 |
|
* supports other methods and techniques (for example the use of |
137 |
< |
* {@link java.util.concurrent.Phaser}, {@link #helpQuiesce}, and |
138 |
< |
* {@link #complete}) that |
137 |
> |
* {@link java.util.concurrent.Phaser Phaser}, {@link #helpQuiesce}, and {@link #complete}) that |
138 |
|
* may be of use in constructing custom subclasses for problems that |
139 |
< |
* are not statically structured as DAGs. To support such usages a |
139 |
> |
* are not statically structured as DAGs. To support such usages, a |
140 |
|
* ForkJoinTask may be atomically <em>tagged</em> with a {@code short} |
141 |
|
* value using {@link #setForkJoinTaskTag} or {@link |
142 |
|
* #compareAndSetForkJoinTaskTag} and checked using {@link |
285 |
|
*/ |
286 |
|
private int externalAwaitDone() { |
287 |
|
int s; |
288 |
< |
ForkJoinPool.externalHelpJoin(this); |
289 |
< |
boolean interrupted = false; |
290 |
< |
while ((s = status) >= 0) { |
291 |
< |
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
292 |
< |
synchronized (this) { |
293 |
< |
if (status >= 0) { |
294 |
< |
try { |
295 |
< |
wait(); |
296 |
< |
} catch (InterruptedException ie) { |
297 |
< |
interrupted = true; |
288 |
> |
ForkJoinPool cp = ForkJoinPool.common; |
289 |
> |
if ((s = status) >= 0) { |
290 |
> |
if (cp != null) { |
291 |
> |
if (this instanceof CountedCompleter) |
292 |
> |
s = cp.externalHelpComplete((CountedCompleter<?>)this); |
293 |
> |
else if (cp.tryExternalUnpush(this)) |
294 |
> |
s = doExec(); |
295 |
> |
} |
296 |
> |
if (s >= 0 && (s = status) >= 0) { |
297 |
> |
boolean interrupted = false; |
298 |
> |
do { |
299 |
> |
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
300 |
> |
synchronized (this) { |
301 |
> |
if (status >= 0) { |
302 |
> |
try { |
303 |
> |
wait(); |
304 |
> |
} catch (InterruptedException ie) { |
305 |
> |
interrupted = true; |
306 |
> |
} |
307 |
> |
} |
308 |
> |
else |
309 |
> |
notifyAll(); |
310 |
|
} |
311 |
|
} |
312 |
< |
else |
313 |
< |
notifyAll(); |
314 |
< |
} |
312 |
> |
} while ((s = status) >= 0); |
313 |
> |
if (interrupted) |
314 |
> |
Thread.currentThread().interrupt(); |
315 |
|
} |
316 |
|
} |
306 |
– |
if (interrupted) |
307 |
– |
Thread.currentThread().interrupt(); |
317 |
|
return s; |
318 |
|
} |
319 |
|
|
322 |
|
*/ |
323 |
|
private int externalInterruptibleAwaitDone() throws InterruptedException { |
324 |
|
int s; |
325 |
+ |
ForkJoinPool cp = ForkJoinPool.common; |
326 |
|
if (Thread.interrupted()) |
327 |
|
throw new InterruptedException(); |
328 |
< |
ForkJoinPool.externalHelpJoin(this); |
328 |
> |
if ((s = status) >= 0 && cp != null) { |
329 |
> |
if (this instanceof CountedCompleter) |
330 |
> |
cp.externalHelpComplete((CountedCompleter<?>)this); |
331 |
> |
else if (cp.tryExternalUnpush(this)) |
332 |
> |
doExec(); |
333 |
> |
} |
334 |
|
while ((s = status) >= 0) { |
335 |
|
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
336 |
|
synchronized (this) { |
411 |
|
final Throwable ex; |
412 |
|
ExceptionNode next; |
413 |
|
final long thrower; // use id not ref to avoid weak cycles |
414 |
+ |
final int hashCode; // store task hashCode before weak ref disappears |
415 |
|
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { |
416 |
|
super(task, exceptionTableRefQueue); |
417 |
|
this.ex = ex; |
418 |
|
this.next = next; |
419 |
|
this.thrower = Thread.currentThread().getId(); |
420 |
+ |
this.hashCode = System.identityHashCode(task); |
421 |
|
} |
422 |
|
} |
423 |
|
|
576 |
|
/** |
577 |
|
* Poll stale refs and remove them. Call only while holding lock. |
578 |
|
*/ |
579 |
+ |
/** |
580 |
+ |
* Poll stale refs and remove them. Call only while holding lock. |
581 |
+ |
*/ |
582 |
|
private static void expungeStaleExceptions() { |
583 |
|
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { |
584 |
|
if (x instanceof ExceptionNode) { |
585 |
< |
ForkJoinTask<?> key = ((ExceptionNode)x).get(); |
585 |
> |
int hashCode = ((ExceptionNode)x).hashCode; |
586 |
|
ExceptionNode[] t = exceptionTable; |
587 |
< |
int i = System.identityHashCode(key) & (t.length - 1); |
587 |
> |
int i = hashCode & (t.length - 1); |
588 |
|
ExceptionNode e = t[i]; |
589 |
|
ExceptionNode pred = null; |
590 |
|
while (e != null) { |
621 |
|
/** |
622 |
|
* A version of "sneaky throw" to relay exceptions |
623 |
|
*/ |
624 |
< |
static void rethrow(final Throwable ex) { |
625 |
< |
if (ex != null) { |
606 |
< |
if (ex instanceof Error) |
607 |
< |
throw (Error)ex; |
608 |
< |
if (ex instanceof RuntimeException) |
609 |
< |
throw (RuntimeException)ex; |
624 |
> |
static void rethrow(Throwable ex) { |
625 |
> |
if (ex != null) |
626 |
|
ForkJoinTask.<RuntimeException>uncheckedThrow(ex); |
611 |
– |
} |
627 |
|
} |
628 |
|
|
629 |
|
/** |
633 |
|
*/ |
634 |
|
@SuppressWarnings("unchecked") static <T extends Throwable> |
635 |
|
void uncheckedThrow(Throwable t) throws T { |
636 |
< |
if (t != null) |
622 |
< |
throw (T)t; // rely on vacuous cast |
636 |
> |
throw (T)t; // rely on vacuous cast |
637 |
|
} |
638 |
|
|
639 |
|
/** |
787 |
|
* unprocessed. |
788 |
|
* |
789 |
|
* @param tasks the collection of tasks |
790 |
+ |
* @param <T> the type of the values returned from the tasks |
791 |
|
* @return the tasks argument, to simplify usage |
792 |
|
* @throws NullPointerException if tasks or any element are null |
793 |
|
*/ |
845 |
|
* <p>This method is designed to be invoked by <em>other</em> |
846 |
|
* tasks. To terminate the current task, you can just return or |
847 |
|
* throw an unchecked exception from its computation method, or |
848 |
< |
* invoke {@link #completeExceptionally}. |
848 |
> |
* invoke {@link #completeExceptionally(Throwable)}. |
849 |
|
* |
850 |
|
* @param mayInterruptIfRunning this value has no effect in the |
851 |
|
* default implementation because interrupts are not used to |
997 |
|
// Messy in part because we measure in nanosecs, but wait in millisecs |
998 |
|
int s; long ms; |
999 |
|
long ns = unit.toNanos(timeout); |
1000 |
+ |
ForkJoinPool cp; |
1001 |
|
if ((s = status) >= 0 && ns > 0L) { |
1002 |
|
long deadline = System.nanoTime() + ns; |
1003 |
|
ForkJoinPool p = null; |
1009 |
|
w = wt.workQueue; |
1010 |
|
p.helpJoinOnce(w, this); // no retries on failure |
1011 |
|
} |
1012 |
< |
else |
1013 |
< |
ForkJoinPool.externalHelpJoin(this); |
1012 |
> |
else if ((cp = ForkJoinPool.common) != null) { |
1013 |
> |
if (this instanceof CountedCompleter) |
1014 |
> |
cp.externalHelpComplete((CountedCompleter<?>)this); |
1015 |
> |
else if (cp.tryExternalUnpush(this)) |
1016 |
> |
doExec(); |
1017 |
> |
} |
1018 |
|
boolean canBlock = false; |
1019 |
|
boolean interrupted = false; |
1020 |
|
try { |
1022 |
|
if (w != null && w.qlock < 0) |
1023 |
|
cancelIgnoringExceptions(this); |
1024 |
|
else if (!canBlock) { |
1025 |
< |
if (p == null || p.tryCompensate()) |
1025 |
> |
if (p == null || p.tryCompensate(p.ctl)) |
1026 |
|
canBlock = true; |
1027 |
|
} |
1028 |
|
else { |
1086 |
|
|
1087 |
|
/** |
1088 |
|
* Possibly executes tasks until the pool hosting the current task |
1089 |
< |
* {@link ForkJoinPool#isQuiescent is quiescent}. This method may |
1090 |
< |
* be of use in designs in which many tasks are forked, but none |
1091 |
< |
* are explicitly joined, instead executing them until all are |
1092 |
< |
* processed. |
1089 |
> |
* {@linkplain ForkJoinPool#isQuiescent is quiescent}. This |
1090 |
> |
* method may be of use in designs in which many tasks are forked, |
1091 |
> |
* but none are explicitly joined, instead executing them until |
1092 |
> |
* all are processed. |
1093 |
|
*/ |
1094 |
|
public static void helpQuiesce() { |
1095 |
|
Thread t; |
1163 |
|
Thread t; |
1164 |
|
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? |
1165 |
|
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : |
1166 |
< |
ForkJoinPool.tryExternalUnpush(this)); |
1166 |
> |
ForkJoinPool.common.tryExternalUnpush(this)); |
1167 |
|
} |
1168 |
|
|
1169 |
|
/** |
1332 |
|
* |
1333 |
|
* @param e the expected tag value |
1334 |
|
* @param tag the new tag value |
1335 |
< |
* @return true if successful; i.e., the current value was |
1335 |
> |
* @return {@code true} if successful; i.e., the current value was |
1336 |
|
* equal to e and is now tag. |
1337 |
|
* @since 1.8 |
1338 |
|
*/ |
1347 |
|
} |
1348 |
|
|
1349 |
|
/** |
1350 |
< |
* Adaptor for Runnables. This implements RunnableFuture |
1350 |
> |
* Adapter for Runnables. This implements RunnableFuture |
1351 |
|
* to be compliant with AbstractExecutorService constraints |
1352 |
|
* when used in ForkJoinPool. |
1353 |
|
*/ |
1368 |
|
} |
1369 |
|
|
1370 |
|
/** |
1371 |
< |
* Adaptor for Runnables without results |
1371 |
> |
* Adapter for Runnables without results |
1372 |
|
*/ |
1373 |
|
static final class AdaptedRunnableAction extends ForkJoinTask<Void> |
1374 |
|
implements RunnableFuture<Void> { |
1385 |
|
} |
1386 |
|
|
1387 |
|
/** |
1388 |
< |
* Adaptor for Callables |
1388 |
> |
* Adapter for Runnables in which failure forces worker exception |
1389 |
> |
*/ |
1390 |
> |
static final class RunnableExecuteAction extends ForkJoinTask<Void> { |
1391 |
> |
final Runnable runnable; |
1392 |
> |
RunnableExecuteAction(Runnable runnable) { |
1393 |
> |
if (runnable == null) throw new NullPointerException(); |
1394 |
> |
this.runnable = runnable; |
1395 |
> |
} |
1396 |
> |
public final Void getRawResult() { return null; } |
1397 |
> |
public final void setRawResult(Void v) { } |
1398 |
> |
public final boolean exec() { runnable.run(); return true; } |
1399 |
> |
void internalPropagateException(Throwable ex) { |
1400 |
> |
rethrow(ex); // rethrow outside exec() catches. |
1401 |
> |
} |
1402 |
> |
private static final long serialVersionUID = 5232453952276885070L; |
1403 |
> |
} |
1404 |
> |
|
1405 |
> |
/** |
1406 |
> |
* Adapter for Callables |
1407 |
|
*/ |
1408 |
|
static final class AdaptedCallable<T> extends ForkJoinTask<T> |
1409 |
|
implements RunnableFuture<T> { |
1450 |
|
* |
1451 |
|
* @param runnable the runnable action |
1452 |
|
* @param result the result upon completion |
1453 |
+ |
* @param <T> the type of the result |
1454 |
|
* @return the task |
1455 |
|
*/ |
1456 |
|
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) { |
1464 |
|
* encountered into {@code RuntimeException}. |
1465 |
|
* |
1466 |
|
* @param callable the callable action |
1467 |
+ |
* @param <T> the type of the callable's result |
1468 |
|
* @return the task |
1469 |
|
*/ |
1470 |
|
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) { |
1478 |
|
/** |
1479 |
|
* Saves this task to a stream (that is, serializes it). |
1480 |
|
* |
1481 |
+ |
* @param s the stream |
1482 |
+ |
* @throws java.io.IOException if an I/O error occurs |
1483 |
|
* @serialData the current run status and the exception thrown |
1484 |
|
* during execution, or {@code null} if none |
1485 |
|
*/ |
1491 |
|
|
1492 |
|
/** |
1493 |
|
* Reconstitutes this task from a stream (that is, deserializes it). |
1494 |
+ |
* @param s the stream |
1495 |
+ |
* @throws ClassNotFoundException if the class of a serialized object |
1496 |
+ |
* could not be found |
1497 |
+ |
* @throws java.io.IOException if an I/O error occurs |
1498 |
|
*/ |
1499 |
|
private void readObject(java.io.ObjectInputStream s) |
1500 |
|
throws java.io.IOException, ClassNotFoundException { |