8 |
|
import java.util.concurrent.atomic.*; |
9 |
|
|
10 |
|
/** |
11 |
< |
* Tries to demonstrate a leaked interrupt. |
11 |
> |
* Tries to demonstrate a leaked interrupt from FutureTask.cancel(true). |
12 |
|
*/ |
13 |
|
public class FutureTaskCancelLoops { |
14 |
< |
static final AtomicLong count = new AtomicLong(0); |
14 |
> |
static long millisElapsedSince(long startTimeNanos) { |
15 |
> |
return (System.nanoTime() - startTimeNanos)/(1000L*1000L); |
16 |
> |
} |
17 |
|
|
18 |
< |
static volatile Future<?> cancelMe = null; |
18 |
> |
public static void main(String[] args) throws Exception { |
19 |
> |
|
20 |
> |
long startTime = System.nanoTime(); |
21 |
|
|
22 |
< |
static volatile boolean leakedInterrupt = false; |
22 |
> |
final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(10000); |
23 |
|
|
24 |
< |
static class InterruptMeTask extends FutureTask<Void> { |
25 |
< |
static class InterruptMe implements Runnable { |
26 |
< |
volatile Future<?> myFuture; |
27 |
< |
|
28 |
< |
public void run() { |
29 |
< |
assert myFuture != null; |
30 |
< |
if (cancelMe != null) { |
31 |
< |
// We're likely to get the interrupt meant for previous task. |
32 |
< |
// Clear interrupts first to prove *we* got interrupted. |
33 |
< |
Thread.interrupted(); |
34 |
< |
while (cancelMe != null && !leakedInterrupt) { |
35 |
< |
if (Thread.interrupted()) { |
36 |
< |
leakedInterrupt = true; |
37 |
< |
System.err.println("leaked interrupt!"); |
24 |
> |
final ThreadPoolExecutor pool = |
25 |
> |
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q); |
26 |
> |
|
27 |
> |
final AtomicLong count = new AtomicLong(0); |
28 |
> |
|
29 |
> |
final AtomicReference<Future<?>> cancelMe |
30 |
> |
= new AtomicReference<Future<?>>(null); |
31 |
> |
|
32 |
> |
final AtomicBoolean leakedInterrupt = new AtomicBoolean(false); |
33 |
> |
final AtomicBoolean goHome = new AtomicBoolean(false); |
34 |
> |
|
35 |
> |
class InterruptMeTask extends FutureTask<Void> { |
36 |
> |
InterruptMeTask() { this(new AtomicReference<Future<?>>()); } |
37 |
> |
InterruptMeTask(final AtomicReference<Future<?>> myFuture) { |
38 |
> |
super(new Runnable() { |
39 |
> |
public void run() { |
40 |
> |
if (cancelMe.get() != null) { |
41 |
> |
// We're likely to get the interrupt meant for previous task. |
42 |
> |
// Clear interrupts first to prove *we* got interrupted. |
43 |
> |
Thread.interrupted(); |
44 |
> |
while (cancelMe.get() != null && !goHome.get()) { |
45 |
> |
if (Thread.interrupted()) { |
46 |
> |
leakedInterrupt.set(true); |
47 |
> |
goHome.set(true); |
48 |
> |
System.err.println("leaked interrupt!"); |
49 |
> |
} |
50 |
> |
} |
51 |
> |
} else { |
52 |
> |
cancelMe.set(myFuture.get()); |
53 |
> |
do {} while (! myFuture.get().isCancelled() && |
54 |
> |
!goHome.get()); |
55 |
|
} |
56 |
< |
} |
57 |
< |
} else { |
58 |
< |
cancelMe = myFuture; |
59 |
< |
do {} while (! myFuture.isCancelled() && !leakedInterrupt); |
60 |
< |
} |
61 |
< |
count.getAndIncrement(); |
56 |
> |
count.getAndIncrement(); |
57 |
> |
|
58 |
> |
if (q.isEmpty()) |
59 |
> |
for (int i = 0, n = q.remainingCapacity(); i < n; i++) |
60 |
> |
pool.execute(new InterruptMeTask()); |
61 |
> |
}}, null); |
62 |
> |
myFuture.set(this); |
63 |
|
} |
64 |
|
} |
43 |
– |
InterruptMeTask() { this(new InterruptMe()); } |
44 |
– |
InterruptMeTask(InterruptMe r) { |
45 |
– |
super(r, null); |
46 |
– |
r.myFuture = this; |
47 |
– |
} |
48 |
– |
} |
65 |
|
|
66 |
< |
static long millisElapsedSince(long startTimeNanos) { |
51 |
< |
return (System.nanoTime() - startTimeNanos)/(1000L*1000L); |
52 |
< |
} |
66 |
> |
pool.execute(new InterruptMeTask()); // "starter" task |
67 |
|
|
68 |
< |
public static void main(String[] args) throws Exception { |
69 |
< |
long startTime = System.nanoTime(); |
70 |
< |
final ThreadPoolExecutor pool = |
71 |
< |
new ThreadPoolExecutor(1, 1, |
72 |
< |
0L, TimeUnit.MILLISECONDS, |
59 |
< |
new LinkedBlockingQueue<Runnable>(10000)); |
60 |
< |
|
61 |
< |
final Thread cancelBot = new Thread(new Runnable() { |
62 |
< |
public void run() { |
63 |
< |
while (!leakedInterrupt) { |
64 |
< |
Future<?> future = cancelMe; |
65 |
< |
if (future != null) { |
66 |
< |
future.cancel(true); |
67 |
< |
cancelMe = null; |
68 |
< |
}}}}); |
69 |
< |
cancelBot.setDaemon(true); |
70 |
< |
cancelBot.start(); |
71 |
< |
|
72 |
< |
while (!leakedInterrupt && millisElapsedSince(startTime) < 1000L) { |
73 |
< |
try { |
74 |
< |
pool.execute(new InterruptMeTask()); |
75 |
< |
} catch (RejectedExecutionException ree) { |
76 |
< |
Thread.sleep(1); |
68 |
> |
while (!goHome.get() && millisElapsedSince(startTime) < 1000L) { |
69 |
> |
Future<?> future = cancelMe.get(); |
70 |
> |
if (future != null) { |
71 |
> |
future.cancel(true); |
72 |
> |
cancelMe.set(null); |
73 |
|
} |
74 |
|
} |
75 |
< |
pool.shutdownNow(); |
76 |
< |
if (leakedInterrupt) { |
75 |
> |
|
76 |
> |
goHome.set(true); |
77 |
> |
pool.shutdown(); |
78 |
> |
|
79 |
> |
if (leakedInterrupt.get()) { |
80 |
|
String msg = String.format |
81 |
|
("%d tasks run, %d millis elapsed, till leaked interrupt%n", |
82 |
|
count.get(), millisElapsedSince(startTime)); |