--- jsr166/src/test/tck/SubmissionPublisherTest.java 2015/09/08 19:44:10 1.9 +++ jsr166/src/test/tck/SubmissionPublisherTest.java 2017/03/09 00:11:16 1.19 @@ -5,29 +5,20 @@ * http://creativecommons.org/publicdomain/zero/1.0/ */ -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; - +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; -import static java.util.concurrent.Flow.Publisher; -import static java.util.concurrent.Flow.Subscriber; -import static java.util.concurrent.Flow.Subscription; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.SubmissionPublisher; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.BiPredicate; -import java.util.function.BiFunction; - import junit.framework.Test; import junit.framework.TestSuite; +import static java.util.concurrent.Flow.Subscriber; +import static java.util.concurrent.Flow.Subscription; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + public class SubmissionPublisherTest extends JSR166TestCase { public static void main(String[] args) { @@ -37,25 +28,10 @@ public class SubmissionPublisherTest ext return new TestSuite(SubmissionPublisherTest.class); } - // Factory for single thread pool in case commonPool parallelism is zero - static final class DaemonThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - } - } - - static final Executor basicExecutor = - (ForkJoinPool.getCommonPoolParallelism() > 1) ? - ForkJoinPool.commonPool() : - new ThreadPoolExecutor(1, 1, 60, SECONDS, - new LinkedBlockingQueue(), - new DaemonThreadFactory()); + final Executor basicExecutor = basicPublisher().getExecutor(); static SubmissionPublisher basicPublisher() { - return new SubmissionPublisher(basicExecutor, - Flow.defaultBufferSize()); + return new SubmissionPublisher(); } static class SPException extends RuntimeException {} @@ -167,13 +143,17 @@ public class SubmissionPublisherTest ext /** * A default-constructed SubmissionPublisher has no subscribers, * is not closed, has default buffer size, and uses the - * ForkJoinPool.commonPool executor + * defaultExecutor */ public void testConstructor1() { - SubmissionPublisher p = new SubmissionPublisher(); + SubmissionPublisher p = new SubmissionPublisher<>(); checkInitialState(p); - assertSame(p.getExecutor(), ForkJoinPool.commonPool()); assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); + Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); + if (ForkJoinPool.getCommonPoolParallelism() > 1) + assertSame(e, c); + else + assertNotSame(e, c); } /** @@ -182,7 +162,7 @@ public class SubmissionPublisherTest ext */ public void testConstructor2() { Executor e = Executors.newFixedThreadPool(1); - SubmissionPublisher p = new SubmissionPublisher(e, 8); + SubmissionPublisher p = new SubmissionPublisher<>(e, 8); checkInitialState(p); assertSame(p.getExecutor(), e); assertEquals(8, p.getMaxBufferCapacity()); @@ -395,6 +375,7 @@ public class SubmissionPublisherTest ext /** * Closing a publisher exceptionally causes onError to subscribers + * after they are subscribed */ public void testCloseExceptionallyError() { SubmissionPublisher p = basicPublisher(); @@ -405,9 +386,11 @@ public class SubmissionPublisherTest ext p.submit(1); p.closeExceptionally(new SPException()); assertTrue(p.isClosed()); + s1.awaitSubscribe(); s1.awaitError(); assertTrue(s1.nexts <= 1); assertEquals(1, s1.errors); + s2.awaitSubscribe(); s2.awaitError(); assertTrue(s2.nexts <= 1); assertEquals(1, s2.errors); @@ -461,9 +444,8 @@ public class SubmissionPublisherTest ext */ public void testThrowOnNextHandler() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher - (basicExecutor, 8, - (s, e) -> calls.getAndIncrement()); + SubmissionPublisher p = new SubmissionPublisher<>( + basicExecutor, 8, (s, e) -> calls.getAndIncrement()); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -551,17 +533,21 @@ public class SubmissionPublisherTest ext } /** - * Negative request causes error + * Non-positive request causes error */ public void testRequest3() { SubmissionPublisher p = basicPublisher(); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); + TestSubscriber s3 = new TestSubscriber(); p.subscribe(s1); p.subscribe(s2); + p.subscribe(s3); + s3.awaitSubscribe(); s2.awaitSubscribe(); s1.awaitSubscribe(); s1.sn.request(-1L); + s3.sn.request(0L); p.submit(1); p.submit(2); p.close(); @@ -571,6 +557,9 @@ public class SubmissionPublisherTest ext s1.awaitError(); assertEquals(1, s1.errors); assertTrue(s1.lastError instanceof IllegalArgumentException); + s3.awaitError(); + assertEquals(1, s3.errors); + assertTrue(s3.lastError instanceof IllegalArgumentException); } /** @@ -644,8 +633,8 @@ public class SubmissionPublisherTest ext * submit eventually issues requested items when buffer capacity is 1 */ public void testCap1Submit() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 1); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 1); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -723,8 +712,8 @@ public class SubmissionPublisherTest ext * offer reports drops if saturated */ public void testDroppedOffer() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -752,8 +741,8 @@ public class SubmissionPublisherTest ext */ public void testHandledDroppedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -780,8 +769,8 @@ public class SubmissionPublisherTest ext */ public void testRecoveredHandledDroppedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -861,8 +850,8 @@ public class SubmissionPublisherTest ext * Timed offer reports drops if saturated */ public void testDroppedTimedOffer() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -893,8 +882,8 @@ public class SubmissionPublisherTest ext */ public void testHandledDroppedTimedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -923,8 +912,8 @@ public class SubmissionPublisherTest ext */ public void testRecoveredHandledDroppedTimedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -948,4 +937,46 @@ public class SubmissionPublisherTest ext assertTrue(calls.get() >= 2); } + /** + * consume returns a CompletableFuture that is done when + * publisher completes + */ + public void testConsume() { + AtomicInteger sum = new AtomicInteger(); + SubmissionPublisher p = basicPublisher(); + CompletableFuture f = + p.consume((Integer x) -> sum.getAndAdd(x.intValue())); + int n = 20; + for (int i = 1; i <= n; ++i) + p.submit(i); + p.close(); + f.join(); + assertEquals((n * (n + 1)) / 2, sum.get()); + } + + /** + * consume(null) throws NPE + */ + public void testConsumeNPE() { + SubmissionPublisher p = basicPublisher(); + try { + CompletableFuture f = p.consume(null); + shouldThrow(); + } catch (NullPointerException success) {} + } + + /** + * consume eventually stops processing published items if cancelled + */ + public void testCancelledConsume() { + AtomicInteger count = new AtomicInteger(); + SubmissionPublisher p = basicPublisher(); + CompletableFuture f = p.consume(x -> count.getAndIncrement()); + f.cancel(true); + int n = 1000000; // arbitrary limit + for (int i = 1; i <= n; ++i) + p.submit(i); + assertTrue(count.get() < n); + } + }