--- jsr166/src/test/tck/SubmissionPublisherTest.java 2015/09/07 17:14:06 1.1 +++ jsr166/src/test/tck/SubmissionPublisherTest.java 2017/01/04 06:09:58 1.18 @@ -5,29 +5,20 @@ * http://creativecommons.org/publicdomain/zero/1.0/ */ -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; - +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; -import static java.util.concurrent.Flow.Publisher; -import static java.util.concurrent.Flow.Subscriber; -import static java.util.concurrent.Flow.Subscription; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.SubmissionPublisher; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.BiPredicate; -import java.util.function.BiFunction; - import junit.framework.Test; import junit.framework.TestSuite; +import static java.util.concurrent.Flow.Subscriber; +import static java.util.concurrent.Flow.Subscription; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + public class SubmissionPublisherTest extends JSR166TestCase { public static void main(String[] args) { @@ -37,27 +28,12 @@ public class SubmissionPublisherTest ext return new TestSuite(SubmissionPublisherTest.class); } - // Factory for single thread pool in case commonPool parallelism is zero - static final class DaemonThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - } - } - - static final Executor basicExecutor = - (ForkJoinPool.getCommonPoolParallelism() > 0) ? - ForkJoinPool.commonPool() : - new ThreadPoolExecutor(1, 1, 60, SECONDS, - new LinkedBlockingQueue(), - new DaemonThreadFactory()); - + final Executor basicExecutor = basicPublisher().getExecutor(); + static SubmissionPublisher basicPublisher() { - return new SubmissionPublisher(basicExecutor, - Flow.defaultBufferSize()); + return new SubmissionPublisher(); } - + static class SPException extends RuntimeException {} class TestSubscriber implements Subscriber { @@ -153,27 +129,31 @@ public class SubmissionPublisherTest ext */ void checkInitialState(SubmissionPublisher p) { assertFalse(p.hasSubscribers()); - assertEquals(p.getNumberOfSubscribers(), 0); + assertEquals(0, p.getNumberOfSubscribers()); assertTrue(p.getSubscribers().isEmpty()); assertFalse(p.isClosed()); assertNull(p.getClosedException()); int n = p.getMaxBufferCapacity(); assertTrue((n & (n - 1)) == 0); // power of two assertNotNull(p.getExecutor()); - assertEquals(p.estimateMinimumDemand(), 0); - assertEquals(p.estimateMaximumLag(), 0); + assertEquals(0, p.estimateMinimumDemand()); + assertEquals(0, p.estimateMaximumLag()); } /** * A default-constructed SubmissionPublisher has no subscribers, * is not closed, has default buffer size, and uses the - * ForkJoinPool.commonPool executor + * defaultExecutor */ public void testConstructor1() { - SubmissionPublisher p = new SubmissionPublisher(); + SubmissionPublisher p = new SubmissionPublisher<>(); checkInitialState(p); - assertSame(p.getExecutor(), ForkJoinPool.commonPool()); assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); + Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); + if (ForkJoinPool.getCommonPoolParallelism() > 1) + assertSame(e, c); + else + assertNotSame(e, c); } /** @@ -182,10 +162,10 @@ public class SubmissionPublisherTest ext */ public void testConstructor2() { Executor e = Executors.newFixedThreadPool(1); - SubmissionPublisher p = new SubmissionPublisher(e, 8); + SubmissionPublisher p = new SubmissionPublisher<>(e, 8); checkInitialState(p); assertSame(p.getExecutor(), e); - assertEquals(p.getMaxBufferCapacity(), 8); + assertEquals(8, p.getMaxBufferCapacity()); } /** @@ -193,10 +173,9 @@ public class SubmissionPublisherTest ext */ public void testConstructor3() { try { - SubmissionPublisher p = new SubmissionPublisher(null, 8); + new SubmissionPublisher(null, 8); shouldThrow(); - } catch (NullPointerException success) { - } + } catch (NullPointerException success) {} } /** @@ -206,10 +185,9 @@ public class SubmissionPublisherTest ext public void testConstructor4() { Executor e = Executors.newFixedThreadPool(1); try { - SubmissionPublisher p = new SubmissionPublisher(e, -1); + new SubmissionPublisher(e, -1); shouldThrow(); - } catch (IllegalArgumentException success) { - } + } catch (IllegalArgumentException success) {} } /** @@ -226,9 +204,7 @@ public class SubmissionPublisherTest ext try { p.submit(1); shouldThrow(); - } - catch(IllegalStateException success) { - } + } catch (IllegalStateException success) {} Throwable ex = new SPException(); p.closeExceptionally(ex); assertTrue(p.isClosed()); @@ -251,9 +227,7 @@ public class SubmissionPublisherTest ext try { p.submit(1); shouldThrow(); - } - catch(IllegalStateException success) { - } + } catch (IllegalStateException success) {} p.close(); assertTrue(p.isClosed()); assertSame(p.getClosedException(), ex); @@ -270,27 +244,28 @@ public class SubmissionPublisherTest ext SubmissionPublisher p = basicPublisher(); p.subscribe(s); assertTrue(p.hasSubscribers()); - assertEquals(p.getNumberOfSubscribers(), 1); + assertEquals(1, p.getNumberOfSubscribers()); assertTrue(p.getSubscribers().contains(s)); assertTrue(p.isSubscribed(s)); s.awaitSubscribe(); assertNotNull(s.sn); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 0); - assertEquals(s.completes, 0); + assertEquals(0, s.nexts); + assertEquals(0, s.errors); + assertEquals(0, s.completes); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s2); assertTrue(p.hasSubscribers()); - assertEquals(p.getNumberOfSubscribers(), 2); + assertEquals(2, p.getNumberOfSubscribers()); assertTrue(p.getSubscribers().contains(s)); assertTrue(p.getSubscribers().contains(s2)); assertTrue(p.isSubscribed(s)); assertTrue(p.isSubscribed(s2)); s2.awaitSubscribe(); assertNotNull(s2.sn); - assertEquals(s2.nexts, 0); - assertEquals(s2.errors, 0); - assertEquals(s2.completes, 0); + assertEquals(0, s2.nexts); + assertEquals(0, s2.errors); + assertEquals(0, s2.completes); + p.close(); } /** @@ -303,9 +278,9 @@ public class SubmissionPublisherTest ext p.close(); p.subscribe(s); s.awaitComplete(); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 0); - assertEquals(s.completes, 1); + assertEquals(0, s.nexts); + assertEquals(0, s.errors); + assertEquals(1, s.completes, 1); } /** @@ -321,8 +296,8 @@ public class SubmissionPublisherTest ext assertSame(p.getClosedException(), ex); p.subscribe(s); s.awaitError(); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 1); + assertEquals(0, s.nexts); + assertEquals(1, s.errors); } /** @@ -334,18 +309,18 @@ public class SubmissionPublisherTest ext SubmissionPublisher p = basicPublisher(); p.subscribe(s); assertTrue(p.hasSubscribers()); - assertEquals(p.getNumberOfSubscribers(), 1); + assertEquals(1, p.getNumberOfSubscribers()); assertTrue(p.getSubscribers().contains(s)); assertTrue(p.isSubscribed(s)); s.awaitSubscribe(); assertNotNull(s.sn); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 0); - assertEquals(s.completes, 0); + assertEquals(0, s.nexts); + assertEquals(0, s.errors); + assertEquals(0, s.completes); p.subscribe(s); s.awaitError(); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 1); + assertEquals(0, s.nexts); + assertEquals(1, s.errors); assertFalse(p.isSubscribed(s)); } @@ -358,23 +333,22 @@ public class SubmissionPublisherTest ext s.throwOnCall = true; try { p.subscribe(s); - } catch(Exception ok) { - } + } catch (Exception ok) {} s.awaitError(); - assertEquals(s.nexts, 0); - assertEquals(s.errors, 1); - assertEquals(s.completes, 0); + assertEquals(0, s.nexts); + assertEquals(1, s.errors); + assertEquals(0, s.completes); } /** - * subscribe(null) thows NPE + * subscribe(null) throws NPE */ public void testSubscribe6() { SubmissionPublisher p = basicPublisher(); try { p.subscribe(null); - } catch(NullPointerException success) { - } + shouldThrow(); + } catch (NullPointerException success) {} checkInitialState(p); } @@ -392,15 +366,16 @@ public class SubmissionPublisherTest ext assertTrue(p.isClosed()); assertNull(p.getClosedException()); s1.awaitComplete(); - assertEquals(s1.nexts, 1); - assertEquals(s1.completes, 1); + assertEquals(1, s1.nexts); + assertEquals(1, s1.completes); s2.awaitComplete(); - assertEquals(s2.nexts, 1); - assertEquals(s2.completes, 1); + assertEquals(1, s2.nexts); + assertEquals(1, s2.completes); } /** * Closing a publisher exceptionally causes onError to subscribers + * after they are subscribed */ public void testCloseExceptionallyError() { SubmissionPublisher p = basicPublisher(); @@ -411,12 +386,14 @@ public class SubmissionPublisherTest ext p.submit(1); p.closeExceptionally(new SPException()); assertTrue(p.isClosed()); + s1.awaitSubscribe(); s1.awaitError(); assertTrue(s1.nexts <= 1); - assertEquals(s1.errors, 1); + assertEquals(1, s1.errors); + s2.awaitSubscribe(); s2.awaitError(); assertTrue(s2.nexts <= 1); - assertEquals(s2.errors, 1); + assertEquals(1, s2.errors); } /** @@ -435,8 +412,8 @@ public class SubmissionPublisherTest ext p.submit(i); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 20); - assertEquals(s2.completes, 1); + assertEquals(20, s2.nexts); + assertEquals(1, s2.completes); assertTrue(s1.nexts < 20); assertFalse(p.isSubscribed(s1)); } @@ -456,20 +433,19 @@ public class SubmissionPublisherTest ext p.submit(2); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 2); + assertEquals(2, s2.nexts); s1.awaitComplete(); - assertEquals(s1.errors, 1); + assertEquals(1, s1.errors); } /** - * If a handler is supplied in conctructor, it is invoked when + * If a handler is supplied in constructor, it is invoked when * subscriber throws an exception in onNext */ public void testThrowOnNextHandler() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher - (basicExecutor, 8, - (s, e) -> calls.getAndIncrement()); + SubmissionPublisher p = new SubmissionPublisher<>( + basicExecutor, 8, (s, e) -> calls.getAndIncrement()); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -480,11 +456,11 @@ public class SubmissionPublisherTest ext p.submit(2); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 2); - assertEquals(s2.completes, 1); + assertEquals(2, s2.nexts); + assertEquals(1, s2.completes); s1.awaitError(); - assertEquals(s1.errors, 1); - assertEquals(calls.get(), 1); + assertEquals(1, s1.errors); + assertEquals(1, calls.get()); } /** @@ -501,10 +477,10 @@ public class SubmissionPublisherTest ext p.close(); s2.awaitComplete(); s1.awaitComplete(); - assertEquals(s2.nexts, 20); - assertEquals(s2.completes, 1); - assertEquals(s1.nexts, 20); - assertEquals(s1.completes, 1); + assertEquals(20, s2.nexts); + assertEquals(1, s2.completes); + assertEquals(20, s1.nexts); + assertEquals(1, s1.completes); } /** @@ -522,16 +498,16 @@ public class SubmissionPublisherTest ext p.submit(1); p.submit(2); s2.awaitNext(1); - assertEquals(s1.nexts, 0); + assertEquals(0, s1.nexts); s1.sn.request(3); p.submit(3); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 3); - assertEquals(s2.completes, 1); + assertEquals(3, s2.nexts); + assertEquals(1, s2.completes); s1.awaitComplete(); assertTrue(s1.nexts > 0); - assertEquals(s1.completes, 1); + assertEquals(1, s1.completes); } /** @@ -550,10 +526,10 @@ public class SubmissionPublisherTest ext p.submit(2); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 2); - assertEquals(s2.completes, 1); + assertEquals(2, s2.nexts); + assertEquals(1, s2.completes); s1.awaitNext(1); - assertEquals(s1.nexts, 1); + assertEquals(1, s1.nexts); } /** @@ -572,10 +548,10 @@ public class SubmissionPublisherTest ext p.submit(2); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 2); - assertEquals(s2.completes, 1); + assertEquals(2, s2.nexts); + assertEquals(1, s2.completes); s1.awaitError(); - assertEquals(s1.errors, 1); + assertEquals(1, s1.errors); assertTrue(s1.lastError instanceof IllegalArgumentException); } @@ -589,35 +565,35 @@ public class SubmissionPublisherTest ext s.request = false; p.subscribe(s); s.awaitSubscribe(); - assertEquals(p.estimateMinimumDemand(), 0); + assertEquals(0, p.estimateMinimumDemand()); s.sn.request(1); - assertEquals(p.estimateMinimumDemand(), 1); + assertEquals(1, p.estimateMinimumDemand()); p.submit(1); s.awaitNext(1); - assertEquals(p.estimateMinimumDemand(), 0); + assertEquals(0, p.estimateMinimumDemand()); } /** - * Submit to a publisher with no subscribers returns lag 0 + * submit to a publisher with no subscribers returns lag 0 */ public void testEmptySubmit() { SubmissionPublisher p = basicPublisher(); - assertEquals(p.submit(1), 0); + assertEquals(0, p.submit(1)); } /** - * Submit(null) throws NPE + * submit(null) throws NPE */ public void testNullSubmit() { SubmissionPublisher p = basicPublisher(); try { p.submit(null); - } catch (NullPointerException success) { - } + shouldThrow(); + } catch (NullPointerException success) {} } /** - * Submit returns number of lagged items, compatible with result + * submit returns number of lagged items, compatible with result * of estimateMaximumLag. */ public void testLaggedSubmit() { @@ -630,7 +606,7 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); - assertEquals(p.submit(1), 1); + assertEquals(1, p.submit(1)); assertTrue(p.estimateMaximumLag() >= 1); assertTrue(p.submit(2) >= 2); assertTrue(p.estimateMaximumLag() >= 2); @@ -641,17 +617,17 @@ public class SubmissionPublisherTest ext p.submit(4); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); s1.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); } /** * submit eventually issues requested items when buffer capacity is 1 */ public void testCap1Submit() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 1); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 1); TestSubscriber s1 = new TestSubscriber(); TestSubscriber s2 = new TestSubscriber(); p.subscribe(s1); @@ -663,12 +639,12 @@ public class SubmissionPublisherTest ext p.close(); s2.awaitComplete(); s1.awaitComplete(); - assertEquals(s2.nexts, 20); - assertEquals(s2.completes, 1); - assertEquals(s1.nexts, 20); - assertEquals(s1.completes, 1); + assertEquals(20, s2.nexts); + assertEquals(1, s2.completes); + assertEquals(20, s1.nexts); + assertEquals(1, s1.completes); } - + static boolean noopHandle(AtomicInteger count) { count.getAndIncrement(); return false; @@ -681,26 +657,26 @@ public class SubmissionPublisherTest ext } /** - * Offer to a publisher with no subscribers returns lag 0 + * offer to a publisher with no subscribers returns lag 0 */ public void testEmptyOffer() { SubmissionPublisher p = basicPublisher(); - assertEquals(p.offer(1, null), 0); + assertEquals(0, p.offer(1, null)); } /** - * Offer(null) throws NPE + * offer(null) throws NPE */ public void testNullOffer() { SubmissionPublisher p = basicPublisher(); try { p.offer(null, null); - } catch (NullPointerException success) { - } + shouldThrow(); + } catch (NullPointerException success) {} } /** - * Offer returns number of lagged items if not saturated + * offer returns number of lagged items if not saturated */ public void testLaggedOffer() { SubmissionPublisher p = basicPublisher(); @@ -720,17 +696,17 @@ public class SubmissionPublisherTest ext p.offer(4, null); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); s1.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); } /** - * Offer reports drops if saturated + * offer reports drops if saturated */ public void testDroppedOffer() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -754,12 +730,12 @@ public class SubmissionPublisherTest ext } /** - * Offer invokes drop handler if saturated + * offer invokes drop handler if saturated */ public void testHandledDroppedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -781,14 +757,13 @@ public class SubmissionPublisherTest ext assertTrue(calls.get() >= 4); } - /** - * Offer succeeds if drop handler forces request + * offer succeeds if drop handler forces request */ public void testRecoveredHandledDroppedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -805,36 +780,39 @@ public class SubmissionPublisherTest ext p.close(); s2.awaitComplete(); s1.awaitComplete(); - assertEquals(s1.nexts + s2.nexts, n); + assertEquals(n, s1.nexts + s2.nexts); assertTrue(calls.get() >= 2); } - /** - * TimedOffer to a publisher with no subscribers returns lag 0 + * Timed offer to a publisher with no subscribers returns lag 0 */ public void testEmptyTimedOffer() { SubmissionPublisher p = basicPublisher(); - assertEquals(p.offer(1, null), 0); + long startTime = System.nanoTime(); + assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null)); + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); } /** - * Timed Offer with null item or TimeUnit throws NPE + * Timed offer with null item or TimeUnit throws NPE */ public void testNullTimedOffer() { SubmissionPublisher p = basicPublisher(); + long startTime = System.nanoTime(); try { - p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null); - } catch (NullPointerException success) { - } + p.offer(null, LONG_DELAY_MS, MILLISECONDS, null); + shouldThrow(); + } catch (NullPointerException success) {} try { - p.offer(1, SHORT_DELAY_MS, null, null); - } catch (NullPointerException success) { - } + p.offer(1, LONG_DELAY_MS, null, null); + shouldThrow(); + } catch (NullPointerException success) {} + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); } /** - * Timed Offer returns number of lagged items if not saturated + * Timed offer returns number of lagged items if not saturated */ public void testLaggedTimedOffer() { SubmissionPublisher p = basicPublisher(); @@ -846,25 +824,27 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); - assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1); - assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2); + long startTime = System.nanoTime(); + assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1); + assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2); s1.sn.request(4); - assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3); + assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3); s2.sn.request(4); - p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null); + p.offer(4, LONG_DELAY_MS, MILLISECONDS, null); p.close(); s2.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); s1.awaitComplete(); - assertEquals(s2.nexts, 4); + assertEquals(4, s2.nexts); + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); } /** - * Timed Offer reports drops if saturated + * Timed offer reports drops if saturated */ public void testDroppedTimedOffer() { - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -873,12 +853,15 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); + long delay = timeoutMillis(); for (int i = 1; i <= 4; ++i) - assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0); - p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null); - assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0); + assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0); + long startTime = System.nanoTime(); + assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0); s1.sn.request(64); - assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0); + assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0); + // 2 * delay should elapse but check only 1 * delay to allow timer slop + assertTrue(millisElapsedSince(startTime) >= delay); s2.sn.request(64); p.close(); s2.awaitComplete(); @@ -888,12 +871,12 @@ public class SubmissionPublisherTest ext } /** - * Timed Offer invokes drop handler if saturated + * Timed offer invokes drop handler if saturated */ public void testHandledDroppedTimedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -902,12 +885,14 @@ public class SubmissionPublisherTest ext p.subscribe(s2); s2.awaitSubscribe(); s1.awaitSubscribe(); + long delay = timeoutMillis(); for (int i = 1; i <= 4; ++i) - assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); - p.offer(5, (s, x) -> noopHandle(calls)); - assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); + long startTime = System.nanoTime(); + assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); s1.sn.request(64); - assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); + assertTrue(millisElapsedSince(startTime) >= delay); s2.sn.request(64); p.close(); s2.awaitComplete(); @@ -916,12 +901,12 @@ public class SubmissionPublisherTest ext } /** - * Timed Offer succeeds if drop handler forces request + * Timed offer succeeds if drop handler forces request */ public void testRecoveredHandledDroppedTimedOffer() { AtomicInteger calls = new AtomicInteger(); - SubmissionPublisher p = new SubmissionPublisher( - basicExecutor, 4); + SubmissionPublisher p + = new SubmissionPublisher<>(basicExecutor, 4); TestSubscriber s1 = new TestSubscriber(); s1.request = false; TestSubscriber s2 = new TestSubscriber(); @@ -931,16 +916,60 @@ public class SubmissionPublisherTest ext s2.awaitSubscribe(); s1.awaitSubscribe(); int n = 0; - for (int i = 1; i <= 8; ++i) { - int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s)); + long delay = timeoutMillis(); + long startTime = System.nanoTime(); + for (int i = 1; i <= 6; ++i) { + int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s)); n = n + 2 + (d < 0 ? d : 0); } + assertTrue(millisElapsedSince(startTime) >= delay); p.close(); s2.awaitComplete(); s1.awaitComplete(); - assertEquals(s1.nexts + s2.nexts, n); + assertEquals(n, s1.nexts + s2.nexts); assertTrue(calls.get() >= 2); } + /** + * consume returns a CompletableFuture that is done when + * publisher completes + */ + public void testConsume() { + AtomicInteger sum = new AtomicInteger(); + SubmissionPublisher p = basicPublisher(); + CompletableFuture f = + p.consume((Integer x) -> sum.getAndAdd(x.intValue())); + int n = 20; + for (int i = 1; i <= n; ++i) + p.submit(i); + p.close(); + f.join(); + assertEquals((n * (n + 1)) / 2, sum.get()); + } + + /** + * consume(null) throws NPE + */ + public void testConsumeNPE() { + SubmissionPublisher p = basicPublisher(); + try { + CompletableFuture f = p.consume(null); + shouldThrow(); + } catch (NullPointerException success) {} + } + + /** + * consume eventually stops processing published items if cancelled + */ + public void testCancelledConsume() { + AtomicInteger count = new AtomicInteger(); + SubmissionPublisher p = basicPublisher(); + CompletableFuture f = p.consume(x -> count.getAndIncrement()); + f.cancel(true); + int n = 1000000; // arbitrary limit + for (int i = 1; i <= n; ++i) + p.submit(i); + assertTrue(count.get() < n); + } }