ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SubmissionPublisherTest.java (file contents):
Revision 1.1 by dl, Mon Sep 7 17:14:06 2015 UTC vs.
Revision 1.12 by dl, Sat Sep 12 17:18:13 2015 UTC

# Line 5 | Line 5
5   * http://creativecommons.org/publicdomain/zero/1.0/
6   */
7  
8 < import static java.util.concurrent.TimeUnit.MILLISECONDS;
9 < import static java.util.concurrent.TimeUnit.SECONDS;
10 <
8 > import java.util.concurrent.CompletableFuture;
9   import java.util.concurrent.Executor;
10   import java.util.concurrent.Executors;
11   import java.util.concurrent.Flow;
14 import static java.util.concurrent.Flow.Publisher;
15 import static java.util.concurrent.Flow.Subscriber;
16 import static java.util.concurrent.Flow.Subscription;
17 import java.util.concurrent.LinkedBlockingQueue;
12   import java.util.concurrent.ForkJoinPool;
13 + import java.util.concurrent.LinkedBlockingQueue;
14   import java.util.concurrent.SubmissionPublisher;
15   import java.util.concurrent.ThreadFactory;
16   import java.util.concurrent.ThreadPoolExecutor;
17   import java.util.concurrent.TimeUnit;
18   import java.util.concurrent.atomic.AtomicInteger;
19   import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
20   import java.util.function.BiFunction;
21 <
21 > import java.util.function.BiPredicate;
22 > import java.util.stream.Stream;
23   import junit.framework.Test;
24   import junit.framework.TestSuite;
25  
26 + import static java.util.concurrent.Flow.Publisher;
27 + import static java.util.concurrent.Flow.Subscriber;
28 + import static java.util.concurrent.Flow.Subscription;
29 + import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 + import static java.util.concurrent.TimeUnit.SECONDS;
31 +
32   public class SubmissionPublisherTest extends JSR166TestCase {
33  
34      public static void main(String[] args) {
# Line 37 | Line 38 | public class SubmissionPublisherTest ext
38          return new TestSuite(SubmissionPublisherTest.class);
39      }
40  
41 <    // Factory for single thread pool in case commonPool parallelism is zero
41 <    static final class DaemonThreadFactory implements ThreadFactory {
42 <        public Thread newThread(Runnable r) {
43 <            Thread t = new Thread(r);
44 <            t.setDaemon(true);
45 <            return t;
46 <        }
47 <    }
41 >    final Executor basicExecutor = basicPublisher().getExecutor();
42      
49    static final Executor basicExecutor =
50        (ForkJoinPool.getCommonPoolParallelism() > 0) ?
51        ForkJoinPool.commonPool() :
52        new ThreadPoolExecutor(1, 1, 60, SECONDS,
53                               new LinkedBlockingQueue<Runnable>(),
54                               new DaemonThreadFactory());
55        
43      static SubmissionPublisher<Integer> basicPublisher() {
44 <        return new SubmissionPublisher<Integer>(basicExecutor,
58 <                                                Flow.defaultBufferSize());
44 >        return new SubmissionPublisher<Integer>();
45      }
46 <    
46 >
47      static class SPException extends RuntimeException {}
48  
49      class TestSubscriber implements Subscriber<Integer> {
# Line 153 | Line 139 | public class SubmissionPublisherTest ext
139       */
140      void checkInitialState(SubmissionPublisher<?> p) {
141          assertFalse(p.hasSubscribers());
142 <        assertEquals(p.getNumberOfSubscribers(), 0);
142 >        assertEquals(0, p.getNumberOfSubscribers());
143          assertTrue(p.getSubscribers().isEmpty());
144          assertFalse(p.isClosed());
145          assertNull(p.getClosedException());
146          int n = p.getMaxBufferCapacity();
147          assertTrue((n & (n - 1)) == 0); // power of two
148          assertNotNull(p.getExecutor());
149 <        assertEquals(p.estimateMinimumDemand(), 0);
150 <        assertEquals(p.estimateMaximumLag(), 0);
149 >        assertEquals(0, p.estimateMinimumDemand());
150 >        assertEquals(0, p.estimateMaximumLag());
151      }
152  
153      /**
154       * A default-constructed SubmissionPublisher has no subscribers,
155       * is not closed, has default buffer size, and uses the
156 <     * ForkJoinPool.commonPool executor
156 >     * defaultExecutor
157       */
158      public void testConstructor1() {
159          SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
160          checkInitialState(p);
175        assertSame(p.getExecutor(), ForkJoinPool.commonPool());
161          assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
162 +        Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
163 +        if (ForkJoinPool.getCommonPoolParallelism() > 1)
164 +            assertSame(e, c);
165 +        else
166 +            assertNotSame(e, c);
167      }
168  
169      /**
# Line 185 | Line 175 | public class SubmissionPublisherTest ext
175          SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
176          checkInitialState(p);
177          assertSame(p.getExecutor(), e);
178 <        assertEquals(p.getMaxBufferCapacity(), 8);
178 >        assertEquals(8, p.getMaxBufferCapacity());
179      }
180  
181      /**
# Line 193 | Line 183 | public class SubmissionPublisherTest ext
183       */
184      public void testConstructor3() {
185          try {
186 <            SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(null, 8);
186 >            new SubmissionPublisher<Integer>(null, 8);
187              shouldThrow();
188 <        } catch (NullPointerException success) {
199 <        }
188 >        } catch (NullPointerException success) {}
189      }
190  
191      /**
# Line 206 | Line 195 | public class SubmissionPublisherTest ext
195      public void testConstructor4() {
196          Executor e = Executors.newFixedThreadPool(1);
197          try {
198 <            SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, -1);
198 >            new SubmissionPublisher<Integer>(e, -1);
199              shouldThrow();
200 <        } catch (IllegalArgumentException success) {
212 <        }
200 >        } catch (IllegalArgumentException success) {}
201      }
202  
203      /**
# Line 226 | Line 214 | public class SubmissionPublisherTest ext
214          try {
215              p.submit(1);
216              shouldThrow();
217 <        }
230 <        catch(IllegalStateException success) {
231 <        }
217 >        } catch (IllegalStateException success) {}
218          Throwable ex = new SPException();
219          p.closeExceptionally(ex);
220          assertTrue(p.isClosed());
# Line 251 | Line 237 | public class SubmissionPublisherTest ext
237          try {
238              p.submit(1);
239              shouldThrow();
240 <        }
255 <        catch(IllegalStateException success) {
256 <        }
240 >        } catch (IllegalStateException success) {}
241          p.close();
242          assertTrue(p.isClosed());
243          assertSame(p.getClosedException(), ex);
# Line 270 | Line 254 | public class SubmissionPublisherTest ext
254          SubmissionPublisher<Integer> p = basicPublisher();
255          p.subscribe(s);
256          assertTrue(p.hasSubscribers());
257 <        assertEquals(p.getNumberOfSubscribers(), 1);
257 >        assertEquals(1, p.getNumberOfSubscribers());
258          assertTrue(p.getSubscribers().contains(s));
259          assertTrue(p.isSubscribed(s));
260          s.awaitSubscribe();
261          assertNotNull(s.sn);
262 <        assertEquals(s.nexts, 0);
263 <        assertEquals(s.errors, 0);
264 <        assertEquals(s.completes, 0);
262 >        assertEquals(0, s.nexts);
263 >        assertEquals(0, s.errors);
264 >        assertEquals(0, s.completes);
265          TestSubscriber s2 = new TestSubscriber();
266          p.subscribe(s2);
267          assertTrue(p.hasSubscribers());
268 <        assertEquals(p.getNumberOfSubscribers(), 2);
268 >        assertEquals(2, p.getNumberOfSubscribers());
269          assertTrue(p.getSubscribers().contains(s));
270          assertTrue(p.getSubscribers().contains(s2));
271          assertTrue(p.isSubscribed(s));
272          assertTrue(p.isSubscribed(s2));
273          s2.awaitSubscribe();
274          assertNotNull(s2.sn);
275 <        assertEquals(s2.nexts, 0);
276 <        assertEquals(s2.errors, 0);
277 <        assertEquals(s2.completes, 0);
275 >        assertEquals(0, s2.nexts);
276 >        assertEquals(0, s2.errors);
277 >        assertEquals(0, s2.completes);
278 >        p.close();
279      }
280  
281      /**
# Line 303 | Line 288 | public class SubmissionPublisherTest ext
288          p.close();
289          p.subscribe(s);
290          s.awaitComplete();
291 <        assertEquals(s.nexts, 0);
292 <        assertEquals(s.errors, 0);
293 <        assertEquals(s.completes, 1);
291 >        assertEquals(0, s.nexts);
292 >        assertEquals(0, s.errors);
293 >        assertEquals(1, s.completes, 1);
294      }
295  
296      /**
# Line 321 | Line 306 | public class SubmissionPublisherTest ext
306          assertSame(p.getClosedException(), ex);
307          p.subscribe(s);
308          s.awaitError();
309 <        assertEquals(s.nexts, 0);
310 <        assertEquals(s.errors, 1);
309 >        assertEquals(0, s.nexts);
310 >        assertEquals(1, s.errors);
311      }
312  
313      /**
# Line 334 | Line 319 | public class SubmissionPublisherTest ext
319          SubmissionPublisher<Integer> p = basicPublisher();
320          p.subscribe(s);
321          assertTrue(p.hasSubscribers());
322 <        assertEquals(p.getNumberOfSubscribers(), 1);
322 >        assertEquals(1, p.getNumberOfSubscribers());
323          assertTrue(p.getSubscribers().contains(s));
324          assertTrue(p.isSubscribed(s));
325          s.awaitSubscribe();
326          assertNotNull(s.sn);
327 <        assertEquals(s.nexts, 0);
328 <        assertEquals(s.errors, 0);
329 <        assertEquals(s.completes, 0);
327 >        assertEquals(0, s.nexts);
328 >        assertEquals(0, s.errors);
329 >        assertEquals(0, s.completes);
330          p.subscribe(s);
331          s.awaitError();
332 <        assertEquals(s.nexts, 0);
333 <        assertEquals(s.errors, 1);
332 >        assertEquals(0, s.nexts);
333 >        assertEquals(1, s.errors);
334          assertFalse(p.isSubscribed(s));
335      }
336  
# Line 358 | Line 343 | public class SubmissionPublisherTest ext
343          s.throwOnCall = true;
344          try {
345              p.subscribe(s);
346 <        } catch(Exception ok) {
362 <        }
346 >        } catch (Exception ok) {}
347          s.awaitError();
348 <        assertEquals(s.nexts, 0);
349 <        assertEquals(s.errors, 1);
350 <        assertEquals(s.completes, 0);
348 >        assertEquals(0, s.nexts);
349 >        assertEquals(1, s.errors);
350 >        assertEquals(0, s.completes);
351      }
352  
353      /**
354 <     * subscribe(null) thows NPE
354 >     * subscribe(null) throws NPE
355       */
356      public void testSubscribe6() {
357          SubmissionPublisher<Integer> p = basicPublisher();
358          try {
359              p.subscribe(null);
360 <        } catch(NullPointerException success) {
361 <        }
360 >            shouldThrow();
361 >        } catch (NullPointerException success) {}
362          checkInitialState(p);
363      }
364  
# Line 392 | Line 376 | public class SubmissionPublisherTest ext
376          assertTrue(p.isClosed());
377          assertNull(p.getClosedException());
378          s1.awaitComplete();
379 <        assertEquals(s1.nexts, 1);
380 <        assertEquals(s1.completes, 1);
379 >        assertEquals(1, s1.nexts);
380 >        assertEquals(1, s1.completes);
381          s2.awaitComplete();
382 <        assertEquals(s2.nexts, 1);
383 <        assertEquals(s2.completes, 1);
382 >        assertEquals(1, s2.nexts);
383 >        assertEquals(1, s2.completes);
384      }
385  
386      /**
# Line 413 | Line 397 | public class SubmissionPublisherTest ext
397          assertTrue(p.isClosed());
398          s1.awaitError();
399          assertTrue(s1.nexts <= 1);
400 <        assertEquals(s1.errors, 1);
400 >        assertEquals(1, s1.errors);
401          s2.awaitError();
402          assertTrue(s2.nexts <= 1);
403 <        assertEquals(s2.errors, 1);
403 >        assertEquals(1, s2.errors);
404      }
405  
406      /**
# Line 435 | Line 419 | public class SubmissionPublisherTest ext
419              p.submit(i);
420          p.close();
421          s2.awaitComplete();
422 <        assertEquals(s2.nexts, 20);
423 <        assertEquals(s2.completes, 1);
422 >        assertEquals(20, s2.nexts);
423 >        assertEquals(1, s2.completes);
424          assertTrue(s1.nexts < 20);
425          assertFalse(p.isSubscribed(s1));
426      }
# Line 456 | Line 440 | public class SubmissionPublisherTest ext
440          p.submit(2);
441          p.close();
442          s2.awaitComplete();
443 <        assertEquals(s2.nexts, 2);
443 >        assertEquals(2, s2.nexts);
444          s1.awaitComplete();
445 <        assertEquals(s1.errors, 1);
445 >        assertEquals(1, s1.errors);
446      }
447  
448      /**
449 <     * If a handler is supplied in conctructor, it is invoked when
449 >     * If a handler is supplied in constructor, it is invoked when
450       * subscriber throws an exception in onNext
451       */
452      public void testThrowOnNextHandler() {
# Line 480 | Line 464 | public class SubmissionPublisherTest ext
464          p.submit(2);
465          p.close();
466          s2.awaitComplete();
467 <        assertEquals(s2.nexts, 2);
468 <        assertEquals(s2.completes, 1);
467 >        assertEquals(2, s2.nexts);
468 >        assertEquals(1, s2.completes);
469          s1.awaitError();
470 <        assertEquals(s1.errors, 1);
471 <        assertEquals(calls.get(), 1);
470 >        assertEquals(1, s1.errors);
471 >        assertEquals(1, calls.get());
472      }
473  
474      /**
# Line 501 | Line 485 | public class SubmissionPublisherTest ext
485          p.close();
486          s2.awaitComplete();
487          s1.awaitComplete();
488 <        assertEquals(s2.nexts, 20);
489 <        assertEquals(s2.completes, 1);
490 <        assertEquals(s1.nexts, 20);
491 <        assertEquals(s1.completes, 1);
488 >        assertEquals(20, s2.nexts);
489 >        assertEquals(1, s2.completes);
490 >        assertEquals(20, s1.nexts);
491 >        assertEquals(1, s1.completes);
492      }
493  
494      /**
# Line 522 | Line 506 | public class SubmissionPublisherTest ext
506          p.submit(1);
507          p.submit(2);
508          s2.awaitNext(1);
509 <        assertEquals(s1.nexts, 0);
509 >        assertEquals(0, s1.nexts);
510          s1.sn.request(3);
511          p.submit(3);
512          p.close();
513          s2.awaitComplete();
514 <        assertEquals(s2.nexts, 3);
515 <        assertEquals(s2.completes, 1);
514 >        assertEquals(3, s2.nexts);
515 >        assertEquals(1, s2.completes);
516          s1.awaitComplete();
517          assertTrue(s1.nexts > 0);
518 <        assertEquals(s1.completes, 1);
518 >        assertEquals(1, s1.completes);
519      }
520  
521      /**
# Line 550 | Line 534 | public class SubmissionPublisherTest ext
534          p.submit(2);
535          p.close();
536          s2.awaitComplete();
537 <        assertEquals(s2.nexts, 2);
538 <        assertEquals(s2.completes, 1);
537 >        assertEquals(2, s2.nexts);
538 >        assertEquals(1, s2.completes);
539          s1.awaitNext(1);
540 <        assertEquals(s1.nexts, 1);
540 >        assertEquals(1, s1.nexts);
541      }
542  
543      /**
# Line 572 | Line 556 | public class SubmissionPublisherTest ext
556          p.submit(2);
557          p.close();
558          s2.awaitComplete();
559 <        assertEquals(s2.nexts, 2);
560 <        assertEquals(s2.completes, 1);
559 >        assertEquals(2, s2.nexts);
560 >        assertEquals(1, s2.completes);
561          s1.awaitError();
562 <        assertEquals(s1.errors, 1);
562 >        assertEquals(1, s1.errors);
563          assertTrue(s1.lastError instanceof IllegalArgumentException);
564      }
565  
# Line 589 | Line 573 | public class SubmissionPublisherTest ext
573          s.request = false;
574          p.subscribe(s);
575          s.awaitSubscribe();
576 <        assertEquals(p.estimateMinimumDemand(), 0);
576 >        assertEquals(0, p.estimateMinimumDemand());
577          s.sn.request(1);
578 <        assertEquals(p.estimateMinimumDemand(), 1);
578 >        assertEquals(1, p.estimateMinimumDemand());
579          p.submit(1);
580          s.awaitNext(1);
581 <        assertEquals(p.estimateMinimumDemand(), 0);
581 >        assertEquals(0, p.estimateMinimumDemand());
582      }
583  
584      /**
585 <     * Submit to a publisher with no subscribers returns lag 0
585 >     * submit to a publisher with no subscribers returns lag 0
586       */
587      public void testEmptySubmit() {
588          SubmissionPublisher<Integer> p = basicPublisher();
589 <        assertEquals(p.submit(1), 0);
589 >        assertEquals(0, p.submit(1));
590      }
591  
592      /**
593 <     * Submit(null) throws NPE
593 >     * submit(null) throws NPE
594       */
595      public void testNullSubmit() {
596          SubmissionPublisher<Integer> p = basicPublisher();
597          try {
598              p.submit(null);
599 <        } catch (NullPointerException success) {
600 <        }
599 >            shouldThrow();
600 >        } catch (NullPointerException success) {}
601      }
602  
603      /**
604 <     * Submit returns number of lagged items, compatible with result
604 >     * submit returns number of lagged items, compatible with result
605       * of estimateMaximumLag.
606       */
607      public void testLaggedSubmit() {
# Line 630 | Line 614 | public class SubmissionPublisherTest ext
614          p.subscribe(s2);
615          s2.awaitSubscribe();
616          s1.awaitSubscribe();
617 <        assertEquals(p.submit(1), 1);
617 >        assertEquals(1, p.submit(1));
618          assertTrue(p.estimateMaximumLag() >= 1);
619          assertTrue(p.submit(2) >= 2);
620          assertTrue(p.estimateMaximumLag() >= 2);
# Line 641 | Line 625 | public class SubmissionPublisherTest ext
625          p.submit(4);
626          p.close();
627          s2.awaitComplete();
628 <        assertEquals(s2.nexts, 4);
628 >        assertEquals(4, s2.nexts);
629          s1.awaitComplete();
630 <        assertEquals(s2.nexts, 4);
630 >        assertEquals(4, s2.nexts);
631      }
632  
633      /**
# Line 663 | Line 647 | public class SubmissionPublisherTest ext
647          p.close();
648          s2.awaitComplete();
649          s1.awaitComplete();
650 <        assertEquals(s2.nexts, 20);
651 <        assertEquals(s2.completes, 1);
652 <        assertEquals(s1.nexts, 20);
653 <        assertEquals(s1.completes, 1);
650 >        assertEquals(20, s2.nexts);
651 >        assertEquals(1, s2.completes);
652 >        assertEquals(20, s1.nexts);
653 >        assertEquals(1, s1.completes);
654      }
655 <    
655 >
656      static boolean noopHandle(AtomicInteger count) {
657          count.getAndIncrement();
658          return false;
# Line 681 | Line 665 | public class SubmissionPublisherTest ext
665      }
666  
667      /**
668 <     * Offer to a publisher with no subscribers returns lag 0
668 >     * offer to a publisher with no subscribers returns lag 0
669       */
670      public void testEmptyOffer() {
671          SubmissionPublisher<Integer> p = basicPublisher();
672 <        assertEquals(p.offer(1, null), 0);
672 >        assertEquals(0, p.offer(1, null));
673      }
674  
675      /**
676 <     * Offer(null) throws NPE
676 >     * offer(null) throws NPE
677       */
678      public void testNullOffer() {
679          SubmissionPublisher<Integer> p = basicPublisher();
680          try {
681              p.offer(null, null);
682 <        } catch (NullPointerException success) {
683 <        }
682 >            shouldThrow();
683 >        } catch (NullPointerException success) {}
684      }
685  
686      /**
687 <     * Offer returns number of lagged items if not saturated
687 >     * offer returns number of lagged items if not saturated
688       */
689      public void testLaggedOffer() {
690          SubmissionPublisher<Integer> p = basicPublisher();
# Line 720 | Line 704 | public class SubmissionPublisherTest ext
704          p.offer(4, null);
705          p.close();
706          s2.awaitComplete();
707 <        assertEquals(s2.nexts, 4);
707 >        assertEquals(4, s2.nexts);
708          s1.awaitComplete();
709 <        assertEquals(s2.nexts, 4);
709 >        assertEquals(4, s2.nexts);
710      }
711  
712      /**
713 <     * Offer reports drops if saturated
713 >     * offer reports drops if saturated
714       */
715      public void testDroppedOffer() {
716          SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
# Line 754 | Line 738 | public class SubmissionPublisherTest ext
738      }
739  
740      /**
741 <     * Offer invokes drop handler if saturated
741 >     * offer invokes drop handler if saturated
742       */
743      public void testHandledDroppedOffer() {
744          AtomicInteger calls = new AtomicInteger();
# Line 781 | Line 765 | public class SubmissionPublisherTest ext
765          assertTrue(calls.get() >= 4);
766      }
767  
784
768      /**
769 <     * Offer succeeds if drop handler forces request
769 >     * offer succeeds if drop handler forces request
770       */
771      public void testRecoveredHandledDroppedOffer() {
772          AtomicInteger calls = new AtomicInteger();
# Line 805 | Line 788 | public class SubmissionPublisherTest ext
788          p.close();
789          s2.awaitComplete();
790          s1.awaitComplete();
791 <        assertEquals(s1.nexts + s2.nexts, n);
791 >        assertEquals(n, s1.nexts + s2.nexts);
792          assertTrue(calls.get() >= 2);
793      }
794  
812
795      /**
796 <     * TimedOffer to a publisher with no subscribers returns lag 0
796 >     * Timed offer to a publisher with no subscribers returns lag 0
797       */
798      public void testEmptyTimedOffer() {
799          SubmissionPublisher<Integer> p = basicPublisher();
800 <        assertEquals(p.offer(1, null), 0);
800 >        long startTime = System.nanoTime();
801 >        assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
802 >        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
803      }
804  
805      /**
806 <     * Timed Offer with null item or TimeUnit throws NPE
806 >     * Timed offer with null item or TimeUnit throws NPE
807       */
808      public void testNullTimedOffer() {
809          SubmissionPublisher<Integer> p = basicPublisher();
810 +        long startTime = System.nanoTime();
811          try {
812 <            p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null);
813 <        } catch (NullPointerException success) {
814 <        }
812 >            p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
813 >            shouldThrow();
814 >        } catch (NullPointerException success) {}
815          try {
816 <            p.offer(1, SHORT_DELAY_MS, null, null);
817 <        } catch (NullPointerException success) {
818 <        }
816 >            p.offer(1, LONG_DELAY_MS, null, null);
817 >            shouldThrow();
818 >        } catch (NullPointerException success) {}
819 >        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
820      }
821  
822      /**
823 <     * Timed Offer returns number of lagged items if not saturated
823 >     * Timed offer returns number of lagged items if not saturated
824       */
825      public void testLaggedTimedOffer() {
826          SubmissionPublisher<Integer> p = basicPublisher();
# Line 846 | Line 832 | public class SubmissionPublisherTest ext
832          p.subscribe(s2);
833          s2.awaitSubscribe();
834          s1.awaitSubscribe();
835 <        assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1);
836 <        assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2);
835 >        long startTime = System.nanoTime();
836 >        assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
837 >        assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
838          s1.sn.request(4);
839 <        assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3);
839 >        assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
840          s2.sn.request(4);
841 <        p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null);
841 >        p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
842          p.close();
843          s2.awaitComplete();
844 <        assertEquals(s2.nexts, 4);
844 >        assertEquals(4, s2.nexts);
845          s1.awaitComplete();
846 <        assertEquals(s2.nexts, 4);
846 >        assertEquals(4, s2.nexts);
847 >        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
848      }
849  
850      /**
851 <     * Timed Offer reports drops if saturated
851 >     * Timed offer reports drops if saturated
852       */
853      public void testDroppedTimedOffer() {
854          SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
# Line 873 | Line 861 | public class SubmissionPublisherTest ext
861          p.subscribe(s2);
862          s2.awaitSubscribe();
863          s1.awaitSubscribe();
864 +        long delay = timeoutMillis();
865          for (int i = 1; i <= 4; ++i)
866 <            assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0);
867 <        p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null);
868 <        assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
866 >            assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
867 >        long startTime = System.nanoTime();
868 >        assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
869          s1.sn.request(64);
870 <        assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
870 >        assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
871 >        // 2 * delay should elapse but check only 1 * delay to allow timer slop
872 >        assertTrue(millisElapsedSince(startTime) >= delay);
873          s2.sn.request(64);
874          p.close();
875          s2.awaitComplete();
# Line 888 | Line 879 | public class SubmissionPublisherTest ext
879      }
880  
881      /**
882 <     * Timed Offer invokes drop handler if saturated
882 >     * Timed offer invokes drop handler if saturated
883       */
884      public void testHandledDroppedTimedOffer() {
885          AtomicInteger calls = new AtomicInteger();
# Line 902 | Line 893 | public class SubmissionPublisherTest ext
893          p.subscribe(s2);
894          s2.awaitSubscribe();
895          s1.awaitSubscribe();
896 +        long delay = timeoutMillis();
897          for (int i = 1; i <= 4; ++i)
898 <            assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
899 <        p.offer(5, (s, x) -> noopHandle(calls));
900 <        assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
898 >            assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
899 >        long startTime = System.nanoTime();
900 >        assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
901          s1.sn.request(64);
902 <        assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902 >        assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
903 >        assertTrue(millisElapsedSince(startTime) >= delay);
904          s2.sn.request(64);
905          p.close();
906          s2.awaitComplete();
# Line 916 | Line 909 | public class SubmissionPublisherTest ext
909      }
910  
911      /**
912 <     * Timed Offer succeeds if drop handler forces request
912 >     * Timed offer succeeds if drop handler forces request
913       */
914      public void testRecoveredHandledDroppedTimedOffer() {
915          AtomicInteger calls = new AtomicInteger();
# Line 931 | Line 924 | public class SubmissionPublisherTest ext
924          s2.awaitSubscribe();
925          s1.awaitSubscribe();
926          int n = 0;
927 <        for (int i = 1; i <= 8; ++i) {
928 <            int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s));
927 >        long delay = timeoutMillis();
928 >        long startTime = System.nanoTime();
929 >        for (int i = 1; i <= 6; ++i) {
930 >            int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
931              n = n + 2 + (d < 0 ? d : 0);
932          }
933 +        assertTrue(millisElapsedSince(startTime) >= delay);
934          p.close();
935          s2.awaitComplete();
936          s1.awaitComplete();
937 <        assertEquals(s1.nexts + s2.nexts, n);
937 >        assertEquals(n, s1.nexts + s2.nexts);
938          assertTrue(calls.get() >= 2);
939      }
940  
941 +    /**
942 +     * consume returns a CompletableFuture that is done when
943 +     * publisher completes
944 +     */
945 +    public void testConsume() {
946 +        AtomicInteger sum = new AtomicInteger();
947 +        SubmissionPublisher<Integer> p = basicPublisher();
948 +        CompletableFuture<Void> f =
949 +            p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
950 +        int n = 20;
951 +        for (int i = 1; i <= n; ++i)
952 +            p.submit(i);
953 +        p.close();
954 +        f.join();
955 +        assertEquals((n * (n + 1)) / 2, sum.get());
956 +    }
957 +
958 +    /**
959 +     * consume(null) throws NPE
960 +     */
961 +    public void testConsumeNPE() {
962 +        SubmissionPublisher<Integer> p = basicPublisher();
963 +        try {
964 +            CompletableFuture<Void> f = p.consume(null);
965 +            shouldThrow();
966 +        } catch (NullPointerException success) {}
967 +    }
968 +
969 +    /**
970 +     * consume eventually stops processing published items if cancelled
971 +     */
972 +    public void testCancelledConsume() {
973 +        AtomicInteger count = new AtomicInteger();
974 +        SubmissionPublisher<Integer> p = basicPublisher();
975 +        CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
976 +        f.cancel(true);
977 +        int n = 1000000; // arbitrary limit
978 +        for (int i = 1; i <= n; ++i)
979 +            p.submit(i);
980 +        assertTrue(count.get() < n);
981 +    }
982  
983   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines