--- jsr166/src/test/tck/SubmissionPublisherTest.java 2017/03/09 00:11:16 1.19 +++ jsr166/src/test/tck/SubmissionPublisherTest.java 2017/11/27 01:19:51 1.24 @@ -6,6 +6,7 @@ */ import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; @@ -169,7 +170,8 @@ public class SubmissionPublisherTest ext } /** - * A null Executor argument to SubmissionPublisher constructor throws NPE + * A null Executor argument to SubmissionPublisher constructor + * throws NullPointerException */ public void testConstructor3() { try { @@ -180,7 +182,7 @@ public class SubmissionPublisherTest ext /** * A negative capacity argument to SubmissionPublisher constructor - * throws IAE + * throws IllegalArgumentException */ public void testConstructor4() { Executor e = Executors.newFixedThreadPool(1); @@ -192,8 +194,9 @@ public class SubmissionPublisherTest ext /** * A closed publisher reports isClosed with no closedException and - * throws ISE upon attempted submission; a subsequent close or - * closeExceptionally has no additional effect. + * throws IllegalStateException upon attempted submission; a + * subsequent close or closeExceptionally has no additional + * effect. */ public void testClose() { SubmissionPublisher p = basicPublisher(); @@ -213,9 +216,9 @@ public class SubmissionPublisherTest ext /** * A publisher closedExceptionally reports isClosed with the - * closedException and throws ISE upon attempted submission; a - * subsequent close or closeExceptionally has no additional - * effect. + * closedException and throws IllegalStateException upon attempted + * submission; a subsequent close or closeExceptionally has no + * additional effect. */ public void testCloseExceptionally() { SubmissionPublisher p = basicPublisher(); @@ -400,7 +403,8 @@ public class SubmissionPublisherTest ext * Cancelling a subscription eventually causes no more onNexts to be issued */ public void testCancel() { - SubmissionPublisher p = basicPublisher(); + SubmissionPublisher p = + new SubmissionPublisher(basicExecutor, 4); // must be < 20 TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -492,7 +496,7 @@ public class SubmissionPublisherTest ext s1.request = false; p.subscribe(s1); s1.awaitSubscribe(); - assertTrue(p.estimateMinimumDemand() == 0); + assertEquals(0, p.estimateMinimumDemand()); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s2); p.submit(1); @@ -564,7 +568,7 @@ public class SubmissionPublisherTest ext /** * estimateMinimumDemand reports 0 until request, nonzero after - * request, and zero again after delivery + * request */ public void testEstimateMinimumDemand() { TestSubscriber s = new TestSubscriber(); @@ -575,9 +579,6 @@ public class SubmissionPublisherTest ext assertEquals(0, p.estimateMinimumDemand()); s.sn.request(1); assertEquals(1, p.estimateMinimumDemand()); - p.submit(1); - s.awaitNext(1); - assertEquals(0, p.estimateMinimumDemand()); } /** @@ -640,7 +641,6 @@ public class SubmissionPublisherTest ext p.subscribe(s1); p.subscribe(s2); for (int i = 1; i <= 20; ++i) { - assertTrue(p.estimateMinimumDemand() <= 1); assertTrue(p.submit(i) >= 0); } p.close(); @@ -979,4 +979,31 @@ public class SubmissionPublisherTest ext assertTrue(count.get() < n); } + /** + * Tests scenario for + * JDK-8187947: A race condition in SubmissionPublisher + * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java + */ + public void testMissedSignal_8187947() throws Exception { + final int N = expensiveTests ? (1 << 20) : (1 << 10); + final CountDownLatch finished = new CountDownLatch(1); + final SubmissionPublisher pub = new SubmissionPublisher<>(); + class Sub implements Subscriber { + int received; + public void onSubscribe(Subscription s) { + s.request(N); + } + public void onNext(Boolean item) { + if (++received == N) + finished.countDown(); + else + CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); + } + public void onError(Throwable t) { throw new AssertionError(t); } + public void onComplete() {} + } + pub.subscribe(new Sub()); + CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); + await(finished); + } }