ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ScheduledExecutorTest.java
(Generate patch)

Comparing jsr166/src/test/tck/ScheduledExecutorTest.java (file contents):
Revision 1.88 by jsr166, Sun Mar 26 02:00:39 2017 UTC vs.
Revision 1.89 by jsr166, Tue Mar 28 18:13:10 2017 UTC

# Line 29 | Line 29 | import java.util.concurrent.ThreadPoolEx
29   import java.util.concurrent.atomic.AtomicBoolean;
30   import java.util.concurrent.atomic.AtomicInteger;
31   import java.util.concurrent.atomic.AtomicLong;
32 + import java.util.stream.Stream;
33  
34   import junit.framework.Test;
35   import junit.framework.TestSuite;
# Line 745 | Line 746 | public class ScheduledExecutorTest exten
746       * - setContinueExistingPeriodicTasksAfterShutdownPolicy
747       */
748      public void testShutdown_cancellation() throws Exception {
749 <        final int poolSize = 2;
749 >        final int poolSize = 6;
750          final ScheduledThreadPoolExecutor p
751              = new ScheduledThreadPoolExecutor(poolSize);
752 +        final BlockingQueue<Runnable> q = p.getQueue();
753          final ThreadLocalRandom rnd = ThreadLocalRandom.current();
754          final boolean effectiveDelayedPolicy;
755          final boolean effectivePeriodicPolicy;
# Line 777 | Line 779 | public class ScheduledExecutorTest exten
779          assertEquals(effectiveRemovePolicy,
780                       p.getRemoveOnCancelPolicy());
781  
782 <        // Strategy: Wedge the pool with poolSize "blocker" threads
782 >        // System.err.println("effectiveDelayedPolicy="+effectiveDelayedPolicy);
783 >        // System.err.println("effectivePeriodicPolicy="+effectivePeriodicPolicy);
784 >        // System.err.println("effectiveRemovePolicy="+effectiveRemovePolicy);
785 >
786 >        // Strategy: Wedge the pool with one wave of "blocker" tasks,
787 >        // then add a second wave that waits in the queue.
788          final AtomicInteger ran = new AtomicInteger(0);
789          final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
790          final CountDownLatch unblock = new CountDownLatch(1);
784        final CountDownLatch periodicLatch1 = new CountDownLatch(2);
785        final CountDownLatch periodicLatch2 = new CountDownLatch(2);
786        Runnable task = new CheckedRunnable() { public void realRun()
787                                                    throws InterruptedException {
788            poolBlocked.countDown();
789            await(unblock);
790            ran.getAndIncrement();
791        }};
792        List<Future<?>> blockers = new ArrayList<>();
793        List<Future<?>> periodics = new ArrayList<>();
794        List<Future<?>> delayeds = new ArrayList<>();
795        for (int i = 0; i < poolSize; i++)
796            blockers.add(p.submit(task));
797        await(poolBlocked);
791  
792 <        periodics.add(p.scheduleAtFixedRate(
793 <                          countDowner(periodicLatch1), 1, 1, MILLISECONDS));
794 <        periodics.add(p.scheduleWithFixedDelay(
795 <                          countDowner(periodicLatch2), 1, 1, MILLISECONDS));
792 >        class Task extends CheckedRunnable {
793 >            public void realRun() throws InterruptedException {
794 >                ran.getAndIncrement();
795 >                poolBlocked.countDown();
796 >                await(unblock);
797 >            }
798 >        }
799 >
800 >        class PeriodicTask extends Task {
801 >            PeriodicTask(int rounds) { this.rounds = rounds; }
802 >            int rounds;
803 >            public void realRun() throws InterruptedException {
804 >                if (--rounds == 0) super.realRun();
805 >            }
806 >        }
807 >
808 >        Runnable task = new Task();
809 >
810 >        List<Future<?>> immediates = new ArrayList<>();
811 >        List<Future<?>> delayeds   = new ArrayList<>();
812 >        List<Future<?>> periodics  = new ArrayList<>();
813 >
814 >        immediates.add(p.submit(task));
815          delayeds.add(p.schedule(task, 1, MILLISECONDS));
816 +        for (int rounds : new int[] { 1, 2 }) {
817 +            periodics.add(p.scheduleAtFixedRate(
818 +                              new PeriodicTask(rounds), 1, 1, MILLISECONDS));
819 +            periodics.add(p.scheduleWithFixedDelay(
820 +                              new PeriodicTask(rounds), 1, 1, MILLISECONDS));
821 +        }
822 +
823 +        await(poolBlocked);
824 +
825 +        assertEquals(poolSize, ran.get());
826 +        assertTrue(q.isEmpty());
827 +
828 +        // Add second wave of tasks.
829 +        immediates.add(p.submit(task));
830 +        long delay_ms = effectiveDelayedPolicy ? 1 : LONG_DELAY_MS;
831 +        delayeds.add(p.schedule(task, delay_ms, MILLISECONDS));
832 +        for (int rounds : new int[] { 1, 2 }) {
833 +            periodics.add(p.scheduleAtFixedRate(
834 +                              new PeriodicTask(rounds), 1, 1, MILLISECONDS));
835 +            periodics.add(p.scheduleWithFixedDelay(
836 +                              new PeriodicTask(rounds), 1, 1, MILLISECONDS));
837 +        }
838 +
839 +        assertEquals(poolSize, q.size());
840 +        assertEquals(poolSize, ran.get());
841 +
842 +        immediates.forEach(
843 +            f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));
844 +
845 +        Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
846 +            .forEach(f -> assertFalse(f.isDone()));
847  
805        assertTrue(p.getQueue().containsAll(periodics));
806        assertTrue(p.getQueue().containsAll(delayeds));
848          try { p.shutdown(); } catch (SecurityException ok) { return; }
849          assertTrue(p.isShutdown());
850 +        assertTrue(p.isTerminating());
851          assertFalse(p.isTerminated());
810        for (Future<?> periodic : periodics) {
811            assertTrue(effectivePeriodicPolicy ^ periodic.isCancelled());
812            assertTrue(effectivePeriodicPolicy ^ periodic.isDone());
813        }
814        for (Future<?> delayed : delayeds) {
815            assertTrue(effectiveDelayedPolicy ^ delayed.isCancelled());
816            assertTrue(effectiveDelayedPolicy ^ delayed.isDone());
817        }
818        if (testImplementationDetails) {
819            assertEquals(effectivePeriodicPolicy,
820                         p.getQueue().containsAll(periodics));
821            assertEquals(effectiveDelayedPolicy,
822                         p.getQueue().containsAll(delayeds));
823        }
824        unblock.countDown();    // Release all pool threads
852  
853 +        if (rnd.nextBoolean())
854 +            assertThrows(
855 +                RejectedExecutionException.class,
856 +                () -> p.submit(task),
857 +                () -> p.schedule(task, 1, SECONDS),
858 +                () -> p.scheduleAtFixedRate(
859 +                    new PeriodicTask(1), 1, 1, SECONDS),
860 +                () -> p.scheduleWithFixedDelay(
861 +                    new PeriodicTask(2), 1, 1, SECONDS));
862 +
863 +        assertTrue(q.contains(immediates.get(1)));
864 +        assertTrue(!effectiveDelayedPolicy
865 +                   ^ q.contains(delayeds.get(1)));
866 +        assertTrue(!effectivePeriodicPolicy
867 +                   ^ q.containsAll(periodics.subList(4, 8)));
868 +
869 +        immediates.forEach(f -> assertFalse(f.isDone()));
870 +
871 +        assertFalse(delayeds.get(0).isDone());
872          if (effectiveDelayedPolicy)
873 <            for (Future<?> delayed : delayeds) assertNull(delayed.get());
874 <        if (effectivePeriodicPolicy) {
875 <            await(periodicLatch1);
876 <            await(periodicLatch2);
877 <            for (Future<?> periodic : periodics) {
878 <                assertTrue(periodic.cancel(false));
879 <                assertTrue(periodic.isCancelled());
880 <                assertTrue(periodic.isDone());
873 >            assertFalse(delayeds.get(1).isDone());
874 >        else
875 >            assertTrue(delayeds.get(1).isCancelled());
876 >
877 >        if (testImplementationDetails) {
878 >            if (effectivePeriodicPolicy)
879 >                // TODO: ensure periodic tasks continue executing
880 >                periodics.forEach(
881 >                    f -> {
882 >                        assertFalse(f.isDone());
883 >                        assertTrue(f.cancel(false));
884 >                    });
885 >            else {
886 >                periodics.subList(0, 4).forEach(f -> assertFalse(f.isDone()));
887 >                periodics.subList(4, 8).forEach(f -> assertTrue(f.isCancelled()));
888              }
889          }
890 <        for (Future<?> blocker : blockers) assertNull(blocker.get());
890 >
891 >        unblock.countDown();    // Release all pool threads
892 >
893          assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
894 +        assertFalse(p.isTerminating());
895          assertTrue(p.isTerminated());
896  
897 <        for (Future<?> future : delayeds) {
898 <            assertTrue(effectiveDelayedPolicy ^ future.isCancelled());
899 <            assertTrue(future.isDone());
900 <        }
901 <        for (Future<?> future : periodics)
902 <            assertTrue(future.isCancelled());
903 <        for (Future<?> future : blockers)
904 <            assertNull(future.get());
905 <        assertEquals(2 + (effectiveDelayedPolicy ? 1 : 0), ran.get());
897 >        assertTrue(q.isEmpty());
898 >
899 >        for (Future<?> f : immediates) assertNull(f.get());
900 >
901 >        assertNull(delayeds.get(0).get());
902 >        if (effectiveDelayedPolicy)
903 >            assertNull(delayeds.get(1).get());
904 >        else
905 >            assertTrue(delayeds.get(1).isCancelled());
906 >
907 >        periodics.forEach(f -> assertTrue(f.isDone()));
908 >        periodics.forEach(f -> assertTrue(f.isCancelled()));
909 >
910 >        assertEquals(poolSize + 1 + (effectiveDelayedPolicy ? 1 : 0), ran.get());
911      }
912  
913      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines