47 |
|
} |
48 |
|
|
49 |
|
static final Executor basicExecutor = |
50 |
< |
(ForkJoinPool.getCommonPoolParallelism() > 0) ? |
50 |
> |
(ForkJoinPool.getCommonPoolParallelism() > 1) ? |
51 |
|
ForkJoinPool.commonPool() : |
52 |
|
new ThreadPoolExecutor(1, 1, 60, SECONDS, |
53 |
|
new LinkedBlockingQueue<Runnable>(), |
285 |
|
assertEquals(0, s2.nexts); |
286 |
|
assertEquals(0, s2.errors); |
287 |
|
assertEquals(0, s2.completes); |
288 |
+ |
p.close(); |
289 |
|
} |
290 |
|
|
291 |
|
/** |
361 |
|
} |
362 |
|
|
363 |
|
/** |
364 |
< |
* subscribe(null) thows NPE |
364 |
> |
* subscribe(null) throws NPE |
365 |
|
*/ |
366 |
|
public void testSubscribe6() { |
367 |
|
SubmissionPublisher<Integer> p = basicPublisher(); |
456 |
|
} |
457 |
|
|
458 |
|
/** |
459 |
< |
* If a handler is supplied in conctructor, it is invoked when |
459 |
> |
* If a handler is supplied in constructor, it is invoked when |
460 |
|
* subscriber throws an exception in onNext |
461 |
|
*/ |
462 |
|
public void testThrowOnNextHandler() { |
807 |
|
*/ |
808 |
|
public void testEmptyTimedOffer() { |
809 |
|
SubmissionPublisher<Integer> p = basicPublisher(); |
810 |
+ |
long startTime = System.nanoTime(); |
811 |
|
assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null)); |
812 |
+ |
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); |
813 |
|
} |
814 |
|
|
815 |
|
/** |
817 |
|
*/ |
818 |
|
public void testNullTimedOffer() { |
819 |
|
SubmissionPublisher<Integer> p = basicPublisher(); |
820 |
+ |
long startTime = System.nanoTime(); |
821 |
|
try { |
822 |
< |
p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null); |
822 |
> |
p.offer(null, LONG_DELAY_MS, MILLISECONDS, null); |
823 |
|
shouldThrow(); |
824 |
|
} catch (NullPointerException success) {} |
825 |
|
try { |
826 |
< |
p.offer(1, SHORT_DELAY_MS, null, null); |
826 |
> |
p.offer(1, LONG_DELAY_MS, null, null); |
827 |
|
shouldThrow(); |
828 |
|
} catch (NullPointerException success) {} |
829 |
+ |
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); |
830 |
|
} |
831 |
|
|
832 |
|
/** |
842 |
|
p.subscribe(s2); |
843 |
|
s2.awaitSubscribe(); |
844 |
|
s1.awaitSubscribe(); |
845 |
< |
assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1); |
846 |
< |
assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2); |
845 |
> |
long startTime = System.nanoTime(); |
846 |
> |
assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1); |
847 |
> |
assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2); |
848 |
|
s1.sn.request(4); |
849 |
< |
assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3); |
849 |
> |
assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3); |
850 |
|
s2.sn.request(4); |
851 |
< |
p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null); |
851 |
> |
p.offer(4, LONG_DELAY_MS, MILLISECONDS, null); |
852 |
|
p.close(); |
853 |
|
s2.awaitComplete(); |
854 |
|
assertEquals(4, s2.nexts); |
855 |
|
s1.awaitComplete(); |
856 |
|
assertEquals(4, s2.nexts); |
857 |
+ |
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); |
858 |
|
} |
859 |
|
|
860 |
|
/** |
871 |
|
p.subscribe(s2); |
872 |
|
s2.awaitSubscribe(); |
873 |
|
s1.awaitSubscribe(); |
874 |
+ |
long delay = timeoutMillis(); |
875 |
|
for (int i = 1; i <= 4; ++i) |
876 |
< |
assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0); |
877 |
< |
p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null); |
878 |
< |
assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0); |
876 |
> |
assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0); |
877 |
> |
long startTime = System.nanoTime(); |
878 |
> |
assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0); |
879 |
|
s1.sn.request(64); |
880 |
< |
assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0); |
880 |
> |
assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0); |
881 |
> |
// 2 * delay should elapse but check only 1 * delay to allow timer slop |
882 |
> |
assertTrue(millisElapsedSince(startTime) >= delay); |
883 |
|
s2.sn.request(64); |
884 |
|
p.close(); |
885 |
|
s2.awaitComplete(); |
903 |
|
p.subscribe(s2); |
904 |
|
s2.awaitSubscribe(); |
905 |
|
s1.awaitSubscribe(); |
906 |
+ |
long delay = timeoutMillis(); |
907 |
|
for (int i = 1; i <= 4; ++i) |
908 |
< |
assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); |
909 |
< |
p.offer(5, (s, x) -> noopHandle(calls)); |
910 |
< |
assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); |
908 |
> |
assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); |
909 |
> |
long startTime = System.nanoTime(); |
910 |
> |
assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); |
911 |
|
s1.sn.request(64); |
912 |
< |
assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); |
912 |
> |
assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); |
913 |
> |
assertTrue(millisElapsedSince(startTime) >= delay); |
914 |
|
s2.sn.request(64); |
915 |
|
p.close(); |
916 |
|
s2.awaitComplete(); |
934 |
|
s2.awaitSubscribe(); |
935 |
|
s1.awaitSubscribe(); |
936 |
|
int n = 0; |
937 |
< |
for (int i = 1; i <= 8; ++i) { |
938 |
< |
int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s)); |
937 |
> |
long delay = timeoutMillis(); |
938 |
> |
long startTime = System.nanoTime(); |
939 |
> |
for (int i = 1; i <= 6; ++i) { |
940 |
> |
int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s)); |
941 |
|
n = n + 2 + (d < 0 ? d : 0); |
942 |
|
} |
943 |
+ |
assertTrue(millisElapsedSince(startTime) >= delay); |
944 |
|
p.close(); |
945 |
|
s2.awaitComplete(); |
946 |
|
s1.awaitComplete(); |