109 |
|
* others -- see method postComplete). Because post-processing |
110 |
|
* may race with direct calls, class Completion opportunistically |
111 |
|
* extends AtomicInteger so callers can claim the action via |
112 |
< |
* compareAndSet(0, 1). The Completion.run methods are all |
112 |
> |
* compareAndSet(0, 1). The Completion.trigger methods are all |
113 |
|
* written a boringly similar uniform way (that sometimes includes |
114 |
|
* unnecessary-looking checks, kept to maintain uniformity). |
115 |
|
* There are enough dimensions upon which they differ that |
145 |
|
// Basic utilities for triggering and processing completions |
146 |
|
|
147 |
|
/** |
148 |
< |
* Removes and signals all waiting threads and runs all completions. |
148 |
> |
* Triggers completion with the encoding of the given arguments: |
149 |
> |
* if the exception is non-null, encodes it as a wrapped |
150 |
> |
* CompletionException unless it is one already. Otherwise uses |
151 |
> |
* the given result, boxed as NIL if null. |
152 |
|
*/ |
153 |
< |
final void postComplete() { |
153 |
> |
final void setInternalResult(T v, Throwable ex) { |
154 |
> |
if (result == null) |
155 |
> |
UNSAFE.compareAndSwapObject |
156 |
> |
(this, RESULT, null, |
157 |
> |
(ex == null) ? (v == null) ? NIL : v : |
158 |
> |
new AltResult((ex instanceof CompletionException) ? ex : |
159 |
> |
new CompletionException(ex))); |
160 |
> |
} |
161 |
> |
|
162 |
> |
/** |
163 |
> |
* Removes and signals all waiting threads |
164 |
> |
*/ |
165 |
> |
final void removeAndSignalWaiters() { |
166 |
|
WaitNode q; Thread t; |
167 |
|
while ((q = waiters) != null) { |
168 |
|
if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && |
171 |
|
LockSupport.unpark(t); |
172 |
|
} |
173 |
|
} |
174 |
+ |
} |
175 |
|
|
176 |
< |
CompletionNode h; Completion c; |
177 |
< |
while ((h = completions) != null) { |
178 |
< |
if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && |
179 |
< |
(c = h.completion) != null) |
180 |
< |
c.run(); |
176 |
> |
/** |
177 |
> |
* Triggers all enabled completions reachable from b. Loopifies |
178 |
> |
* the final recursive call for each stage to avoid potential |
179 |
> |
* StackOverflowErrors in cases of long linear chains. |
180 |
> |
* |
181 |
> |
* @param b if non-null, a completed CompletableFuture |
182 |
> |
*/ |
183 |
> |
static final void removeAndTriggerCompletions(CompletableFuture<?> b) { |
184 |
> |
CompletionNode h; Completion c; CompletableFuture<?> f; |
185 |
> |
while (b != null && (h = b.completions) != null) { |
186 |
> |
if (UNSAFE.compareAndSwapObject(b, COMPLETIONS, h, h.next) && |
187 |
> |
(c = h.completion) != null && |
188 |
> |
(f = c.trigger()) != null && |
189 |
> |
f.result != null) { |
190 |
> |
f.removeAndSignalWaiters(); |
191 |
> |
if (f.completions != null) { |
192 |
> |
if (b.completions == null) |
193 |
> |
b = f; // tail-recurse |
194 |
> |
else |
195 |
> |
removeAndTriggerCompletions(f); |
196 |
> |
} |
197 |
> |
} |
198 |
|
} |
199 |
|
} |
200 |
|
|
201 |
|
/** |
202 |
< |
* Triggers completion with the encoding of the given arguments: |
170 |
< |
* if the exception is non-null, encodes it as a wrapped |
171 |
< |
* CompletionException unless it is one already. Otherwise uses |
172 |
< |
* the given result, boxed as NIL if null. |
202 |
> |
* Sets result, signals waiters, and triggers dependents |
203 |
|
*/ |
204 |
|
final void internalComplete(T v, Throwable ex) { |
205 |
< |
if (result == null) |
206 |
< |
UNSAFE.compareAndSwapObject |
207 |
< |
(this, RESULT, null, |
178 |
< |
(ex == null) ? (v == null) ? NIL : v : |
179 |
< |
new AltResult((ex instanceof CompletionException) ? ex : |
180 |
< |
new CompletionException(ex))); |
181 |
< |
postComplete(); // help out even if not triggered |
205 |
> |
setInternalResult(v, ex); |
206 |
> |
removeAndSignalWaiters(); |
207 |
> |
removeAndTriggerCompletions(this); |
208 |
|
} |
209 |
|
|
210 |
+ |
|
211 |
|
/** |
212 |
< |
* If triggered, helps release and/or process completions. |
212 |
> |
* Signals waiters and triggers dependents. Call only if known to |
213 |
> |
* be completed. |
214 |
> |
*/ |
215 |
> |
final void postComplete() { |
216 |
> |
removeAndSignalWaiters(); |
217 |
> |
removeAndTriggerCompletions(this); |
218 |
> |
} |
219 |
> |
|
220 |
> |
/** |
221 |
> |
* If completed, helps signal waiters and trigger dependents |
222 |
|
*/ |
223 |
|
final void helpPostComplete() { |
224 |
< |
if (result != null) |
225 |
< |
postComplete(); |
224 |
> |
if (result != null) { |
225 |
> |
removeAndSignalWaiters(); |
226 |
> |
removeAndTriggerCompletions(this); |
227 |
> |
} |
228 |
|
} |
229 |
|
|
230 |
|
/* ------------- waiting for completions -------------- */ |
682 |
|
|
683 |
|
// Opportunistically subclass AtomicInteger to use compareAndSet to claim. |
684 |
|
@SuppressWarnings("serial") |
685 |
< |
abstract static class Completion extends AtomicInteger implements Runnable { |
685 |
> |
abstract static class Completion extends AtomicInteger { |
686 |
> |
/** |
687 |
> |
* Complete a dependent Completablefuture if enabled |
688 |
> |
* @return the dependent Completablefuture |
689 |
> |
*/ |
690 |
> |
public abstract CompletableFuture<?> trigger(); |
691 |
|
} |
692 |
|
|
693 |
|
static final class ThenApply<T,U> extends Completion { |
702 |
|
this.src = src; this.fn = fn; this.dst = dst; |
703 |
|
this.executor = executor; |
704 |
|
} |
705 |
< |
public final void run() { |
705 |
> |
public final CompletableFuture<?> trigger() { |
706 |
|
final CompletableFuture<? extends T> a; |
707 |
|
final Function<? super T,? extends U> fn; |
708 |
|
final CompletableFuture<U> dst; |
734 |
|
} |
735 |
|
} |
736 |
|
if (e == null || ex != null) |
737 |
< |
dst.internalComplete(u, ex); |
737 |
> |
dst.setInternalResult(u, ex); |
738 |
|
} |
739 |
+ |
return dst; |
740 |
|
} |
741 |
|
private static final long serialVersionUID = 5232453952276885070L; |
742 |
|
} |
753 |
|
this.src = src; this.fn = fn; this.dst = dst; |
754 |
|
this.executor = executor; |
755 |
|
} |
756 |
< |
public final void run() { |
756 |
> |
public final CompletableFuture<?> trigger() { |
757 |
|
final CompletableFuture<? extends T> a; |
758 |
|
final Consumer<? super T> fn; |
759 |
|
final CompletableFuture<?> dst; |
784 |
|
} |
785 |
|
} |
786 |
|
if (e == null || ex != null) |
787 |
< |
dst.internalComplete(null, ex); |
787 |
> |
dst.setInternalResult(null, ex); |
788 |
|
} |
789 |
+ |
return dst; |
790 |
|
} |
791 |
|
private static final long serialVersionUID = 5232453952276885070L; |
792 |
|
} |
803 |
|
this.src = src; this.fn = fn; this.dst = dst; |
804 |
|
this.executor = executor; |
805 |
|
} |
806 |
< |
public final void run() { |
806 |
> |
public final CompletableFuture<?> trigger() { |
807 |
|
final CompletableFuture<?> a; |
808 |
|
final Runnable fn; |
809 |
|
final CompletableFuture<Void> dst; |
829 |
|
} |
830 |
|
} |
831 |
|
if (e == null || ex != null) |
832 |
< |
dst.internalComplete(null, ex); |
832 |
> |
dst.setInternalResult(null, ex); |
833 |
|
} |
834 |
+ |
return dst; |
835 |
|
} |
836 |
|
private static final long serialVersionUID = 5232453952276885070L; |
837 |
|
} |
851 |
|
this.fn = fn; this.dst = dst; |
852 |
|
this.executor = executor; |
853 |
|
} |
854 |
< |
public final void run() { |
854 |
> |
public final CompletableFuture<?> trigger() { |
855 |
|
final CompletableFuture<? extends T> a; |
856 |
|
final CompletableFuture<? extends U> b; |
857 |
|
final BiFunction<? super T,? super U,? extends V> fn; |
896 |
|
} |
897 |
|
} |
898 |
|
if (e == null || ex != null) |
899 |
< |
dst.internalComplete(v, ex); |
899 |
> |
dst.setInternalResult(v, ex); |
900 |
|
} |
901 |
+ |
return dst; |
902 |
|
} |
903 |
|
private static final long serialVersionUID = 5232453952276885070L; |
904 |
|
} |
918 |
|
this.fn = fn; this.dst = dst; |
919 |
|
this.executor = executor; |
920 |
|
} |
921 |
< |
public final void run() { |
921 |
> |
public final CompletableFuture<?> trigger() { |
922 |
|
final CompletableFuture<? extends T> a; |
923 |
|
final CompletableFuture<? extends U> b; |
924 |
|
final BiConsumer<? super T,? super U> fn; |
962 |
|
} |
963 |
|
} |
964 |
|
if (e == null || ex != null) |
965 |
< |
dst.internalComplete(null, ex); |
965 |
> |
dst.setInternalResult(null, ex); |
966 |
|
} |
967 |
+ |
return dst; |
968 |
|
} |
969 |
|
private static final long serialVersionUID = 5232453952276885070L; |
970 |
|
} |
984 |
|
this.fn = fn; this.dst = dst; |
985 |
|
this.executor = executor; |
986 |
|
} |
987 |
< |
public final void run() { |
987 |
> |
public final CompletableFuture<?> trigger() { |
988 |
|
final CompletableFuture<?> a; |
989 |
|
final CompletableFuture<?> b; |
990 |
|
final Runnable fn; |
1015 |
|
} |
1016 |
|
} |
1017 |
|
if (e == null || ex != null) |
1018 |
< |
dst.internalComplete(null, ex); |
1018 |
> |
dst.setInternalResult(null, ex); |
1019 |
|
} |
1020 |
+ |
return dst; |
1021 |
|
} |
1022 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1023 |
|
} |
1031 |
|
CompletableFuture<Void> dst) { |
1032 |
|
this.src = src; this.snd = snd; this.dst = dst; |
1033 |
|
} |
1034 |
< |
public final void run() { |
1034 |
> |
public final CompletableFuture<?> trigger() { |
1035 |
|
final CompletableFuture<?> a; |
1036 |
|
final CompletableFuture<?> b; |
1037 |
|
final CompletableFuture<Void> dst; |
1048 |
|
ex = null; |
1049 |
|
if (ex == null && (s instanceof AltResult)) |
1050 |
|
ex = ((AltResult)s).ex; |
1051 |
< |
dst.internalComplete(null, ex); |
1051 |
> |
dst.setInternalResult(null, ex); |
1052 |
|
} |
1053 |
+ |
return dst; |
1054 |
|
} |
1055 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1056 |
|
} |
1070 |
|
this.fn = fn; this.dst = dst; |
1071 |
|
this.executor = executor; |
1072 |
|
} |
1073 |
< |
public final void run() { |
1073 |
> |
public final CompletableFuture<?> trigger() { |
1074 |
|
final CompletableFuture<? extends T> a; |
1075 |
|
final CompletableFuture<? extends T> b; |
1076 |
|
final Function<? super T,? extends U> fn; |
1103 |
|
} |
1104 |
|
} |
1105 |
|
if (e == null || ex != null) |
1106 |
< |
dst.internalComplete(u, ex); |
1106 |
> |
dst.setInternalResult(u, ex); |
1107 |
|
} |
1108 |
+ |
return dst; |
1109 |
|
} |
1110 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1111 |
|
} |
1125 |
|
this.fn = fn; this.dst = dst; |
1126 |
|
this.executor = executor; |
1127 |
|
} |
1128 |
< |
public final void run() { |
1128 |
> |
public final CompletableFuture<?> trigger() { |
1129 |
|
final CompletableFuture<? extends T> a; |
1130 |
|
final CompletableFuture<? extends T> b; |
1131 |
|
final Consumer<? super T> fn; |
1157 |
|
} |
1158 |
|
} |
1159 |
|
if (e == null || ex != null) |
1160 |
< |
dst.internalComplete(null, ex); |
1160 |
> |
dst.setInternalResult(null, ex); |
1161 |
|
} |
1162 |
+ |
return dst; |
1163 |
|
} |
1164 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1165 |
|
} |
1179 |
|
this.fn = fn; this.dst = dst; |
1180 |
|
this.executor = executor; |
1181 |
|
} |
1182 |
< |
public final void run() { |
1182 |
> |
public final CompletableFuture<?> trigger() { |
1183 |
|
final CompletableFuture<?> a; |
1184 |
|
final CompletableFuture<?> b; |
1185 |
|
final Runnable fn; |
1206 |
|
} |
1207 |
|
} |
1208 |
|
if (e == null || ex != null) |
1209 |
< |
dst.internalComplete(null, ex); |
1209 |
> |
dst.setInternalResult(null, ex); |
1210 |
|
} |
1211 |
+ |
return dst; |
1212 |
|
} |
1213 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1214 |
|
} |
1222 |
|
CompletableFuture<Object> dst) { |
1223 |
|
this.src = src; this.snd = snd; this.dst = dst; |
1224 |
|
} |
1225 |
< |
public final void run() { |
1225 |
> |
public final CompletableFuture<?> trigger() { |
1226 |
|
final CompletableFuture<?> a; |
1227 |
|
final CompletableFuture<?> b; |
1228 |
|
final CompletableFuture<Object> dst; |
1239 |
|
ex = null; |
1240 |
|
t = r; |
1241 |
|
} |
1242 |
< |
dst.internalComplete(t, ex); |
1242 |
> |
dst.setInternalResult(t, ex); |
1243 |
|
} |
1244 |
+ |
return dst; |
1245 |
|
} |
1246 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1247 |
|
} |
1255 |
|
CompletableFuture<T> dst) { |
1256 |
|
this.src = src; this.fn = fn; this.dst = dst; |
1257 |
|
} |
1258 |
< |
public final void run() { |
1258 |
> |
public final CompletableFuture<?> trigger() { |
1259 |
|
final CompletableFuture<? extends T> a; |
1260 |
|
final Function<? super Throwable, ? extends T> fn; |
1261 |
|
final CompletableFuture<T> dst; |
1277 |
|
@SuppressWarnings("unchecked") T tr = (T) r; |
1278 |
|
t = tr; |
1279 |
|
} |
1280 |
< |
dst.internalComplete(t, dx); |
1280 |
> |
dst.setInternalResult(t, dx); |
1281 |
|
} |
1282 |
+ |
return dst; |
1283 |
|
} |
1284 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1285 |
|
} |
1296 |
|
this.src = src; this.fn = fn; this.dst = dst; |
1297 |
|
this.executor = executor; |
1298 |
|
} |
1299 |
< |
public final void run() { |
1299 |
> |
public final CompletableFuture<?> trigger() { |
1300 |
|
final CompletableFuture<? extends T> a; |
1301 |
|
final BiConsumer<? super T, ? super Throwable> fn; |
1302 |
|
final CompletableFuture<T> dst; |
1326 |
|
dx = rex; |
1327 |
|
} |
1328 |
|
if (e == null || dx != null) |
1329 |
< |
dst.internalComplete(t, ex != null ? ex : dx); |
1329 |
> |
dst.setInternalResult(t, ex != null ? ex : dx); |
1330 |
|
} |
1331 |
+ |
return dst; |
1332 |
|
} |
1333 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1334 |
|
} |
1340 |
|
CompletableFuture<T> dst) { |
1341 |
|
this.src = src; this.dst = dst; |
1342 |
|
} |
1343 |
< |
public final void run() { |
1343 |
> |
public final CompletableFuture<?> trigger() { |
1344 |
|
final CompletableFuture<?> a; |
1345 |
|
final CompletableFuture<T> dst; |
1346 |
|
Object r; T t; Throwable ex; |
1357 |
|
@SuppressWarnings("unchecked") T tr = (T) r; |
1358 |
|
t = tr; |
1359 |
|
} |
1360 |
< |
dst.internalComplete(t, ex); |
1360 |
> |
dst.setInternalResult(t, ex); |
1361 |
|
} |
1362 |
+ |
return dst; |
1363 |
|
} |
1364 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1365 |
|
} |
1372 |
|
CompletableFuture<Void> dst) { |
1373 |
|
this.src = src; this.dst = dst; |
1374 |
|
} |
1375 |
< |
public final void run() { |
1375 |
> |
public final CompletableFuture<?> trigger() { |
1376 |
|
final CompletableFuture<?> a; |
1377 |
|
final CompletableFuture<Void> dst; |
1378 |
|
Object r; Throwable ex; |
1384 |
|
ex = ((AltResult)r).ex; |
1385 |
|
else |
1386 |
|
ex = null; |
1387 |
< |
dst.internalComplete(null, ex); |
1387 |
> |
dst.setInternalResult(null, ex); |
1388 |
|
} |
1389 |
+ |
return dst; |
1390 |
|
} |
1391 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1392 |
|
} |
1403 |
|
this.src = src; this.fn = fn; this.dst = dst; |
1404 |
|
this.executor = executor; |
1405 |
|
} |
1406 |
< |
public final void run() { |
1406 |
> |
public final CompletableFuture<?> trigger() { |
1407 |
|
final CompletableFuture<? extends T> a; |
1408 |
|
final BiFunction<? super T, Throwable, ? extends U> fn; |
1409 |
|
final CompletableFuture<U> dst; |
1434 |
|
dx = rex; |
1435 |
|
} |
1436 |
|
if (e == null || dx != null) |
1437 |
< |
dst.internalComplete(u, dx); |
1437 |
> |
dst.setInternalResult(u, dx); |
1438 |
|
} |
1439 |
+ |
return dst; |
1440 |
|
} |
1441 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1442 |
|
} |
1453 |
|
this.src = src; this.fn = fn; this.dst = dst; |
1454 |
|
this.executor = executor; |
1455 |
|
} |
1456 |
< |
public final void run() { |
1456 |
> |
public final CompletableFuture<?> trigger() { |
1457 |
|
final CompletableFuture<? extends T> a; |
1458 |
|
final Function<? super T, ? extends CompletionStage<U>> fn; |
1459 |
|
final CompletableFuture<U> dst; |
1514 |
|
} |
1515 |
|
} |
1516 |
|
if (complete || ex != null) |
1517 |
< |
dst.internalComplete(u, ex); |
1517 |
> |
dst.setInternalResult(u, ex); |
1518 |
|
if (c != null) |
1519 |
|
c.helpPostComplete(); |
1520 |
|
} |
1521 |
+ |
return dst; |
1522 |
|
} |
1523 |
|
private static final long serialVersionUID = 5232453952276885070L; |
1524 |
|
} |