--- jsr166/src/test/tck/SubmissionPublisherTest.java 2017/11/26 21:37:56 1.23 +++ jsr166/src/test/tck/SubmissionPublisherTest.java 2018/10/24 21:15:49 1.29 @@ -6,6 +6,7 @@ */ import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; @@ -333,9 +334,7 @@ public class SubmissionPublisherTest ext TestSubscriber s = new TestSubscriber(); SubmissionPublisher p = basicPublisher(); s.throwOnCall = true; - try { - p.subscribe(s); - } catch (Exception ok) {} + p.subscribe(s); s.awaitError(); assertEquals(0, s.nexts); assertEquals(1, s.errors); @@ -403,7 +402,7 @@ public class SubmissionPublisherTest ext */ public void testCancel() { SubmissionPublisher p = - new SubmissionPublisher(basicExecutor, 4); // must be < 20 + new SubmissionPublisher<>(basicExecutor, 4); // must be < 20 TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -978,4 +977,38 @@ public class SubmissionPublisherTest ext assertTrue(count.get() < n); } + /** + * Tests scenario for + * JDK-8187947: A race condition in SubmissionPublisher + * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java + */ + public void testMissedSignal_8187947() throws Exception { + if (!atLeastJava9()) return; // backport to jdk8 too hard + final int N = + ((ForkJoinPool.getCommonPoolParallelism() < 2) // JDK-8212899 + ? (1 << 5) + : (1 << 10)) + * (expensiveTests ? (1 << 10) : 1); + final CountDownLatch finished = new CountDownLatch(1); + final SubmissionPublisher pub = new SubmissionPublisher<>(); + class Sub implements Subscriber { + int received; + public void onSubscribe(Subscription s) { + s.request(N); + } + public void onNext(Boolean item) { + if (++received == N) + finished.countDown(); + else + CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); + } + public void onError(Throwable t) { throw new AssertionError(t); } + public void onComplete() {} + } + pub.subscribe(new Sub()); + checkTimedGet( + CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)), + null); + await(finished); + } }