--- jsr166/src/test/tck/SubmissionPublisherTest.java 2015/09/07 20:33:41 1.4 +++ jsr166/src/test/tck/SubmissionPublisherTest.java 2017/12/11 00:16:40 1.25 @@ -5,29 +5,21 @@ * http://creativecommons.org/publicdomain/zero/1.0/ */ -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; - +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; -import static java.util.concurrent.Flow.Publisher; -import static java.util.concurrent.Flow.Subscriber; -import static java.util.concurrent.Flow.Subscription; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.SubmissionPublisher; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.BiPredicate; -import java.util.function.BiFunction; - import junit.framework.Test; import junit.framework.TestSuite; +import static java.util.concurrent.Flow.Subscriber; +import static java.util.concurrent.Flow.Subscription; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + public class SubmissionPublisherTest extends JSR166TestCase { public static void main(String[] args) { @@ -37,25 +29,10 @@ public class SubmissionPublisherTest ext return new TestSuite(SubmissionPublisherTest.class); } - // Factory for single thread pool in case commonPool parallelism is zero - static final class DaemonThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - } - } - - static final Executor basicExecutor = - (ForkJoinPool.getCommonPoolParallelism() > 0) ? - ForkJoinPool.commonPool() : - new ThreadPoolExecutor(1, 1, 60, SECONDS, - new LinkedBlockingQueue(), - new DaemonThreadFactory()); + final Executor basicExecutor = basicPublisher().getExecutor(); static SubmissionPublisher basicPublisher() { - return new SubmissionPublisher(basicExecutor, - Flow.defaultBufferSize()); + return new SubmissionPublisher(); } static class SPException extends RuntimeException {} @@ -153,27 +130,31 @@ public class SubmissionPublisherTest ext */ void checkInitialState(SubmissionPublisher p) { assertFalse(p.hasSubscribers()); - assertEquals(p.getNumberOfSubscribers(), 0); + assertEquals(0, p.getNumberOfSubscribers()); assertTrue(p.getSubscribers().isEmpty()); assertFalse(p.isClosed()); assertNull(p.getClosedException()); int n = p.getMaxBufferCapacity(); assertTrue((n & (n - 1)) == 0); // power of two assertNotNull(p.getExecutor()); - assertEquals(p.estimateMinimumDemand(), 0); - assertEquals(p.estimateMaximumLag(), 0); + assertEquals(0, p.estimateMinimumDemand()); + assertEquals(0, p.estimateMaximumLag()); } /** * A default-constructed SubmissionPublisher has no subscribers, * is not closed, has default buffer size, and uses the - * ForkJoinPool.commonPool executor + * defaultExecutor */ public void testConstructor1() { - SubmissionPublisher p = new SubmissionPublisher(); + SubmissionPublisher p = new SubmissionPublisher<>(); checkInitialState(p); - assertSame(p.getExecutor(), ForkJoinPool.commonPool()); assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); + Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); + if (ForkJoinPool.getCommonPoolParallelism() > 1) + assertSame(e, c); + else + assertNotSame(e, c); } /** @@ -182,14 +163,15 @@ public class SubmissionPublisherTest ext */ public void testConstructor2() { Executor e = Executors.newFixedThreadPool(1); - SubmissionPublisher p = new SubmissionPublisher(e, 8); + SubmissionPublisher p = new SubmissionPublisher<>(e, 8); checkInitialState(p); assertSame(p.getExecutor(), e); - assertEquals(p.getMaxBufferCapacity(), 8); + assertEquals(8, p.getMaxBufferCapacity()); } /** - * A null Executor argument to SubmissionPublisher constructor throws NPE + * A null Executor argument to SubmissionPublisher constructor + * throws NullPointerException */ public void testConstructor3() { try { @@ -200,7 +182,7 @@ public class SubmissionPublisherTest ext /** * A negative capacity argument to SubmissionPublisher constructor - * throws IAE + * throws IllegalArgumentException */ public void testConstructor4() { Executor e = Executors.newFixedThreadPool(1); @@ -212,8 +194,9 @@ public class SubmissionPublisherTest ext /** * A closed publisher reports isClosed with no closedException and - * throws ISE upon attempted submission; a subsequent close or - * closeExceptionally has no additional effect. + * throws IllegalStateException upon attempted submission; a + * subsequent close or closeExceptionally has no additional + * effect. */ public void testClose() { SubmissionPublisher p = basicPublisher(); @@ -233,9 +216,9 @@ public class SubmissionPublisherTest ext /** * A publisher closedExceptionally reports isClosed with the - * closedException and throws ISE upon attempted submission; a - * subsequent close or closeExceptionally has no additional - * effect. + * closedException and throws IllegalStateException upon attempted + * submission; a subsequent close or closeExceptionally has no + * additional effect. */ public void testCloseExceptionally() { SubmissionPublisher p = basicPublisher(); @@ -264,27 +247,28 @@ public class SubmissionPublisherTest ext SubmissionPublisher p = basicPublisher(); p.subscribe(s); assertTrue(p.hasSubscribers()); - assertEquals(p.getNumberOfSubscribers(), 1); + assertEquals(1, p.getNumberOfSubscribers()); assertTrue(p.getSubscribers().contains(s)); assertTrue(p.isSubscribed(s)); s.awaitSubscribe(); assertNotNull(s.sn); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 0); - assertEquals(s.completes, 0); + assertEquals(0, s.nexts); + assertEquals(0, s.errors); + assertEquals(0, s.completes); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s2); assertTrue(p.hasSubscribers()); - assertEquals(p.getNumberOfSubscribers(), 2); + assertEquals(2, p.getNumberOfSubscribers()); assertTrue(p.getSubscribers().contains(s)); assertTrue(p.getSubscribers().contains(s2)); assertTrue(p.isSubscribed(s)); assertTrue(p.isSubscribed(s2)); s2.awaitSubscribe(); assertNotNull(s2.sn); - assertEquals(s2.nexts, 0); - assertEquals(s2.errors, 0); - assertEquals(s2.completes, 0); + assertEquals(0, s2.nexts); + assertEquals(0, s2.errors); + assertEquals(0, s2.completes); + p.close(); } /** @@ -297,9 +281,9 @@ public class SubmissionPublisherTest ext p.close(); p.subscribe(s); s.awaitComplete(); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 0); - assertEquals(s.completes, 1); + assertEquals(0, s.nexts); + assertEquals(0, s.errors); + assertEquals(1, s.completes, 1); } /** @@ -315,8 +299,8 @@ public class SubmissionPublisherTest ext assertSame(p.getClosedException(), ex); p.subscribe(s); s.awaitError(); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 1); + assertEquals(0, s.nexts); + assertEquals(1, s.errors); } /** @@ -328,18 +312,18 @@ public class SubmissionPublisherTest ext SubmissionPublisher p = basicPublisher(); p.subscribe(s); assertTrue(p.hasSubscribers()); - assertEquals(p.getNumberOfSubscribers(), 1); + assertEquals(1, p.getNumberOfSubscribers()); assertTrue(p.getSubscribers().contains(s)); assertTrue(p.isSubscribed(s)); s.awaitSubscribe(); assertNotNull(s.sn); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 0); - assertEquals(s.completes, 0); + assertEquals(0, s.nexts); + assertEquals(0, s.errors); + assertEquals(0, s.completes); p.subscribe(s); s.awaitError(); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 1); + assertEquals(0, s.nexts); + assertEquals(1, s.errors); assertFalse(p.isSubscribed(s)); } @@ -354,13 +338,13 @@ public class SubmissionPublisherTest ext p.subscribe(s); } catch (Exception ok) {} s.awaitError(); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 1); - assertEquals(s.completes, 0); + assertEquals(0, s.nexts); + assertEquals(1, s.errors); + assertEquals(0, s.completes); } /** - * subscribe(null) thows NPE + * subscribe(null) throws NPE */ public void testSubscribe6() { SubmissionPublisher p = basicPublisher(); @@ -385,15 +369,16 @@ public class SubmissionPublisherTest ext assertTrue(p.isClosed()); assertNull(p.getClosedException()); s1.awaitComplete(); - assertEquals(s1.nexts, 1); - assertEquals(s1.completes, 1); + assertEquals(1, s1.nexts); + assertEquals(1, s1.completes); s2.awaitComplete(); - assertEquals(s2.nexts, 1); - assertEquals(s2.completes, 1); + assertEquals(1, s2.nexts); + assertEquals(1, s2.completes); } /** * Closing a publisher exceptionally causes onError to subscribers + * after they are subscribed */ public void testCloseExceptionallyError() { SubmissionPublisher p = basicPublisher(); @@ -404,19 +389,22 @@ public class SubmissionPublisherTest ext p.submit(1); p.closeExceptionally(new SPException()); assertTrue(p.isClosed()); + s1.awaitSubscribe(); s1.awaitError(); assertTrue(s1.nexts <= 1); - assertEquals(s1.errors, 1); + assertEquals(1, s1.errors); + s2.awaitSubscribe(); s2.awaitError(); assertTrue(s2.nexts <= 1); - assertEquals(s2.errors, 1); + assertEquals(1, s2.errors); } /** * Cancelling a subscription eventually causes no more onNexts to be issued */ public void testCancel() { - SubmissionPublisher p = basicPublisher(); + SubmissionPublisher p = + new SubmissionPublisher(basicExecutor, 4); // must be < 20 TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -428,8 +416,8 @@ public class SubmissionPublisherTest ext p.submit(i); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 20); - assertEquals(s2.completes, 1); + assertEquals(20, s2.nexts); + assertEquals(1, s2.completes); assertTrue(s1.nexts < 20); assertFalse(p.isSubscribed(s1)); } @@ -449,20 +437,19 @@ public class SubmissionPublisherTest ext p.submit(2); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 2); + assertEquals(2, s2.nexts); s1.awaitComplete(); - assertEquals(s1.errors, 1); + assertEquals(1, s1.errors); } /** - * If a handler is supplied in conctructor, it is invoked when + * If a handler is supplied in constructor, it is invoked when * subscriber throws an exception in onNext */ public void testThrowOnNextHandler() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher - (basicExecutor, 8, - (s, e) -> calls.getAndIncrement()); + SubmissionPublisher p = new SubmissionPublisher<>( + basicExecutor, 8, (s, e) -> calls.getAndIncrement()); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -473,11 +460,11 @@ public class SubmissionPublisherTest ext p.submit(2); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 2); - assertEquals(s2.completes, 1); + assertEquals(2, s2.nexts); + assertEquals(1, s2.completes); s1.awaitError(); - assertEquals(s1.errors, 1); - assertEquals(calls.get(), 1); + assertEquals(1, s1.errors); + assertEquals(1, calls.get()); } /** @@ -494,10 +481,10 @@ public class SubmissionPublisherTest ext p.close(); s2.awaitComplete(); s1.awaitComplete(); - assertEquals(s2.nexts, 20); - assertEquals(s2.completes, 1); - assertEquals(s1.nexts, 20); - assertEquals(s1.completes, 1); + assertEquals(20, s2.nexts); + assertEquals(1, s2.completes); + assertEquals(20, s1.nexts); + assertEquals(1, s1.completes); } /** @@ -509,22 +496,22 @@ public class SubmissionPublisherTest ext s1.request = false; p.subscribe(s1); s1.awaitSubscribe(); - assertTrue(p.estimateMinimumDemand() == 0); + assertEquals(0, p.estimateMinimumDemand()); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s2); p.submit(1); p.submit(2); s2.awaitNext(1); - assertEquals(s1.nexts, 0); + assertEquals(0, s1.nexts); s1.sn.request(3); p.submit(3); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 3); - assertEquals(s2.completes, 1); + assertEquals(3, s2.nexts); + assertEquals(1, s2.completes); s1.awaitComplete(); assertTrue(s1.nexts > 0); - assertEquals(s1.completes, 1); + assertEquals(1, s1.completes); } /** @@ -543,38 +530,45 @@ public class SubmissionPublisherTest ext p.submit(2); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 2); - assertEquals(s2.completes, 1); + assertEquals(2, s2.nexts); + assertEquals(1, s2.completes); s1.awaitNext(1); - assertEquals(s1.nexts, 1); + assertEquals(1, s1.nexts); } /** - * Negative request causes error + * Non-positive request causes error */ public void testRequest3() { SubmissionPublisher p = basicPublisher(); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); + TestSubscriber s3 = new TestSubscriber(); p.subscribe(s1); p.subscribe(s2); + p.subscribe(s3); + s3.awaitSubscribe(); s2.awaitSubscribe(); s1.awaitSubscribe(); s1.sn.request(-1L); + s3.sn.request(0L); p.submit(1); p.submit(2); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 2); - assertEquals(s2.completes, 1); + assertEquals(2, s2.nexts); + assertEquals(1, s2.completes); s1.awaitError(); - assertEquals(s1.errors, 1); + assertEquals(1, s1.errors); assertTrue(s1.lastError instanceof IllegalArgumentException); + s3.awaitError(); + assertEquals(1, s3.errors); + assertTrue(s3.lastError instanceof IllegalArgumentException); } /** * estimateMinimumDemand reports 0 until request, nonzero after - * request, and zero again after delivery + * request */ public void testEstimateMinimumDemand() { TestSubscriber s = new TestSubscriber(); @@ -582,12 +576,9 @@ public class SubmissionPublisherTest ext s.request = false; p.subscribe(s); s.awaitSubscribe(); - assertEquals(p.estimateMinimumDemand(), 0); + assertEquals(0, p.estimateMinimumDemand()); s.sn.request(1); - assertEquals(p.estimateMinimumDemand(), 1); - p.submit(1); - s.awaitNext(1); - assertEquals(p.estimateMinimumDemand(), 0); + assertEquals(1, p.estimateMinimumDemand()); } /** @@ -595,7 +586,7 @@ public class SubmissionPublisherTest ext */ public void testEmptySubmit() { SubmissionPublisher p = basicPublisher(); - assertEquals(p.submit(1), 0); + assertEquals(0, p.submit(1)); } /** @@ -623,7 +614,7 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); - assertEquals(p.submit(1), 1); + assertEquals(1, p.submit(1)); assertTrue(p.estimateMaximumLag() >= 1); assertTrue(p.submit(2) >= 2); assertTrue(p.estimateMaximumLag() >= 2); @@ -634,32 +625,31 @@ public class SubmissionPublisherTest ext p.submit(4); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); s1.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); } /** * submit eventually issues requested items when buffer capacity is 1 */ public void testCap1Submit() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 1); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 1); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); p.subscribe(s2); for (int i = 1; i <= 20; ++i) { - assertTrue(p.estimateMinimumDemand() <= 1); assertTrue(p.submit(i) >= 0); } p.close(); s2.awaitComplete(); s1.awaitComplete(); - assertEquals(s2.nexts, 20); - assertEquals(s2.completes, 1); - assertEquals(s1.nexts, 20); - assertEquals(s1.completes, 1); + assertEquals(20, s2.nexts); + assertEquals(1, s2.completes); + assertEquals(20, s1.nexts); + assertEquals(1, s1.completes); } static boolean noopHandle(AtomicInteger count) { @@ -713,17 +703,17 @@ public class SubmissionPublisherTest ext p.offer(4, null); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); s1.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); } /** * offer reports drops if saturated */ public void testDroppedOffer() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -751,8 +741,8 @@ public class SubmissionPublisherTest ext */ public void testHandledDroppedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -774,14 +764,13 @@ public class SubmissionPublisherTest ext assertTrue(calls.get() >= 4); } - /** * offer succeeds if drop handler forces request */ public void testRecoveredHandledDroppedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -798,17 +787,18 @@ public class SubmissionPublisherTest ext p.close(); s2.awaitComplete(); s1.awaitComplete(); - assertEquals(s1.nexts + s2.nexts, n); + assertEquals(n, s1.nexts + s2.nexts); assertTrue(calls.get() >= 2); } - /** * Timed offer to a publisher with no subscribers returns lag 0 */ public void testEmptyTimedOffer() { SubmissionPublisher p = basicPublisher(); + long startTime = System.nanoTime(); assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null)); + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); } /** @@ -816,14 +806,16 @@ public class SubmissionPublisherTest ext */ public void testNullTimedOffer() { SubmissionPublisher p = basicPublisher(); + long startTime = System.nanoTime(); try { - p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null); + p.offer(null, LONG_DELAY_MS, MILLISECONDS, null); shouldThrow(); } catch (NullPointerException success) {} try { - p.offer(1, SHORT_DELAY_MS, null, null); + p.offer(1, LONG_DELAY_MS, null, null); shouldThrow(); } catch (NullPointerException success) {} + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); } /** @@ -839,25 +831,27 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); - assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1); - assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2); + long startTime = System.nanoTime(); + assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1); + assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2); s1.sn.request(4); - assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3); + assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3); s2.sn.request(4); - p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null); + p.offer(4, LONG_DELAY_MS, MILLISECONDS, null); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); s1.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); } /** * Timed offer reports drops if saturated */ public void testDroppedTimedOffer() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -866,12 +860,15 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); + long delay = timeoutMillis(); for (int i = 1; i <= 4; ++i) - assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0); - p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null); - assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0); + assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0); + long startTime = System.nanoTime(); + assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0); s1.sn.request(64); - assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0); + assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0); + // 2 * delay should elapse but check only 1 * delay to allow timer slop + assertTrue(millisElapsedSince(startTime) >= delay); s2.sn.request(64); p.close(); s2.awaitComplete(); @@ -885,8 +882,8 @@ public class SubmissionPublisherTest ext */ public void testHandledDroppedTimedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -895,12 +892,14 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); + long delay = timeoutMillis(); for (int i = 1; i <= 4; ++i) - assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); - p.offer(5, (s, x) -> noopHandle(calls)); - assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); + long startTime = System.nanoTime(); + assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); s1.sn.request(64); - assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(millisElapsedSince(startTime) >= delay); s2.sn.request(64); p.close(); s2.awaitComplete(); @@ -913,8 +912,8 @@ public class SubmissionPublisherTest ext */ public void testRecoveredHandledDroppedTimedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -924,16 +923,88 @@ public class SubmissionPublisherTest ext s2.awaitSubscribe(); s1.awaitSubscribe(); int n = 0; - for (int i = 1; i <= 8; ++i) { - int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s)); + long delay = timeoutMillis(); + long startTime = System.nanoTime(); + for (int i = 1; i <= 6; ++i) { + int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s)); n = n + 2 + (d < 0 ? d : 0); } + assertTrue(millisElapsedSince(startTime) >= delay); p.close(); s2.awaitComplete(); s1.awaitComplete(); - assertEquals(s1.nexts + s2.nexts, n); + assertEquals(n, s1.nexts + s2.nexts); assertTrue(calls.get() >= 2); } + /** + * consume returns a CompletableFuture that is done when + * publisher completes + */ + public void testConsume() { + AtomicInteger sum = new AtomicInteger(); + SubmissionPublisher p = basicPublisher(); + CompletableFuture f = + p.consume((Integer x) -> sum.getAndAdd(x.intValue())); + int n = 20; + for (int i = 1; i <= n; ++i) + p.submit(i); + p.close(); + f.join(); + assertEquals((n * (n + 1)) / 2, sum.get()); + } + + /** + * consume(null) throws NPE + */ + public void testConsumeNPE() { + SubmissionPublisher p = basicPublisher(); + try { + CompletableFuture f = p.consume(null); + shouldThrow(); + } catch (NullPointerException success) {} + } + + /** + * consume eventually stops processing published items if cancelled + */ + public void testCancelledConsume() { + AtomicInteger count = new AtomicInteger(); + SubmissionPublisher p = basicPublisher(); + CompletableFuture f = p.consume(x -> count.getAndIncrement()); + f.cancel(true); + int n = 1000000; // arbitrary limit + for (int i = 1; i <= n; ++i) + p.submit(i); + assertTrue(count.get() < n); + } + /** + * Tests scenario for + * JDK-8187947: A race condition in SubmissionPublisher + * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java + */ + public void testMissedSignal_8187947() throws Exception { + if (!atLeastJava9()) return; // backport to jdk8 too hard + final int N = expensiveTests ? (1 << 20) : (1 << 10); + final CountDownLatch finished = new CountDownLatch(1); + final SubmissionPublisher pub = new SubmissionPublisher<>(); + class Sub implements Subscriber { + int received; + public void onSubscribe(Subscription s) { + s.request(N); + } + public void onNext(Boolean item) { + if (++received == N) + finished.countDown(); + else + CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); + } + public void onError(Throwable t) { throw new AssertionError(t); } + public void onComplete() {} + } + pub.subscribe(new Sub()); + CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); + await(finished); + } }