ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.12
Committed: Sat Sep 12 17:18:13 2015 UTC (8 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.11: +9 -20 lines
Log Message:
Use same default executor conventions as CompletableFuture

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.Executor;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Flow;
12 import java.util.concurrent.ForkJoinPool;
13 import java.util.concurrent.LinkedBlockingQueue;
14 import java.util.concurrent.SubmissionPublisher;
15 import java.util.concurrent.ThreadFactory;
16 import java.util.concurrent.ThreadPoolExecutor;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.function.BiConsumer;
20 import java.util.function.BiFunction;
21 import java.util.function.BiPredicate;
22 import java.util.stream.Stream;
23 import junit.framework.Test;
24 import junit.framework.TestSuite;
25
26 import static java.util.concurrent.Flow.Publisher;
27 import static java.util.concurrent.Flow.Subscriber;
28 import static java.util.concurrent.Flow.Subscription;
29 import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 import static java.util.concurrent.TimeUnit.SECONDS;
31
32 public class SubmissionPublisherTest extends JSR166TestCase {
33
34 public static void main(String[] args) {
35 main(suite(), args);
36 }
37 public static Test suite() {
38 return new TestSuite(SubmissionPublisherTest.class);
39 }
40
41 final Executor basicExecutor = basicPublisher().getExecutor();
42
43 static SubmissionPublisher<Integer> basicPublisher() {
44 return new SubmissionPublisher<Integer>();
45 }
46
47 static class SPException extends RuntimeException {}
48
49 class TestSubscriber implements Subscriber<Integer> {
50 volatile Subscription sn;
51 int last; // Requires that onNexts are in numeric order
52 volatile int nexts;
53 volatile int errors;
54 volatile int completes;
55 volatile boolean throwOnCall = false;
56 volatile boolean request = true;
57 volatile Throwable lastError;
58
59 public synchronized void onSubscribe(Subscription s) {
60 threadAssertTrue(sn == null);
61 sn = s;
62 notifyAll();
63 if (throwOnCall)
64 throw new SPException();
65 if (request)
66 sn.request(1L);
67 }
68 public synchronized void onNext(Integer t) {
69 ++nexts;
70 notifyAll();
71 int current = t.intValue();
72 threadAssertTrue(current >= last);
73 last = current;
74 if (request)
75 sn.request(1L);
76 if (throwOnCall)
77 throw new SPException();
78 }
79 public synchronized void onError(Throwable t) {
80 threadAssertTrue(completes == 0);
81 threadAssertTrue(errors == 0);
82 lastError = t;
83 ++errors;
84 notifyAll();
85 }
86 public synchronized void onComplete() {
87 threadAssertTrue(completes == 0);
88 ++completes;
89 notifyAll();
90 }
91
92 synchronized void awaitSubscribe() {
93 while (sn == null) {
94 try {
95 wait();
96 } catch (Exception ex) {
97 threadUnexpectedException(ex);
98 break;
99 }
100 }
101 }
102 synchronized void awaitNext(int n) {
103 while (nexts < n) {
104 try {
105 wait();
106 } catch (Exception ex) {
107 threadUnexpectedException(ex);
108 break;
109 }
110 }
111 }
112 synchronized void awaitComplete() {
113 while (completes == 0 && errors == 0) {
114 try {
115 wait();
116 } catch (Exception ex) {
117 threadUnexpectedException(ex);
118 break;
119 }
120 }
121 }
122 synchronized void awaitError() {
123 while (errors == 0) {
124 try {
125 wait();
126 } catch (Exception ex) {
127 threadUnexpectedException(ex);
128 break;
129 }
130 }
131 }
132
133 }
134
135 /**
136 * A new SubmissionPublisher has no subscribers, a non-null
137 * executor, a power-of-two capacity, is not closed, and reports
138 * zero demand and lag
139 */
140 void checkInitialState(SubmissionPublisher<?> p) {
141 assertFalse(p.hasSubscribers());
142 assertEquals(0, p.getNumberOfSubscribers());
143 assertTrue(p.getSubscribers().isEmpty());
144 assertFalse(p.isClosed());
145 assertNull(p.getClosedException());
146 int n = p.getMaxBufferCapacity();
147 assertTrue((n & (n - 1)) == 0); // power of two
148 assertNotNull(p.getExecutor());
149 assertEquals(0, p.estimateMinimumDemand());
150 assertEquals(0, p.estimateMaximumLag());
151 }
152
153 /**
154 * A default-constructed SubmissionPublisher has no subscribers,
155 * is not closed, has default buffer size, and uses the
156 * defaultExecutor
157 */
158 public void testConstructor1() {
159 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
160 checkInitialState(p);
161 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
162 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
163 if (ForkJoinPool.getCommonPoolParallelism() > 1)
164 assertSame(e, c);
165 else
166 assertNotSame(e, c);
167 }
168
169 /**
170 * A new SubmissionPublisher has no subscribers, is not closed,
171 * has the given buffer size, and uses the given executor
172 */
173 public void testConstructor2() {
174 Executor e = Executors.newFixedThreadPool(1);
175 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
176 checkInitialState(p);
177 assertSame(p.getExecutor(), e);
178 assertEquals(8, p.getMaxBufferCapacity());
179 }
180
181 /**
182 * A null Executor argument to SubmissionPublisher constructor throws NPE
183 */
184 public void testConstructor3() {
185 try {
186 new SubmissionPublisher<Integer>(null, 8);
187 shouldThrow();
188 } catch (NullPointerException success) {}
189 }
190
191 /**
192 * A negative capacity argument to SubmissionPublisher constructor
193 * throws IAE
194 */
195 public void testConstructor4() {
196 Executor e = Executors.newFixedThreadPool(1);
197 try {
198 new SubmissionPublisher<Integer>(e, -1);
199 shouldThrow();
200 } catch (IllegalArgumentException success) {}
201 }
202
203 /**
204 * A closed publisher reports isClosed with no closedException and
205 * throws ISE upon attempted submission; a subsequent close or
206 * closeExceptionally has no additional effect.
207 */
208 public void testClose() {
209 SubmissionPublisher<Integer> p = basicPublisher();
210 checkInitialState(p);
211 p.close();
212 assertTrue(p.isClosed());
213 assertNull(p.getClosedException());
214 try {
215 p.submit(1);
216 shouldThrow();
217 } catch (IllegalStateException success) {}
218 Throwable ex = new SPException();
219 p.closeExceptionally(ex);
220 assertTrue(p.isClosed());
221 assertNull(p.getClosedException());
222 }
223
224 /**
225 * A publisher closedExceptionally reports isClosed with the
226 * closedException and throws ISE upon attempted submission; a
227 * subsequent close or closeExceptionally has no additional
228 * effect.
229 */
230 public void testCloseExceptionally() {
231 SubmissionPublisher<Integer> p = basicPublisher();
232 checkInitialState(p);
233 Throwable ex = new SPException();
234 p.closeExceptionally(ex);
235 assertTrue(p.isClosed());
236 assertSame(p.getClosedException(), ex);
237 try {
238 p.submit(1);
239 shouldThrow();
240 } catch (IllegalStateException success) {}
241 p.close();
242 assertTrue(p.isClosed());
243 assertSame(p.getClosedException(), ex);
244 }
245
246 /**
247 * Upon subscription, the subscriber's onSubscribe is called, no
248 * other Subscriber methods are invoked, the publisher
249 * hasSubscribers, isSubscribed is true, and existing
250 * subscriptions are unaffected.
251 */
252 public void testSubscribe1() {
253 TestSubscriber s = new TestSubscriber();
254 SubmissionPublisher<Integer> p = basicPublisher();
255 p.subscribe(s);
256 assertTrue(p.hasSubscribers());
257 assertEquals(1, p.getNumberOfSubscribers());
258 assertTrue(p.getSubscribers().contains(s));
259 assertTrue(p.isSubscribed(s));
260 s.awaitSubscribe();
261 assertNotNull(s.sn);
262 assertEquals(0, s.nexts);
263 assertEquals(0, s.errors);
264 assertEquals(0, s.completes);
265 TestSubscriber s2 = new TestSubscriber();
266 p.subscribe(s2);
267 assertTrue(p.hasSubscribers());
268 assertEquals(2, p.getNumberOfSubscribers());
269 assertTrue(p.getSubscribers().contains(s));
270 assertTrue(p.getSubscribers().contains(s2));
271 assertTrue(p.isSubscribed(s));
272 assertTrue(p.isSubscribed(s2));
273 s2.awaitSubscribe();
274 assertNotNull(s2.sn);
275 assertEquals(0, s2.nexts);
276 assertEquals(0, s2.errors);
277 assertEquals(0, s2.completes);
278 p.close();
279 }
280
281 /**
282 * If closed, upon subscription, the subscriber's onComplete
283 * method is invoked
284 */
285 public void testSubscribe2() {
286 TestSubscriber s = new TestSubscriber();
287 SubmissionPublisher<Integer> p = basicPublisher();
288 p.close();
289 p.subscribe(s);
290 s.awaitComplete();
291 assertEquals(0, s.nexts);
292 assertEquals(0, s.errors);
293 assertEquals(1, s.completes, 1);
294 }
295
296 /**
297 * If closedExceptionally, upon subscription, the subscriber's
298 * onError method is invoked
299 */
300 public void testSubscribe3() {
301 TestSubscriber s = new TestSubscriber();
302 SubmissionPublisher<Integer> p = basicPublisher();
303 Throwable ex = new SPException();
304 p.closeExceptionally(ex);
305 assertTrue(p.isClosed());
306 assertSame(p.getClosedException(), ex);
307 p.subscribe(s);
308 s.awaitError();
309 assertEquals(0, s.nexts);
310 assertEquals(1, s.errors);
311 }
312
313 /**
314 * Upon attempted resubscription, the subscriber's onError is
315 * called and the subscription is cancelled.
316 */
317 public void testSubscribe4() {
318 TestSubscriber s = new TestSubscriber();
319 SubmissionPublisher<Integer> p = basicPublisher();
320 p.subscribe(s);
321 assertTrue(p.hasSubscribers());
322 assertEquals(1, p.getNumberOfSubscribers());
323 assertTrue(p.getSubscribers().contains(s));
324 assertTrue(p.isSubscribed(s));
325 s.awaitSubscribe();
326 assertNotNull(s.sn);
327 assertEquals(0, s.nexts);
328 assertEquals(0, s.errors);
329 assertEquals(0, s.completes);
330 p.subscribe(s);
331 s.awaitError();
332 assertEquals(0, s.nexts);
333 assertEquals(1, s.errors);
334 assertFalse(p.isSubscribed(s));
335 }
336
337 /**
338 * An exception thrown in onSubscribe causes onError
339 */
340 public void testSubscribe5() {
341 TestSubscriber s = new TestSubscriber();
342 SubmissionPublisher<Integer> p = basicPublisher();
343 s.throwOnCall = true;
344 try {
345 p.subscribe(s);
346 } catch (Exception ok) {}
347 s.awaitError();
348 assertEquals(0, s.nexts);
349 assertEquals(1, s.errors);
350 assertEquals(0, s.completes);
351 }
352
353 /**
354 * subscribe(null) throws NPE
355 */
356 public void testSubscribe6() {
357 SubmissionPublisher<Integer> p = basicPublisher();
358 try {
359 p.subscribe(null);
360 shouldThrow();
361 } catch (NullPointerException success) {}
362 checkInitialState(p);
363 }
364
365 /**
366 * Closing a publisher causes onComplete to subscribers
367 */
368 public void testCloseCompletes() {
369 SubmissionPublisher<Integer> p = basicPublisher();
370 TestSubscriber s1 = new TestSubscriber();
371 TestSubscriber s2 = new TestSubscriber();
372 p.subscribe(s1);
373 p.subscribe(s2);
374 p.submit(1);
375 p.close();
376 assertTrue(p.isClosed());
377 assertNull(p.getClosedException());
378 s1.awaitComplete();
379 assertEquals(1, s1.nexts);
380 assertEquals(1, s1.completes);
381 s2.awaitComplete();
382 assertEquals(1, s2.nexts);
383 assertEquals(1, s2.completes);
384 }
385
386 /**
387 * Closing a publisher exceptionally causes onError to subscribers
388 */
389 public void testCloseExceptionallyError() {
390 SubmissionPublisher<Integer> p = basicPublisher();
391 TestSubscriber s1 = new TestSubscriber();
392 TestSubscriber s2 = new TestSubscriber();
393 p.subscribe(s1);
394 p.subscribe(s2);
395 p.submit(1);
396 p.closeExceptionally(new SPException());
397 assertTrue(p.isClosed());
398 s1.awaitError();
399 assertTrue(s1.nexts <= 1);
400 assertEquals(1, s1.errors);
401 s2.awaitError();
402 assertTrue(s2.nexts <= 1);
403 assertEquals(1, s2.errors);
404 }
405
406 /**
407 * Cancelling a subscription eventually causes no more onNexts to be issued
408 */
409 public void testCancel() {
410 SubmissionPublisher<Integer> p = basicPublisher();
411 TestSubscriber s1 = new TestSubscriber();
412 TestSubscriber s2 = new TestSubscriber();
413 p.subscribe(s1);
414 p.subscribe(s2);
415 s1.awaitSubscribe();
416 p.submit(1);
417 s1.sn.cancel();
418 for (int i = 2; i <= 20; ++i)
419 p.submit(i);
420 p.close();
421 s2.awaitComplete();
422 assertEquals(20, s2.nexts);
423 assertEquals(1, s2.completes);
424 assertTrue(s1.nexts < 20);
425 assertFalse(p.isSubscribed(s1));
426 }
427
428 /**
429 * Throwing an exception in onNext causes onError
430 */
431 public void testThrowOnNext() {
432 SubmissionPublisher<Integer> p = basicPublisher();
433 TestSubscriber s1 = new TestSubscriber();
434 TestSubscriber s2 = new TestSubscriber();
435 p.subscribe(s1);
436 p.subscribe(s2);
437 s1.awaitSubscribe();
438 p.submit(1);
439 s1.throwOnCall = true;
440 p.submit(2);
441 p.close();
442 s2.awaitComplete();
443 assertEquals(2, s2.nexts);
444 s1.awaitComplete();
445 assertEquals(1, s1.errors);
446 }
447
448 /**
449 * If a handler is supplied in constructor, it is invoked when
450 * subscriber throws an exception in onNext
451 */
452 public void testThrowOnNextHandler() {
453 AtomicInteger calls = new AtomicInteger();
454 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
455 (basicExecutor, 8,
456 (s, e) -> calls.getAndIncrement());
457 TestSubscriber s1 = new TestSubscriber();
458 TestSubscriber s2 = new TestSubscriber();
459 p.subscribe(s1);
460 p.subscribe(s2);
461 s1.awaitSubscribe();
462 p.submit(1);
463 s1.throwOnCall = true;
464 p.submit(2);
465 p.close();
466 s2.awaitComplete();
467 assertEquals(2, s2.nexts);
468 assertEquals(1, s2.completes);
469 s1.awaitError();
470 assertEquals(1, s1.errors);
471 assertEquals(1, calls.get());
472 }
473
474 /**
475 * onNext items are issued in the same order to each subscriber
476 */
477 public void testOrder() {
478 SubmissionPublisher<Integer> p = basicPublisher();
479 TestSubscriber s1 = new TestSubscriber();
480 TestSubscriber s2 = new TestSubscriber();
481 p.subscribe(s1);
482 p.subscribe(s2);
483 for (int i = 1; i <= 20; ++i)
484 p.submit(i);
485 p.close();
486 s2.awaitComplete();
487 s1.awaitComplete();
488 assertEquals(20, s2.nexts);
489 assertEquals(1, s2.completes);
490 assertEquals(20, s1.nexts);
491 assertEquals(1, s1.completes);
492 }
493
494 /**
495 * onNext is issued only if requested
496 */
497 public void testRequest1() {
498 SubmissionPublisher<Integer> p = basicPublisher();
499 TestSubscriber s1 = new TestSubscriber();
500 s1.request = false;
501 p.subscribe(s1);
502 s1.awaitSubscribe();
503 assertTrue(p.estimateMinimumDemand() == 0);
504 TestSubscriber s2 = new TestSubscriber();
505 p.subscribe(s2);
506 p.submit(1);
507 p.submit(2);
508 s2.awaitNext(1);
509 assertEquals(0, s1.nexts);
510 s1.sn.request(3);
511 p.submit(3);
512 p.close();
513 s2.awaitComplete();
514 assertEquals(3, s2.nexts);
515 assertEquals(1, s2.completes);
516 s1.awaitComplete();
517 assertTrue(s1.nexts > 0);
518 assertEquals(1, s1.completes);
519 }
520
521 /**
522 * onNext is not issued when requests become zero
523 */
524 public void testRequest2() {
525 SubmissionPublisher<Integer> p = basicPublisher();
526 TestSubscriber s1 = new TestSubscriber();
527 TestSubscriber s2 = new TestSubscriber();
528 p.subscribe(s1);
529 p.subscribe(s2);
530 s2.awaitSubscribe();
531 s1.awaitSubscribe();
532 s1.request = false;
533 p.submit(1);
534 p.submit(2);
535 p.close();
536 s2.awaitComplete();
537 assertEquals(2, s2.nexts);
538 assertEquals(1, s2.completes);
539 s1.awaitNext(1);
540 assertEquals(1, s1.nexts);
541 }
542
543 /**
544 * Negative request causes error
545 */
546 public void testRequest3() {
547 SubmissionPublisher<Integer> p = basicPublisher();
548 TestSubscriber s1 = new TestSubscriber();
549 TestSubscriber s2 = new TestSubscriber();
550 p.subscribe(s1);
551 p.subscribe(s2);
552 s2.awaitSubscribe();
553 s1.awaitSubscribe();
554 s1.sn.request(-1L);
555 p.submit(1);
556 p.submit(2);
557 p.close();
558 s2.awaitComplete();
559 assertEquals(2, s2.nexts);
560 assertEquals(1, s2.completes);
561 s1.awaitError();
562 assertEquals(1, s1.errors);
563 assertTrue(s1.lastError instanceof IllegalArgumentException);
564 }
565
566 /**
567 * estimateMinimumDemand reports 0 until request, nonzero after
568 * request, and zero again after delivery
569 */
570 public void testEstimateMinimumDemand() {
571 TestSubscriber s = new TestSubscriber();
572 SubmissionPublisher<Integer> p = basicPublisher();
573 s.request = false;
574 p.subscribe(s);
575 s.awaitSubscribe();
576 assertEquals(0, p.estimateMinimumDemand());
577 s.sn.request(1);
578 assertEquals(1, p.estimateMinimumDemand());
579 p.submit(1);
580 s.awaitNext(1);
581 assertEquals(0, p.estimateMinimumDemand());
582 }
583
584 /**
585 * submit to a publisher with no subscribers returns lag 0
586 */
587 public void testEmptySubmit() {
588 SubmissionPublisher<Integer> p = basicPublisher();
589 assertEquals(0, p.submit(1));
590 }
591
592 /**
593 * submit(null) throws NPE
594 */
595 public void testNullSubmit() {
596 SubmissionPublisher<Integer> p = basicPublisher();
597 try {
598 p.submit(null);
599 shouldThrow();
600 } catch (NullPointerException success) {}
601 }
602
603 /**
604 * submit returns number of lagged items, compatible with result
605 * of estimateMaximumLag.
606 */
607 public void testLaggedSubmit() {
608 SubmissionPublisher<Integer> p = basicPublisher();
609 TestSubscriber s1 = new TestSubscriber();
610 s1.request = false;
611 TestSubscriber s2 = new TestSubscriber();
612 s2.request = false;
613 p.subscribe(s1);
614 p.subscribe(s2);
615 s2.awaitSubscribe();
616 s1.awaitSubscribe();
617 assertEquals(1, p.submit(1));
618 assertTrue(p.estimateMaximumLag() >= 1);
619 assertTrue(p.submit(2) >= 2);
620 assertTrue(p.estimateMaximumLag() >= 2);
621 s1.sn.request(4);
622 assertTrue(p.submit(3) >= 3);
623 assertTrue(p.estimateMaximumLag() >= 3);
624 s2.sn.request(4);
625 p.submit(4);
626 p.close();
627 s2.awaitComplete();
628 assertEquals(4, s2.nexts);
629 s1.awaitComplete();
630 assertEquals(4, s2.nexts);
631 }
632
633 /**
634 * submit eventually issues requested items when buffer capacity is 1
635 */
636 public void testCap1Submit() {
637 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
638 basicExecutor, 1);
639 TestSubscriber s1 = new TestSubscriber();
640 TestSubscriber s2 = new TestSubscriber();
641 p.subscribe(s1);
642 p.subscribe(s2);
643 for (int i = 1; i <= 20; ++i) {
644 assertTrue(p.estimateMinimumDemand() <= 1);
645 assertTrue(p.submit(i) >= 0);
646 }
647 p.close();
648 s2.awaitComplete();
649 s1.awaitComplete();
650 assertEquals(20, s2.nexts);
651 assertEquals(1, s2.completes);
652 assertEquals(20, s1.nexts);
653 assertEquals(1, s1.completes);
654 }
655
656 static boolean noopHandle(AtomicInteger count) {
657 count.getAndIncrement();
658 return false;
659 }
660
661 static boolean reqHandle(AtomicInteger count, Subscriber s) {
662 count.getAndIncrement();
663 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
664 return true;
665 }
666
667 /**
668 * offer to a publisher with no subscribers returns lag 0
669 */
670 public void testEmptyOffer() {
671 SubmissionPublisher<Integer> p = basicPublisher();
672 assertEquals(0, p.offer(1, null));
673 }
674
675 /**
676 * offer(null) throws NPE
677 */
678 public void testNullOffer() {
679 SubmissionPublisher<Integer> p = basicPublisher();
680 try {
681 p.offer(null, null);
682 shouldThrow();
683 } catch (NullPointerException success) {}
684 }
685
686 /**
687 * offer returns number of lagged items if not saturated
688 */
689 public void testLaggedOffer() {
690 SubmissionPublisher<Integer> p = basicPublisher();
691 TestSubscriber s1 = new TestSubscriber();
692 s1.request = false;
693 TestSubscriber s2 = new TestSubscriber();
694 s2.request = false;
695 p.subscribe(s1);
696 p.subscribe(s2);
697 s2.awaitSubscribe();
698 s1.awaitSubscribe();
699 assertTrue(p.offer(1, null) >= 1);
700 assertTrue(p.offer(2, null) >= 2);
701 s1.sn.request(4);
702 assertTrue(p.offer(3, null) >= 3);
703 s2.sn.request(4);
704 p.offer(4, null);
705 p.close();
706 s2.awaitComplete();
707 assertEquals(4, s2.nexts);
708 s1.awaitComplete();
709 assertEquals(4, s2.nexts);
710 }
711
712 /**
713 * offer reports drops if saturated
714 */
715 public void testDroppedOffer() {
716 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
717 basicExecutor, 4);
718 TestSubscriber s1 = new TestSubscriber();
719 s1.request = false;
720 TestSubscriber s2 = new TestSubscriber();
721 s2.request = false;
722 p.subscribe(s1);
723 p.subscribe(s2);
724 s2.awaitSubscribe();
725 s1.awaitSubscribe();
726 for (int i = 1; i <= 4; ++i)
727 assertTrue(p.offer(i, null) >= 0);
728 p.offer(5, null);
729 assertTrue(p.offer(6, null) < 0);
730 s1.sn.request(64);
731 assertTrue(p.offer(7, null) < 0);
732 s2.sn.request(64);
733 p.close();
734 s2.awaitComplete();
735 assertTrue(s2.nexts >= 4);
736 s1.awaitComplete();
737 assertTrue(s1.nexts >= 4);
738 }
739
740 /**
741 * offer invokes drop handler if saturated
742 */
743 public void testHandledDroppedOffer() {
744 AtomicInteger calls = new AtomicInteger();
745 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
746 basicExecutor, 4);
747 TestSubscriber s1 = new TestSubscriber();
748 s1.request = false;
749 TestSubscriber s2 = new TestSubscriber();
750 s2.request = false;
751 p.subscribe(s1);
752 p.subscribe(s2);
753 s2.awaitSubscribe();
754 s1.awaitSubscribe();
755 for (int i = 1; i <= 4; ++i)
756 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
757 p.offer(4, (s, x) -> noopHandle(calls));
758 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
759 s1.sn.request(64);
760 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
761 s2.sn.request(64);
762 p.close();
763 s2.awaitComplete();
764 s1.awaitComplete();
765 assertTrue(calls.get() >= 4);
766 }
767
768 /**
769 * offer succeeds if drop handler forces request
770 */
771 public void testRecoveredHandledDroppedOffer() {
772 AtomicInteger calls = new AtomicInteger();
773 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
774 basicExecutor, 4);
775 TestSubscriber s1 = new TestSubscriber();
776 s1.request = false;
777 TestSubscriber s2 = new TestSubscriber();
778 s2.request = false;
779 p.subscribe(s1);
780 p.subscribe(s2);
781 s2.awaitSubscribe();
782 s1.awaitSubscribe();
783 int n = 0;
784 for (int i = 1; i <= 8; ++i) {
785 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
786 n = n + 2 + (d < 0 ? d : 0);
787 }
788 p.close();
789 s2.awaitComplete();
790 s1.awaitComplete();
791 assertEquals(n, s1.nexts + s2.nexts);
792 assertTrue(calls.get() >= 2);
793 }
794
795 /**
796 * Timed offer to a publisher with no subscribers returns lag 0
797 */
798 public void testEmptyTimedOffer() {
799 SubmissionPublisher<Integer> p = basicPublisher();
800 long startTime = System.nanoTime();
801 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
802 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
803 }
804
805 /**
806 * Timed offer with null item or TimeUnit throws NPE
807 */
808 public void testNullTimedOffer() {
809 SubmissionPublisher<Integer> p = basicPublisher();
810 long startTime = System.nanoTime();
811 try {
812 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
813 shouldThrow();
814 } catch (NullPointerException success) {}
815 try {
816 p.offer(1, LONG_DELAY_MS, null, null);
817 shouldThrow();
818 } catch (NullPointerException success) {}
819 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
820 }
821
822 /**
823 * Timed offer returns number of lagged items if not saturated
824 */
825 public void testLaggedTimedOffer() {
826 SubmissionPublisher<Integer> p = basicPublisher();
827 TestSubscriber s1 = new TestSubscriber();
828 s1.request = false;
829 TestSubscriber s2 = new TestSubscriber();
830 s2.request = false;
831 p.subscribe(s1);
832 p.subscribe(s2);
833 s2.awaitSubscribe();
834 s1.awaitSubscribe();
835 long startTime = System.nanoTime();
836 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
837 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
838 s1.sn.request(4);
839 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
840 s2.sn.request(4);
841 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
842 p.close();
843 s2.awaitComplete();
844 assertEquals(4, s2.nexts);
845 s1.awaitComplete();
846 assertEquals(4, s2.nexts);
847 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
848 }
849
850 /**
851 * Timed offer reports drops if saturated
852 */
853 public void testDroppedTimedOffer() {
854 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
855 basicExecutor, 4);
856 TestSubscriber s1 = new TestSubscriber();
857 s1.request = false;
858 TestSubscriber s2 = new TestSubscriber();
859 s2.request = false;
860 p.subscribe(s1);
861 p.subscribe(s2);
862 s2.awaitSubscribe();
863 s1.awaitSubscribe();
864 long delay = timeoutMillis();
865 for (int i = 1; i <= 4; ++i)
866 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
867 long startTime = System.nanoTime();
868 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
869 s1.sn.request(64);
870 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
871 // 2 * delay should elapse but check only 1 * delay to allow timer slop
872 assertTrue(millisElapsedSince(startTime) >= delay);
873 s2.sn.request(64);
874 p.close();
875 s2.awaitComplete();
876 assertTrue(s2.nexts >= 2);
877 s1.awaitComplete();
878 assertTrue(s1.nexts >= 2);
879 }
880
881 /**
882 * Timed offer invokes drop handler if saturated
883 */
884 public void testHandledDroppedTimedOffer() {
885 AtomicInteger calls = new AtomicInteger();
886 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
887 basicExecutor, 4);
888 TestSubscriber s1 = new TestSubscriber();
889 s1.request = false;
890 TestSubscriber s2 = new TestSubscriber();
891 s2.request = false;
892 p.subscribe(s1);
893 p.subscribe(s2);
894 s2.awaitSubscribe();
895 s1.awaitSubscribe();
896 long delay = timeoutMillis();
897 for (int i = 1; i <= 4; ++i)
898 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
899 long startTime = System.nanoTime();
900 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
901 s1.sn.request(64);
902 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
903 assertTrue(millisElapsedSince(startTime) >= delay);
904 s2.sn.request(64);
905 p.close();
906 s2.awaitComplete();
907 s1.awaitComplete();
908 assertTrue(calls.get() >= 2);
909 }
910
911 /**
912 * Timed offer succeeds if drop handler forces request
913 */
914 public void testRecoveredHandledDroppedTimedOffer() {
915 AtomicInteger calls = new AtomicInteger();
916 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
917 basicExecutor, 4);
918 TestSubscriber s1 = new TestSubscriber();
919 s1.request = false;
920 TestSubscriber s2 = new TestSubscriber();
921 s2.request = false;
922 p.subscribe(s1);
923 p.subscribe(s2);
924 s2.awaitSubscribe();
925 s1.awaitSubscribe();
926 int n = 0;
927 long delay = timeoutMillis();
928 long startTime = System.nanoTime();
929 for (int i = 1; i <= 6; ++i) {
930 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
931 n = n + 2 + (d < 0 ? d : 0);
932 }
933 assertTrue(millisElapsedSince(startTime) >= delay);
934 p.close();
935 s2.awaitComplete();
936 s1.awaitComplete();
937 assertEquals(n, s1.nexts + s2.nexts);
938 assertTrue(calls.get() >= 2);
939 }
940
941 /**
942 * consume returns a CompletableFuture that is done when
943 * publisher completes
944 */
945 public void testConsume() {
946 AtomicInteger sum = new AtomicInteger();
947 SubmissionPublisher<Integer> p = basicPublisher();
948 CompletableFuture<Void> f =
949 p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
950 int n = 20;
951 for (int i = 1; i <= n; ++i)
952 p.submit(i);
953 p.close();
954 f.join();
955 assertEquals((n * (n + 1)) / 2, sum.get());
956 }
957
958 /**
959 * consume(null) throws NPE
960 */
961 public void testConsumeNPE() {
962 SubmissionPublisher<Integer> p = basicPublisher();
963 try {
964 CompletableFuture<Void> f = p.consume(null);
965 shouldThrow();
966 } catch (NullPointerException success) {}
967 }
968
969 /**
970 * consume eventually stops processing published items if cancelled
971 */
972 public void testCancelledConsume() {
973 AtomicInteger count = new AtomicInteger();
974 SubmissionPublisher<Integer> p = basicPublisher();
975 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
976 f.cancel(true);
977 int n = 1000000; // arbitrary limit
978 for (int i = 1; i <= n; ++i)
979 p.submit(i);
980 assertTrue(count.get() < n);
981 }
982
983 }