1 |
/* |
2 |
* Written by Doug Lea and Martin Buchholz with assistance from members of |
3 |
* JCP JSR-166 Expert Group and released to the public domain, as explained |
4 |
* at http://creativecommons.org/publicdomain/zero/1.0/ |
5 |
*/ |
6 |
|
7 |
import java.util.concurrent.*; |
8 |
import java.util.concurrent.atomic.*; |
9 |
|
10 |
/** |
11 |
* Tries to demonstrate a leaked interrupt from FutureTask.cancel(true). |
12 |
*/ |
13 |
public class FutureTaskCancelLoops { |
14 |
static long millisElapsedSince(long startTimeNanos) { |
15 |
return (System.nanoTime() - startTimeNanos)/(1000L*1000L); |
16 |
} |
17 |
|
18 |
public static void main(String[] args) throws Exception { |
19 |
|
20 |
long startTime = System.nanoTime(); |
21 |
|
22 |
final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(10000); |
23 |
|
24 |
final ThreadPoolExecutor pool = |
25 |
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q); |
26 |
|
27 |
final AtomicLong count = new AtomicLong(0); |
28 |
|
29 |
final AtomicReference<Future<?>> cancelMe |
30 |
= new AtomicReference<Future<?>>(null); |
31 |
|
32 |
final AtomicBoolean leakedInterrupt = new AtomicBoolean(false); |
33 |
final AtomicBoolean goHome = new AtomicBoolean(false); |
34 |
|
35 |
class InterruptMeTask extends FutureTask<Void> { |
36 |
InterruptMeTask() { this(new AtomicReference<Future<?>>()); } |
37 |
InterruptMeTask(final AtomicReference<Future<?>> myFuture) { |
38 |
super(new Runnable() { |
39 |
public void run() { |
40 |
if (cancelMe.get() != null) { |
41 |
// We're likely to get the interrupt meant for previous task. |
42 |
// Clear interrupts first to prove *we* got interrupted. |
43 |
Thread.interrupted(); |
44 |
while (cancelMe.get() != null && !goHome.get()) { |
45 |
if (Thread.interrupted()) { |
46 |
leakedInterrupt.set(true); |
47 |
goHome.set(true); |
48 |
System.err.println("leaked interrupt!"); |
49 |
} |
50 |
} |
51 |
} else { |
52 |
cancelMe.set(myFuture.get()); |
53 |
do {} while (! myFuture.get().isCancelled() && |
54 |
!goHome.get()); |
55 |
} |
56 |
count.getAndIncrement(); |
57 |
|
58 |
if (q.isEmpty()) |
59 |
for (int i = 0, n = q.remainingCapacity(); i < n; i++) |
60 |
pool.execute(new InterruptMeTask()); |
61 |
}}, null); |
62 |
myFuture.set(this); |
63 |
} |
64 |
} |
65 |
|
66 |
pool.execute(new InterruptMeTask()); // "starter" task |
67 |
|
68 |
while (!goHome.get() && millisElapsedSince(startTime) < 1000L) { |
69 |
Future<?> future = cancelMe.get(); |
70 |
if (future != null) { |
71 |
future.cancel(true); |
72 |
cancelMe.set(null); |
73 |
} |
74 |
} |
75 |
|
76 |
goHome.set(true); |
77 |
pool.shutdown(); |
78 |
|
79 |
if (leakedInterrupt.get()) { |
80 |
String msg = String.format |
81 |
("%d tasks run, %d millis elapsed, till leaked interrupt%n", |
82 |
count.get(), millisElapsedSince(startTime)); |
83 |
throw new IllegalStateException(msg); |
84 |
} else { |
85 |
System.out.printf |
86 |
("%d tasks run, %d millis elapsed%n", |
87 |
count.get(), millisElapsedSince(startTime)); |
88 |
} |
89 |
} |
90 |
} |