83 |
|
/* |
84 |
|
* Overview: |
85 |
|
* |
86 |
< |
* 1. Non-nullness of field result (set via CAS) indicates |
87 |
< |
* done. An AltResult is used to box null as a result, as well as |
88 |
< |
* to hold exceptions. Using a single field makes completion fast |
86 |
> |
* 1. Non-nullness of field result (set via CAS) indicates done. |
87 |
> |
* An AltResult is used to box null as a result, as well as to |
88 |
> |
* hold exceptions. Using a single field makes completion fast |
89 |
|
* and simple to detect and trigger, at the expense of a lot of |
90 |
|
* encoding and decoding that infiltrates many methods. One minor |
91 |
|
* simplification relies on the (static) NIL (to box null results) |
287 |
|
else if (q.thread != null && result == null) { |
288 |
|
try { |
289 |
|
ForkJoinPool.managedBlock(q); |
290 |
< |
} catch(InterruptedException ex){ |
290 |
> |
} catch (InterruptedException ex) { |
291 |
|
q.interruptControl = -1; |
292 |
|
} |
293 |
|
} |
320 |
|
if (nanos <= 0L) |
321 |
|
throw new TimeoutException(); |
322 |
|
long d = System.nanoTime() + nanos; |
323 |
< |
q = new WaitNode(true, nanos, d == 0L? 1L : d); // avoid 0 |
323 |
> |
q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0 |
324 |
|
} |
325 |
|
else if (!queued) |
326 |
|
queued = UNSAFE.compareAndSwapObject(this, WAITERS, |
338 |
|
else if (q.thread != null && result == null) { |
339 |
|
try { |
340 |
|
ForkJoinPool.managedBlock(q); |
341 |
< |
} catch(InterruptedException ex){ |
341 |
> |
} catch (InterruptedException ex) { |
342 |
|
q.interruptControl = -1; |
343 |
|
} |
344 |
|
} |
388 |
|
} |
389 |
|
|
390 |
|
/** Base class can act as either FJ or plain Runnable */ |
391 |
< |
static abstract class Async extends ForkJoinTask<Void> |
391 |
> |
abstract static class Async extends ForkJoinTask<Void> |
392 |
|
implements Runnable, AsynchronousCompletionTask { |
393 |
|
public final Void getRawResult() { return null; } |
394 |
|
public final void setRawResult(Void v) { } |
403 |
|
} |
404 |
|
public final boolean exec() { |
405 |
|
CompletableFuture<Void> d; Throwable ex; |
406 |
< |
if ((d = this.dst) != null) { |
406 |
> |
if ((d = this.dst) != null && d.result == null) { |
407 |
|
try { |
408 |
|
fn.run(); |
409 |
|
ex = null; |
425 |
|
} |
426 |
|
public final boolean exec() { |
427 |
|
CompletableFuture<U> d; U u; Throwable ex; |
428 |
< |
if ((d = this.dst) != null) { |
428 |
> |
if ((d = this.dst) != null && d.result == null) { |
429 |
|
try { |
430 |
|
u = fn.get(); |
431 |
|
ex = null; |
450 |
|
} |
451 |
|
public final boolean exec() { |
452 |
|
CompletableFuture<U> d; U u; Throwable ex; |
453 |
< |
if ((d = this.dst) != null) { |
453 |
> |
if ((d = this.dst) != null && d.result == null) { |
454 |
|
try { |
455 |
|
u = fn.apply(arg); |
456 |
|
ex = null; |
477 |
|
} |
478 |
|
public final boolean exec() { |
479 |
|
CompletableFuture<V> d; V v; Throwable ex; |
480 |
< |
if ((d = this.dst) != null) { |
480 |
> |
if ((d = this.dst) != null && d.result == null) { |
481 |
|
try { |
482 |
|
v = fn.apply(arg1, arg2); |
483 |
|
ex = null; |
502 |
|
} |
503 |
|
public final boolean exec() { |
504 |
|
CompletableFuture<Void> d; Throwable ex; |
505 |
< |
if ((d = this.dst) != null) { |
505 |
> |
if ((d = this.dst) != null && d.result == null) { |
506 |
|
try { |
507 |
|
fn.accept(arg); |
508 |
|
ex = null; |
528 |
|
} |
529 |
|
public final boolean exec() { |
530 |
|
CompletableFuture<Void> d; Throwable ex; |
531 |
< |
if ((d = this.dst) != null) { |
531 |
> |
if ((d = this.dst) != null && d.result == null) { |
532 |
|
try { |
533 |
|
fn.accept(arg1, arg2); |
534 |
|
ex = null; |
557 |
|
} |
558 |
|
|
559 |
|
// Opportunistically subclass AtomicInteger to use compareAndSet to claim. |
560 |
< |
static abstract class Completion extends AtomicInteger implements Runnable { |
560 |
> |
abstract static class Completion extends AtomicInteger implements Runnable { |
561 |
|
} |
562 |
|
|
563 |
|
static final class ApplyCompletion<T,U> extends Completion { |
1319 |
|
Object r; Throwable ex, cause; |
1320 |
|
if ((r = result) == null && (r = waitingGet(true)) == null) |
1321 |
|
throw new InterruptedException(); |
1322 |
< |
if (r instanceof AltResult) { |
1323 |
< |
if ((ex = ((AltResult)r).ex) != null) { |
1324 |
< |
if (ex instanceof CancellationException) |
1325 |
< |
throw (CancellationException)ex; |
1326 |
< |
if ((ex instanceof CompletionException) && |
1327 |
< |
(cause = ex.getCause()) != null) |
1328 |
< |
ex = cause; |
1329 |
< |
throw new ExecutionException(ex); |
1330 |
< |
} |
1331 |
< |
return null; |
1322 |
> |
if (!(r instanceof AltResult)) { |
1323 |
> |
@SuppressWarnings("unchecked") T tr = (T) r; |
1324 |
> |
return tr; |
1325 |
|
} |
1326 |
< |
@SuppressWarnings("unchecked") T tr = (T) r; |
1327 |
< |
return tr; |
1326 |
> |
if ((ex = ((AltResult)r).ex) == null) |
1327 |
> |
return null; |
1328 |
> |
if (ex instanceof CancellationException) |
1329 |
> |
throw (CancellationException)ex; |
1330 |
> |
if ((ex instanceof CompletionException) && |
1331 |
> |
(cause = ex.getCause()) != null) |
1332 |
> |
ex = cause; |
1333 |
> |
throw new ExecutionException(ex); |
1334 |
|
} |
1335 |
|
|
1336 |
|
/** |
1355 |
|
throw new InterruptedException(); |
1356 |
|
if ((r = result) == null) |
1357 |
|
r = timedAwaitDone(nanos); |
1358 |
< |
if (r instanceof AltResult) { |
1359 |
< |
if ((ex = ((AltResult)r).ex) != null) { |
1360 |
< |
if (ex instanceof CancellationException) |
1362 |
< |
throw (CancellationException)ex; |
1363 |
< |
if ((ex instanceof CompletionException) && |
1364 |
< |
(cause = ex.getCause()) != null) |
1365 |
< |
ex = cause; |
1366 |
< |
throw new ExecutionException(ex); |
1367 |
< |
} |
1368 |
< |
return null; |
1358 |
> |
if (!(r instanceof AltResult)) { |
1359 |
> |
@SuppressWarnings("unchecked") T tr = (T) r; |
1360 |
> |
return tr; |
1361 |
|
} |
1362 |
< |
@SuppressWarnings("unchecked") T tr = (T) r; |
1363 |
< |
return tr; |
1362 |
> |
if ((ex = ((AltResult)r).ex) == null) |
1363 |
> |
return null; |
1364 |
> |
if (ex instanceof CancellationException) |
1365 |
> |
throw (CancellationException)ex; |
1366 |
> |
if ((ex instanceof CompletionException) && |
1367 |
> |
(cause = ex.getCause()) != null) |
1368 |
> |
ex = cause; |
1369 |
> |
throw new ExecutionException(ex); |
1370 |
|
} |
1371 |
|
|
1372 |
|
/** |
1387 |
|
Object r; Throwable ex; |
1388 |
|
if ((r = result) == null) |
1389 |
|
r = waitingGet(false); |
1390 |
< |
if (r instanceof AltResult) { |
1391 |
< |
if ((ex = ((AltResult)r).ex) != null) { |
1392 |
< |
if (ex instanceof CancellationException) |
1395 |
< |
throw (CancellationException)ex; |
1396 |
< |
if (ex instanceof CompletionException) |
1397 |
< |
throw (CompletionException)ex; |
1398 |
< |
throw new CompletionException(ex); |
1399 |
< |
} |
1400 |
< |
return null; |
1390 |
> |
if (!(r instanceof AltResult)) { |
1391 |
> |
@SuppressWarnings("unchecked") T tr = (T) r; |
1392 |
> |
return tr; |
1393 |
|
} |
1394 |
< |
@SuppressWarnings("unchecked") T tr = (T) r; |
1395 |
< |
return tr; |
1394 |
> |
if ((ex = ((AltResult)r).ex) == null) |
1395 |
> |
return null; |
1396 |
> |
if (ex instanceof CancellationException) |
1397 |
> |
throw (CancellationException)ex; |
1398 |
> |
if (ex instanceof CompletionException) |
1399 |
> |
throw (CompletionException)ex; |
1400 |
> |
throw new CompletionException(ex); |
1401 |
|
} |
1402 |
|
|
1403 |
|
/** |
1414 |
|
Object r; Throwable ex; |
1415 |
|
if ((r = result) == null) |
1416 |
|
return valueIfAbsent; |
1417 |
< |
if (r instanceof AltResult) { |
1418 |
< |
if ((ex = ((AltResult)r).ex) != null) { |
1419 |
< |
if (ex instanceof CancellationException) |
1423 |
< |
throw (CancellationException)ex; |
1424 |
< |
if (ex instanceof CompletionException) |
1425 |
< |
throw (CompletionException)ex; |
1426 |
< |
throw new CompletionException(ex); |
1427 |
< |
} |
1428 |
< |
return null; |
1417 |
> |
if (!(r instanceof AltResult)) { |
1418 |
> |
@SuppressWarnings("unchecked") T tr = (T) r; |
1419 |
> |
return tr; |
1420 |
|
} |
1421 |
< |
@SuppressWarnings("unchecked") T tr = (T) r; |
1422 |
< |
return tr; |
1421 |
> |
if ((ex = ((AltResult)r).ex) == null) |
1422 |
> |
return null; |
1423 |
> |
if (ex instanceof CancellationException) |
1424 |
> |
throw (CancellationException)ex; |
1425 |
> |
if (ex instanceof CompletionException) |
1426 |
> |
throw (CompletionException)ex; |
1427 |
> |
throw new CompletionException(ex); |
1428 |
|
} |
1429 |
|
|
1430 |
|
/** |
2445 |
|
* then the returned CompletableFuture also does so, with a |
2446 |
|
* CompletionException holding this exception as its cause. |
2447 |
|
* |
2448 |
< |
* @param fn the function returning a new CompletableFuture. |
2448 |
> |
* @param fn the function returning a new CompletableFuture |
2449 |
|
* @return the CompletableFuture, that {@code isDone()} upon |
2450 |
|
* return if completed by the given function, or an exception |
2451 |
< |
* occurs. |
2451 |
> |
* occurs |
2452 |
|
*/ |
2453 |
|
public <U> CompletableFuture<U> thenCompose(Fun<? super T, |
2454 |
|
CompletableFuture<U>> fn) { |
2629 |
|
*/ |
2630 |
|
public boolean isCancelled() { |
2631 |
|
Object r; |
2632 |
< |
return ((r = result) != null && |
2633 |
< |
(r instanceof AltResult) && |
2638 |
< |
(((AltResult)r).ex instanceof CancellationException)); |
2632 |
> |
return ((r = result) instanceof AltResult) && |
2633 |
> |
(((AltResult)r).ex instanceof CancellationException); |
2634 |
|
} |
2635 |
|
|
2636 |
|
/** |
2637 |
|
* Forcibly sets or resets the value subsequently returned by |
2638 |
< |
* method get() and related methods, whether or not already |
2639 |
< |
* completed. This method is designed for use only in error |
2640 |
< |
* recovery actions, and even in such situations may result in |
2641 |
< |
* ongoing dependent completions using established versus |
2642 |
< |
* overwritten values. |
2638 |
> |
* method {@link #get()} and related methods, whether or not |
2639 |
> |
* already completed. This method is designed for use only in |
2640 |
> |
* error recovery actions, and even in such situations may result |
2641 |
> |
* in ongoing dependent completions using established versus |
2642 |
> |
* overwritten outcomes. |
2643 |
|
* |
2644 |
|
* @param value the completion value |
2645 |
|
*/ |
2648 |
|
postComplete(); |
2649 |
|
} |
2650 |
|
|
2651 |
+ |
/** |
2652 |
+ |
* Forcibly causes subsequent invocations of method {@link #get()} |
2653 |
+ |
* and related methods to throw the given exception, whether or |
2654 |
+ |
* not already completed. This method is designed for use only in |
2655 |
+ |
* recovery actions, and even in such situations may result in |
2656 |
+ |
* ongoing dependent completions using established versus |
2657 |
+ |
* overwritten outcomes. |
2658 |
+ |
* |
2659 |
+ |
* @param ex the exception |
2660 |
+ |
*/ |
2661 |
+ |
public void obtrudeException(Throwable ex) { |
2662 |
+ |
if (ex == null) throw new NullPointerException(); |
2663 |
+ |
result = new AltResult(ex); |
2664 |
+ |
postComplete(); |
2665 |
+ |
} |
2666 |
+ |
|
2667 |
|
// Unsafe mechanics |
2668 |
|
private static final sun.misc.Unsafe UNSAFE; |
2669 |
|
private static final long RESULT; |
2695 |
|
private static sun.misc.Unsafe getUnsafe() { |
2696 |
|
try { |
2697 |
|
return sun.misc.Unsafe.getUnsafe(); |
2698 |
< |
} catch (SecurityException se) { |
2699 |
< |
try { |
2700 |
< |
return java.security.AccessController.doPrivileged |
2701 |
< |
(new java.security |
2702 |
< |
.PrivilegedExceptionAction<sun.misc.Unsafe>() { |
2703 |
< |
public sun.misc.Unsafe run() throws Exception { |
2704 |
< |
java.lang.reflect.Field f = sun.misc |
2705 |
< |
.Unsafe.class.getDeclaredField("theUnsafe"); |
2706 |
< |
f.setAccessible(true); |
2707 |
< |
return (sun.misc.Unsafe) f.get(null); |
2708 |
< |
}}); |
2709 |
< |
} catch (java.security.PrivilegedActionException e) { |
2710 |
< |
throw new RuntimeException("Could not initialize intrinsics", |
2711 |
< |
e.getCause()); |
2712 |
< |
} |
2698 |
> |
} catch (SecurityException tryReflectionInstead) {} |
2699 |
> |
try { |
2700 |
> |
return java.security.AccessController.doPrivileged |
2701 |
> |
(new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() { |
2702 |
> |
public sun.misc.Unsafe run() throws Exception { |
2703 |
> |
Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; |
2704 |
> |
for (java.lang.reflect.Field f : k.getDeclaredFields()) { |
2705 |
> |
f.setAccessible(true); |
2706 |
> |
Object x = f.get(null); |
2707 |
> |
if (k.isInstance(x)) |
2708 |
> |
return k.cast(x); |
2709 |
> |
} |
2710 |
> |
throw new NoSuchFieldError("the Unsafe"); |
2711 |
> |
}}); |
2712 |
> |
} catch (java.security.PrivilegedActionException e) { |
2713 |
> |
throw new RuntimeException("Could not initialize intrinsics", |
2714 |
> |
e.getCause()); |
2715 |
|
} |
2716 |
|
} |
2704 |
– |
|
2717 |
|
} |