--- jsr166/src/test/tck/SubmissionPublisherTest.java 2015/09/12 11:25:15 1.10 +++ jsr166/src/test/tck/SubmissionPublisherTest.java 2018/01/07 22:59:18 1.26 @@ -6,28 +6,19 @@ */ import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SubmissionPublisher; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BiPredicate; -import java.util.stream.Stream; import junit.framework.Test; import junit.framework.TestSuite; -import static java.util.concurrent.Flow.Publisher; import static java.util.concurrent.Flow.Subscriber; import static java.util.concurrent.Flow.Subscription; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; public class SubmissionPublisherTest extends JSR166TestCase { @@ -38,25 +29,10 @@ public class SubmissionPublisherTest ext return new TestSuite(SubmissionPublisherTest.class); } - // Factory for single thread pool in case commonPool parallelism is zero - static final class DaemonThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - } - } - - static final Executor basicExecutor = - (ForkJoinPool.getCommonPoolParallelism() > 1) ? - ForkJoinPool.commonPool() : - new ThreadPoolExecutor(1, 1, 60, SECONDS, - new LinkedBlockingQueue(), - new DaemonThreadFactory()); + final Executor basicExecutor = basicPublisher().getExecutor(); static SubmissionPublisher basicPublisher() { - return new SubmissionPublisher(basicExecutor, - Flow.defaultBufferSize()); + return new SubmissionPublisher(); } static class SPException extends RuntimeException {} @@ -168,13 +144,17 @@ public class SubmissionPublisherTest ext /** * A default-constructed SubmissionPublisher has no subscribers, * is not closed, has default buffer size, and uses the - * ForkJoinPool.commonPool executor + * defaultExecutor */ public void testConstructor1() { - SubmissionPublisher p = new SubmissionPublisher(); + SubmissionPublisher p = new SubmissionPublisher<>(); checkInitialState(p); - assertSame(p.getExecutor(), ForkJoinPool.commonPool()); assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); + Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); + if (ForkJoinPool.getCommonPoolParallelism() > 1) + assertSame(e, c); + else + assertNotSame(e, c); } /** @@ -183,14 +163,15 @@ public class SubmissionPublisherTest ext */ public void testConstructor2() { Executor e = Executors.newFixedThreadPool(1); - SubmissionPublisher p = new SubmissionPublisher(e, 8); + SubmissionPublisher p = new SubmissionPublisher<>(e, 8); checkInitialState(p); assertSame(p.getExecutor(), e); assertEquals(8, p.getMaxBufferCapacity()); } /** - * A null Executor argument to SubmissionPublisher constructor throws NPE + * A null Executor argument to SubmissionPublisher constructor + * throws NullPointerException */ public void testConstructor3() { try { @@ -201,7 +182,7 @@ public class SubmissionPublisherTest ext /** * A negative capacity argument to SubmissionPublisher constructor - * throws IAE + * throws IllegalArgumentException */ public void testConstructor4() { Executor e = Executors.newFixedThreadPool(1); @@ -213,8 +194,9 @@ public class SubmissionPublisherTest ext /** * A closed publisher reports isClosed with no closedException and - * throws ISE upon attempted submission; a subsequent close or - * closeExceptionally has no additional effect. + * throws IllegalStateException upon attempted submission; a + * subsequent close or closeExceptionally has no additional + * effect. */ public void testClose() { SubmissionPublisher p = basicPublisher(); @@ -234,9 +216,9 @@ public class SubmissionPublisherTest ext /** * A publisher closedExceptionally reports isClosed with the - * closedException and throws ISE upon attempted submission; a - * subsequent close or closeExceptionally has no additional - * effect. + * closedException and throws IllegalStateException upon attempted + * submission; a subsequent close or closeExceptionally has no + * additional effect. */ public void testCloseExceptionally() { SubmissionPublisher p = basicPublisher(); @@ -396,6 +378,7 @@ public class SubmissionPublisherTest ext /** * Closing a publisher exceptionally causes onError to subscribers + * after they are subscribed */ public void testCloseExceptionallyError() { SubmissionPublisher p = basicPublisher(); @@ -406,9 +389,11 @@ public class SubmissionPublisherTest ext p.submit(1); p.closeExceptionally(new SPException()); assertTrue(p.isClosed()); + s1.awaitSubscribe(); s1.awaitError(); assertTrue(s1.nexts <= 1); assertEquals(1, s1.errors); + s2.awaitSubscribe(); s2.awaitError(); assertTrue(s2.nexts <= 1); assertEquals(1, s2.errors); @@ -418,7 +403,8 @@ public class SubmissionPublisherTest ext * Cancelling a subscription eventually causes no more onNexts to be issued */ public void testCancel() { - SubmissionPublisher p = basicPublisher(); + SubmissionPublisher p = + new SubmissionPublisher<>(basicExecutor, 4); // must be < 20 TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -462,9 +448,8 @@ public class SubmissionPublisherTest ext */ public void testThrowOnNextHandler() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher - (basicExecutor, 8, - (s, e) -> calls.getAndIncrement()); + SubmissionPublisher p = new SubmissionPublisher<>( + basicExecutor, 8, (s, e) -> calls.getAndIncrement()); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -511,7 +496,7 @@ public class SubmissionPublisherTest ext s1.request = false; p.subscribe(s1); s1.awaitSubscribe(); - assertTrue(p.estimateMinimumDemand() == 0); + assertEquals(0, p.estimateMinimumDemand()); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s2); p.submit(1); @@ -552,17 +537,21 @@ public class SubmissionPublisherTest ext } /** - * Negative request causes error + * Non-positive request causes error */ public void testRequest3() { SubmissionPublisher p = basicPublisher(); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); + TestSubscriber s3 = new TestSubscriber(); p.subscribe(s1); p.subscribe(s2); + p.subscribe(s3); + s3.awaitSubscribe(); s2.awaitSubscribe(); s1.awaitSubscribe(); s1.sn.request(-1L); + s3.sn.request(0L); p.submit(1); p.submit(2); p.close(); @@ -572,11 +561,14 @@ public class SubmissionPublisherTest ext s1.awaitError(); assertEquals(1, s1.errors); assertTrue(s1.lastError instanceof IllegalArgumentException); + s3.awaitError(); + assertEquals(1, s3.errors); + assertTrue(s3.lastError instanceof IllegalArgumentException); } /** * estimateMinimumDemand reports 0 until request, nonzero after - * request, and zero again after delivery + * request */ public void testEstimateMinimumDemand() { TestSubscriber s = new TestSubscriber(); @@ -587,9 +579,6 @@ public class SubmissionPublisherTest ext assertEquals(0, p.estimateMinimumDemand()); s.sn.request(1); assertEquals(1, p.estimateMinimumDemand()); - p.submit(1); - s.awaitNext(1); - assertEquals(0, p.estimateMinimumDemand()); } /** @@ -645,14 +634,13 @@ public class SubmissionPublisherTest ext * submit eventually issues requested items when buffer capacity is 1 */ public void testCap1Submit() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 1); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 1); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); p.subscribe(s2); for (int i = 1; i <= 20; ++i) { - assertTrue(p.estimateMinimumDemand() <= 1); assertTrue(p.submit(i) >= 0); } p.close(); @@ -724,8 +712,8 @@ public class SubmissionPublisherTest ext * offer reports drops if saturated */ public void testDroppedOffer() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -753,8 +741,8 @@ public class SubmissionPublisherTest ext */ public void testHandledDroppedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -781,8 +769,8 @@ public class SubmissionPublisherTest ext */ public void testRecoveredHandledDroppedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -862,8 +850,8 @@ public class SubmissionPublisherTest ext * Timed offer reports drops if saturated */ public void testDroppedTimedOffer() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -894,8 +882,8 @@ public class SubmissionPublisherTest ext */ public void testHandledDroppedTimedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -924,8 +912,8 @@ public class SubmissionPublisherTest ext */ public void testRecoveredHandledDroppedTimedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -956,8 +944,8 @@ public class SubmissionPublisherTest ext public void testConsume() { AtomicInteger sum = new AtomicInteger(); SubmissionPublisher p = basicPublisher(); - CompletableFuture f = - p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); }); + CompletableFuture f = + p.consume((Integer x) -> sum.getAndAdd(x.intValue())); int n = 20; for (int i = 1; i <= n; ++i) p.submit(i); @@ -974,8 +962,7 @@ public class SubmissionPublisherTest ext try { CompletableFuture f = p.consume(null); shouldThrow(); - } catch(NullPointerException success) { - } + } catch (NullPointerException success) {} } /** @@ -991,5 +978,33 @@ public class SubmissionPublisherTest ext p.submit(i); assertTrue(count.get() < n); } - + + /** + * Tests scenario for + * JDK-8187947: A race condition in SubmissionPublisher + * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java + */ + public void testMissedSignal_8187947() throws Exception { + if (!atLeastJava9()) return; // backport to jdk8 too hard + final int N = expensiveTests ? (1 << 20) : (1 << 10); + final CountDownLatch finished = new CountDownLatch(1); + final SubmissionPublisher pub = new SubmissionPublisher<>(); + class Sub implements Subscriber { + int received; + public void onSubscribe(Subscription s) { + s.request(N); + } + public void onNext(Boolean item) { + if (++received == N) + finished.countDown(); + else + CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); + } + public void onError(Throwable t) { throw new AssertionError(t); } + public void onComplete() {} + } + pub.subscribe(new Sub()); + CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); + await(finished); + } }