ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.15
Committed: Sun Nov 6 22:50:32 2016 UTC (7 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.14: +1 -1 lines
Log Message:
elide unnecessary braces

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.Executor;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Flow;
12 import java.util.concurrent.ForkJoinPool;
13 import java.util.concurrent.SubmissionPublisher;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import junit.framework.Test;
16 import junit.framework.TestSuite;
17
18 import static java.util.concurrent.Flow.Subscriber;
19 import static java.util.concurrent.Flow.Subscription;
20 import static java.util.concurrent.TimeUnit.MILLISECONDS;
21
22 public class SubmissionPublisherTest extends JSR166TestCase {
23
24 public static void main(String[] args) {
25 main(suite(), args);
26 }
27 public static Test suite() {
28 return new TestSuite(SubmissionPublisherTest.class);
29 }
30
31 final Executor basicExecutor = basicPublisher().getExecutor();
32
33 static SubmissionPublisher<Integer> basicPublisher() {
34 return new SubmissionPublisher<Integer>();
35 }
36
37 static class SPException extends RuntimeException {}
38
39 class TestSubscriber implements Subscriber<Integer> {
40 volatile Subscription sn;
41 int last; // Requires that onNexts are in numeric order
42 volatile int nexts;
43 volatile int errors;
44 volatile int completes;
45 volatile boolean throwOnCall = false;
46 volatile boolean request = true;
47 volatile Throwable lastError;
48
49 public synchronized void onSubscribe(Subscription s) {
50 threadAssertTrue(sn == null);
51 sn = s;
52 notifyAll();
53 if (throwOnCall)
54 throw new SPException();
55 if (request)
56 sn.request(1L);
57 }
58 public synchronized void onNext(Integer t) {
59 ++nexts;
60 notifyAll();
61 int current = t.intValue();
62 threadAssertTrue(current >= last);
63 last = current;
64 if (request)
65 sn.request(1L);
66 if (throwOnCall)
67 throw new SPException();
68 }
69 public synchronized void onError(Throwable t) {
70 threadAssertTrue(completes == 0);
71 threadAssertTrue(errors == 0);
72 lastError = t;
73 ++errors;
74 notifyAll();
75 }
76 public synchronized void onComplete() {
77 threadAssertTrue(completes == 0);
78 ++completes;
79 notifyAll();
80 }
81
82 synchronized void awaitSubscribe() {
83 while (sn == null) {
84 try {
85 wait();
86 } catch (Exception ex) {
87 threadUnexpectedException(ex);
88 break;
89 }
90 }
91 }
92 synchronized void awaitNext(int n) {
93 while (nexts < n) {
94 try {
95 wait();
96 } catch (Exception ex) {
97 threadUnexpectedException(ex);
98 break;
99 }
100 }
101 }
102 synchronized void awaitComplete() {
103 while (completes == 0 && errors == 0) {
104 try {
105 wait();
106 } catch (Exception ex) {
107 threadUnexpectedException(ex);
108 break;
109 }
110 }
111 }
112 synchronized void awaitError() {
113 while (errors == 0) {
114 try {
115 wait();
116 } catch (Exception ex) {
117 threadUnexpectedException(ex);
118 break;
119 }
120 }
121 }
122
123 }
124
125 /**
126 * A new SubmissionPublisher has no subscribers, a non-null
127 * executor, a power-of-two capacity, is not closed, and reports
128 * zero demand and lag
129 */
130 void checkInitialState(SubmissionPublisher<?> p) {
131 assertFalse(p.hasSubscribers());
132 assertEquals(0, p.getNumberOfSubscribers());
133 assertTrue(p.getSubscribers().isEmpty());
134 assertFalse(p.isClosed());
135 assertNull(p.getClosedException());
136 int n = p.getMaxBufferCapacity();
137 assertTrue((n & (n - 1)) == 0); // power of two
138 assertNotNull(p.getExecutor());
139 assertEquals(0, p.estimateMinimumDemand());
140 assertEquals(0, p.estimateMaximumLag());
141 }
142
143 /**
144 * A default-constructed SubmissionPublisher has no subscribers,
145 * is not closed, has default buffer size, and uses the
146 * defaultExecutor
147 */
148 public void testConstructor1() {
149 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
150 checkInitialState(p);
151 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153 if (ForkJoinPool.getCommonPoolParallelism() > 1)
154 assertSame(e, c);
155 else
156 assertNotSame(e, c);
157 }
158
159 /**
160 * A new SubmissionPublisher has no subscribers, is not closed,
161 * has the given buffer size, and uses the given executor
162 */
163 public void testConstructor2() {
164 Executor e = Executors.newFixedThreadPool(1);
165 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
166 checkInitialState(p);
167 assertSame(p.getExecutor(), e);
168 assertEquals(8, p.getMaxBufferCapacity());
169 }
170
171 /**
172 * A null Executor argument to SubmissionPublisher constructor throws NPE
173 */
174 public void testConstructor3() {
175 try {
176 new SubmissionPublisher<Integer>(null, 8);
177 shouldThrow();
178 } catch (NullPointerException success) {}
179 }
180
181 /**
182 * A negative capacity argument to SubmissionPublisher constructor
183 * throws IAE
184 */
185 public void testConstructor4() {
186 Executor e = Executors.newFixedThreadPool(1);
187 try {
188 new SubmissionPublisher<Integer>(e, -1);
189 shouldThrow();
190 } catch (IllegalArgumentException success) {}
191 }
192
193 /**
194 * A closed publisher reports isClosed with no closedException and
195 * throws ISE upon attempted submission; a subsequent close or
196 * closeExceptionally has no additional effect.
197 */
198 public void testClose() {
199 SubmissionPublisher<Integer> p = basicPublisher();
200 checkInitialState(p);
201 p.close();
202 assertTrue(p.isClosed());
203 assertNull(p.getClosedException());
204 try {
205 p.submit(1);
206 shouldThrow();
207 } catch (IllegalStateException success) {}
208 Throwable ex = new SPException();
209 p.closeExceptionally(ex);
210 assertTrue(p.isClosed());
211 assertNull(p.getClosedException());
212 }
213
214 /**
215 * A publisher closedExceptionally reports isClosed with the
216 * closedException and throws ISE upon attempted submission; a
217 * subsequent close or closeExceptionally has no additional
218 * effect.
219 */
220 public void testCloseExceptionally() {
221 SubmissionPublisher<Integer> p = basicPublisher();
222 checkInitialState(p);
223 Throwable ex = new SPException();
224 p.closeExceptionally(ex);
225 assertTrue(p.isClosed());
226 assertSame(p.getClosedException(), ex);
227 try {
228 p.submit(1);
229 shouldThrow();
230 } catch (IllegalStateException success) {}
231 p.close();
232 assertTrue(p.isClosed());
233 assertSame(p.getClosedException(), ex);
234 }
235
236 /**
237 * Upon subscription, the subscriber's onSubscribe is called, no
238 * other Subscriber methods are invoked, the publisher
239 * hasSubscribers, isSubscribed is true, and existing
240 * subscriptions are unaffected.
241 */
242 public void testSubscribe1() {
243 TestSubscriber s = new TestSubscriber();
244 SubmissionPublisher<Integer> p = basicPublisher();
245 p.subscribe(s);
246 assertTrue(p.hasSubscribers());
247 assertEquals(1, p.getNumberOfSubscribers());
248 assertTrue(p.getSubscribers().contains(s));
249 assertTrue(p.isSubscribed(s));
250 s.awaitSubscribe();
251 assertNotNull(s.sn);
252 assertEquals(0, s.nexts);
253 assertEquals(0, s.errors);
254 assertEquals(0, s.completes);
255 TestSubscriber s2 = new TestSubscriber();
256 p.subscribe(s2);
257 assertTrue(p.hasSubscribers());
258 assertEquals(2, p.getNumberOfSubscribers());
259 assertTrue(p.getSubscribers().contains(s));
260 assertTrue(p.getSubscribers().contains(s2));
261 assertTrue(p.isSubscribed(s));
262 assertTrue(p.isSubscribed(s2));
263 s2.awaitSubscribe();
264 assertNotNull(s2.sn);
265 assertEquals(0, s2.nexts);
266 assertEquals(0, s2.errors);
267 assertEquals(0, s2.completes);
268 p.close();
269 }
270
271 /**
272 * If closed, upon subscription, the subscriber's onComplete
273 * method is invoked
274 */
275 public void testSubscribe2() {
276 TestSubscriber s = new TestSubscriber();
277 SubmissionPublisher<Integer> p = basicPublisher();
278 p.close();
279 p.subscribe(s);
280 s.awaitComplete();
281 assertEquals(0, s.nexts);
282 assertEquals(0, s.errors);
283 assertEquals(1, s.completes, 1);
284 }
285
286 /**
287 * If closedExceptionally, upon subscription, the subscriber's
288 * onError method is invoked
289 */
290 public void testSubscribe3() {
291 TestSubscriber s = new TestSubscriber();
292 SubmissionPublisher<Integer> p = basicPublisher();
293 Throwable ex = new SPException();
294 p.closeExceptionally(ex);
295 assertTrue(p.isClosed());
296 assertSame(p.getClosedException(), ex);
297 p.subscribe(s);
298 s.awaitError();
299 assertEquals(0, s.nexts);
300 assertEquals(1, s.errors);
301 }
302
303 /**
304 * Upon attempted resubscription, the subscriber's onError is
305 * called and the subscription is cancelled.
306 */
307 public void testSubscribe4() {
308 TestSubscriber s = new TestSubscriber();
309 SubmissionPublisher<Integer> p = basicPublisher();
310 p.subscribe(s);
311 assertTrue(p.hasSubscribers());
312 assertEquals(1, p.getNumberOfSubscribers());
313 assertTrue(p.getSubscribers().contains(s));
314 assertTrue(p.isSubscribed(s));
315 s.awaitSubscribe();
316 assertNotNull(s.sn);
317 assertEquals(0, s.nexts);
318 assertEquals(0, s.errors);
319 assertEquals(0, s.completes);
320 p.subscribe(s);
321 s.awaitError();
322 assertEquals(0, s.nexts);
323 assertEquals(1, s.errors);
324 assertFalse(p.isSubscribed(s));
325 }
326
327 /**
328 * An exception thrown in onSubscribe causes onError
329 */
330 public void testSubscribe5() {
331 TestSubscriber s = new TestSubscriber();
332 SubmissionPublisher<Integer> p = basicPublisher();
333 s.throwOnCall = true;
334 try {
335 p.subscribe(s);
336 } catch (Exception ok) {}
337 s.awaitError();
338 assertEquals(0, s.nexts);
339 assertEquals(1, s.errors);
340 assertEquals(0, s.completes);
341 }
342
343 /**
344 * subscribe(null) throws NPE
345 */
346 public void testSubscribe6() {
347 SubmissionPublisher<Integer> p = basicPublisher();
348 try {
349 p.subscribe(null);
350 shouldThrow();
351 } catch (NullPointerException success) {}
352 checkInitialState(p);
353 }
354
355 /**
356 * Closing a publisher causes onComplete to subscribers
357 */
358 public void testCloseCompletes() {
359 SubmissionPublisher<Integer> p = basicPublisher();
360 TestSubscriber s1 = new TestSubscriber();
361 TestSubscriber s2 = new TestSubscriber();
362 p.subscribe(s1);
363 p.subscribe(s2);
364 p.submit(1);
365 p.close();
366 assertTrue(p.isClosed());
367 assertNull(p.getClosedException());
368 s1.awaitComplete();
369 assertEquals(1, s1.nexts);
370 assertEquals(1, s1.completes);
371 s2.awaitComplete();
372 assertEquals(1, s2.nexts);
373 assertEquals(1, s2.completes);
374 }
375
376 /**
377 * Closing a publisher exceptionally causes onError to subscribers
378 */
379 public void testCloseExceptionallyError() {
380 SubmissionPublisher<Integer> p = basicPublisher();
381 TestSubscriber s1 = new TestSubscriber();
382 TestSubscriber s2 = new TestSubscriber();
383 p.subscribe(s1);
384 p.subscribe(s2);
385 p.submit(1);
386 p.closeExceptionally(new SPException());
387 assertTrue(p.isClosed());
388 s1.awaitError();
389 assertTrue(s1.nexts <= 1);
390 assertEquals(1, s1.errors);
391 s2.awaitError();
392 assertTrue(s2.nexts <= 1);
393 assertEquals(1, s2.errors);
394 }
395
396 /**
397 * Cancelling a subscription eventually causes no more onNexts to be issued
398 */
399 public void testCancel() {
400 SubmissionPublisher<Integer> p = basicPublisher();
401 TestSubscriber s1 = new TestSubscriber();
402 TestSubscriber s2 = new TestSubscriber();
403 p.subscribe(s1);
404 p.subscribe(s2);
405 s1.awaitSubscribe();
406 p.submit(1);
407 s1.sn.cancel();
408 for (int i = 2; i <= 20; ++i)
409 p.submit(i);
410 p.close();
411 s2.awaitComplete();
412 assertEquals(20, s2.nexts);
413 assertEquals(1, s2.completes);
414 assertTrue(s1.nexts < 20);
415 assertFalse(p.isSubscribed(s1));
416 }
417
418 /**
419 * Throwing an exception in onNext causes onError
420 */
421 public void testThrowOnNext() {
422 SubmissionPublisher<Integer> p = basicPublisher();
423 TestSubscriber s1 = new TestSubscriber();
424 TestSubscriber s2 = new TestSubscriber();
425 p.subscribe(s1);
426 p.subscribe(s2);
427 s1.awaitSubscribe();
428 p.submit(1);
429 s1.throwOnCall = true;
430 p.submit(2);
431 p.close();
432 s2.awaitComplete();
433 assertEquals(2, s2.nexts);
434 s1.awaitComplete();
435 assertEquals(1, s1.errors);
436 }
437
438 /**
439 * If a handler is supplied in constructor, it is invoked when
440 * subscriber throws an exception in onNext
441 */
442 public void testThrowOnNextHandler() {
443 AtomicInteger calls = new AtomicInteger();
444 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
445 (basicExecutor, 8,
446 (s, e) -> calls.getAndIncrement());
447 TestSubscriber s1 = new TestSubscriber();
448 TestSubscriber s2 = new TestSubscriber();
449 p.subscribe(s1);
450 p.subscribe(s2);
451 s1.awaitSubscribe();
452 p.submit(1);
453 s1.throwOnCall = true;
454 p.submit(2);
455 p.close();
456 s2.awaitComplete();
457 assertEquals(2, s2.nexts);
458 assertEquals(1, s2.completes);
459 s1.awaitError();
460 assertEquals(1, s1.errors);
461 assertEquals(1, calls.get());
462 }
463
464 /**
465 * onNext items are issued in the same order to each subscriber
466 */
467 public void testOrder() {
468 SubmissionPublisher<Integer> p = basicPublisher();
469 TestSubscriber s1 = new TestSubscriber();
470 TestSubscriber s2 = new TestSubscriber();
471 p.subscribe(s1);
472 p.subscribe(s2);
473 for (int i = 1; i <= 20; ++i)
474 p.submit(i);
475 p.close();
476 s2.awaitComplete();
477 s1.awaitComplete();
478 assertEquals(20, s2.nexts);
479 assertEquals(1, s2.completes);
480 assertEquals(20, s1.nexts);
481 assertEquals(1, s1.completes);
482 }
483
484 /**
485 * onNext is issued only if requested
486 */
487 public void testRequest1() {
488 SubmissionPublisher<Integer> p = basicPublisher();
489 TestSubscriber s1 = new TestSubscriber();
490 s1.request = false;
491 p.subscribe(s1);
492 s1.awaitSubscribe();
493 assertTrue(p.estimateMinimumDemand() == 0);
494 TestSubscriber s2 = new TestSubscriber();
495 p.subscribe(s2);
496 p.submit(1);
497 p.submit(2);
498 s2.awaitNext(1);
499 assertEquals(0, s1.nexts);
500 s1.sn.request(3);
501 p.submit(3);
502 p.close();
503 s2.awaitComplete();
504 assertEquals(3, s2.nexts);
505 assertEquals(1, s2.completes);
506 s1.awaitComplete();
507 assertTrue(s1.nexts > 0);
508 assertEquals(1, s1.completes);
509 }
510
511 /**
512 * onNext is not issued when requests become zero
513 */
514 public void testRequest2() {
515 SubmissionPublisher<Integer> p = basicPublisher();
516 TestSubscriber s1 = new TestSubscriber();
517 TestSubscriber s2 = new TestSubscriber();
518 p.subscribe(s1);
519 p.subscribe(s2);
520 s2.awaitSubscribe();
521 s1.awaitSubscribe();
522 s1.request = false;
523 p.submit(1);
524 p.submit(2);
525 p.close();
526 s2.awaitComplete();
527 assertEquals(2, s2.nexts);
528 assertEquals(1, s2.completes);
529 s1.awaitNext(1);
530 assertEquals(1, s1.nexts);
531 }
532
533 /**
534 * Negative request causes error
535 */
536 public void testRequest3() {
537 SubmissionPublisher<Integer> p = basicPublisher();
538 TestSubscriber s1 = new TestSubscriber();
539 TestSubscriber s2 = new TestSubscriber();
540 p.subscribe(s1);
541 p.subscribe(s2);
542 s2.awaitSubscribe();
543 s1.awaitSubscribe();
544 s1.sn.request(-1L);
545 p.submit(1);
546 p.submit(2);
547 p.close();
548 s2.awaitComplete();
549 assertEquals(2, s2.nexts);
550 assertEquals(1, s2.completes);
551 s1.awaitError();
552 assertEquals(1, s1.errors);
553 assertTrue(s1.lastError instanceof IllegalArgumentException);
554 }
555
556 /**
557 * estimateMinimumDemand reports 0 until request, nonzero after
558 * request, and zero again after delivery
559 */
560 public void testEstimateMinimumDemand() {
561 TestSubscriber s = new TestSubscriber();
562 SubmissionPublisher<Integer> p = basicPublisher();
563 s.request = false;
564 p.subscribe(s);
565 s.awaitSubscribe();
566 assertEquals(0, p.estimateMinimumDemand());
567 s.sn.request(1);
568 assertEquals(1, p.estimateMinimumDemand());
569 p.submit(1);
570 s.awaitNext(1);
571 assertEquals(0, p.estimateMinimumDemand());
572 }
573
574 /**
575 * submit to a publisher with no subscribers returns lag 0
576 */
577 public void testEmptySubmit() {
578 SubmissionPublisher<Integer> p = basicPublisher();
579 assertEquals(0, p.submit(1));
580 }
581
582 /**
583 * submit(null) throws NPE
584 */
585 public void testNullSubmit() {
586 SubmissionPublisher<Integer> p = basicPublisher();
587 try {
588 p.submit(null);
589 shouldThrow();
590 } catch (NullPointerException success) {}
591 }
592
593 /**
594 * submit returns number of lagged items, compatible with result
595 * of estimateMaximumLag.
596 */
597 public void testLaggedSubmit() {
598 SubmissionPublisher<Integer> p = basicPublisher();
599 TestSubscriber s1 = new TestSubscriber();
600 s1.request = false;
601 TestSubscriber s2 = new TestSubscriber();
602 s2.request = false;
603 p.subscribe(s1);
604 p.subscribe(s2);
605 s2.awaitSubscribe();
606 s1.awaitSubscribe();
607 assertEquals(1, p.submit(1));
608 assertTrue(p.estimateMaximumLag() >= 1);
609 assertTrue(p.submit(2) >= 2);
610 assertTrue(p.estimateMaximumLag() >= 2);
611 s1.sn.request(4);
612 assertTrue(p.submit(3) >= 3);
613 assertTrue(p.estimateMaximumLag() >= 3);
614 s2.sn.request(4);
615 p.submit(4);
616 p.close();
617 s2.awaitComplete();
618 assertEquals(4, s2.nexts);
619 s1.awaitComplete();
620 assertEquals(4, s2.nexts);
621 }
622
623 /**
624 * submit eventually issues requested items when buffer capacity is 1
625 */
626 public void testCap1Submit() {
627 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
628 basicExecutor, 1);
629 TestSubscriber s1 = new TestSubscriber();
630 TestSubscriber s2 = new TestSubscriber();
631 p.subscribe(s1);
632 p.subscribe(s2);
633 for (int i = 1; i <= 20; ++i) {
634 assertTrue(p.estimateMinimumDemand() <= 1);
635 assertTrue(p.submit(i) >= 0);
636 }
637 p.close();
638 s2.awaitComplete();
639 s1.awaitComplete();
640 assertEquals(20, s2.nexts);
641 assertEquals(1, s2.completes);
642 assertEquals(20, s1.nexts);
643 assertEquals(1, s1.completes);
644 }
645
646 static boolean noopHandle(AtomicInteger count) {
647 count.getAndIncrement();
648 return false;
649 }
650
651 static boolean reqHandle(AtomicInteger count, Subscriber s) {
652 count.getAndIncrement();
653 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
654 return true;
655 }
656
657 /**
658 * offer to a publisher with no subscribers returns lag 0
659 */
660 public void testEmptyOffer() {
661 SubmissionPublisher<Integer> p = basicPublisher();
662 assertEquals(0, p.offer(1, null));
663 }
664
665 /**
666 * offer(null) throws NPE
667 */
668 public void testNullOffer() {
669 SubmissionPublisher<Integer> p = basicPublisher();
670 try {
671 p.offer(null, null);
672 shouldThrow();
673 } catch (NullPointerException success) {}
674 }
675
676 /**
677 * offer returns number of lagged items if not saturated
678 */
679 public void testLaggedOffer() {
680 SubmissionPublisher<Integer> p = basicPublisher();
681 TestSubscriber s1 = new TestSubscriber();
682 s1.request = false;
683 TestSubscriber s2 = new TestSubscriber();
684 s2.request = false;
685 p.subscribe(s1);
686 p.subscribe(s2);
687 s2.awaitSubscribe();
688 s1.awaitSubscribe();
689 assertTrue(p.offer(1, null) >= 1);
690 assertTrue(p.offer(2, null) >= 2);
691 s1.sn.request(4);
692 assertTrue(p.offer(3, null) >= 3);
693 s2.sn.request(4);
694 p.offer(4, null);
695 p.close();
696 s2.awaitComplete();
697 assertEquals(4, s2.nexts);
698 s1.awaitComplete();
699 assertEquals(4, s2.nexts);
700 }
701
702 /**
703 * offer reports drops if saturated
704 */
705 public void testDroppedOffer() {
706 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
707 basicExecutor, 4);
708 TestSubscriber s1 = new TestSubscriber();
709 s1.request = false;
710 TestSubscriber s2 = new TestSubscriber();
711 s2.request = false;
712 p.subscribe(s1);
713 p.subscribe(s2);
714 s2.awaitSubscribe();
715 s1.awaitSubscribe();
716 for (int i = 1; i <= 4; ++i)
717 assertTrue(p.offer(i, null) >= 0);
718 p.offer(5, null);
719 assertTrue(p.offer(6, null) < 0);
720 s1.sn.request(64);
721 assertTrue(p.offer(7, null) < 0);
722 s2.sn.request(64);
723 p.close();
724 s2.awaitComplete();
725 assertTrue(s2.nexts >= 4);
726 s1.awaitComplete();
727 assertTrue(s1.nexts >= 4);
728 }
729
730 /**
731 * offer invokes drop handler if saturated
732 */
733 public void testHandledDroppedOffer() {
734 AtomicInteger calls = new AtomicInteger();
735 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
736 basicExecutor, 4);
737 TestSubscriber s1 = new TestSubscriber();
738 s1.request = false;
739 TestSubscriber s2 = new TestSubscriber();
740 s2.request = false;
741 p.subscribe(s1);
742 p.subscribe(s2);
743 s2.awaitSubscribe();
744 s1.awaitSubscribe();
745 for (int i = 1; i <= 4; ++i)
746 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
747 p.offer(4, (s, x) -> noopHandle(calls));
748 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
749 s1.sn.request(64);
750 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
751 s2.sn.request(64);
752 p.close();
753 s2.awaitComplete();
754 s1.awaitComplete();
755 assertTrue(calls.get() >= 4);
756 }
757
758 /**
759 * offer succeeds if drop handler forces request
760 */
761 public void testRecoveredHandledDroppedOffer() {
762 AtomicInteger calls = new AtomicInteger();
763 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
764 basicExecutor, 4);
765 TestSubscriber s1 = new TestSubscriber();
766 s1.request = false;
767 TestSubscriber s2 = new TestSubscriber();
768 s2.request = false;
769 p.subscribe(s1);
770 p.subscribe(s2);
771 s2.awaitSubscribe();
772 s1.awaitSubscribe();
773 int n = 0;
774 for (int i = 1; i <= 8; ++i) {
775 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
776 n = n + 2 + (d < 0 ? d : 0);
777 }
778 p.close();
779 s2.awaitComplete();
780 s1.awaitComplete();
781 assertEquals(n, s1.nexts + s2.nexts);
782 assertTrue(calls.get() >= 2);
783 }
784
785 /**
786 * Timed offer to a publisher with no subscribers returns lag 0
787 */
788 public void testEmptyTimedOffer() {
789 SubmissionPublisher<Integer> p = basicPublisher();
790 long startTime = System.nanoTime();
791 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
792 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
793 }
794
795 /**
796 * Timed offer with null item or TimeUnit throws NPE
797 */
798 public void testNullTimedOffer() {
799 SubmissionPublisher<Integer> p = basicPublisher();
800 long startTime = System.nanoTime();
801 try {
802 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
803 shouldThrow();
804 } catch (NullPointerException success) {}
805 try {
806 p.offer(1, LONG_DELAY_MS, null, null);
807 shouldThrow();
808 } catch (NullPointerException success) {}
809 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
810 }
811
812 /**
813 * Timed offer returns number of lagged items if not saturated
814 */
815 public void testLaggedTimedOffer() {
816 SubmissionPublisher<Integer> p = basicPublisher();
817 TestSubscriber s1 = new TestSubscriber();
818 s1.request = false;
819 TestSubscriber s2 = new TestSubscriber();
820 s2.request = false;
821 p.subscribe(s1);
822 p.subscribe(s2);
823 s2.awaitSubscribe();
824 s1.awaitSubscribe();
825 long startTime = System.nanoTime();
826 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
827 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
828 s1.sn.request(4);
829 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
830 s2.sn.request(4);
831 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
832 p.close();
833 s2.awaitComplete();
834 assertEquals(4, s2.nexts);
835 s1.awaitComplete();
836 assertEquals(4, s2.nexts);
837 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
838 }
839
840 /**
841 * Timed offer reports drops if saturated
842 */
843 public void testDroppedTimedOffer() {
844 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
845 basicExecutor, 4);
846 TestSubscriber s1 = new TestSubscriber();
847 s1.request = false;
848 TestSubscriber s2 = new TestSubscriber();
849 s2.request = false;
850 p.subscribe(s1);
851 p.subscribe(s2);
852 s2.awaitSubscribe();
853 s1.awaitSubscribe();
854 long delay = timeoutMillis();
855 for (int i = 1; i <= 4; ++i)
856 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
857 long startTime = System.nanoTime();
858 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
859 s1.sn.request(64);
860 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
861 // 2 * delay should elapse but check only 1 * delay to allow timer slop
862 assertTrue(millisElapsedSince(startTime) >= delay);
863 s2.sn.request(64);
864 p.close();
865 s2.awaitComplete();
866 assertTrue(s2.nexts >= 2);
867 s1.awaitComplete();
868 assertTrue(s1.nexts >= 2);
869 }
870
871 /**
872 * Timed offer invokes drop handler if saturated
873 */
874 public void testHandledDroppedTimedOffer() {
875 AtomicInteger calls = new AtomicInteger();
876 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
877 basicExecutor, 4);
878 TestSubscriber s1 = new TestSubscriber();
879 s1.request = false;
880 TestSubscriber s2 = new TestSubscriber();
881 s2.request = false;
882 p.subscribe(s1);
883 p.subscribe(s2);
884 s2.awaitSubscribe();
885 s1.awaitSubscribe();
886 long delay = timeoutMillis();
887 for (int i = 1; i <= 4; ++i)
888 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
889 long startTime = System.nanoTime();
890 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
891 s1.sn.request(64);
892 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
893 assertTrue(millisElapsedSince(startTime) >= delay);
894 s2.sn.request(64);
895 p.close();
896 s2.awaitComplete();
897 s1.awaitComplete();
898 assertTrue(calls.get() >= 2);
899 }
900
901 /**
902 * Timed offer succeeds if drop handler forces request
903 */
904 public void testRecoveredHandledDroppedTimedOffer() {
905 AtomicInteger calls = new AtomicInteger();
906 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
907 basicExecutor, 4);
908 TestSubscriber s1 = new TestSubscriber();
909 s1.request = false;
910 TestSubscriber s2 = new TestSubscriber();
911 s2.request = false;
912 p.subscribe(s1);
913 p.subscribe(s2);
914 s2.awaitSubscribe();
915 s1.awaitSubscribe();
916 int n = 0;
917 long delay = timeoutMillis();
918 long startTime = System.nanoTime();
919 for (int i = 1; i <= 6; ++i) {
920 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
921 n = n + 2 + (d < 0 ? d : 0);
922 }
923 assertTrue(millisElapsedSince(startTime) >= delay);
924 p.close();
925 s2.awaitComplete();
926 s1.awaitComplete();
927 assertEquals(n, s1.nexts + s2.nexts);
928 assertTrue(calls.get() >= 2);
929 }
930
931 /**
932 * consume returns a CompletableFuture that is done when
933 * publisher completes
934 */
935 public void testConsume() {
936 AtomicInteger sum = new AtomicInteger();
937 SubmissionPublisher<Integer> p = basicPublisher();
938 CompletableFuture<Void> f =
939 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
940 int n = 20;
941 for (int i = 1; i <= n; ++i)
942 p.submit(i);
943 p.close();
944 f.join();
945 assertEquals((n * (n + 1)) / 2, sum.get());
946 }
947
948 /**
949 * consume(null) throws NPE
950 */
951 public void testConsumeNPE() {
952 SubmissionPublisher<Integer> p = basicPublisher();
953 try {
954 CompletableFuture<Void> f = p.consume(null);
955 shouldThrow();
956 } catch (NullPointerException success) {}
957 }
958
959 /**
960 * consume eventually stops processing published items if cancelled
961 */
962 public void testCancelledConsume() {
963 AtomicInteger count = new AtomicInteger();
964 SubmissionPublisher<Integer> p = basicPublisher();
965 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
966 f.cancel(true);
967 int n = 1000000; // arbitrary limit
968 for (int i = 1; i <= n; ++i)
969 p.submit(i);
970 assertTrue(count.get() < n);
971 }
972
973 }