--- jsr166/src/test/loops/FutureTaskCancelLoops.java 2011/06/16 09:53:45 1.1 +++ jsr166/src/test/loops/FutureTaskCancelLoops.java 2011/06/20 04:31:17 1.2 @@ -8,76 +8,75 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** - * Tries to demonstrate a leaked interrupt. + * Tries to demonstrate a leaked interrupt from FutureTask.cancel(true). */ public class FutureTaskCancelLoops { - static final AtomicLong count = new AtomicLong(0); + static long millisElapsedSince(long startTimeNanos) { + return (System.nanoTime() - startTimeNanos)/(1000L*1000L); + } - static volatile Future cancelMe = null; + public static void main(String[] args) throws Exception { + + long startTime = System.nanoTime(); - static volatile boolean leakedInterrupt = false; + final BlockingQueue q = new LinkedBlockingQueue(10000); - static class InterruptMeTask extends FutureTask { - static class InterruptMe implements Runnable { - volatile Future myFuture; - - public void run() { - assert myFuture != null; - if (cancelMe != null) { - // We're likely to get the interrupt meant for previous task. - // Clear interrupts first to prove *we* got interrupted. - Thread.interrupted(); - while (cancelMe != null && !leakedInterrupt) { - if (Thread.interrupted()) { - leakedInterrupt = true; - System.err.println("leaked interrupt!"); + final ThreadPoolExecutor pool = + new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q); + + final AtomicLong count = new AtomicLong(0); + + final AtomicReference> cancelMe + = new AtomicReference>(null); + + final AtomicBoolean leakedInterrupt = new AtomicBoolean(false); + final AtomicBoolean goHome = new AtomicBoolean(false); + + class InterruptMeTask extends FutureTask { + InterruptMeTask() { this(new AtomicReference>()); } + InterruptMeTask(final AtomicReference> myFuture) { + super(new Runnable() { + public void run() { + if (cancelMe.get() != null) { + // We're likely to get the interrupt meant for previous task. + // Clear interrupts first to prove *we* got interrupted. + Thread.interrupted(); + while (cancelMe.get() != null && !goHome.get()) { + if (Thread.interrupted()) { + leakedInterrupt.set(true); + goHome.set(true); + System.err.println("leaked interrupt!"); + } + } + } else { + cancelMe.set(myFuture.get()); + do {} while (! myFuture.get().isCancelled() && + !goHome.get()); } - } - } else { - cancelMe = myFuture; - do {} while (! myFuture.isCancelled() && !leakedInterrupt); - } - count.getAndIncrement(); + count.getAndIncrement(); + + if (q.isEmpty()) + for (int i = 0, n = q.remainingCapacity(); i < n; i++) + pool.execute(new InterruptMeTask()); + }}, null); + myFuture.set(this); } } - InterruptMeTask() { this(new InterruptMe()); } - InterruptMeTask(InterruptMe r) { - super(r, null); - r.myFuture = this; - } - } - static long millisElapsedSince(long startTimeNanos) { - return (System.nanoTime() - startTimeNanos)/(1000L*1000L); - } + pool.execute(new InterruptMeTask()); // "starter" task - public static void main(String[] args) throws Exception { - long startTime = System.nanoTime(); - final ThreadPoolExecutor pool = - new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(10000)); - - final Thread cancelBot = new Thread(new Runnable() { - public void run() { - while (!leakedInterrupt) { - Future future = cancelMe; - if (future != null) { - future.cancel(true); - cancelMe = null; - }}}}); - cancelBot.setDaemon(true); - cancelBot.start(); - - while (!leakedInterrupt && millisElapsedSince(startTime) < 1000L) { - try { - pool.execute(new InterruptMeTask()); - } catch (RejectedExecutionException ree) { - Thread.sleep(1); + while (!goHome.get() && millisElapsedSince(startTime) < 1000L) { + Future future = cancelMe.get(); + if (future != null) { + future.cancel(true); + cancelMe.set(null); } } - pool.shutdownNow(); - if (leakedInterrupt) { + + goHome.set(true); + pool.shutdown(); + + if (leakedInterrupt.get()) { String msg = String.format ("%d tasks run, %d millis elapsed, till leaked interrupt%n", count.get(), millisElapsedSince(startTime));