ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.19
Committed: Thu Mar 9 00:11:16 2017 UTC (7 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.18: +8 -1 lines
Log Message:
Conform to reactive-streams.org -- reject request(0)

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.Executor;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Flow;
12 import java.util.concurrent.ForkJoinPool;
13 import java.util.concurrent.SubmissionPublisher;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import junit.framework.Test;
16 import junit.framework.TestSuite;
17
18 import static java.util.concurrent.Flow.Subscriber;
19 import static java.util.concurrent.Flow.Subscription;
20 import static java.util.concurrent.TimeUnit.MILLISECONDS;
21
22 public class SubmissionPublisherTest extends JSR166TestCase {
23
24 public static void main(String[] args) {
25 main(suite(), args);
26 }
27 public static Test suite() {
28 return new TestSuite(SubmissionPublisherTest.class);
29 }
30
31 final Executor basicExecutor = basicPublisher().getExecutor();
32
33 static SubmissionPublisher<Integer> basicPublisher() {
34 return new SubmissionPublisher<Integer>();
35 }
36
37 static class SPException extends RuntimeException {}
38
39 class TestSubscriber implements Subscriber<Integer> {
40 volatile Subscription sn;
41 int last; // Requires that onNexts are in numeric order
42 volatile int nexts;
43 volatile int errors;
44 volatile int completes;
45 volatile boolean throwOnCall = false;
46 volatile boolean request = true;
47 volatile Throwable lastError;
48
49 public synchronized void onSubscribe(Subscription s) {
50 threadAssertTrue(sn == null);
51 sn = s;
52 notifyAll();
53 if (throwOnCall)
54 throw new SPException();
55 if (request)
56 sn.request(1L);
57 }
58 public synchronized void onNext(Integer t) {
59 ++nexts;
60 notifyAll();
61 int current = t.intValue();
62 threadAssertTrue(current >= last);
63 last = current;
64 if (request)
65 sn.request(1L);
66 if (throwOnCall)
67 throw new SPException();
68 }
69 public synchronized void onError(Throwable t) {
70 threadAssertTrue(completes == 0);
71 threadAssertTrue(errors == 0);
72 lastError = t;
73 ++errors;
74 notifyAll();
75 }
76 public synchronized void onComplete() {
77 threadAssertTrue(completes == 0);
78 ++completes;
79 notifyAll();
80 }
81
82 synchronized void awaitSubscribe() {
83 while (sn == null) {
84 try {
85 wait();
86 } catch (Exception ex) {
87 threadUnexpectedException(ex);
88 break;
89 }
90 }
91 }
92 synchronized void awaitNext(int n) {
93 while (nexts < n) {
94 try {
95 wait();
96 } catch (Exception ex) {
97 threadUnexpectedException(ex);
98 break;
99 }
100 }
101 }
102 synchronized void awaitComplete() {
103 while (completes == 0 && errors == 0) {
104 try {
105 wait();
106 } catch (Exception ex) {
107 threadUnexpectedException(ex);
108 break;
109 }
110 }
111 }
112 synchronized void awaitError() {
113 while (errors == 0) {
114 try {
115 wait();
116 } catch (Exception ex) {
117 threadUnexpectedException(ex);
118 break;
119 }
120 }
121 }
122
123 }
124
125 /**
126 * A new SubmissionPublisher has no subscribers, a non-null
127 * executor, a power-of-two capacity, is not closed, and reports
128 * zero demand and lag
129 */
130 void checkInitialState(SubmissionPublisher<?> p) {
131 assertFalse(p.hasSubscribers());
132 assertEquals(0, p.getNumberOfSubscribers());
133 assertTrue(p.getSubscribers().isEmpty());
134 assertFalse(p.isClosed());
135 assertNull(p.getClosedException());
136 int n = p.getMaxBufferCapacity();
137 assertTrue((n & (n - 1)) == 0); // power of two
138 assertNotNull(p.getExecutor());
139 assertEquals(0, p.estimateMinimumDemand());
140 assertEquals(0, p.estimateMaximumLag());
141 }
142
143 /**
144 * A default-constructed SubmissionPublisher has no subscribers,
145 * is not closed, has default buffer size, and uses the
146 * defaultExecutor
147 */
148 public void testConstructor1() {
149 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
150 checkInitialState(p);
151 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153 if (ForkJoinPool.getCommonPoolParallelism() > 1)
154 assertSame(e, c);
155 else
156 assertNotSame(e, c);
157 }
158
159 /**
160 * A new SubmissionPublisher has no subscribers, is not closed,
161 * has the given buffer size, and uses the given executor
162 */
163 public void testConstructor2() {
164 Executor e = Executors.newFixedThreadPool(1);
165 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
166 checkInitialState(p);
167 assertSame(p.getExecutor(), e);
168 assertEquals(8, p.getMaxBufferCapacity());
169 }
170
171 /**
172 * A null Executor argument to SubmissionPublisher constructor throws NPE
173 */
174 public void testConstructor3() {
175 try {
176 new SubmissionPublisher<Integer>(null, 8);
177 shouldThrow();
178 } catch (NullPointerException success) {}
179 }
180
181 /**
182 * A negative capacity argument to SubmissionPublisher constructor
183 * throws IAE
184 */
185 public void testConstructor4() {
186 Executor e = Executors.newFixedThreadPool(1);
187 try {
188 new SubmissionPublisher<Integer>(e, -1);
189 shouldThrow();
190 } catch (IllegalArgumentException success) {}
191 }
192
193 /**
194 * A closed publisher reports isClosed with no closedException and
195 * throws ISE upon attempted submission; a subsequent close or
196 * closeExceptionally has no additional effect.
197 */
198 public void testClose() {
199 SubmissionPublisher<Integer> p = basicPublisher();
200 checkInitialState(p);
201 p.close();
202 assertTrue(p.isClosed());
203 assertNull(p.getClosedException());
204 try {
205 p.submit(1);
206 shouldThrow();
207 } catch (IllegalStateException success) {}
208 Throwable ex = new SPException();
209 p.closeExceptionally(ex);
210 assertTrue(p.isClosed());
211 assertNull(p.getClosedException());
212 }
213
214 /**
215 * A publisher closedExceptionally reports isClosed with the
216 * closedException and throws ISE upon attempted submission; a
217 * subsequent close or closeExceptionally has no additional
218 * effect.
219 */
220 public void testCloseExceptionally() {
221 SubmissionPublisher<Integer> p = basicPublisher();
222 checkInitialState(p);
223 Throwable ex = new SPException();
224 p.closeExceptionally(ex);
225 assertTrue(p.isClosed());
226 assertSame(p.getClosedException(), ex);
227 try {
228 p.submit(1);
229 shouldThrow();
230 } catch (IllegalStateException success) {}
231 p.close();
232 assertTrue(p.isClosed());
233 assertSame(p.getClosedException(), ex);
234 }
235
236 /**
237 * Upon subscription, the subscriber's onSubscribe is called, no
238 * other Subscriber methods are invoked, the publisher
239 * hasSubscribers, isSubscribed is true, and existing
240 * subscriptions are unaffected.
241 */
242 public void testSubscribe1() {
243 TestSubscriber s = new TestSubscriber();
244 SubmissionPublisher<Integer> p = basicPublisher();
245 p.subscribe(s);
246 assertTrue(p.hasSubscribers());
247 assertEquals(1, p.getNumberOfSubscribers());
248 assertTrue(p.getSubscribers().contains(s));
249 assertTrue(p.isSubscribed(s));
250 s.awaitSubscribe();
251 assertNotNull(s.sn);
252 assertEquals(0, s.nexts);
253 assertEquals(0, s.errors);
254 assertEquals(0, s.completes);
255 TestSubscriber s2 = new TestSubscriber();
256 p.subscribe(s2);
257 assertTrue(p.hasSubscribers());
258 assertEquals(2, p.getNumberOfSubscribers());
259 assertTrue(p.getSubscribers().contains(s));
260 assertTrue(p.getSubscribers().contains(s2));
261 assertTrue(p.isSubscribed(s));
262 assertTrue(p.isSubscribed(s2));
263 s2.awaitSubscribe();
264 assertNotNull(s2.sn);
265 assertEquals(0, s2.nexts);
266 assertEquals(0, s2.errors);
267 assertEquals(0, s2.completes);
268 p.close();
269 }
270
271 /**
272 * If closed, upon subscription, the subscriber's onComplete
273 * method is invoked
274 */
275 public void testSubscribe2() {
276 TestSubscriber s = new TestSubscriber();
277 SubmissionPublisher<Integer> p = basicPublisher();
278 p.close();
279 p.subscribe(s);
280 s.awaitComplete();
281 assertEquals(0, s.nexts);
282 assertEquals(0, s.errors);
283 assertEquals(1, s.completes, 1);
284 }
285
286 /**
287 * If closedExceptionally, upon subscription, the subscriber's
288 * onError method is invoked
289 */
290 public void testSubscribe3() {
291 TestSubscriber s = new TestSubscriber();
292 SubmissionPublisher<Integer> p = basicPublisher();
293 Throwable ex = new SPException();
294 p.closeExceptionally(ex);
295 assertTrue(p.isClosed());
296 assertSame(p.getClosedException(), ex);
297 p.subscribe(s);
298 s.awaitError();
299 assertEquals(0, s.nexts);
300 assertEquals(1, s.errors);
301 }
302
303 /**
304 * Upon attempted resubscription, the subscriber's onError is
305 * called and the subscription is cancelled.
306 */
307 public void testSubscribe4() {
308 TestSubscriber s = new TestSubscriber();
309 SubmissionPublisher<Integer> p = basicPublisher();
310 p.subscribe(s);
311 assertTrue(p.hasSubscribers());
312 assertEquals(1, p.getNumberOfSubscribers());
313 assertTrue(p.getSubscribers().contains(s));
314 assertTrue(p.isSubscribed(s));
315 s.awaitSubscribe();
316 assertNotNull(s.sn);
317 assertEquals(0, s.nexts);
318 assertEquals(0, s.errors);
319 assertEquals(0, s.completes);
320 p.subscribe(s);
321 s.awaitError();
322 assertEquals(0, s.nexts);
323 assertEquals(1, s.errors);
324 assertFalse(p.isSubscribed(s));
325 }
326
327 /**
328 * An exception thrown in onSubscribe causes onError
329 */
330 public void testSubscribe5() {
331 TestSubscriber s = new TestSubscriber();
332 SubmissionPublisher<Integer> p = basicPublisher();
333 s.throwOnCall = true;
334 try {
335 p.subscribe(s);
336 } catch (Exception ok) {}
337 s.awaitError();
338 assertEquals(0, s.nexts);
339 assertEquals(1, s.errors);
340 assertEquals(0, s.completes);
341 }
342
343 /**
344 * subscribe(null) throws NPE
345 */
346 public void testSubscribe6() {
347 SubmissionPublisher<Integer> p = basicPublisher();
348 try {
349 p.subscribe(null);
350 shouldThrow();
351 } catch (NullPointerException success) {}
352 checkInitialState(p);
353 }
354
355 /**
356 * Closing a publisher causes onComplete to subscribers
357 */
358 public void testCloseCompletes() {
359 SubmissionPublisher<Integer> p = basicPublisher();
360 TestSubscriber s1 = new TestSubscriber();
361 TestSubscriber s2 = new TestSubscriber();
362 p.subscribe(s1);
363 p.subscribe(s2);
364 p.submit(1);
365 p.close();
366 assertTrue(p.isClosed());
367 assertNull(p.getClosedException());
368 s1.awaitComplete();
369 assertEquals(1, s1.nexts);
370 assertEquals(1, s1.completes);
371 s2.awaitComplete();
372 assertEquals(1, s2.nexts);
373 assertEquals(1, s2.completes);
374 }
375
376 /**
377 * Closing a publisher exceptionally causes onError to subscribers
378 * after they are subscribed
379 */
380 public void testCloseExceptionallyError() {
381 SubmissionPublisher<Integer> p = basicPublisher();
382 TestSubscriber s1 = new TestSubscriber();
383 TestSubscriber s2 = new TestSubscriber();
384 p.subscribe(s1);
385 p.subscribe(s2);
386 p.submit(1);
387 p.closeExceptionally(new SPException());
388 assertTrue(p.isClosed());
389 s1.awaitSubscribe();
390 s1.awaitError();
391 assertTrue(s1.nexts <= 1);
392 assertEquals(1, s1.errors);
393 s2.awaitSubscribe();
394 s2.awaitError();
395 assertTrue(s2.nexts <= 1);
396 assertEquals(1, s2.errors);
397 }
398
399 /**
400 * Cancelling a subscription eventually causes no more onNexts to be issued
401 */
402 public void testCancel() {
403 SubmissionPublisher<Integer> p = basicPublisher();
404 TestSubscriber s1 = new TestSubscriber();
405 TestSubscriber s2 = new TestSubscriber();
406 p.subscribe(s1);
407 p.subscribe(s2);
408 s1.awaitSubscribe();
409 p.submit(1);
410 s1.sn.cancel();
411 for (int i = 2; i <= 20; ++i)
412 p.submit(i);
413 p.close();
414 s2.awaitComplete();
415 assertEquals(20, s2.nexts);
416 assertEquals(1, s2.completes);
417 assertTrue(s1.nexts < 20);
418 assertFalse(p.isSubscribed(s1));
419 }
420
421 /**
422 * Throwing an exception in onNext causes onError
423 */
424 public void testThrowOnNext() {
425 SubmissionPublisher<Integer> p = basicPublisher();
426 TestSubscriber s1 = new TestSubscriber();
427 TestSubscriber s2 = new TestSubscriber();
428 p.subscribe(s1);
429 p.subscribe(s2);
430 s1.awaitSubscribe();
431 p.submit(1);
432 s1.throwOnCall = true;
433 p.submit(2);
434 p.close();
435 s2.awaitComplete();
436 assertEquals(2, s2.nexts);
437 s1.awaitComplete();
438 assertEquals(1, s1.errors);
439 }
440
441 /**
442 * If a handler is supplied in constructor, it is invoked when
443 * subscriber throws an exception in onNext
444 */
445 public void testThrowOnNextHandler() {
446 AtomicInteger calls = new AtomicInteger();
447 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
448 basicExecutor, 8, (s, e) -> calls.getAndIncrement());
449 TestSubscriber s1 = new TestSubscriber();
450 TestSubscriber s2 = new TestSubscriber();
451 p.subscribe(s1);
452 p.subscribe(s2);
453 s1.awaitSubscribe();
454 p.submit(1);
455 s1.throwOnCall = true;
456 p.submit(2);
457 p.close();
458 s2.awaitComplete();
459 assertEquals(2, s2.nexts);
460 assertEquals(1, s2.completes);
461 s1.awaitError();
462 assertEquals(1, s1.errors);
463 assertEquals(1, calls.get());
464 }
465
466 /**
467 * onNext items are issued in the same order to each subscriber
468 */
469 public void testOrder() {
470 SubmissionPublisher<Integer> p = basicPublisher();
471 TestSubscriber s1 = new TestSubscriber();
472 TestSubscriber s2 = new TestSubscriber();
473 p.subscribe(s1);
474 p.subscribe(s2);
475 for (int i = 1; i <= 20; ++i)
476 p.submit(i);
477 p.close();
478 s2.awaitComplete();
479 s1.awaitComplete();
480 assertEquals(20, s2.nexts);
481 assertEquals(1, s2.completes);
482 assertEquals(20, s1.nexts);
483 assertEquals(1, s1.completes);
484 }
485
486 /**
487 * onNext is issued only if requested
488 */
489 public void testRequest1() {
490 SubmissionPublisher<Integer> p = basicPublisher();
491 TestSubscriber s1 = new TestSubscriber();
492 s1.request = false;
493 p.subscribe(s1);
494 s1.awaitSubscribe();
495 assertTrue(p.estimateMinimumDemand() == 0);
496 TestSubscriber s2 = new TestSubscriber();
497 p.subscribe(s2);
498 p.submit(1);
499 p.submit(2);
500 s2.awaitNext(1);
501 assertEquals(0, s1.nexts);
502 s1.sn.request(3);
503 p.submit(3);
504 p.close();
505 s2.awaitComplete();
506 assertEquals(3, s2.nexts);
507 assertEquals(1, s2.completes);
508 s1.awaitComplete();
509 assertTrue(s1.nexts > 0);
510 assertEquals(1, s1.completes);
511 }
512
513 /**
514 * onNext is not issued when requests become zero
515 */
516 public void testRequest2() {
517 SubmissionPublisher<Integer> p = basicPublisher();
518 TestSubscriber s1 = new TestSubscriber();
519 TestSubscriber s2 = new TestSubscriber();
520 p.subscribe(s1);
521 p.subscribe(s2);
522 s2.awaitSubscribe();
523 s1.awaitSubscribe();
524 s1.request = false;
525 p.submit(1);
526 p.submit(2);
527 p.close();
528 s2.awaitComplete();
529 assertEquals(2, s2.nexts);
530 assertEquals(1, s2.completes);
531 s1.awaitNext(1);
532 assertEquals(1, s1.nexts);
533 }
534
535 /**
536 * Non-positive request causes error
537 */
538 public void testRequest3() {
539 SubmissionPublisher<Integer> p = basicPublisher();
540 TestSubscriber s1 = new TestSubscriber();
541 TestSubscriber s2 = new TestSubscriber();
542 TestSubscriber s3 = new TestSubscriber();
543 p.subscribe(s1);
544 p.subscribe(s2);
545 p.subscribe(s3);
546 s3.awaitSubscribe();
547 s2.awaitSubscribe();
548 s1.awaitSubscribe();
549 s1.sn.request(-1L);
550 s3.sn.request(0L);
551 p.submit(1);
552 p.submit(2);
553 p.close();
554 s2.awaitComplete();
555 assertEquals(2, s2.nexts);
556 assertEquals(1, s2.completes);
557 s1.awaitError();
558 assertEquals(1, s1.errors);
559 assertTrue(s1.lastError instanceof IllegalArgumentException);
560 s3.awaitError();
561 assertEquals(1, s3.errors);
562 assertTrue(s3.lastError instanceof IllegalArgumentException);
563 }
564
565 /**
566 * estimateMinimumDemand reports 0 until request, nonzero after
567 * request, and zero again after delivery
568 */
569 public void testEstimateMinimumDemand() {
570 TestSubscriber s = new TestSubscriber();
571 SubmissionPublisher<Integer> p = basicPublisher();
572 s.request = false;
573 p.subscribe(s);
574 s.awaitSubscribe();
575 assertEquals(0, p.estimateMinimumDemand());
576 s.sn.request(1);
577 assertEquals(1, p.estimateMinimumDemand());
578 p.submit(1);
579 s.awaitNext(1);
580 assertEquals(0, p.estimateMinimumDemand());
581 }
582
583 /**
584 * submit to a publisher with no subscribers returns lag 0
585 */
586 public void testEmptySubmit() {
587 SubmissionPublisher<Integer> p = basicPublisher();
588 assertEquals(0, p.submit(1));
589 }
590
591 /**
592 * submit(null) throws NPE
593 */
594 public void testNullSubmit() {
595 SubmissionPublisher<Integer> p = basicPublisher();
596 try {
597 p.submit(null);
598 shouldThrow();
599 } catch (NullPointerException success) {}
600 }
601
602 /**
603 * submit returns number of lagged items, compatible with result
604 * of estimateMaximumLag.
605 */
606 public void testLaggedSubmit() {
607 SubmissionPublisher<Integer> p = basicPublisher();
608 TestSubscriber s1 = new TestSubscriber();
609 s1.request = false;
610 TestSubscriber s2 = new TestSubscriber();
611 s2.request = false;
612 p.subscribe(s1);
613 p.subscribe(s2);
614 s2.awaitSubscribe();
615 s1.awaitSubscribe();
616 assertEquals(1, p.submit(1));
617 assertTrue(p.estimateMaximumLag() >= 1);
618 assertTrue(p.submit(2) >= 2);
619 assertTrue(p.estimateMaximumLag() >= 2);
620 s1.sn.request(4);
621 assertTrue(p.submit(3) >= 3);
622 assertTrue(p.estimateMaximumLag() >= 3);
623 s2.sn.request(4);
624 p.submit(4);
625 p.close();
626 s2.awaitComplete();
627 assertEquals(4, s2.nexts);
628 s1.awaitComplete();
629 assertEquals(4, s2.nexts);
630 }
631
632 /**
633 * submit eventually issues requested items when buffer capacity is 1
634 */
635 public void testCap1Submit() {
636 SubmissionPublisher<Integer> p
637 = new SubmissionPublisher<>(basicExecutor, 1);
638 TestSubscriber s1 = new TestSubscriber();
639 TestSubscriber s2 = new TestSubscriber();
640 p.subscribe(s1);
641 p.subscribe(s2);
642 for (int i = 1; i <= 20; ++i) {
643 assertTrue(p.estimateMinimumDemand() <= 1);
644 assertTrue(p.submit(i) >= 0);
645 }
646 p.close();
647 s2.awaitComplete();
648 s1.awaitComplete();
649 assertEquals(20, s2.nexts);
650 assertEquals(1, s2.completes);
651 assertEquals(20, s1.nexts);
652 assertEquals(1, s1.completes);
653 }
654
655 static boolean noopHandle(AtomicInteger count) {
656 count.getAndIncrement();
657 return false;
658 }
659
660 static boolean reqHandle(AtomicInteger count, Subscriber s) {
661 count.getAndIncrement();
662 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
663 return true;
664 }
665
666 /**
667 * offer to a publisher with no subscribers returns lag 0
668 */
669 public void testEmptyOffer() {
670 SubmissionPublisher<Integer> p = basicPublisher();
671 assertEquals(0, p.offer(1, null));
672 }
673
674 /**
675 * offer(null) throws NPE
676 */
677 public void testNullOffer() {
678 SubmissionPublisher<Integer> p = basicPublisher();
679 try {
680 p.offer(null, null);
681 shouldThrow();
682 } catch (NullPointerException success) {}
683 }
684
685 /**
686 * offer returns number of lagged items if not saturated
687 */
688 public void testLaggedOffer() {
689 SubmissionPublisher<Integer> p = basicPublisher();
690 TestSubscriber s1 = new TestSubscriber();
691 s1.request = false;
692 TestSubscriber s2 = new TestSubscriber();
693 s2.request = false;
694 p.subscribe(s1);
695 p.subscribe(s2);
696 s2.awaitSubscribe();
697 s1.awaitSubscribe();
698 assertTrue(p.offer(1, null) >= 1);
699 assertTrue(p.offer(2, null) >= 2);
700 s1.sn.request(4);
701 assertTrue(p.offer(3, null) >= 3);
702 s2.sn.request(4);
703 p.offer(4, null);
704 p.close();
705 s2.awaitComplete();
706 assertEquals(4, s2.nexts);
707 s1.awaitComplete();
708 assertEquals(4, s2.nexts);
709 }
710
711 /**
712 * offer reports drops if saturated
713 */
714 public void testDroppedOffer() {
715 SubmissionPublisher<Integer> p
716 = new SubmissionPublisher<>(basicExecutor, 4);
717 TestSubscriber s1 = new TestSubscriber();
718 s1.request = false;
719 TestSubscriber s2 = new TestSubscriber();
720 s2.request = false;
721 p.subscribe(s1);
722 p.subscribe(s2);
723 s2.awaitSubscribe();
724 s1.awaitSubscribe();
725 for (int i = 1; i <= 4; ++i)
726 assertTrue(p.offer(i, null) >= 0);
727 p.offer(5, null);
728 assertTrue(p.offer(6, null) < 0);
729 s1.sn.request(64);
730 assertTrue(p.offer(7, null) < 0);
731 s2.sn.request(64);
732 p.close();
733 s2.awaitComplete();
734 assertTrue(s2.nexts >= 4);
735 s1.awaitComplete();
736 assertTrue(s1.nexts >= 4);
737 }
738
739 /**
740 * offer invokes drop handler if saturated
741 */
742 public void testHandledDroppedOffer() {
743 AtomicInteger calls = new AtomicInteger();
744 SubmissionPublisher<Integer> p
745 = new SubmissionPublisher<>(basicExecutor, 4);
746 TestSubscriber s1 = new TestSubscriber();
747 s1.request = false;
748 TestSubscriber s2 = new TestSubscriber();
749 s2.request = false;
750 p.subscribe(s1);
751 p.subscribe(s2);
752 s2.awaitSubscribe();
753 s1.awaitSubscribe();
754 for (int i = 1; i <= 4; ++i)
755 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
756 p.offer(4, (s, x) -> noopHandle(calls));
757 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
758 s1.sn.request(64);
759 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
760 s2.sn.request(64);
761 p.close();
762 s2.awaitComplete();
763 s1.awaitComplete();
764 assertTrue(calls.get() >= 4);
765 }
766
767 /**
768 * offer succeeds if drop handler forces request
769 */
770 public void testRecoveredHandledDroppedOffer() {
771 AtomicInteger calls = new AtomicInteger();
772 SubmissionPublisher<Integer> p
773 = new SubmissionPublisher<>(basicExecutor, 4);
774 TestSubscriber s1 = new TestSubscriber();
775 s1.request = false;
776 TestSubscriber s2 = new TestSubscriber();
777 s2.request = false;
778 p.subscribe(s1);
779 p.subscribe(s2);
780 s2.awaitSubscribe();
781 s1.awaitSubscribe();
782 int n = 0;
783 for (int i = 1; i <= 8; ++i) {
784 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
785 n = n + 2 + (d < 0 ? d : 0);
786 }
787 p.close();
788 s2.awaitComplete();
789 s1.awaitComplete();
790 assertEquals(n, s1.nexts + s2.nexts);
791 assertTrue(calls.get() >= 2);
792 }
793
794 /**
795 * Timed offer to a publisher with no subscribers returns lag 0
796 */
797 public void testEmptyTimedOffer() {
798 SubmissionPublisher<Integer> p = basicPublisher();
799 long startTime = System.nanoTime();
800 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
801 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
802 }
803
804 /**
805 * Timed offer with null item or TimeUnit throws NPE
806 */
807 public void testNullTimedOffer() {
808 SubmissionPublisher<Integer> p = basicPublisher();
809 long startTime = System.nanoTime();
810 try {
811 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
812 shouldThrow();
813 } catch (NullPointerException success) {}
814 try {
815 p.offer(1, LONG_DELAY_MS, null, null);
816 shouldThrow();
817 } catch (NullPointerException success) {}
818 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
819 }
820
821 /**
822 * Timed offer returns number of lagged items if not saturated
823 */
824 public void testLaggedTimedOffer() {
825 SubmissionPublisher<Integer> p = basicPublisher();
826 TestSubscriber s1 = new TestSubscriber();
827 s1.request = false;
828 TestSubscriber s2 = new TestSubscriber();
829 s2.request = false;
830 p.subscribe(s1);
831 p.subscribe(s2);
832 s2.awaitSubscribe();
833 s1.awaitSubscribe();
834 long startTime = System.nanoTime();
835 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
836 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
837 s1.sn.request(4);
838 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
839 s2.sn.request(4);
840 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
841 p.close();
842 s2.awaitComplete();
843 assertEquals(4, s2.nexts);
844 s1.awaitComplete();
845 assertEquals(4, s2.nexts);
846 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
847 }
848
849 /**
850 * Timed offer reports drops if saturated
851 */
852 public void testDroppedTimedOffer() {
853 SubmissionPublisher<Integer> p
854 = new SubmissionPublisher<>(basicExecutor, 4);
855 TestSubscriber s1 = new TestSubscriber();
856 s1.request = false;
857 TestSubscriber s2 = new TestSubscriber();
858 s2.request = false;
859 p.subscribe(s1);
860 p.subscribe(s2);
861 s2.awaitSubscribe();
862 s1.awaitSubscribe();
863 long delay = timeoutMillis();
864 for (int i = 1; i <= 4; ++i)
865 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
866 long startTime = System.nanoTime();
867 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
868 s1.sn.request(64);
869 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
870 // 2 * delay should elapse but check only 1 * delay to allow timer slop
871 assertTrue(millisElapsedSince(startTime) >= delay);
872 s2.sn.request(64);
873 p.close();
874 s2.awaitComplete();
875 assertTrue(s2.nexts >= 2);
876 s1.awaitComplete();
877 assertTrue(s1.nexts >= 2);
878 }
879
880 /**
881 * Timed offer invokes drop handler if saturated
882 */
883 public void testHandledDroppedTimedOffer() {
884 AtomicInteger calls = new AtomicInteger();
885 SubmissionPublisher<Integer> p
886 = new SubmissionPublisher<>(basicExecutor, 4);
887 TestSubscriber s1 = new TestSubscriber();
888 s1.request = false;
889 TestSubscriber s2 = new TestSubscriber();
890 s2.request = false;
891 p.subscribe(s1);
892 p.subscribe(s2);
893 s2.awaitSubscribe();
894 s1.awaitSubscribe();
895 long delay = timeoutMillis();
896 for (int i = 1; i <= 4; ++i)
897 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
898 long startTime = System.nanoTime();
899 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
900 s1.sn.request(64);
901 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902 assertTrue(millisElapsedSince(startTime) >= delay);
903 s2.sn.request(64);
904 p.close();
905 s2.awaitComplete();
906 s1.awaitComplete();
907 assertTrue(calls.get() >= 2);
908 }
909
910 /**
911 * Timed offer succeeds if drop handler forces request
912 */
913 public void testRecoveredHandledDroppedTimedOffer() {
914 AtomicInteger calls = new AtomicInteger();
915 SubmissionPublisher<Integer> p
916 = new SubmissionPublisher<>(basicExecutor, 4);
917 TestSubscriber s1 = new TestSubscriber();
918 s1.request = false;
919 TestSubscriber s2 = new TestSubscriber();
920 s2.request = false;
921 p.subscribe(s1);
922 p.subscribe(s2);
923 s2.awaitSubscribe();
924 s1.awaitSubscribe();
925 int n = 0;
926 long delay = timeoutMillis();
927 long startTime = System.nanoTime();
928 for (int i = 1; i <= 6; ++i) {
929 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
930 n = n + 2 + (d < 0 ? d : 0);
931 }
932 assertTrue(millisElapsedSince(startTime) >= delay);
933 p.close();
934 s2.awaitComplete();
935 s1.awaitComplete();
936 assertEquals(n, s1.nexts + s2.nexts);
937 assertTrue(calls.get() >= 2);
938 }
939
940 /**
941 * consume returns a CompletableFuture that is done when
942 * publisher completes
943 */
944 public void testConsume() {
945 AtomicInteger sum = new AtomicInteger();
946 SubmissionPublisher<Integer> p = basicPublisher();
947 CompletableFuture<Void> f =
948 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
949 int n = 20;
950 for (int i = 1; i <= n; ++i)
951 p.submit(i);
952 p.close();
953 f.join();
954 assertEquals((n * (n + 1)) / 2, sum.get());
955 }
956
957 /**
958 * consume(null) throws NPE
959 */
960 public void testConsumeNPE() {
961 SubmissionPublisher<Integer> p = basicPublisher();
962 try {
963 CompletableFuture<Void> f = p.consume(null);
964 shouldThrow();
965 } catch (NullPointerException success) {}
966 }
967
968 /**
969 * consume eventually stops processing published items if cancelled
970 */
971 public void testCancelledConsume() {
972 AtomicInteger count = new AtomicInteger();
973 SubmissionPublisher<Integer> p = basicPublisher();
974 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
975 f.cancel(true);
976 int n = 1000000; // arbitrary limit
977 for (int i = 1; i <= n; ++i)
978 p.submit(i);
979 assertTrue(count.get() < n);
980 }
981
982 }