ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.16
Committed: Sun Dec 11 22:11:45 2016 UTC (7 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.15: +3 -0 lines
Log Message:
Test that onSubscribe has precedence over onError

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.Executor;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Flow;
12 import java.util.concurrent.ForkJoinPool;
13 import java.util.concurrent.SubmissionPublisher;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import junit.framework.Test;
16 import junit.framework.TestSuite;
17
18 import static java.util.concurrent.Flow.Subscriber;
19 import static java.util.concurrent.Flow.Subscription;
20 import static java.util.concurrent.TimeUnit.MILLISECONDS;
21
22 public class SubmissionPublisherTest extends JSR166TestCase {
23
24 public static void main(String[] args) {
25 main(suite(), args);
26 }
27 public static Test suite() {
28 return new TestSuite(SubmissionPublisherTest.class);
29 }
30
31 final Executor basicExecutor = basicPublisher().getExecutor();
32
33 static SubmissionPublisher<Integer> basicPublisher() {
34 return new SubmissionPublisher<Integer>();
35 }
36
37 static class SPException extends RuntimeException {}
38
39 class TestSubscriber implements Subscriber<Integer> {
40 volatile Subscription sn;
41 int last; // Requires that onNexts are in numeric order
42 volatile int nexts;
43 volatile int errors;
44 volatile int completes;
45 volatile boolean throwOnCall = false;
46 volatile boolean request = true;
47 volatile Throwable lastError;
48
49 public synchronized void onSubscribe(Subscription s) {
50 threadAssertTrue(sn == null);
51 sn = s;
52 notifyAll();
53 if (throwOnCall)
54 throw new SPException();
55 if (request)
56 sn.request(1L);
57 }
58 public synchronized void onNext(Integer t) {
59 ++nexts;
60 notifyAll();
61 int current = t.intValue();
62 threadAssertTrue(current >= last);
63 last = current;
64 if (request)
65 sn.request(1L);
66 if (throwOnCall)
67 throw new SPException();
68 }
69 public synchronized void onError(Throwable t) {
70 threadAssertTrue(completes == 0);
71 threadAssertTrue(errors == 0);
72 lastError = t;
73 ++errors;
74 notifyAll();
75 }
76 public synchronized void onComplete() {
77 threadAssertTrue(completes == 0);
78 ++completes;
79 notifyAll();
80 }
81
82 synchronized void awaitSubscribe() {
83 while (sn == null) {
84 try {
85 wait();
86 } catch (Exception ex) {
87 threadUnexpectedException(ex);
88 break;
89 }
90 }
91 }
92 synchronized void awaitNext(int n) {
93 while (nexts < n) {
94 try {
95 wait();
96 } catch (Exception ex) {
97 threadUnexpectedException(ex);
98 break;
99 }
100 }
101 }
102 synchronized void awaitComplete() {
103 while (completes == 0 && errors == 0) {
104 try {
105 wait();
106 } catch (Exception ex) {
107 threadUnexpectedException(ex);
108 break;
109 }
110 }
111 }
112 synchronized void awaitError() {
113 while (errors == 0) {
114 try {
115 wait();
116 } catch (Exception ex) {
117 threadUnexpectedException(ex);
118 break;
119 }
120 }
121 }
122
123 }
124
125 /**
126 * A new SubmissionPublisher has no subscribers, a non-null
127 * executor, a power-of-two capacity, is not closed, and reports
128 * zero demand and lag
129 */
130 void checkInitialState(SubmissionPublisher<?> p) {
131 assertFalse(p.hasSubscribers());
132 assertEquals(0, p.getNumberOfSubscribers());
133 assertTrue(p.getSubscribers().isEmpty());
134 assertFalse(p.isClosed());
135 assertNull(p.getClosedException());
136 int n = p.getMaxBufferCapacity();
137 assertTrue((n & (n - 1)) == 0); // power of two
138 assertNotNull(p.getExecutor());
139 assertEquals(0, p.estimateMinimumDemand());
140 assertEquals(0, p.estimateMaximumLag());
141 }
142
143 /**
144 * A default-constructed SubmissionPublisher has no subscribers,
145 * is not closed, has default buffer size, and uses the
146 * defaultExecutor
147 */
148 public void testConstructor1() {
149 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
150 checkInitialState(p);
151 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153 if (ForkJoinPool.getCommonPoolParallelism() > 1)
154 assertSame(e, c);
155 else
156 assertNotSame(e, c);
157 }
158
159 /**
160 * A new SubmissionPublisher has no subscribers, is not closed,
161 * has the given buffer size, and uses the given executor
162 */
163 public void testConstructor2() {
164 Executor e = Executors.newFixedThreadPool(1);
165 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
166 checkInitialState(p);
167 assertSame(p.getExecutor(), e);
168 assertEquals(8, p.getMaxBufferCapacity());
169 }
170
171 /**
172 * A null Executor argument to SubmissionPublisher constructor throws NPE
173 */
174 public void testConstructor3() {
175 try {
176 new SubmissionPublisher<Integer>(null, 8);
177 shouldThrow();
178 } catch (NullPointerException success) {}
179 }
180
181 /**
182 * A negative capacity argument to SubmissionPublisher constructor
183 * throws IAE
184 */
185 public void testConstructor4() {
186 Executor e = Executors.newFixedThreadPool(1);
187 try {
188 new SubmissionPublisher<Integer>(e, -1);
189 shouldThrow();
190 } catch (IllegalArgumentException success) {}
191 }
192
193 /**
194 * A closed publisher reports isClosed with no closedException and
195 * throws ISE upon attempted submission; a subsequent close or
196 * closeExceptionally has no additional effect.
197 */
198 public void testClose() {
199 SubmissionPublisher<Integer> p = basicPublisher();
200 checkInitialState(p);
201 p.close();
202 assertTrue(p.isClosed());
203 assertNull(p.getClosedException());
204 try {
205 p.submit(1);
206 shouldThrow();
207 } catch (IllegalStateException success) {}
208 Throwable ex = new SPException();
209 p.closeExceptionally(ex);
210 assertTrue(p.isClosed());
211 assertNull(p.getClosedException());
212 }
213
214 /**
215 * A publisher closedExceptionally reports isClosed with the
216 * closedException and throws ISE upon attempted submission; a
217 * subsequent close or closeExceptionally has no additional
218 * effect.
219 */
220 public void testCloseExceptionally() {
221 SubmissionPublisher<Integer> p = basicPublisher();
222 checkInitialState(p);
223 Throwable ex = new SPException();
224 p.closeExceptionally(ex);
225 assertTrue(p.isClosed());
226 assertSame(p.getClosedException(), ex);
227 try {
228 p.submit(1);
229 shouldThrow();
230 } catch (IllegalStateException success) {}
231 p.close();
232 assertTrue(p.isClosed());
233 assertSame(p.getClosedException(), ex);
234 }
235
236 /**
237 * Upon subscription, the subscriber's onSubscribe is called, no
238 * other Subscriber methods are invoked, the publisher
239 * hasSubscribers, isSubscribed is true, and existing
240 * subscriptions are unaffected.
241 */
242 public void testSubscribe1() {
243 TestSubscriber s = new TestSubscriber();
244 SubmissionPublisher<Integer> p = basicPublisher();
245 p.subscribe(s);
246 assertTrue(p.hasSubscribers());
247 assertEquals(1, p.getNumberOfSubscribers());
248 assertTrue(p.getSubscribers().contains(s));
249 assertTrue(p.isSubscribed(s));
250 s.awaitSubscribe();
251 assertNotNull(s.sn);
252 assertEquals(0, s.nexts);
253 assertEquals(0, s.errors);
254 assertEquals(0, s.completes);
255 TestSubscriber s2 = new TestSubscriber();
256 p.subscribe(s2);
257 assertTrue(p.hasSubscribers());
258 assertEquals(2, p.getNumberOfSubscribers());
259 assertTrue(p.getSubscribers().contains(s));
260 assertTrue(p.getSubscribers().contains(s2));
261 assertTrue(p.isSubscribed(s));
262 assertTrue(p.isSubscribed(s2));
263 s2.awaitSubscribe();
264 assertNotNull(s2.sn);
265 assertEquals(0, s2.nexts);
266 assertEquals(0, s2.errors);
267 assertEquals(0, s2.completes);
268 p.close();
269 }
270
271 /**
272 * If closed, upon subscription, the subscriber's onComplete
273 * method is invoked
274 */
275 public void testSubscribe2() {
276 TestSubscriber s = new TestSubscriber();
277 SubmissionPublisher<Integer> p = basicPublisher();
278 p.close();
279 p.subscribe(s);
280 s.awaitComplete();
281 assertEquals(0, s.nexts);
282 assertEquals(0, s.errors);
283 assertEquals(1, s.completes, 1);
284 }
285
286 /**
287 * If closedExceptionally, upon subscription, the subscriber's
288 * onError method is invoked
289 */
290 public void testSubscribe3() {
291 TestSubscriber s = new TestSubscriber();
292 SubmissionPublisher<Integer> p = basicPublisher();
293 Throwable ex = new SPException();
294 p.closeExceptionally(ex);
295 assertTrue(p.isClosed());
296 assertSame(p.getClosedException(), ex);
297 p.subscribe(s);
298 s.awaitError();
299 assertEquals(0, s.nexts);
300 assertEquals(1, s.errors);
301 }
302
303 /**
304 * Upon attempted resubscription, the subscriber's onError is
305 * called and the subscription is cancelled.
306 */
307 public void testSubscribe4() {
308 TestSubscriber s = new TestSubscriber();
309 SubmissionPublisher<Integer> p = basicPublisher();
310 p.subscribe(s);
311 assertTrue(p.hasSubscribers());
312 assertEquals(1, p.getNumberOfSubscribers());
313 assertTrue(p.getSubscribers().contains(s));
314 assertTrue(p.isSubscribed(s));
315 s.awaitSubscribe();
316 assertNotNull(s.sn);
317 assertEquals(0, s.nexts);
318 assertEquals(0, s.errors);
319 assertEquals(0, s.completes);
320 p.subscribe(s);
321 s.awaitError();
322 assertEquals(0, s.nexts);
323 assertEquals(1, s.errors);
324 assertFalse(p.isSubscribed(s));
325 }
326
327 /**
328 * An exception thrown in onSubscribe causes onError
329 */
330 public void testSubscribe5() {
331 TestSubscriber s = new TestSubscriber();
332 SubmissionPublisher<Integer> p = basicPublisher();
333 s.throwOnCall = true;
334 try {
335 p.subscribe(s);
336 } catch (Exception ok) {}
337 s.awaitError();
338 assertEquals(0, s.nexts);
339 assertEquals(1, s.errors);
340 assertEquals(0, s.completes);
341 }
342
343 /**
344 * subscribe(null) throws NPE
345 */
346 public void testSubscribe6() {
347 SubmissionPublisher<Integer> p = basicPublisher();
348 try {
349 p.subscribe(null);
350 shouldThrow();
351 } catch (NullPointerException success) {}
352 checkInitialState(p);
353 }
354
355 /**
356 * Closing a publisher causes onComplete to subscribers
357 */
358 public void testCloseCompletes() {
359 SubmissionPublisher<Integer> p = basicPublisher();
360 TestSubscriber s1 = new TestSubscriber();
361 TestSubscriber s2 = new TestSubscriber();
362 p.subscribe(s1);
363 p.subscribe(s2);
364 p.submit(1);
365 p.close();
366 assertTrue(p.isClosed());
367 assertNull(p.getClosedException());
368 s1.awaitComplete();
369 assertEquals(1, s1.nexts);
370 assertEquals(1, s1.completes);
371 s2.awaitComplete();
372 assertEquals(1, s2.nexts);
373 assertEquals(1, s2.completes);
374 }
375
376 /**
377 * Closing a publisher exceptionally causes onError to subscribers
378 * after they are subscribed
379 */
380 public void testCloseExceptionallyError() {
381 SubmissionPublisher<Integer> p = basicPublisher();
382 TestSubscriber s1 = new TestSubscriber();
383 TestSubscriber s2 = new TestSubscriber();
384 p.subscribe(s1);
385 p.subscribe(s2);
386 p.submit(1);
387 p.closeExceptionally(new SPException());
388 assertTrue(p.isClosed());
389 s1.awaitSubscribe();
390 s1.awaitError();
391 assertTrue(s1.nexts <= 1);
392 assertEquals(1, s1.errors);
393 s1.awaitSubscribe();
394 s2.awaitError();
395 assertTrue(s2.nexts <= 1);
396 assertEquals(1, s2.errors);
397 }
398
399 /**
400 * Cancelling a subscription eventually causes no more onNexts to be issued
401 */
402 public void testCancel() {
403 SubmissionPublisher<Integer> p = basicPublisher();
404 TestSubscriber s1 = new TestSubscriber();
405 TestSubscriber s2 = new TestSubscriber();
406 p.subscribe(s1);
407 p.subscribe(s2);
408 s1.awaitSubscribe();
409 p.submit(1);
410 s1.sn.cancel();
411 for (int i = 2; i <= 20; ++i)
412 p.submit(i);
413 p.close();
414 s2.awaitComplete();
415 assertEquals(20, s2.nexts);
416 assertEquals(1, s2.completes);
417 assertTrue(s1.nexts < 20);
418 assertFalse(p.isSubscribed(s1));
419 }
420
421 /**
422 * Throwing an exception in onNext causes onError
423 */
424 public void testThrowOnNext() {
425 SubmissionPublisher<Integer> p = basicPublisher();
426 TestSubscriber s1 = new TestSubscriber();
427 TestSubscriber s2 = new TestSubscriber();
428 p.subscribe(s1);
429 p.subscribe(s2);
430 s1.awaitSubscribe();
431 p.submit(1);
432 s1.throwOnCall = true;
433 p.submit(2);
434 p.close();
435 s2.awaitComplete();
436 assertEquals(2, s2.nexts);
437 s1.awaitComplete();
438 assertEquals(1, s1.errors);
439 }
440
441 /**
442 * If a handler is supplied in constructor, it is invoked when
443 * subscriber throws an exception in onNext
444 */
445 public void testThrowOnNextHandler() {
446 AtomicInteger calls = new AtomicInteger();
447 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
448 (basicExecutor, 8,
449 (s, e) -> calls.getAndIncrement());
450 TestSubscriber s1 = new TestSubscriber();
451 TestSubscriber s2 = new TestSubscriber();
452 p.subscribe(s1);
453 p.subscribe(s2);
454 s1.awaitSubscribe();
455 p.submit(1);
456 s1.throwOnCall = true;
457 p.submit(2);
458 p.close();
459 s2.awaitComplete();
460 assertEquals(2, s2.nexts);
461 assertEquals(1, s2.completes);
462 s1.awaitError();
463 assertEquals(1, s1.errors);
464 assertEquals(1, calls.get());
465 }
466
467 /**
468 * onNext items are issued in the same order to each subscriber
469 */
470 public void testOrder() {
471 SubmissionPublisher<Integer> p = basicPublisher();
472 TestSubscriber s1 = new TestSubscriber();
473 TestSubscriber s2 = new TestSubscriber();
474 p.subscribe(s1);
475 p.subscribe(s2);
476 for (int i = 1; i <= 20; ++i)
477 p.submit(i);
478 p.close();
479 s2.awaitComplete();
480 s1.awaitComplete();
481 assertEquals(20, s2.nexts);
482 assertEquals(1, s2.completes);
483 assertEquals(20, s1.nexts);
484 assertEquals(1, s1.completes);
485 }
486
487 /**
488 * onNext is issued only if requested
489 */
490 public void testRequest1() {
491 SubmissionPublisher<Integer> p = basicPublisher();
492 TestSubscriber s1 = new TestSubscriber();
493 s1.request = false;
494 p.subscribe(s1);
495 s1.awaitSubscribe();
496 assertTrue(p.estimateMinimumDemand() == 0);
497 TestSubscriber s2 = new TestSubscriber();
498 p.subscribe(s2);
499 p.submit(1);
500 p.submit(2);
501 s2.awaitNext(1);
502 assertEquals(0, s1.nexts);
503 s1.sn.request(3);
504 p.submit(3);
505 p.close();
506 s2.awaitComplete();
507 assertEquals(3, s2.nexts);
508 assertEquals(1, s2.completes);
509 s1.awaitComplete();
510 assertTrue(s1.nexts > 0);
511 assertEquals(1, s1.completes);
512 }
513
514 /**
515 * onNext is not issued when requests become zero
516 */
517 public void testRequest2() {
518 SubmissionPublisher<Integer> p = basicPublisher();
519 TestSubscriber s1 = new TestSubscriber();
520 TestSubscriber s2 = new TestSubscriber();
521 p.subscribe(s1);
522 p.subscribe(s2);
523 s2.awaitSubscribe();
524 s1.awaitSubscribe();
525 s1.request = false;
526 p.submit(1);
527 p.submit(2);
528 p.close();
529 s2.awaitComplete();
530 assertEquals(2, s2.nexts);
531 assertEquals(1, s2.completes);
532 s1.awaitNext(1);
533 assertEquals(1, s1.nexts);
534 }
535
536 /**
537 * Negative request causes error
538 */
539 public void testRequest3() {
540 SubmissionPublisher<Integer> p = basicPublisher();
541 TestSubscriber s1 = new TestSubscriber();
542 TestSubscriber s2 = new TestSubscriber();
543 p.subscribe(s1);
544 p.subscribe(s2);
545 s2.awaitSubscribe();
546 s1.awaitSubscribe();
547 s1.sn.request(-1L);
548 p.submit(1);
549 p.submit(2);
550 p.close();
551 s2.awaitComplete();
552 assertEquals(2, s2.nexts);
553 assertEquals(1, s2.completes);
554 s1.awaitError();
555 assertEquals(1, s1.errors);
556 assertTrue(s1.lastError instanceof IllegalArgumentException);
557 }
558
559 /**
560 * estimateMinimumDemand reports 0 until request, nonzero after
561 * request, and zero again after delivery
562 */
563 public void testEstimateMinimumDemand() {
564 TestSubscriber s = new TestSubscriber();
565 SubmissionPublisher<Integer> p = basicPublisher();
566 s.request = false;
567 p.subscribe(s);
568 s.awaitSubscribe();
569 assertEquals(0, p.estimateMinimumDemand());
570 s.sn.request(1);
571 assertEquals(1, p.estimateMinimumDemand());
572 p.submit(1);
573 s.awaitNext(1);
574 assertEquals(0, p.estimateMinimumDemand());
575 }
576
577 /**
578 * submit to a publisher with no subscribers returns lag 0
579 */
580 public void testEmptySubmit() {
581 SubmissionPublisher<Integer> p = basicPublisher();
582 assertEquals(0, p.submit(1));
583 }
584
585 /**
586 * submit(null) throws NPE
587 */
588 public void testNullSubmit() {
589 SubmissionPublisher<Integer> p = basicPublisher();
590 try {
591 p.submit(null);
592 shouldThrow();
593 } catch (NullPointerException success) {}
594 }
595
596 /**
597 * submit returns number of lagged items, compatible with result
598 * of estimateMaximumLag.
599 */
600 public void testLaggedSubmit() {
601 SubmissionPublisher<Integer> p = basicPublisher();
602 TestSubscriber s1 = new TestSubscriber();
603 s1.request = false;
604 TestSubscriber s2 = new TestSubscriber();
605 s2.request = false;
606 p.subscribe(s1);
607 p.subscribe(s2);
608 s2.awaitSubscribe();
609 s1.awaitSubscribe();
610 assertEquals(1, p.submit(1));
611 assertTrue(p.estimateMaximumLag() >= 1);
612 assertTrue(p.submit(2) >= 2);
613 assertTrue(p.estimateMaximumLag() >= 2);
614 s1.sn.request(4);
615 assertTrue(p.submit(3) >= 3);
616 assertTrue(p.estimateMaximumLag() >= 3);
617 s2.sn.request(4);
618 p.submit(4);
619 p.close();
620 s2.awaitComplete();
621 assertEquals(4, s2.nexts);
622 s1.awaitComplete();
623 assertEquals(4, s2.nexts);
624 }
625
626 /**
627 * submit eventually issues requested items when buffer capacity is 1
628 */
629 public void testCap1Submit() {
630 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
631 basicExecutor, 1);
632 TestSubscriber s1 = new TestSubscriber();
633 TestSubscriber s2 = new TestSubscriber();
634 p.subscribe(s1);
635 p.subscribe(s2);
636 for (int i = 1; i <= 20; ++i) {
637 assertTrue(p.estimateMinimumDemand() <= 1);
638 assertTrue(p.submit(i) >= 0);
639 }
640 p.close();
641 s2.awaitComplete();
642 s1.awaitComplete();
643 assertEquals(20, s2.nexts);
644 assertEquals(1, s2.completes);
645 assertEquals(20, s1.nexts);
646 assertEquals(1, s1.completes);
647 }
648
649 static boolean noopHandle(AtomicInteger count) {
650 count.getAndIncrement();
651 return false;
652 }
653
654 static boolean reqHandle(AtomicInteger count, Subscriber s) {
655 count.getAndIncrement();
656 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
657 return true;
658 }
659
660 /**
661 * offer to a publisher with no subscribers returns lag 0
662 */
663 public void testEmptyOffer() {
664 SubmissionPublisher<Integer> p = basicPublisher();
665 assertEquals(0, p.offer(1, null));
666 }
667
668 /**
669 * offer(null) throws NPE
670 */
671 public void testNullOffer() {
672 SubmissionPublisher<Integer> p = basicPublisher();
673 try {
674 p.offer(null, null);
675 shouldThrow();
676 } catch (NullPointerException success) {}
677 }
678
679 /**
680 * offer returns number of lagged items if not saturated
681 */
682 public void testLaggedOffer() {
683 SubmissionPublisher<Integer> p = basicPublisher();
684 TestSubscriber s1 = new TestSubscriber();
685 s1.request = false;
686 TestSubscriber s2 = new TestSubscriber();
687 s2.request = false;
688 p.subscribe(s1);
689 p.subscribe(s2);
690 s2.awaitSubscribe();
691 s1.awaitSubscribe();
692 assertTrue(p.offer(1, null) >= 1);
693 assertTrue(p.offer(2, null) >= 2);
694 s1.sn.request(4);
695 assertTrue(p.offer(3, null) >= 3);
696 s2.sn.request(4);
697 p.offer(4, null);
698 p.close();
699 s2.awaitComplete();
700 assertEquals(4, s2.nexts);
701 s1.awaitComplete();
702 assertEquals(4, s2.nexts);
703 }
704
705 /**
706 * offer reports drops if saturated
707 */
708 public void testDroppedOffer() {
709 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
710 basicExecutor, 4);
711 TestSubscriber s1 = new TestSubscriber();
712 s1.request = false;
713 TestSubscriber s2 = new TestSubscriber();
714 s2.request = false;
715 p.subscribe(s1);
716 p.subscribe(s2);
717 s2.awaitSubscribe();
718 s1.awaitSubscribe();
719 for (int i = 1; i <= 4; ++i)
720 assertTrue(p.offer(i, null) >= 0);
721 p.offer(5, null);
722 assertTrue(p.offer(6, null) < 0);
723 s1.sn.request(64);
724 assertTrue(p.offer(7, null) < 0);
725 s2.sn.request(64);
726 p.close();
727 s2.awaitComplete();
728 assertTrue(s2.nexts >= 4);
729 s1.awaitComplete();
730 assertTrue(s1.nexts >= 4);
731 }
732
733 /**
734 * offer invokes drop handler if saturated
735 */
736 public void testHandledDroppedOffer() {
737 AtomicInteger calls = new AtomicInteger();
738 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
739 basicExecutor, 4);
740 TestSubscriber s1 = new TestSubscriber();
741 s1.request = false;
742 TestSubscriber s2 = new TestSubscriber();
743 s2.request = false;
744 p.subscribe(s1);
745 p.subscribe(s2);
746 s2.awaitSubscribe();
747 s1.awaitSubscribe();
748 for (int i = 1; i <= 4; ++i)
749 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
750 p.offer(4, (s, x) -> noopHandle(calls));
751 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
752 s1.sn.request(64);
753 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
754 s2.sn.request(64);
755 p.close();
756 s2.awaitComplete();
757 s1.awaitComplete();
758 assertTrue(calls.get() >= 4);
759 }
760
761 /**
762 * offer succeeds if drop handler forces request
763 */
764 public void testRecoveredHandledDroppedOffer() {
765 AtomicInteger calls = new AtomicInteger();
766 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
767 basicExecutor, 4);
768 TestSubscriber s1 = new TestSubscriber();
769 s1.request = false;
770 TestSubscriber s2 = new TestSubscriber();
771 s2.request = false;
772 p.subscribe(s1);
773 p.subscribe(s2);
774 s2.awaitSubscribe();
775 s1.awaitSubscribe();
776 int n = 0;
777 for (int i = 1; i <= 8; ++i) {
778 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
779 n = n + 2 + (d < 0 ? d : 0);
780 }
781 p.close();
782 s2.awaitComplete();
783 s1.awaitComplete();
784 assertEquals(n, s1.nexts + s2.nexts);
785 assertTrue(calls.get() >= 2);
786 }
787
788 /**
789 * Timed offer to a publisher with no subscribers returns lag 0
790 */
791 public void testEmptyTimedOffer() {
792 SubmissionPublisher<Integer> p = basicPublisher();
793 long startTime = System.nanoTime();
794 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
795 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
796 }
797
798 /**
799 * Timed offer with null item or TimeUnit throws NPE
800 */
801 public void testNullTimedOffer() {
802 SubmissionPublisher<Integer> p = basicPublisher();
803 long startTime = System.nanoTime();
804 try {
805 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
806 shouldThrow();
807 } catch (NullPointerException success) {}
808 try {
809 p.offer(1, LONG_DELAY_MS, null, null);
810 shouldThrow();
811 } catch (NullPointerException success) {}
812 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
813 }
814
815 /**
816 * Timed offer returns number of lagged items if not saturated
817 */
818 public void testLaggedTimedOffer() {
819 SubmissionPublisher<Integer> p = basicPublisher();
820 TestSubscriber s1 = new TestSubscriber();
821 s1.request = false;
822 TestSubscriber s2 = new TestSubscriber();
823 s2.request = false;
824 p.subscribe(s1);
825 p.subscribe(s2);
826 s2.awaitSubscribe();
827 s1.awaitSubscribe();
828 long startTime = System.nanoTime();
829 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
830 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
831 s1.sn.request(4);
832 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
833 s2.sn.request(4);
834 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
835 p.close();
836 s2.awaitComplete();
837 assertEquals(4, s2.nexts);
838 s1.awaitComplete();
839 assertEquals(4, s2.nexts);
840 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
841 }
842
843 /**
844 * Timed offer reports drops if saturated
845 */
846 public void testDroppedTimedOffer() {
847 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
848 basicExecutor, 4);
849 TestSubscriber s1 = new TestSubscriber();
850 s1.request = false;
851 TestSubscriber s2 = new TestSubscriber();
852 s2.request = false;
853 p.subscribe(s1);
854 p.subscribe(s2);
855 s2.awaitSubscribe();
856 s1.awaitSubscribe();
857 long delay = timeoutMillis();
858 for (int i = 1; i <= 4; ++i)
859 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
860 long startTime = System.nanoTime();
861 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
862 s1.sn.request(64);
863 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
864 // 2 * delay should elapse but check only 1 * delay to allow timer slop
865 assertTrue(millisElapsedSince(startTime) >= delay);
866 s2.sn.request(64);
867 p.close();
868 s2.awaitComplete();
869 assertTrue(s2.nexts >= 2);
870 s1.awaitComplete();
871 assertTrue(s1.nexts >= 2);
872 }
873
874 /**
875 * Timed offer invokes drop handler if saturated
876 */
877 public void testHandledDroppedTimedOffer() {
878 AtomicInteger calls = new AtomicInteger();
879 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
880 basicExecutor, 4);
881 TestSubscriber s1 = new TestSubscriber();
882 s1.request = false;
883 TestSubscriber s2 = new TestSubscriber();
884 s2.request = false;
885 p.subscribe(s1);
886 p.subscribe(s2);
887 s2.awaitSubscribe();
888 s1.awaitSubscribe();
889 long delay = timeoutMillis();
890 for (int i = 1; i <= 4; ++i)
891 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
892 long startTime = System.nanoTime();
893 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
894 s1.sn.request(64);
895 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
896 assertTrue(millisElapsedSince(startTime) >= delay);
897 s2.sn.request(64);
898 p.close();
899 s2.awaitComplete();
900 s1.awaitComplete();
901 assertTrue(calls.get() >= 2);
902 }
903
904 /**
905 * Timed offer succeeds if drop handler forces request
906 */
907 public void testRecoveredHandledDroppedTimedOffer() {
908 AtomicInteger calls = new AtomicInteger();
909 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
910 basicExecutor, 4);
911 TestSubscriber s1 = new TestSubscriber();
912 s1.request = false;
913 TestSubscriber s2 = new TestSubscriber();
914 s2.request = false;
915 p.subscribe(s1);
916 p.subscribe(s2);
917 s2.awaitSubscribe();
918 s1.awaitSubscribe();
919 int n = 0;
920 long delay = timeoutMillis();
921 long startTime = System.nanoTime();
922 for (int i = 1; i <= 6; ++i) {
923 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
924 n = n + 2 + (d < 0 ? d : 0);
925 }
926 assertTrue(millisElapsedSince(startTime) >= delay);
927 p.close();
928 s2.awaitComplete();
929 s1.awaitComplete();
930 assertEquals(n, s1.nexts + s2.nexts);
931 assertTrue(calls.get() >= 2);
932 }
933
934 /**
935 * consume returns a CompletableFuture that is done when
936 * publisher completes
937 */
938 public void testConsume() {
939 AtomicInteger sum = new AtomicInteger();
940 SubmissionPublisher<Integer> p = basicPublisher();
941 CompletableFuture<Void> f =
942 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
943 int n = 20;
944 for (int i = 1; i <= n; ++i)
945 p.submit(i);
946 p.close();
947 f.join();
948 assertEquals((n * (n + 1)) / 2, sum.get());
949 }
950
951 /**
952 * consume(null) throws NPE
953 */
954 public void testConsumeNPE() {
955 SubmissionPublisher<Integer> p = basicPublisher();
956 try {
957 CompletableFuture<Void> f = p.consume(null);
958 shouldThrow();
959 } catch (NullPointerException success) {}
960 }
961
962 /**
963 * consume eventually stops processing published items if cancelled
964 */
965 public void testCancelledConsume() {
966 AtomicInteger count = new AtomicInteger();
967 SubmissionPublisher<Integer> p = basicPublisher();
968 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
969 f.cancel(true);
970 int n = 1000000; // arbitrary limit
971 for (int i = 1; i <= n; ++i)
972 p.submit(i);
973 assertTrue(count.get() < n);
974 }
975
976 }