--- jsr166/src/test/tck/SubmissionPublisherTest.java 2015/09/07 21:36:03 1.8 +++ jsr166/src/test/tck/SubmissionPublisherTest.java 2015/09/12 18:19:57 1.13 @@ -5,29 +5,30 @@ * http://creativecommons.org/publicdomain/zero/1.0/ */ -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; - +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; -import static java.util.concurrent.Flow.Publisher; -import static java.util.concurrent.Flow.Subscriber; -import static java.util.concurrent.Flow.Subscription; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.BiPredicate; import java.util.function.BiFunction; - +import java.util.function.BiPredicate; +import java.util.stream.Stream; import junit.framework.Test; import junit.framework.TestSuite; +import static java.util.concurrent.Flow.Publisher; +import static java.util.concurrent.Flow.Subscriber; +import static java.util.concurrent.Flow.Subscription; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + public class SubmissionPublisherTest extends JSR166TestCase { public static void main(String[] args) { @@ -37,25 +38,10 @@ public class SubmissionPublisherTest ext return new TestSuite(SubmissionPublisherTest.class); } - // Factory for single thread pool in case commonPool parallelism is zero - static final class DaemonThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - } - } - - static final Executor basicExecutor = - (ForkJoinPool.getCommonPoolParallelism() > 0) ? - ForkJoinPool.commonPool() : - new ThreadPoolExecutor(1, 1, 60, SECONDS, - new LinkedBlockingQueue(), - new DaemonThreadFactory()); + final Executor basicExecutor = basicPublisher().getExecutor(); static SubmissionPublisher basicPublisher() { - return new SubmissionPublisher(basicExecutor, - Flow.defaultBufferSize()); + return new SubmissionPublisher(); } static class SPException extends RuntimeException {} @@ -167,13 +153,17 @@ public class SubmissionPublisherTest ext /** * A default-constructed SubmissionPublisher has no subscribers, * is not closed, has default buffer size, and uses the - * ForkJoinPool.commonPool executor + * defaultExecutor */ public void testConstructor1() { SubmissionPublisher p = new SubmissionPublisher(); checkInitialState(p); - assertSame(p.getExecutor(), ForkJoinPool.commonPool()); assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); + Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); + if (ForkJoinPool.getCommonPoolParallelism() > 1) + assertSame(e, c); + else + assertNotSame(e, c); } /** @@ -285,6 +275,7 @@ public class SubmissionPublisherTest ext assertEquals(0, s2.nexts); assertEquals(0, s2.errors); assertEquals(0, s2.completes); + p.close(); } /** @@ -870,12 +861,15 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); + long delay = timeoutMillis(); for (int i = 1; i <= 4; ++i) - assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0); - p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null); - assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0); + assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0); + long startTime = System.nanoTime(); + assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0); s1.sn.request(64); - assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0); + assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0); + // 2 * delay should elapse but check only 1 * delay to allow timer slop + assertTrue(millisElapsedSince(startTime) >= delay); s2.sn.request(64); p.close(); s2.awaitComplete(); @@ -899,12 +893,14 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); + long delay = timeoutMillis(); for (int i = 1; i <= 4; ++i) - assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); - p.offer(5, (s, x) -> noopHandle(calls)); - assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); + long startTime = System.nanoTime(); + assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); s1.sn.request(64); - assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(millisElapsedSince(startTime) >= delay); s2.sn.request(64); p.close(); s2.awaitComplete(); @@ -928,10 +924,13 @@ public class SubmissionPublisherTest ext s2.awaitSubscribe(); s1.awaitSubscribe(); int n = 0; - for (int i = 1; i <= 8; ++i) { - int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s)); + long delay = timeoutMillis(); + long startTime = System.nanoTime(); + for (int i = 1; i <= 6; ++i) { + int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s)); n = n + 2 + (d < 0 ? d : 0); } + assertTrue(millisElapsedSince(startTime) >= delay); p.close(); s2.awaitComplete(); s1.awaitComplete(); @@ -939,4 +938,46 @@ public class SubmissionPublisherTest ext assertTrue(calls.get() >= 2); } + /** + * consume returns a CompletableFuture that is done when + * publisher completes + */ + public void testConsume() { + AtomicInteger sum = new AtomicInteger(); + SubmissionPublisher p = basicPublisher(); + CompletableFuture f = + p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); }); + int n = 20; + for (int i = 1; i <= n; ++i) + p.submit(i); + p.close(); + f.join(); + assertEquals((n * (n + 1)) / 2, sum.get()); + } + + /** + * consume(null) throws NPE + */ + public void testConsumeNPE() { + SubmissionPublisher p = basicPublisher(); + try { + CompletableFuture f = p.consume(null); + shouldThrow(); + } catch (NullPointerException success) {} + } + + /** + * consume eventually stops processing published items if cancelled + */ + public void testCancelledConsume() { + AtomicInteger count = new AtomicInteger(); + SubmissionPublisher p = basicPublisher(); + CompletableFuture f = p.consume(x -> count.getAndIncrement()); + f.cancel(true); + int n = 1000000; // arbitrary limit + for (int i = 1; i <= n; ++i) + p.submit(i); + assertTrue(count.get() < n); + } + }