ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.9
Committed: Tue Sep 8 19:44:10 2015 UTC (8 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.8: +20 -11 lines
Log Message:
Faster timeout checks

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9 import static java.util.concurrent.TimeUnit.SECONDS;
10
11 import java.util.concurrent.Executor;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Flow;
14 import static java.util.concurrent.Flow.Publisher;
15 import static java.util.concurrent.Flow.Subscriber;
16 import static java.util.concurrent.Flow.Subscription;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ForkJoinPool;
19 import java.util.concurrent.SubmissionPublisher;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
26 import java.util.function.BiFunction;
27
28 import junit.framework.Test;
29 import junit.framework.TestSuite;
30
31 public class SubmissionPublisherTest extends JSR166TestCase {
32
33 public static void main(String[] args) {
34 main(suite(), args);
35 }
36 public static Test suite() {
37 return new TestSuite(SubmissionPublisherTest.class);
38 }
39
40 // Factory for single thread pool in case commonPool parallelism is zero
41 static final class DaemonThreadFactory implements ThreadFactory {
42 public Thread newThread(Runnable r) {
43 Thread t = new Thread(r);
44 t.setDaemon(true);
45 return t;
46 }
47 }
48
49 static final Executor basicExecutor =
50 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
51 ForkJoinPool.commonPool() :
52 new ThreadPoolExecutor(1, 1, 60, SECONDS,
53 new LinkedBlockingQueue<Runnable>(),
54 new DaemonThreadFactory());
55
56 static SubmissionPublisher<Integer> basicPublisher() {
57 return new SubmissionPublisher<Integer>(basicExecutor,
58 Flow.defaultBufferSize());
59 }
60
61 static class SPException extends RuntimeException {}
62
63 class TestSubscriber implements Subscriber<Integer> {
64 volatile Subscription sn;
65 int last; // Requires that onNexts are in numeric order
66 volatile int nexts;
67 volatile int errors;
68 volatile int completes;
69 volatile boolean throwOnCall = false;
70 volatile boolean request = true;
71 volatile Throwable lastError;
72
73 public synchronized void onSubscribe(Subscription s) {
74 threadAssertTrue(sn == null);
75 sn = s;
76 notifyAll();
77 if (throwOnCall)
78 throw new SPException();
79 if (request)
80 sn.request(1L);
81 }
82 public synchronized void onNext(Integer t) {
83 ++nexts;
84 notifyAll();
85 int current = t.intValue();
86 threadAssertTrue(current >= last);
87 last = current;
88 if (request)
89 sn.request(1L);
90 if (throwOnCall)
91 throw new SPException();
92 }
93 public synchronized void onError(Throwable t) {
94 threadAssertTrue(completes == 0);
95 threadAssertTrue(errors == 0);
96 lastError = t;
97 ++errors;
98 notifyAll();
99 }
100 public synchronized void onComplete() {
101 threadAssertTrue(completes == 0);
102 ++completes;
103 notifyAll();
104 }
105
106 synchronized void awaitSubscribe() {
107 while (sn == null) {
108 try {
109 wait();
110 } catch (Exception ex) {
111 threadUnexpectedException(ex);
112 break;
113 }
114 }
115 }
116 synchronized void awaitNext(int n) {
117 while (nexts < n) {
118 try {
119 wait();
120 } catch (Exception ex) {
121 threadUnexpectedException(ex);
122 break;
123 }
124 }
125 }
126 synchronized void awaitComplete() {
127 while (completes == 0 && errors == 0) {
128 try {
129 wait();
130 } catch (Exception ex) {
131 threadUnexpectedException(ex);
132 break;
133 }
134 }
135 }
136 synchronized void awaitError() {
137 while (errors == 0) {
138 try {
139 wait();
140 } catch (Exception ex) {
141 threadUnexpectedException(ex);
142 break;
143 }
144 }
145 }
146
147 }
148
149 /**
150 * A new SubmissionPublisher has no subscribers, a non-null
151 * executor, a power-of-two capacity, is not closed, and reports
152 * zero demand and lag
153 */
154 void checkInitialState(SubmissionPublisher<?> p) {
155 assertFalse(p.hasSubscribers());
156 assertEquals(0, p.getNumberOfSubscribers());
157 assertTrue(p.getSubscribers().isEmpty());
158 assertFalse(p.isClosed());
159 assertNull(p.getClosedException());
160 int n = p.getMaxBufferCapacity();
161 assertTrue((n & (n - 1)) == 0); // power of two
162 assertNotNull(p.getExecutor());
163 assertEquals(0, p.estimateMinimumDemand());
164 assertEquals(0, p.estimateMaximumLag());
165 }
166
167 /**
168 * A default-constructed SubmissionPublisher has no subscribers,
169 * is not closed, has default buffer size, and uses the
170 * ForkJoinPool.commonPool executor
171 */
172 public void testConstructor1() {
173 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
174 checkInitialState(p);
175 assertSame(p.getExecutor(), ForkJoinPool.commonPool());
176 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
177 }
178
179 /**
180 * A new SubmissionPublisher has no subscribers, is not closed,
181 * has the given buffer size, and uses the given executor
182 */
183 public void testConstructor2() {
184 Executor e = Executors.newFixedThreadPool(1);
185 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
186 checkInitialState(p);
187 assertSame(p.getExecutor(), e);
188 assertEquals(8, p.getMaxBufferCapacity());
189 }
190
191 /**
192 * A null Executor argument to SubmissionPublisher constructor throws NPE
193 */
194 public void testConstructor3() {
195 try {
196 new SubmissionPublisher<Integer>(null, 8);
197 shouldThrow();
198 } catch (NullPointerException success) {}
199 }
200
201 /**
202 * A negative capacity argument to SubmissionPublisher constructor
203 * throws IAE
204 */
205 public void testConstructor4() {
206 Executor e = Executors.newFixedThreadPool(1);
207 try {
208 new SubmissionPublisher<Integer>(e, -1);
209 shouldThrow();
210 } catch (IllegalArgumentException success) {}
211 }
212
213 /**
214 * A closed publisher reports isClosed with no closedException and
215 * throws ISE upon attempted submission; a subsequent close or
216 * closeExceptionally has no additional effect.
217 */
218 public void testClose() {
219 SubmissionPublisher<Integer> p = basicPublisher();
220 checkInitialState(p);
221 p.close();
222 assertTrue(p.isClosed());
223 assertNull(p.getClosedException());
224 try {
225 p.submit(1);
226 shouldThrow();
227 } catch (IllegalStateException success) {}
228 Throwable ex = new SPException();
229 p.closeExceptionally(ex);
230 assertTrue(p.isClosed());
231 assertNull(p.getClosedException());
232 }
233
234 /**
235 * A publisher closedExceptionally reports isClosed with the
236 * closedException and throws ISE upon attempted submission; a
237 * subsequent close or closeExceptionally has no additional
238 * effect.
239 */
240 public void testCloseExceptionally() {
241 SubmissionPublisher<Integer> p = basicPublisher();
242 checkInitialState(p);
243 Throwable ex = new SPException();
244 p.closeExceptionally(ex);
245 assertTrue(p.isClosed());
246 assertSame(p.getClosedException(), ex);
247 try {
248 p.submit(1);
249 shouldThrow();
250 } catch (IllegalStateException success) {}
251 p.close();
252 assertTrue(p.isClosed());
253 assertSame(p.getClosedException(), ex);
254 }
255
256 /**
257 * Upon subscription, the subscriber's onSubscribe is called, no
258 * other Subscriber methods are invoked, the publisher
259 * hasSubscribers, isSubscribed is true, and existing
260 * subscriptions are unaffected.
261 */
262 public void testSubscribe1() {
263 TestSubscriber s = new TestSubscriber();
264 SubmissionPublisher<Integer> p = basicPublisher();
265 p.subscribe(s);
266 assertTrue(p.hasSubscribers());
267 assertEquals(1, p.getNumberOfSubscribers());
268 assertTrue(p.getSubscribers().contains(s));
269 assertTrue(p.isSubscribed(s));
270 s.awaitSubscribe();
271 assertNotNull(s.sn);
272 assertEquals(0, s.nexts);
273 assertEquals(0, s.errors);
274 assertEquals(0, s.completes);
275 TestSubscriber s2 = new TestSubscriber();
276 p.subscribe(s2);
277 assertTrue(p.hasSubscribers());
278 assertEquals(2, p.getNumberOfSubscribers());
279 assertTrue(p.getSubscribers().contains(s));
280 assertTrue(p.getSubscribers().contains(s2));
281 assertTrue(p.isSubscribed(s));
282 assertTrue(p.isSubscribed(s2));
283 s2.awaitSubscribe();
284 assertNotNull(s2.sn);
285 assertEquals(0, s2.nexts);
286 assertEquals(0, s2.errors);
287 assertEquals(0, s2.completes);
288 p.close();
289 }
290
291 /**
292 * If closed, upon subscription, the subscriber's onComplete
293 * method is invoked
294 */
295 public void testSubscribe2() {
296 TestSubscriber s = new TestSubscriber();
297 SubmissionPublisher<Integer> p = basicPublisher();
298 p.close();
299 p.subscribe(s);
300 s.awaitComplete();
301 assertEquals(0, s.nexts);
302 assertEquals(0, s.errors);
303 assertEquals(1, s.completes, 1);
304 }
305
306 /**
307 * If closedExceptionally, upon subscription, the subscriber's
308 * onError method is invoked
309 */
310 public void testSubscribe3() {
311 TestSubscriber s = new TestSubscriber();
312 SubmissionPublisher<Integer> p = basicPublisher();
313 Throwable ex = new SPException();
314 p.closeExceptionally(ex);
315 assertTrue(p.isClosed());
316 assertSame(p.getClosedException(), ex);
317 p.subscribe(s);
318 s.awaitError();
319 assertEquals(0, s.nexts);
320 assertEquals(1, s.errors);
321 }
322
323 /**
324 * Upon attempted resubscription, the subscriber's onError is
325 * called and the subscription is cancelled.
326 */
327 public void testSubscribe4() {
328 TestSubscriber s = new TestSubscriber();
329 SubmissionPublisher<Integer> p = basicPublisher();
330 p.subscribe(s);
331 assertTrue(p.hasSubscribers());
332 assertEquals(1, p.getNumberOfSubscribers());
333 assertTrue(p.getSubscribers().contains(s));
334 assertTrue(p.isSubscribed(s));
335 s.awaitSubscribe();
336 assertNotNull(s.sn);
337 assertEquals(0, s.nexts);
338 assertEquals(0, s.errors);
339 assertEquals(0, s.completes);
340 p.subscribe(s);
341 s.awaitError();
342 assertEquals(0, s.nexts);
343 assertEquals(1, s.errors);
344 assertFalse(p.isSubscribed(s));
345 }
346
347 /**
348 * An exception thrown in onSubscribe causes onError
349 */
350 public void testSubscribe5() {
351 TestSubscriber s = new TestSubscriber();
352 SubmissionPublisher<Integer> p = basicPublisher();
353 s.throwOnCall = true;
354 try {
355 p.subscribe(s);
356 } catch (Exception ok) {}
357 s.awaitError();
358 assertEquals(0, s.nexts);
359 assertEquals(1, s.errors);
360 assertEquals(0, s.completes);
361 }
362
363 /**
364 * subscribe(null) throws NPE
365 */
366 public void testSubscribe6() {
367 SubmissionPublisher<Integer> p = basicPublisher();
368 try {
369 p.subscribe(null);
370 shouldThrow();
371 } catch (NullPointerException success) {}
372 checkInitialState(p);
373 }
374
375 /**
376 * Closing a publisher causes onComplete to subscribers
377 */
378 public void testCloseCompletes() {
379 SubmissionPublisher<Integer> p = basicPublisher();
380 TestSubscriber s1 = new TestSubscriber();
381 TestSubscriber s2 = new TestSubscriber();
382 p.subscribe(s1);
383 p.subscribe(s2);
384 p.submit(1);
385 p.close();
386 assertTrue(p.isClosed());
387 assertNull(p.getClosedException());
388 s1.awaitComplete();
389 assertEquals(1, s1.nexts);
390 assertEquals(1, s1.completes);
391 s2.awaitComplete();
392 assertEquals(1, s2.nexts);
393 assertEquals(1, s2.completes);
394 }
395
396 /**
397 * Closing a publisher exceptionally causes onError to subscribers
398 */
399 public void testCloseExceptionallyError() {
400 SubmissionPublisher<Integer> p = basicPublisher();
401 TestSubscriber s1 = new TestSubscriber();
402 TestSubscriber s2 = new TestSubscriber();
403 p.subscribe(s1);
404 p.subscribe(s2);
405 p.submit(1);
406 p.closeExceptionally(new SPException());
407 assertTrue(p.isClosed());
408 s1.awaitError();
409 assertTrue(s1.nexts <= 1);
410 assertEquals(1, s1.errors);
411 s2.awaitError();
412 assertTrue(s2.nexts <= 1);
413 assertEquals(1, s2.errors);
414 }
415
416 /**
417 * Cancelling a subscription eventually causes no more onNexts to be issued
418 */
419 public void testCancel() {
420 SubmissionPublisher<Integer> p = basicPublisher();
421 TestSubscriber s1 = new TestSubscriber();
422 TestSubscriber s2 = new TestSubscriber();
423 p.subscribe(s1);
424 p.subscribe(s2);
425 s1.awaitSubscribe();
426 p.submit(1);
427 s1.sn.cancel();
428 for (int i = 2; i <= 20; ++i)
429 p.submit(i);
430 p.close();
431 s2.awaitComplete();
432 assertEquals(20, s2.nexts);
433 assertEquals(1, s2.completes);
434 assertTrue(s1.nexts < 20);
435 assertFalse(p.isSubscribed(s1));
436 }
437
438 /**
439 * Throwing an exception in onNext causes onError
440 */
441 public void testThrowOnNext() {
442 SubmissionPublisher<Integer> p = basicPublisher();
443 TestSubscriber s1 = new TestSubscriber();
444 TestSubscriber s2 = new TestSubscriber();
445 p.subscribe(s1);
446 p.subscribe(s2);
447 s1.awaitSubscribe();
448 p.submit(1);
449 s1.throwOnCall = true;
450 p.submit(2);
451 p.close();
452 s2.awaitComplete();
453 assertEquals(2, s2.nexts);
454 s1.awaitComplete();
455 assertEquals(1, s1.errors);
456 }
457
458 /**
459 * If a handler is supplied in constructor, it is invoked when
460 * subscriber throws an exception in onNext
461 */
462 public void testThrowOnNextHandler() {
463 AtomicInteger calls = new AtomicInteger();
464 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
465 (basicExecutor, 8,
466 (s, e) -> calls.getAndIncrement());
467 TestSubscriber s1 = new TestSubscriber();
468 TestSubscriber s2 = new TestSubscriber();
469 p.subscribe(s1);
470 p.subscribe(s2);
471 s1.awaitSubscribe();
472 p.submit(1);
473 s1.throwOnCall = true;
474 p.submit(2);
475 p.close();
476 s2.awaitComplete();
477 assertEquals(2, s2.nexts);
478 assertEquals(1, s2.completes);
479 s1.awaitError();
480 assertEquals(1, s1.errors);
481 assertEquals(1, calls.get());
482 }
483
484 /**
485 * onNext items are issued in the same order to each subscriber
486 */
487 public void testOrder() {
488 SubmissionPublisher<Integer> p = basicPublisher();
489 TestSubscriber s1 = new TestSubscriber();
490 TestSubscriber s2 = new TestSubscriber();
491 p.subscribe(s1);
492 p.subscribe(s2);
493 for (int i = 1; i <= 20; ++i)
494 p.submit(i);
495 p.close();
496 s2.awaitComplete();
497 s1.awaitComplete();
498 assertEquals(20, s2.nexts);
499 assertEquals(1, s2.completes);
500 assertEquals(20, s1.nexts);
501 assertEquals(1, s1.completes);
502 }
503
504 /**
505 * onNext is issued only if requested
506 */
507 public void testRequest1() {
508 SubmissionPublisher<Integer> p = basicPublisher();
509 TestSubscriber s1 = new TestSubscriber();
510 s1.request = false;
511 p.subscribe(s1);
512 s1.awaitSubscribe();
513 assertTrue(p.estimateMinimumDemand() == 0);
514 TestSubscriber s2 = new TestSubscriber();
515 p.subscribe(s2);
516 p.submit(1);
517 p.submit(2);
518 s2.awaitNext(1);
519 assertEquals(0, s1.nexts);
520 s1.sn.request(3);
521 p.submit(3);
522 p.close();
523 s2.awaitComplete();
524 assertEquals(3, s2.nexts);
525 assertEquals(1, s2.completes);
526 s1.awaitComplete();
527 assertTrue(s1.nexts > 0);
528 assertEquals(1, s1.completes);
529 }
530
531 /**
532 * onNext is not issued when requests become zero
533 */
534 public void testRequest2() {
535 SubmissionPublisher<Integer> p = basicPublisher();
536 TestSubscriber s1 = new TestSubscriber();
537 TestSubscriber s2 = new TestSubscriber();
538 p.subscribe(s1);
539 p.subscribe(s2);
540 s2.awaitSubscribe();
541 s1.awaitSubscribe();
542 s1.request = false;
543 p.submit(1);
544 p.submit(2);
545 p.close();
546 s2.awaitComplete();
547 assertEquals(2, s2.nexts);
548 assertEquals(1, s2.completes);
549 s1.awaitNext(1);
550 assertEquals(1, s1.nexts);
551 }
552
553 /**
554 * Negative request causes error
555 */
556 public void testRequest3() {
557 SubmissionPublisher<Integer> p = basicPublisher();
558 TestSubscriber s1 = new TestSubscriber();
559 TestSubscriber s2 = new TestSubscriber();
560 p.subscribe(s1);
561 p.subscribe(s2);
562 s2.awaitSubscribe();
563 s1.awaitSubscribe();
564 s1.sn.request(-1L);
565 p.submit(1);
566 p.submit(2);
567 p.close();
568 s2.awaitComplete();
569 assertEquals(2, s2.nexts);
570 assertEquals(1, s2.completes);
571 s1.awaitError();
572 assertEquals(1, s1.errors);
573 assertTrue(s1.lastError instanceof IllegalArgumentException);
574 }
575
576 /**
577 * estimateMinimumDemand reports 0 until request, nonzero after
578 * request, and zero again after delivery
579 */
580 public void testEstimateMinimumDemand() {
581 TestSubscriber s = new TestSubscriber();
582 SubmissionPublisher<Integer> p = basicPublisher();
583 s.request = false;
584 p.subscribe(s);
585 s.awaitSubscribe();
586 assertEquals(0, p.estimateMinimumDemand());
587 s.sn.request(1);
588 assertEquals(1, p.estimateMinimumDemand());
589 p.submit(1);
590 s.awaitNext(1);
591 assertEquals(0, p.estimateMinimumDemand());
592 }
593
594 /**
595 * submit to a publisher with no subscribers returns lag 0
596 */
597 public void testEmptySubmit() {
598 SubmissionPublisher<Integer> p = basicPublisher();
599 assertEquals(0, p.submit(1));
600 }
601
602 /**
603 * submit(null) throws NPE
604 */
605 public void testNullSubmit() {
606 SubmissionPublisher<Integer> p = basicPublisher();
607 try {
608 p.submit(null);
609 shouldThrow();
610 } catch (NullPointerException success) {}
611 }
612
613 /**
614 * submit returns number of lagged items, compatible with result
615 * of estimateMaximumLag.
616 */
617 public void testLaggedSubmit() {
618 SubmissionPublisher<Integer> p = basicPublisher();
619 TestSubscriber s1 = new TestSubscriber();
620 s1.request = false;
621 TestSubscriber s2 = new TestSubscriber();
622 s2.request = false;
623 p.subscribe(s1);
624 p.subscribe(s2);
625 s2.awaitSubscribe();
626 s1.awaitSubscribe();
627 assertEquals(1, p.submit(1));
628 assertTrue(p.estimateMaximumLag() >= 1);
629 assertTrue(p.submit(2) >= 2);
630 assertTrue(p.estimateMaximumLag() >= 2);
631 s1.sn.request(4);
632 assertTrue(p.submit(3) >= 3);
633 assertTrue(p.estimateMaximumLag() >= 3);
634 s2.sn.request(4);
635 p.submit(4);
636 p.close();
637 s2.awaitComplete();
638 assertEquals(4, s2.nexts);
639 s1.awaitComplete();
640 assertEquals(4, s2.nexts);
641 }
642
643 /**
644 * submit eventually issues requested items when buffer capacity is 1
645 */
646 public void testCap1Submit() {
647 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
648 basicExecutor, 1);
649 TestSubscriber s1 = new TestSubscriber();
650 TestSubscriber s2 = new TestSubscriber();
651 p.subscribe(s1);
652 p.subscribe(s2);
653 for (int i = 1; i <= 20; ++i) {
654 assertTrue(p.estimateMinimumDemand() <= 1);
655 assertTrue(p.submit(i) >= 0);
656 }
657 p.close();
658 s2.awaitComplete();
659 s1.awaitComplete();
660 assertEquals(20, s2.nexts);
661 assertEquals(1, s2.completes);
662 assertEquals(20, s1.nexts);
663 assertEquals(1, s1.completes);
664 }
665
666 static boolean noopHandle(AtomicInteger count) {
667 count.getAndIncrement();
668 return false;
669 }
670
671 static boolean reqHandle(AtomicInteger count, Subscriber s) {
672 count.getAndIncrement();
673 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
674 return true;
675 }
676
677 /**
678 * offer to a publisher with no subscribers returns lag 0
679 */
680 public void testEmptyOffer() {
681 SubmissionPublisher<Integer> p = basicPublisher();
682 assertEquals(0, p.offer(1, null));
683 }
684
685 /**
686 * offer(null) throws NPE
687 */
688 public void testNullOffer() {
689 SubmissionPublisher<Integer> p = basicPublisher();
690 try {
691 p.offer(null, null);
692 shouldThrow();
693 } catch (NullPointerException success) {}
694 }
695
696 /**
697 * offer returns number of lagged items if not saturated
698 */
699 public void testLaggedOffer() {
700 SubmissionPublisher<Integer> p = basicPublisher();
701 TestSubscriber s1 = new TestSubscriber();
702 s1.request = false;
703 TestSubscriber s2 = new TestSubscriber();
704 s2.request = false;
705 p.subscribe(s1);
706 p.subscribe(s2);
707 s2.awaitSubscribe();
708 s1.awaitSubscribe();
709 assertTrue(p.offer(1, null) >= 1);
710 assertTrue(p.offer(2, null) >= 2);
711 s1.sn.request(4);
712 assertTrue(p.offer(3, null) >= 3);
713 s2.sn.request(4);
714 p.offer(4, null);
715 p.close();
716 s2.awaitComplete();
717 assertEquals(4, s2.nexts);
718 s1.awaitComplete();
719 assertEquals(4, s2.nexts);
720 }
721
722 /**
723 * offer reports drops if saturated
724 */
725 public void testDroppedOffer() {
726 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
727 basicExecutor, 4);
728 TestSubscriber s1 = new TestSubscriber();
729 s1.request = false;
730 TestSubscriber s2 = new TestSubscriber();
731 s2.request = false;
732 p.subscribe(s1);
733 p.subscribe(s2);
734 s2.awaitSubscribe();
735 s1.awaitSubscribe();
736 for (int i = 1; i <= 4; ++i)
737 assertTrue(p.offer(i, null) >= 0);
738 p.offer(5, null);
739 assertTrue(p.offer(6, null) < 0);
740 s1.sn.request(64);
741 assertTrue(p.offer(7, null) < 0);
742 s2.sn.request(64);
743 p.close();
744 s2.awaitComplete();
745 assertTrue(s2.nexts >= 4);
746 s1.awaitComplete();
747 assertTrue(s1.nexts >= 4);
748 }
749
750 /**
751 * offer invokes drop handler if saturated
752 */
753 public void testHandledDroppedOffer() {
754 AtomicInteger calls = new AtomicInteger();
755 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
756 basicExecutor, 4);
757 TestSubscriber s1 = new TestSubscriber();
758 s1.request = false;
759 TestSubscriber s2 = new TestSubscriber();
760 s2.request = false;
761 p.subscribe(s1);
762 p.subscribe(s2);
763 s2.awaitSubscribe();
764 s1.awaitSubscribe();
765 for (int i = 1; i <= 4; ++i)
766 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
767 p.offer(4, (s, x) -> noopHandle(calls));
768 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
769 s1.sn.request(64);
770 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
771 s2.sn.request(64);
772 p.close();
773 s2.awaitComplete();
774 s1.awaitComplete();
775 assertTrue(calls.get() >= 4);
776 }
777
778 /**
779 * offer succeeds if drop handler forces request
780 */
781 public void testRecoveredHandledDroppedOffer() {
782 AtomicInteger calls = new AtomicInteger();
783 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
784 basicExecutor, 4);
785 TestSubscriber s1 = new TestSubscriber();
786 s1.request = false;
787 TestSubscriber s2 = new TestSubscriber();
788 s2.request = false;
789 p.subscribe(s1);
790 p.subscribe(s2);
791 s2.awaitSubscribe();
792 s1.awaitSubscribe();
793 int n = 0;
794 for (int i = 1; i <= 8; ++i) {
795 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
796 n = n + 2 + (d < 0 ? d : 0);
797 }
798 p.close();
799 s2.awaitComplete();
800 s1.awaitComplete();
801 assertEquals(n, s1.nexts + s2.nexts);
802 assertTrue(calls.get() >= 2);
803 }
804
805 /**
806 * Timed offer to a publisher with no subscribers returns lag 0
807 */
808 public void testEmptyTimedOffer() {
809 SubmissionPublisher<Integer> p = basicPublisher();
810 long startTime = System.nanoTime();
811 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
812 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
813 }
814
815 /**
816 * Timed offer with null item or TimeUnit throws NPE
817 */
818 public void testNullTimedOffer() {
819 SubmissionPublisher<Integer> p = basicPublisher();
820 long startTime = System.nanoTime();
821 try {
822 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
823 shouldThrow();
824 } catch (NullPointerException success) {}
825 try {
826 p.offer(1, LONG_DELAY_MS, null, null);
827 shouldThrow();
828 } catch (NullPointerException success) {}
829 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
830 }
831
832 /**
833 * Timed offer returns number of lagged items if not saturated
834 */
835 public void testLaggedTimedOffer() {
836 SubmissionPublisher<Integer> p = basicPublisher();
837 TestSubscriber s1 = new TestSubscriber();
838 s1.request = false;
839 TestSubscriber s2 = new TestSubscriber();
840 s2.request = false;
841 p.subscribe(s1);
842 p.subscribe(s2);
843 s2.awaitSubscribe();
844 s1.awaitSubscribe();
845 long startTime = System.nanoTime();
846 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
847 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
848 s1.sn.request(4);
849 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
850 s2.sn.request(4);
851 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
852 p.close();
853 s2.awaitComplete();
854 assertEquals(4, s2.nexts);
855 s1.awaitComplete();
856 assertEquals(4, s2.nexts);
857 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
858 }
859
860 /**
861 * Timed offer reports drops if saturated
862 */
863 public void testDroppedTimedOffer() {
864 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
865 basicExecutor, 4);
866 TestSubscriber s1 = new TestSubscriber();
867 s1.request = false;
868 TestSubscriber s2 = new TestSubscriber();
869 s2.request = false;
870 p.subscribe(s1);
871 p.subscribe(s2);
872 s2.awaitSubscribe();
873 s1.awaitSubscribe();
874 long delay = timeoutMillis();
875 for (int i = 1; i <= 4; ++i)
876 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
877 long startTime = System.nanoTime();
878 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
879 s1.sn.request(64);
880 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
881 // 2 * delay should elapse but check only 1 * delay to allow timer slop
882 assertTrue(millisElapsedSince(startTime) >= delay);
883 s2.sn.request(64);
884 p.close();
885 s2.awaitComplete();
886 assertTrue(s2.nexts >= 2);
887 s1.awaitComplete();
888 assertTrue(s1.nexts >= 2);
889 }
890
891 /**
892 * Timed offer invokes drop handler if saturated
893 */
894 public void testHandledDroppedTimedOffer() {
895 AtomicInteger calls = new AtomicInteger();
896 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
897 basicExecutor, 4);
898 TestSubscriber s1 = new TestSubscriber();
899 s1.request = false;
900 TestSubscriber s2 = new TestSubscriber();
901 s2.request = false;
902 p.subscribe(s1);
903 p.subscribe(s2);
904 s2.awaitSubscribe();
905 s1.awaitSubscribe();
906 long delay = timeoutMillis();
907 for (int i = 1; i <= 4; ++i)
908 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
909 long startTime = System.nanoTime();
910 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
911 s1.sn.request(64);
912 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
913 assertTrue(millisElapsedSince(startTime) >= delay);
914 s2.sn.request(64);
915 p.close();
916 s2.awaitComplete();
917 s1.awaitComplete();
918 assertTrue(calls.get() >= 2);
919 }
920
921 /**
922 * Timed offer succeeds if drop handler forces request
923 */
924 public void testRecoveredHandledDroppedTimedOffer() {
925 AtomicInteger calls = new AtomicInteger();
926 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
927 basicExecutor, 4);
928 TestSubscriber s1 = new TestSubscriber();
929 s1.request = false;
930 TestSubscriber s2 = new TestSubscriber();
931 s2.request = false;
932 p.subscribe(s1);
933 p.subscribe(s2);
934 s2.awaitSubscribe();
935 s1.awaitSubscribe();
936 int n = 0;
937 long delay = timeoutMillis();
938 long startTime = System.nanoTime();
939 for (int i = 1; i <= 6; ++i) {
940 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
941 n = n + 2 + (d < 0 ? d : 0);
942 }
943 assertTrue(millisElapsedSince(startTime) >= delay);
944 p.close();
945 s2.awaitComplete();
946 s1.awaitComplete();
947 assertEquals(n, s1.nexts + s2.nexts);
948 assertTrue(calls.get() >= 2);
949 }
950
951 }