ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.21
Committed: Mon May 29 22:44:27 2017 UTC (6 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.20: +9 -7 lines
Log Message:
more timeout handling rework; remove most uses of MEDIUM_DELAY_MS; randomize timeouts and TimeUnits; write out IAE and ISE

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.Executor;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Flow;
12 import java.util.concurrent.ForkJoinPool;
13 import java.util.concurrent.SubmissionPublisher;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import junit.framework.Test;
16 import junit.framework.TestSuite;
17
18 import static java.util.concurrent.Flow.Subscriber;
19 import static java.util.concurrent.Flow.Subscription;
20 import static java.util.concurrent.TimeUnit.MILLISECONDS;
21
22 public class SubmissionPublisherTest extends JSR166TestCase {
23
24 public static void main(String[] args) {
25 main(suite(), args);
26 }
27 public static Test suite() {
28 return new TestSuite(SubmissionPublisherTest.class);
29 }
30
31 final Executor basicExecutor = basicPublisher().getExecutor();
32
33 static SubmissionPublisher<Integer> basicPublisher() {
34 return new SubmissionPublisher<Integer>();
35 }
36
37 static class SPException extends RuntimeException {}
38
39 class TestSubscriber implements Subscriber<Integer> {
40 volatile Subscription sn;
41 int last; // Requires that onNexts are in numeric order
42 volatile int nexts;
43 volatile int errors;
44 volatile int completes;
45 volatile boolean throwOnCall = false;
46 volatile boolean request = true;
47 volatile Throwable lastError;
48
49 public synchronized void onSubscribe(Subscription s) {
50 threadAssertTrue(sn == null);
51 sn = s;
52 notifyAll();
53 if (throwOnCall)
54 throw new SPException();
55 if (request)
56 sn.request(1L);
57 }
58 public synchronized void onNext(Integer t) {
59 ++nexts;
60 notifyAll();
61 int current = t.intValue();
62 threadAssertTrue(current >= last);
63 last = current;
64 if (request)
65 sn.request(1L);
66 if (throwOnCall)
67 throw new SPException();
68 }
69 public synchronized void onError(Throwable t) {
70 threadAssertTrue(completes == 0);
71 threadAssertTrue(errors == 0);
72 lastError = t;
73 ++errors;
74 notifyAll();
75 }
76 public synchronized void onComplete() {
77 threadAssertTrue(completes == 0);
78 ++completes;
79 notifyAll();
80 }
81
82 synchronized void awaitSubscribe() {
83 while (sn == null) {
84 try {
85 wait();
86 } catch (Exception ex) {
87 threadUnexpectedException(ex);
88 break;
89 }
90 }
91 }
92 synchronized void awaitNext(int n) {
93 while (nexts < n) {
94 try {
95 wait();
96 } catch (Exception ex) {
97 threadUnexpectedException(ex);
98 break;
99 }
100 }
101 }
102 synchronized void awaitComplete() {
103 while (completes == 0 && errors == 0) {
104 try {
105 wait();
106 } catch (Exception ex) {
107 threadUnexpectedException(ex);
108 break;
109 }
110 }
111 }
112 synchronized void awaitError() {
113 while (errors == 0) {
114 try {
115 wait();
116 } catch (Exception ex) {
117 threadUnexpectedException(ex);
118 break;
119 }
120 }
121 }
122
123 }
124
125 /**
126 * A new SubmissionPublisher has no subscribers, a non-null
127 * executor, a power-of-two capacity, is not closed, and reports
128 * zero demand and lag
129 */
130 void checkInitialState(SubmissionPublisher<?> p) {
131 assertFalse(p.hasSubscribers());
132 assertEquals(0, p.getNumberOfSubscribers());
133 assertTrue(p.getSubscribers().isEmpty());
134 assertFalse(p.isClosed());
135 assertNull(p.getClosedException());
136 int n = p.getMaxBufferCapacity();
137 assertTrue((n & (n - 1)) == 0); // power of two
138 assertNotNull(p.getExecutor());
139 assertEquals(0, p.estimateMinimumDemand());
140 assertEquals(0, p.estimateMaximumLag());
141 }
142
143 /**
144 * A default-constructed SubmissionPublisher has no subscribers,
145 * is not closed, has default buffer size, and uses the
146 * defaultExecutor
147 */
148 public void testConstructor1() {
149 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
150 checkInitialState(p);
151 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153 if (ForkJoinPool.getCommonPoolParallelism() > 1)
154 assertSame(e, c);
155 else
156 assertNotSame(e, c);
157 }
158
159 /**
160 * A new SubmissionPublisher has no subscribers, is not closed,
161 * has the given buffer size, and uses the given executor
162 */
163 public void testConstructor2() {
164 Executor e = Executors.newFixedThreadPool(1);
165 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
166 checkInitialState(p);
167 assertSame(p.getExecutor(), e);
168 assertEquals(8, p.getMaxBufferCapacity());
169 }
170
171 /**
172 * A null Executor argument to SubmissionPublisher constructor
173 * throws NullPointerException
174 */
175 public void testConstructor3() {
176 try {
177 new SubmissionPublisher<Integer>(null, 8);
178 shouldThrow();
179 } catch (NullPointerException success) {}
180 }
181
182 /**
183 * A negative capacity argument to SubmissionPublisher constructor
184 * throws IllegalArgumentException
185 */
186 public void testConstructor4() {
187 Executor e = Executors.newFixedThreadPool(1);
188 try {
189 new SubmissionPublisher<Integer>(e, -1);
190 shouldThrow();
191 } catch (IllegalArgumentException success) {}
192 }
193
194 /**
195 * A closed publisher reports isClosed with no closedException and
196 * throws IllegalStateException upon attempted submission; a
197 * subsequent close or closeExceptionally has no additional
198 * effect.
199 */
200 public void testClose() {
201 SubmissionPublisher<Integer> p = basicPublisher();
202 checkInitialState(p);
203 p.close();
204 assertTrue(p.isClosed());
205 assertNull(p.getClosedException());
206 try {
207 p.submit(1);
208 shouldThrow();
209 } catch (IllegalStateException success) {}
210 Throwable ex = new SPException();
211 p.closeExceptionally(ex);
212 assertTrue(p.isClosed());
213 assertNull(p.getClosedException());
214 }
215
216 /**
217 * A publisher closedExceptionally reports isClosed with the
218 * closedException and throws IllegalStateException upon attempted
219 * submission; a subsequent close or closeExceptionally has no
220 * additional effect.
221 */
222 public void testCloseExceptionally() {
223 SubmissionPublisher<Integer> p = basicPublisher();
224 checkInitialState(p);
225 Throwable ex = new SPException();
226 p.closeExceptionally(ex);
227 assertTrue(p.isClosed());
228 assertSame(p.getClosedException(), ex);
229 try {
230 p.submit(1);
231 shouldThrow();
232 } catch (IllegalStateException success) {}
233 p.close();
234 assertTrue(p.isClosed());
235 assertSame(p.getClosedException(), ex);
236 }
237
238 /**
239 * Upon subscription, the subscriber's onSubscribe is called, no
240 * other Subscriber methods are invoked, the publisher
241 * hasSubscribers, isSubscribed is true, and existing
242 * subscriptions are unaffected.
243 */
244 public void testSubscribe1() {
245 TestSubscriber s = new TestSubscriber();
246 SubmissionPublisher<Integer> p = basicPublisher();
247 p.subscribe(s);
248 assertTrue(p.hasSubscribers());
249 assertEquals(1, p.getNumberOfSubscribers());
250 assertTrue(p.getSubscribers().contains(s));
251 assertTrue(p.isSubscribed(s));
252 s.awaitSubscribe();
253 assertNotNull(s.sn);
254 assertEquals(0, s.nexts);
255 assertEquals(0, s.errors);
256 assertEquals(0, s.completes);
257 TestSubscriber s2 = new TestSubscriber();
258 p.subscribe(s2);
259 assertTrue(p.hasSubscribers());
260 assertEquals(2, p.getNumberOfSubscribers());
261 assertTrue(p.getSubscribers().contains(s));
262 assertTrue(p.getSubscribers().contains(s2));
263 assertTrue(p.isSubscribed(s));
264 assertTrue(p.isSubscribed(s2));
265 s2.awaitSubscribe();
266 assertNotNull(s2.sn);
267 assertEquals(0, s2.nexts);
268 assertEquals(0, s2.errors);
269 assertEquals(0, s2.completes);
270 p.close();
271 }
272
273 /**
274 * If closed, upon subscription, the subscriber's onComplete
275 * method is invoked
276 */
277 public void testSubscribe2() {
278 TestSubscriber s = new TestSubscriber();
279 SubmissionPublisher<Integer> p = basicPublisher();
280 p.close();
281 p.subscribe(s);
282 s.awaitComplete();
283 assertEquals(0, s.nexts);
284 assertEquals(0, s.errors);
285 assertEquals(1, s.completes, 1);
286 }
287
288 /**
289 * If closedExceptionally, upon subscription, the subscriber's
290 * onError method is invoked
291 */
292 public void testSubscribe3() {
293 TestSubscriber s = new TestSubscriber();
294 SubmissionPublisher<Integer> p = basicPublisher();
295 Throwable ex = new SPException();
296 p.closeExceptionally(ex);
297 assertTrue(p.isClosed());
298 assertSame(p.getClosedException(), ex);
299 p.subscribe(s);
300 s.awaitError();
301 assertEquals(0, s.nexts);
302 assertEquals(1, s.errors);
303 }
304
305 /**
306 * Upon attempted resubscription, the subscriber's onError is
307 * called and the subscription is cancelled.
308 */
309 public void testSubscribe4() {
310 TestSubscriber s = new TestSubscriber();
311 SubmissionPublisher<Integer> p = basicPublisher();
312 p.subscribe(s);
313 assertTrue(p.hasSubscribers());
314 assertEquals(1, p.getNumberOfSubscribers());
315 assertTrue(p.getSubscribers().contains(s));
316 assertTrue(p.isSubscribed(s));
317 s.awaitSubscribe();
318 assertNotNull(s.sn);
319 assertEquals(0, s.nexts);
320 assertEquals(0, s.errors);
321 assertEquals(0, s.completes);
322 p.subscribe(s);
323 s.awaitError();
324 assertEquals(0, s.nexts);
325 assertEquals(1, s.errors);
326 assertFalse(p.isSubscribed(s));
327 }
328
329 /**
330 * An exception thrown in onSubscribe causes onError
331 */
332 public void testSubscribe5() {
333 TestSubscriber s = new TestSubscriber();
334 SubmissionPublisher<Integer> p = basicPublisher();
335 s.throwOnCall = true;
336 try {
337 p.subscribe(s);
338 } catch (Exception ok) {}
339 s.awaitError();
340 assertEquals(0, s.nexts);
341 assertEquals(1, s.errors);
342 assertEquals(0, s.completes);
343 }
344
345 /**
346 * subscribe(null) throws NPE
347 */
348 public void testSubscribe6() {
349 SubmissionPublisher<Integer> p = basicPublisher();
350 try {
351 p.subscribe(null);
352 shouldThrow();
353 } catch (NullPointerException success) {}
354 checkInitialState(p);
355 }
356
357 /**
358 * Closing a publisher causes onComplete to subscribers
359 */
360 public void testCloseCompletes() {
361 SubmissionPublisher<Integer> p = basicPublisher();
362 TestSubscriber s1 = new TestSubscriber();
363 TestSubscriber s2 = new TestSubscriber();
364 p.subscribe(s1);
365 p.subscribe(s2);
366 p.submit(1);
367 p.close();
368 assertTrue(p.isClosed());
369 assertNull(p.getClosedException());
370 s1.awaitComplete();
371 assertEquals(1, s1.nexts);
372 assertEquals(1, s1.completes);
373 s2.awaitComplete();
374 assertEquals(1, s2.nexts);
375 assertEquals(1, s2.completes);
376 }
377
378 /**
379 * Closing a publisher exceptionally causes onError to subscribers
380 * after they are subscribed
381 */
382 public void testCloseExceptionallyError() {
383 SubmissionPublisher<Integer> p = basicPublisher();
384 TestSubscriber s1 = new TestSubscriber();
385 TestSubscriber s2 = new TestSubscriber();
386 p.subscribe(s1);
387 p.subscribe(s2);
388 p.submit(1);
389 p.closeExceptionally(new SPException());
390 assertTrue(p.isClosed());
391 s1.awaitSubscribe();
392 s1.awaitError();
393 assertTrue(s1.nexts <= 1);
394 assertEquals(1, s1.errors);
395 s2.awaitSubscribe();
396 s2.awaitError();
397 assertTrue(s2.nexts <= 1);
398 assertEquals(1, s2.errors);
399 }
400
401 /**
402 * Cancelling a subscription eventually causes no more onNexts to be issued
403 */
404 public void testCancel() {
405 SubmissionPublisher<Integer> p = basicPublisher();
406 TestSubscriber s1 = new TestSubscriber();
407 TestSubscriber s2 = new TestSubscriber();
408 p.subscribe(s1);
409 p.subscribe(s2);
410 s1.awaitSubscribe();
411 p.submit(1);
412 s1.sn.cancel();
413 for (int i = 2; i <= 20; ++i)
414 p.submit(i);
415 p.close();
416 s2.awaitComplete();
417 assertEquals(20, s2.nexts);
418 assertEquals(1, s2.completes);
419 assertTrue(s1.nexts < 20);
420 assertFalse(p.isSubscribed(s1));
421 }
422
423 /**
424 * Throwing an exception in onNext causes onError
425 */
426 public void testThrowOnNext() {
427 SubmissionPublisher<Integer> p = basicPublisher();
428 TestSubscriber s1 = new TestSubscriber();
429 TestSubscriber s2 = new TestSubscriber();
430 p.subscribe(s1);
431 p.subscribe(s2);
432 s1.awaitSubscribe();
433 p.submit(1);
434 s1.throwOnCall = true;
435 p.submit(2);
436 p.close();
437 s2.awaitComplete();
438 assertEquals(2, s2.nexts);
439 s1.awaitComplete();
440 assertEquals(1, s1.errors);
441 }
442
443 /**
444 * If a handler is supplied in constructor, it is invoked when
445 * subscriber throws an exception in onNext
446 */
447 public void testThrowOnNextHandler() {
448 AtomicInteger calls = new AtomicInteger();
449 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
450 basicExecutor, 8, (s, e) -> calls.getAndIncrement());
451 TestSubscriber s1 = new TestSubscriber();
452 TestSubscriber s2 = new TestSubscriber();
453 p.subscribe(s1);
454 p.subscribe(s2);
455 s1.awaitSubscribe();
456 p.submit(1);
457 s1.throwOnCall = true;
458 p.submit(2);
459 p.close();
460 s2.awaitComplete();
461 assertEquals(2, s2.nexts);
462 assertEquals(1, s2.completes);
463 s1.awaitError();
464 assertEquals(1, s1.errors);
465 assertEquals(1, calls.get());
466 }
467
468 /**
469 * onNext items are issued in the same order to each subscriber
470 */
471 public void testOrder() {
472 SubmissionPublisher<Integer> p = basicPublisher();
473 TestSubscriber s1 = new TestSubscriber();
474 TestSubscriber s2 = new TestSubscriber();
475 p.subscribe(s1);
476 p.subscribe(s2);
477 for (int i = 1; i <= 20; ++i)
478 p.submit(i);
479 p.close();
480 s2.awaitComplete();
481 s1.awaitComplete();
482 assertEquals(20, s2.nexts);
483 assertEquals(1, s2.completes);
484 assertEquals(20, s1.nexts);
485 assertEquals(1, s1.completes);
486 }
487
488 /**
489 * onNext is issued only if requested
490 */
491 public void testRequest1() {
492 SubmissionPublisher<Integer> p = basicPublisher();
493 TestSubscriber s1 = new TestSubscriber();
494 s1.request = false;
495 p.subscribe(s1);
496 s1.awaitSubscribe();
497 assertEquals(0, p.estimateMinimumDemand());
498 TestSubscriber s2 = new TestSubscriber();
499 p.subscribe(s2);
500 p.submit(1);
501 p.submit(2);
502 s2.awaitNext(1);
503 assertEquals(0, s1.nexts);
504 s1.sn.request(3);
505 p.submit(3);
506 p.close();
507 s2.awaitComplete();
508 assertEquals(3, s2.nexts);
509 assertEquals(1, s2.completes);
510 s1.awaitComplete();
511 assertTrue(s1.nexts > 0);
512 assertEquals(1, s1.completes);
513 }
514
515 /**
516 * onNext is not issued when requests become zero
517 */
518 public void testRequest2() {
519 SubmissionPublisher<Integer> p = basicPublisher();
520 TestSubscriber s1 = new TestSubscriber();
521 TestSubscriber s2 = new TestSubscriber();
522 p.subscribe(s1);
523 p.subscribe(s2);
524 s2.awaitSubscribe();
525 s1.awaitSubscribe();
526 s1.request = false;
527 p.submit(1);
528 p.submit(2);
529 p.close();
530 s2.awaitComplete();
531 assertEquals(2, s2.nexts);
532 assertEquals(1, s2.completes);
533 s1.awaitNext(1);
534 assertEquals(1, s1.nexts);
535 }
536
537 /**
538 * Non-positive request causes error
539 */
540 public void testRequest3() {
541 SubmissionPublisher<Integer> p = basicPublisher();
542 TestSubscriber s1 = new TestSubscriber();
543 TestSubscriber s2 = new TestSubscriber();
544 TestSubscriber s3 = new TestSubscriber();
545 p.subscribe(s1);
546 p.subscribe(s2);
547 p.subscribe(s3);
548 s3.awaitSubscribe();
549 s2.awaitSubscribe();
550 s1.awaitSubscribe();
551 s1.sn.request(-1L);
552 s3.sn.request(0L);
553 p.submit(1);
554 p.submit(2);
555 p.close();
556 s2.awaitComplete();
557 assertEquals(2, s2.nexts);
558 assertEquals(1, s2.completes);
559 s1.awaitError();
560 assertEquals(1, s1.errors);
561 assertTrue(s1.lastError instanceof IllegalArgumentException);
562 s3.awaitError();
563 assertEquals(1, s3.errors);
564 assertTrue(s3.lastError instanceof IllegalArgumentException);
565 }
566
567 /**
568 * estimateMinimumDemand reports 0 until request, nonzero after
569 * request, and zero again after delivery
570 */
571 public void testEstimateMinimumDemand() {
572 TestSubscriber s = new TestSubscriber();
573 SubmissionPublisher<Integer> p = basicPublisher();
574 s.request = false;
575 p.subscribe(s);
576 s.awaitSubscribe();
577 assertEquals(0, p.estimateMinimumDemand());
578 s.sn.request(1);
579 assertEquals(1, p.estimateMinimumDemand());
580 p.submit(1);
581 s.awaitNext(1);
582 assertEquals(0, p.estimateMinimumDemand());
583 }
584
585 /**
586 * submit to a publisher with no subscribers returns lag 0
587 */
588 public void testEmptySubmit() {
589 SubmissionPublisher<Integer> p = basicPublisher();
590 assertEquals(0, p.submit(1));
591 }
592
593 /**
594 * submit(null) throws NPE
595 */
596 public void testNullSubmit() {
597 SubmissionPublisher<Integer> p = basicPublisher();
598 try {
599 p.submit(null);
600 shouldThrow();
601 } catch (NullPointerException success) {}
602 }
603
604 /**
605 * submit returns number of lagged items, compatible with result
606 * of estimateMaximumLag.
607 */
608 public void testLaggedSubmit() {
609 SubmissionPublisher<Integer> p = basicPublisher();
610 TestSubscriber s1 = new TestSubscriber();
611 s1.request = false;
612 TestSubscriber s2 = new TestSubscriber();
613 s2.request = false;
614 p.subscribe(s1);
615 p.subscribe(s2);
616 s2.awaitSubscribe();
617 s1.awaitSubscribe();
618 assertEquals(1, p.submit(1));
619 assertTrue(p.estimateMaximumLag() >= 1);
620 assertTrue(p.submit(2) >= 2);
621 assertTrue(p.estimateMaximumLag() >= 2);
622 s1.sn.request(4);
623 assertTrue(p.submit(3) >= 3);
624 assertTrue(p.estimateMaximumLag() >= 3);
625 s2.sn.request(4);
626 p.submit(4);
627 p.close();
628 s2.awaitComplete();
629 assertEquals(4, s2.nexts);
630 s1.awaitComplete();
631 assertEquals(4, s2.nexts);
632 }
633
634 /**
635 * submit eventually issues requested items when buffer capacity is 1
636 */
637 public void testCap1Submit() {
638 SubmissionPublisher<Integer> p
639 = new SubmissionPublisher<>(basicExecutor, 1);
640 TestSubscriber s1 = new TestSubscriber();
641 TestSubscriber s2 = new TestSubscriber();
642 p.subscribe(s1);
643 p.subscribe(s2);
644 for (int i = 1; i <= 20; ++i) {
645 assertTrue(p.estimateMinimumDemand() <= 1);
646 assertTrue(p.submit(i) >= 0);
647 }
648 p.close();
649 s2.awaitComplete();
650 s1.awaitComplete();
651 assertEquals(20, s2.nexts);
652 assertEquals(1, s2.completes);
653 assertEquals(20, s1.nexts);
654 assertEquals(1, s1.completes);
655 }
656
657 static boolean noopHandle(AtomicInteger count) {
658 count.getAndIncrement();
659 return false;
660 }
661
662 static boolean reqHandle(AtomicInteger count, Subscriber s) {
663 count.getAndIncrement();
664 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
665 return true;
666 }
667
668 /**
669 * offer to a publisher with no subscribers returns lag 0
670 */
671 public void testEmptyOffer() {
672 SubmissionPublisher<Integer> p = basicPublisher();
673 assertEquals(0, p.offer(1, null));
674 }
675
676 /**
677 * offer(null) throws NPE
678 */
679 public void testNullOffer() {
680 SubmissionPublisher<Integer> p = basicPublisher();
681 try {
682 p.offer(null, null);
683 shouldThrow();
684 } catch (NullPointerException success) {}
685 }
686
687 /**
688 * offer returns number of lagged items if not saturated
689 */
690 public void testLaggedOffer() {
691 SubmissionPublisher<Integer> p = basicPublisher();
692 TestSubscriber s1 = new TestSubscriber();
693 s1.request = false;
694 TestSubscriber s2 = new TestSubscriber();
695 s2.request = false;
696 p.subscribe(s1);
697 p.subscribe(s2);
698 s2.awaitSubscribe();
699 s1.awaitSubscribe();
700 assertTrue(p.offer(1, null) >= 1);
701 assertTrue(p.offer(2, null) >= 2);
702 s1.sn.request(4);
703 assertTrue(p.offer(3, null) >= 3);
704 s2.sn.request(4);
705 p.offer(4, null);
706 p.close();
707 s2.awaitComplete();
708 assertEquals(4, s2.nexts);
709 s1.awaitComplete();
710 assertEquals(4, s2.nexts);
711 }
712
713 /**
714 * offer reports drops if saturated
715 */
716 public void testDroppedOffer() {
717 SubmissionPublisher<Integer> p
718 = new SubmissionPublisher<>(basicExecutor, 4);
719 TestSubscriber s1 = new TestSubscriber();
720 s1.request = false;
721 TestSubscriber s2 = new TestSubscriber();
722 s2.request = false;
723 p.subscribe(s1);
724 p.subscribe(s2);
725 s2.awaitSubscribe();
726 s1.awaitSubscribe();
727 for (int i = 1; i <= 4; ++i)
728 assertTrue(p.offer(i, null) >= 0);
729 p.offer(5, null);
730 assertTrue(p.offer(6, null) < 0);
731 s1.sn.request(64);
732 assertTrue(p.offer(7, null) < 0);
733 s2.sn.request(64);
734 p.close();
735 s2.awaitComplete();
736 assertTrue(s2.nexts >= 4);
737 s1.awaitComplete();
738 assertTrue(s1.nexts >= 4);
739 }
740
741 /**
742 * offer invokes drop handler if saturated
743 */
744 public void testHandledDroppedOffer() {
745 AtomicInteger calls = new AtomicInteger();
746 SubmissionPublisher<Integer> p
747 = new SubmissionPublisher<>(basicExecutor, 4);
748 TestSubscriber s1 = new TestSubscriber();
749 s1.request = false;
750 TestSubscriber s2 = new TestSubscriber();
751 s2.request = false;
752 p.subscribe(s1);
753 p.subscribe(s2);
754 s2.awaitSubscribe();
755 s1.awaitSubscribe();
756 for (int i = 1; i <= 4; ++i)
757 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
758 p.offer(4, (s, x) -> noopHandle(calls));
759 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
760 s1.sn.request(64);
761 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
762 s2.sn.request(64);
763 p.close();
764 s2.awaitComplete();
765 s1.awaitComplete();
766 assertTrue(calls.get() >= 4);
767 }
768
769 /**
770 * offer succeeds if drop handler forces request
771 */
772 public void testRecoveredHandledDroppedOffer() {
773 AtomicInteger calls = new AtomicInteger();
774 SubmissionPublisher<Integer> p
775 = new SubmissionPublisher<>(basicExecutor, 4);
776 TestSubscriber s1 = new TestSubscriber();
777 s1.request = false;
778 TestSubscriber s2 = new TestSubscriber();
779 s2.request = false;
780 p.subscribe(s1);
781 p.subscribe(s2);
782 s2.awaitSubscribe();
783 s1.awaitSubscribe();
784 int n = 0;
785 for (int i = 1; i <= 8; ++i) {
786 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
787 n = n + 2 + (d < 0 ? d : 0);
788 }
789 p.close();
790 s2.awaitComplete();
791 s1.awaitComplete();
792 assertEquals(n, s1.nexts + s2.nexts);
793 assertTrue(calls.get() >= 2);
794 }
795
796 /**
797 * Timed offer to a publisher with no subscribers returns lag 0
798 */
799 public void testEmptyTimedOffer() {
800 SubmissionPublisher<Integer> p = basicPublisher();
801 long startTime = System.nanoTime();
802 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
803 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
804 }
805
806 /**
807 * Timed offer with null item or TimeUnit throws NPE
808 */
809 public void testNullTimedOffer() {
810 SubmissionPublisher<Integer> p = basicPublisher();
811 long startTime = System.nanoTime();
812 try {
813 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
814 shouldThrow();
815 } catch (NullPointerException success) {}
816 try {
817 p.offer(1, LONG_DELAY_MS, null, null);
818 shouldThrow();
819 } catch (NullPointerException success) {}
820 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
821 }
822
823 /**
824 * Timed offer returns number of lagged items if not saturated
825 */
826 public void testLaggedTimedOffer() {
827 SubmissionPublisher<Integer> p = basicPublisher();
828 TestSubscriber s1 = new TestSubscriber();
829 s1.request = false;
830 TestSubscriber s2 = new TestSubscriber();
831 s2.request = false;
832 p.subscribe(s1);
833 p.subscribe(s2);
834 s2.awaitSubscribe();
835 s1.awaitSubscribe();
836 long startTime = System.nanoTime();
837 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
838 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
839 s1.sn.request(4);
840 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
841 s2.sn.request(4);
842 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
843 p.close();
844 s2.awaitComplete();
845 assertEquals(4, s2.nexts);
846 s1.awaitComplete();
847 assertEquals(4, s2.nexts);
848 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
849 }
850
851 /**
852 * Timed offer reports drops if saturated
853 */
854 public void testDroppedTimedOffer() {
855 SubmissionPublisher<Integer> p
856 = new SubmissionPublisher<>(basicExecutor, 4);
857 TestSubscriber s1 = new TestSubscriber();
858 s1.request = false;
859 TestSubscriber s2 = new TestSubscriber();
860 s2.request = false;
861 p.subscribe(s1);
862 p.subscribe(s2);
863 s2.awaitSubscribe();
864 s1.awaitSubscribe();
865 long delay = timeoutMillis();
866 for (int i = 1; i <= 4; ++i)
867 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
868 long startTime = System.nanoTime();
869 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
870 s1.sn.request(64);
871 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
872 // 2 * delay should elapse but check only 1 * delay to allow timer slop
873 assertTrue(millisElapsedSince(startTime) >= delay);
874 s2.sn.request(64);
875 p.close();
876 s2.awaitComplete();
877 assertTrue(s2.nexts >= 2);
878 s1.awaitComplete();
879 assertTrue(s1.nexts >= 2);
880 }
881
882 /**
883 * Timed offer invokes drop handler if saturated
884 */
885 public void testHandledDroppedTimedOffer() {
886 AtomicInteger calls = new AtomicInteger();
887 SubmissionPublisher<Integer> p
888 = new SubmissionPublisher<>(basicExecutor, 4);
889 TestSubscriber s1 = new TestSubscriber();
890 s1.request = false;
891 TestSubscriber s2 = new TestSubscriber();
892 s2.request = false;
893 p.subscribe(s1);
894 p.subscribe(s2);
895 s2.awaitSubscribe();
896 s1.awaitSubscribe();
897 long delay = timeoutMillis();
898 for (int i = 1; i <= 4; ++i)
899 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
900 long startTime = System.nanoTime();
901 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902 s1.sn.request(64);
903 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
904 assertTrue(millisElapsedSince(startTime) >= delay);
905 s2.sn.request(64);
906 p.close();
907 s2.awaitComplete();
908 s1.awaitComplete();
909 assertTrue(calls.get() >= 2);
910 }
911
912 /**
913 * Timed offer succeeds if drop handler forces request
914 */
915 public void testRecoveredHandledDroppedTimedOffer() {
916 AtomicInteger calls = new AtomicInteger();
917 SubmissionPublisher<Integer> p
918 = new SubmissionPublisher<>(basicExecutor, 4);
919 TestSubscriber s1 = new TestSubscriber();
920 s1.request = false;
921 TestSubscriber s2 = new TestSubscriber();
922 s2.request = false;
923 p.subscribe(s1);
924 p.subscribe(s2);
925 s2.awaitSubscribe();
926 s1.awaitSubscribe();
927 int n = 0;
928 long delay = timeoutMillis();
929 long startTime = System.nanoTime();
930 for (int i = 1; i <= 6; ++i) {
931 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
932 n = n + 2 + (d < 0 ? d : 0);
933 }
934 assertTrue(millisElapsedSince(startTime) >= delay);
935 p.close();
936 s2.awaitComplete();
937 s1.awaitComplete();
938 assertEquals(n, s1.nexts + s2.nexts);
939 assertTrue(calls.get() >= 2);
940 }
941
942 /**
943 * consume returns a CompletableFuture that is done when
944 * publisher completes
945 */
946 public void testConsume() {
947 AtomicInteger sum = new AtomicInteger();
948 SubmissionPublisher<Integer> p = basicPublisher();
949 CompletableFuture<Void> f =
950 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
951 int n = 20;
952 for (int i = 1; i <= n; ++i)
953 p.submit(i);
954 p.close();
955 f.join();
956 assertEquals((n * (n + 1)) / 2, sum.get());
957 }
958
959 /**
960 * consume(null) throws NPE
961 */
962 public void testConsumeNPE() {
963 SubmissionPublisher<Integer> p = basicPublisher();
964 try {
965 CompletableFuture<Void> f = p.consume(null);
966 shouldThrow();
967 } catch (NullPointerException success) {}
968 }
969
970 /**
971 * consume eventually stops processing published items if cancelled
972 */
973 public void testCancelledConsume() {
974 AtomicInteger count = new AtomicInteger();
975 SubmissionPublisher<Integer> p = basicPublisher();
976 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
977 f.cancel(true);
978 int n = 1000000; // arbitrary limit
979 for (int i = 1; i <= n; ++i)
980 p.submit(i);
981 assertTrue(count.get() < n);
982 }
983
984 }