268 |
|
} |
269 |
|
|
270 |
|
/** |
271 |
< |
* Tries to set SIGNAL status unless already completed. Used by |
272 |
< |
* ForkJoinPool. Other variants are directly incorporated into |
273 |
< |
* externalAwaitDone etc. |
274 |
< |
* |
275 |
< |
* @return true if successful |
276 |
< |
*/ |
277 |
< |
final boolean trySetSignal() { |
278 |
< |
int s = status; |
279 |
< |
return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); |
271 |
> |
* If not done, sets SIGNAL status and performs Object.wait(timeout). |
272 |
> |
* This task may or may not be done on exit. Ignores interrupts. |
273 |
> |
* |
274 |
> |
* @param timeout using Object.wait conventions. |
275 |
> |
*/ |
276 |
> |
final void internalWait(long timeout) { |
277 |
> |
int s; |
278 |
> |
if ((s = status) >= 0 && // force completer to issue notify |
279 |
> |
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
280 |
> |
synchronized (this) { |
281 |
> |
if (status >= 0) |
282 |
> |
try { wait(timeout); } catch (InterruptedException ie) { } |
283 |
> |
else |
284 |
> |
notifyAll(); |
285 |
> |
} |
286 |
> |
} |
287 |
|
} |
288 |
|
|
289 |
|
/** |
291 |
|
* @return status upon completion |
292 |
|
*/ |
293 |
|
private int externalAwaitDone() { |
294 |
< |
int s; |
295 |
< |
ForkJoinPool cp = ForkJoinPool.common; |
296 |
< |
if ((s = status) >= 0) { |
297 |
< |
if (cp != null) { |
298 |
< |
if (this instanceof CountedCompleter) |
299 |
< |
s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
300 |
< |
else if (cp.tryExternalUnpush(this)) |
301 |
< |
s = doExec(); |
302 |
< |
} |
303 |
< |
if (s >= 0 && (s = status) >= 0) { |
304 |
< |
boolean interrupted = false; |
305 |
< |
do { |
306 |
< |
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
307 |
< |
synchronized (this) { |
301 |
< |
if (status >= 0) { |
302 |
< |
try { |
303 |
< |
wait(); |
304 |
< |
} catch (InterruptedException ie) { |
305 |
< |
interrupted = true; |
306 |
< |
} |
294 |
> |
int s = ((this instanceof CountedCompleter) ? // try helping |
295 |
> |
ForkJoinPool.common.externalHelpComplete( |
296 |
> |
(CountedCompleter<?>)this, 0) : |
297 |
> |
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); |
298 |
> |
if (s >= 0 && (s = status) >= 0) { |
299 |
> |
boolean interrupted = false; |
300 |
> |
do { |
301 |
> |
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
302 |
> |
synchronized (this) { |
303 |
> |
if (status >= 0) { |
304 |
> |
try { |
305 |
> |
wait(0L); |
306 |
> |
} catch (InterruptedException ie) { |
307 |
> |
interrupted = true; |
308 |
|
} |
308 |
– |
else |
309 |
– |
notifyAll(); |
309 |
|
} |
310 |
+ |
else |
311 |
+ |
notifyAll(); |
312 |
|
} |
313 |
< |
} while ((s = status) >= 0); |
314 |
< |
if (interrupted) |
315 |
< |
Thread.currentThread().interrupt(); |
316 |
< |
} |
313 |
> |
} |
314 |
> |
} while ((s = status) >= 0); |
315 |
> |
if (interrupted) |
316 |
> |
Thread.currentThread().interrupt(); |
317 |
|
} |
318 |
|
return s; |
319 |
|
} |
323 |
|
*/ |
324 |
|
private int externalInterruptibleAwaitDone() throws InterruptedException { |
325 |
|
int s; |
325 |
– |
ForkJoinPool cp = ForkJoinPool.common; |
326 |
|
if (Thread.interrupted()) |
327 |
|
throw new InterruptedException(); |
328 |
< |
if ((s = status) >= 0 && cp != null) { |
329 |
< |
if (this instanceof CountedCompleter) |
330 |
< |
cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
331 |
< |
else if (cp.tryExternalUnpush(this)) |
332 |
< |
doExec(); |
333 |
< |
} |
334 |
< |
while ((s = status) >= 0) { |
335 |
< |
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
336 |
< |
synchronized (this) { |
337 |
< |
if (status >= 0) |
338 |
< |
wait(); |
339 |
< |
else |
340 |
< |
notifyAll(); |
328 |
> |
if ((s = status) >= 0 && |
329 |
> |
(s = ((this instanceof CountedCompleter) ? |
330 |
> |
ForkJoinPool.common.externalHelpComplete( |
331 |
> |
(CountedCompleter<?>)this, 0) : |
332 |
> |
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : |
333 |
> |
0)) >= 0) { |
334 |
> |
while ((s = status) >= 0) { |
335 |
> |
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
336 |
> |
synchronized (this) { |
337 |
> |
if (status >= 0) |
338 |
> |
wait(0L); |
339 |
> |
else |
340 |
> |
notifyAll(); |
341 |
> |
} |
342 |
|
} |
343 |
|
} |
344 |
|
} |
358 |
|
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? |
359 |
|
(w = (wt = (ForkJoinWorkerThread)t).workQueue). |
360 |
|
tryUnpush(this) && (s = doExec()) < 0 ? s : |
361 |
< |
wt.pool.awaitJoin(w, this) : |
361 |
> |
wt.pool.awaitJoin(w, this, 0L) : |
362 |
|
externalAwaitDone(); |
363 |
|
} |
364 |
|
|
371 |
|
int s; Thread t; ForkJoinWorkerThread wt; |
372 |
|
return (s = doExec()) < 0 ? s : |
373 |
|
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? |
374 |
< |
(wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) : |
374 |
> |
(wt = (ForkJoinWorkerThread)t).pool. |
375 |
> |
awaitJoin(wt.workQueue, this, 0L) : |
376 |
|
externalAwaitDone(); |
377 |
|
} |
378 |
|
|
990 |
|
*/ |
991 |
|
public final V get(long timeout, TimeUnit unit) |
992 |
|
throws InterruptedException, ExecutionException, TimeoutException { |
993 |
+ |
int s; |
994 |
+ |
long nanos = unit.toNanos(timeout); |
995 |
|
if (Thread.interrupted()) |
996 |
|
throw new InterruptedException(); |
997 |
< |
// Messy in part because we measure in nanosecs, but wait in millisecs |
998 |
< |
int s; long ms; |
999 |
< |
long ns = unit.toNanos(timeout); |
996 |
< |
ForkJoinPool cp; |
997 |
< |
if ((s = status) >= 0 && ns > 0L) { |
998 |
< |
long deadline = System.nanoTime() + ns; |
999 |
< |
ForkJoinPool p = null; |
1000 |
< |
ForkJoinPool.WorkQueue w = null; |
997 |
> |
if ((s = status) >= 0 && nanos > 0L) { |
998 |
> |
long d = System.nanoTime() + nanos; |
999 |
> |
long deadline = (d == 0L)? 1L : d; // avoid 0 |
1000 |
|
Thread t = Thread.currentThread(); |
1001 |
|
if (t instanceof ForkJoinWorkerThread) { |
1002 |
|
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; |
1003 |
< |
p = wt.pool; |
1005 |
< |
w = wt.workQueue; |
1006 |
< |
p.helpJoinOnce(w, this); // no retries on failure |
1007 |
< |
} |
1008 |
< |
else if ((cp = ForkJoinPool.common) != null) { |
1009 |
< |
if (this instanceof CountedCompleter) |
1010 |
< |
cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
1011 |
< |
else if (cp.tryExternalUnpush(this)) |
1012 |
< |
doExec(); |
1003 |
> |
s = wt.pool.awaitJoin(wt.workQueue, this, deadline); |
1004 |
|
} |
1005 |
< |
boolean canBlock = false; |
1006 |
< |
boolean interrupted = false; |
1007 |
< |
try { |
1008 |
< |
while ((s = status) >= 0) { |
1009 |
< |
if (w != null && w.qlock < 0) |
1010 |
< |
cancelIgnoringExceptions(this); |
1011 |
< |
else if (!canBlock) { |
1012 |
< |
if (p == null || p.tryCompensate(p.ctl)) |
1013 |
< |
canBlock = true; |
1014 |
< |
} |
1015 |
< |
else { |
1016 |
< |
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && |
1017 |
< |
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
1018 |
< |
synchronized (this) { |
1019 |
< |
if (status >= 0) { |
1029 |
< |
try { |
1030 |
< |
wait(ms); |
1031 |
< |
} catch (InterruptedException ie) { |
1032 |
< |
if (p == null) |
1033 |
< |
interrupted = true; |
1034 |
< |
} |
1035 |
< |
} |
1036 |
< |
else |
1037 |
< |
notifyAll(); |
1038 |
< |
} |
1005 |
> |
else if ((s = ((this instanceof CountedCompleter) ? |
1006 |
> |
ForkJoinPool.common.externalHelpComplete( |
1007 |
> |
(CountedCompleter<?>)this, 0) : |
1008 |
> |
ForkJoinPool.common.tryExternalUnpush(this) ? |
1009 |
> |
doExec() : 0)) >= 0) { |
1010 |
> |
long ns, ms; // measure in nanosecs, but wait in millisecs |
1011 |
> |
while ((s = status) >= 0 && |
1012 |
> |
(ns = deadline - System.nanoTime()) > 0L) { |
1013 |
> |
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && |
1014 |
> |
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
1015 |
> |
synchronized (this) { |
1016 |
> |
if (status >= 0) |
1017 |
> |
wait(ms); // OK to throw InterruptedException |
1018 |
> |
else |
1019 |
> |
notifyAll(); |
1020 |
|
} |
1040 |
– |
if ((s = status) < 0 || interrupted || |
1041 |
– |
(ns = deadline - System.nanoTime()) <= 0L) |
1042 |
– |
break; |
1021 |
|
} |
1022 |
|
} |
1045 |
– |
} finally { |
1046 |
– |
if (p != null && canBlock) |
1047 |
– |
p.incrementActiveCount(); |
1023 |
|
} |
1049 |
– |
if (interrupted) |
1050 |
– |
throw new InterruptedException(); |
1024 |
|
} |
1025 |
+ |
if (s >= 0) |
1026 |
+ |
s = status; |
1027 |
|
if ((s &= DONE_MASK) != NORMAL) { |
1028 |
|
Throwable ex; |
1029 |
|
if (s == CANCELLED) |