ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/FutureTaskCancelLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/FutureTaskCancelLoops.java (file contents):
Revision 1.1 by jsr166, Thu Jun 16 09:53:45 2011 UTC vs.
Revision 1.2 by jsr166, Mon Jun 20 04:31:17 2011 UTC

# Line 8 | Line 8 | import java.util.concurrent.*;
8   import java.util.concurrent.atomic.*;
9  
10   /**
11 < * Tries to demonstrate a leaked interrupt.
11 > * Tries to demonstrate a leaked interrupt from FutureTask.cancel(true).
12   */
13   public class FutureTaskCancelLoops {
14 <    static final AtomicLong count = new AtomicLong(0);
14 >    static long millisElapsedSince(long startTimeNanos) {
15 >        return (System.nanoTime() - startTimeNanos)/(1000L*1000L);
16 >    }
17  
18 <    static volatile Future<?> cancelMe = null;
18 >    public static void main(String[] args) throws Exception {
19 >
20 >        long startTime = System.nanoTime();
21  
22 <    static volatile boolean leakedInterrupt = false;
22 >        final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(10000);
23  
24 <    static class InterruptMeTask extends FutureTask<Void> {
25 <        static class InterruptMe implements Runnable {
26 <            volatile Future<?> myFuture;
27 <
28 <            public void run() {
29 <                assert myFuture != null;
30 <                if (cancelMe != null) {
31 <                    // We're likely to get the interrupt meant for previous task.
32 <                    // Clear interrupts first to prove *we* got interrupted.
33 <                    Thread.interrupted();
34 <                    while (cancelMe != null && !leakedInterrupt) {
35 <                        if (Thread.interrupted()) {
36 <                            leakedInterrupt = true;
37 <                            System.err.println("leaked interrupt!");
24 >        final ThreadPoolExecutor pool =
25 >            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q);
26 >
27 >        final AtomicLong count = new AtomicLong(0);
28 >
29 >        final AtomicReference<Future<?>> cancelMe
30 >            = new AtomicReference<Future<?>>(null);
31 >
32 >        final AtomicBoolean leakedInterrupt = new AtomicBoolean(false);
33 >        final AtomicBoolean goHome = new AtomicBoolean(false);
34 >
35 >        class InterruptMeTask extends FutureTask<Void> {
36 >            InterruptMeTask() { this(new AtomicReference<Future<?>>()); }
37 >            InterruptMeTask(final AtomicReference<Future<?>> myFuture) {
38 >                super(new Runnable() {
39 >                    public void run() {
40 >                        if (cancelMe.get() != null) {
41 >                            // We're likely to get the interrupt meant for previous task.
42 >                            // Clear interrupts first to prove *we* got interrupted.
43 >                            Thread.interrupted();
44 >                            while (cancelMe.get() != null && !goHome.get()) {
45 >                                if (Thread.interrupted()) {
46 >                                    leakedInterrupt.set(true);
47 >                                    goHome.set(true);
48 >                                    System.err.println("leaked interrupt!");
49 >                                }
50 >                            }
51 >                        } else {
52 >                            cancelMe.set(myFuture.get());
53 >                            do {} while (! myFuture.get().isCancelled() &&
54 >                                         !goHome.get());
55                          }
56 <                    }
57 <                } else {
58 <                    cancelMe = myFuture;
59 <                    do {} while (! myFuture.isCancelled() && !leakedInterrupt);
60 <                }
61 <                count.getAndIncrement();
56 >                        count.getAndIncrement();
57 >
58 >                        if (q.isEmpty())
59 >                            for (int i = 0, n = q.remainingCapacity(); i < n; i++)
60 >                                pool.execute(new InterruptMeTask());
61 >                    }}, null);
62 >                myFuture.set(this);
63              }
64          }
43        InterruptMeTask() { this(new InterruptMe()); }
44        InterruptMeTask(InterruptMe r) {
45            super(r, null);
46            r.myFuture = this;
47        }
48    }
65  
66 <    static long millisElapsedSince(long startTimeNanos) {
51 <        return (System.nanoTime() - startTimeNanos)/(1000L*1000L);
52 <    }
66 >        pool.execute(new InterruptMeTask()); // "starter" task
67  
68 <    public static void main(String[] args) throws Exception {
69 <        long startTime = System.nanoTime();
70 <        final ThreadPoolExecutor pool =
71 <            new ThreadPoolExecutor(1, 1,
72 <                                   0L, TimeUnit.MILLISECONDS,
59 <                                   new LinkedBlockingQueue<Runnable>(10000));
60 <
61 <        final Thread cancelBot = new Thread(new Runnable() {
62 <            public void run() {
63 <                while (!leakedInterrupt) {
64 <                    Future<?> future = cancelMe;
65 <                    if (future != null) {
66 <                        future.cancel(true);
67 <                        cancelMe = null;
68 <                    }}}});
69 <        cancelBot.setDaemon(true);
70 <        cancelBot.start();
71 <
72 <        while (!leakedInterrupt && millisElapsedSince(startTime) < 1000L) {
73 <            try {
74 <                pool.execute(new InterruptMeTask());
75 <            } catch (RejectedExecutionException ree) {
76 <                Thread.sleep(1);
68 >        while (!goHome.get() && millisElapsedSince(startTime) < 1000L) {
69 >            Future<?> future = cancelMe.get();
70 >            if (future != null) {
71 >                future.cancel(true);
72 >                cancelMe.set(null);
73              }
74          }
75 <        pool.shutdownNow();
76 <        if (leakedInterrupt) {
75 >
76 >        goHome.set(true);
77 >        pool.shutdown();
78 >
79 >        if (leakedInterrupt.get()) {
80              String msg = String.format
81                  ("%d tasks run, %d millis elapsed, till leaked interrupt%n",
82                   count.get(), millisElapsedSince(startTime));

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines