ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SubmissionPublisherTest.java (file contents):
Revision 1.1 by dl, Mon Sep 7 17:14:06 2015 UTC vs.
Revision 1.11 by jsr166, Sat Sep 12 17:11:12 2015 UTC

# Line 5 | Line 5
5   * http://creativecommons.org/publicdomain/zero/1.0/
6   */
7  
8 < import static java.util.concurrent.TimeUnit.MILLISECONDS;
9 < import static java.util.concurrent.TimeUnit.SECONDS;
10 <
8 > import java.util.concurrent.CompletableFuture;
9   import java.util.concurrent.Executor;
10   import java.util.concurrent.Executors;
11   import java.util.concurrent.Flow;
14 import static java.util.concurrent.Flow.Publisher;
15 import static java.util.concurrent.Flow.Subscriber;
16 import static java.util.concurrent.Flow.Subscription;
17 import java.util.concurrent.LinkedBlockingQueue;
12   import java.util.concurrent.ForkJoinPool;
13 + import java.util.concurrent.LinkedBlockingQueue;
14   import java.util.concurrent.SubmissionPublisher;
15   import java.util.concurrent.ThreadFactory;
16   import java.util.concurrent.ThreadPoolExecutor;
17   import java.util.concurrent.TimeUnit;
18   import java.util.concurrent.atomic.AtomicInteger;
19   import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
20   import java.util.function.BiFunction;
21 <
21 > import java.util.function.BiPredicate;
22 > import java.util.stream.Stream;
23   import junit.framework.Test;
24   import junit.framework.TestSuite;
25  
26 + import static java.util.concurrent.Flow.Publisher;
27 + import static java.util.concurrent.Flow.Subscriber;
28 + import static java.util.concurrent.Flow.Subscription;
29 + import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 + import static java.util.concurrent.TimeUnit.SECONDS;
31 +
32   public class SubmissionPublisherTest extends JSR166TestCase {
33  
34      public static void main(String[] args) {
# Line 45 | Line 46 | public class SubmissionPublisherTest ext
46              return t;
47          }
48      }
49 <    
49 >
50      static final Executor basicExecutor =
51 <        (ForkJoinPool.getCommonPoolParallelism() > 0) ?
51 >        (ForkJoinPool.getCommonPoolParallelism() > 1) ?
52          ForkJoinPool.commonPool() :
53          new ThreadPoolExecutor(1, 1, 60, SECONDS,
54                                 new LinkedBlockingQueue<Runnable>(),
55                                 new DaemonThreadFactory());
56 <        
56 >
57      static SubmissionPublisher<Integer> basicPublisher() {
58          return new SubmissionPublisher<Integer>(basicExecutor,
59                                                  Flow.defaultBufferSize());
60      }
61 <    
61 >
62      static class SPException extends RuntimeException {}
63  
64      class TestSubscriber implements Subscriber<Integer> {
# Line 153 | Line 154 | public class SubmissionPublisherTest ext
154       */
155      void checkInitialState(SubmissionPublisher<?> p) {
156          assertFalse(p.hasSubscribers());
157 <        assertEquals(p.getNumberOfSubscribers(), 0);
157 >        assertEquals(0, p.getNumberOfSubscribers());
158          assertTrue(p.getSubscribers().isEmpty());
159          assertFalse(p.isClosed());
160          assertNull(p.getClosedException());
161          int n = p.getMaxBufferCapacity();
162          assertTrue((n & (n - 1)) == 0); // power of two
163          assertNotNull(p.getExecutor());
164 <        assertEquals(p.estimateMinimumDemand(), 0);
165 <        assertEquals(p.estimateMaximumLag(), 0);
164 >        assertEquals(0, p.estimateMinimumDemand());
165 >        assertEquals(0, p.estimateMaximumLag());
166      }
167  
168      /**
# Line 185 | Line 186 | public class SubmissionPublisherTest ext
186          SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
187          checkInitialState(p);
188          assertSame(p.getExecutor(), e);
189 <        assertEquals(p.getMaxBufferCapacity(), 8);
189 >        assertEquals(8, p.getMaxBufferCapacity());
190      }
191  
192      /**
# Line 193 | Line 194 | public class SubmissionPublisherTest ext
194       */
195      public void testConstructor3() {
196          try {
197 <            SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(null, 8);
197 >            new SubmissionPublisher<Integer>(null, 8);
198              shouldThrow();
199 <        } catch (NullPointerException success) {
199 <        }
199 >        } catch (NullPointerException success) {}
200      }
201  
202      /**
# Line 206 | Line 206 | public class SubmissionPublisherTest ext
206      public void testConstructor4() {
207          Executor e = Executors.newFixedThreadPool(1);
208          try {
209 <            SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, -1);
209 >            new SubmissionPublisher<Integer>(e, -1);
210              shouldThrow();
211 <        } catch (IllegalArgumentException success) {
212 <        }
211 >        } catch (IllegalArgumentException success) {}
212      }
213  
214      /**
# Line 226 | Line 225 | public class SubmissionPublisherTest ext
225          try {
226              p.submit(1);
227              shouldThrow();
228 <        }
230 <        catch(IllegalStateException success) {
231 <        }
228 >        } catch (IllegalStateException success) {}
229          Throwable ex = new SPException();
230          p.closeExceptionally(ex);
231          assertTrue(p.isClosed());
# Line 251 | Line 248 | public class SubmissionPublisherTest ext
248          try {
249              p.submit(1);
250              shouldThrow();
251 <        }
255 <        catch(IllegalStateException success) {
256 <        }
251 >        } catch (IllegalStateException success) {}
252          p.close();
253          assertTrue(p.isClosed());
254          assertSame(p.getClosedException(), ex);
# Line 270 | Line 265 | public class SubmissionPublisherTest ext
265          SubmissionPublisher<Integer> p = basicPublisher();
266          p.subscribe(s);
267          assertTrue(p.hasSubscribers());
268 <        assertEquals(p.getNumberOfSubscribers(), 1);
268 >        assertEquals(1, p.getNumberOfSubscribers());
269          assertTrue(p.getSubscribers().contains(s));
270          assertTrue(p.isSubscribed(s));
271          s.awaitSubscribe();
272          assertNotNull(s.sn);
273 <        assertEquals(s.nexts, 0);
274 <        assertEquals(s.errors, 0);
275 <        assertEquals(s.completes, 0);
273 >        assertEquals(0, s.nexts);
274 >        assertEquals(0, s.errors);
275 >        assertEquals(0, s.completes);
276          TestSubscriber s2 = new TestSubscriber();
277          p.subscribe(s2);
278          assertTrue(p.hasSubscribers());
279 <        assertEquals(p.getNumberOfSubscribers(), 2);
279 >        assertEquals(2, p.getNumberOfSubscribers());
280          assertTrue(p.getSubscribers().contains(s));
281          assertTrue(p.getSubscribers().contains(s2));
282          assertTrue(p.isSubscribed(s));
283          assertTrue(p.isSubscribed(s2));
284          s2.awaitSubscribe();
285          assertNotNull(s2.sn);
286 <        assertEquals(s2.nexts, 0);
287 <        assertEquals(s2.errors, 0);
288 <        assertEquals(s2.completes, 0);
286 >        assertEquals(0, s2.nexts);
287 >        assertEquals(0, s2.errors);
288 >        assertEquals(0, s2.completes);
289 >        p.close();
290      }
291  
292      /**
# Line 303 | Line 299 | public class SubmissionPublisherTest ext
299          p.close();
300          p.subscribe(s);
301          s.awaitComplete();
302 <        assertEquals(s.nexts, 0);
303 <        assertEquals(s.errors, 0);
304 <        assertEquals(s.completes, 1);
302 >        assertEquals(0, s.nexts);
303 >        assertEquals(0, s.errors);
304 >        assertEquals(1, s.completes, 1);
305      }
306  
307      /**
# Line 321 | Line 317 | public class SubmissionPublisherTest ext
317          assertSame(p.getClosedException(), ex);
318          p.subscribe(s);
319          s.awaitError();
320 <        assertEquals(s.nexts, 0);
321 <        assertEquals(s.errors, 1);
320 >        assertEquals(0, s.nexts);
321 >        assertEquals(1, s.errors);
322      }
323  
324      /**
# Line 334 | Line 330 | public class SubmissionPublisherTest ext
330          SubmissionPublisher<Integer> p = basicPublisher();
331          p.subscribe(s);
332          assertTrue(p.hasSubscribers());
333 <        assertEquals(p.getNumberOfSubscribers(), 1);
333 >        assertEquals(1, p.getNumberOfSubscribers());
334          assertTrue(p.getSubscribers().contains(s));
335          assertTrue(p.isSubscribed(s));
336          s.awaitSubscribe();
337          assertNotNull(s.sn);
338 <        assertEquals(s.nexts, 0);
339 <        assertEquals(s.errors, 0);
340 <        assertEquals(s.completes, 0);
338 >        assertEquals(0, s.nexts);
339 >        assertEquals(0, s.errors);
340 >        assertEquals(0, s.completes);
341          p.subscribe(s);
342          s.awaitError();
343 <        assertEquals(s.nexts, 0);
344 <        assertEquals(s.errors, 1);
343 >        assertEquals(0, s.nexts);
344 >        assertEquals(1, s.errors);
345          assertFalse(p.isSubscribed(s));
346      }
347  
# Line 358 | Line 354 | public class SubmissionPublisherTest ext
354          s.throwOnCall = true;
355          try {
356              p.subscribe(s);
357 <        } catch(Exception ok) {
362 <        }
357 >        } catch (Exception ok) {}
358          s.awaitError();
359 <        assertEquals(s.nexts, 0);
360 <        assertEquals(s.errors, 1);
361 <        assertEquals(s.completes, 0);
359 >        assertEquals(0, s.nexts);
360 >        assertEquals(1, s.errors);
361 >        assertEquals(0, s.completes);
362      }
363  
364      /**
365 <     * subscribe(null) thows NPE
365 >     * subscribe(null) throws NPE
366       */
367      public void testSubscribe6() {
368          SubmissionPublisher<Integer> p = basicPublisher();
369          try {
370              p.subscribe(null);
371 <        } catch(NullPointerException success) {
372 <        }
371 >            shouldThrow();
372 >        } catch (NullPointerException success) {}
373          checkInitialState(p);
374      }
375  
# Line 392 | Line 387 | public class SubmissionPublisherTest ext
387          assertTrue(p.isClosed());
388          assertNull(p.getClosedException());
389          s1.awaitComplete();
390 <        assertEquals(s1.nexts, 1);
391 <        assertEquals(s1.completes, 1);
390 >        assertEquals(1, s1.nexts);
391 >        assertEquals(1, s1.completes);
392          s2.awaitComplete();
393 <        assertEquals(s2.nexts, 1);
394 <        assertEquals(s2.completes, 1);
393 >        assertEquals(1, s2.nexts);
394 >        assertEquals(1, s2.completes);
395      }
396  
397      /**
# Line 413 | Line 408 | public class SubmissionPublisherTest ext
408          assertTrue(p.isClosed());
409          s1.awaitError();
410          assertTrue(s1.nexts <= 1);
411 <        assertEquals(s1.errors, 1);
411 >        assertEquals(1, s1.errors);
412          s2.awaitError();
413          assertTrue(s2.nexts <= 1);
414 <        assertEquals(s2.errors, 1);
414 >        assertEquals(1, s2.errors);
415      }
416  
417      /**
# Line 435 | Line 430 | public class SubmissionPublisherTest ext
430              p.submit(i);
431          p.close();
432          s2.awaitComplete();
433 <        assertEquals(s2.nexts, 20);
434 <        assertEquals(s2.completes, 1);
433 >        assertEquals(20, s2.nexts);
434 >        assertEquals(1, s2.completes);
435          assertTrue(s1.nexts < 20);
436          assertFalse(p.isSubscribed(s1));
437      }
# Line 456 | Line 451 | public class SubmissionPublisherTest ext
451          p.submit(2);
452          p.close();
453          s2.awaitComplete();
454 <        assertEquals(s2.nexts, 2);
454 >        assertEquals(2, s2.nexts);
455          s1.awaitComplete();
456 <        assertEquals(s1.errors, 1);
456 >        assertEquals(1, s1.errors);
457      }
458  
459      /**
460 <     * If a handler is supplied in conctructor, it is invoked when
460 >     * If a handler is supplied in constructor, it is invoked when
461       * subscriber throws an exception in onNext
462       */
463      public void testThrowOnNextHandler() {
# Line 480 | Line 475 | public class SubmissionPublisherTest ext
475          p.submit(2);
476          p.close();
477          s2.awaitComplete();
478 <        assertEquals(s2.nexts, 2);
479 <        assertEquals(s2.completes, 1);
478 >        assertEquals(2, s2.nexts);
479 >        assertEquals(1, s2.completes);
480          s1.awaitError();
481 <        assertEquals(s1.errors, 1);
482 <        assertEquals(calls.get(), 1);
481 >        assertEquals(1, s1.errors);
482 >        assertEquals(1, calls.get());
483      }
484  
485      /**
# Line 501 | Line 496 | public class SubmissionPublisherTest ext
496          p.close();
497          s2.awaitComplete();
498          s1.awaitComplete();
499 <        assertEquals(s2.nexts, 20);
500 <        assertEquals(s2.completes, 1);
501 <        assertEquals(s1.nexts, 20);
502 <        assertEquals(s1.completes, 1);
499 >        assertEquals(20, s2.nexts);
500 >        assertEquals(1, s2.completes);
501 >        assertEquals(20, s1.nexts);
502 >        assertEquals(1, s1.completes);
503      }
504  
505      /**
# Line 522 | Line 517 | public class SubmissionPublisherTest ext
517          p.submit(1);
518          p.submit(2);
519          s2.awaitNext(1);
520 <        assertEquals(s1.nexts, 0);
520 >        assertEquals(0, s1.nexts);
521          s1.sn.request(3);
522          p.submit(3);
523          p.close();
524          s2.awaitComplete();
525 <        assertEquals(s2.nexts, 3);
526 <        assertEquals(s2.completes, 1);
525 >        assertEquals(3, s2.nexts);
526 >        assertEquals(1, s2.completes);
527          s1.awaitComplete();
528          assertTrue(s1.nexts > 0);
529 <        assertEquals(s1.completes, 1);
529 >        assertEquals(1, s1.completes);
530      }
531  
532      /**
# Line 550 | Line 545 | public class SubmissionPublisherTest ext
545          p.submit(2);
546          p.close();
547          s2.awaitComplete();
548 <        assertEquals(s2.nexts, 2);
549 <        assertEquals(s2.completes, 1);
548 >        assertEquals(2, s2.nexts);
549 >        assertEquals(1, s2.completes);
550          s1.awaitNext(1);
551 <        assertEquals(s1.nexts, 1);
551 >        assertEquals(1, s1.nexts);
552      }
553  
554      /**
# Line 572 | Line 567 | public class SubmissionPublisherTest ext
567          p.submit(2);
568          p.close();
569          s2.awaitComplete();
570 <        assertEquals(s2.nexts, 2);
571 <        assertEquals(s2.completes, 1);
570 >        assertEquals(2, s2.nexts);
571 >        assertEquals(1, s2.completes);
572          s1.awaitError();
573 <        assertEquals(s1.errors, 1);
573 >        assertEquals(1, s1.errors);
574          assertTrue(s1.lastError instanceof IllegalArgumentException);
575      }
576  
# Line 589 | Line 584 | public class SubmissionPublisherTest ext
584          s.request = false;
585          p.subscribe(s);
586          s.awaitSubscribe();
587 <        assertEquals(p.estimateMinimumDemand(), 0);
587 >        assertEquals(0, p.estimateMinimumDemand());
588          s.sn.request(1);
589 <        assertEquals(p.estimateMinimumDemand(), 1);
589 >        assertEquals(1, p.estimateMinimumDemand());
590          p.submit(1);
591          s.awaitNext(1);
592 <        assertEquals(p.estimateMinimumDemand(), 0);
592 >        assertEquals(0, p.estimateMinimumDemand());
593      }
594  
595      /**
596 <     * Submit to a publisher with no subscribers returns lag 0
596 >     * submit to a publisher with no subscribers returns lag 0
597       */
598      public void testEmptySubmit() {
599          SubmissionPublisher<Integer> p = basicPublisher();
600 <        assertEquals(p.submit(1), 0);
600 >        assertEquals(0, p.submit(1));
601      }
602  
603      /**
604 <     * Submit(null) throws NPE
604 >     * submit(null) throws NPE
605       */
606      public void testNullSubmit() {
607          SubmissionPublisher<Integer> p = basicPublisher();
608          try {
609              p.submit(null);
610 <        } catch (NullPointerException success) {
611 <        }
610 >            shouldThrow();
611 >        } catch (NullPointerException success) {}
612      }
613  
614      /**
615 <     * Submit returns number of lagged items, compatible with result
615 >     * submit returns number of lagged items, compatible with result
616       * of estimateMaximumLag.
617       */
618      public void testLaggedSubmit() {
# Line 630 | Line 625 | public class SubmissionPublisherTest ext
625          p.subscribe(s2);
626          s2.awaitSubscribe();
627          s1.awaitSubscribe();
628 <        assertEquals(p.submit(1), 1);
628 >        assertEquals(1, p.submit(1));
629          assertTrue(p.estimateMaximumLag() >= 1);
630          assertTrue(p.submit(2) >= 2);
631          assertTrue(p.estimateMaximumLag() >= 2);
# Line 641 | Line 636 | public class SubmissionPublisherTest ext
636          p.submit(4);
637          p.close();
638          s2.awaitComplete();
639 <        assertEquals(s2.nexts, 4);
639 >        assertEquals(4, s2.nexts);
640          s1.awaitComplete();
641 <        assertEquals(s2.nexts, 4);
641 >        assertEquals(4, s2.nexts);
642      }
643  
644      /**
# Line 663 | Line 658 | public class SubmissionPublisherTest ext
658          p.close();
659          s2.awaitComplete();
660          s1.awaitComplete();
661 <        assertEquals(s2.nexts, 20);
662 <        assertEquals(s2.completes, 1);
663 <        assertEquals(s1.nexts, 20);
664 <        assertEquals(s1.completes, 1);
661 >        assertEquals(20, s2.nexts);
662 >        assertEquals(1, s2.completes);
663 >        assertEquals(20, s1.nexts);
664 >        assertEquals(1, s1.completes);
665      }
666 <    
666 >
667      static boolean noopHandle(AtomicInteger count) {
668          count.getAndIncrement();
669          return false;
# Line 681 | Line 676 | public class SubmissionPublisherTest ext
676      }
677  
678      /**
679 <     * Offer to a publisher with no subscribers returns lag 0
679 >     * offer to a publisher with no subscribers returns lag 0
680       */
681      public void testEmptyOffer() {
682          SubmissionPublisher<Integer> p = basicPublisher();
683 <        assertEquals(p.offer(1, null), 0);
683 >        assertEquals(0, p.offer(1, null));
684      }
685  
686      /**
687 <     * Offer(null) throws NPE
687 >     * offer(null) throws NPE
688       */
689      public void testNullOffer() {
690          SubmissionPublisher<Integer> p = basicPublisher();
691          try {
692              p.offer(null, null);
693 <        } catch (NullPointerException success) {
694 <        }
693 >            shouldThrow();
694 >        } catch (NullPointerException success) {}
695      }
696  
697      /**
698 <     * Offer returns number of lagged items if not saturated
698 >     * offer returns number of lagged items if not saturated
699       */
700      public void testLaggedOffer() {
701          SubmissionPublisher<Integer> p = basicPublisher();
# Line 720 | Line 715 | public class SubmissionPublisherTest ext
715          p.offer(4, null);
716          p.close();
717          s2.awaitComplete();
718 <        assertEquals(s2.nexts, 4);
718 >        assertEquals(4, s2.nexts);
719          s1.awaitComplete();
720 <        assertEquals(s2.nexts, 4);
720 >        assertEquals(4, s2.nexts);
721      }
722  
723      /**
724 <     * Offer reports drops if saturated
724 >     * offer reports drops if saturated
725       */
726      public void testDroppedOffer() {
727          SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
# Line 754 | Line 749 | public class SubmissionPublisherTest ext
749      }
750  
751      /**
752 <     * Offer invokes drop handler if saturated
752 >     * offer invokes drop handler if saturated
753       */
754      public void testHandledDroppedOffer() {
755          AtomicInteger calls = new AtomicInteger();
# Line 781 | Line 776 | public class SubmissionPublisherTest ext
776          assertTrue(calls.get() >= 4);
777      }
778  
784
779      /**
780 <     * Offer succeeds if drop handler forces request
780 >     * offer succeeds if drop handler forces request
781       */
782      public void testRecoveredHandledDroppedOffer() {
783          AtomicInteger calls = new AtomicInteger();
# Line 805 | Line 799 | public class SubmissionPublisherTest ext
799          p.close();
800          s2.awaitComplete();
801          s1.awaitComplete();
802 <        assertEquals(s1.nexts + s2.nexts, n);
802 >        assertEquals(n, s1.nexts + s2.nexts);
803          assertTrue(calls.get() >= 2);
804      }
805  
812
806      /**
807 <     * TimedOffer to a publisher with no subscribers returns lag 0
807 >     * Timed offer to a publisher with no subscribers returns lag 0
808       */
809      public void testEmptyTimedOffer() {
810          SubmissionPublisher<Integer> p = basicPublisher();
811 <        assertEquals(p.offer(1, null), 0);
811 >        long startTime = System.nanoTime();
812 >        assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
813 >        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
814      }
815  
816      /**
817 <     * Timed Offer with null item or TimeUnit throws NPE
817 >     * Timed offer with null item or TimeUnit throws NPE
818       */
819      public void testNullTimedOffer() {
820          SubmissionPublisher<Integer> p = basicPublisher();
821 +        long startTime = System.nanoTime();
822          try {
823 <            p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null);
824 <        } catch (NullPointerException success) {
825 <        }
823 >            p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
824 >            shouldThrow();
825 >        } catch (NullPointerException success) {}
826          try {
827 <            p.offer(1, SHORT_DELAY_MS, null, null);
828 <        } catch (NullPointerException success) {
829 <        }
827 >            p.offer(1, LONG_DELAY_MS, null, null);
828 >            shouldThrow();
829 >        } catch (NullPointerException success) {}
830 >        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
831      }
832  
833      /**
834 <     * Timed Offer returns number of lagged items if not saturated
834 >     * Timed offer returns number of lagged items if not saturated
835       */
836      public void testLaggedTimedOffer() {
837          SubmissionPublisher<Integer> p = basicPublisher();
# Line 846 | Line 843 | public class SubmissionPublisherTest ext
843          p.subscribe(s2);
844          s2.awaitSubscribe();
845          s1.awaitSubscribe();
846 <        assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1);
847 <        assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2);
846 >        long startTime = System.nanoTime();
847 >        assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
848 >        assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
849          s1.sn.request(4);
850 <        assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3);
850 >        assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
851          s2.sn.request(4);
852 <        p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null);
852 >        p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
853          p.close();
854          s2.awaitComplete();
855 <        assertEquals(s2.nexts, 4);
855 >        assertEquals(4, s2.nexts);
856          s1.awaitComplete();
857 <        assertEquals(s2.nexts, 4);
857 >        assertEquals(4, s2.nexts);
858 >        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
859      }
860  
861      /**
862 <     * Timed Offer reports drops if saturated
862 >     * Timed offer reports drops if saturated
863       */
864      public void testDroppedTimedOffer() {
865          SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
# Line 873 | Line 872 | public class SubmissionPublisherTest ext
872          p.subscribe(s2);
873          s2.awaitSubscribe();
874          s1.awaitSubscribe();
875 +        long delay = timeoutMillis();
876          for (int i = 1; i <= 4; ++i)
877 <            assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0);
878 <        p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null);
879 <        assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
877 >            assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
878 >        long startTime = System.nanoTime();
879 >        assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
880          s1.sn.request(64);
881 <        assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
881 >        assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
882 >        // 2 * delay should elapse but check only 1 * delay to allow timer slop
883 >        assertTrue(millisElapsedSince(startTime) >= delay);
884          s2.sn.request(64);
885          p.close();
886          s2.awaitComplete();
# Line 888 | Line 890 | public class SubmissionPublisherTest ext
890      }
891  
892      /**
893 <     * Timed Offer invokes drop handler if saturated
893 >     * Timed offer invokes drop handler if saturated
894       */
895      public void testHandledDroppedTimedOffer() {
896          AtomicInteger calls = new AtomicInteger();
# Line 902 | Line 904 | public class SubmissionPublisherTest ext
904          p.subscribe(s2);
905          s2.awaitSubscribe();
906          s1.awaitSubscribe();
907 +        long delay = timeoutMillis();
908          for (int i = 1; i <= 4; ++i)
909 <            assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
910 <        p.offer(5, (s, x) -> noopHandle(calls));
911 <        assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
909 >            assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
910 >        long startTime = System.nanoTime();
911 >        assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
912          s1.sn.request(64);
913 <        assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
913 >        assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
914 >        assertTrue(millisElapsedSince(startTime) >= delay);
915          s2.sn.request(64);
916          p.close();
917          s2.awaitComplete();
# Line 916 | Line 920 | public class SubmissionPublisherTest ext
920      }
921  
922      /**
923 <     * Timed Offer succeeds if drop handler forces request
923 >     * Timed offer succeeds if drop handler forces request
924       */
925      public void testRecoveredHandledDroppedTimedOffer() {
926          AtomicInteger calls = new AtomicInteger();
# Line 931 | Line 935 | public class SubmissionPublisherTest ext
935          s2.awaitSubscribe();
936          s1.awaitSubscribe();
937          int n = 0;
938 <        for (int i = 1; i <= 8; ++i) {
939 <            int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s));
938 >        long delay = timeoutMillis();
939 >        long startTime = System.nanoTime();
940 >        for (int i = 1; i <= 6; ++i) {
941 >            int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
942              n = n + 2 + (d < 0 ? d : 0);
943          }
944 +        assertTrue(millisElapsedSince(startTime) >= delay);
945          p.close();
946          s2.awaitComplete();
947          s1.awaitComplete();
948 <        assertEquals(s1.nexts + s2.nexts, n);
948 >        assertEquals(n, s1.nexts + s2.nexts);
949          assertTrue(calls.get() >= 2);
950      }
951  
952 +    /**
953 +     * consume returns a CompletableFuture that is done when
954 +     * publisher completes
955 +     */
956 +    public void testConsume() {
957 +        AtomicInteger sum = new AtomicInteger();
958 +        SubmissionPublisher<Integer> p = basicPublisher();
959 +        CompletableFuture<Void> f =
960 +            p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
961 +        int n = 20;
962 +        for (int i = 1; i <= n; ++i)
963 +            p.submit(i);
964 +        p.close();
965 +        f.join();
966 +        assertEquals((n * (n + 1)) / 2, sum.get());
967 +    }
968 +
969 +    /**
970 +     * consume(null) throws NPE
971 +     */
972 +    public void testConsumeNPE() {
973 +        SubmissionPublisher<Integer> p = basicPublisher();
974 +        try {
975 +            CompletableFuture<Void> f = p.consume(null);
976 +            shouldThrow();
977 +        } catch (NullPointerException success) {}
978 +    }
979 +
980 +    /**
981 +     * consume eventually stops processing published items if cancelled
982 +     */
983 +    public void testCancelledConsume() {
984 +        AtomicInteger count = new AtomicInteger();
985 +        SubmissionPublisher<Integer> p = basicPublisher();
986 +        CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
987 +        f.cancel(true);
988 +        int n = 1000000; // arbitrary limit
989 +        for (int i = 1; i <= n; ++i)
990 +            p.submit(i);
991 +        assertTrue(count.get() < n);
992 +    }
993  
994   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines