ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.26
Committed: Sun Jan 7 22:59:18 2018 UTC (6 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.25: +1 -1 lines
Log Message:
use <>

File Contents

# Content
1 /*
2 * Written by Doug Lea and Martin Buchholz with assistance from
3 * members of JCP JSR-166 Expert Group and released to the public
4 * domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.CountDownLatch;
10 import java.util.concurrent.Executor;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Flow;
13 import java.util.concurrent.ForkJoinPool;
14 import java.util.concurrent.SubmissionPublisher;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import junit.framework.Test;
17 import junit.framework.TestSuite;
18
19 import static java.util.concurrent.Flow.Subscriber;
20 import static java.util.concurrent.Flow.Subscription;
21 import static java.util.concurrent.TimeUnit.MILLISECONDS;
22
23 public class SubmissionPublisherTest extends JSR166TestCase {
24
25 public static void main(String[] args) {
26 main(suite(), args);
27 }
28 public static Test suite() {
29 return new TestSuite(SubmissionPublisherTest.class);
30 }
31
32 final Executor basicExecutor = basicPublisher().getExecutor();
33
34 static SubmissionPublisher<Integer> basicPublisher() {
35 return new SubmissionPublisher<Integer>();
36 }
37
38 static class SPException extends RuntimeException {}
39
40 class TestSubscriber implements Subscriber<Integer> {
41 volatile Subscription sn;
42 int last; // Requires that onNexts are in numeric order
43 volatile int nexts;
44 volatile int errors;
45 volatile int completes;
46 volatile boolean throwOnCall = false;
47 volatile boolean request = true;
48 volatile Throwable lastError;
49
50 public synchronized void onSubscribe(Subscription s) {
51 threadAssertTrue(sn == null);
52 sn = s;
53 notifyAll();
54 if (throwOnCall)
55 throw new SPException();
56 if (request)
57 sn.request(1L);
58 }
59 public synchronized void onNext(Integer t) {
60 ++nexts;
61 notifyAll();
62 int current = t.intValue();
63 threadAssertTrue(current >= last);
64 last = current;
65 if (request)
66 sn.request(1L);
67 if (throwOnCall)
68 throw new SPException();
69 }
70 public synchronized void onError(Throwable t) {
71 threadAssertTrue(completes == 0);
72 threadAssertTrue(errors == 0);
73 lastError = t;
74 ++errors;
75 notifyAll();
76 }
77 public synchronized void onComplete() {
78 threadAssertTrue(completes == 0);
79 ++completes;
80 notifyAll();
81 }
82
83 synchronized void awaitSubscribe() {
84 while (sn == null) {
85 try {
86 wait();
87 } catch (Exception ex) {
88 threadUnexpectedException(ex);
89 break;
90 }
91 }
92 }
93 synchronized void awaitNext(int n) {
94 while (nexts < n) {
95 try {
96 wait();
97 } catch (Exception ex) {
98 threadUnexpectedException(ex);
99 break;
100 }
101 }
102 }
103 synchronized void awaitComplete() {
104 while (completes == 0 && errors == 0) {
105 try {
106 wait();
107 } catch (Exception ex) {
108 threadUnexpectedException(ex);
109 break;
110 }
111 }
112 }
113 synchronized void awaitError() {
114 while (errors == 0) {
115 try {
116 wait();
117 } catch (Exception ex) {
118 threadUnexpectedException(ex);
119 break;
120 }
121 }
122 }
123
124 }
125
126 /**
127 * A new SubmissionPublisher has no subscribers, a non-null
128 * executor, a power-of-two capacity, is not closed, and reports
129 * zero demand and lag
130 */
131 void checkInitialState(SubmissionPublisher<?> p) {
132 assertFalse(p.hasSubscribers());
133 assertEquals(0, p.getNumberOfSubscribers());
134 assertTrue(p.getSubscribers().isEmpty());
135 assertFalse(p.isClosed());
136 assertNull(p.getClosedException());
137 int n = p.getMaxBufferCapacity();
138 assertTrue((n & (n - 1)) == 0); // power of two
139 assertNotNull(p.getExecutor());
140 assertEquals(0, p.estimateMinimumDemand());
141 assertEquals(0, p.estimateMaximumLag());
142 }
143
144 /**
145 * A default-constructed SubmissionPublisher has no subscribers,
146 * is not closed, has default buffer size, and uses the
147 * defaultExecutor
148 */
149 public void testConstructor1() {
150 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
151 checkInitialState(p);
152 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
153 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
154 if (ForkJoinPool.getCommonPoolParallelism() > 1)
155 assertSame(e, c);
156 else
157 assertNotSame(e, c);
158 }
159
160 /**
161 * A new SubmissionPublisher has no subscribers, is not closed,
162 * has the given buffer size, and uses the given executor
163 */
164 public void testConstructor2() {
165 Executor e = Executors.newFixedThreadPool(1);
166 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
167 checkInitialState(p);
168 assertSame(p.getExecutor(), e);
169 assertEquals(8, p.getMaxBufferCapacity());
170 }
171
172 /**
173 * A null Executor argument to SubmissionPublisher constructor
174 * throws NullPointerException
175 */
176 public void testConstructor3() {
177 try {
178 new SubmissionPublisher<Integer>(null, 8);
179 shouldThrow();
180 } catch (NullPointerException success) {}
181 }
182
183 /**
184 * A negative capacity argument to SubmissionPublisher constructor
185 * throws IllegalArgumentException
186 */
187 public void testConstructor4() {
188 Executor e = Executors.newFixedThreadPool(1);
189 try {
190 new SubmissionPublisher<Integer>(e, -1);
191 shouldThrow();
192 } catch (IllegalArgumentException success) {}
193 }
194
195 /**
196 * A closed publisher reports isClosed with no closedException and
197 * throws IllegalStateException upon attempted submission; a
198 * subsequent close or closeExceptionally has no additional
199 * effect.
200 */
201 public void testClose() {
202 SubmissionPublisher<Integer> p = basicPublisher();
203 checkInitialState(p);
204 p.close();
205 assertTrue(p.isClosed());
206 assertNull(p.getClosedException());
207 try {
208 p.submit(1);
209 shouldThrow();
210 } catch (IllegalStateException success) {}
211 Throwable ex = new SPException();
212 p.closeExceptionally(ex);
213 assertTrue(p.isClosed());
214 assertNull(p.getClosedException());
215 }
216
217 /**
218 * A publisher closedExceptionally reports isClosed with the
219 * closedException and throws IllegalStateException upon attempted
220 * submission; a subsequent close or closeExceptionally has no
221 * additional effect.
222 */
223 public void testCloseExceptionally() {
224 SubmissionPublisher<Integer> p = basicPublisher();
225 checkInitialState(p);
226 Throwable ex = new SPException();
227 p.closeExceptionally(ex);
228 assertTrue(p.isClosed());
229 assertSame(p.getClosedException(), ex);
230 try {
231 p.submit(1);
232 shouldThrow();
233 } catch (IllegalStateException success) {}
234 p.close();
235 assertTrue(p.isClosed());
236 assertSame(p.getClosedException(), ex);
237 }
238
239 /**
240 * Upon subscription, the subscriber's onSubscribe is called, no
241 * other Subscriber methods are invoked, the publisher
242 * hasSubscribers, isSubscribed is true, and existing
243 * subscriptions are unaffected.
244 */
245 public void testSubscribe1() {
246 TestSubscriber s = new TestSubscriber();
247 SubmissionPublisher<Integer> p = basicPublisher();
248 p.subscribe(s);
249 assertTrue(p.hasSubscribers());
250 assertEquals(1, p.getNumberOfSubscribers());
251 assertTrue(p.getSubscribers().contains(s));
252 assertTrue(p.isSubscribed(s));
253 s.awaitSubscribe();
254 assertNotNull(s.sn);
255 assertEquals(0, s.nexts);
256 assertEquals(0, s.errors);
257 assertEquals(0, s.completes);
258 TestSubscriber s2 = new TestSubscriber();
259 p.subscribe(s2);
260 assertTrue(p.hasSubscribers());
261 assertEquals(2, p.getNumberOfSubscribers());
262 assertTrue(p.getSubscribers().contains(s));
263 assertTrue(p.getSubscribers().contains(s2));
264 assertTrue(p.isSubscribed(s));
265 assertTrue(p.isSubscribed(s2));
266 s2.awaitSubscribe();
267 assertNotNull(s2.sn);
268 assertEquals(0, s2.nexts);
269 assertEquals(0, s2.errors);
270 assertEquals(0, s2.completes);
271 p.close();
272 }
273
274 /**
275 * If closed, upon subscription, the subscriber's onComplete
276 * method is invoked
277 */
278 public void testSubscribe2() {
279 TestSubscriber s = new TestSubscriber();
280 SubmissionPublisher<Integer> p = basicPublisher();
281 p.close();
282 p.subscribe(s);
283 s.awaitComplete();
284 assertEquals(0, s.nexts);
285 assertEquals(0, s.errors);
286 assertEquals(1, s.completes, 1);
287 }
288
289 /**
290 * If closedExceptionally, upon subscription, the subscriber's
291 * onError method is invoked
292 */
293 public void testSubscribe3() {
294 TestSubscriber s = new TestSubscriber();
295 SubmissionPublisher<Integer> p = basicPublisher();
296 Throwable ex = new SPException();
297 p.closeExceptionally(ex);
298 assertTrue(p.isClosed());
299 assertSame(p.getClosedException(), ex);
300 p.subscribe(s);
301 s.awaitError();
302 assertEquals(0, s.nexts);
303 assertEquals(1, s.errors);
304 }
305
306 /**
307 * Upon attempted resubscription, the subscriber's onError is
308 * called and the subscription is cancelled.
309 */
310 public void testSubscribe4() {
311 TestSubscriber s = new TestSubscriber();
312 SubmissionPublisher<Integer> p = basicPublisher();
313 p.subscribe(s);
314 assertTrue(p.hasSubscribers());
315 assertEquals(1, p.getNumberOfSubscribers());
316 assertTrue(p.getSubscribers().contains(s));
317 assertTrue(p.isSubscribed(s));
318 s.awaitSubscribe();
319 assertNotNull(s.sn);
320 assertEquals(0, s.nexts);
321 assertEquals(0, s.errors);
322 assertEquals(0, s.completes);
323 p.subscribe(s);
324 s.awaitError();
325 assertEquals(0, s.nexts);
326 assertEquals(1, s.errors);
327 assertFalse(p.isSubscribed(s));
328 }
329
330 /**
331 * An exception thrown in onSubscribe causes onError
332 */
333 public void testSubscribe5() {
334 TestSubscriber s = new TestSubscriber();
335 SubmissionPublisher<Integer> p = basicPublisher();
336 s.throwOnCall = true;
337 try {
338 p.subscribe(s);
339 } catch (Exception ok) {}
340 s.awaitError();
341 assertEquals(0, s.nexts);
342 assertEquals(1, s.errors);
343 assertEquals(0, s.completes);
344 }
345
346 /**
347 * subscribe(null) throws NPE
348 */
349 public void testSubscribe6() {
350 SubmissionPublisher<Integer> p = basicPublisher();
351 try {
352 p.subscribe(null);
353 shouldThrow();
354 } catch (NullPointerException success) {}
355 checkInitialState(p);
356 }
357
358 /**
359 * Closing a publisher causes onComplete to subscribers
360 */
361 public void testCloseCompletes() {
362 SubmissionPublisher<Integer> p = basicPublisher();
363 TestSubscriber s1 = new TestSubscriber();
364 TestSubscriber s2 = new TestSubscriber();
365 p.subscribe(s1);
366 p.subscribe(s2);
367 p.submit(1);
368 p.close();
369 assertTrue(p.isClosed());
370 assertNull(p.getClosedException());
371 s1.awaitComplete();
372 assertEquals(1, s1.nexts);
373 assertEquals(1, s1.completes);
374 s2.awaitComplete();
375 assertEquals(1, s2.nexts);
376 assertEquals(1, s2.completes);
377 }
378
379 /**
380 * Closing a publisher exceptionally causes onError to subscribers
381 * after they are subscribed
382 */
383 public void testCloseExceptionallyError() {
384 SubmissionPublisher<Integer> p = basicPublisher();
385 TestSubscriber s1 = new TestSubscriber();
386 TestSubscriber s2 = new TestSubscriber();
387 p.subscribe(s1);
388 p.subscribe(s2);
389 p.submit(1);
390 p.closeExceptionally(new SPException());
391 assertTrue(p.isClosed());
392 s1.awaitSubscribe();
393 s1.awaitError();
394 assertTrue(s1.nexts <= 1);
395 assertEquals(1, s1.errors);
396 s2.awaitSubscribe();
397 s2.awaitError();
398 assertTrue(s2.nexts <= 1);
399 assertEquals(1, s2.errors);
400 }
401
402 /**
403 * Cancelling a subscription eventually causes no more onNexts to be issued
404 */
405 public void testCancel() {
406 SubmissionPublisher<Integer> p =
407 new SubmissionPublisher<>(basicExecutor, 4); // must be < 20
408 TestSubscriber s1 = new TestSubscriber();
409 TestSubscriber s2 = new TestSubscriber();
410 p.subscribe(s1);
411 p.subscribe(s2);
412 s1.awaitSubscribe();
413 p.submit(1);
414 s1.sn.cancel();
415 for (int i = 2; i <= 20; ++i)
416 p.submit(i);
417 p.close();
418 s2.awaitComplete();
419 assertEquals(20, s2.nexts);
420 assertEquals(1, s2.completes);
421 assertTrue(s1.nexts < 20);
422 assertFalse(p.isSubscribed(s1));
423 }
424
425 /**
426 * Throwing an exception in onNext causes onError
427 */
428 public void testThrowOnNext() {
429 SubmissionPublisher<Integer> p = basicPublisher();
430 TestSubscriber s1 = new TestSubscriber();
431 TestSubscriber s2 = new TestSubscriber();
432 p.subscribe(s1);
433 p.subscribe(s2);
434 s1.awaitSubscribe();
435 p.submit(1);
436 s1.throwOnCall = true;
437 p.submit(2);
438 p.close();
439 s2.awaitComplete();
440 assertEquals(2, s2.nexts);
441 s1.awaitComplete();
442 assertEquals(1, s1.errors);
443 }
444
445 /**
446 * If a handler is supplied in constructor, it is invoked when
447 * subscriber throws an exception in onNext
448 */
449 public void testThrowOnNextHandler() {
450 AtomicInteger calls = new AtomicInteger();
451 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
452 basicExecutor, 8, (s, e) -> calls.getAndIncrement());
453 TestSubscriber s1 = new TestSubscriber();
454 TestSubscriber s2 = new TestSubscriber();
455 p.subscribe(s1);
456 p.subscribe(s2);
457 s1.awaitSubscribe();
458 p.submit(1);
459 s1.throwOnCall = true;
460 p.submit(2);
461 p.close();
462 s2.awaitComplete();
463 assertEquals(2, s2.nexts);
464 assertEquals(1, s2.completes);
465 s1.awaitError();
466 assertEquals(1, s1.errors);
467 assertEquals(1, calls.get());
468 }
469
470 /**
471 * onNext items are issued in the same order to each subscriber
472 */
473 public void testOrder() {
474 SubmissionPublisher<Integer> p = basicPublisher();
475 TestSubscriber s1 = new TestSubscriber();
476 TestSubscriber s2 = new TestSubscriber();
477 p.subscribe(s1);
478 p.subscribe(s2);
479 for (int i = 1; i <= 20; ++i)
480 p.submit(i);
481 p.close();
482 s2.awaitComplete();
483 s1.awaitComplete();
484 assertEquals(20, s2.nexts);
485 assertEquals(1, s2.completes);
486 assertEquals(20, s1.nexts);
487 assertEquals(1, s1.completes);
488 }
489
490 /**
491 * onNext is issued only if requested
492 */
493 public void testRequest1() {
494 SubmissionPublisher<Integer> p = basicPublisher();
495 TestSubscriber s1 = new TestSubscriber();
496 s1.request = false;
497 p.subscribe(s1);
498 s1.awaitSubscribe();
499 assertEquals(0, p.estimateMinimumDemand());
500 TestSubscriber s2 = new TestSubscriber();
501 p.subscribe(s2);
502 p.submit(1);
503 p.submit(2);
504 s2.awaitNext(1);
505 assertEquals(0, s1.nexts);
506 s1.sn.request(3);
507 p.submit(3);
508 p.close();
509 s2.awaitComplete();
510 assertEquals(3, s2.nexts);
511 assertEquals(1, s2.completes);
512 s1.awaitComplete();
513 assertTrue(s1.nexts > 0);
514 assertEquals(1, s1.completes);
515 }
516
517 /**
518 * onNext is not issued when requests become zero
519 */
520 public void testRequest2() {
521 SubmissionPublisher<Integer> p = basicPublisher();
522 TestSubscriber s1 = new TestSubscriber();
523 TestSubscriber s2 = new TestSubscriber();
524 p.subscribe(s1);
525 p.subscribe(s2);
526 s2.awaitSubscribe();
527 s1.awaitSubscribe();
528 s1.request = false;
529 p.submit(1);
530 p.submit(2);
531 p.close();
532 s2.awaitComplete();
533 assertEquals(2, s2.nexts);
534 assertEquals(1, s2.completes);
535 s1.awaitNext(1);
536 assertEquals(1, s1.nexts);
537 }
538
539 /**
540 * Non-positive request causes error
541 */
542 public void testRequest3() {
543 SubmissionPublisher<Integer> p = basicPublisher();
544 TestSubscriber s1 = new TestSubscriber();
545 TestSubscriber s2 = new TestSubscriber();
546 TestSubscriber s3 = new TestSubscriber();
547 p.subscribe(s1);
548 p.subscribe(s2);
549 p.subscribe(s3);
550 s3.awaitSubscribe();
551 s2.awaitSubscribe();
552 s1.awaitSubscribe();
553 s1.sn.request(-1L);
554 s3.sn.request(0L);
555 p.submit(1);
556 p.submit(2);
557 p.close();
558 s2.awaitComplete();
559 assertEquals(2, s2.nexts);
560 assertEquals(1, s2.completes);
561 s1.awaitError();
562 assertEquals(1, s1.errors);
563 assertTrue(s1.lastError instanceof IllegalArgumentException);
564 s3.awaitError();
565 assertEquals(1, s3.errors);
566 assertTrue(s3.lastError instanceof IllegalArgumentException);
567 }
568
569 /**
570 * estimateMinimumDemand reports 0 until request, nonzero after
571 * request
572 */
573 public void testEstimateMinimumDemand() {
574 TestSubscriber s = new TestSubscriber();
575 SubmissionPublisher<Integer> p = basicPublisher();
576 s.request = false;
577 p.subscribe(s);
578 s.awaitSubscribe();
579 assertEquals(0, p.estimateMinimumDemand());
580 s.sn.request(1);
581 assertEquals(1, p.estimateMinimumDemand());
582 }
583
584 /**
585 * submit to a publisher with no subscribers returns lag 0
586 */
587 public void testEmptySubmit() {
588 SubmissionPublisher<Integer> p = basicPublisher();
589 assertEquals(0, p.submit(1));
590 }
591
592 /**
593 * submit(null) throws NPE
594 */
595 public void testNullSubmit() {
596 SubmissionPublisher<Integer> p = basicPublisher();
597 try {
598 p.submit(null);
599 shouldThrow();
600 } catch (NullPointerException success) {}
601 }
602
603 /**
604 * submit returns number of lagged items, compatible with result
605 * of estimateMaximumLag.
606 */
607 public void testLaggedSubmit() {
608 SubmissionPublisher<Integer> p = basicPublisher();
609 TestSubscriber s1 = new TestSubscriber();
610 s1.request = false;
611 TestSubscriber s2 = new TestSubscriber();
612 s2.request = false;
613 p.subscribe(s1);
614 p.subscribe(s2);
615 s2.awaitSubscribe();
616 s1.awaitSubscribe();
617 assertEquals(1, p.submit(1));
618 assertTrue(p.estimateMaximumLag() >= 1);
619 assertTrue(p.submit(2) >= 2);
620 assertTrue(p.estimateMaximumLag() >= 2);
621 s1.sn.request(4);
622 assertTrue(p.submit(3) >= 3);
623 assertTrue(p.estimateMaximumLag() >= 3);
624 s2.sn.request(4);
625 p.submit(4);
626 p.close();
627 s2.awaitComplete();
628 assertEquals(4, s2.nexts);
629 s1.awaitComplete();
630 assertEquals(4, s2.nexts);
631 }
632
633 /**
634 * submit eventually issues requested items when buffer capacity is 1
635 */
636 public void testCap1Submit() {
637 SubmissionPublisher<Integer> p
638 = new SubmissionPublisher<>(basicExecutor, 1);
639 TestSubscriber s1 = new TestSubscriber();
640 TestSubscriber s2 = new TestSubscriber();
641 p.subscribe(s1);
642 p.subscribe(s2);
643 for (int i = 1; i <= 20; ++i) {
644 assertTrue(p.submit(i) >= 0);
645 }
646 p.close();
647 s2.awaitComplete();
648 s1.awaitComplete();
649 assertEquals(20, s2.nexts);
650 assertEquals(1, s2.completes);
651 assertEquals(20, s1.nexts);
652 assertEquals(1, s1.completes);
653 }
654
655 static boolean noopHandle(AtomicInteger count) {
656 count.getAndIncrement();
657 return false;
658 }
659
660 static boolean reqHandle(AtomicInteger count, Subscriber s) {
661 count.getAndIncrement();
662 ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
663 return true;
664 }
665
666 /**
667 * offer to a publisher with no subscribers returns lag 0
668 */
669 public void testEmptyOffer() {
670 SubmissionPublisher<Integer> p = basicPublisher();
671 assertEquals(0, p.offer(1, null));
672 }
673
674 /**
675 * offer(null) throws NPE
676 */
677 public void testNullOffer() {
678 SubmissionPublisher<Integer> p = basicPublisher();
679 try {
680 p.offer(null, null);
681 shouldThrow();
682 } catch (NullPointerException success) {}
683 }
684
685 /**
686 * offer returns number of lagged items if not saturated
687 */
688 public void testLaggedOffer() {
689 SubmissionPublisher<Integer> p = basicPublisher();
690 TestSubscriber s1 = new TestSubscriber();
691 s1.request = false;
692 TestSubscriber s2 = new TestSubscriber();
693 s2.request = false;
694 p.subscribe(s1);
695 p.subscribe(s2);
696 s2.awaitSubscribe();
697 s1.awaitSubscribe();
698 assertTrue(p.offer(1, null) >= 1);
699 assertTrue(p.offer(2, null) >= 2);
700 s1.sn.request(4);
701 assertTrue(p.offer(3, null) >= 3);
702 s2.sn.request(4);
703 p.offer(4, null);
704 p.close();
705 s2.awaitComplete();
706 assertEquals(4, s2.nexts);
707 s1.awaitComplete();
708 assertEquals(4, s2.nexts);
709 }
710
711 /**
712 * offer reports drops if saturated
713 */
714 public void testDroppedOffer() {
715 SubmissionPublisher<Integer> p
716 = new SubmissionPublisher<>(basicExecutor, 4);
717 TestSubscriber s1 = new TestSubscriber();
718 s1.request = false;
719 TestSubscriber s2 = new TestSubscriber();
720 s2.request = false;
721 p.subscribe(s1);
722 p.subscribe(s2);
723 s2.awaitSubscribe();
724 s1.awaitSubscribe();
725 for (int i = 1; i <= 4; ++i)
726 assertTrue(p.offer(i, null) >= 0);
727 p.offer(5, null);
728 assertTrue(p.offer(6, null) < 0);
729 s1.sn.request(64);
730 assertTrue(p.offer(7, null) < 0);
731 s2.sn.request(64);
732 p.close();
733 s2.awaitComplete();
734 assertTrue(s2.nexts >= 4);
735 s1.awaitComplete();
736 assertTrue(s1.nexts >= 4);
737 }
738
739 /**
740 * offer invokes drop handler if saturated
741 */
742 public void testHandledDroppedOffer() {
743 AtomicInteger calls = new AtomicInteger();
744 SubmissionPublisher<Integer> p
745 = new SubmissionPublisher<>(basicExecutor, 4);
746 TestSubscriber s1 = new TestSubscriber();
747 s1.request = false;
748 TestSubscriber s2 = new TestSubscriber();
749 s2.request = false;
750 p.subscribe(s1);
751 p.subscribe(s2);
752 s2.awaitSubscribe();
753 s1.awaitSubscribe();
754 for (int i = 1; i <= 4; ++i)
755 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
756 p.offer(4, (s, x) -> noopHandle(calls));
757 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
758 s1.sn.request(64);
759 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
760 s2.sn.request(64);
761 p.close();
762 s2.awaitComplete();
763 s1.awaitComplete();
764 assertTrue(calls.get() >= 4);
765 }
766
767 /**
768 * offer succeeds if drop handler forces request
769 */
770 public void testRecoveredHandledDroppedOffer() {
771 AtomicInteger calls = new AtomicInteger();
772 SubmissionPublisher<Integer> p
773 = new SubmissionPublisher<>(basicExecutor, 4);
774 TestSubscriber s1 = new TestSubscriber();
775 s1.request = false;
776 TestSubscriber s2 = new TestSubscriber();
777 s2.request = false;
778 p.subscribe(s1);
779 p.subscribe(s2);
780 s2.awaitSubscribe();
781 s1.awaitSubscribe();
782 int n = 0;
783 for (int i = 1; i <= 8; ++i) {
784 int d = p.offer(i, (s, x) -> reqHandle(calls, s));
785 n = n + 2 + (d < 0 ? d : 0);
786 }
787 p.close();
788 s2.awaitComplete();
789 s1.awaitComplete();
790 assertEquals(n, s1.nexts + s2.nexts);
791 assertTrue(calls.get() >= 2);
792 }
793
794 /**
795 * Timed offer to a publisher with no subscribers returns lag 0
796 */
797 public void testEmptyTimedOffer() {
798 SubmissionPublisher<Integer> p = basicPublisher();
799 long startTime = System.nanoTime();
800 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
801 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
802 }
803
804 /**
805 * Timed offer with null item or TimeUnit throws NPE
806 */
807 public void testNullTimedOffer() {
808 SubmissionPublisher<Integer> p = basicPublisher();
809 long startTime = System.nanoTime();
810 try {
811 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
812 shouldThrow();
813 } catch (NullPointerException success) {}
814 try {
815 p.offer(1, LONG_DELAY_MS, null, null);
816 shouldThrow();
817 } catch (NullPointerException success) {}
818 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
819 }
820
821 /**
822 * Timed offer returns number of lagged items if not saturated
823 */
824 public void testLaggedTimedOffer() {
825 SubmissionPublisher<Integer> p = basicPublisher();
826 TestSubscriber s1 = new TestSubscriber();
827 s1.request = false;
828 TestSubscriber s2 = new TestSubscriber();
829 s2.request = false;
830 p.subscribe(s1);
831 p.subscribe(s2);
832 s2.awaitSubscribe();
833 s1.awaitSubscribe();
834 long startTime = System.nanoTime();
835 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
836 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
837 s1.sn.request(4);
838 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
839 s2.sn.request(4);
840 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
841 p.close();
842 s2.awaitComplete();
843 assertEquals(4, s2.nexts);
844 s1.awaitComplete();
845 assertEquals(4, s2.nexts);
846 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
847 }
848
849 /**
850 * Timed offer reports drops if saturated
851 */
852 public void testDroppedTimedOffer() {
853 SubmissionPublisher<Integer> p
854 = new SubmissionPublisher<>(basicExecutor, 4);
855 TestSubscriber s1 = new TestSubscriber();
856 s1.request = false;
857 TestSubscriber s2 = new TestSubscriber();
858 s2.request = false;
859 p.subscribe(s1);
860 p.subscribe(s2);
861 s2.awaitSubscribe();
862 s1.awaitSubscribe();
863 long delay = timeoutMillis();
864 for (int i = 1; i <= 4; ++i)
865 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
866 long startTime = System.nanoTime();
867 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
868 s1.sn.request(64);
869 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
870 // 2 * delay should elapse but check only 1 * delay to allow timer slop
871 assertTrue(millisElapsedSince(startTime) >= delay);
872 s2.sn.request(64);
873 p.close();
874 s2.awaitComplete();
875 assertTrue(s2.nexts >= 2);
876 s1.awaitComplete();
877 assertTrue(s1.nexts >= 2);
878 }
879
880 /**
881 * Timed offer invokes drop handler if saturated
882 */
883 public void testHandledDroppedTimedOffer() {
884 AtomicInteger calls = new AtomicInteger();
885 SubmissionPublisher<Integer> p
886 = new SubmissionPublisher<>(basicExecutor, 4);
887 TestSubscriber s1 = new TestSubscriber();
888 s1.request = false;
889 TestSubscriber s2 = new TestSubscriber();
890 s2.request = false;
891 p.subscribe(s1);
892 p.subscribe(s2);
893 s2.awaitSubscribe();
894 s1.awaitSubscribe();
895 long delay = timeoutMillis();
896 for (int i = 1; i <= 4; ++i)
897 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
898 long startTime = System.nanoTime();
899 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
900 s1.sn.request(64);
901 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902 assertTrue(millisElapsedSince(startTime) >= delay);
903 s2.sn.request(64);
904 p.close();
905 s2.awaitComplete();
906 s1.awaitComplete();
907 assertTrue(calls.get() >= 2);
908 }
909
910 /**
911 * Timed offer succeeds if drop handler forces request
912 */
913 public void testRecoveredHandledDroppedTimedOffer() {
914 AtomicInteger calls = new AtomicInteger();
915 SubmissionPublisher<Integer> p
916 = new SubmissionPublisher<>(basicExecutor, 4);
917 TestSubscriber s1 = new TestSubscriber();
918 s1.request = false;
919 TestSubscriber s2 = new TestSubscriber();
920 s2.request = false;
921 p.subscribe(s1);
922 p.subscribe(s2);
923 s2.awaitSubscribe();
924 s1.awaitSubscribe();
925 int n = 0;
926 long delay = timeoutMillis();
927 long startTime = System.nanoTime();
928 for (int i = 1; i <= 6; ++i) {
929 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
930 n = n + 2 + (d < 0 ? d : 0);
931 }
932 assertTrue(millisElapsedSince(startTime) >= delay);
933 p.close();
934 s2.awaitComplete();
935 s1.awaitComplete();
936 assertEquals(n, s1.nexts + s2.nexts);
937 assertTrue(calls.get() >= 2);
938 }
939
940 /**
941 * consume returns a CompletableFuture that is done when
942 * publisher completes
943 */
944 public void testConsume() {
945 AtomicInteger sum = new AtomicInteger();
946 SubmissionPublisher<Integer> p = basicPublisher();
947 CompletableFuture<Void> f =
948 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
949 int n = 20;
950 for (int i = 1; i <= n; ++i)
951 p.submit(i);
952 p.close();
953 f.join();
954 assertEquals((n * (n + 1)) / 2, sum.get());
955 }
956
957 /**
958 * consume(null) throws NPE
959 */
960 public void testConsumeNPE() {
961 SubmissionPublisher<Integer> p = basicPublisher();
962 try {
963 CompletableFuture<Void> f = p.consume(null);
964 shouldThrow();
965 } catch (NullPointerException success) {}
966 }
967
968 /**
969 * consume eventually stops processing published items if cancelled
970 */
971 public void testCancelledConsume() {
972 AtomicInteger count = new AtomicInteger();
973 SubmissionPublisher<Integer> p = basicPublisher();
974 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
975 f.cancel(true);
976 int n = 1000000; // arbitrary limit
977 for (int i = 1; i <= n; ++i)
978 p.submit(i);
979 assertTrue(count.get() < n);
980 }
981
982 /**
983 * Tests scenario for
984 * JDK-8187947: A race condition in SubmissionPublisher
985 * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java
986 */
987 public void testMissedSignal_8187947() throws Exception {
988 if (!atLeastJava9()) return; // backport to jdk8 too hard
989 final int N = expensiveTests ? (1 << 20) : (1 << 10);
990 final CountDownLatch finished = new CountDownLatch(1);
991 final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
992 class Sub implements Subscriber<Boolean> {
993 int received;
994 public void onSubscribe(Subscription s) {
995 s.request(N);
996 }
997 public void onNext(Boolean item) {
998 if (++received == N)
999 finished.countDown();
1000 else
1001 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1002 }
1003 public void onError(Throwable t) { throw new AssertionError(t); }
1004 public void onComplete() {}
1005 }
1006 pub.subscribe(new Sub());
1007 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1008 await(finished);
1009 }
1010 }