47 |
|
} |
48 |
|
|
49 |
|
static final Executor basicExecutor = |
50 |
< |
(ForkJoinPool.getCommonPoolParallelism() > 0) ? |
50 |
> |
(ForkJoinPool.getCommonPoolParallelism() > 1) ? |
51 |
|
ForkJoinPool.commonPool() : |
52 |
|
new ThreadPoolExecutor(1, 1, 60, SECONDS, |
53 |
|
new LinkedBlockingQueue<Runnable>(), |
285 |
|
assertEquals(0, s2.nexts); |
286 |
|
assertEquals(0, s2.errors); |
287 |
|
assertEquals(0, s2.completes); |
288 |
+ |
p.close(); |
289 |
|
} |
290 |
|
|
291 |
|
/** |
361 |
|
} |
362 |
|
|
363 |
|
/** |
364 |
< |
* subscribe(null) thows NPE |
364 |
> |
* subscribe(null) throws NPE |
365 |
|
*/ |
366 |
|
public void testSubscribe6() { |
367 |
|
SubmissionPublisher<Integer> p = basicPublisher(); |
456 |
|
} |
457 |
|
|
458 |
|
/** |
459 |
< |
* If a handler is supplied in conctructor, it is invoked when |
459 |
> |
* If a handler is supplied in constructor, it is invoked when |
460 |
|
* subscriber throws an exception in onNext |
461 |
|
*/ |
462 |
|
public void testThrowOnNextHandler() { |
871 |
|
p.subscribe(s2); |
872 |
|
s2.awaitSubscribe(); |
873 |
|
s1.awaitSubscribe(); |
874 |
+ |
long delay = timeoutMillis(); |
875 |
|
for (int i = 1; i <= 4; ++i) |
876 |
< |
assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0); |
877 |
< |
p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null); |
878 |
< |
assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0); |
876 |
> |
assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0); |
877 |
> |
long startTime = System.nanoTime(); |
878 |
> |
assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0); |
879 |
|
s1.sn.request(64); |
880 |
< |
assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0); |
880 |
> |
assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0); |
881 |
> |
// 2 * delay should elapse but check only 1 * delay to allow timer slop |
882 |
> |
assertTrue(millisElapsedSince(startTime) >= delay); |
883 |
|
s2.sn.request(64); |
884 |
|
p.close(); |
885 |
|
s2.awaitComplete(); |
903 |
|
p.subscribe(s2); |
904 |
|
s2.awaitSubscribe(); |
905 |
|
s1.awaitSubscribe(); |
906 |
+ |
long delay = timeoutMillis(); |
907 |
|
for (int i = 1; i <= 4; ++i) |
908 |
< |
assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); |
909 |
< |
p.offer(5, (s, x) -> noopHandle(calls)); |
910 |
< |
assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); |
908 |
> |
assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); |
909 |
> |
long startTime = System.nanoTime(); |
910 |
> |
assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); |
911 |
|
s1.sn.request(64); |
912 |
< |
assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); |
912 |
> |
assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); |
913 |
> |
assertTrue(millisElapsedSince(startTime) >= delay); |
914 |
|
s2.sn.request(64); |
915 |
|
p.close(); |
916 |
|
s2.awaitComplete(); |
934 |
|
s2.awaitSubscribe(); |
935 |
|
s1.awaitSubscribe(); |
936 |
|
int n = 0; |
937 |
< |
for (int i = 1; i <= 8; ++i) { |
938 |
< |
int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s)); |
937 |
> |
long delay = timeoutMillis(); |
938 |
> |
long startTime = System.nanoTime(); |
939 |
> |
for (int i = 1; i <= 6; ++i) { |
940 |
> |
int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s)); |
941 |
|
n = n + 2 + (d < 0 ? d : 0); |
942 |
|
} |
943 |
+ |
assertTrue(millisElapsedSince(startTime) >= delay); |
944 |
|
p.close(); |
945 |
|
s2.awaitComplete(); |
946 |
|
s1.awaitComplete(); |