--- jsr166/src/test/tck/SubmissionPublisherTest.java 2015/09/12 11:25:15 1.10 +++ jsr166/src/test/tck/SubmissionPublisherTest.java 2016/12/11 22:11:45 1.16 @@ -10,24 +10,14 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SubmissionPublisher; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BiPredicate; -import java.util.stream.Stream; import junit.framework.Test; import junit.framework.TestSuite; -import static java.util.concurrent.Flow.Publisher; import static java.util.concurrent.Flow.Subscriber; import static java.util.concurrent.Flow.Subscription; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; public class SubmissionPublisherTest extends JSR166TestCase { @@ -38,25 +28,10 @@ public class SubmissionPublisherTest ext return new TestSuite(SubmissionPublisherTest.class); } - // Factory for single thread pool in case commonPool parallelism is zero - static final class DaemonThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - } - } - - static final Executor basicExecutor = - (ForkJoinPool.getCommonPoolParallelism() > 1) ? - ForkJoinPool.commonPool() : - new ThreadPoolExecutor(1, 1, 60, SECONDS, - new LinkedBlockingQueue(), - new DaemonThreadFactory()); + final Executor basicExecutor = basicPublisher().getExecutor(); static SubmissionPublisher basicPublisher() { - return new SubmissionPublisher(basicExecutor, - Flow.defaultBufferSize()); + return new SubmissionPublisher(); } static class SPException extends RuntimeException {} @@ -168,13 +143,17 @@ public class SubmissionPublisherTest ext /** * A default-constructed SubmissionPublisher has no subscribers, * is not closed, has default buffer size, and uses the - * ForkJoinPool.commonPool executor + * defaultExecutor */ public void testConstructor1() { SubmissionPublisher p = new SubmissionPublisher(); checkInitialState(p); - assertSame(p.getExecutor(), ForkJoinPool.commonPool()); assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); + Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); + if (ForkJoinPool.getCommonPoolParallelism() > 1) + assertSame(e, c); + else + assertNotSame(e, c); } /** @@ -396,6 +375,7 @@ public class SubmissionPublisherTest ext /** * Closing a publisher exceptionally causes onError to subscribers + * after they are subscribed */ public void testCloseExceptionallyError() { SubmissionPublisher p = basicPublisher(); @@ -406,9 +386,11 @@ public class SubmissionPublisherTest ext p.submit(1); p.closeExceptionally(new SPException()); assertTrue(p.isClosed()); + s1.awaitSubscribe(); s1.awaitError(); assertTrue(s1.nexts <= 1); assertEquals(1, s1.errors); + s1.awaitSubscribe(); s2.awaitError(); assertTrue(s2.nexts <= 1); assertEquals(1, s2.errors); @@ -956,8 +938,8 @@ public class SubmissionPublisherTest ext public void testConsume() { AtomicInteger sum = new AtomicInteger(); SubmissionPublisher p = basicPublisher(); - CompletableFuture f = - p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); }); + CompletableFuture f = + p.consume((Integer x) -> sum.getAndAdd(x.intValue())); int n = 20; for (int i = 1; i <= n; ++i) p.submit(i); @@ -974,8 +956,7 @@ public class SubmissionPublisherTest ext try { CompletableFuture f = p.consume(null); shouldThrow(); - } catch(NullPointerException success) { - } + } catch (NullPointerException success) {} } /** @@ -991,5 +972,5 @@ public class SubmissionPublisherTest ext p.submit(i); assertTrue(count.get() < n); } - + }