ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.10
Committed: Sat Sep 12 11:25:15 2015 UTC (8 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.9: +53 -9 lines
Log Message:
Remove Flow.stream for now; move consume to SubmissionPublisher

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.Executor;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Flow;
12 import java.util.concurrent.ForkJoinPool;
13 import java.util.concurrent.LinkedBlockingQueue;
14 import java.util.concurrent.SubmissionPublisher;
15 import java.util.concurrent.ThreadFactory;
16 import java.util.concurrent.ThreadPoolExecutor;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.function.BiConsumer;
20 import java.util.function.BiFunction;
21 import java.util.function.BiPredicate;
22 import java.util.stream.Stream;
23 import junit.framework.Test;
24 import junit.framework.TestSuite;
25
26 import static java.util.concurrent.Flow.Publisher;
27 import static java.util.concurrent.Flow.Subscriber;
28 import static java.util.concurrent.Flow.Subscription;
29 import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 import static java.util.concurrent.TimeUnit.SECONDS;
31
32 public class SubmissionPublisherTest extends JSR166TestCase {
33
34 public static void main(String[] args) {
35 main(suite(), args);
36 }
37 public static Test suite() {
38 return new TestSuite(SubmissionPublisherTest.class);
39 }
40
41 // Factory for single thread pool in case commonPool parallelism is zero
42 static final class DaemonThreadFactory implements ThreadFactory {
43 public Thread newThread(Runnable r) {
44 Thread t = new Thread(r);
45 t.setDaemon(true);
46 return t;
47 }
48 }
49
50 static final Executor basicExecutor =
51 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
52 ForkJoinPool.commonPool() :
53 new ThreadPoolExecutor(1, 1, 60, SECONDS,
54 new LinkedBlockingQueue<Runnable>(),
55 new DaemonThreadFactory());
56
57 static SubmissionPublisher<Integer> basicPublisher() {
58 return new SubmissionPublisher<Integer>(basicExecutor,
59 Flow.defaultBufferSize());
60 }
61
62 static class SPException extends RuntimeException {}
63
64 class TestSubscriber implements Subscriber<Integer> {
65 volatile Subscription sn;
66 int last; // Requires that onNexts are in numeric order
67 volatile int nexts;
68 volatile int errors;
69 volatile int completes;
70 volatile boolean throwOnCall = false;
71 volatile boolean request = true;
72 volatile Throwable lastError;
73
74 public synchronized void onSubscribe(Subscription s) {
75 threadAssertTrue(sn == null);
76 sn = s;
77 notifyAll();
78 if (throwOnCall)
79 throw new SPException();
80 if (request)
81 sn.request(1L);
82 }
83 public synchronized void onNext(Integer t) {
84 ++nexts;
85 notifyAll();
86 int current = t.intValue();
87 threadAssertTrue(current >= last);
88 last = current;
89 if (request)
90 sn.request(1L);
91 if (throwOnCall)
92 throw new SPException();
93 }
94 public synchronized void onError(Throwable t) {
95 threadAssertTrue(completes == 0);
96 threadAssertTrue(errors == 0);
97 lastError = t;
98 ++errors;
99 notifyAll();
100 }
101 public synchronized void onComplete() {
102 threadAssertTrue(completes == 0);
103 ++completes;
104 notifyAll();
105 }
106
107 synchronized void awaitSubscribe() {
108 while (sn == null) {
109 try {
110 wait();
111 } catch (Exception ex) {
112 threadUnexpectedException(ex);
113 break;
114 }
115 }
116 }
117 synchronized void awaitNext(int n) {
118 while (nexts < n) {
119 try {
120 wait();
121 } catch (Exception ex) {
122 threadUnexpectedException(ex);
123 break;
124 }
125 }
126 }
127 synchronized void awaitComplete() {
128 while (completes == 0 && errors == 0) {
129 try {
130 wait();
131 } catch (Exception ex) {
132 threadUnexpectedException(ex);
133 break;
134 }
135 }
136 }
137 synchronized void awaitError() {
138 while (errors == 0) {
139 try {
140 wait();
141 } catch (Exception ex) {
142 threadUnexpectedException(ex);
143 break;
144 }
145 }
146 }
147
148 }
149
150 /**
151 * A new SubmissionPublisher has no subscribers, a non-null
152 * executor, a power-of-two capacity, is not closed, and reports
153 * zero demand and lag
154 */
155 void checkInitialState(SubmissionPublisher<?> p) {
156 assertFalse(p.hasSubscribers());
157 assertEquals(0, p.getNumberOfSubscribers());
158 assertTrue(p.getSubscribers().isEmpty());
159 assertFalse(p.isClosed());
160 assertNull(p.getClosedException());
161 int n = p.getMaxBufferCapacity();
162 assertTrue((n & (n - 1)) == 0); // power of two
163 assertNotNull(p.getExecutor());
164 assertEquals(0, p.estimateMinimumDemand());
165 assertEquals(0, p.estimateMaximumLag());
166 }
167
168 /**
169 * A default-constructed SubmissionPublisher has no subscribers,
170 * is not closed, has default buffer size, and uses the
171 * ForkJoinPool.commonPool executor
172 */
173 public void testConstructor1() {
174 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
175 checkInitialState(p);
176 assertSame(p.getExecutor(), ForkJoinPool.commonPool());
177 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
178 }
179
180 /**
181 * A new SubmissionPublisher has no subscribers, is not closed,
182 * has the given buffer size, and uses the given executor
183 */
184 public void testConstructor2() {
185 Executor e = Executors.newFixedThreadPool(1);
186 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
187 checkInitialState(p);
188 assertSame(p.getExecutor(), e);
189 assertEquals(8, p.getMaxBufferCapacity());
190 }
191
192 /**
193 * A null Executor argument to SubmissionPublisher constructor throws NPE
194 */
195 public void testConstructor3() {
196 try {
197 new SubmissionPublisher<Integer>(null, 8);
198 shouldThrow();
199 } catch (NullPointerException success) {}
200 }
201
202 /**
203 * A negative capacity argument to SubmissionPublisher constructor
204 * throws IAE
205 */
206 public void testConstructor4() {
207 Executor e = Executors.newFixedThreadPool(1);
208 try {
209 new SubmissionPublisher<Integer>(e, -1);
210 shouldThrow();
211 } catch (IllegalArgumentException success) {}
212 }
213
214 /**
215 * A closed publisher reports isClosed with no closedException and
216 * throws ISE upon attempted submission; a subsequent close or
217 * closeExceptionally has no additional effect.
218 */
219 public void testClose() {
220 SubmissionPublisher<Integer> p = basicPublisher();
221 checkInitialState(p);
222 p.close();
223 assertTrue(p.isClosed());
224 assertNull(p.getClosedException());
225 try {
226 p.submit(1);
227 shouldThrow();
228 } catch (IllegalStateException success) {}
229 Throwable ex = new SPException();
230 p.closeExceptionally(ex);
231 assertTrue(p.isClosed());
232 assertNull(p.getClosedException());
233 }
234
235 /**
236 * A publisher closedExceptionally reports isClosed with the
237 * closedException and throws ISE upon attempted submission; a
238 * subsequent close or closeExceptionally has no additional
239 * effect.
240 */
241 public void testCloseExceptionally() {
242 SubmissionPublisher<Integer> p = basicPublisher();
243 checkInitialState(p);
244 Throwable ex = new SPException();
245 p.closeExceptionally(ex);
246 assertTrue(p.isClosed());
247 assertSame(p.getClosedException(), ex);
248 try {
249 p.submit(1);
250 shouldThrow();
251 } catch (IllegalStateException success) {}
252 p.close();
253 assertTrue(p.isClosed());
254 assertSame(p.getClosedException(), ex);
255 }
256
257 /**
258 * Upon subscription, the subscriber's onSubscribe is called, no
259 * other Subscriber methods are invoked, the publisher
260 * hasSubscribers, isSubscribed is true, and existing
261 * subscriptions are unaffected.
262 */
263 public void testSubscribe1() {
264 TestSubscriber s = new TestSubscriber();
265 SubmissionPublisher<Integer> p = basicPublisher();
266 p.subscribe(s);
267 assertTrue(p.hasSubscribers());
268 assertEquals(1, p.getNumberOfSubscribers());
269 assertTrue(p.getSubscribers().contains(s));
270 assertTrue(p.isSubscribed(s));
271 s.awaitSubscribe();
272 assertNotNull(s.sn);
273 assertEquals(0, s.nexts);
274 assertEquals(0, s.errors);
275 assertEquals(0, s.completes);
276 TestSubscriber s2 = new TestSubscriber();
277 p.subscribe(s2);
278 assertTrue(p.hasSubscribers());
279 assertEquals(2, p.getNumberOfSubscribers());
280 assertTrue(p.getSubscribers().contains(s));
281 assertTrue(p.getSubscribers().contains(s2));
282 assertTrue(p.isSubscribed(s));
283 assertTrue(p.isSubscribed(s2));
284 s2.awaitSubscribe();
285 assertNotNull(s2.sn);
286 assertEquals(0, s2.nexts);
287 assertEquals(0, s2.errors);
288 assertEquals(0, s2.completes);
289 p.close();
290 }
291
292 /**
293 * If closed, upon subscription, the subscriber's onComplete
294 * method is invoked
295 */
296 public void testSubscribe2() {
297 TestSubscriber s = new TestSubscriber();
298 SubmissionPublisher<Integer> p = basicPublisher();
299 p.close();
300 p.subscribe(s);
301 s.awaitComplete();
302 assertEquals(0, s.nexts);
303 assertEquals(0, s.errors);
304 assertEquals(1, s.completes, 1);
305 }
306
307 /**
308 * If closedExceptionally, upon subscription, the subscriber's
309 * onError method is invoked
310 */
311 public void testSubscribe3() {
312 TestSubscriber s = new TestSubscriber();
313 SubmissionPublisher<Integer> p = basicPublisher();
314 Throwable ex = new SPException();
315 p.closeExceptionally(ex);
316 assertTrue(p.isClosed());
317 assertSame(p.getClosedException(), ex);
318 p.subscribe(s);
319 s.awaitError();
320 assertEquals(0, s.nexts);
321 assertEquals(1, s.errors);
322 }
323
324 /**
325 * Upon attempted resubscription, the subscriber's onError is
326 * called and the subscription is cancelled.
327 */
328 public void testSubscribe4() {
329 TestSubscriber s = new TestSubscriber();
330 SubmissionPublisher<Integer> p = basicPublisher();
331 p.subscribe(s);
332 assertTrue(p.hasSubscribers());
333 assertEquals(1, p.getNumberOfSubscribers());
334 assertTrue(p.getSubscribers().contains(s));
335 assertTrue(p.isSubscribed(s));
336 s.awaitSubscribe();
337 assertNotNull(s.sn);
338 assertEquals(0, s.nexts);
339 assertEquals(0, s.errors);
340 assertEquals(0, s.completes);
341 p.subscribe(s);
342 s.awaitError();
343 assertEquals(0, s.nexts);
344 assertEquals(1, s.errors);
345 assertFalse(p.isSubscribed(s));
346 }
347
348 /**
349 * An exception thrown in onSubscribe causes onError
350 */
351 public void testSubscribe5() {
352 TestSubscriber s = new TestSubscriber();
353 SubmissionPublisher<Integer> p = basicPublisher();
354 s.throwOnCall = true;
355 try {
356 p.subscribe(s);
357 } catch (Exception ok) {}
358 s.awaitError();
359 assertEquals(0, s.nexts);
360 assertEquals(1, s.errors);
361 assertEquals(0, s.completes);
362 }
363
364 /**
365 * subscribe(null) throws NPE
366 */
367 public void testSubscribe6() {
368 SubmissionPublisher<Integer> p = basicPublisher();
369 try {
370 p.subscribe(null);
371 shouldThrow();
372 } catch (NullPointerException success) {}
373 checkInitialState(p);
374 }
375
376 /**
377 * Closing a publisher causes onComplete to subscribers
378 */
379 public void testCloseCompletes() {
380 SubmissionPublisher<Integer> p = basicPublisher();
381 TestSubscriber s1 = new TestSubscriber();
382 TestSubscriber s2 = new TestSubscriber();
383 p.subscribe(s1);
384 p.subscribe(s2);
385 p.submit(1);
386 p.close();
387 assertTrue(p.isClosed());
388 assertNull(p.getClosedException());
389 s1.awaitComplete();
390 assertEquals(1, s1.nexts);
391 assertEquals(1, s1.completes);
392 s2.awaitComplete();
393 assertEquals(1, s2.nexts);
394 assertEquals(1, s2.completes);
395 }
396
397 /**
398 * Closing a publisher exceptionally causes onError to subscribers
399 */
400 public void testCloseExceptionallyError() {
401 SubmissionPublisher<Integer> p = basicPublisher();
402 TestSubscriber s1 = new TestSubscriber();
403 TestSubscriber s2 = new TestSubscriber();
404 p.subscribe(s1);
405 p.subscribe(s2);
406 p.submit(1);
407 p.closeExceptionally(new SPException());
408 assertTrue(p.isClosed());
409 s1.awaitError();
410 assertTrue(s1.nexts <= 1);
411 assertEquals(1, s1.errors);
412 s2.awaitError();
413 assertTrue(s2.nexts <= 1);
414 assertEquals(1, s2.errors);
415 }
416
417 /**
418 * Cancelling a subscription eventually causes no more onNexts to be issued
419 */
420 public void testCancel() {
421 SubmissionPublisher<Integer> p = basicPublisher();
422 TestSubscriber s1 = new TestSubscriber();
423 TestSubscriber s2 = new TestSubscriber();
424 p.subscribe(s1);
425 p.subscribe(s2);
426 s1.awaitSubscribe();
427 p.submit(1);
428 s1.sn.cancel();
429 for (int i = 2; i <= 20; ++i)
430 p.submit(i);
431 p.close();
432 s2.awaitComplete();
433 assertEquals(20, s2.nexts);
434 assertEquals(1, s2.completes);
435 assertTrue(s1.nexts < 20);
436 assertFalse(p.isSubscribed(s1));
437 }
438
439 /**
440 * Throwing an exception in onNext causes onError
441 */
442 public void testThrowOnNext() {
443 SubmissionPublisher<Integer> p = basicPublisher();
444 TestSubscriber s1 = new TestSubscriber();
445 TestSubscriber s2 = new TestSubscriber();
446 p.subscribe(s1);
447 p.subscribe(s2);
448 s1.awaitSubscribe();
449 p.submit(1);
450 s1.throwOnCall = true;
451 p.submit(2);
452 p.close();
453 s2.awaitComplete();
454 assertEquals(2, s2.nexts);
455 s1.awaitComplete();
456 assertEquals(1, s1.errors);
457 }
458
459 /**
460 * If a handler is supplied in constructor, it is invoked when
461 * subscriber throws an exception in onNext
462 */
463 public void testThrowOnNextHandler() {
464 AtomicInteger calls = new AtomicInteger();
465 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
466 (basicExecutor, 8,
467 (s, e) -> calls.getAndIncrement());
468 TestSubscriber s1 = new TestSubscriber();
469 TestSubscriber s2 = new TestSubscriber();
470 p.subscribe(s1);
471 p.subscribe(s2);
472 s1.awaitSubscribe();
473 p.submit(1);
474 s1.throwOnCall = true;
475 p.submit(2);
476 p.close();
477 s2.awaitComplete();
478 assertEquals(2, s2.nexts);
479 assertEquals(1, s2.completes);
480 s1.awaitError();
481 assertEquals(1, s1.errors);
482 assertEquals(1, calls.get());
483 }
484
485 /**
486 * onNext items are issued in the same order to each subscriber
487 */
488 public void testOrder() {
489 SubmissionPublisher<Integer> p = basicPublisher();
490 TestSubscriber s1 = new TestSubscriber();
491 TestSubscriber s2 = new TestSubscriber();
492 p.subscribe(s1);
493 p.subscribe(s2);
494 for (int i = 1; i <= 20; ++i)
495 p.submit(i);
496 p.close();
497 s2.awaitComplete();
498 s1.awaitComplete();
499 assertEquals(20, s2.nexts);
500 assertEquals(1, s2.completes);
501 assertEquals(20, s1.nexts);
502 assertEquals(1, s1.completes);
503 }
504
505 /**
506 * onNext is issued only if requested
507 */
508 public void testRequest1() {
509 SubmissionPublisher<Integer> p = basicPublisher();
510 TestSubscriber s1 = new TestSubscriber();
511 s1.request = false;
512 p.subscribe(s1);
513 s1.awaitSubscribe();
514 assertTrue(p.estimateMinimumDemand() == 0);
515 TestSubscriber s2 = new TestSubscriber();
516 p.subscribe(s2);
517 p.submit(1);
518 p.submit(2);
519 s2.awaitNext(1);
520 assertEquals(0, s1.nexts);
521 s1.sn.request(3);
522 p.submit(3);
523 p.close();
524 s2.awaitComplete();
525 assertEquals(3, s2.nexts);
526 assertEquals(1, s2.completes);
527 s1.awaitComplete();
528 assertTrue(s1.nexts > 0);
529 assertEquals(1, s1.completes);
530 }
531
532 /**
533 * onNext is not issued when requests become zero
534 */
535 public void testRequest2() {
536 SubmissionPublisher<Integer> p = basicPublisher();
537 TestSubscriber s1 = new TestSubscriber();
538 TestSubscriber s2 = new TestSubscriber();
539 p.subscribe(s1);
540 p.subscribe(s2);
541 s2.awaitSubscribe();
542 s1.awaitSubscribe();
543 s1.request = false;
544 p.submit(1);
545 p.submit(2);
546 p.close();
547 s2.awaitComplete();
548 assertEquals(2, s2.nexts);
549 assertEquals(1, s2.completes);
550 s1.awaitNext(1);
551 assertEquals(1, s1.nexts);
552 }
553
554 /**
555 * Negative request causes error
556 */
557 public void testRequest3() {
558 SubmissionPublisher<Integer> p = basicPublisher();
559 TestSubscriber s1 = new TestSubscriber();
560 TestSubscriber s2 = new TestSubscriber();
561 p.subscribe(s1);
562 p.subscribe(s2);
563 s2.awaitSubscribe();
564 s1.awaitSubscribe();
565 s1.sn.request(-1L);
566 p.submit(1);
567 p.submit(2);
568 p.close();
569 s2.awaitComplete();
570 assertEquals(2, s2.nexts);
571 assertEquals(1, s2.completes);
572 s1.awaitError();
573 assertEquals(1, s1.errors);
574 assertTrue(s1.lastError instanceof IllegalArgumentException);
575 }
576
577 /**
578 * estimateMinimumDemand reports 0 until request, nonzero after
579 * request, and zero again after delivery
580 */
581 public void testEstimateMinimumDemand() {
582 TestSubscriber s = new TestSubscriber();
583 SubmissionPublisher<Integer> p = basicPublisher();
584 s.request = false;
585 p.subscribe(s);
586 s.awaitSubscribe();
587 assertEquals(0, p.estimateMinimumDemand());
588 s.sn.request(1);
589 assertEquals(1, p.estimateMinimumDemand());
590 p.submit(1);
591 s.awaitNext(1);
592 assertEquals(0, p.estimateMinimumDemand());
593 }
594
595 /**
596 * submit to a publisher with no subscribers returns lag 0
597 */
598 public void testEmptySubmit() {
599 SubmissionPublisher<Integer> p = basicPublisher();
600 assertEquals(0, p.submit(1));
601 }
602
603 /**
604 * submit(null) throws NPE
605 */
606 public void testNullSubmit() {
607 SubmissionPublisher<Integer> p = basicPublisher();
608 try {
609 p.submit(null);
610 shouldThrow();
611 } catch (NullPointerException success) {}
612 }
613
614 /**
615 * submit returns number of lagged items, compatible with result
616 * of estimateMaximumLag.
617 */
618 public void testLaggedSubmit() {
619 SubmissionPublisher<Integer> p = basicPublisher();
620 TestSubscriber s1 = new TestSubscriber();
621 s1.request = false;
622 TestSubscriber s2 = new TestSubscriber();
623 s2.request = false;
624 p.subscribe(s1);
625 p.subscribe(s2);
626 s2.awaitSubscribe();
627 s1.awaitSubscribe();
628 assertEquals(1, p.submit(1));
629 assertTrue(p.estimateMaximumLag() >= 1);
630 assertTrue(p.submit(2) >= 2);
631 assertTrue(p.estimateMaximumLag() >= 2);
632 s1.sn.request(4);
633 assertTrue(p.submit(3) >= 3);
634 assertTrue(p.estimateMaximumLag() >= 3);
635 s2.sn.request(4);
636 p.submit(4);
637 p.close();
638 s2.awaitComplete();
639 assertEquals(4, s2.nexts);
640 s1.awaitComplete();
641 assertEquals(4, s2.nexts);
642 }
643
644 /**
645 * submit eventually issues requested items when buffer capacity is 1
646 */
647 public void testCap1Submit() {
648 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
649 basicExecutor, 1);
650 TestSubscriber s1 = new TestSubscriber();
651 TestSubscriber s2 = new TestSubscriber();
652 p.subscribe(s1);
653 p.subscribe(s2);
654 for (int i = 1; i <= 20; ++i) {
655 assertTrue(p.estimateMinimumDemand() <= 1);
656 assertTrue(p.submit(i) >= 0);
657 }
658 p.close();
659 s2.awaitComplete();
660 s1.awaitComplete();
661 assertEquals(20, s2.nexts);
662 assertEquals(1, s2.completes);
663 assertEquals(20, s1.nexts);
664 assertEquals(1, s1.completes);
665 }
666
667 static boolean noopHandle(AtomicInteger count) {
668 count.getAndIncrement();
669 return false;
670 }
671
672 static boolean reqHandle(AtomicInteger count, Subscriber s) {
673 count.getAndIncrement();
674 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
675 return true;
676 }
677
678 /**
679 * offer to a publisher with no subscribers returns lag 0
680 */
681 public void testEmptyOffer() {
682 SubmissionPublisher<Integer> p = basicPublisher();
683 assertEquals(0, p.offer(1, null));
684 }
685
686 /**
687 * offer(null) throws NPE
688 */
689 public void testNullOffer() {
690 SubmissionPublisher<Integer> p = basicPublisher();
691 try {
692 p.offer(null, null);
693 shouldThrow();
694 } catch (NullPointerException success) {}
695 }
696
697 /**
698 * offer returns number of lagged items if not saturated
699 */
700 public void testLaggedOffer() {
701 SubmissionPublisher<Integer> p = basicPublisher();
702 TestSubscriber s1 = new TestSubscriber();
703 s1.request = false;
704 TestSubscriber s2 = new TestSubscriber();
705 s2.request = false;
706 p.subscribe(s1);
707 p.subscribe(s2);
708 s2.awaitSubscribe();
709 s1.awaitSubscribe();
710 assertTrue(p.offer(1, null) >= 1);
711 assertTrue(p.offer(2, null) >= 2);
712 s1.sn.request(4);
713 assertTrue(p.offer(3, null) >= 3);
714 s2.sn.request(4);
715 p.offer(4, null);
716 p.close();
717 s2.awaitComplete();
718 assertEquals(4, s2.nexts);
719 s1.awaitComplete();
720 assertEquals(4, s2.nexts);
721 }
722
723 /**
724 * offer reports drops if saturated
725 */
726 public void testDroppedOffer() {
727 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
728 basicExecutor, 4);
729 TestSubscriber s1 = new TestSubscriber();
730 s1.request = false;
731 TestSubscriber s2 = new TestSubscriber();
732 s2.request = false;
733 p.subscribe(s1);
734 p.subscribe(s2);
735 s2.awaitSubscribe();
736 s1.awaitSubscribe();
737 for (int i = 1; i <= 4; ++i)
738 assertTrue(p.offer(i, null) >= 0);
739 p.offer(5, null);
740 assertTrue(p.offer(6, null) < 0);
741 s1.sn.request(64);
742 assertTrue(p.offer(7, null) < 0);
743 s2.sn.request(64);
744 p.close();
745 s2.awaitComplete();
746 assertTrue(s2.nexts >= 4);
747 s1.awaitComplete();
748 assertTrue(s1.nexts >= 4);
749 }
750
751 /**
752 * offer invokes drop handler if saturated
753 */
754 public void testHandledDroppedOffer() {
755 AtomicInteger calls = new AtomicInteger();
756 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
757 basicExecutor, 4);
758 TestSubscriber s1 = new TestSubscriber();
759 s1.request = false;
760 TestSubscriber s2 = new TestSubscriber();
761 s2.request = false;
762 p.subscribe(s1);
763 p.subscribe(s2);
764 s2.awaitSubscribe();
765 s1.awaitSubscribe();
766 for (int i = 1; i <= 4; ++i)
767 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
768 p.offer(4, (s, x) -> noopHandle(calls));
769 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
770 s1.sn.request(64);
771 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
772 s2.sn.request(64);
773 p.close();
774 s2.awaitComplete();
775 s1.awaitComplete();
776 assertTrue(calls.get() >= 4);
777 }
778
779 /**
780 * offer succeeds if drop handler forces request
781 */
782 public void testRecoveredHandledDroppedOffer() {
783 AtomicInteger calls = new AtomicInteger();
784 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
785 basicExecutor, 4);
786 TestSubscriber s1 = new TestSubscriber();
787 s1.request = false;
788 TestSubscriber s2 = new TestSubscriber();
789 s2.request = false;
790 p.subscribe(s1);
791 p.subscribe(s2);
792 s2.awaitSubscribe();
793 s1.awaitSubscribe();
794 int n = 0;
795 for (int i = 1; i <= 8; ++i) {
796 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
797 n = n + 2 + (d < 0 ? d : 0);
798 }
799 p.close();
800 s2.awaitComplete();
801 s1.awaitComplete();
802 assertEquals(n, s1.nexts + s2.nexts);
803 assertTrue(calls.get() >= 2);
804 }
805
806 /**
807 * Timed offer to a publisher with no subscribers returns lag 0
808 */
809 public void testEmptyTimedOffer() {
810 SubmissionPublisher<Integer> p = basicPublisher();
811 long startTime = System.nanoTime();
812 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
813 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
814 }
815
816 /**
817 * Timed offer with null item or TimeUnit throws NPE
818 */
819 public void testNullTimedOffer() {
820 SubmissionPublisher<Integer> p = basicPublisher();
821 long startTime = System.nanoTime();
822 try {
823 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
824 shouldThrow();
825 } catch (NullPointerException success) {}
826 try {
827 p.offer(1, LONG_DELAY_MS, null, null);
828 shouldThrow();
829 } catch (NullPointerException success) {}
830 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
831 }
832
833 /**
834 * Timed offer returns number of lagged items if not saturated
835 */
836 public void testLaggedTimedOffer() {
837 SubmissionPublisher<Integer> p = basicPublisher();
838 TestSubscriber s1 = new TestSubscriber();
839 s1.request = false;
840 TestSubscriber s2 = new TestSubscriber();
841 s2.request = false;
842 p.subscribe(s1);
843 p.subscribe(s2);
844 s2.awaitSubscribe();
845 s1.awaitSubscribe();
846 long startTime = System.nanoTime();
847 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
848 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
849 s1.sn.request(4);
850 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
851 s2.sn.request(4);
852 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
853 p.close();
854 s2.awaitComplete();
855 assertEquals(4, s2.nexts);
856 s1.awaitComplete();
857 assertEquals(4, s2.nexts);
858 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
859 }
860
861 /**
862 * Timed offer reports drops if saturated
863 */
864 public void testDroppedTimedOffer() {
865 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
866 basicExecutor, 4);
867 TestSubscriber s1 = new TestSubscriber();
868 s1.request = false;
869 TestSubscriber s2 = new TestSubscriber();
870 s2.request = false;
871 p.subscribe(s1);
872 p.subscribe(s2);
873 s2.awaitSubscribe();
874 s1.awaitSubscribe();
875 long delay = timeoutMillis();
876 for (int i = 1; i <= 4; ++i)
877 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
878 long startTime = System.nanoTime();
879 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
880 s1.sn.request(64);
881 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
882 // 2 * delay should elapse but check only 1 * delay to allow timer slop
883 assertTrue(millisElapsedSince(startTime) >= delay);
884 s2.sn.request(64);
885 p.close();
886 s2.awaitComplete();
887 assertTrue(s2.nexts >= 2);
888 s1.awaitComplete();
889 assertTrue(s1.nexts >= 2);
890 }
891
892 /**
893 * Timed offer invokes drop handler if saturated
894 */
895 public void testHandledDroppedTimedOffer() {
896 AtomicInteger calls = new AtomicInteger();
897 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
898 basicExecutor, 4);
899 TestSubscriber s1 = new TestSubscriber();
900 s1.request = false;
901 TestSubscriber s2 = new TestSubscriber();
902 s2.request = false;
903 p.subscribe(s1);
904 p.subscribe(s2);
905 s2.awaitSubscribe();
906 s1.awaitSubscribe();
907 long delay = timeoutMillis();
908 for (int i = 1; i <= 4; ++i)
909 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
910 long startTime = System.nanoTime();
911 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
912 s1.sn.request(64);
913 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
914 assertTrue(millisElapsedSince(startTime) >= delay);
915 s2.sn.request(64);
916 p.close();
917 s2.awaitComplete();
918 s1.awaitComplete();
919 assertTrue(calls.get() >= 2);
920 }
921
922 /**
923 * Timed offer succeeds if drop handler forces request
924 */
925 public void testRecoveredHandledDroppedTimedOffer() {
926 AtomicInteger calls = new AtomicInteger();
927 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
928 basicExecutor, 4);
929 TestSubscriber s1 = new TestSubscriber();
930 s1.request = false;
931 TestSubscriber s2 = new TestSubscriber();
932 s2.request = false;
933 p.subscribe(s1);
934 p.subscribe(s2);
935 s2.awaitSubscribe();
936 s1.awaitSubscribe();
937 int n = 0;
938 long delay = timeoutMillis();
939 long startTime = System.nanoTime();
940 for (int i = 1; i <= 6; ++i) {
941 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
942 n = n + 2 + (d < 0 ? d : 0);
943 }
944 assertTrue(millisElapsedSince(startTime) >= delay);
945 p.close();
946 s2.awaitComplete();
947 s1.awaitComplete();
948 assertEquals(n, s1.nexts + s2.nexts);
949 assertTrue(calls.get() >= 2);
950 }
951
952 /**
953 * consume returns a CompletableFuture that is done when
954 * publisher completes
955 */
956 public void testConsume() {
957 AtomicInteger sum = new AtomicInteger();
958 SubmissionPublisher<Integer> p = basicPublisher();
959 CompletableFuture<Void> f =
960 p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
961 int n = 20;
962 for (int i = 1; i <= n; ++i)
963 p.submit(i);
964 p.close();
965 f.join();
966 assertEquals((n * (n + 1)) / 2, sum.get());
967 }
968
969 /**
970 * consume(null) throws NPE
971 */
972 public void testConsumeNPE() {
973 SubmissionPublisher<Integer> p = basicPublisher();
974 try {
975 CompletableFuture<Void> f = p.consume(null);
976 shouldThrow();
977 } catch(NullPointerException success) {
978 }
979 }
980
981 /**
982 * consume eventually stops processing published items if cancelled
983 */
984 public void testCancelledConsume() {
985 AtomicInteger count = new AtomicInteger();
986 SubmissionPublisher<Integer> p = basicPublisher();
987 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
988 f.cancel(true);
989 int n = 1000000; // arbitrary limit
990 for (int i = 1; i <= n; ++i)
991 p.submit(i);
992 assertTrue(count.get() < n);
993 }
994
995 }