6 |
|
|
7 |
|
package jsr166y; |
8 |
|
|
9 |
– |
import java.util.concurrent.*; |
9 |
|
import java.io.Serializable; |
10 |
|
import java.util.Collection; |
11 |
|
import java.util.Collections; |
13 |
|
import java.util.RandomAccess; |
14 |
|
import java.util.Map; |
15 |
|
import java.util.WeakHashMap; |
16 |
+ |
import java.util.concurrent.Callable; |
17 |
+ |
import java.util.concurrent.CancellationException; |
18 |
+ |
import java.util.concurrent.ExecutionException; |
19 |
+ |
import java.util.concurrent.Executor; |
20 |
+ |
import java.util.concurrent.ExecutorService; |
21 |
+ |
import java.util.concurrent.Future; |
22 |
+ |
import java.util.concurrent.RejectedExecutionException; |
23 |
+ |
import java.util.concurrent.RunnableFuture; |
24 |
+ |
import java.util.concurrent.TimeUnit; |
25 |
+ |
import java.util.concurrent.TimeoutException; |
26 |
|
|
27 |
|
/** |
28 |
|
* Abstract base class for tasks that run within a {@link ForkJoinPool}. |
247 |
|
* |
248 |
|
* @return status on exit |
249 |
|
*/ |
250 |
< |
final int internalAwaitDone(long millis) { |
250 |
> |
final int internalAwaitDone(long millis, int nanos) { |
251 |
|
int s; |
252 |
|
if ((s = status) >= 0) { |
253 |
|
try { |
254 |
|
synchronized (this) { |
255 |
|
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) |
256 |
< |
wait(millis, 0); |
256 |
> |
wait(millis, nanos); |
257 |
|
} |
258 |
|
} catch (InterruptedException ie) { |
259 |
|
cancelIfTerminating(); |
270 |
|
int s; |
271 |
|
while ((s = status) >= 0) { |
272 |
|
synchronized (this) { |
273 |
< |
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){ |
273 |
> |
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { |
274 |
|
boolean interrupted = false; |
275 |
|
while (status >= 0) { |
276 |
|
try { |
704 |
|
*/ |
705 |
|
public final V get(long timeout, TimeUnit unit) |
706 |
|
throws InterruptedException, ExecutionException, TimeoutException { |
698 |
– |
Thread t = Thread.currentThread(); |
699 |
– |
ForkJoinPool pool; |
700 |
– |
if (t instanceof ForkJoinWorkerThread) { |
701 |
– |
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; |
702 |
– |
if (status >= 0 && w.unpushTask(this)) |
703 |
– |
quietlyExec(); |
704 |
– |
pool = w.pool; |
705 |
– |
} |
706 |
– |
else |
707 |
– |
pool = null; |
708 |
– |
/* |
709 |
– |
* Timed wait loop intermixes cases for FJ (pool != null) and |
710 |
– |
* non FJ threads. For FJ, decrement pool count but don't try |
711 |
– |
* for replacement; increment count on completion. For non-FJ, |
712 |
– |
* deal with interrupts. This is messy, but a little less so |
713 |
– |
* than is splitting the FJ and nonFJ cases. |
714 |
– |
*/ |
715 |
– |
boolean interrupted = false; |
716 |
– |
boolean dec = false; // true if pool count decremented |
707 |
|
long nanos = unit.toNanos(timeout); |
708 |
< |
for (;;) { |
709 |
< |
if (pool == null && Thread.interrupted()) { |
710 |
< |
interrupted = true; |
711 |
< |
break; |
708 |
> |
if (status >= 0) { |
709 |
> |
Thread t = Thread.currentThread(); |
710 |
> |
if (t instanceof ForkJoinWorkerThread) { |
711 |
> |
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; |
712 |
> |
boolean completed = false; // timed variant of quietlyJoin |
713 |
> |
if (w.unpushTask(this)) { |
714 |
> |
try { |
715 |
> |
completed = exec(); |
716 |
> |
} catch (Throwable rex) { |
717 |
> |
setExceptionalCompletion(rex); |
718 |
> |
} |
719 |
> |
} |
720 |
> |
if (completed) |
721 |
> |
setCompletion(NORMAL); |
722 |
> |
else if (status >= 0) |
723 |
> |
w.joinTask(this, true, nanos); |
724 |
|
} |
725 |
< |
int s = status; |
726 |
< |
if (s < 0) |
727 |
< |
break; |
726 |
< |
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { |
725 |
> |
else if (Thread.interrupted()) |
726 |
> |
throw new InterruptedException(); |
727 |
> |
else { |
728 |
|
long startTime = System.nanoTime(); |
729 |
< |
long nt; // wait time |
730 |
< |
while (status >= 0 && |
729 |
> |
int s; long nt; |
730 |
> |
while ((s = status) >= 0 && |
731 |
|
(nt = nanos - (System.nanoTime() - startTime)) > 0) { |
732 |
< |
if (pool != null && !dec) |
733 |
< |
dec = pool.tryDecrementRunningCount(); |
733 |
< |
else { |
732 |
> |
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, |
733 |
> |
SIGNAL)) { |
734 |
|
long ms = nt / 1000000; |
735 |
|
int ns = (int) (nt % 1000000); |
736 |
< |
try { |
737 |
< |
synchronized (this) { |
738 |
< |
if (status >= 0) |
739 |
< |
wait(ms, ns); |
740 |
< |
} |
741 |
< |
} catch (InterruptedException ie) { |
742 |
< |
if (pool != null) |
743 |
< |
cancelIfTerminating(); |
744 |
< |
else { |
745 |
< |
interrupted = true; |
746 |
< |
break; |
747 |
< |
} |
736 |
> |
synchronized (this) { |
737 |
> |
if (status >= 0) |
738 |
> |
wait(ms, ns); // exit on IE throw |
739 |
|
} |
740 |
|
} |
741 |
|
} |
751 |
– |
break; |
742 |
|
} |
743 |
|
} |
754 |
– |
if (pool != null && dec) |
755 |
– |
pool.incrementRunningCount(); |
756 |
– |
if (interrupted) |
757 |
– |
throw new InterruptedException(); |
744 |
|
int es = status; |
745 |
|
if (es != NORMAL) { |
746 |
|
Throwable ex; |
777 |
|
return; |
778 |
|
} |
779 |
|
} |
780 |
< |
w.joinTask(this); |
780 |
> |
w.joinTask(this, false, 0L); |
781 |
|
} |
782 |
|
} |
783 |
|
else |