6 |
|
|
7 |
|
package jsr166y; |
8 |
|
|
9 |
– |
import java.util.concurrent.*; |
10 |
– |
|
9 |
|
import java.io.Serializable; |
10 |
|
import java.util.Collection; |
11 |
|
import java.util.Collections; |
13 |
|
import java.util.RandomAccess; |
14 |
|
import java.util.Map; |
15 |
|
import java.util.WeakHashMap; |
16 |
+ |
import java.util.concurrent.Callable; |
17 |
+ |
import java.util.concurrent.CancellationException; |
18 |
+ |
import java.util.concurrent.ExecutionException; |
19 |
+ |
import java.util.concurrent.Executor; |
20 |
+ |
import java.util.concurrent.ExecutorService; |
21 |
+ |
import java.util.concurrent.Future; |
22 |
+ |
import java.util.concurrent.RejectedExecutionException; |
23 |
+ |
import java.util.concurrent.RunnableFuture; |
24 |
+ |
import java.util.concurrent.TimeUnit; |
25 |
+ |
import java.util.concurrent.TimeoutException; |
26 |
|
|
27 |
|
/** |
28 |
|
* Abstract base class for tasks that run within a {@link ForkJoinPool}. |
247 |
|
* |
248 |
|
* @return status on exit |
249 |
|
*/ |
250 |
< |
final int internalAwaitDone(long millis) { |
250 |
> |
final int internalAwaitDone(long millis, int nanos) { |
251 |
|
int s; |
252 |
|
if ((s = status) >= 0) { |
253 |
|
try { |
254 |
|
synchronized (this) { |
255 |
|
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) |
256 |
< |
wait(millis, 0); |
256 |
> |
wait(millis, nanos); |
257 |
|
} |
258 |
|
} catch (InterruptedException ie) { |
259 |
|
cancelIfTerminating(); |
270 |
|
int s; |
271 |
|
while ((s = status) >= 0) { |
272 |
|
synchronized (this) { |
273 |
< |
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){ |
273 |
> |
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { |
274 |
|
boolean interrupted = false; |
275 |
|
while (status >= 0) { |
276 |
|
try { |
651 |
|
} |
652 |
|
|
653 |
|
/** |
654 |
< |
* @throws CancellationException {@inheritDoc} |
654 |
> |
* Waits if necessary for the computation to complete, and then |
655 |
> |
* retrieves its result. |
656 |
> |
* |
657 |
> |
* @return the computed result |
658 |
> |
* @throws CancellationException if the computation was cancelled |
659 |
> |
* @throws ExecutionException if the computation threw an |
660 |
> |
* exception |
661 |
> |
* @throws InterruptedException if the current thread is not a |
662 |
> |
* member of a ForkJoinPool and was interrupted while waiting |
663 |
|
*/ |
664 |
|
public final V get() throws InterruptedException, ExecutionException { |
665 |
|
int s; |
670 |
|
else { |
671 |
|
while ((s = status) >= 0) { |
672 |
|
synchronized (this) { // interruptible form of awaitDone |
673 |
< |
if (UNSAFE.compareAndSwapInt(this, statusOffset, |
673 |
> |
if (UNSAFE.compareAndSwapInt(this, statusOffset, |
674 |
|
s, SIGNAL)) { |
675 |
|
while (status >= 0) |
676 |
|
wait(); |
689 |
|
} |
690 |
|
|
691 |
|
/** |
692 |
< |
* @throws CancellationException {@inheritDoc} |
692 |
> |
* Waits if necessary for at most the given time for the computation |
693 |
> |
* to complete, and then retrieves its result, if available. |
694 |
> |
* |
695 |
> |
* @param timeout the maximum time to wait |
696 |
> |
* @param unit the time unit of the timeout argument |
697 |
> |
* @return the computed result |
698 |
> |
* @throws CancellationException if the computation was cancelled |
699 |
> |
* @throws ExecutionException if the computation threw an |
700 |
> |
* exception |
701 |
> |
* @throws InterruptedException if the current thread is not a |
702 |
> |
* member of a ForkJoinPool and was interrupted while waiting |
703 |
> |
* @throws TimeoutException if the wait timed out |
704 |
|
*/ |
705 |
|
public final V get(long timeout, TimeUnit unit) |
706 |
|
throws InterruptedException, ExecutionException, TimeoutException { |
680 |
– |
Thread t = Thread.currentThread(); |
681 |
– |
ForkJoinPool pool; |
682 |
– |
if (t instanceof ForkJoinWorkerThread) { |
683 |
– |
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; |
684 |
– |
if (status >= 0 && w.unpushTask(this)) |
685 |
– |
quietlyExec(); |
686 |
– |
pool = w.pool; |
687 |
– |
} |
688 |
– |
else |
689 |
– |
pool = null; |
690 |
– |
/* |
691 |
– |
* Timed wait loop intermixes cases for FJ (pool != null) and |
692 |
– |
* non FJ threads. For FJ, decrement pool count but don't try |
693 |
– |
* for replacement; increment count on completion. For non-FJ, |
694 |
– |
* deal with interrupts. This is messy, but a little less so |
695 |
– |
* than is splitting the FJ and nonFJ cases. |
696 |
– |
*/ |
697 |
– |
boolean interrupted = false; |
698 |
– |
boolean dec = false; // true if pool count decremented |
707 |
|
long nanos = unit.toNanos(timeout); |
708 |
< |
for (;;) { |
709 |
< |
if (pool == null && Thread.interrupted()) { |
710 |
< |
interrupted = true; |
711 |
< |
break; |
708 |
> |
if (status >= 0) { |
709 |
> |
Thread t = Thread.currentThread(); |
710 |
> |
if (t instanceof ForkJoinWorkerThread) { |
711 |
> |
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; |
712 |
> |
boolean completed = false; // timed variant of quietlyJoin |
713 |
> |
if (w.unpushTask(this)) { |
714 |
> |
try { |
715 |
> |
completed = exec(); |
716 |
> |
} catch (Throwable rex) { |
717 |
> |
setExceptionalCompletion(rex); |
718 |
> |
} |
719 |
> |
} |
720 |
> |
if (completed) |
721 |
> |
setCompletion(NORMAL); |
722 |
> |
else if (status >= 0) |
723 |
> |
w.joinTask(this, true, nanos); |
724 |
|
} |
725 |
< |
int s = status; |
726 |
< |
if (s < 0) |
727 |
< |
break; |
708 |
< |
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { |
725 |
> |
else if (Thread.interrupted()) |
726 |
> |
throw new InterruptedException(); |
727 |
> |
else { |
728 |
|
long startTime = System.nanoTime(); |
729 |
< |
long nt; // wait time |
730 |
< |
while (status >= 0 && |
729 |
> |
int s; long nt; |
730 |
> |
while ((s = status) >= 0 && |
731 |
|
(nt = nanos - (System.nanoTime() - startTime)) > 0) { |
732 |
< |
if (pool != null && !dec) |
733 |
< |
dec = pool.tryDecrementRunningCount(); |
715 |
< |
else { |
732 |
> |
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, |
733 |
> |
SIGNAL)) { |
734 |
|
long ms = nt / 1000000; |
735 |
|
int ns = (int) (nt % 1000000); |
736 |
< |
try { |
737 |
< |
synchronized (this) { |
738 |
< |
if (status >= 0) |
721 |
< |
wait(ms, ns); |
722 |
< |
} |
723 |
< |
} catch (InterruptedException ie) { |
724 |
< |
if (pool != null) |
725 |
< |
cancelIfTerminating(); |
726 |
< |
else { |
727 |
< |
interrupted = true; |
728 |
< |
break; |
729 |
< |
} |
736 |
> |
synchronized (this) { |
737 |
> |
if (status >= 0) |
738 |
> |
wait(ms, ns); // exit on IE throw |
739 |
|
} |
740 |
|
} |
741 |
|
} |
733 |
– |
break; |
742 |
|
} |
743 |
|
} |
736 |
– |
if (pool != null && dec) |
737 |
– |
pool.incrementRunningCount(); |
738 |
– |
if (interrupted) |
739 |
– |
throw new InterruptedException(); |
744 |
|
int es = status; |
745 |
|
if (es != NORMAL) { |
746 |
|
Throwable ex; |
777 |
|
return; |
778 |
|
} |
779 |
|
} |
780 |
< |
w.joinTask(this); |
780 |
> |
w.joinTask(this, false, 0L); |
781 |
|
} |
782 |
|
} |
783 |
|
else |
832 |
|
* under any other usage conditions are not guaranteed. |
833 |
|
* This method may be useful when executing |
834 |
|
* pre-constructed trees of subtasks in loops. |
835 |
+ |
* |
836 |
+ |
* <p>Upon completion of this method, {@code isDone()} reports |
837 |
+ |
* {@code false}, and {@code getException()} reports {@code |
838 |
+ |
* null}. However, the value returned by {@code getRawResult} is |
839 |
+ |
* unaffected. To clear this value, you can invoke {@code |
840 |
+ |
* setRawResult(null)}. |
841 |
|
*/ |
842 |
|
public void reinitialize() { |
843 |
|
if (status == EXCEPTIONAL) |