ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SubmissionPublisherTest.java (file contents):
Revision 1.6 by jsr166, Mon Sep 7 20:53:10 2015 UTC vs.
Revision 1.11 by jsr166, Sat Sep 12 17:11:12 2015 UTC

# Line 5 | Line 5
5   * http://creativecommons.org/publicdomain/zero/1.0/
6   */
7  
8 < import static java.util.concurrent.TimeUnit.MILLISECONDS;
9 < import static java.util.concurrent.TimeUnit.SECONDS;
10 <
8 > import java.util.concurrent.CompletableFuture;
9   import java.util.concurrent.Executor;
10   import java.util.concurrent.Executors;
11   import java.util.concurrent.Flow;
14 import static java.util.concurrent.Flow.Publisher;
15 import static java.util.concurrent.Flow.Subscriber;
16 import static java.util.concurrent.Flow.Subscription;
17 import java.util.concurrent.LinkedBlockingQueue;
12   import java.util.concurrent.ForkJoinPool;
13 + import java.util.concurrent.LinkedBlockingQueue;
14   import java.util.concurrent.SubmissionPublisher;
15   import java.util.concurrent.ThreadFactory;
16   import java.util.concurrent.ThreadPoolExecutor;
17   import java.util.concurrent.TimeUnit;
18   import java.util.concurrent.atomic.AtomicInteger;
19   import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
20   import java.util.function.BiFunction;
21 <
21 > import java.util.function.BiPredicate;
22 > import java.util.stream.Stream;
23   import junit.framework.Test;
24   import junit.framework.TestSuite;
25  
26 + import static java.util.concurrent.Flow.Publisher;
27 + import static java.util.concurrent.Flow.Subscriber;
28 + import static java.util.concurrent.Flow.Subscription;
29 + import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 + import static java.util.concurrent.TimeUnit.SECONDS;
31 +
32   public class SubmissionPublisherTest extends JSR166TestCase {
33  
34      public static void main(String[] args) {
# Line 47 | Line 48 | public class SubmissionPublisherTest ext
48      }
49  
50      static final Executor basicExecutor =
51 <        (ForkJoinPool.getCommonPoolParallelism() > 0) ?
51 >        (ForkJoinPool.getCommonPoolParallelism() > 1) ?
52          ForkJoinPool.commonPool() :
53          new ThreadPoolExecutor(1, 1, 60, SECONDS,
54                                 new LinkedBlockingQueue<Runnable>(),
# Line 285 | Line 286 | public class SubmissionPublisherTest ext
286          assertEquals(0, s2.nexts);
287          assertEquals(0, s2.errors);
288          assertEquals(0, s2.completes);
289 +        p.close();
290      }
291  
292      /**
# Line 360 | Line 362 | public class SubmissionPublisherTest ext
362      }
363  
364      /**
365 <     * subscribe(null) thows NPE
365 >     * subscribe(null) throws NPE
366       */
367      public void testSubscribe6() {
368          SubmissionPublisher<Integer> p = basicPublisher();
# Line 455 | Line 457 | public class SubmissionPublisherTest ext
457      }
458  
459      /**
460 <     * If a handler is supplied in conctructor, it is invoked when
460 >     * If a handler is supplied in constructor, it is invoked when
461       * subscriber throws an exception in onNext
462       */
463      public void testThrowOnNextHandler() {
# Line 806 | Line 808 | public class SubmissionPublisherTest ext
808       */
809      public void testEmptyTimedOffer() {
810          SubmissionPublisher<Integer> p = basicPublisher();
811 +        long startTime = System.nanoTime();
812          assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
813 +        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
814      }
815  
816      /**
# Line 814 | Line 818 | public class SubmissionPublisherTest ext
818       */
819      public void testNullTimedOffer() {
820          SubmissionPublisher<Integer> p = basicPublisher();
821 +        long startTime = System.nanoTime();
822          try {
823 <            p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null);
823 >            p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
824              shouldThrow();
825          } catch (NullPointerException success) {}
826          try {
827 <            p.offer(1, SHORT_DELAY_MS, null, null);
827 >            p.offer(1, LONG_DELAY_MS, null, null);
828              shouldThrow();
829          } catch (NullPointerException success) {}
830 +        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
831      }
832  
833      /**
# Line 837 | Line 843 | public class SubmissionPublisherTest ext
843          p.subscribe(s2);
844          s2.awaitSubscribe();
845          s1.awaitSubscribe();
846 <        assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1);
847 <        assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2);
846 >        long startTime = System.nanoTime();
847 >        assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
848 >        assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
849          s1.sn.request(4);
850 <        assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3);
850 >        assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
851          s2.sn.request(4);
852 <        p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null);
852 >        p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
853          p.close();
854          s2.awaitComplete();
855          assertEquals(4, s2.nexts);
856          s1.awaitComplete();
857          assertEquals(4, s2.nexts);
858 +        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
859      }
860  
861      /**
# Line 864 | Line 872 | public class SubmissionPublisherTest ext
872          p.subscribe(s2);
873          s2.awaitSubscribe();
874          s1.awaitSubscribe();
875 +        long delay = timeoutMillis();
876          for (int i = 1; i <= 4; ++i)
877 <            assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0);
878 <        p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null);
879 <        assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
877 >            assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
878 >        long startTime = System.nanoTime();
879 >        assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
880          s1.sn.request(64);
881 <        assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
881 >        assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
882 >        // 2 * delay should elapse but check only 1 * delay to allow timer slop
883 >        assertTrue(millisElapsedSince(startTime) >= delay);
884          s2.sn.request(64);
885          p.close();
886          s2.awaitComplete();
# Line 893 | Line 904 | public class SubmissionPublisherTest ext
904          p.subscribe(s2);
905          s2.awaitSubscribe();
906          s1.awaitSubscribe();
907 +        long delay = timeoutMillis();
908          for (int i = 1; i <= 4; ++i)
909 <            assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
910 <        p.offer(5, (s, x) -> noopHandle(calls));
911 <        assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
909 >            assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
910 >        long startTime = System.nanoTime();
911 >        assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
912          s1.sn.request(64);
913 <        assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
913 >        assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
914 >        assertTrue(millisElapsedSince(startTime) >= delay);
915          s2.sn.request(64);
916          p.close();
917          s2.awaitComplete();
# Line 922 | Line 935 | public class SubmissionPublisherTest ext
935          s2.awaitSubscribe();
936          s1.awaitSubscribe();
937          int n = 0;
938 <        for (int i = 1; i <= 8; ++i) {
939 <            int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s));
938 >        long delay = timeoutMillis();
939 >        long startTime = System.nanoTime();
940 >        for (int i = 1; i <= 6; ++i) {
941 >            int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
942              n = n + 2 + (d < 0 ? d : 0);
943          }
944 +        assertTrue(millisElapsedSince(startTime) >= delay);
945          p.close();
946          s2.awaitComplete();
947          s1.awaitComplete();
# Line 933 | Line 949 | public class SubmissionPublisherTest ext
949          assertTrue(calls.get() >= 2);
950      }
951  
952 +    /**
953 +     * consume returns a CompletableFuture that is done when
954 +     * publisher completes
955 +     */
956 +    public void testConsume() {
957 +        AtomicInteger sum = new AtomicInteger();
958 +        SubmissionPublisher<Integer> p = basicPublisher();
959 +        CompletableFuture<Void> f =
960 +            p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
961 +        int n = 20;
962 +        for (int i = 1; i <= n; ++i)
963 +            p.submit(i);
964 +        p.close();
965 +        f.join();
966 +        assertEquals((n * (n + 1)) / 2, sum.get());
967 +    }
968 +
969 +    /**
970 +     * consume(null) throws NPE
971 +     */
972 +    public void testConsumeNPE() {
973 +        SubmissionPublisher<Integer> p = basicPublisher();
974 +        try {
975 +            CompletableFuture<Void> f = p.consume(null);
976 +            shouldThrow();
977 +        } catch (NullPointerException success) {}
978 +    }
979 +
980 +    /**
981 +     * consume eventually stops processing published items if cancelled
982 +     */
983 +    public void testCancelledConsume() {
984 +        AtomicInteger count = new AtomicInteger();
985 +        SubmissionPublisher<Integer> p = basicPublisher();
986 +        CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
987 +        f.cancel(true);
988 +        int n = 1000000; // arbitrary limit
989 +        for (int i = 1; i <= n; ++i)
990 +            p.submit(i);
991 +        assertTrue(count.get() < n);
992 +    }
993 +
994   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines