ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.31
Committed: Tue Jan 26 13:33:06 2021 UTC (3 years, 3 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.30: +1 -1 lines
Log Message:
Replace Integer with Item class

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.CountDownLatch;
10 import java.util.concurrent.Executor;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Flow;
13 import java.util.concurrent.ForkJoinPool;
14 import java.util.concurrent.SubmissionPublisher;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import junit.framework.Test;
17 import junit.framework.TestSuite;
18
19 import static java.util.concurrent.Flow.Subscriber;
20 import static java.util.concurrent.Flow.Subscription;
21 import static java.util.concurrent.TimeUnit.MILLISECONDS;
22
23 public class SubmissionPublisherTest extends JSR166TestCase {
24
25 public static void main(String[] args) {
26 main(suite(), args);
27 }
28 public static Test suite() {
29 return new TestSuite(SubmissionPublisherTest.class);
30 }
31
32 final Executor basicExecutor = basicPublisher().getExecutor();
33
34 static SubmissionPublisher<Integer> basicPublisher() {
35 return new SubmissionPublisher<Integer>();
36 }
37
38 static class SPException extends RuntimeException {}
39
40 class TestSubscriber implements Subscriber<Integer> {
41 volatile Subscription sn;
42 int last; // Requires that onNexts are in numeric order
43 volatile int nexts;
44 volatile int errors;
45 volatile int completes;
46 volatile boolean throwOnCall = false;
47 volatile boolean request = true;
48 volatile Throwable lastError;
49
50 public synchronized void onSubscribe(Subscription s) {
51 threadAssertTrue(sn == null);
52 sn = s;
53 notifyAll();
54 if (throwOnCall)
55 throw new SPException();
56 if (request)
57 sn.request(1L);
58 }
59 public synchronized void onNext(Integer t) {
60 ++nexts;
61 notifyAll();
62 int current = t.intValue();
63 threadAssertTrue(current >= last);
64 last = current;
65 if (request)
66 sn.request(1L);
67 if (throwOnCall)
68 throw new SPException();
69 }
70 public synchronized void onError(Throwable t) {
71 threadAssertTrue(completes == 0);
72 threadAssertTrue(errors == 0);
73 lastError = t;
74 ++errors;
75 notifyAll();
76 }
77 public synchronized void onComplete() {
78 threadAssertTrue(completes == 0);
79 ++completes;
80 notifyAll();
81 }
82
83 synchronized void awaitSubscribe() {
84 while (sn == null) {
85 try {
86 wait();
87 } catch (Exception ex) {
88 threadUnexpectedException(ex);
89 break;
90 }
91 }
92 }
93 synchronized void awaitNext(int n) {
94 while (nexts < n) {
95 try {
96 wait();
97 } catch (Exception ex) {
98 threadUnexpectedException(ex);
99 break;
100 }
101 }
102 }
103 synchronized void awaitComplete() {
104 while (completes == 0 && errors == 0) {
105 try {
106 wait();
107 } catch (Exception ex) {
108 threadUnexpectedException(ex);
109 break;
110 }
111 }
112 }
113 synchronized void awaitError() {
114 while (errors == 0) {
115 try {
116 wait();
117 } catch (Exception ex) {
118 threadUnexpectedException(ex);
119 break;
120 }
121 }
122 }
123
124 }
125
126 /**
127 * A new SubmissionPublisher has no subscribers, a non-null
128 * executor, a power-of-two capacity, is not closed, and reports
129 * zero demand and lag
130 */
131 void checkInitialState(SubmissionPublisher<?> p) {
132 assertFalse(p.hasSubscribers());
133 assertEquals(0, p.getNumberOfSubscribers());
134 assertTrue(p.getSubscribers().isEmpty());
135 assertFalse(p.isClosed());
136 assertNull(p.getClosedException());
137 int n = p.getMaxBufferCapacity();
138 assertTrue((n & (n - 1)) == 0); // power of two
139 assertNotNull(p.getExecutor());
140 assertEquals(0, p.estimateMinimumDemand());
141 assertEquals(0, p.estimateMaximumLag());
142 }
143
144 /**
145 * A default-constructed SubmissionPublisher has no subscribers,
146 * is not closed, has default buffer size, and uses the
147 * defaultExecutor
148 */
149 public void testConstructor1() {
150 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
151 checkInitialState(p);
152 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
153 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
154 if (ForkJoinPool.getCommonPoolParallelism() > 1)
155 assertSame(e, c);
156 else
157 assertNotSame(e, c);
158 }
159
160 /**
161 * A new SubmissionPublisher has no subscribers, is not closed,
162 * has the given buffer size, and uses the given executor
163 */
164 public void testConstructor2() {
165 Executor e = Executors.newFixedThreadPool(1);
166 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
167 checkInitialState(p);
168 assertSame(p.getExecutor(), e);
169 assertEquals(8, p.getMaxBufferCapacity());
170 }
171
172 /**
173 * A null Executor argument to SubmissionPublisher constructor
174 * throws NullPointerException
175 */
176 public void testConstructor3() {
177 try {
178 new SubmissionPublisher<Integer>(null, 8);
179 shouldThrow();
180 } catch (NullPointerException success) {}
181 }
182
183 /**
184 * A negative capacity argument to SubmissionPublisher constructor
185 * throws IllegalArgumentException
186 */
187 public void testConstructor4() {
188 Executor e = Executors.newFixedThreadPool(1);
189 try {
190 new SubmissionPublisher<Integer>(e, -1);
191 shouldThrow();
192 } catch (IllegalArgumentException success) {}
193 }
194
195 /**
196 * A closed publisher reports isClosed with no closedException and
197 * throws IllegalStateException upon attempted submission; a
198 * subsequent close or closeExceptionally has no additional
199 * effect.
200 */
201 public void testClose() {
202 SubmissionPublisher<Integer> p = basicPublisher();
203 checkInitialState(p);
204 p.close();
205 assertTrue(p.isClosed());
206 assertNull(p.getClosedException());
207 try {
208 p.submit(1);
209 shouldThrow();
210 } catch (IllegalStateException success) {}
211 Throwable ex = new SPException();
212 p.closeExceptionally(ex);
213 assertTrue(p.isClosed());
214 assertNull(p.getClosedException());
215 }
216
217 /**
218 * A publisher closedExceptionally reports isClosed with the
219 * closedException and throws IllegalStateException upon attempted
220 * submission; a subsequent close or closeExceptionally has no
221 * additional effect.
222 */
223 public void testCloseExceptionally() {
224 SubmissionPublisher<Integer> p = basicPublisher();
225 checkInitialState(p);
226 Throwable ex = new SPException();
227 p.closeExceptionally(ex);
228 assertTrue(p.isClosed());
229 assertSame(p.getClosedException(), ex);
230 try {
231 p.submit(1);
232 shouldThrow();
233 } catch (IllegalStateException success) {}
234 p.close();
235 assertTrue(p.isClosed());
236 assertSame(p.getClosedException(), ex);
237 }
238
239 /**
240 * Upon subscription, the subscriber's onSubscribe is called, no
241 * other Subscriber methods are invoked, the publisher
242 * hasSubscribers, isSubscribed is true, and existing
243 * subscriptions are unaffected.
244 */
245 public void testSubscribe1() {
246 TestSubscriber s = new TestSubscriber();
247 SubmissionPublisher<Integer> p = basicPublisher();
248 p.subscribe(s);
249 assertTrue(p.hasSubscribers());
250 assertEquals(1, p.getNumberOfSubscribers());
251 assertTrue(p.getSubscribers().contains(s));
252 assertTrue(p.isSubscribed(s));
253 s.awaitSubscribe();
254 assertNotNull(s.sn);
255 assertEquals(0, s.nexts);
256 assertEquals(0, s.errors);
257 assertEquals(0, s.completes);
258 TestSubscriber s2 = new TestSubscriber();
259 p.subscribe(s2);
260 assertTrue(p.hasSubscribers());
261 assertEquals(2, p.getNumberOfSubscribers());
262 assertTrue(p.getSubscribers().contains(s));
263 assertTrue(p.getSubscribers().contains(s2));
264 assertTrue(p.isSubscribed(s));
265 assertTrue(p.isSubscribed(s2));
266 s2.awaitSubscribe();
267 assertNotNull(s2.sn);
268 assertEquals(0, s2.nexts);
269 assertEquals(0, s2.errors);
270 assertEquals(0, s2.completes);
271 p.close();
272 }
273
274 /**
275 * If closed, upon subscription, the subscriber's onComplete
276 * method is invoked
277 */
278 public void testSubscribe2() {
279 TestSubscriber s = new TestSubscriber();
280 SubmissionPublisher<Integer> p = basicPublisher();
281 p.close();
282 p.subscribe(s);
283 s.awaitComplete();
284 assertEquals(0, s.nexts);
285 assertEquals(0, s.errors);
286 assertEquals(1, s.completes, 1);
287 }
288
289 /**
290 * If closedExceptionally, upon subscription, the subscriber's
291 * onError method is invoked
292 */
293 public void testSubscribe3() {
294 TestSubscriber s = new TestSubscriber();
295 SubmissionPublisher<Integer> p = basicPublisher();
296 Throwable ex = new SPException();
297 p.closeExceptionally(ex);
298 assertTrue(p.isClosed());
299 assertSame(p.getClosedException(), ex);
300 p.subscribe(s);
301 s.awaitError();
302 assertEquals(0, s.nexts);
303 assertEquals(1, s.errors);
304 }
305
306 /**
307 * Upon attempted resubscription, the subscriber's onError is
308 * called and the subscription is cancelled.
309 */
310 public void testSubscribe4() {
311 TestSubscriber s = new TestSubscriber();
312 SubmissionPublisher<Integer> p = basicPublisher();
313 p.subscribe(s);
314 assertTrue(p.hasSubscribers());
315 assertEquals(1, p.getNumberOfSubscribers());
316 assertTrue(p.getSubscribers().contains(s));
317 assertTrue(p.isSubscribed(s));
318 s.awaitSubscribe();
319 assertNotNull(s.sn);
320 assertEquals(0, s.nexts);
321 assertEquals(0, s.errors);
322 assertEquals(0, s.completes);
323 p.subscribe(s);
324 s.awaitError();
325 assertEquals(0, s.nexts);
326 assertEquals(1, s.errors);
327 assertFalse(p.isSubscribed(s));
328 }
329
330 /**
331 * An exception thrown in onSubscribe causes onError
332 */
333 public void testSubscribe5() {
334 TestSubscriber s = new TestSubscriber();
335 SubmissionPublisher<Integer> p = basicPublisher();
336 s.throwOnCall = true;
337 p.subscribe(s);
338 s.awaitError();
339 assertEquals(0, s.nexts);
340 assertEquals(1, s.errors);
341 assertEquals(0, s.completes);
342 }
343
344 /**
345 * subscribe(null) throws NPE
346 */
347 public void testSubscribe6() {
348 SubmissionPublisher<Integer> p = basicPublisher();
349 try {
350 p.subscribe(null);
351 shouldThrow();
352 } catch (NullPointerException success) {}
353 checkInitialState(p);
354 }
355
356 /**
357 * Closing a publisher causes onComplete to subscribers
358 */
359 public void testCloseCompletes() {
360 SubmissionPublisher<Integer> p = basicPublisher();
361 TestSubscriber s1 = new TestSubscriber();
362 TestSubscriber s2 = new TestSubscriber();
363 p.subscribe(s1);
364 p.subscribe(s2);
365 p.submit(1);
366 p.close();
367 assertTrue(p.isClosed());
368 assertNull(p.getClosedException());
369 s1.awaitComplete();
370 assertEquals(1, s1.nexts);
371 assertEquals(1, s1.completes);
372 s2.awaitComplete();
373 assertEquals(1, s2.nexts);
374 assertEquals(1, s2.completes);
375 }
376
377 /**
378 * Closing a publisher exceptionally causes onError to subscribers
379 * after they are subscribed
380 */
381 public void testCloseExceptionallyError() {
382 SubmissionPublisher<Integer> p = basicPublisher();
383 TestSubscriber s1 = new TestSubscriber();
384 TestSubscriber s2 = new TestSubscriber();
385 p.subscribe(s1);
386 p.subscribe(s2);
387 p.submit(1);
388 p.closeExceptionally(new SPException());
389 assertTrue(p.isClosed());
390 s1.awaitSubscribe();
391 s1.awaitError();
392 assertTrue(s1.nexts <= 1);
393 assertEquals(1, s1.errors);
394 s2.awaitSubscribe();
395 s2.awaitError();
396 assertTrue(s2.nexts <= 1);
397 assertEquals(1, s2.errors);
398 }
399
400 /**
401 * Cancelling a subscription eventually causes no more onNexts to be issued
402 */
403 public void testCancel() {
404 SubmissionPublisher<Integer> p =
405 new SubmissionPublisher<>(basicExecutor, 4); // must be < 20
406 TestSubscriber s1 = new TestSubscriber();
407 TestSubscriber s2 = new TestSubscriber();
408 p.subscribe(s1);
409 p.subscribe(s2);
410 s1.awaitSubscribe();
411 p.submit(1);
412 s1.sn.cancel();
413 for (int i = 2; i <= 20; ++i)
414 p.submit(i);
415 p.close();
416 s2.awaitComplete();
417 assertEquals(20, s2.nexts);
418 assertEquals(1, s2.completes);
419 assertTrue(s1.nexts < 20);
420 assertFalse(p.isSubscribed(s1));
421 }
422
423 /**
424 * Throwing an exception in onNext causes onError
425 */
426 public void testThrowOnNext() {
427 SubmissionPublisher<Integer> p = basicPublisher();
428 TestSubscriber s1 = new TestSubscriber();
429 TestSubscriber s2 = new TestSubscriber();
430 p.subscribe(s1);
431 p.subscribe(s2);
432 s1.awaitSubscribe();
433 p.submit(1);
434 s1.throwOnCall = true;
435 p.submit(2);
436 p.close();
437 s2.awaitComplete();
438 assertEquals(2, s2.nexts);
439 s1.awaitComplete();
440 assertEquals(1, s1.errors);
441 }
442
443 /**
444 * If a handler is supplied in constructor, it is invoked when
445 * subscriber throws an exception in onNext
446 */
447 public void testThrowOnNextHandler() {
448 AtomicInteger calls = new AtomicInteger();
449 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
450 basicExecutor, 8, (s, e) -> calls.getAndIncrement());
451 TestSubscriber s1 = new TestSubscriber();
452 TestSubscriber s2 = new TestSubscriber();
453 p.subscribe(s1);
454 p.subscribe(s2);
455 s1.awaitSubscribe();
456 p.submit(1);
457 s1.throwOnCall = true;
458 p.submit(2);
459 p.close();
460 s2.awaitComplete();
461 assertEquals(2, s2.nexts);
462 assertEquals(1, s2.completes);
463 s1.awaitError();
464 assertEquals(1, s1.errors);
465 assertEquals(1, calls.get());
466 }
467
468 /**
469 * onNext items are issued in the same order to each subscriber
470 */
471 public void testOrder() {
472 SubmissionPublisher<Integer> p = basicPublisher();
473 TestSubscriber s1 = new TestSubscriber();
474 TestSubscriber s2 = new TestSubscriber();
475 p.subscribe(s1);
476 p.subscribe(s2);
477 for (int i = 1; i <= 20; ++i)
478 p.submit(i);
479 p.close();
480 s2.awaitComplete();
481 s1.awaitComplete();
482 assertEquals(20, s2.nexts);
483 assertEquals(1, s2.completes);
484 assertEquals(20, s1.nexts);
485 assertEquals(1, s1.completes);
486 }
487
488 /**
489 * onNext is issued only if requested
490 */
491 public void testRequest1() {
492 SubmissionPublisher<Integer> p = basicPublisher();
493 TestSubscriber s1 = new TestSubscriber();
494 s1.request = false;
495 p.subscribe(s1);
496 s1.awaitSubscribe();
497 assertEquals(0, p.estimateMinimumDemand());
498 TestSubscriber s2 = new TestSubscriber();
499 p.subscribe(s2);
500 p.submit(1);
501 p.submit(2);
502 s2.awaitNext(1);
503 assertEquals(0, s1.nexts);
504 s1.sn.request(3);
505 p.submit(3);
506 p.close();
507 s2.awaitComplete();
508 assertEquals(3, s2.nexts);
509 assertEquals(1, s2.completes);
510 s1.awaitComplete();
511 assertTrue(s1.nexts > 0);
512 assertEquals(1, s1.completes);
513 }
514
515 /**
516 * onNext is not issued when requests become zero
517 */
518 public void testRequest2() {
519 SubmissionPublisher<Integer> p = basicPublisher();
520 TestSubscriber s1 = new TestSubscriber();
521 TestSubscriber s2 = new TestSubscriber();
522 p.subscribe(s1);
523 p.subscribe(s2);
524 s2.awaitSubscribe();
525 s1.awaitSubscribe();
526 s1.request = false;
527 p.submit(1);
528 p.submit(2);
529 p.close();
530 s2.awaitComplete();
531 assertEquals(2, s2.nexts);
532 assertEquals(1, s2.completes);
533 s1.awaitNext(1);
534 assertEquals(1, s1.nexts);
535 }
536
537 /**
538 * Non-positive request causes error
539 */
540 public void testRequest3() {
541 SubmissionPublisher<Integer> p = basicPublisher();
542 TestSubscriber s1 = new TestSubscriber();
543 TestSubscriber s2 = new TestSubscriber();
544 TestSubscriber s3 = new TestSubscriber();
545 p.subscribe(s1);
546 p.subscribe(s2);
547 p.subscribe(s3);
548 s3.awaitSubscribe();
549 s2.awaitSubscribe();
550 s1.awaitSubscribe();
551 s1.sn.request(-1L);
552 s3.sn.request(0L);
553 p.submit(1);
554 p.submit(2);
555 p.close();
556 s2.awaitComplete();
557 assertEquals(2, s2.nexts);
558 assertEquals(1, s2.completes);
559 s1.awaitError();
560 assertEquals(1, s1.errors);
561 assertTrue(s1.lastError instanceof IllegalArgumentException);
562 s3.awaitError();
563 assertEquals(1, s3.errors);
564 assertTrue(s3.lastError instanceof IllegalArgumentException);
565 }
566
567 /**
568 * estimateMinimumDemand reports 0 until request, nonzero after
569 * request
570 */
571 public void testEstimateMinimumDemand() {
572 TestSubscriber s = new TestSubscriber();
573 SubmissionPublisher<Integer> p = basicPublisher();
574 s.request = false;
575 p.subscribe(s);
576 s.awaitSubscribe();
577 assertEquals(0, p.estimateMinimumDemand());
578 s.sn.request(1);
579 assertEquals(1, p.estimateMinimumDemand());
580 }
581
582 /**
583 * submit to a publisher with no subscribers returns lag 0
584 */
585 public void testEmptySubmit() {
586 SubmissionPublisher<Integer> p = basicPublisher();
587 assertEquals(0, p.submit(1));
588 }
589
590 /**
591 * submit(null) throws NPE
592 */
593 public void testNullSubmit() {
594 SubmissionPublisher<Integer> p = basicPublisher();
595 try {
596 p.submit(null);
597 shouldThrow();
598 } catch (NullPointerException success) {}
599 }
600
601 /**
602 * submit returns number of lagged items, compatible with result
603 * of estimateMaximumLag.
604 */
605 public void testLaggedSubmit() {
606 SubmissionPublisher<Integer> p = basicPublisher();
607 TestSubscriber s1 = new TestSubscriber();
608 s1.request = false;
609 TestSubscriber s2 = new TestSubscriber();
610 s2.request = false;
611 p.subscribe(s1);
612 p.subscribe(s2);
613 s2.awaitSubscribe();
614 s1.awaitSubscribe();
615 assertEquals(1, p.submit(1));
616 assertTrue(p.estimateMaximumLag() >= 1);
617 assertTrue(p.submit(2) >= 2);
618 assertTrue(p.estimateMaximumLag() >= 2);
619 s1.sn.request(4);
620 assertTrue(p.submit(3) >= 3);
621 assertTrue(p.estimateMaximumLag() >= 3);
622 s2.sn.request(4);
623 p.submit(4);
624 p.close();
625 s2.awaitComplete();
626 assertEquals(4, s2.nexts);
627 s1.awaitComplete();
628 assertEquals(4, s2.nexts);
629 }
630
631 /**
632 * submit eventually issues requested items when buffer capacity is 1
633 */
634 public void testCap1Submit() {
635 SubmissionPublisher<Integer> p
636 = new SubmissionPublisher<>(basicExecutor, 1);
637 TestSubscriber s1 = new TestSubscriber();
638 TestSubscriber s2 = new TestSubscriber();
639 p.subscribe(s1);
640 p.subscribe(s2);
641 for (int i = 1; i <= 20; ++i) {
642 assertTrue(p.submit(i) >= 0);
643 }
644 p.close();
645 s2.awaitComplete();
646 s1.awaitComplete();
647 assertEquals(20, s2.nexts);
648 assertEquals(1, s2.completes);
649 assertEquals(20, s1.nexts);
650 assertEquals(1, s1.completes);
651 }
652
653 static boolean noopHandle(AtomicInteger count) {
654 count.getAndIncrement();
655 return false;
656 }
657
658 static boolean reqHandle(AtomicInteger count, Subscriber<?> s) {
659 count.getAndIncrement();
660 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
661 return true;
662 }
663
664 /**
665 * offer to a publisher with no subscribers returns lag 0
666 */
667 public void testEmptyOffer() {
668 SubmissionPublisher<Integer> p = basicPublisher();
669 assertEquals(0, p.offer(1, null));
670 }
671
672 /**
673 * offer(null) throws NPE
674 */
675 public void testNullOffer() {
676 SubmissionPublisher<Integer> p = basicPublisher();
677 try {
678 p.offer(null, null);
679 shouldThrow();
680 } catch (NullPointerException success) {}
681 }
682
683 /**
684 * offer returns number of lagged items if not saturated
685 */
686 public void testLaggedOffer() {
687 SubmissionPublisher<Integer> p = basicPublisher();
688 TestSubscriber s1 = new TestSubscriber();
689 s1.request = false;
690 TestSubscriber s2 = new TestSubscriber();
691 s2.request = false;
692 p.subscribe(s1);
693 p.subscribe(s2);
694 s2.awaitSubscribe();
695 s1.awaitSubscribe();
696 assertTrue(p.offer(1, null) >= 1);
697 assertTrue(p.offer(2, null) >= 2);
698 s1.sn.request(4);
699 assertTrue(p.offer(3, null) >= 3);
700 s2.sn.request(4);
701 p.offer(4, null);
702 p.close();
703 s2.awaitComplete();
704 assertEquals(4, s2.nexts);
705 s1.awaitComplete();
706 assertEquals(4, s2.nexts);
707 }
708
709 /**
710 * offer reports drops if saturated
711 */
712 public void testDroppedOffer() {
713 SubmissionPublisher<Integer> p
714 = new SubmissionPublisher<>(basicExecutor, 4);
715 TestSubscriber s1 = new TestSubscriber();
716 s1.request = false;
717 TestSubscriber s2 = new TestSubscriber();
718 s2.request = false;
719 p.subscribe(s1);
720 p.subscribe(s2);
721 s2.awaitSubscribe();
722 s1.awaitSubscribe();
723 for (int i = 1; i <= 4; ++i)
724 assertTrue(p.offer(i, null) >= 0);
725 p.offer(5, null);
726 assertTrue(p.offer(6, null) < 0);
727 s1.sn.request(64);
728 assertTrue(p.offer(7, null) < 0);
729 s2.sn.request(64);
730 p.close();
731 s2.awaitComplete();
732 assertTrue(s2.nexts >= 4);
733 s1.awaitComplete();
734 assertTrue(s1.nexts >= 4);
735 }
736
737 /**
738 * offer invokes drop handler if saturated
739 */
740 public void testHandledDroppedOffer() {
741 AtomicInteger calls = new AtomicInteger();
742 SubmissionPublisher<Integer> p
743 = new SubmissionPublisher<>(basicExecutor, 4);
744 TestSubscriber s1 = new TestSubscriber();
745 s1.request = false;
746 TestSubscriber s2 = new TestSubscriber();
747 s2.request = false;
748 p.subscribe(s1);
749 p.subscribe(s2);
750 s2.awaitSubscribe();
751 s1.awaitSubscribe();
752 for (int i = 1; i <= 4; ++i)
753 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
754 p.offer(4, (s, x) -> noopHandle(calls));
755 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
756 s1.sn.request(64);
757 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
758 s2.sn.request(64);
759 p.close();
760 s2.awaitComplete();
761 s1.awaitComplete();
762 assertTrue(calls.get() >= 4);
763 }
764
765 /**
766 * offer succeeds if drop handler forces request
767 */
768 public void testRecoveredHandledDroppedOffer() {
769 AtomicInteger calls = new AtomicInteger();
770 SubmissionPublisher<Integer> p
771 = new SubmissionPublisher<>(basicExecutor, 4);
772 TestSubscriber s1 = new TestSubscriber();
773 s1.request = false;
774 TestSubscriber s2 = new TestSubscriber();
775 s2.request = false;
776 p.subscribe(s1);
777 p.subscribe(s2);
778 s2.awaitSubscribe();
779 s1.awaitSubscribe();
780 int n = 0;
781 for (int i = 1; i <= 8; ++i) {
782 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
783 n = n + 2 + (d < 0 ? d : 0);
784 }
785 p.close();
786 s2.awaitComplete();
787 s1.awaitComplete();
788 assertEquals(n, s1.nexts + s2.nexts);
789 assertTrue(calls.get() >= 2);
790 }
791
792 /**
793 * Timed offer to a publisher with no subscribers returns lag 0
794 */
795 public void testEmptyTimedOffer() {
796 SubmissionPublisher<Integer> p = basicPublisher();
797 long startTime = System.nanoTime();
798 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
799 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
800 }
801
802 /**
803 * Timed offer with null item or TimeUnit throws NPE
804 */
805 public void testNullTimedOffer() {
806 SubmissionPublisher<Integer> p = basicPublisher();
807 long startTime = System.nanoTime();
808 try {
809 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
810 shouldThrow();
811 } catch (NullPointerException success) {}
812 try {
813 p.offer(1, LONG_DELAY_MS, null, null);
814 shouldThrow();
815 } catch (NullPointerException success) {}
816 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
817 }
818
819 /**
820 * Timed offer returns number of lagged items if not saturated
821 */
822 public void testLaggedTimedOffer() {
823 SubmissionPublisher<Integer> p = basicPublisher();
824 TestSubscriber s1 = new TestSubscriber();
825 s1.request = false;
826 TestSubscriber s2 = new TestSubscriber();
827 s2.request = false;
828 p.subscribe(s1);
829 p.subscribe(s2);
830 s2.awaitSubscribe();
831 s1.awaitSubscribe();
832 long startTime = System.nanoTime();
833 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
834 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
835 s1.sn.request(4);
836 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
837 s2.sn.request(4);
838 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
839 p.close();
840 s2.awaitComplete();
841 assertEquals(4, s2.nexts);
842 s1.awaitComplete();
843 assertEquals(4, s2.nexts);
844 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
845 }
846
847 /**
848 * Timed offer reports drops if saturated
849 */
850 public void testDroppedTimedOffer() {
851 SubmissionPublisher<Integer> p
852 = new SubmissionPublisher<>(basicExecutor, 4);
853 TestSubscriber s1 = new TestSubscriber();
854 s1.request = false;
855 TestSubscriber s2 = new TestSubscriber();
856 s2.request = false;
857 p.subscribe(s1);
858 p.subscribe(s2);
859 s2.awaitSubscribe();
860 s1.awaitSubscribe();
861 long delay = timeoutMillis();
862 for (int i = 1; i <= 4; ++i)
863 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
864 long startTime = System.nanoTime();
865 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
866 s1.sn.request(64);
867 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
868 // 2 * delay should elapse but check only 1 * delay to allow timer slop
869 assertTrue(millisElapsedSince(startTime) >= delay);
870 s2.sn.request(64);
871 p.close();
872 s2.awaitComplete();
873 assertTrue(s2.nexts >= 2);
874 s1.awaitComplete();
875 assertTrue(s1.nexts >= 2);
876 }
877
878 /**
879 * Timed offer invokes drop handler if saturated
880 */
881 public void testHandledDroppedTimedOffer() {
882 AtomicInteger calls = new AtomicInteger();
883 SubmissionPublisher<Integer> p
884 = new SubmissionPublisher<>(basicExecutor, 4);
885 TestSubscriber s1 = new TestSubscriber();
886 s1.request = false;
887 TestSubscriber s2 = new TestSubscriber();
888 s2.request = false;
889 p.subscribe(s1);
890 p.subscribe(s2);
891 s2.awaitSubscribe();
892 s1.awaitSubscribe();
893 long delay = timeoutMillis();
894 for (int i = 1; i <= 4; ++i)
895 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
896 long startTime = System.nanoTime();
897 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
898 s1.sn.request(64);
899 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
900 assertTrue(millisElapsedSince(startTime) >= delay);
901 s2.sn.request(64);
902 p.close();
903 s2.awaitComplete();
904 s1.awaitComplete();
905 assertTrue(calls.get() >= 2);
906 }
907
908 /**
909 * Timed offer succeeds if drop handler forces request
910 */
911 public void testRecoveredHandledDroppedTimedOffer() {
912 AtomicInteger calls = new AtomicInteger();
913 SubmissionPublisher<Integer> p
914 = new SubmissionPublisher<>(basicExecutor, 4);
915 TestSubscriber s1 = new TestSubscriber();
916 s1.request = false;
917 TestSubscriber s2 = new TestSubscriber();
918 s2.request = false;
919 p.subscribe(s1);
920 p.subscribe(s2);
921 s2.awaitSubscribe();
922 s1.awaitSubscribe();
923 int n = 0;
924 long delay = timeoutMillis();
925 long startTime = System.nanoTime();
926 for (int i = 1; i <= 6; ++i) {
927 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
928 n = n + 2 + (d < 0 ? d : 0);
929 }
930 assertTrue(millisElapsedSince(startTime) >= delay);
931 p.close();
932 s2.awaitComplete();
933 s1.awaitComplete();
934 assertEquals(n, s1.nexts + s2.nexts);
935 assertTrue(calls.get() >= 2);
936 }
937
938 /**
939 * consume returns a CompletableFuture that is done when
940 * publisher completes
941 */
942 public void testConsume() {
943 AtomicInteger sum = new AtomicInteger();
944 SubmissionPublisher<Integer> p = basicPublisher();
945 CompletableFuture<Void> f =
946 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
947 int n = 20;
948 for (int i = 1; i <= n; ++i)
949 p.submit(i);
950 p.close();
951 f.join();
952 assertEquals((n * (n + 1)) / 2, sum.get());
953 }
954
955 /**
956 * consume(null) throws NPE
957 */
958 public void testConsumeNPE() {
959 SubmissionPublisher<Integer> p = basicPublisher();
960 try {
961 CompletableFuture<Void> unused = p.consume(null);
962 shouldThrow();
963 } catch (NullPointerException success) {}
964 }
965
966 /**
967 * consume eventually stops processing published items if cancelled
968 */
969 public void testCancelledConsume() {
970 AtomicInteger count = new AtomicInteger();
971 SubmissionPublisher<Integer> p = basicPublisher();
972 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
973 f.cancel(true);
974 int n = 1000000; // arbitrary limit
975 for (int i = 1; i <= n; ++i)
976 p.submit(i);
977 assertTrue(count.get() < n);
978 }
979
980 /**
981 * Tests scenario for
982 * JDK-8187947: A race condition in SubmissionPublisher
983 * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java
984 */
985 public void testMissedSignal_8187947() throws Exception {
986 if (!atLeastJava9()) return; // backport to jdk8 too hard
987 final int N =
988 ((ForkJoinPool.getCommonPoolParallelism() < 2) // JDK-8212899
989 ? (1 << 5)
990 : (1 << 10))
991 * (expensiveTests ? (1 << 10) : 1);
992 final CountDownLatch finished = new CountDownLatch(1);
993 final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
994 class Sub implements Subscriber<Boolean> {
995 int received;
996 public void onSubscribe(Subscription s) {
997 s.request(N);
998 }
999 public void onNext(Boolean item) {
1000 if (++received == N)
1001 finished.countDown();
1002 else
1003 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1004 }
1005 public void onError(Throwable t) { throw new AssertionError(t); }
1006 public void onComplete() {}
1007 }
1008 pub.subscribe(new Sub());
1009 checkTimedGet(
1010 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)),
1011 null);
1012 await(finished);
1013 }
1014 }