ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SubmissionPublisherTest.java (file contents):
Revision 1.8 by jsr166, Mon Sep 7 21:36:03 2015 UTC vs.
Revision 1.10 by dl, Sat Sep 12 11:25:15 2015 UTC

# Line 5 | Line 5
5   * http://creativecommons.org/publicdomain/zero/1.0/
6   */
7  
8 < import static java.util.concurrent.TimeUnit.MILLISECONDS;
9 < import static java.util.concurrent.TimeUnit.SECONDS;
10 <
8 > import java.util.concurrent.CompletableFuture;
9   import java.util.concurrent.Executor;
10   import java.util.concurrent.Executors;
11   import java.util.concurrent.Flow;
14 import static java.util.concurrent.Flow.Publisher;
15 import static java.util.concurrent.Flow.Subscriber;
16 import static java.util.concurrent.Flow.Subscription;
17 import java.util.concurrent.LinkedBlockingQueue;
12   import java.util.concurrent.ForkJoinPool;
13 + import java.util.concurrent.LinkedBlockingQueue;
14   import java.util.concurrent.SubmissionPublisher;
15   import java.util.concurrent.ThreadFactory;
16   import java.util.concurrent.ThreadPoolExecutor;
17   import java.util.concurrent.TimeUnit;
18   import java.util.concurrent.atomic.AtomicInteger;
19   import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
20   import java.util.function.BiFunction;
21 <
21 > import java.util.function.BiPredicate;
22 > import java.util.stream.Stream;
23   import junit.framework.Test;
24   import junit.framework.TestSuite;
25  
26 + import static java.util.concurrent.Flow.Publisher;
27 + import static java.util.concurrent.Flow.Subscriber;
28 + import static java.util.concurrent.Flow.Subscription;
29 + import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 + import static java.util.concurrent.TimeUnit.SECONDS;
31 +
32   public class SubmissionPublisherTest extends JSR166TestCase {
33  
34      public static void main(String[] args) {
# Line 47 | Line 48 | public class SubmissionPublisherTest ext
48      }
49  
50      static final Executor basicExecutor =
51 <        (ForkJoinPool.getCommonPoolParallelism() > 0) ?
51 >        (ForkJoinPool.getCommonPoolParallelism() > 1) ?
52          ForkJoinPool.commonPool() :
53          new ThreadPoolExecutor(1, 1, 60, SECONDS,
54                                 new LinkedBlockingQueue<Runnable>(),
# Line 285 | Line 286 | public class SubmissionPublisherTest ext
286          assertEquals(0, s2.nexts);
287          assertEquals(0, s2.errors);
288          assertEquals(0, s2.completes);
289 +        p.close();
290      }
291  
292      /**
# Line 870 | Line 872 | public class SubmissionPublisherTest ext
872          p.subscribe(s2);
873          s2.awaitSubscribe();
874          s1.awaitSubscribe();
875 +        long delay = timeoutMillis();
876          for (int i = 1; i <= 4; ++i)
877 <            assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0);
878 <        p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null);
879 <        assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
877 >            assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
878 >        long startTime = System.nanoTime();
879 >        assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
880          s1.sn.request(64);
881 <        assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
881 >        assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
882 >        // 2 * delay should elapse but check only 1 * delay to allow timer slop
883 >        assertTrue(millisElapsedSince(startTime) >= delay);
884          s2.sn.request(64);
885          p.close();
886          s2.awaitComplete();
# Line 899 | Line 904 | public class SubmissionPublisherTest ext
904          p.subscribe(s2);
905          s2.awaitSubscribe();
906          s1.awaitSubscribe();
907 +        long delay = timeoutMillis();
908          for (int i = 1; i <= 4; ++i)
909 <            assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
910 <        p.offer(5, (s, x) -> noopHandle(calls));
911 <        assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
909 >            assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
910 >        long startTime = System.nanoTime();
911 >        assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
912          s1.sn.request(64);
913 <        assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
913 >        assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
914 >        assertTrue(millisElapsedSince(startTime) >= delay);
915          s2.sn.request(64);
916          p.close();
917          s2.awaitComplete();
# Line 928 | Line 935 | public class SubmissionPublisherTest ext
935          s2.awaitSubscribe();
936          s1.awaitSubscribe();
937          int n = 0;
938 <        for (int i = 1; i <= 8; ++i) {
939 <            int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s));
938 >        long delay = timeoutMillis();
939 >        long startTime = System.nanoTime();
940 >        for (int i = 1; i <= 6; ++i) {
941 >            int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
942              n = n + 2 + (d < 0 ? d : 0);
943          }
944 +        assertTrue(millisElapsedSince(startTime) >= delay);
945          p.close();
946          s2.awaitComplete();
947          s1.awaitComplete();
# Line 939 | Line 949 | public class SubmissionPublisherTest ext
949          assertTrue(calls.get() >= 2);
950      }
951  
952 +    /**
953 +     * consume returns a CompletableFuture that is done when
954 +     * publisher completes
955 +     */
956 +    public void testConsume() {
957 +        AtomicInteger sum = new AtomicInteger();
958 +        SubmissionPublisher<Integer> p = basicPublisher();
959 +        CompletableFuture<Void> f =
960 +            p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
961 +        int n = 20;
962 +        for (int i = 1; i <= n; ++i)
963 +            p.submit(i);
964 +        p.close();
965 +        f.join();
966 +        assertEquals((n * (n + 1)) / 2, sum.get());
967 +    }
968 +
969 +    /**
970 +     * consume(null) throws NPE
971 +     */
972 +    public void testConsumeNPE() {
973 +        SubmissionPublisher<Integer> p = basicPublisher();
974 +        try {
975 +            CompletableFuture<Void> f = p.consume(null);
976 +            shouldThrow();
977 +        } catch(NullPointerException success) {
978 +        }
979 +    }
980 +
981 +    /**
982 +     * consume eventually stops processing published items if cancelled
983 +     */
984 +    public void testCancelledConsume() {
985 +        AtomicInteger count = new AtomicInteger();
986 +        SubmissionPublisher<Integer> p = basicPublisher();
987 +        CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
988 +        f.cancel(true);
989 +        int n = 1000000; // arbitrary limit
990 +        for (int i = 1; i <= n; ++i)
991 +            p.submit(i);
992 +        assertTrue(count.get() < n);
993 +    }
994 +    
995   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines