ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.1
Committed: Mon Sep 7 17:14:06 2015 UTC (8 years, 7 months ago) by dl
Branch: MAIN
Log Message:
Add SubmissionPublisher tests

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9 import static java.util.concurrent.TimeUnit.SECONDS;
10
11 import java.util.concurrent.Executor;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Flow;
14 import static java.util.concurrent.Flow.Publisher;
15 import static java.util.concurrent.Flow.Subscriber;
16 import static java.util.concurrent.Flow.Subscription;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ForkJoinPool;
19 import java.util.concurrent.SubmissionPublisher;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
26 import java.util.function.BiFunction;
27
28 import junit.framework.Test;
29 import junit.framework.TestSuite;
30
31 public class SubmissionPublisherTest extends JSR166TestCase {
32
33 public static void main(String[] args) {
34 main(suite(), args);
35 }
36 public static Test suite() {
37 return new TestSuite(SubmissionPublisherTest.class);
38 }
39
40 // Factory for single thread pool in case commonPool parallelism is zero
41 static final class DaemonThreadFactory implements ThreadFactory {
42 public Thread newThread(Runnable r) {
43 Thread t = new Thread(r);
44 t.setDaemon(true);
45 return t;
46 }
47 }
48
49 static final Executor basicExecutor =
50 (ForkJoinPool.getCommonPoolParallelism() > 0) ?
51 ForkJoinPool.commonPool() :
52 new ThreadPoolExecutor(1, 1, 60, SECONDS,
53 new LinkedBlockingQueue<Runnable>(),
54 new DaemonThreadFactory());
55
56 static SubmissionPublisher<Integer> basicPublisher() {
57 return new SubmissionPublisher<Integer>(basicExecutor,
58 Flow.defaultBufferSize());
59 }
60
61 static class SPException extends RuntimeException {}
62
63 class TestSubscriber implements Subscriber<Integer> {
64 volatile Subscription sn;
65 int last; // Requires that onNexts are in numeric order
66 volatile int nexts;
67 volatile int errors;
68 volatile int completes;
69 volatile boolean throwOnCall = false;
70 volatile boolean request = true;
71 volatile Throwable lastError;
72
73 public synchronized void onSubscribe(Subscription s) {
74 threadAssertTrue(sn == null);
75 sn = s;
76 notifyAll();
77 if (throwOnCall)
78 throw new SPException();
79 if (request)
80 sn.request(1L);
81 }
82 public synchronized void onNext(Integer t) {
83 ++nexts;
84 notifyAll();
85 int current = t.intValue();
86 threadAssertTrue(current >= last);
87 last = current;
88 if (request)
89 sn.request(1L);
90 if (throwOnCall)
91 throw new SPException();
92 }
93 public synchronized void onError(Throwable t) {
94 threadAssertTrue(completes == 0);
95 threadAssertTrue(errors == 0);
96 lastError = t;
97 ++errors;
98 notifyAll();
99 }
100 public synchronized void onComplete() {
101 threadAssertTrue(completes == 0);
102 ++completes;
103 notifyAll();
104 }
105
106 synchronized void awaitSubscribe() {
107 while (sn == null) {
108 try {
109 wait();
110 } catch (Exception ex) {
111 threadUnexpectedException(ex);
112 break;
113 }
114 }
115 }
116 synchronized void awaitNext(int n) {
117 while (nexts < n) {
118 try {
119 wait();
120 } catch (Exception ex) {
121 threadUnexpectedException(ex);
122 break;
123 }
124 }
125 }
126 synchronized void awaitComplete() {
127 while (completes == 0 && errors == 0) {
128 try {
129 wait();
130 } catch (Exception ex) {
131 threadUnexpectedException(ex);
132 break;
133 }
134 }
135 }
136 synchronized void awaitError() {
137 while (errors == 0) {
138 try {
139 wait();
140 } catch (Exception ex) {
141 threadUnexpectedException(ex);
142 break;
143 }
144 }
145 }
146
147 }
148
149 /**
150 * A new SubmissionPublisher has no subscribers, a non-null
151 * executor, a power-of-two capacity, is not closed, and reports
152 * zero demand and lag
153 */
154 void checkInitialState(SubmissionPublisher<?> p) {
155 assertFalse(p.hasSubscribers());
156 assertEquals(p.getNumberOfSubscribers(), 0);
157 assertTrue(p.getSubscribers().isEmpty());
158 assertFalse(p.isClosed());
159 assertNull(p.getClosedException());
160 int n = p.getMaxBufferCapacity();
161 assertTrue((n & (n - 1)) == 0); // power of two
162 assertNotNull(p.getExecutor());
163 assertEquals(p.estimateMinimumDemand(), 0);
164 assertEquals(p.estimateMaximumLag(), 0);
165 }
166
167 /**
168 * A default-constructed SubmissionPublisher has no subscribers,
169 * is not closed, has default buffer size, and uses the
170 * ForkJoinPool.commonPool executor
171 */
172 public void testConstructor1() {
173 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
174 checkInitialState(p);
175 assertSame(p.getExecutor(), ForkJoinPool.commonPool());
176 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
177 }
178
179 /**
180 * A new SubmissionPublisher has no subscribers, is not closed,
181 * has the given buffer size, and uses the given executor
182 */
183 public void testConstructor2() {
184 Executor e = Executors.newFixedThreadPool(1);
185 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
186 checkInitialState(p);
187 assertSame(p.getExecutor(), e);
188 assertEquals(p.getMaxBufferCapacity(), 8);
189 }
190
191 /**
192 * A null Executor argument to SubmissionPublisher constructor throws NPE
193 */
194 public void testConstructor3() {
195 try {
196 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(null, 8);
197 shouldThrow();
198 } catch (NullPointerException success) {
199 }
200 }
201
202 /**
203 * A negative capacity argument to SubmissionPublisher constructor
204 * throws IAE
205 */
206 public void testConstructor4() {
207 Executor e = Executors.newFixedThreadPool(1);
208 try {
209 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, -1);
210 shouldThrow();
211 } catch (IllegalArgumentException success) {
212 }
213 }
214
215 /**
216 * A closed publisher reports isClosed with no closedException and
217 * throws ISE upon attempted submission; a subsequent close or
218 * closeExceptionally has no additional effect.
219 */
220 public void testClose() {
221 SubmissionPublisher<Integer> p = basicPublisher();
222 checkInitialState(p);
223 p.close();
224 assertTrue(p.isClosed());
225 assertNull(p.getClosedException());
226 try {
227 p.submit(1);
228 shouldThrow();
229 }
230 catch(IllegalStateException success) {
231 }
232 Throwable ex = new SPException();
233 p.closeExceptionally(ex);
234 assertTrue(p.isClosed());
235 assertNull(p.getClosedException());
236 }
237
238 /**
239 * A publisher closedExceptionally reports isClosed with the
240 * closedException and throws ISE upon attempted submission; a
241 * subsequent close or closeExceptionally has no additional
242 * effect.
243 */
244 public void testCloseExceptionally() {
245 SubmissionPublisher<Integer> p = basicPublisher();
246 checkInitialState(p);
247 Throwable ex = new SPException();
248 p.closeExceptionally(ex);
249 assertTrue(p.isClosed());
250 assertSame(p.getClosedException(), ex);
251 try {
252 p.submit(1);
253 shouldThrow();
254 }
255 catch(IllegalStateException success) {
256 }
257 p.close();
258 assertTrue(p.isClosed());
259 assertSame(p.getClosedException(), ex);
260 }
261
262 /**
263 * Upon subscription, the subscriber's onSubscribe is called, no
264 * other Subscriber methods are invoked, the publisher
265 * hasSubscribers, isSubscribed is true, and existing
266 * subscriptions are unaffected.
267 */
268 public void testSubscribe1() {
269 TestSubscriber s = new TestSubscriber();
270 SubmissionPublisher<Integer> p = basicPublisher();
271 p.subscribe(s);
272 assertTrue(p.hasSubscribers());
273 assertEquals(p.getNumberOfSubscribers(), 1);
274 assertTrue(p.getSubscribers().contains(s));
275 assertTrue(p.isSubscribed(s));
276 s.awaitSubscribe();
277 assertNotNull(s.sn);
278 assertEquals(s.nexts, 0);
279 assertEquals(s.errors, 0);
280 assertEquals(s.completes, 0);
281 TestSubscriber s2 = new TestSubscriber();
282 p.subscribe(s2);
283 assertTrue(p.hasSubscribers());
284 assertEquals(p.getNumberOfSubscribers(), 2);
285 assertTrue(p.getSubscribers().contains(s));
286 assertTrue(p.getSubscribers().contains(s2));
287 assertTrue(p.isSubscribed(s));
288 assertTrue(p.isSubscribed(s2));
289 s2.awaitSubscribe();
290 assertNotNull(s2.sn);
291 assertEquals(s2.nexts, 0);
292 assertEquals(s2.errors, 0);
293 assertEquals(s2.completes, 0);
294 }
295
296 /**
297 * If closed, upon subscription, the subscriber's onComplete
298 * method is invoked
299 */
300 public void testSubscribe2() {
301 TestSubscriber s = new TestSubscriber();
302 SubmissionPublisher<Integer> p = basicPublisher();
303 p.close();
304 p.subscribe(s);
305 s.awaitComplete();
306 assertEquals(s.nexts, 0);
307 assertEquals(s.errors, 0);
308 assertEquals(s.completes, 1);
309 }
310
311 /**
312 * If closedExceptionally, upon subscription, the subscriber's
313 * onError method is invoked
314 */
315 public void testSubscribe3() {
316 TestSubscriber s = new TestSubscriber();
317 SubmissionPublisher<Integer> p = basicPublisher();
318 Throwable ex = new SPException();
319 p.closeExceptionally(ex);
320 assertTrue(p.isClosed());
321 assertSame(p.getClosedException(), ex);
322 p.subscribe(s);
323 s.awaitError();
324 assertEquals(s.nexts, 0);
325 assertEquals(s.errors, 1);
326 }
327
328 /**
329 * Upon attempted resubscription, the subscriber's onError is
330 * called and the subscription is cancelled.
331 */
332 public void testSubscribe4() {
333 TestSubscriber s = new TestSubscriber();
334 SubmissionPublisher<Integer> p = basicPublisher();
335 p.subscribe(s);
336 assertTrue(p.hasSubscribers());
337 assertEquals(p.getNumberOfSubscribers(), 1);
338 assertTrue(p.getSubscribers().contains(s));
339 assertTrue(p.isSubscribed(s));
340 s.awaitSubscribe();
341 assertNotNull(s.sn);
342 assertEquals(s.nexts, 0);
343 assertEquals(s.errors, 0);
344 assertEquals(s.completes, 0);
345 p.subscribe(s);
346 s.awaitError();
347 assertEquals(s.nexts, 0);
348 assertEquals(s.errors, 1);
349 assertFalse(p.isSubscribed(s));
350 }
351
352 /**
353 * An exception thrown in onSubscribe causes onError
354 */
355 public void testSubscribe5() {
356 TestSubscriber s = new TestSubscriber();
357 SubmissionPublisher<Integer> p = basicPublisher();
358 s.throwOnCall = true;
359 try {
360 p.subscribe(s);
361 } catch(Exception ok) {
362 }
363 s.awaitError();
364 assertEquals(s.nexts, 0);
365 assertEquals(s.errors, 1);
366 assertEquals(s.completes, 0);
367 }
368
369 /**
370 * subscribe(null) thows NPE
371 */
372 public void testSubscribe6() {
373 SubmissionPublisher<Integer> p = basicPublisher();
374 try {
375 p.subscribe(null);
376 } catch(NullPointerException success) {
377 }
378 checkInitialState(p);
379 }
380
381 /**
382 * Closing a publisher causes onComplete to subscribers
383 */
384 public void testCloseCompletes() {
385 SubmissionPublisher<Integer> p = basicPublisher();
386 TestSubscriber s1 = new TestSubscriber();
387 TestSubscriber s2 = new TestSubscriber();
388 p.subscribe(s1);
389 p.subscribe(s2);
390 p.submit(1);
391 p.close();
392 assertTrue(p.isClosed());
393 assertNull(p.getClosedException());
394 s1.awaitComplete();
395 assertEquals(s1.nexts, 1);
396 assertEquals(s1.completes, 1);
397 s2.awaitComplete();
398 assertEquals(s2.nexts, 1);
399 assertEquals(s2.completes, 1);
400 }
401
402 /**
403 * Closing a publisher exceptionally causes onError to subscribers
404 */
405 public void testCloseExceptionallyError() {
406 SubmissionPublisher<Integer> p = basicPublisher();
407 TestSubscriber s1 = new TestSubscriber();
408 TestSubscriber s2 = new TestSubscriber();
409 p.subscribe(s1);
410 p.subscribe(s2);
411 p.submit(1);
412 p.closeExceptionally(new SPException());
413 assertTrue(p.isClosed());
414 s1.awaitError();
415 assertTrue(s1.nexts <= 1);
416 assertEquals(s1.errors, 1);
417 s2.awaitError();
418 assertTrue(s2.nexts <= 1);
419 assertEquals(s2.errors, 1);
420 }
421
422 /**
423 * Cancelling a subscription eventually causes no more onNexts to be issued
424 */
425 public void testCancel() {
426 SubmissionPublisher<Integer> p = basicPublisher();
427 TestSubscriber s1 = new TestSubscriber();
428 TestSubscriber s2 = new TestSubscriber();
429 p.subscribe(s1);
430 p.subscribe(s2);
431 s1.awaitSubscribe();
432 p.submit(1);
433 s1.sn.cancel();
434 for (int i = 2; i <= 20; ++i)
435 p.submit(i);
436 p.close();
437 s2.awaitComplete();
438 assertEquals(s2.nexts, 20);
439 assertEquals(s2.completes, 1);
440 assertTrue(s1.nexts < 20);
441 assertFalse(p.isSubscribed(s1));
442 }
443
444 /**
445 * Throwing an exception in onNext causes onError
446 */
447 public void testThrowOnNext() {
448 SubmissionPublisher<Integer> p = basicPublisher();
449 TestSubscriber s1 = new TestSubscriber();
450 TestSubscriber s2 = new TestSubscriber();
451 p.subscribe(s1);
452 p.subscribe(s2);
453 s1.awaitSubscribe();
454 p.submit(1);
455 s1.throwOnCall = true;
456 p.submit(2);
457 p.close();
458 s2.awaitComplete();
459 assertEquals(s2.nexts, 2);
460 s1.awaitComplete();
461 assertEquals(s1.errors, 1);
462 }
463
464 /**
465 * If a handler is supplied in conctructor, it is invoked when
466 * subscriber throws an exception in onNext
467 */
468 public void testThrowOnNextHandler() {
469 AtomicInteger calls = new AtomicInteger();
470 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
471 (basicExecutor, 8,
472 (s, e) -> calls.getAndIncrement());
473 TestSubscriber s1 = new TestSubscriber();
474 TestSubscriber s2 = new TestSubscriber();
475 p.subscribe(s1);
476 p.subscribe(s2);
477 s1.awaitSubscribe();
478 p.submit(1);
479 s1.throwOnCall = true;
480 p.submit(2);
481 p.close();
482 s2.awaitComplete();
483 assertEquals(s2.nexts, 2);
484 assertEquals(s2.completes, 1);
485 s1.awaitError();
486 assertEquals(s1.errors, 1);
487 assertEquals(calls.get(), 1);
488 }
489
490 /**
491 * onNext items are issued in the same order to each subscriber
492 */
493 public void testOrder() {
494 SubmissionPublisher<Integer> p = basicPublisher();
495 TestSubscriber s1 = new TestSubscriber();
496 TestSubscriber s2 = new TestSubscriber();
497 p.subscribe(s1);
498 p.subscribe(s2);
499 for (int i = 1; i <= 20; ++i)
500 p.submit(i);
501 p.close();
502 s2.awaitComplete();
503 s1.awaitComplete();
504 assertEquals(s2.nexts, 20);
505 assertEquals(s2.completes, 1);
506 assertEquals(s1.nexts, 20);
507 assertEquals(s1.completes, 1);
508 }
509
510 /**
511 * onNext is issued only if requested
512 */
513 public void testRequest1() {
514 SubmissionPublisher<Integer> p = basicPublisher();
515 TestSubscriber s1 = new TestSubscriber();
516 s1.request = false;
517 p.subscribe(s1);
518 s1.awaitSubscribe();
519 assertTrue(p.estimateMinimumDemand() == 0);
520 TestSubscriber s2 = new TestSubscriber();
521 p.subscribe(s2);
522 p.submit(1);
523 p.submit(2);
524 s2.awaitNext(1);
525 assertEquals(s1.nexts, 0);
526 s1.sn.request(3);
527 p.submit(3);
528 p.close();
529 s2.awaitComplete();
530 assertEquals(s2.nexts, 3);
531 assertEquals(s2.completes, 1);
532 s1.awaitComplete();
533 assertTrue(s1.nexts > 0);
534 assertEquals(s1.completes, 1);
535 }
536
537 /**
538 * onNext is not issued when requests become zero
539 */
540 public void testRequest2() {
541 SubmissionPublisher<Integer> p = basicPublisher();
542 TestSubscriber s1 = new TestSubscriber();
543 TestSubscriber s2 = new TestSubscriber();
544 p.subscribe(s1);
545 p.subscribe(s2);
546 s2.awaitSubscribe();
547 s1.awaitSubscribe();
548 s1.request = false;
549 p.submit(1);
550 p.submit(2);
551 p.close();
552 s2.awaitComplete();
553 assertEquals(s2.nexts, 2);
554 assertEquals(s2.completes, 1);
555 s1.awaitNext(1);
556 assertEquals(s1.nexts, 1);
557 }
558
559 /**
560 * Negative request causes error
561 */
562 public void testRequest3() {
563 SubmissionPublisher<Integer> p = basicPublisher();
564 TestSubscriber s1 = new TestSubscriber();
565 TestSubscriber s2 = new TestSubscriber();
566 p.subscribe(s1);
567 p.subscribe(s2);
568 s2.awaitSubscribe();
569 s1.awaitSubscribe();
570 s1.sn.request(-1L);
571 p.submit(1);
572 p.submit(2);
573 p.close();
574 s2.awaitComplete();
575 assertEquals(s2.nexts, 2);
576 assertEquals(s2.completes, 1);
577 s1.awaitError();
578 assertEquals(s1.errors, 1);
579 assertTrue(s1.lastError instanceof IllegalArgumentException);
580 }
581
582 /**
583 * estimateMinimumDemand reports 0 until request, nonzero after
584 * request, and zero again after delivery
585 */
586 public void testEstimateMinimumDemand() {
587 TestSubscriber s = new TestSubscriber();
588 SubmissionPublisher<Integer> p = basicPublisher();
589 s.request = false;
590 p.subscribe(s);
591 s.awaitSubscribe();
592 assertEquals(p.estimateMinimumDemand(), 0);
593 s.sn.request(1);
594 assertEquals(p.estimateMinimumDemand(), 1);
595 p.submit(1);
596 s.awaitNext(1);
597 assertEquals(p.estimateMinimumDemand(), 0);
598 }
599
600 /**
601 * Submit to a publisher with no subscribers returns lag 0
602 */
603 public void testEmptySubmit() {
604 SubmissionPublisher<Integer> p = basicPublisher();
605 assertEquals(p.submit(1), 0);
606 }
607
608 /**
609 * Submit(null) throws NPE
610 */
611 public void testNullSubmit() {
612 SubmissionPublisher<Integer> p = basicPublisher();
613 try {
614 p.submit(null);
615 } catch (NullPointerException success) {
616 }
617 }
618
619 /**
620 * Submit returns number of lagged items, compatible with result
621 * of estimateMaximumLag.
622 */
623 public void testLaggedSubmit() {
624 SubmissionPublisher<Integer> p = basicPublisher();
625 TestSubscriber s1 = new TestSubscriber();
626 s1.request = false;
627 TestSubscriber s2 = new TestSubscriber();
628 s2.request = false;
629 p.subscribe(s1);
630 p.subscribe(s2);
631 s2.awaitSubscribe();
632 s1.awaitSubscribe();
633 assertEquals(p.submit(1), 1);
634 assertTrue(p.estimateMaximumLag() >= 1);
635 assertTrue(p.submit(2) >= 2);
636 assertTrue(p.estimateMaximumLag() >= 2);
637 s1.sn.request(4);
638 assertTrue(p.submit(3) >= 3);
639 assertTrue(p.estimateMaximumLag() >= 3);
640 s2.sn.request(4);
641 p.submit(4);
642 p.close();
643 s2.awaitComplete();
644 assertEquals(s2.nexts, 4);
645 s1.awaitComplete();
646 assertEquals(s2.nexts, 4);
647 }
648
649 /**
650 * submit eventually issues requested items when buffer capacity is 1
651 */
652 public void testCap1Submit() {
653 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
654 basicExecutor, 1);
655 TestSubscriber s1 = new TestSubscriber();
656 TestSubscriber s2 = new TestSubscriber();
657 p.subscribe(s1);
658 p.subscribe(s2);
659 for (int i = 1; i <= 20; ++i) {
660 assertTrue(p.estimateMinimumDemand() <= 1);
661 assertTrue(p.submit(i) >= 0);
662 }
663 p.close();
664 s2.awaitComplete();
665 s1.awaitComplete();
666 assertEquals(s2.nexts, 20);
667 assertEquals(s2.completes, 1);
668 assertEquals(s1.nexts, 20);
669 assertEquals(s1.completes, 1);
670 }
671
672 static boolean noopHandle(AtomicInteger count) {
673 count.getAndIncrement();
674 return false;
675 }
676
677 static boolean reqHandle(AtomicInteger count, Subscriber s) {
678 count.getAndIncrement();
679 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
680 return true;
681 }
682
683 /**
684 * Offer to a publisher with no subscribers returns lag 0
685 */
686 public void testEmptyOffer() {
687 SubmissionPublisher<Integer> p = basicPublisher();
688 assertEquals(p.offer(1, null), 0);
689 }
690
691 /**
692 * Offer(null) throws NPE
693 */
694 public void testNullOffer() {
695 SubmissionPublisher<Integer> p = basicPublisher();
696 try {
697 p.offer(null, null);
698 } catch (NullPointerException success) {
699 }
700 }
701
702 /**
703 * Offer returns number of lagged items if not saturated
704 */
705 public void testLaggedOffer() {
706 SubmissionPublisher<Integer> p = basicPublisher();
707 TestSubscriber s1 = new TestSubscriber();
708 s1.request = false;
709 TestSubscriber s2 = new TestSubscriber();
710 s2.request = false;
711 p.subscribe(s1);
712 p.subscribe(s2);
713 s2.awaitSubscribe();
714 s1.awaitSubscribe();
715 assertTrue(p.offer(1, null) >= 1);
716 assertTrue(p.offer(2, null) >= 2);
717 s1.sn.request(4);
718 assertTrue(p.offer(3, null) >= 3);
719 s2.sn.request(4);
720 p.offer(4, null);
721 p.close();
722 s2.awaitComplete();
723 assertEquals(s2.nexts, 4);
724 s1.awaitComplete();
725 assertEquals(s2.nexts, 4);
726 }
727
728 /**
729 * Offer reports drops if saturated
730 */
731 public void testDroppedOffer() {
732 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
733 basicExecutor, 4);
734 TestSubscriber s1 = new TestSubscriber();
735 s1.request = false;
736 TestSubscriber s2 = new TestSubscriber();
737 s2.request = false;
738 p.subscribe(s1);
739 p.subscribe(s2);
740 s2.awaitSubscribe();
741 s1.awaitSubscribe();
742 for (int i = 1; i <= 4; ++i)
743 assertTrue(p.offer(i, null) >= 0);
744 p.offer(5, null);
745 assertTrue(p.offer(6, null) < 0);
746 s1.sn.request(64);
747 assertTrue(p.offer(7, null) < 0);
748 s2.sn.request(64);
749 p.close();
750 s2.awaitComplete();
751 assertTrue(s2.nexts >= 4);
752 s1.awaitComplete();
753 assertTrue(s1.nexts >= 4);
754 }
755
756 /**
757 * Offer invokes drop handler if saturated
758 */
759 public void testHandledDroppedOffer() {
760 AtomicInteger calls = new AtomicInteger();
761 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
762 basicExecutor, 4);
763 TestSubscriber s1 = new TestSubscriber();
764 s1.request = false;
765 TestSubscriber s2 = new TestSubscriber();
766 s2.request = false;
767 p.subscribe(s1);
768 p.subscribe(s2);
769 s2.awaitSubscribe();
770 s1.awaitSubscribe();
771 for (int i = 1; i <= 4; ++i)
772 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
773 p.offer(4, (s, x) -> noopHandle(calls));
774 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
775 s1.sn.request(64);
776 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
777 s2.sn.request(64);
778 p.close();
779 s2.awaitComplete();
780 s1.awaitComplete();
781 assertTrue(calls.get() >= 4);
782 }
783
784
785 /**
786 * Offer succeeds if drop handler forces request
787 */
788 public void testRecoveredHandledDroppedOffer() {
789 AtomicInteger calls = new AtomicInteger();
790 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
791 basicExecutor, 4);
792 TestSubscriber s1 = new TestSubscriber();
793 s1.request = false;
794 TestSubscriber s2 = new TestSubscriber();
795 s2.request = false;
796 p.subscribe(s1);
797 p.subscribe(s2);
798 s2.awaitSubscribe();
799 s1.awaitSubscribe();
800 int n = 0;
801 for (int i = 1; i <= 8; ++i) {
802 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
803 n = n + 2 + (d < 0 ? d : 0);
804 }
805 p.close();
806 s2.awaitComplete();
807 s1.awaitComplete();
808 assertEquals(s1.nexts + s2.nexts, n);
809 assertTrue(calls.get() >= 2);
810 }
811
812
813 /**
814 * TimedOffer to a publisher with no subscribers returns lag 0
815 */
816 public void testEmptyTimedOffer() {
817 SubmissionPublisher<Integer> p = basicPublisher();
818 assertEquals(p.offer(1, null), 0);
819 }
820
821 /**
822 * Timed Offer with null item or TimeUnit throws NPE
823 */
824 public void testNullTimedOffer() {
825 SubmissionPublisher<Integer> p = basicPublisher();
826 try {
827 p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null);
828 } catch (NullPointerException success) {
829 }
830 try {
831 p.offer(1, SHORT_DELAY_MS, null, null);
832 } catch (NullPointerException success) {
833 }
834 }
835
836 /**
837 * Timed Offer returns number of lagged items if not saturated
838 */
839 public void testLaggedTimedOffer() {
840 SubmissionPublisher<Integer> p = basicPublisher();
841 TestSubscriber s1 = new TestSubscriber();
842 s1.request = false;
843 TestSubscriber s2 = new TestSubscriber();
844 s2.request = false;
845 p.subscribe(s1);
846 p.subscribe(s2);
847 s2.awaitSubscribe();
848 s1.awaitSubscribe();
849 assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1);
850 assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2);
851 s1.sn.request(4);
852 assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3);
853 s2.sn.request(4);
854 p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null);
855 p.close();
856 s2.awaitComplete();
857 assertEquals(s2.nexts, 4);
858 s1.awaitComplete();
859 assertEquals(s2.nexts, 4);
860 }
861
862 /**
863 * Timed Offer reports drops if saturated
864 */
865 public void testDroppedTimedOffer() {
866 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
867 basicExecutor, 4);
868 TestSubscriber s1 = new TestSubscriber();
869 s1.request = false;
870 TestSubscriber s2 = new TestSubscriber();
871 s2.request = false;
872 p.subscribe(s1);
873 p.subscribe(s2);
874 s2.awaitSubscribe();
875 s1.awaitSubscribe();
876 for (int i = 1; i <= 4; ++i)
877 assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0);
878 p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null);
879 assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
880 s1.sn.request(64);
881 assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
882 s2.sn.request(64);
883 p.close();
884 s2.awaitComplete();
885 assertTrue(s2.nexts >= 2);
886 s1.awaitComplete();
887 assertTrue(s1.nexts >= 2);
888 }
889
890 /**
891 * Timed Offer invokes drop handler if saturated
892 */
893 public void testHandledDroppedTimedOffer() {
894 AtomicInteger calls = new AtomicInteger();
895 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
896 basicExecutor, 4);
897 TestSubscriber s1 = new TestSubscriber();
898 s1.request = false;
899 TestSubscriber s2 = new TestSubscriber();
900 s2.request = false;
901 p.subscribe(s1);
902 p.subscribe(s2);
903 s2.awaitSubscribe();
904 s1.awaitSubscribe();
905 for (int i = 1; i <= 4; ++i)
906 assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
907 p.offer(5, (s, x) -> noopHandle(calls));
908 assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
909 s1.sn.request(64);
910 assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
911 s2.sn.request(64);
912 p.close();
913 s2.awaitComplete();
914 s1.awaitComplete();
915 assertTrue(calls.get() >= 2);
916 }
917
918 /**
919 * Timed Offer succeeds if drop handler forces request
920 */
921 public void testRecoveredHandledDroppedTimedOffer() {
922 AtomicInteger calls = new AtomicInteger();
923 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
924 basicExecutor, 4);
925 TestSubscriber s1 = new TestSubscriber();
926 s1.request = false;
927 TestSubscriber s2 = new TestSubscriber();
928 s2.request = false;
929 p.subscribe(s1);
930 p.subscribe(s2);
931 s2.awaitSubscribe();
932 s1.awaitSubscribe();
933 int n = 0;
934 for (int i = 1; i <= 8; ++i) {
935 int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s));
936 n = n + 2 + (d < 0 ? d : 0);
937 }
938 p.close();
939 s2.awaitComplete();
940 s1.awaitComplete();
941 assertEquals(s1.nexts + s2.nexts, n);
942 assertTrue(calls.get() >= 2);
943 }
944
945
946 }