ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.18
Committed: Wed Jan 4 06:09:58 2017 UTC (7 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.17: +18 -19 lines
Log Message:
convert to Diamond

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.Executor;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Flow;
12 import java.util.concurrent.ForkJoinPool;
13 import java.util.concurrent.SubmissionPublisher;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import junit.framework.Test;
16 import junit.framework.TestSuite;
17
18 import static java.util.concurrent.Flow.Subscriber;
19 import static java.util.concurrent.Flow.Subscription;
20 import static java.util.concurrent.TimeUnit.MILLISECONDS;
21
22 public class SubmissionPublisherTest extends JSR166TestCase {
23
24 public static void main(String[] args) {
25 main(suite(), args);
26 }
27 public static Test suite() {
28 return new TestSuite(SubmissionPublisherTest.class);
29 }
30
31 final Executor basicExecutor = basicPublisher().getExecutor();
32
33 static SubmissionPublisher<Integer> basicPublisher() {
34 return new SubmissionPublisher<Integer>();
35 }
36
37 static class SPException extends RuntimeException {}
38
39 class TestSubscriber implements Subscriber<Integer> {
40 volatile Subscription sn;
41 int last; // Requires that onNexts are in numeric order
42 volatile int nexts;
43 volatile int errors;
44 volatile int completes;
45 volatile boolean throwOnCall = false;
46 volatile boolean request = true;
47 volatile Throwable lastError;
48
49 public synchronized void onSubscribe(Subscription s) {
50 threadAssertTrue(sn == null);
51 sn = s;
52 notifyAll();
53 if (throwOnCall)
54 throw new SPException();
55 if (request)
56 sn.request(1L);
57 }
58 public synchronized void onNext(Integer t) {
59 ++nexts;
60 notifyAll();
61 int current = t.intValue();
62 threadAssertTrue(current >= last);
63 last = current;
64 if (request)
65 sn.request(1L);
66 if (throwOnCall)
67 throw new SPException();
68 }
69 public synchronized void onError(Throwable t) {
70 threadAssertTrue(completes == 0);
71 threadAssertTrue(errors == 0);
72 lastError = t;
73 ++errors;
74 notifyAll();
75 }
76 public synchronized void onComplete() {
77 threadAssertTrue(completes == 0);
78 ++completes;
79 notifyAll();
80 }
81
82 synchronized void awaitSubscribe() {
83 while (sn == null) {
84 try {
85 wait();
86 } catch (Exception ex) {
87 threadUnexpectedException(ex);
88 break;
89 }
90 }
91 }
92 synchronized void awaitNext(int n) {
93 while (nexts < n) {
94 try {
95 wait();
96 } catch (Exception ex) {
97 threadUnexpectedException(ex);
98 break;
99 }
100 }
101 }
102 synchronized void awaitComplete() {
103 while (completes == 0 && errors == 0) {
104 try {
105 wait();
106 } catch (Exception ex) {
107 threadUnexpectedException(ex);
108 break;
109 }
110 }
111 }
112 synchronized void awaitError() {
113 while (errors == 0) {
114 try {
115 wait();
116 } catch (Exception ex) {
117 threadUnexpectedException(ex);
118 break;
119 }
120 }
121 }
122
123 }
124
125 /**
126 * A new SubmissionPublisher has no subscribers, a non-null
127 * executor, a power-of-two capacity, is not closed, and reports
128 * zero demand and lag
129 */
130 void checkInitialState(SubmissionPublisher<?> p) {
131 assertFalse(p.hasSubscribers());
132 assertEquals(0, p.getNumberOfSubscribers());
133 assertTrue(p.getSubscribers().isEmpty());
134 assertFalse(p.isClosed());
135 assertNull(p.getClosedException());
136 int n = p.getMaxBufferCapacity();
137 assertTrue((n & (n - 1)) == 0); // power of two
138 assertNotNull(p.getExecutor());
139 assertEquals(0, p.estimateMinimumDemand());
140 assertEquals(0, p.estimateMaximumLag());
141 }
142
143 /**
144 * A default-constructed SubmissionPublisher has no subscribers,
145 * is not closed, has default buffer size, and uses the
146 * defaultExecutor
147 */
148 public void testConstructor1() {
149 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
150 checkInitialState(p);
151 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153 if (ForkJoinPool.getCommonPoolParallelism() > 1)
154 assertSame(e, c);
155 else
156 assertNotSame(e, c);
157 }
158
159 /**
160 * A new SubmissionPublisher has no subscribers, is not closed,
161 * has the given buffer size, and uses the given executor
162 */
163 public void testConstructor2() {
164 Executor e = Executors.newFixedThreadPool(1);
165 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
166 checkInitialState(p);
167 assertSame(p.getExecutor(), e);
168 assertEquals(8, p.getMaxBufferCapacity());
169 }
170
171 /**
172 * A null Executor argument to SubmissionPublisher constructor throws NPE
173 */
174 public void testConstructor3() {
175 try {
176 new SubmissionPublisher<Integer>(null, 8);
177 shouldThrow();
178 } catch (NullPointerException success) {}
179 }
180
181 /**
182 * A negative capacity argument to SubmissionPublisher constructor
183 * throws IAE
184 */
185 public void testConstructor4() {
186 Executor e = Executors.newFixedThreadPool(1);
187 try {
188 new SubmissionPublisher<Integer>(e, -1);
189 shouldThrow();
190 } catch (IllegalArgumentException success) {}
191 }
192
193 /**
194 * A closed publisher reports isClosed with no closedException and
195 * throws ISE upon attempted submission; a subsequent close or
196 * closeExceptionally has no additional effect.
197 */
198 public void testClose() {
199 SubmissionPublisher<Integer> p = basicPublisher();
200 checkInitialState(p);
201 p.close();
202 assertTrue(p.isClosed());
203 assertNull(p.getClosedException());
204 try {
205 p.submit(1);
206 shouldThrow();
207 } catch (IllegalStateException success) {}
208 Throwable ex = new SPException();
209 p.closeExceptionally(ex);
210 assertTrue(p.isClosed());
211 assertNull(p.getClosedException());
212 }
213
214 /**
215 * A publisher closedExceptionally reports isClosed with the
216 * closedException and throws ISE upon attempted submission; a
217 * subsequent close or closeExceptionally has no additional
218 * effect.
219 */
220 public void testCloseExceptionally() {
221 SubmissionPublisher<Integer> p = basicPublisher();
222 checkInitialState(p);
223 Throwable ex = new SPException();
224 p.closeExceptionally(ex);
225 assertTrue(p.isClosed());
226 assertSame(p.getClosedException(), ex);
227 try {
228 p.submit(1);
229 shouldThrow();
230 } catch (IllegalStateException success) {}
231 p.close();
232 assertTrue(p.isClosed());
233 assertSame(p.getClosedException(), ex);
234 }
235
236 /**
237 * Upon subscription, the subscriber's onSubscribe is called, no
238 * other Subscriber methods are invoked, the publisher
239 * hasSubscribers, isSubscribed is true, and existing
240 * subscriptions are unaffected.
241 */
242 public void testSubscribe1() {
243 TestSubscriber s = new TestSubscriber();
244 SubmissionPublisher<Integer> p = basicPublisher();
245 p.subscribe(s);
246 assertTrue(p.hasSubscribers());
247 assertEquals(1, p.getNumberOfSubscribers());
248 assertTrue(p.getSubscribers().contains(s));
249 assertTrue(p.isSubscribed(s));
250 s.awaitSubscribe();
251 assertNotNull(s.sn);
252 assertEquals(0, s.nexts);
253 assertEquals(0, s.errors);
254 assertEquals(0, s.completes);
255 TestSubscriber s2 = new TestSubscriber();
256 p.subscribe(s2);
257 assertTrue(p.hasSubscribers());
258 assertEquals(2, p.getNumberOfSubscribers());
259 assertTrue(p.getSubscribers().contains(s));
260 assertTrue(p.getSubscribers().contains(s2));
261 assertTrue(p.isSubscribed(s));
262 assertTrue(p.isSubscribed(s2));
263 s2.awaitSubscribe();
264 assertNotNull(s2.sn);
265 assertEquals(0, s2.nexts);
266 assertEquals(0, s2.errors);
267 assertEquals(0, s2.completes);
268 p.close();
269 }
270
271 /**
272 * If closed, upon subscription, the subscriber's onComplete
273 * method is invoked
274 */
275 public void testSubscribe2() {
276 TestSubscriber s = new TestSubscriber();
277 SubmissionPublisher<Integer> p = basicPublisher();
278 p.close();
279 p.subscribe(s);
280 s.awaitComplete();
281 assertEquals(0, s.nexts);
282 assertEquals(0, s.errors);
283 assertEquals(1, s.completes, 1);
284 }
285
286 /**
287 * If closedExceptionally, upon subscription, the subscriber's
288 * onError method is invoked
289 */
290 public void testSubscribe3() {
291 TestSubscriber s = new TestSubscriber();
292 SubmissionPublisher<Integer> p = basicPublisher();
293 Throwable ex = new SPException();
294 p.closeExceptionally(ex);
295 assertTrue(p.isClosed());
296 assertSame(p.getClosedException(), ex);
297 p.subscribe(s);
298 s.awaitError();
299 assertEquals(0, s.nexts);
300 assertEquals(1, s.errors);
301 }
302
303 /**
304 * Upon attempted resubscription, the subscriber's onError is
305 * called and the subscription is cancelled.
306 */
307 public void testSubscribe4() {
308 TestSubscriber s = new TestSubscriber();
309 SubmissionPublisher<Integer> p = basicPublisher();
310 p.subscribe(s);
311 assertTrue(p.hasSubscribers());
312 assertEquals(1, p.getNumberOfSubscribers());
313 assertTrue(p.getSubscribers().contains(s));
314 assertTrue(p.isSubscribed(s));
315 s.awaitSubscribe();
316 assertNotNull(s.sn);
317 assertEquals(0, s.nexts);
318 assertEquals(0, s.errors);
319 assertEquals(0, s.completes);
320 p.subscribe(s);
321 s.awaitError();
322 assertEquals(0, s.nexts);
323 assertEquals(1, s.errors);
324 assertFalse(p.isSubscribed(s));
325 }
326
327 /**
328 * An exception thrown in onSubscribe causes onError
329 */
330 public void testSubscribe5() {
331 TestSubscriber s = new TestSubscriber();
332 SubmissionPublisher<Integer> p = basicPublisher();
333 s.throwOnCall = true;
334 try {
335 p.subscribe(s);
336 } catch (Exception ok) {}
337 s.awaitError();
338 assertEquals(0, s.nexts);
339 assertEquals(1, s.errors);
340 assertEquals(0, s.completes);
341 }
342
343 /**
344 * subscribe(null) throws NPE
345 */
346 public void testSubscribe6() {
347 SubmissionPublisher<Integer> p = basicPublisher();
348 try {
349 p.subscribe(null);
350 shouldThrow();
351 } catch (NullPointerException success) {}
352 checkInitialState(p);
353 }
354
355 /**
356 * Closing a publisher causes onComplete to subscribers
357 */
358 public void testCloseCompletes() {
359 SubmissionPublisher<Integer> p = basicPublisher();
360 TestSubscriber s1 = new TestSubscriber();
361 TestSubscriber s2 = new TestSubscriber();
362 p.subscribe(s1);
363 p.subscribe(s2);
364 p.submit(1);
365 p.close();
366 assertTrue(p.isClosed());
367 assertNull(p.getClosedException());
368 s1.awaitComplete();
369 assertEquals(1, s1.nexts);
370 assertEquals(1, s1.completes);
371 s2.awaitComplete();
372 assertEquals(1, s2.nexts);
373 assertEquals(1, s2.completes);
374 }
375
376 /**
377 * Closing a publisher exceptionally causes onError to subscribers
378 * after they are subscribed
379 */
380 public void testCloseExceptionallyError() {
381 SubmissionPublisher<Integer> p = basicPublisher();
382 TestSubscriber s1 = new TestSubscriber();
383 TestSubscriber s2 = new TestSubscriber();
384 p.subscribe(s1);
385 p.subscribe(s2);
386 p.submit(1);
387 p.closeExceptionally(new SPException());
388 assertTrue(p.isClosed());
389 s1.awaitSubscribe();
390 s1.awaitError();
391 assertTrue(s1.nexts <= 1);
392 assertEquals(1, s1.errors);
393 s2.awaitSubscribe();
394 s2.awaitError();
395 assertTrue(s2.nexts <= 1);
396 assertEquals(1, s2.errors);
397 }
398
399 /**
400 * Cancelling a subscription eventually causes no more onNexts to be issued
401 */
402 public void testCancel() {
403 SubmissionPublisher<Integer> p = basicPublisher();
404 TestSubscriber s1 = new TestSubscriber();
405 TestSubscriber s2 = new TestSubscriber();
406 p.subscribe(s1);
407 p.subscribe(s2);
408 s1.awaitSubscribe();
409 p.submit(1);
410 s1.sn.cancel();
411 for (int i = 2; i <= 20; ++i)
412 p.submit(i);
413 p.close();
414 s2.awaitComplete();
415 assertEquals(20, s2.nexts);
416 assertEquals(1, s2.completes);
417 assertTrue(s1.nexts < 20);
418 assertFalse(p.isSubscribed(s1));
419 }
420
421 /**
422 * Throwing an exception in onNext causes onError
423 */
424 public void testThrowOnNext() {
425 SubmissionPublisher<Integer> p = basicPublisher();
426 TestSubscriber s1 = new TestSubscriber();
427 TestSubscriber s2 = new TestSubscriber();
428 p.subscribe(s1);
429 p.subscribe(s2);
430 s1.awaitSubscribe();
431 p.submit(1);
432 s1.throwOnCall = true;
433 p.submit(2);
434 p.close();
435 s2.awaitComplete();
436 assertEquals(2, s2.nexts);
437 s1.awaitComplete();
438 assertEquals(1, s1.errors);
439 }
440
441 /**
442 * If a handler is supplied in constructor, it is invoked when
443 * subscriber throws an exception in onNext
444 */
445 public void testThrowOnNextHandler() {
446 AtomicInteger calls = new AtomicInteger();
447 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
448 basicExecutor, 8, (s, e) -> calls.getAndIncrement());
449 TestSubscriber s1 = new TestSubscriber();
450 TestSubscriber s2 = new TestSubscriber();
451 p.subscribe(s1);
452 p.subscribe(s2);
453 s1.awaitSubscribe();
454 p.submit(1);
455 s1.throwOnCall = true;
456 p.submit(2);
457 p.close();
458 s2.awaitComplete();
459 assertEquals(2, s2.nexts);
460 assertEquals(1, s2.completes);
461 s1.awaitError();
462 assertEquals(1, s1.errors);
463 assertEquals(1, calls.get());
464 }
465
466 /**
467 * onNext items are issued in the same order to each subscriber
468 */
469 public void testOrder() {
470 SubmissionPublisher<Integer> p = basicPublisher();
471 TestSubscriber s1 = new TestSubscriber();
472 TestSubscriber s2 = new TestSubscriber();
473 p.subscribe(s1);
474 p.subscribe(s2);
475 for (int i = 1; i <= 20; ++i)
476 p.submit(i);
477 p.close();
478 s2.awaitComplete();
479 s1.awaitComplete();
480 assertEquals(20, s2.nexts);
481 assertEquals(1, s2.completes);
482 assertEquals(20, s1.nexts);
483 assertEquals(1, s1.completes);
484 }
485
486 /**
487 * onNext is issued only if requested
488 */
489 public void testRequest1() {
490 SubmissionPublisher<Integer> p = basicPublisher();
491 TestSubscriber s1 = new TestSubscriber();
492 s1.request = false;
493 p.subscribe(s1);
494 s1.awaitSubscribe();
495 assertTrue(p.estimateMinimumDemand() == 0);
496 TestSubscriber s2 = new TestSubscriber();
497 p.subscribe(s2);
498 p.submit(1);
499 p.submit(2);
500 s2.awaitNext(1);
501 assertEquals(0, s1.nexts);
502 s1.sn.request(3);
503 p.submit(3);
504 p.close();
505 s2.awaitComplete();
506 assertEquals(3, s2.nexts);
507 assertEquals(1, s2.completes);
508 s1.awaitComplete();
509 assertTrue(s1.nexts > 0);
510 assertEquals(1, s1.completes);
511 }
512
513 /**
514 * onNext is not issued when requests become zero
515 */
516 public void testRequest2() {
517 SubmissionPublisher<Integer> p = basicPublisher();
518 TestSubscriber s1 = new TestSubscriber();
519 TestSubscriber s2 = new TestSubscriber();
520 p.subscribe(s1);
521 p.subscribe(s2);
522 s2.awaitSubscribe();
523 s1.awaitSubscribe();
524 s1.request = false;
525 p.submit(1);
526 p.submit(2);
527 p.close();
528 s2.awaitComplete();
529 assertEquals(2, s2.nexts);
530 assertEquals(1, s2.completes);
531 s1.awaitNext(1);
532 assertEquals(1, s1.nexts);
533 }
534
535 /**
536 * Negative request causes error
537 */
538 public void testRequest3() {
539 SubmissionPublisher<Integer> p = basicPublisher();
540 TestSubscriber s1 = new TestSubscriber();
541 TestSubscriber s2 = new TestSubscriber();
542 p.subscribe(s1);
543 p.subscribe(s2);
544 s2.awaitSubscribe();
545 s1.awaitSubscribe();
546 s1.sn.request(-1L);
547 p.submit(1);
548 p.submit(2);
549 p.close();
550 s2.awaitComplete();
551 assertEquals(2, s2.nexts);
552 assertEquals(1, s2.completes);
553 s1.awaitError();
554 assertEquals(1, s1.errors);
555 assertTrue(s1.lastError instanceof IllegalArgumentException);
556 }
557
558 /**
559 * estimateMinimumDemand reports 0 until request, nonzero after
560 * request, and zero again after delivery
561 */
562 public void testEstimateMinimumDemand() {
563 TestSubscriber s = new TestSubscriber();
564 SubmissionPublisher<Integer> p = basicPublisher();
565 s.request = false;
566 p.subscribe(s);
567 s.awaitSubscribe();
568 assertEquals(0, p.estimateMinimumDemand());
569 s.sn.request(1);
570 assertEquals(1, p.estimateMinimumDemand());
571 p.submit(1);
572 s.awaitNext(1);
573 assertEquals(0, p.estimateMinimumDemand());
574 }
575
576 /**
577 * submit to a publisher with no subscribers returns lag 0
578 */
579 public void testEmptySubmit() {
580 SubmissionPublisher<Integer> p = basicPublisher();
581 assertEquals(0, p.submit(1));
582 }
583
584 /**
585 * submit(null) throws NPE
586 */
587 public void testNullSubmit() {
588 SubmissionPublisher<Integer> p = basicPublisher();
589 try {
590 p.submit(null);
591 shouldThrow();
592 } catch (NullPointerException success) {}
593 }
594
595 /**
596 * submit returns number of lagged items, compatible with result
597 * of estimateMaximumLag.
598 */
599 public void testLaggedSubmit() {
600 SubmissionPublisher<Integer> p = basicPublisher();
601 TestSubscriber s1 = new TestSubscriber();
602 s1.request = false;
603 TestSubscriber s2 = new TestSubscriber();
604 s2.request = false;
605 p.subscribe(s1);
606 p.subscribe(s2);
607 s2.awaitSubscribe();
608 s1.awaitSubscribe();
609 assertEquals(1, p.submit(1));
610 assertTrue(p.estimateMaximumLag() >= 1);
611 assertTrue(p.submit(2) >= 2);
612 assertTrue(p.estimateMaximumLag() >= 2);
613 s1.sn.request(4);
614 assertTrue(p.submit(3) >= 3);
615 assertTrue(p.estimateMaximumLag() >= 3);
616 s2.sn.request(4);
617 p.submit(4);
618 p.close();
619 s2.awaitComplete();
620 assertEquals(4, s2.nexts);
621 s1.awaitComplete();
622 assertEquals(4, s2.nexts);
623 }
624
625 /**
626 * submit eventually issues requested items when buffer capacity is 1
627 */
628 public void testCap1Submit() {
629 SubmissionPublisher<Integer> p
630 = new SubmissionPublisher<>(basicExecutor, 1);
631 TestSubscriber s1 = new TestSubscriber();
632 TestSubscriber s2 = new TestSubscriber();
633 p.subscribe(s1);
634 p.subscribe(s2);
635 for (int i = 1; i <= 20; ++i) {
636 assertTrue(p.estimateMinimumDemand() <= 1);
637 assertTrue(p.submit(i) >= 0);
638 }
639 p.close();
640 s2.awaitComplete();
641 s1.awaitComplete();
642 assertEquals(20, s2.nexts);
643 assertEquals(1, s2.completes);
644 assertEquals(20, s1.nexts);
645 assertEquals(1, s1.completes);
646 }
647
648 static boolean noopHandle(AtomicInteger count) {
649 count.getAndIncrement();
650 return false;
651 }
652
653 static boolean reqHandle(AtomicInteger count, Subscriber s) {
654 count.getAndIncrement();
655 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
656 return true;
657 }
658
659 /**
660 * offer to a publisher with no subscribers returns lag 0
661 */
662 public void testEmptyOffer() {
663 SubmissionPublisher<Integer> p = basicPublisher();
664 assertEquals(0, p.offer(1, null));
665 }
666
667 /**
668 * offer(null) throws NPE
669 */
670 public void testNullOffer() {
671 SubmissionPublisher<Integer> p = basicPublisher();
672 try {
673 p.offer(null, null);
674 shouldThrow();
675 } catch (NullPointerException success) {}
676 }
677
678 /**
679 * offer returns number of lagged items if not saturated
680 */
681 public void testLaggedOffer() {
682 SubmissionPublisher<Integer> p = basicPublisher();
683 TestSubscriber s1 = new TestSubscriber();
684 s1.request = false;
685 TestSubscriber s2 = new TestSubscriber();
686 s2.request = false;
687 p.subscribe(s1);
688 p.subscribe(s2);
689 s2.awaitSubscribe();
690 s1.awaitSubscribe();
691 assertTrue(p.offer(1, null) >= 1);
692 assertTrue(p.offer(2, null) >= 2);
693 s1.sn.request(4);
694 assertTrue(p.offer(3, null) >= 3);
695 s2.sn.request(4);
696 p.offer(4, null);
697 p.close();
698 s2.awaitComplete();
699 assertEquals(4, s2.nexts);
700 s1.awaitComplete();
701 assertEquals(4, s2.nexts);
702 }
703
704 /**
705 * offer reports drops if saturated
706 */
707 public void testDroppedOffer() {
708 SubmissionPublisher<Integer> p
709 = new SubmissionPublisher<>(basicExecutor, 4);
710 TestSubscriber s1 = new TestSubscriber();
711 s1.request = false;
712 TestSubscriber s2 = new TestSubscriber();
713 s2.request = false;
714 p.subscribe(s1);
715 p.subscribe(s2);
716 s2.awaitSubscribe();
717 s1.awaitSubscribe();
718 for (int i = 1; i <= 4; ++i)
719 assertTrue(p.offer(i, null) >= 0);
720 p.offer(5, null);
721 assertTrue(p.offer(6, null) < 0);
722 s1.sn.request(64);
723 assertTrue(p.offer(7, null) < 0);
724 s2.sn.request(64);
725 p.close();
726 s2.awaitComplete();
727 assertTrue(s2.nexts >= 4);
728 s1.awaitComplete();
729 assertTrue(s1.nexts >= 4);
730 }
731
732 /**
733 * offer invokes drop handler if saturated
734 */
735 public void testHandledDroppedOffer() {
736 AtomicInteger calls = new AtomicInteger();
737 SubmissionPublisher<Integer> p
738 = new SubmissionPublisher<>(basicExecutor, 4);
739 TestSubscriber s1 = new TestSubscriber();
740 s1.request = false;
741 TestSubscriber s2 = new TestSubscriber();
742 s2.request = false;
743 p.subscribe(s1);
744 p.subscribe(s2);
745 s2.awaitSubscribe();
746 s1.awaitSubscribe();
747 for (int i = 1; i <= 4; ++i)
748 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
749 p.offer(4, (s, x) -> noopHandle(calls));
750 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
751 s1.sn.request(64);
752 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
753 s2.sn.request(64);
754 p.close();
755 s2.awaitComplete();
756 s1.awaitComplete();
757 assertTrue(calls.get() >= 4);
758 }
759
760 /**
761 * offer succeeds if drop handler forces request
762 */
763 public void testRecoveredHandledDroppedOffer() {
764 AtomicInteger calls = new AtomicInteger();
765 SubmissionPublisher<Integer> p
766 = new SubmissionPublisher<>(basicExecutor, 4);
767 TestSubscriber s1 = new TestSubscriber();
768 s1.request = false;
769 TestSubscriber s2 = new TestSubscriber();
770 s2.request = false;
771 p.subscribe(s1);
772 p.subscribe(s2);
773 s2.awaitSubscribe();
774 s1.awaitSubscribe();
775 int n = 0;
776 for (int i = 1; i <= 8; ++i) {
777 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
778 n = n + 2 + (d < 0 ? d : 0);
779 }
780 p.close();
781 s2.awaitComplete();
782 s1.awaitComplete();
783 assertEquals(n, s1.nexts + s2.nexts);
784 assertTrue(calls.get() >= 2);
785 }
786
787 /**
788 * Timed offer to a publisher with no subscribers returns lag 0
789 */
790 public void testEmptyTimedOffer() {
791 SubmissionPublisher<Integer> p = basicPublisher();
792 long startTime = System.nanoTime();
793 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
794 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
795 }
796
797 /**
798 * Timed offer with null item or TimeUnit throws NPE
799 */
800 public void testNullTimedOffer() {
801 SubmissionPublisher<Integer> p = basicPublisher();
802 long startTime = System.nanoTime();
803 try {
804 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
805 shouldThrow();
806 } catch (NullPointerException success) {}
807 try {
808 p.offer(1, LONG_DELAY_MS, null, null);
809 shouldThrow();
810 } catch (NullPointerException success) {}
811 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
812 }
813
814 /**
815 * Timed offer returns number of lagged items if not saturated
816 */
817 public void testLaggedTimedOffer() {
818 SubmissionPublisher<Integer> p = basicPublisher();
819 TestSubscriber s1 = new TestSubscriber();
820 s1.request = false;
821 TestSubscriber s2 = new TestSubscriber();
822 s2.request = false;
823 p.subscribe(s1);
824 p.subscribe(s2);
825 s2.awaitSubscribe();
826 s1.awaitSubscribe();
827 long startTime = System.nanoTime();
828 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
829 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
830 s1.sn.request(4);
831 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
832 s2.sn.request(4);
833 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
834 p.close();
835 s2.awaitComplete();
836 assertEquals(4, s2.nexts);
837 s1.awaitComplete();
838 assertEquals(4, s2.nexts);
839 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
840 }
841
842 /**
843 * Timed offer reports drops if saturated
844 */
845 public void testDroppedTimedOffer() {
846 SubmissionPublisher<Integer> p
847 = new SubmissionPublisher<>(basicExecutor, 4);
848 TestSubscriber s1 = new TestSubscriber();
849 s1.request = false;
850 TestSubscriber s2 = new TestSubscriber();
851 s2.request = false;
852 p.subscribe(s1);
853 p.subscribe(s2);
854 s2.awaitSubscribe();
855 s1.awaitSubscribe();
856 long delay = timeoutMillis();
857 for (int i = 1; i <= 4; ++i)
858 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
859 long startTime = System.nanoTime();
860 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
861 s1.sn.request(64);
862 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
863 // 2 * delay should elapse but check only 1 * delay to allow timer slop
864 assertTrue(millisElapsedSince(startTime) >= delay);
865 s2.sn.request(64);
866 p.close();
867 s2.awaitComplete();
868 assertTrue(s2.nexts >= 2);
869 s1.awaitComplete();
870 assertTrue(s1.nexts >= 2);
871 }
872
873 /**
874 * Timed offer invokes drop handler if saturated
875 */
876 public void testHandledDroppedTimedOffer() {
877 AtomicInteger calls = new AtomicInteger();
878 SubmissionPublisher<Integer> p
879 = new SubmissionPublisher<>(basicExecutor, 4);
880 TestSubscriber s1 = new TestSubscriber();
881 s1.request = false;
882 TestSubscriber s2 = new TestSubscriber();
883 s2.request = false;
884 p.subscribe(s1);
885 p.subscribe(s2);
886 s2.awaitSubscribe();
887 s1.awaitSubscribe();
888 long delay = timeoutMillis();
889 for (int i = 1; i <= 4; ++i)
890 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
891 long startTime = System.nanoTime();
892 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
893 s1.sn.request(64);
894 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
895 assertTrue(millisElapsedSince(startTime) >= delay);
896 s2.sn.request(64);
897 p.close();
898 s2.awaitComplete();
899 s1.awaitComplete();
900 assertTrue(calls.get() >= 2);
901 }
902
903 /**
904 * Timed offer succeeds if drop handler forces request
905 */
906 public void testRecoveredHandledDroppedTimedOffer() {
907 AtomicInteger calls = new AtomicInteger();
908 SubmissionPublisher<Integer> p
909 = new SubmissionPublisher<>(basicExecutor, 4);
910 TestSubscriber s1 = new TestSubscriber();
911 s1.request = false;
912 TestSubscriber s2 = new TestSubscriber();
913 s2.request = false;
914 p.subscribe(s1);
915 p.subscribe(s2);
916 s2.awaitSubscribe();
917 s1.awaitSubscribe();
918 int n = 0;
919 long delay = timeoutMillis();
920 long startTime = System.nanoTime();
921 for (int i = 1; i <= 6; ++i) {
922 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
923 n = n + 2 + (d < 0 ? d : 0);
924 }
925 assertTrue(millisElapsedSince(startTime) >= delay);
926 p.close();
927 s2.awaitComplete();
928 s1.awaitComplete();
929 assertEquals(n, s1.nexts + s2.nexts);
930 assertTrue(calls.get() >= 2);
931 }
932
933 /**
934 * consume returns a CompletableFuture that is done when
935 * publisher completes
936 */
937 public void testConsume() {
938 AtomicInteger sum = new AtomicInteger();
939 SubmissionPublisher<Integer> p = basicPublisher();
940 CompletableFuture<Void> f =
941 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
942 int n = 20;
943 for (int i = 1; i <= n; ++i)
944 p.submit(i);
945 p.close();
946 f.join();
947 assertEquals((n * (n + 1)) / 2, sum.get());
948 }
949
950 /**
951 * consume(null) throws NPE
952 */
953 public void testConsumeNPE() {
954 SubmissionPublisher<Integer> p = basicPublisher();
955 try {
956 CompletableFuture<Void> f = p.consume(null);
957 shouldThrow();
958 } catch (NullPointerException success) {}
959 }
960
961 /**
962 * consume eventually stops processing published items if cancelled
963 */
964 public void testCancelledConsume() {
965 AtomicInteger count = new AtomicInteger();
966 SubmissionPublisher<Integer> p = basicPublisher();
967 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
968 f.cancel(true);
969 int n = 1000000; // arbitrary limit
970 for (int i = 1; i <= n; ++i)
971 p.submit(i);
972 assertTrue(count.get() < n);
973 }
974
975 }