ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SubmissionPublisherTest.java (file contents):
Revision 1.9 by dl, Tue Sep 8 19:44:10 2015 UTC vs.
Revision 1.10 by dl, Sat Sep 12 11:25:15 2015 UTC

# Line 5 | Line 5
5   * http://creativecommons.org/publicdomain/zero/1.0/
6   */
7  
8 < import static java.util.concurrent.TimeUnit.MILLISECONDS;
9 < import static java.util.concurrent.TimeUnit.SECONDS;
10 <
8 > import java.util.concurrent.CompletableFuture;
9   import java.util.concurrent.Executor;
10   import java.util.concurrent.Executors;
11   import java.util.concurrent.Flow;
14 import static java.util.concurrent.Flow.Publisher;
15 import static java.util.concurrent.Flow.Subscriber;
16 import static java.util.concurrent.Flow.Subscription;
17 import java.util.concurrent.LinkedBlockingQueue;
12   import java.util.concurrent.ForkJoinPool;
13 + import java.util.concurrent.LinkedBlockingQueue;
14   import java.util.concurrent.SubmissionPublisher;
15   import java.util.concurrent.ThreadFactory;
16   import java.util.concurrent.ThreadPoolExecutor;
17   import java.util.concurrent.TimeUnit;
18   import java.util.concurrent.atomic.AtomicInteger;
19   import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
20   import java.util.function.BiFunction;
21 <
21 > import java.util.function.BiPredicate;
22 > import java.util.stream.Stream;
23   import junit.framework.Test;
24   import junit.framework.TestSuite;
25  
26 + import static java.util.concurrent.Flow.Publisher;
27 + import static java.util.concurrent.Flow.Subscriber;
28 + import static java.util.concurrent.Flow.Subscription;
29 + import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 + import static java.util.concurrent.TimeUnit.SECONDS;
31 +
32   public class SubmissionPublisherTest extends JSR166TestCase {
33  
34      public static void main(String[] args) {
# Line 948 | Line 949 | public class SubmissionPublisherTest ext
949          assertTrue(calls.get() >= 2);
950      }
951  
952 +    /**
953 +     * consume returns a CompletableFuture that is done when
954 +     * publisher completes
955 +     */
956 +    public void testConsume() {
957 +        AtomicInteger sum = new AtomicInteger();
958 +        SubmissionPublisher<Integer> p = basicPublisher();
959 +        CompletableFuture<Void> f =
960 +            p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
961 +        int n = 20;
962 +        for (int i = 1; i <= n; ++i)
963 +            p.submit(i);
964 +        p.close();
965 +        f.join();
966 +        assertEquals((n * (n + 1)) / 2, sum.get());
967 +    }
968 +
969 +    /**
970 +     * consume(null) throws NPE
971 +     */
972 +    public void testConsumeNPE() {
973 +        SubmissionPublisher<Integer> p = basicPublisher();
974 +        try {
975 +            CompletableFuture<Void> f = p.consume(null);
976 +            shouldThrow();
977 +        } catch(NullPointerException success) {
978 +        }
979 +    }
980 +
981 +    /**
982 +     * consume eventually stops processing published items if cancelled
983 +     */
984 +    public void testCancelledConsume() {
985 +        AtomicInteger count = new AtomicInteger();
986 +        SubmissionPublisher<Integer> p = basicPublisher();
987 +        CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
988 +        f.cancel(true);
989 +        int n = 1000000; // arbitrary limit
990 +        for (int i = 1; i <= n; ++i)
991 +            p.submit(i);
992 +        assertTrue(count.get() < n);
993 +    }
994 +    
995   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines