190 |
|
* methods in a way that flows well in javadocs. |
191 |
|
*/ |
192 |
|
|
193 |
< |
/* |
193 |
> |
/** |
194 |
|
* The status field holds run control status bits packed into a |
195 |
< |
* single int to minimize footprint and to ensure atomicity (via |
196 |
< |
* CAS). Status is initially zero, and takes on nonnegative |
197 |
< |
* values until completed, upon which status (anded with |
198 |
< |
* DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks |
199 |
< |
* undergoing blocking waits by other threads have the SIGNAL bit |
200 |
< |
* set. Completion of a stolen task with SIGNAL set awakens any |
201 |
< |
* waiters via notifyAll. Even though suboptimal for some |
202 |
< |
* purposes, we use basic builtin wait/notify to take advantage of |
203 |
< |
* "monitor inflation" in JVMs that we would otherwise need to |
204 |
< |
* emulate to avoid adding further per-task bookkeeping overhead. |
205 |
< |
* We want these monitors to be "fat", i.e., not use biasing or |
206 |
< |
* thin-lock techniques, so use some odd coding idioms that tend |
207 |
< |
* to avoid them, mainly by arranging that every synchronized |
208 |
< |
* block performs a wait, notifyAll or both. |
195 |
> |
* single int to ensure atomicity. Status is initially zero, and |
196 |
> |
* takes on nonnegative values until completed, upon which it |
197 |
> |
* holds (negative bit) DONE, possibly with ABNORMAL (cancelled or |
198 |
> |
* exceptional) and THROWN (in which case an exception has been |
199 |
> |
* stored). Tasks undergoing blocking waits by other threads have |
200 |
> |
* the SIGNAL bit set. Completion of a task with SIGNAL set |
201 |
> |
* awakens any waiters via notifyAll. (Waiters also help signal |
202 |
> |
* others upon completion.) |
203 |
|
* |
204 |
|
* These control bits occupy only (some of) the upper half (16 |
205 |
|
* bits) of status field. The lower bits are used for user-defined |
206 |
|
* tags. |
207 |
|
*/ |
214 |
– |
|
215 |
– |
/** The run status of this task */ |
208 |
|
volatile int status; // accessed directly by pool and workers |
209 |
< |
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits |
210 |
< |
static final int NORMAL = 0xf0000000; // must be negative |
211 |
< |
static final int CANCELLED = 0xc0000000; // must be < NORMAL |
212 |
< |
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED |
213 |
< |
static final int SIGNAL = 0x00010000; // must be >= 1 << 16 |
214 |
< |
static final int SMASK = 0x0000ffff; // short bits for tags |
209 |
> |
|
210 |
> |
private static final int DONE = 1 << 31; // must be negative |
211 |
> |
private static final int ABNORMAL = 1 << 18; // set atomically with DONE |
212 |
> |
private static final int THROWN = 1 << 17; // set atomically with ABNORMAL |
213 |
> |
private static final int SIGNAL = 1 << 16; // true if joiner waiting |
214 |
> |
private static final int SMASK = 0xffff; // short bits for tags |
215 |
> |
|
216 |
> |
static boolean isExceptionalStatus(int s) { // needed by subclasses |
217 |
> |
return (s & THROWN) != 0; |
218 |
> |
} |
219 |
|
|
220 |
|
/** |
221 |
< |
* Marks completion and wakes up threads waiting to join this |
226 |
< |
* task. |
221 |
> |
* Sets DONE status and wakes up threads waiting to join this task. |
222 |
|
* |
223 |
< |
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL |
229 |
< |
* @return completion status on exit |
223 |
> |
* @return status on exit |
224 |
|
*/ |
225 |
< |
private int setCompletion(int completion) { |
226 |
< |
for (int s;;) { |
225 |
> |
private int setDone() { |
226 |
> |
int s; |
227 |
> |
if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0) |
228 |
> |
synchronized (this) { notifyAll(); } |
229 |
> |
return s | DONE; |
230 |
> |
} |
231 |
> |
|
232 |
> |
/** |
233 |
> |
* Marks cancelled or exceptional completion unless already done. |
234 |
> |
* |
235 |
> |
* @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional |
236 |
> |
* @return status on exit |
237 |
> |
*/ |
238 |
> |
private int abnormalCompletion(int completion) { |
239 |
> |
for (int s, ns;;) { |
240 |
|
if ((s = status) < 0) |
241 |
|
return s; |
242 |
< |
if (STATUS.compareAndSet(this, s, s | completion)) { |
243 |
< |
if ((s >>> 16) != 0) |
242 |
> |
else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) { |
243 |
> |
if ((s & SIGNAL) != 0) |
244 |
|
synchronized (this) { notifyAll(); } |
245 |
< |
return completion; |
245 |
> |
return ns; |
246 |
|
} |
247 |
|
} |
248 |
|
} |
260 |
|
try { |
261 |
|
completed = exec(); |
262 |
|
} catch (Throwable rex) { |
263 |
< |
return setExceptionalCompletion(rex); |
263 |
> |
completed = false; |
264 |
> |
s = setExceptionalCompletion(rex); |
265 |
|
} |
266 |
|
if (completed) |
267 |
< |
s = setCompletion(NORMAL); |
267 |
> |
s = setDone(); |
268 |
|
} |
269 |
|
return s; |
270 |
|
} |
276 |
|
* @param timeout using Object.wait conventions. |
277 |
|
*/ |
278 |
|
final void internalWait(long timeout) { |
279 |
< |
int s; |
272 |
< |
if ((s = status) >= 0 && // force completer to issue notify |
273 |
< |
STATUS.compareAndSet(this, s, s | SIGNAL)) { |
279 |
> |
if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) { |
280 |
|
synchronized (this) { |
281 |
|
if (status >= 0) |
282 |
|
try { wait(timeout); } catch (InterruptedException ie) { } |
291 |
|
* @return status upon completion |
292 |
|
*/ |
293 |
|
private int externalAwaitDone() { |
294 |
< |
int s = ((this instanceof CountedCompleter) ? // try helping |
295 |
< |
ForkJoinPool.common.externalHelpComplete( |
290 |
< |
(CountedCompleter<?>)this, 0) : |
291 |
< |
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); |
292 |
< |
if (s >= 0 && (s = status) >= 0) { |
294 |
> |
int s = tryExternalHelp(); |
295 |
> |
if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { |
296 |
|
boolean interrupted = false; |
297 |
< |
do { |
298 |
< |
if (STATUS.compareAndSet(this, s, s | SIGNAL)) { |
299 |
< |
synchronized (this) { |
300 |
< |
if (status >= 0) { |
301 |
< |
try { |
302 |
< |
wait(0L); |
303 |
< |
} catch (InterruptedException ie) { |
301 |
< |
interrupted = true; |
302 |
< |
} |
297 |
> |
synchronized (this) { |
298 |
> |
for (;;) { |
299 |
> |
if ((s = status) >= 0) { |
300 |
> |
try { |
301 |
> |
wait(0L); |
302 |
> |
} catch (InterruptedException ie) { |
303 |
> |
interrupted = true; |
304 |
|
} |
305 |
< |
else |
306 |
< |
notifyAll(); |
305 |
> |
} |
306 |
> |
else { |
307 |
> |
notifyAll(); |
308 |
> |
break; |
309 |
|
} |
310 |
|
} |
311 |
< |
} while ((s = status) >= 0); |
311 |
> |
} |
312 |
|
if (interrupted) |
313 |
|
Thread.currentThread().interrupt(); |
314 |
|
} |
319 |
|
* Blocks a non-worker-thread until completion or interruption. |
320 |
|
*/ |
321 |
|
private int externalInterruptibleAwaitDone() throws InterruptedException { |
322 |
< |
int s; |
323 |
< |
if (Thread.interrupted()) |
324 |
< |
throw new InterruptedException(); |
325 |
< |
if ((s = status) >= 0 && |
326 |
< |
(s = ((this instanceof CountedCompleter) ? |
327 |
< |
ForkJoinPool.common.externalHelpComplete( |
328 |
< |
(CountedCompleter<?>)this, 0) : |
329 |
< |
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : |
330 |
< |
0)) >= 0) { |
328 |
< |
while ((s = status) >= 0) { |
329 |
< |
if (STATUS.compareAndSet(this, s, s | SIGNAL)) { |
330 |
< |
synchronized (this) { |
331 |
< |
if (status >= 0) |
332 |
< |
wait(0L); |
333 |
< |
else |
334 |
< |
notifyAll(); |
322 |
> |
int s = tryExternalHelp(); |
323 |
> |
if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { |
324 |
> |
synchronized (this) { |
325 |
> |
for (;;) { |
326 |
> |
if ((s = status) >= 0) |
327 |
> |
wait(0L); |
328 |
> |
else { |
329 |
> |
notifyAll(); |
330 |
> |
break; |
331 |
|
} |
332 |
|
} |
333 |
|
} |
334 |
|
} |
335 |
+ |
else if (Thread.interrupted()) |
336 |
+ |
throw new InterruptedException(); |
337 |
|
return s; |
338 |
|
} |
339 |
|
|
340 |
|
/** |
341 |
+ |
* Tries to help with tasks allowed for external callers. |
342 |
+ |
* |
343 |
+ |
* @return current status |
344 |
+ |
*/ |
345 |
+ |
private int tryExternalHelp() { |
346 |
+ |
int s; |
347 |
+ |
return ((s = status) < 0 ? s: |
348 |
+ |
(this instanceof CountedCompleter) ? |
349 |
+ |
ForkJoinPool.common.externalHelpComplete( |
350 |
+ |
(CountedCompleter<?>)this, 0) : |
351 |
+ |
ForkJoinPool.common.tryExternalUnpush(this) ? |
352 |
+ |
doExec() : 0); |
353 |
+ |
} |
354 |
+ |
|
355 |
+ |
/** |
356 |
|
* Implementation for join, get, quietlyJoin. Directly handles |
357 |
|
* only cases of already-completed, external wait, and |
358 |
|
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. |
459 |
|
} finally { |
460 |
|
lock.unlock(); |
461 |
|
} |
462 |
< |
s = setCompletion(EXCEPTIONAL); |
462 |
> |
s = abnormalCompletion(DONE | ABNORMAL | THROWN); |
463 |
|
} |
464 |
|
return s; |
465 |
|
} |
471 |
|
*/ |
472 |
|
private int setExceptionalCompletion(Throwable ex) { |
473 |
|
int s = recordExceptionalCompletion(ex); |
474 |
< |
if ((s & DONE_MASK) == EXCEPTIONAL) |
474 |
> |
if ((s & THROWN) != 0) |
475 |
|
internalPropagateException(ex); |
476 |
|
return s; |
477 |
|
} |
646 |
|
* Throws exception, if any, associated with the given status. |
647 |
|
*/ |
648 |
|
private void reportException(int s) { |
649 |
< |
if (s == CANCELLED) |
650 |
< |
throw new CancellationException(); |
638 |
< |
if (s == EXCEPTIONAL) |
639 |
< |
rethrow(getThrowableException()); |
649 |
> |
rethrow((s & THROWN) != 0 ? getThrowableException() : |
650 |
> |
new CancellationException()); |
651 |
|
} |
652 |
|
|
653 |
|
// public methods |
689 |
|
*/ |
690 |
|
public final V join() { |
691 |
|
int s; |
692 |
< |
if ((s = doJoin() & DONE_MASK) != NORMAL) |
692 |
> |
if (((s = doJoin()) & ABNORMAL) != 0) |
693 |
|
reportException(s); |
694 |
|
return getRawResult(); |
695 |
|
} |
704 |
|
*/ |
705 |
|
public final V invoke() { |
706 |
|
int s; |
707 |
< |
if ((s = doInvoke() & DONE_MASK) != NORMAL) |
707 |
> |
if (((s = doInvoke()) & ABNORMAL) != 0) |
708 |
|
reportException(s); |
709 |
|
return getRawResult(); |
710 |
|
} |
729 |
|
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { |
730 |
|
int s1, s2; |
731 |
|
t2.fork(); |
732 |
< |
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) |
732 |
> |
if (((s1 = t1.doInvoke()) & ABNORMAL) != 0) |
733 |
|
t1.reportException(s1); |
734 |
< |
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) |
734 |
> |
if (((s2 = t2.doJoin()) & ABNORMAL) != 0) |
735 |
|
t2.reportException(s2); |
736 |
|
} |
737 |
|
|
761 |
|
} |
762 |
|
else if (i != 0) |
763 |
|
t.fork(); |
764 |
< |
else if (t.doInvoke() < NORMAL && ex == null) |
764 |
> |
else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) |
765 |
|
ex = t.getException(); |
766 |
|
} |
767 |
|
for (int i = 1; i <= last; ++i) { |
769 |
|
if (t != null) { |
770 |
|
if (ex != null) |
771 |
|
t.cancel(false); |
772 |
< |
else if (t.doJoin() < NORMAL) |
772 |
> |
else if ((t.doJoin() & ABNORMAL) != 0) |
773 |
|
ex = t.getException(); |
774 |
|
} |
775 |
|
} |
813 |
|
} |
814 |
|
else if (i != 0) |
815 |
|
t.fork(); |
816 |
< |
else if (t.doInvoke() < NORMAL && ex == null) |
816 |
> |
else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) |
817 |
|
ex = t.getException(); |
818 |
|
} |
819 |
|
for (int i = 1; i <= last; ++i) { |
821 |
|
if (t != null) { |
822 |
|
if (ex != null) |
823 |
|
t.cancel(false); |
824 |
< |
else if (t.doJoin() < NORMAL) |
824 |
> |
else if ((t.doJoin() & ABNORMAL) != 0) |
825 |
|
ex = t.getException(); |
826 |
|
} |
827 |
|
} |
858 |
|
* @return {@code true} if this task is now cancelled |
859 |
|
*/ |
860 |
|
public boolean cancel(boolean mayInterruptIfRunning) { |
861 |
< |
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; |
861 |
> |
int s = abnormalCompletion(DONE | ABNORMAL); |
862 |
> |
return (s & (ABNORMAL | THROWN)) == ABNORMAL; |
863 |
|
} |
864 |
|
|
865 |
|
public final boolean isDone() { |
867 |
|
} |
868 |
|
|
869 |
|
public final boolean isCancelled() { |
870 |
< |
return (status & DONE_MASK) == CANCELLED; |
870 |
> |
return (status & (ABNORMAL | THROWN)) == ABNORMAL; |
871 |
|
} |
872 |
|
|
873 |
|
/** |
876 |
|
* @return {@code true} if this task threw an exception or was cancelled |
877 |
|
*/ |
878 |
|
public final boolean isCompletedAbnormally() { |
879 |
< |
return status < NORMAL; |
879 |
> |
return (status & ABNORMAL) != 0; |
880 |
|
} |
881 |
|
|
882 |
|
/** |
887 |
|
* exception and was not cancelled |
888 |
|
*/ |
889 |
|
public final boolean isCompletedNormally() { |
890 |
< |
return (status & DONE_MASK) == NORMAL; |
890 |
> |
return (status & (DONE | ABNORMAL)) == DONE; |
891 |
|
} |
892 |
|
|
893 |
|
/** |
898 |
|
* @return the exception, or {@code null} if none |
899 |
|
*/ |
900 |
|
public final Throwable getException() { |
901 |
< |
int s = status & DONE_MASK; |
902 |
< |
return ((s >= NORMAL) ? null : |
903 |
< |
(s == CANCELLED) ? new CancellationException() : |
901 |
> |
int s = status; |
902 |
> |
return ((s & ABNORMAL) == 0 ? null : |
903 |
> |
(s & THROWN) == 0 ? new CancellationException() : |
904 |
|
getThrowableException()); |
905 |
|
} |
906 |
|
|
944 |
|
setExceptionalCompletion(rex); |
945 |
|
return; |
946 |
|
} |
947 |
< |
setCompletion(NORMAL); |
947 |
> |
setDone(); |
948 |
|
} |
949 |
|
|
950 |
|
/** |
956 |
|
* @since 1.8 |
957 |
|
*/ |
958 |
|
public final void quietlyComplete() { |
959 |
< |
setCompletion(NORMAL); |
959 |
> |
setDone(); |
960 |
|
} |
961 |
|
|
962 |
|
/** |
973 |
|
public final V get() throws InterruptedException, ExecutionException { |
974 |
|
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? |
975 |
|
doJoin() : externalInterruptibleAwaitDone(); |
976 |
< |
if ((s &= DONE_MASK) == CANCELLED) |
965 |
< |
throw new CancellationException(); |
966 |
< |
if (s == EXCEPTIONAL) |
976 |
> |
if ((s & THROWN) != 0) |
977 |
|
throw new ExecutionException(getThrowableException()); |
978 |
< |
return getRawResult(); |
978 |
> |
else if ((s & ABNORMAL) != 0) |
979 |
> |
throw new CancellationException(); |
980 |
> |
else |
981 |
> |
return getRawResult(); |
982 |
|
} |
983 |
|
|
984 |
|
/** |
1018 |
|
while ((s = status) >= 0 && |
1019 |
|
(ns = deadline - System.nanoTime()) > 0L) { |
1020 |
|
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && |
1021 |
< |
STATUS.compareAndSet(this, s, s | SIGNAL)) { |
1021 |
> |
(s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { |
1022 |
|
synchronized (this) { |
1023 |
|
if (status >= 0) |
1024 |
|
wait(ms); // OK to throw InterruptedException |
1030 |
|
} |
1031 |
|
} |
1032 |
|
if (s >= 0) |
1033 |
< |
s = status; |
1034 |
< |
if ((s &= DONE_MASK) != NORMAL) { |
1022 |
< |
if (s == CANCELLED) |
1023 |
< |
throw new CancellationException(); |
1024 |
< |
if (s != EXCEPTIONAL) |
1025 |
< |
throw new TimeoutException(); |
1033 |
> |
throw new TimeoutException(); |
1034 |
> |
else if ((s & THROWN) != 0) |
1035 |
|
throw new ExecutionException(getThrowableException()); |
1036 |
< |
} |
1037 |
< |
return getRawResult(); |
1036 |
> |
else if ((s & ABNORMAL) != 0) |
1037 |
> |
throw new CancellationException(); |
1038 |
> |
else |
1039 |
> |
return getRawResult(); |
1040 |
|
} |
1041 |
|
|
1042 |
|
/** |
1092 |
|
* setRawResult(null)}. |
1093 |
|
*/ |
1094 |
|
public void reinitialize() { |
1095 |
< |
if ((status & DONE_MASK) == EXCEPTIONAL) |
1095 |
> |
if ((status & THROWN) != 0) |
1096 |
|
clearExceptionalCompletion(); |
1097 |
|
else |
1098 |
|
status = 0; |
1309 |
|
*/ |
1310 |
|
public final short setForkJoinTaskTag(short newValue) { |
1311 |
|
for (int s;;) { |
1312 |
< |
if (STATUS.compareAndSet(this, s = status, |
1313 |
< |
(s & ~SMASK) | (newValue & SMASK))) |
1312 |
> |
if (STATUS.weakCompareAndSet(this, s = status, |
1313 |
> |
(s & ~SMASK) | (newValue & SMASK))) |
1314 |
|
return (short)s; |
1315 |
|
} |
1316 |
|
} |
1333 |
|
for (int s;;) { |
1334 |
|
if ((short)(s = status) != expect) |
1335 |
|
return false; |
1336 |
< |
if (STATUS.compareAndSet(this, s, |
1337 |
< |
(s & ~SMASK) | (update & SMASK))) |
1336 |
> |
if (STATUS.weakCompareAndSet(this, s, |
1337 |
> |
(s & ~SMASK) | (update & SMASK))) |
1338 |
|
return true; |
1339 |
|
} |
1340 |
|
} |