ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.4
Committed: Mon Sep 7 20:33:41 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +2 -2 lines
Log Message:
testEmptyTimedOffer: actually use timed offer

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9 import static java.util.concurrent.TimeUnit.SECONDS;
10
11 import java.util.concurrent.Executor;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Flow;
14 import static java.util.concurrent.Flow.Publisher;
15 import static java.util.concurrent.Flow.Subscriber;
16 import static java.util.concurrent.Flow.Subscription;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ForkJoinPool;
19 import java.util.concurrent.SubmissionPublisher;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
26 import java.util.function.BiFunction;
27
28 import junit.framework.Test;
29 import junit.framework.TestSuite;
30
31 public class SubmissionPublisherTest extends JSR166TestCase {
32
33 public static void main(String[] args) {
34 main(suite(), args);
35 }
36 public static Test suite() {
37 return new TestSuite(SubmissionPublisherTest.class);
38 }
39
40 // Factory for single thread pool in case commonPool parallelism is zero
41 static final class DaemonThreadFactory implements ThreadFactory {
42 public Thread newThread(Runnable r) {
43 Thread t = new Thread(r);
44 t.setDaemon(true);
45 return t;
46 }
47 }
48
49 static final Executor basicExecutor =
50 (ForkJoinPool.getCommonPoolParallelism() > 0) ?
51 ForkJoinPool.commonPool() :
52 new ThreadPoolExecutor(1, 1, 60, SECONDS,
53 new LinkedBlockingQueue<Runnable>(),
54 new DaemonThreadFactory());
55
56 static SubmissionPublisher<Integer> basicPublisher() {
57 return new SubmissionPublisher<Integer>(basicExecutor,
58 Flow.defaultBufferSize());
59 }
60
61 static class SPException extends RuntimeException {}
62
63 class TestSubscriber implements Subscriber<Integer> {
64 volatile Subscription sn;
65 int last; // Requires that onNexts are in numeric order
66 volatile int nexts;
67 volatile int errors;
68 volatile int completes;
69 volatile boolean throwOnCall = false;
70 volatile boolean request = true;
71 volatile Throwable lastError;
72
73 public synchronized void onSubscribe(Subscription s) {
74 threadAssertTrue(sn == null);
75 sn = s;
76 notifyAll();
77 if (throwOnCall)
78 throw new SPException();
79 if (request)
80 sn.request(1L);
81 }
82 public synchronized void onNext(Integer t) {
83 ++nexts;
84 notifyAll();
85 int current = t.intValue();
86 threadAssertTrue(current >= last);
87 last = current;
88 if (request)
89 sn.request(1L);
90 if (throwOnCall)
91 throw new SPException();
92 }
93 public synchronized void onError(Throwable t) {
94 threadAssertTrue(completes == 0);
95 threadAssertTrue(errors == 0);
96 lastError = t;
97 ++errors;
98 notifyAll();
99 }
100 public synchronized void onComplete() {
101 threadAssertTrue(completes == 0);
102 ++completes;
103 notifyAll();
104 }
105
106 synchronized void awaitSubscribe() {
107 while (sn == null) {
108 try {
109 wait();
110 } catch (Exception ex) {
111 threadUnexpectedException(ex);
112 break;
113 }
114 }
115 }
116 synchronized void awaitNext(int n) {
117 while (nexts < n) {
118 try {
119 wait();
120 } catch (Exception ex) {
121 threadUnexpectedException(ex);
122 break;
123 }
124 }
125 }
126 synchronized void awaitComplete() {
127 while (completes == 0 && errors == 0) {
128 try {
129 wait();
130 } catch (Exception ex) {
131 threadUnexpectedException(ex);
132 break;
133 }
134 }
135 }
136 synchronized void awaitError() {
137 while (errors == 0) {
138 try {
139 wait();
140 } catch (Exception ex) {
141 threadUnexpectedException(ex);
142 break;
143 }
144 }
145 }
146
147 }
148
149 /**
150 * A new SubmissionPublisher has no subscribers, a non-null
151 * executor, a power-of-two capacity, is not closed, and reports
152 * zero demand and lag
153 */
154 void checkInitialState(SubmissionPublisher<?> p) {
155 assertFalse(p.hasSubscribers());
156 assertEquals(p.getNumberOfSubscribers(), 0);
157 assertTrue(p.getSubscribers().isEmpty());
158 assertFalse(p.isClosed());
159 assertNull(p.getClosedException());
160 int n = p.getMaxBufferCapacity();
161 assertTrue((n & (n - 1)) == 0); // power of two
162 assertNotNull(p.getExecutor());
163 assertEquals(p.estimateMinimumDemand(), 0);
164 assertEquals(p.estimateMaximumLag(), 0);
165 }
166
167 /**
168 * A default-constructed SubmissionPublisher has no subscribers,
169 * is not closed, has default buffer size, and uses the
170 * ForkJoinPool.commonPool executor
171 */
172 public void testConstructor1() {
173 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
174 checkInitialState(p);
175 assertSame(p.getExecutor(), ForkJoinPool.commonPool());
176 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
177 }
178
179 /**
180 * A new SubmissionPublisher has no subscribers, is not closed,
181 * has the given buffer size, and uses the given executor
182 */
183 public void testConstructor2() {
184 Executor e = Executors.newFixedThreadPool(1);
185 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
186 checkInitialState(p);
187 assertSame(p.getExecutor(), e);
188 assertEquals(p.getMaxBufferCapacity(), 8);
189 }
190
191 /**
192 * A null Executor argument to SubmissionPublisher constructor throws NPE
193 */
194 public void testConstructor3() {
195 try {
196 new SubmissionPublisher<Integer>(null, 8);
197 shouldThrow();
198 } catch (NullPointerException success) {}
199 }
200
201 /**
202 * A negative capacity argument to SubmissionPublisher constructor
203 * throws IAE
204 */
205 public void testConstructor4() {
206 Executor e = Executors.newFixedThreadPool(1);
207 try {
208 new SubmissionPublisher<Integer>(e, -1);
209 shouldThrow();
210 } catch (IllegalArgumentException success) {}
211 }
212
213 /**
214 * A closed publisher reports isClosed with no closedException and
215 * throws ISE upon attempted submission; a subsequent close or
216 * closeExceptionally has no additional effect.
217 */
218 public void testClose() {
219 SubmissionPublisher<Integer> p = basicPublisher();
220 checkInitialState(p);
221 p.close();
222 assertTrue(p.isClosed());
223 assertNull(p.getClosedException());
224 try {
225 p.submit(1);
226 shouldThrow();
227 } catch (IllegalStateException success) {}
228 Throwable ex = new SPException();
229 p.closeExceptionally(ex);
230 assertTrue(p.isClosed());
231 assertNull(p.getClosedException());
232 }
233
234 /**
235 * A publisher closedExceptionally reports isClosed with the
236 * closedException and throws ISE upon attempted submission; a
237 * subsequent close or closeExceptionally has no additional
238 * effect.
239 */
240 public void testCloseExceptionally() {
241 SubmissionPublisher<Integer> p = basicPublisher();
242 checkInitialState(p);
243 Throwable ex = new SPException();
244 p.closeExceptionally(ex);
245 assertTrue(p.isClosed());
246 assertSame(p.getClosedException(), ex);
247 try {
248 p.submit(1);
249 shouldThrow();
250 } catch (IllegalStateException success) {}
251 p.close();
252 assertTrue(p.isClosed());
253 assertSame(p.getClosedException(), ex);
254 }
255
256 /**
257 * Upon subscription, the subscriber's onSubscribe is called, no
258 * other Subscriber methods are invoked, the publisher
259 * hasSubscribers, isSubscribed is true, and existing
260 * subscriptions are unaffected.
261 */
262 public void testSubscribe1() {
263 TestSubscriber s = new TestSubscriber();
264 SubmissionPublisher<Integer> p = basicPublisher();
265 p.subscribe(s);
266 assertTrue(p.hasSubscribers());
267 assertEquals(p.getNumberOfSubscribers(), 1);
268 assertTrue(p.getSubscribers().contains(s));
269 assertTrue(p.isSubscribed(s));
270 s.awaitSubscribe();
271 assertNotNull(s.sn);
272 assertEquals(s.nexts, 0);
273 assertEquals(s.errors, 0);
274 assertEquals(s.completes, 0);
275 TestSubscriber s2 = new TestSubscriber();
276 p.subscribe(s2);
277 assertTrue(p.hasSubscribers());
278 assertEquals(p.getNumberOfSubscribers(), 2);
279 assertTrue(p.getSubscribers().contains(s));
280 assertTrue(p.getSubscribers().contains(s2));
281 assertTrue(p.isSubscribed(s));
282 assertTrue(p.isSubscribed(s2));
283 s2.awaitSubscribe();
284 assertNotNull(s2.sn);
285 assertEquals(s2.nexts, 0);
286 assertEquals(s2.errors, 0);
287 assertEquals(s2.completes, 0);
288 }
289
290 /**
291 * If closed, upon subscription, the subscriber's onComplete
292 * method is invoked
293 */
294 public void testSubscribe2() {
295 TestSubscriber s = new TestSubscriber();
296 SubmissionPublisher<Integer> p = basicPublisher();
297 p.close();
298 p.subscribe(s);
299 s.awaitComplete();
300 assertEquals(s.nexts, 0);
301 assertEquals(s.errors, 0);
302 assertEquals(s.completes, 1);
303 }
304
305 /**
306 * If closedExceptionally, upon subscription, the subscriber's
307 * onError method is invoked
308 */
309 public void testSubscribe3() {
310 TestSubscriber s = new TestSubscriber();
311 SubmissionPublisher<Integer> p = basicPublisher();
312 Throwable ex = new SPException();
313 p.closeExceptionally(ex);
314 assertTrue(p.isClosed());
315 assertSame(p.getClosedException(), ex);
316 p.subscribe(s);
317 s.awaitError();
318 assertEquals(s.nexts, 0);
319 assertEquals(s.errors, 1);
320 }
321
322 /**
323 * Upon attempted resubscription, the subscriber's onError is
324 * called and the subscription is cancelled.
325 */
326 public void testSubscribe4() {
327 TestSubscriber s = new TestSubscriber();
328 SubmissionPublisher<Integer> p = basicPublisher();
329 p.subscribe(s);
330 assertTrue(p.hasSubscribers());
331 assertEquals(p.getNumberOfSubscribers(), 1);
332 assertTrue(p.getSubscribers().contains(s));
333 assertTrue(p.isSubscribed(s));
334 s.awaitSubscribe();
335 assertNotNull(s.sn);
336 assertEquals(s.nexts, 0);
337 assertEquals(s.errors, 0);
338 assertEquals(s.completes, 0);
339 p.subscribe(s);
340 s.awaitError();
341 assertEquals(s.nexts, 0);
342 assertEquals(s.errors, 1);
343 assertFalse(p.isSubscribed(s));
344 }
345
346 /**
347 * An exception thrown in onSubscribe causes onError
348 */
349 public void testSubscribe5() {
350 TestSubscriber s = new TestSubscriber();
351 SubmissionPublisher<Integer> p = basicPublisher();
352 s.throwOnCall = true;
353 try {
354 p.subscribe(s);
355 } catch (Exception ok) {}
356 s.awaitError();
357 assertEquals(s.nexts, 0);
358 assertEquals(s.errors, 1);
359 assertEquals(s.completes, 0);
360 }
361
362 /**
363 * subscribe(null) thows NPE
364 */
365 public void testSubscribe6() {
366 SubmissionPublisher<Integer> p = basicPublisher();
367 try {
368 p.subscribe(null);
369 shouldThrow();
370 } catch (NullPointerException success) {}
371 checkInitialState(p);
372 }
373
374 /**
375 * Closing a publisher causes onComplete to subscribers
376 */
377 public void testCloseCompletes() {
378 SubmissionPublisher<Integer> p = basicPublisher();
379 TestSubscriber s1 = new TestSubscriber();
380 TestSubscriber s2 = new TestSubscriber();
381 p.subscribe(s1);
382 p.subscribe(s2);
383 p.submit(1);
384 p.close();
385 assertTrue(p.isClosed());
386 assertNull(p.getClosedException());
387 s1.awaitComplete();
388 assertEquals(s1.nexts, 1);
389 assertEquals(s1.completes, 1);
390 s2.awaitComplete();
391 assertEquals(s2.nexts, 1);
392 assertEquals(s2.completes, 1);
393 }
394
395 /**
396 * Closing a publisher exceptionally causes onError to subscribers
397 */
398 public void testCloseExceptionallyError() {
399 SubmissionPublisher<Integer> p = basicPublisher();
400 TestSubscriber s1 = new TestSubscriber();
401 TestSubscriber s2 = new TestSubscriber();
402 p.subscribe(s1);
403 p.subscribe(s2);
404 p.submit(1);
405 p.closeExceptionally(new SPException());
406 assertTrue(p.isClosed());
407 s1.awaitError();
408 assertTrue(s1.nexts <= 1);
409 assertEquals(s1.errors, 1);
410 s2.awaitError();
411 assertTrue(s2.nexts <= 1);
412 assertEquals(s2.errors, 1);
413 }
414
415 /**
416 * Cancelling a subscription eventually causes no more onNexts to be issued
417 */
418 public void testCancel() {
419 SubmissionPublisher<Integer> p = basicPublisher();
420 TestSubscriber s1 = new TestSubscriber();
421 TestSubscriber s2 = new TestSubscriber();
422 p.subscribe(s1);
423 p.subscribe(s2);
424 s1.awaitSubscribe();
425 p.submit(1);
426 s1.sn.cancel();
427 for (int i = 2; i <= 20; ++i)
428 p.submit(i);
429 p.close();
430 s2.awaitComplete();
431 assertEquals(s2.nexts, 20);
432 assertEquals(s2.completes, 1);
433 assertTrue(s1.nexts < 20);
434 assertFalse(p.isSubscribed(s1));
435 }
436
437 /**
438 * Throwing an exception in onNext causes onError
439 */
440 public void testThrowOnNext() {
441 SubmissionPublisher<Integer> p = basicPublisher();
442 TestSubscriber s1 = new TestSubscriber();
443 TestSubscriber s2 = new TestSubscriber();
444 p.subscribe(s1);
445 p.subscribe(s2);
446 s1.awaitSubscribe();
447 p.submit(1);
448 s1.throwOnCall = true;
449 p.submit(2);
450 p.close();
451 s2.awaitComplete();
452 assertEquals(s2.nexts, 2);
453 s1.awaitComplete();
454 assertEquals(s1.errors, 1);
455 }
456
457 /**
458 * If a handler is supplied in conctructor, it is invoked when
459 * subscriber throws an exception in onNext
460 */
461 public void testThrowOnNextHandler() {
462 AtomicInteger calls = new AtomicInteger();
463 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
464 (basicExecutor, 8,
465 (s, e) -> calls.getAndIncrement());
466 TestSubscriber s1 = new TestSubscriber();
467 TestSubscriber s2 = new TestSubscriber();
468 p.subscribe(s1);
469 p.subscribe(s2);
470 s1.awaitSubscribe();
471 p.submit(1);
472 s1.throwOnCall = true;
473 p.submit(2);
474 p.close();
475 s2.awaitComplete();
476 assertEquals(s2.nexts, 2);
477 assertEquals(s2.completes, 1);
478 s1.awaitError();
479 assertEquals(s1.errors, 1);
480 assertEquals(calls.get(), 1);
481 }
482
483 /**
484 * onNext items are issued in the same order to each subscriber
485 */
486 public void testOrder() {
487 SubmissionPublisher<Integer> p = basicPublisher();
488 TestSubscriber s1 = new TestSubscriber();
489 TestSubscriber s2 = new TestSubscriber();
490 p.subscribe(s1);
491 p.subscribe(s2);
492 for (int i = 1; i <= 20; ++i)
493 p.submit(i);
494 p.close();
495 s2.awaitComplete();
496 s1.awaitComplete();
497 assertEquals(s2.nexts, 20);
498 assertEquals(s2.completes, 1);
499 assertEquals(s1.nexts, 20);
500 assertEquals(s1.completes, 1);
501 }
502
503 /**
504 * onNext is issued only if requested
505 */
506 public void testRequest1() {
507 SubmissionPublisher<Integer> p = basicPublisher();
508 TestSubscriber s1 = new TestSubscriber();
509 s1.request = false;
510 p.subscribe(s1);
511 s1.awaitSubscribe();
512 assertTrue(p.estimateMinimumDemand() == 0);
513 TestSubscriber s2 = new TestSubscriber();
514 p.subscribe(s2);
515 p.submit(1);
516 p.submit(2);
517 s2.awaitNext(1);
518 assertEquals(s1.nexts, 0);
519 s1.sn.request(3);
520 p.submit(3);
521 p.close();
522 s2.awaitComplete();
523 assertEquals(s2.nexts, 3);
524 assertEquals(s2.completes, 1);
525 s1.awaitComplete();
526 assertTrue(s1.nexts > 0);
527 assertEquals(s1.completes, 1);
528 }
529
530 /**
531 * onNext is not issued when requests become zero
532 */
533 public void testRequest2() {
534 SubmissionPublisher<Integer> p = basicPublisher();
535 TestSubscriber s1 = new TestSubscriber();
536 TestSubscriber s2 = new TestSubscriber();
537 p.subscribe(s1);
538 p.subscribe(s2);
539 s2.awaitSubscribe();
540 s1.awaitSubscribe();
541 s1.request = false;
542 p.submit(1);
543 p.submit(2);
544 p.close();
545 s2.awaitComplete();
546 assertEquals(s2.nexts, 2);
547 assertEquals(s2.completes, 1);
548 s1.awaitNext(1);
549 assertEquals(s1.nexts, 1);
550 }
551
552 /**
553 * Negative request causes error
554 */
555 public void testRequest3() {
556 SubmissionPublisher<Integer> p = basicPublisher();
557 TestSubscriber s1 = new TestSubscriber();
558 TestSubscriber s2 = new TestSubscriber();
559 p.subscribe(s1);
560 p.subscribe(s2);
561 s2.awaitSubscribe();
562 s1.awaitSubscribe();
563 s1.sn.request(-1L);
564 p.submit(1);
565 p.submit(2);
566 p.close();
567 s2.awaitComplete();
568 assertEquals(s2.nexts, 2);
569 assertEquals(s2.completes, 1);
570 s1.awaitError();
571 assertEquals(s1.errors, 1);
572 assertTrue(s1.lastError instanceof IllegalArgumentException);
573 }
574
575 /**
576 * estimateMinimumDemand reports 0 until request, nonzero after
577 * request, and zero again after delivery
578 */
579 public void testEstimateMinimumDemand() {
580 TestSubscriber s = new TestSubscriber();
581 SubmissionPublisher<Integer> p = basicPublisher();
582 s.request = false;
583 p.subscribe(s);
584 s.awaitSubscribe();
585 assertEquals(p.estimateMinimumDemand(), 0);
586 s.sn.request(1);
587 assertEquals(p.estimateMinimumDemand(), 1);
588 p.submit(1);
589 s.awaitNext(1);
590 assertEquals(p.estimateMinimumDemand(), 0);
591 }
592
593 /**
594 * submit to a publisher with no subscribers returns lag 0
595 */
596 public void testEmptySubmit() {
597 SubmissionPublisher<Integer> p = basicPublisher();
598 assertEquals(p.submit(1), 0);
599 }
600
601 /**
602 * submit(null) throws NPE
603 */
604 public void testNullSubmit() {
605 SubmissionPublisher<Integer> p = basicPublisher();
606 try {
607 p.submit(null);
608 shouldThrow();
609 } catch (NullPointerException success) {}
610 }
611
612 /**
613 * submit returns number of lagged items, compatible with result
614 * of estimateMaximumLag.
615 */
616 public void testLaggedSubmit() {
617 SubmissionPublisher<Integer> p = basicPublisher();
618 TestSubscriber s1 = new TestSubscriber();
619 s1.request = false;
620 TestSubscriber s2 = new TestSubscriber();
621 s2.request = false;
622 p.subscribe(s1);
623 p.subscribe(s2);
624 s2.awaitSubscribe();
625 s1.awaitSubscribe();
626 assertEquals(p.submit(1), 1);
627 assertTrue(p.estimateMaximumLag() >= 1);
628 assertTrue(p.submit(2) >= 2);
629 assertTrue(p.estimateMaximumLag() >= 2);
630 s1.sn.request(4);
631 assertTrue(p.submit(3) >= 3);
632 assertTrue(p.estimateMaximumLag() >= 3);
633 s2.sn.request(4);
634 p.submit(4);
635 p.close();
636 s2.awaitComplete();
637 assertEquals(s2.nexts, 4);
638 s1.awaitComplete();
639 assertEquals(s2.nexts, 4);
640 }
641
642 /**
643 * submit eventually issues requested items when buffer capacity is 1
644 */
645 public void testCap1Submit() {
646 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
647 basicExecutor, 1);
648 TestSubscriber s1 = new TestSubscriber();
649 TestSubscriber s2 = new TestSubscriber();
650 p.subscribe(s1);
651 p.subscribe(s2);
652 for (int i = 1; i <= 20; ++i) {
653 assertTrue(p.estimateMinimumDemand() <= 1);
654 assertTrue(p.submit(i) >= 0);
655 }
656 p.close();
657 s2.awaitComplete();
658 s1.awaitComplete();
659 assertEquals(s2.nexts, 20);
660 assertEquals(s2.completes, 1);
661 assertEquals(s1.nexts, 20);
662 assertEquals(s1.completes, 1);
663 }
664
665 static boolean noopHandle(AtomicInteger count) {
666 count.getAndIncrement();
667 return false;
668 }
669
670 static boolean reqHandle(AtomicInteger count, Subscriber s) {
671 count.getAndIncrement();
672 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
673 return true;
674 }
675
676 /**
677 * offer to a publisher with no subscribers returns lag 0
678 */
679 public void testEmptyOffer() {
680 SubmissionPublisher<Integer> p = basicPublisher();
681 assertEquals(0, p.offer(1, null));
682 }
683
684 /**
685 * offer(null) throws NPE
686 */
687 public void testNullOffer() {
688 SubmissionPublisher<Integer> p = basicPublisher();
689 try {
690 p.offer(null, null);
691 shouldThrow();
692 } catch (NullPointerException success) {}
693 }
694
695 /**
696 * offer returns number of lagged items if not saturated
697 */
698 public void testLaggedOffer() {
699 SubmissionPublisher<Integer> p = basicPublisher();
700 TestSubscriber s1 = new TestSubscriber();
701 s1.request = false;
702 TestSubscriber s2 = new TestSubscriber();
703 s2.request = false;
704 p.subscribe(s1);
705 p.subscribe(s2);
706 s2.awaitSubscribe();
707 s1.awaitSubscribe();
708 assertTrue(p.offer(1, null) >= 1);
709 assertTrue(p.offer(2, null) >= 2);
710 s1.sn.request(4);
711 assertTrue(p.offer(3, null) >= 3);
712 s2.sn.request(4);
713 p.offer(4, null);
714 p.close();
715 s2.awaitComplete();
716 assertEquals(s2.nexts, 4);
717 s1.awaitComplete();
718 assertEquals(s2.nexts, 4);
719 }
720
721 /**
722 * offer reports drops if saturated
723 */
724 public void testDroppedOffer() {
725 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
726 basicExecutor, 4);
727 TestSubscriber s1 = new TestSubscriber();
728 s1.request = false;
729 TestSubscriber s2 = new TestSubscriber();
730 s2.request = false;
731 p.subscribe(s1);
732 p.subscribe(s2);
733 s2.awaitSubscribe();
734 s1.awaitSubscribe();
735 for (int i = 1; i <= 4; ++i)
736 assertTrue(p.offer(i, null) >= 0);
737 p.offer(5, null);
738 assertTrue(p.offer(6, null) < 0);
739 s1.sn.request(64);
740 assertTrue(p.offer(7, null) < 0);
741 s2.sn.request(64);
742 p.close();
743 s2.awaitComplete();
744 assertTrue(s2.nexts >= 4);
745 s1.awaitComplete();
746 assertTrue(s1.nexts >= 4);
747 }
748
749 /**
750 * offer invokes drop handler if saturated
751 */
752 public void testHandledDroppedOffer() {
753 AtomicInteger calls = new AtomicInteger();
754 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
755 basicExecutor, 4);
756 TestSubscriber s1 = new TestSubscriber();
757 s1.request = false;
758 TestSubscriber s2 = new TestSubscriber();
759 s2.request = false;
760 p.subscribe(s1);
761 p.subscribe(s2);
762 s2.awaitSubscribe();
763 s1.awaitSubscribe();
764 for (int i = 1; i <= 4; ++i)
765 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
766 p.offer(4, (s, x) -> noopHandle(calls));
767 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
768 s1.sn.request(64);
769 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
770 s2.sn.request(64);
771 p.close();
772 s2.awaitComplete();
773 s1.awaitComplete();
774 assertTrue(calls.get() >= 4);
775 }
776
777
778 /**
779 * offer succeeds if drop handler forces request
780 */
781 public void testRecoveredHandledDroppedOffer() {
782 AtomicInteger calls = new AtomicInteger();
783 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
784 basicExecutor, 4);
785 TestSubscriber s1 = new TestSubscriber();
786 s1.request = false;
787 TestSubscriber s2 = new TestSubscriber();
788 s2.request = false;
789 p.subscribe(s1);
790 p.subscribe(s2);
791 s2.awaitSubscribe();
792 s1.awaitSubscribe();
793 int n = 0;
794 for (int i = 1; i <= 8; ++i) {
795 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
796 n = n + 2 + (d < 0 ? d : 0);
797 }
798 p.close();
799 s2.awaitComplete();
800 s1.awaitComplete();
801 assertEquals(s1.nexts + s2.nexts, n);
802 assertTrue(calls.get() >= 2);
803 }
804
805
806 /**
807 * Timed offer to a publisher with no subscribers returns lag 0
808 */
809 public void testEmptyTimedOffer() {
810 SubmissionPublisher<Integer> p = basicPublisher();
811 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
812 }
813
814 /**
815 * Timed offer with null item or TimeUnit throws NPE
816 */
817 public void testNullTimedOffer() {
818 SubmissionPublisher<Integer> p = basicPublisher();
819 try {
820 p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null);
821 shouldThrow();
822 } catch (NullPointerException success) {}
823 try {
824 p.offer(1, SHORT_DELAY_MS, null, null);
825 shouldThrow();
826 } catch (NullPointerException success) {}
827 }
828
829 /**
830 * Timed offer returns number of lagged items if not saturated
831 */
832 public void testLaggedTimedOffer() {
833 SubmissionPublisher<Integer> p = basicPublisher();
834 TestSubscriber s1 = new TestSubscriber();
835 s1.request = false;
836 TestSubscriber s2 = new TestSubscriber();
837 s2.request = false;
838 p.subscribe(s1);
839 p.subscribe(s2);
840 s2.awaitSubscribe();
841 s1.awaitSubscribe();
842 assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1);
843 assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2);
844 s1.sn.request(4);
845 assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3);
846 s2.sn.request(4);
847 p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null);
848 p.close();
849 s2.awaitComplete();
850 assertEquals(s2.nexts, 4);
851 s1.awaitComplete();
852 assertEquals(s2.nexts, 4);
853 }
854
855 /**
856 * Timed offer reports drops if saturated
857 */
858 public void testDroppedTimedOffer() {
859 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
860 basicExecutor, 4);
861 TestSubscriber s1 = new TestSubscriber();
862 s1.request = false;
863 TestSubscriber s2 = new TestSubscriber();
864 s2.request = false;
865 p.subscribe(s1);
866 p.subscribe(s2);
867 s2.awaitSubscribe();
868 s1.awaitSubscribe();
869 for (int i = 1; i <= 4; ++i)
870 assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0);
871 p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null);
872 assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
873 s1.sn.request(64);
874 assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
875 s2.sn.request(64);
876 p.close();
877 s2.awaitComplete();
878 assertTrue(s2.nexts >= 2);
879 s1.awaitComplete();
880 assertTrue(s1.nexts >= 2);
881 }
882
883 /**
884 * Timed offer invokes drop handler if saturated
885 */
886 public void testHandledDroppedTimedOffer() {
887 AtomicInteger calls = new AtomicInteger();
888 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
889 basicExecutor, 4);
890 TestSubscriber s1 = new TestSubscriber();
891 s1.request = false;
892 TestSubscriber s2 = new TestSubscriber();
893 s2.request = false;
894 p.subscribe(s1);
895 p.subscribe(s2);
896 s2.awaitSubscribe();
897 s1.awaitSubscribe();
898 for (int i = 1; i <= 4; ++i)
899 assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
900 p.offer(5, (s, x) -> noopHandle(calls));
901 assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902 s1.sn.request(64);
903 assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
904 s2.sn.request(64);
905 p.close();
906 s2.awaitComplete();
907 s1.awaitComplete();
908 assertTrue(calls.get() >= 2);
909 }
910
911 /**
912 * Timed offer succeeds if drop handler forces request
913 */
914 public void testRecoveredHandledDroppedTimedOffer() {
915 AtomicInteger calls = new AtomicInteger();
916 SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
917 basicExecutor, 4);
918 TestSubscriber s1 = new TestSubscriber();
919 s1.request = false;
920 TestSubscriber s2 = new TestSubscriber();
921 s2.request = false;
922 p.subscribe(s1);
923 p.subscribe(s2);
924 s2.awaitSubscribe();
925 s1.awaitSubscribe();
926 int n = 0;
927 for (int i = 1; i <= 8; ++i) {
928 int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s));
929 n = n + 2 + (d < 0 ? d : 0);
930 }
931 p.close();
932 s2.awaitComplete();
933 s1.awaitComplete();
934 assertEquals(s1.nexts + s2.nexts, n);
935 assertTrue(calls.get() >= 2);
936 }
937
938
939 }