--- jsr166/src/test/tck/ScheduledExecutorTest.java 2017/03/26 02:00:39 1.88 +++ jsr166/src/test/tck/ScheduledExecutorTest.java 2017/03/28 18:13:10 1.89 @@ -29,6 +29,7 @@ import java.util.concurrent.ThreadPoolEx import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import junit.framework.Test; import junit.framework.TestSuite; @@ -745,9 +746,10 @@ public class ScheduledExecutorTest exten * - setContinueExistingPeriodicTasksAfterShutdownPolicy */ public void testShutdown_cancellation() throws Exception { - final int poolSize = 2; + final int poolSize = 6; final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(poolSize); + final BlockingQueue q = p.getQueue(); final ThreadLocalRandom rnd = ThreadLocalRandom.current(); final boolean effectiveDelayedPolicy; final boolean effectivePeriodicPolicy; @@ -777,76 +779,135 @@ public class ScheduledExecutorTest exten assertEquals(effectiveRemovePolicy, p.getRemoveOnCancelPolicy()); - // Strategy: Wedge the pool with poolSize "blocker" threads + // System.err.println("effectiveDelayedPolicy="+effectiveDelayedPolicy); + // System.err.println("effectivePeriodicPolicy="+effectivePeriodicPolicy); + // System.err.println("effectiveRemovePolicy="+effectiveRemovePolicy); + + // Strategy: Wedge the pool with one wave of "blocker" tasks, + // then add a second wave that waits in the queue. final AtomicInteger ran = new AtomicInteger(0); final CountDownLatch poolBlocked = new CountDownLatch(poolSize); final CountDownLatch unblock = new CountDownLatch(1); - final CountDownLatch periodicLatch1 = new CountDownLatch(2); - final CountDownLatch periodicLatch2 = new CountDownLatch(2); - Runnable task = new CheckedRunnable() { public void realRun() - throws InterruptedException { - poolBlocked.countDown(); - await(unblock); - ran.getAndIncrement(); - }}; - List> blockers = new ArrayList<>(); - List> periodics = new ArrayList<>(); - List> delayeds = new ArrayList<>(); - for (int i = 0; i < poolSize; i++) - blockers.add(p.submit(task)); - await(poolBlocked); - periodics.add(p.scheduleAtFixedRate( - countDowner(periodicLatch1), 1, 1, MILLISECONDS)); - periodics.add(p.scheduleWithFixedDelay( - countDowner(periodicLatch2), 1, 1, MILLISECONDS)); + class Task extends CheckedRunnable { + public void realRun() throws InterruptedException { + ran.getAndIncrement(); + poolBlocked.countDown(); + await(unblock); + } + } + + class PeriodicTask extends Task { + PeriodicTask(int rounds) { this.rounds = rounds; } + int rounds; + public void realRun() throws InterruptedException { + if (--rounds == 0) super.realRun(); + } + } + + Runnable task = new Task(); + + List> immediates = new ArrayList<>(); + List> delayeds = new ArrayList<>(); + List> periodics = new ArrayList<>(); + + immediates.add(p.submit(task)); delayeds.add(p.schedule(task, 1, MILLISECONDS)); + for (int rounds : new int[] { 1, 2 }) { + periodics.add(p.scheduleAtFixedRate( + new PeriodicTask(rounds), 1, 1, MILLISECONDS)); + periodics.add(p.scheduleWithFixedDelay( + new PeriodicTask(rounds), 1, 1, MILLISECONDS)); + } + + await(poolBlocked); + + assertEquals(poolSize, ran.get()); + assertTrue(q.isEmpty()); + + // Add second wave of tasks. + immediates.add(p.submit(task)); + long delay_ms = effectiveDelayedPolicy ? 1 : LONG_DELAY_MS; + delayeds.add(p.schedule(task, delay_ms, MILLISECONDS)); + for (int rounds : new int[] { 1, 2 }) { + periodics.add(p.scheduleAtFixedRate( + new PeriodicTask(rounds), 1, 1, MILLISECONDS)); + periodics.add(p.scheduleWithFixedDelay( + new PeriodicTask(rounds), 1, 1, MILLISECONDS)); + } + + assertEquals(poolSize, q.size()); + assertEquals(poolSize, ran.get()); + + immediates.forEach( + f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L)); + + Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream()) + .forEach(f -> assertFalse(f.isDone())); - assertTrue(p.getQueue().containsAll(periodics)); - assertTrue(p.getQueue().containsAll(delayeds)); try { p.shutdown(); } catch (SecurityException ok) { return; } assertTrue(p.isShutdown()); + assertTrue(p.isTerminating()); assertFalse(p.isTerminated()); - for (Future periodic : periodics) { - assertTrue(effectivePeriodicPolicy ^ periodic.isCancelled()); - assertTrue(effectivePeriodicPolicy ^ periodic.isDone()); - } - for (Future delayed : delayeds) { - assertTrue(effectiveDelayedPolicy ^ delayed.isCancelled()); - assertTrue(effectiveDelayedPolicy ^ delayed.isDone()); - } - if (testImplementationDetails) { - assertEquals(effectivePeriodicPolicy, - p.getQueue().containsAll(periodics)); - assertEquals(effectiveDelayedPolicy, - p.getQueue().containsAll(delayeds)); - } - unblock.countDown(); // Release all pool threads + if (rnd.nextBoolean()) + assertThrows( + RejectedExecutionException.class, + () -> p.submit(task), + () -> p.schedule(task, 1, SECONDS), + () -> p.scheduleAtFixedRate( + new PeriodicTask(1), 1, 1, SECONDS), + () -> p.scheduleWithFixedDelay( + new PeriodicTask(2), 1, 1, SECONDS)); + + assertTrue(q.contains(immediates.get(1))); + assertTrue(!effectiveDelayedPolicy + ^ q.contains(delayeds.get(1))); + assertTrue(!effectivePeriodicPolicy + ^ q.containsAll(periodics.subList(4, 8))); + + immediates.forEach(f -> assertFalse(f.isDone())); + + assertFalse(delayeds.get(0).isDone()); if (effectiveDelayedPolicy) - for (Future delayed : delayeds) assertNull(delayed.get()); - if (effectivePeriodicPolicy) { - await(periodicLatch1); - await(periodicLatch2); - for (Future periodic : periodics) { - assertTrue(periodic.cancel(false)); - assertTrue(periodic.isCancelled()); - assertTrue(periodic.isDone()); + assertFalse(delayeds.get(1).isDone()); + else + assertTrue(delayeds.get(1).isCancelled()); + + if (testImplementationDetails) { + if (effectivePeriodicPolicy) + // TODO: ensure periodic tasks continue executing + periodics.forEach( + f -> { + assertFalse(f.isDone()); + assertTrue(f.cancel(false)); + }); + else { + periodics.subList(0, 4).forEach(f -> assertFalse(f.isDone())); + periodics.subList(4, 8).forEach(f -> assertTrue(f.isCancelled())); } } - for (Future blocker : blockers) assertNull(blocker.get()); + + unblock.countDown(); // Release all pool threads + assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); + assertFalse(p.isTerminating()); assertTrue(p.isTerminated()); - for (Future future : delayeds) { - assertTrue(effectiveDelayedPolicy ^ future.isCancelled()); - assertTrue(future.isDone()); - } - for (Future future : periodics) - assertTrue(future.isCancelled()); - for (Future future : blockers) - assertNull(future.get()); - assertEquals(2 + (effectiveDelayedPolicy ? 1 : 0), ran.get()); + assertTrue(q.isEmpty()); + + for (Future f : immediates) assertNull(f.get()); + + assertNull(delayeds.get(0).get()); + if (effectiveDelayedPolicy) + assertNull(delayeds.get(1).get()); + else + assertTrue(delayeds.get(1).isCancelled()); + + periodics.forEach(f -> assertTrue(f.isDone())); + periodics.forEach(f -> assertTrue(f.isCancelled())); + + assertEquals(poolSize + 1 + (effectiveDelayedPolicy ? 1 : 0), ran.get()); } /**