--- jsr166/src/test/tck/ScheduledExecutorSubclassTest.java 2017/03/25 21:41:10 1.65 +++ jsr166/src/test/tck/ScheduledExecutorSubclassTest.java 2017/04/01 16:28:31 1.66 @@ -25,12 +25,14 @@ import java.util.concurrent.RunnableSche import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import junit.framework.Test; import junit.framework.TestSuite; @@ -786,90 +788,185 @@ public class ScheduledExecutorSubclassTe * - setContinueExistingPeriodicTasksAfterShutdownPolicy */ public void testShutdown_cancellation() throws Exception { - Boolean[] allBooleans = { null, Boolean.FALSE, Boolean.TRUE }; - for (Boolean policy : allBooleans) - { - final int poolSize = 2; + final int poolSize = 4; final CustomExecutor p = new CustomExecutor(poolSize); - final boolean effectiveDelayedPolicy = (policy != Boolean.FALSE); - final boolean effectivePeriodicPolicy = (policy == Boolean.TRUE); - final boolean effectiveRemovePolicy = (policy == Boolean.TRUE); - if (policy != null) { - p.setExecuteExistingDelayedTasksAfterShutdownPolicy(policy); - p.setContinueExistingPeriodicTasksAfterShutdownPolicy(policy); - p.setRemoveOnCancelPolicy(policy); - } + final BlockingQueue q = p.getQueue(); + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final long delay = rnd.nextInt(2); + final int rounds = rnd.nextInt(1, 3); + final boolean effectiveDelayedPolicy; + final boolean effectivePeriodicPolicy; + final boolean effectiveRemovePolicy; + + if (rnd.nextBoolean()) + p.setExecuteExistingDelayedTasksAfterShutdownPolicy( + effectiveDelayedPolicy = rnd.nextBoolean()); + else + effectiveDelayedPolicy = true; assertEquals(effectiveDelayedPolicy, p.getExecuteExistingDelayedTasksAfterShutdownPolicy()); + + if (rnd.nextBoolean()) + p.setContinueExistingPeriodicTasksAfterShutdownPolicy( + effectivePeriodicPolicy = rnd.nextBoolean()); + else + effectivePeriodicPolicy = false; assertEquals(effectivePeriodicPolicy, p.getContinueExistingPeriodicTasksAfterShutdownPolicy()); + + if (rnd.nextBoolean()) + p.setRemoveOnCancelPolicy( + effectiveRemovePolicy = rnd.nextBoolean()); + else + effectiveRemovePolicy = false; assertEquals(effectiveRemovePolicy, p.getRemoveOnCancelPolicy()); - // Strategy: Wedge the pool with poolSize "blocker" threads + + final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean(); + + // Strategy: Wedge the pool with one wave of "blocker" tasks, + // then add a second wave that waits in the queue until unblocked. final AtomicInteger ran = new AtomicInteger(0); final CountDownLatch poolBlocked = new CountDownLatch(poolSize); final CountDownLatch unblock = new CountDownLatch(1); - final CountDownLatch periodicLatch1 = new CountDownLatch(2); - final CountDownLatch periodicLatch2 = new CountDownLatch(2); - Runnable task = new CheckedRunnable() { public void realRun() - throws InterruptedException { - poolBlocked.countDown(); - await(unblock); - ran.getAndIncrement(); - }}; - List> blockers = new ArrayList<>(); - List> periodics = new ArrayList<>(); - List> delayeds = new ArrayList<>(); - for (int i = 0; i < poolSize; i++) - blockers.add(p.submit(task)); + final RuntimeException exception = new RuntimeException(); + + class Task implements Runnable { + public void run() { + try { + ran.getAndIncrement(); + poolBlocked.countDown(); + await(unblock); + } catch (Throwable fail) { threadUnexpectedException(fail); } + } + } + + class PeriodicTask extends Task { + PeriodicTask(int rounds) { this.rounds = rounds; } + int rounds; + public void run() { + if (--rounds == 0) super.run(); + // throw exception to surely terminate this periodic task, + // but in a separate execution and in a detectable way. + if (rounds == -1) throw exception; + } + } + + Runnable task = new Task(); + + List> immediates = new ArrayList<>(); + List> delayeds = new ArrayList<>(); + List> periodics = new ArrayList<>(); + + immediates.add(p.submit(task)); + delayeds.add(p.schedule(task, delay, MILLISECONDS)); + periodics.add(p.scheduleAtFixedRate( + new PeriodicTask(rounds), delay, 1, MILLISECONDS)); + periodics.add(p.scheduleWithFixedDelay( + new PeriodicTask(rounds), delay, 1, MILLISECONDS)); + await(poolBlocked); - periodics.add(p.scheduleAtFixedRate(countDowner(periodicLatch1), - 1, 1, MILLISECONDS)); - periodics.add(p.scheduleWithFixedDelay(countDowner(periodicLatch2), - 1, 1, MILLISECONDS)); - delayeds.add(p.schedule(task, 1, MILLISECONDS)); + assertEquals(poolSize, ran.get()); + assertEquals(poolSize, p.getActiveCount()); + assertTrue(q.isEmpty()); + + // Add second wave of tasks. + immediates.add(p.submit(task)); + delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS)); + periodics.add(p.scheduleAtFixedRate( + new PeriodicTask(rounds), delay, 1, MILLISECONDS)); + periodics.add(p.scheduleWithFixedDelay( + new PeriodicTask(rounds), delay, 1, MILLISECONDS)); + + assertEquals(poolSize, q.size()); + assertEquals(poolSize, ran.get()); + + immediates.forEach( + f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L)); + + Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream()) + .forEach(f -> assertFalse(f.isDone())); - assertTrue(p.getQueue().containsAll(periodics)); - assertTrue(p.getQueue().containsAll(delayeds)); try { p.shutdown(); } catch (SecurityException ok) { return; } assertTrue(p.isShutdown()); + assertTrue(p.isTerminating()); assertFalse(p.isTerminated()); - for (Future periodic : periodics) { - assertTrue(effectivePeriodicPolicy ^ periodic.isCancelled()); - assertTrue(effectivePeriodicPolicy ^ periodic.isDone()); - } - for (Future delayed : delayeds) { - assertTrue(effectiveDelayedPolicy ^ delayed.isCancelled()); - assertTrue(effectiveDelayedPolicy ^ delayed.isDone()); - } - if (testImplementationDetails) { - assertEquals(effectivePeriodicPolicy, - p.getQueue().containsAll(periodics)); - assertEquals(effectiveDelayedPolicy, - p.getQueue().containsAll(delayeds)); - } - // Release all pool threads - unblock.countDown(); - - for (Future delayed : delayeds) { - if (effectiveDelayedPolicy) { - assertNull(delayed.get()); - } - } - if (effectivePeriodicPolicy) { - await(periodicLatch1); - await(periodicLatch2); - for (Future periodic : periodics) { - assertTrue(periodic.cancel(false)); - assertTrue(periodic.isCancelled()); - assertTrue(periodic.isDone()); - } + + if (rnd.nextBoolean()) + assertThrows( + RejectedExecutionException.class, + () -> p.submit(task), + () -> p.schedule(task, 1, SECONDS), + () -> p.scheduleAtFixedRate( + new PeriodicTask(1), 1, 1, SECONDS), + () -> p.scheduleWithFixedDelay( + new PeriodicTask(2), 1, 1, SECONDS)); + + assertTrue(q.contains(immediates.get(1))); + assertTrue(!effectiveDelayedPolicy + ^ q.contains(delayeds.get(1))); + assertTrue(!effectivePeriodicPolicy + ^ q.containsAll(periodics.subList(2, 4))); + + immediates.forEach(f -> assertFalse(f.isDone())); + + assertFalse(delayeds.get(0).isDone()); + if (effectiveDelayedPolicy) + assertFalse(delayeds.get(1).isDone()); + else + assertTrue(delayeds.get(1).isCancelled()); + + if (effectivePeriodicPolicy) + periodics.forEach( + f -> { + assertFalse(f.isDone()); + if (!periodicTasksContinue) { + assertTrue(f.cancel(false)); + assertTrue(f.isCancelled()); + } + }); + else { + periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone())); + periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled())); } + + unblock.countDown(); // Release all pool threads + assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); + assertFalse(p.isTerminating()); assertTrue(p.isTerminated()); - assertEquals(2 + (effectiveDelayedPolicy ? 1 : 0), ran.get()); - }} + + assertTrue(q.isEmpty()); + + Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream()) + .forEach(f -> assertTrue(f.isDone())); + + for (Future f : immediates) assertNull(f.get()); + + assertNull(delayeds.get(0).get()); + if (effectiveDelayedPolicy) + assertNull(delayeds.get(1).get()); + else + assertTrue(delayeds.get(1).isCancelled()); + + if (periodicTasksContinue) + periodics.forEach( + f -> { + try { f.get(); } + catch (ExecutionException success) { + assertSame(exception, success.getCause()); + } + catch (Throwable fail) { threadUnexpectedException(fail); } + }); + else + periodics.forEach(f -> assertTrue(f.isCancelled())); + + assertEquals(poolSize + 1 + + (effectiveDelayedPolicy ? 1 : 0) + + (periodicTasksContinue ? 2 : 0), + ran.get()); + } /** * completed submit of callable returns result