ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.19
Committed: Thu Mar 9 00:11:16 2017 UTC (7 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.18: +8 -1 lines
Log Message:
Conform to reactive-streams.org -- reject request(0)

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8 dl 1.10 import java.util.concurrent.CompletableFuture;
9 dl 1.1 import java.util.concurrent.Executor;
10     import java.util.concurrent.Executors;
11     import java.util.concurrent.Flow;
12 dl 1.10 import java.util.concurrent.ForkJoinPool;
13 dl 1.1 import java.util.concurrent.SubmissionPublisher;
14     import java.util.concurrent.atomic.AtomicInteger;
15     import junit.framework.Test;
16     import junit.framework.TestSuite;
17    
18 dl 1.10 import static java.util.concurrent.Flow.Subscriber;
19     import static java.util.concurrent.Flow.Subscription;
20     import static java.util.concurrent.TimeUnit.MILLISECONDS;
21    
22 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
23    
24     public static void main(String[] args) {
25     main(suite(), args);
26     }
27     public static Test suite() {
28     return new TestSuite(SubmissionPublisherTest.class);
29     }
30    
31 dl 1.12 final Executor basicExecutor = basicPublisher().getExecutor();
32 jsr166 1.13
33 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
34 dl 1.12 return new SubmissionPublisher<Integer>();
35 dl 1.1 }
36 jsr166 1.2
37 dl 1.1 static class SPException extends RuntimeException {}
38    
39     class TestSubscriber implements Subscriber<Integer> {
40     volatile Subscription sn;
41     int last; // Requires that onNexts are in numeric order
42     volatile int nexts;
43     volatile int errors;
44     volatile int completes;
45     volatile boolean throwOnCall = false;
46     volatile boolean request = true;
47     volatile Throwable lastError;
48    
49     public synchronized void onSubscribe(Subscription s) {
50     threadAssertTrue(sn == null);
51     sn = s;
52     notifyAll();
53     if (throwOnCall)
54     throw new SPException();
55     if (request)
56     sn.request(1L);
57     }
58     public synchronized void onNext(Integer t) {
59     ++nexts;
60     notifyAll();
61     int current = t.intValue();
62     threadAssertTrue(current >= last);
63     last = current;
64     if (request)
65     sn.request(1L);
66     if (throwOnCall)
67     throw new SPException();
68     }
69     public synchronized void onError(Throwable t) {
70     threadAssertTrue(completes == 0);
71     threadAssertTrue(errors == 0);
72     lastError = t;
73     ++errors;
74     notifyAll();
75     }
76     public synchronized void onComplete() {
77     threadAssertTrue(completes == 0);
78     ++completes;
79     notifyAll();
80     }
81    
82     synchronized void awaitSubscribe() {
83     while (sn == null) {
84     try {
85     wait();
86     } catch (Exception ex) {
87     threadUnexpectedException(ex);
88     break;
89     }
90     }
91     }
92     synchronized void awaitNext(int n) {
93     while (nexts < n) {
94     try {
95     wait();
96     } catch (Exception ex) {
97     threadUnexpectedException(ex);
98     break;
99     }
100     }
101     }
102     synchronized void awaitComplete() {
103     while (completes == 0 && errors == 0) {
104     try {
105     wait();
106     } catch (Exception ex) {
107     threadUnexpectedException(ex);
108     break;
109     }
110     }
111     }
112     synchronized void awaitError() {
113     while (errors == 0) {
114     try {
115     wait();
116     } catch (Exception ex) {
117     threadUnexpectedException(ex);
118     break;
119     }
120     }
121     }
122    
123     }
124    
125     /**
126     * A new SubmissionPublisher has no subscribers, a non-null
127     * executor, a power-of-two capacity, is not closed, and reports
128     * zero demand and lag
129     */
130     void checkInitialState(SubmissionPublisher<?> p) {
131     assertFalse(p.hasSubscribers());
132 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
133 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
134     assertFalse(p.isClosed());
135     assertNull(p.getClosedException());
136     int n = p.getMaxBufferCapacity();
137     assertTrue((n & (n - 1)) == 0); // power of two
138     assertNotNull(p.getExecutor());
139 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
140     assertEquals(0, p.estimateMaximumLag());
141 dl 1.1 }
142    
143     /**
144     * A default-constructed SubmissionPublisher has no subscribers,
145     * is not closed, has default buffer size, and uses the
146 dl 1.12 * defaultExecutor
147 dl 1.1 */
148     public void testConstructor1() {
149 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
150 dl 1.1 checkInitialState(p);
151     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 dl 1.12 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153     if (ForkJoinPool.getCommonPoolParallelism() > 1)
154     assertSame(e, c);
155     else
156     assertNotSame(e, c);
157 dl 1.1 }
158    
159     /**
160     * A new SubmissionPublisher has no subscribers, is not closed,
161     * has the given buffer size, and uses the given executor
162     */
163     public void testConstructor2() {
164     Executor e = Executors.newFixedThreadPool(1);
165 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
166 dl 1.1 checkInitialState(p);
167     assertSame(p.getExecutor(), e);
168 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
169 dl 1.1 }
170    
171     /**
172     * A null Executor argument to SubmissionPublisher constructor throws NPE
173     */
174     public void testConstructor3() {
175     try {
176 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
177 dl 1.1 shouldThrow();
178 jsr166 1.2 } catch (NullPointerException success) {}
179 dl 1.1 }
180    
181     /**
182     * A negative capacity argument to SubmissionPublisher constructor
183     * throws IAE
184     */
185     public void testConstructor4() {
186     Executor e = Executors.newFixedThreadPool(1);
187     try {
188 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
189 dl 1.1 shouldThrow();
190 jsr166 1.2 } catch (IllegalArgumentException success) {}
191 dl 1.1 }
192    
193     /**
194     * A closed publisher reports isClosed with no closedException and
195     * throws ISE upon attempted submission; a subsequent close or
196     * closeExceptionally has no additional effect.
197     */
198     public void testClose() {
199     SubmissionPublisher<Integer> p = basicPublisher();
200     checkInitialState(p);
201     p.close();
202     assertTrue(p.isClosed());
203     assertNull(p.getClosedException());
204     try {
205     p.submit(1);
206     shouldThrow();
207 jsr166 1.2 } catch (IllegalStateException success) {}
208 dl 1.1 Throwable ex = new SPException();
209     p.closeExceptionally(ex);
210     assertTrue(p.isClosed());
211     assertNull(p.getClosedException());
212     }
213    
214     /**
215     * A publisher closedExceptionally reports isClosed with the
216     * closedException and throws ISE upon attempted submission; a
217     * subsequent close or closeExceptionally has no additional
218     * effect.
219     */
220     public void testCloseExceptionally() {
221     SubmissionPublisher<Integer> p = basicPublisher();
222     checkInitialState(p);
223     Throwable ex = new SPException();
224     p.closeExceptionally(ex);
225     assertTrue(p.isClosed());
226     assertSame(p.getClosedException(), ex);
227     try {
228     p.submit(1);
229     shouldThrow();
230 jsr166 1.2 } catch (IllegalStateException success) {}
231 dl 1.1 p.close();
232     assertTrue(p.isClosed());
233     assertSame(p.getClosedException(), ex);
234     }
235    
236     /**
237     * Upon subscription, the subscriber's onSubscribe is called, no
238     * other Subscriber methods are invoked, the publisher
239     * hasSubscribers, isSubscribed is true, and existing
240     * subscriptions are unaffected.
241     */
242     public void testSubscribe1() {
243     TestSubscriber s = new TestSubscriber();
244     SubmissionPublisher<Integer> p = basicPublisher();
245     p.subscribe(s);
246     assertTrue(p.hasSubscribers());
247 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
248 dl 1.1 assertTrue(p.getSubscribers().contains(s));
249     assertTrue(p.isSubscribed(s));
250     s.awaitSubscribe();
251     assertNotNull(s.sn);
252 jsr166 1.6 assertEquals(0, s.nexts);
253     assertEquals(0, s.errors);
254     assertEquals(0, s.completes);
255 dl 1.1 TestSubscriber s2 = new TestSubscriber();
256     p.subscribe(s2);
257     assertTrue(p.hasSubscribers());
258 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
259 dl 1.1 assertTrue(p.getSubscribers().contains(s));
260     assertTrue(p.getSubscribers().contains(s2));
261     assertTrue(p.isSubscribed(s));
262     assertTrue(p.isSubscribed(s2));
263     s2.awaitSubscribe();
264     assertNotNull(s2.sn);
265 jsr166 1.6 assertEquals(0, s2.nexts);
266     assertEquals(0, s2.errors);
267     assertEquals(0, s2.completes);
268 dl 1.9 p.close();
269 dl 1.1 }
270    
271     /**
272     * If closed, upon subscription, the subscriber's onComplete
273     * method is invoked
274     */
275     public void testSubscribe2() {
276     TestSubscriber s = new TestSubscriber();
277     SubmissionPublisher<Integer> p = basicPublisher();
278     p.close();
279     p.subscribe(s);
280     s.awaitComplete();
281 jsr166 1.6 assertEquals(0, s.nexts);
282     assertEquals(0, s.errors);
283     assertEquals(1, s.completes, 1);
284 dl 1.1 }
285    
286     /**
287     * If closedExceptionally, upon subscription, the subscriber's
288     * onError method is invoked
289     */
290     public void testSubscribe3() {
291     TestSubscriber s = new TestSubscriber();
292     SubmissionPublisher<Integer> p = basicPublisher();
293     Throwable ex = new SPException();
294     p.closeExceptionally(ex);
295     assertTrue(p.isClosed());
296     assertSame(p.getClosedException(), ex);
297     p.subscribe(s);
298     s.awaitError();
299 jsr166 1.6 assertEquals(0, s.nexts);
300     assertEquals(1, s.errors);
301 dl 1.1 }
302    
303     /**
304     * Upon attempted resubscription, the subscriber's onError is
305     * called and the subscription is cancelled.
306     */
307     public void testSubscribe4() {
308     TestSubscriber s = new TestSubscriber();
309     SubmissionPublisher<Integer> p = basicPublisher();
310     p.subscribe(s);
311     assertTrue(p.hasSubscribers());
312 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
313 dl 1.1 assertTrue(p.getSubscribers().contains(s));
314     assertTrue(p.isSubscribed(s));
315     s.awaitSubscribe();
316     assertNotNull(s.sn);
317 jsr166 1.6 assertEquals(0, s.nexts);
318     assertEquals(0, s.errors);
319     assertEquals(0, s.completes);
320 dl 1.1 p.subscribe(s);
321     s.awaitError();
322 jsr166 1.6 assertEquals(0, s.nexts);
323     assertEquals(1, s.errors);
324 dl 1.1 assertFalse(p.isSubscribed(s));
325     }
326    
327     /**
328     * An exception thrown in onSubscribe causes onError
329     */
330     public void testSubscribe5() {
331     TestSubscriber s = new TestSubscriber();
332     SubmissionPublisher<Integer> p = basicPublisher();
333     s.throwOnCall = true;
334     try {
335     p.subscribe(s);
336 jsr166 1.2 } catch (Exception ok) {}
337 dl 1.1 s.awaitError();
338 jsr166 1.6 assertEquals(0, s.nexts);
339     assertEquals(1, s.errors);
340     assertEquals(0, s.completes);
341 dl 1.1 }
342    
343     /**
344 jsr166 1.8 * subscribe(null) throws NPE
345 dl 1.1 */
346     public void testSubscribe6() {
347     SubmissionPublisher<Integer> p = basicPublisher();
348     try {
349     p.subscribe(null);
350 jsr166 1.2 shouldThrow();
351     } catch (NullPointerException success) {}
352 dl 1.1 checkInitialState(p);
353     }
354    
355     /**
356     * Closing a publisher causes onComplete to subscribers
357     */
358     public void testCloseCompletes() {
359     SubmissionPublisher<Integer> p = basicPublisher();
360     TestSubscriber s1 = new TestSubscriber();
361     TestSubscriber s2 = new TestSubscriber();
362     p.subscribe(s1);
363     p.subscribe(s2);
364     p.submit(1);
365     p.close();
366     assertTrue(p.isClosed());
367     assertNull(p.getClosedException());
368     s1.awaitComplete();
369 jsr166 1.6 assertEquals(1, s1.nexts);
370     assertEquals(1, s1.completes);
371 dl 1.1 s2.awaitComplete();
372 jsr166 1.6 assertEquals(1, s2.nexts);
373     assertEquals(1, s2.completes);
374 dl 1.1 }
375    
376     /**
377     * Closing a publisher exceptionally causes onError to subscribers
378 dl 1.16 * after they are subscribed
379 dl 1.1 */
380     public void testCloseExceptionallyError() {
381     SubmissionPublisher<Integer> p = basicPublisher();
382     TestSubscriber s1 = new TestSubscriber();
383     TestSubscriber s2 = new TestSubscriber();
384     p.subscribe(s1);
385     p.subscribe(s2);
386     p.submit(1);
387     p.closeExceptionally(new SPException());
388     assertTrue(p.isClosed());
389 dl 1.16 s1.awaitSubscribe();
390 dl 1.1 s1.awaitError();
391     assertTrue(s1.nexts <= 1);
392 jsr166 1.6 assertEquals(1, s1.errors);
393 dl 1.17 s2.awaitSubscribe();
394 dl 1.1 s2.awaitError();
395     assertTrue(s2.nexts <= 1);
396 jsr166 1.6 assertEquals(1, s2.errors);
397 dl 1.1 }
398    
399     /**
400     * Cancelling a subscription eventually causes no more onNexts to be issued
401     */
402     public void testCancel() {
403     SubmissionPublisher<Integer> p = basicPublisher();
404     TestSubscriber s1 = new TestSubscriber();
405     TestSubscriber s2 = new TestSubscriber();
406     p.subscribe(s1);
407     p.subscribe(s2);
408     s1.awaitSubscribe();
409     p.submit(1);
410     s1.sn.cancel();
411     for (int i = 2; i <= 20; ++i)
412     p.submit(i);
413     p.close();
414     s2.awaitComplete();
415 jsr166 1.6 assertEquals(20, s2.nexts);
416     assertEquals(1, s2.completes);
417 dl 1.1 assertTrue(s1.nexts < 20);
418     assertFalse(p.isSubscribed(s1));
419     }
420    
421     /**
422     * Throwing an exception in onNext causes onError
423     */
424     public void testThrowOnNext() {
425     SubmissionPublisher<Integer> p = basicPublisher();
426     TestSubscriber s1 = new TestSubscriber();
427     TestSubscriber s2 = new TestSubscriber();
428     p.subscribe(s1);
429     p.subscribe(s2);
430     s1.awaitSubscribe();
431     p.submit(1);
432     s1.throwOnCall = true;
433     p.submit(2);
434     p.close();
435     s2.awaitComplete();
436 jsr166 1.6 assertEquals(2, s2.nexts);
437 dl 1.1 s1.awaitComplete();
438 jsr166 1.6 assertEquals(1, s1.errors);
439 dl 1.1 }
440    
441     /**
442 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
443 dl 1.1 * subscriber throws an exception in onNext
444     */
445     public void testThrowOnNextHandler() {
446     AtomicInteger calls = new AtomicInteger();
447 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
448     basicExecutor, 8, (s, e) -> calls.getAndIncrement());
449 dl 1.1 TestSubscriber s1 = new TestSubscriber();
450     TestSubscriber s2 = new TestSubscriber();
451     p.subscribe(s1);
452     p.subscribe(s2);
453     s1.awaitSubscribe();
454     p.submit(1);
455     s1.throwOnCall = true;
456     p.submit(2);
457     p.close();
458     s2.awaitComplete();
459 jsr166 1.6 assertEquals(2, s2.nexts);
460     assertEquals(1, s2.completes);
461 dl 1.1 s1.awaitError();
462 jsr166 1.6 assertEquals(1, s1.errors);
463     assertEquals(1, calls.get());
464 dl 1.1 }
465    
466     /**
467     * onNext items are issued in the same order to each subscriber
468     */
469     public void testOrder() {
470     SubmissionPublisher<Integer> p = basicPublisher();
471     TestSubscriber s1 = new TestSubscriber();
472     TestSubscriber s2 = new TestSubscriber();
473     p.subscribe(s1);
474     p.subscribe(s2);
475     for (int i = 1; i <= 20; ++i)
476     p.submit(i);
477     p.close();
478     s2.awaitComplete();
479     s1.awaitComplete();
480 jsr166 1.6 assertEquals(20, s2.nexts);
481     assertEquals(1, s2.completes);
482     assertEquals(20, s1.nexts);
483     assertEquals(1, s1.completes);
484 dl 1.1 }
485    
486     /**
487     * onNext is issued only if requested
488     */
489     public void testRequest1() {
490     SubmissionPublisher<Integer> p = basicPublisher();
491     TestSubscriber s1 = new TestSubscriber();
492     s1.request = false;
493     p.subscribe(s1);
494     s1.awaitSubscribe();
495     assertTrue(p.estimateMinimumDemand() == 0);
496     TestSubscriber s2 = new TestSubscriber();
497     p.subscribe(s2);
498     p.submit(1);
499     p.submit(2);
500     s2.awaitNext(1);
501 jsr166 1.6 assertEquals(0, s1.nexts);
502 dl 1.1 s1.sn.request(3);
503     p.submit(3);
504     p.close();
505     s2.awaitComplete();
506 jsr166 1.6 assertEquals(3, s2.nexts);
507     assertEquals(1, s2.completes);
508 dl 1.1 s1.awaitComplete();
509     assertTrue(s1.nexts > 0);
510 jsr166 1.6 assertEquals(1, s1.completes);
511 dl 1.1 }
512    
513     /**
514     * onNext is not issued when requests become zero
515     */
516     public void testRequest2() {
517     SubmissionPublisher<Integer> p = basicPublisher();
518     TestSubscriber s1 = new TestSubscriber();
519     TestSubscriber s2 = new TestSubscriber();
520     p.subscribe(s1);
521     p.subscribe(s2);
522     s2.awaitSubscribe();
523     s1.awaitSubscribe();
524     s1.request = false;
525     p.submit(1);
526     p.submit(2);
527     p.close();
528     s2.awaitComplete();
529 jsr166 1.6 assertEquals(2, s2.nexts);
530     assertEquals(1, s2.completes);
531 dl 1.1 s1.awaitNext(1);
532 jsr166 1.6 assertEquals(1, s1.nexts);
533 dl 1.1 }
534    
535     /**
536 dl 1.19 * Non-positive request causes error
537 dl 1.1 */
538     public void testRequest3() {
539     SubmissionPublisher<Integer> p = basicPublisher();
540     TestSubscriber s1 = new TestSubscriber();
541     TestSubscriber s2 = new TestSubscriber();
542 dl 1.19 TestSubscriber s3 = new TestSubscriber();
543 dl 1.1 p.subscribe(s1);
544     p.subscribe(s2);
545 dl 1.19 p.subscribe(s3);
546     s3.awaitSubscribe();
547 dl 1.1 s2.awaitSubscribe();
548     s1.awaitSubscribe();
549     s1.sn.request(-1L);
550 dl 1.19 s3.sn.request(0L);
551 dl 1.1 p.submit(1);
552     p.submit(2);
553     p.close();
554     s2.awaitComplete();
555 jsr166 1.6 assertEquals(2, s2.nexts);
556     assertEquals(1, s2.completes);
557 dl 1.1 s1.awaitError();
558 jsr166 1.6 assertEquals(1, s1.errors);
559 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
560 dl 1.19 s3.awaitError();
561     assertEquals(1, s3.errors);
562     assertTrue(s3.lastError instanceof IllegalArgumentException);
563 dl 1.1 }
564    
565     /**
566     * estimateMinimumDemand reports 0 until request, nonzero after
567     * request, and zero again after delivery
568     */
569     public void testEstimateMinimumDemand() {
570     TestSubscriber s = new TestSubscriber();
571     SubmissionPublisher<Integer> p = basicPublisher();
572     s.request = false;
573     p.subscribe(s);
574     s.awaitSubscribe();
575 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
576 dl 1.1 s.sn.request(1);
577 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
578 dl 1.1 p.submit(1);
579     s.awaitNext(1);
580 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
581 dl 1.1 }
582    
583     /**
584 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
585 dl 1.1 */
586     public void testEmptySubmit() {
587     SubmissionPublisher<Integer> p = basicPublisher();
588 jsr166 1.6 assertEquals(0, p.submit(1));
589 dl 1.1 }
590    
591     /**
592 jsr166 1.3 * submit(null) throws NPE
593 dl 1.1 */
594     public void testNullSubmit() {
595     SubmissionPublisher<Integer> p = basicPublisher();
596     try {
597     p.submit(null);
598 jsr166 1.2 shouldThrow();
599     } catch (NullPointerException success) {}
600 dl 1.1 }
601    
602     /**
603 jsr166 1.3 * submit returns number of lagged items, compatible with result
604 dl 1.1 * of estimateMaximumLag.
605     */
606     public void testLaggedSubmit() {
607     SubmissionPublisher<Integer> p = basicPublisher();
608     TestSubscriber s1 = new TestSubscriber();
609     s1.request = false;
610     TestSubscriber s2 = new TestSubscriber();
611     s2.request = false;
612     p.subscribe(s1);
613     p.subscribe(s2);
614     s2.awaitSubscribe();
615     s1.awaitSubscribe();
616 jsr166 1.6 assertEquals(1, p.submit(1));
617 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
618     assertTrue(p.submit(2) >= 2);
619     assertTrue(p.estimateMaximumLag() >= 2);
620     s1.sn.request(4);
621     assertTrue(p.submit(3) >= 3);
622     assertTrue(p.estimateMaximumLag() >= 3);
623     s2.sn.request(4);
624     p.submit(4);
625     p.close();
626     s2.awaitComplete();
627 jsr166 1.6 assertEquals(4, s2.nexts);
628 dl 1.1 s1.awaitComplete();
629 jsr166 1.6 assertEquals(4, s2.nexts);
630 dl 1.1 }
631    
632     /**
633     * submit eventually issues requested items when buffer capacity is 1
634     */
635     public void testCap1Submit() {
636 jsr166 1.18 SubmissionPublisher<Integer> p
637     = new SubmissionPublisher<>(basicExecutor, 1);
638 dl 1.1 TestSubscriber s1 = new TestSubscriber();
639     TestSubscriber s2 = new TestSubscriber();
640     p.subscribe(s1);
641     p.subscribe(s2);
642     for (int i = 1; i <= 20; ++i) {
643     assertTrue(p.estimateMinimumDemand() <= 1);
644     assertTrue(p.submit(i) >= 0);
645     }
646     p.close();
647     s2.awaitComplete();
648     s1.awaitComplete();
649 jsr166 1.6 assertEquals(20, s2.nexts);
650     assertEquals(1, s2.completes);
651     assertEquals(20, s1.nexts);
652     assertEquals(1, s1.completes);
653 dl 1.1 }
654 jsr166 1.2
655 dl 1.1 static boolean noopHandle(AtomicInteger count) {
656     count.getAndIncrement();
657     return false;
658     }
659    
660     static boolean reqHandle(AtomicInteger count, Subscriber s) {
661     count.getAndIncrement();
662     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
663     return true;
664     }
665    
666     /**
667 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
668 dl 1.1 */
669     public void testEmptyOffer() {
670     SubmissionPublisher<Integer> p = basicPublisher();
671 jsr166 1.3 assertEquals(0, p.offer(1, null));
672 dl 1.1 }
673    
674     /**
675 jsr166 1.3 * offer(null) throws NPE
676 dl 1.1 */
677     public void testNullOffer() {
678     SubmissionPublisher<Integer> p = basicPublisher();
679     try {
680     p.offer(null, null);
681 jsr166 1.2 shouldThrow();
682     } catch (NullPointerException success) {}
683 dl 1.1 }
684    
685     /**
686 jsr166 1.3 * offer returns number of lagged items if not saturated
687 dl 1.1 */
688     public void testLaggedOffer() {
689     SubmissionPublisher<Integer> p = basicPublisher();
690     TestSubscriber s1 = new TestSubscriber();
691     s1.request = false;
692     TestSubscriber s2 = new TestSubscriber();
693     s2.request = false;
694     p.subscribe(s1);
695     p.subscribe(s2);
696     s2.awaitSubscribe();
697     s1.awaitSubscribe();
698     assertTrue(p.offer(1, null) >= 1);
699     assertTrue(p.offer(2, null) >= 2);
700     s1.sn.request(4);
701     assertTrue(p.offer(3, null) >= 3);
702     s2.sn.request(4);
703     p.offer(4, null);
704     p.close();
705     s2.awaitComplete();
706 jsr166 1.6 assertEquals(4, s2.nexts);
707 dl 1.1 s1.awaitComplete();
708 jsr166 1.6 assertEquals(4, s2.nexts);
709 dl 1.1 }
710    
711     /**
712 jsr166 1.3 * offer reports drops if saturated
713 dl 1.1 */
714     public void testDroppedOffer() {
715 jsr166 1.18 SubmissionPublisher<Integer> p
716     = new SubmissionPublisher<>(basicExecutor, 4);
717 dl 1.1 TestSubscriber s1 = new TestSubscriber();
718     s1.request = false;
719     TestSubscriber s2 = new TestSubscriber();
720     s2.request = false;
721     p.subscribe(s1);
722     p.subscribe(s2);
723     s2.awaitSubscribe();
724     s1.awaitSubscribe();
725     for (int i = 1; i <= 4; ++i)
726     assertTrue(p.offer(i, null) >= 0);
727     p.offer(5, null);
728     assertTrue(p.offer(6, null) < 0);
729     s1.sn.request(64);
730     assertTrue(p.offer(7, null) < 0);
731     s2.sn.request(64);
732     p.close();
733     s2.awaitComplete();
734     assertTrue(s2.nexts >= 4);
735     s1.awaitComplete();
736     assertTrue(s1.nexts >= 4);
737     }
738    
739     /**
740 jsr166 1.3 * offer invokes drop handler if saturated
741 dl 1.1 */
742     public void testHandledDroppedOffer() {
743     AtomicInteger calls = new AtomicInteger();
744 jsr166 1.18 SubmissionPublisher<Integer> p
745     = new SubmissionPublisher<>(basicExecutor, 4);
746 dl 1.1 TestSubscriber s1 = new TestSubscriber();
747     s1.request = false;
748     TestSubscriber s2 = new TestSubscriber();
749     s2.request = false;
750     p.subscribe(s1);
751     p.subscribe(s2);
752     s2.awaitSubscribe();
753     s1.awaitSubscribe();
754     for (int i = 1; i <= 4; ++i)
755     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
756     p.offer(4, (s, x) -> noopHandle(calls));
757     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
758     s1.sn.request(64);
759     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
760     s2.sn.request(64);
761     p.close();
762     s2.awaitComplete();
763     s1.awaitComplete();
764     assertTrue(calls.get() >= 4);
765     }
766    
767     /**
768 jsr166 1.3 * offer succeeds if drop handler forces request
769 dl 1.1 */
770     public void testRecoveredHandledDroppedOffer() {
771     AtomicInteger calls = new AtomicInteger();
772 jsr166 1.18 SubmissionPublisher<Integer> p
773     = new SubmissionPublisher<>(basicExecutor, 4);
774 dl 1.1 TestSubscriber s1 = new TestSubscriber();
775     s1.request = false;
776     TestSubscriber s2 = new TestSubscriber();
777     s2.request = false;
778     p.subscribe(s1);
779     p.subscribe(s2);
780     s2.awaitSubscribe();
781     s1.awaitSubscribe();
782     int n = 0;
783     for (int i = 1; i <= 8; ++i) {
784     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
785     n = n + 2 + (d < 0 ? d : 0);
786     }
787     p.close();
788     s2.awaitComplete();
789     s1.awaitComplete();
790 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
791 dl 1.1 assertTrue(calls.get() >= 2);
792     }
793    
794     /**
795 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
796 dl 1.1 */
797     public void testEmptyTimedOffer() {
798     SubmissionPublisher<Integer> p = basicPublisher();
799 jsr166 1.7 long startTime = System.nanoTime();
800 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
801 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
802 dl 1.1 }
803    
804     /**
805 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
806 dl 1.1 */
807     public void testNullTimedOffer() {
808     SubmissionPublisher<Integer> p = basicPublisher();
809 jsr166 1.7 long startTime = System.nanoTime();
810 dl 1.1 try {
811 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
812 jsr166 1.2 shouldThrow();
813     } catch (NullPointerException success) {}
814 dl 1.1 try {
815 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
816 jsr166 1.2 shouldThrow();
817     } catch (NullPointerException success) {}
818 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
819 dl 1.1 }
820    
821     /**
822 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
823 dl 1.1 */
824     public void testLaggedTimedOffer() {
825     SubmissionPublisher<Integer> p = basicPublisher();
826     TestSubscriber s1 = new TestSubscriber();
827     s1.request = false;
828     TestSubscriber s2 = new TestSubscriber();
829     s2.request = false;
830     p.subscribe(s1);
831     p.subscribe(s2);
832     s2.awaitSubscribe();
833     s1.awaitSubscribe();
834 jsr166 1.7 long startTime = System.nanoTime();
835     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
836     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
837 dl 1.1 s1.sn.request(4);
838 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
839 dl 1.1 s2.sn.request(4);
840 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
841 dl 1.1 p.close();
842     s2.awaitComplete();
843 jsr166 1.6 assertEquals(4, s2.nexts);
844 dl 1.1 s1.awaitComplete();
845 jsr166 1.6 assertEquals(4, s2.nexts);
846 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
847 dl 1.1 }
848    
849     /**
850 jsr166 1.3 * Timed offer reports drops if saturated
851 dl 1.1 */
852     public void testDroppedTimedOffer() {
853 jsr166 1.18 SubmissionPublisher<Integer> p
854     = new SubmissionPublisher<>(basicExecutor, 4);
855 dl 1.1 TestSubscriber s1 = new TestSubscriber();
856     s1.request = false;
857     TestSubscriber s2 = new TestSubscriber();
858     s2.request = false;
859     p.subscribe(s1);
860     p.subscribe(s2);
861     s2.awaitSubscribe();
862     s1.awaitSubscribe();
863 dl 1.9 long delay = timeoutMillis();
864 dl 1.1 for (int i = 1; i <= 4; ++i)
865 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
866     long startTime = System.nanoTime();
867     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
868 dl 1.1 s1.sn.request(64);
869 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
870     // 2 * delay should elapse but check only 1 * delay to allow timer slop
871     assertTrue(millisElapsedSince(startTime) >= delay);
872 dl 1.1 s2.sn.request(64);
873     p.close();
874     s2.awaitComplete();
875     assertTrue(s2.nexts >= 2);
876     s1.awaitComplete();
877     assertTrue(s1.nexts >= 2);
878     }
879    
880     /**
881 jsr166 1.3 * Timed offer invokes drop handler if saturated
882 dl 1.1 */
883     public void testHandledDroppedTimedOffer() {
884     AtomicInteger calls = new AtomicInteger();
885 jsr166 1.18 SubmissionPublisher<Integer> p
886     = new SubmissionPublisher<>(basicExecutor, 4);
887 dl 1.1 TestSubscriber s1 = new TestSubscriber();
888     s1.request = false;
889     TestSubscriber s2 = new TestSubscriber();
890     s2.request = false;
891     p.subscribe(s1);
892     p.subscribe(s2);
893     s2.awaitSubscribe();
894     s1.awaitSubscribe();
895 dl 1.9 long delay = timeoutMillis();
896 dl 1.1 for (int i = 1; i <= 4; ++i)
897 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
898     long startTime = System.nanoTime();
899     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
900 dl 1.1 s1.sn.request(64);
901 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902     assertTrue(millisElapsedSince(startTime) >= delay);
903 dl 1.1 s2.sn.request(64);
904     p.close();
905     s2.awaitComplete();
906     s1.awaitComplete();
907     assertTrue(calls.get() >= 2);
908     }
909    
910     /**
911 jsr166 1.3 * Timed offer succeeds if drop handler forces request
912 dl 1.1 */
913     public void testRecoveredHandledDroppedTimedOffer() {
914     AtomicInteger calls = new AtomicInteger();
915 jsr166 1.18 SubmissionPublisher<Integer> p
916     = new SubmissionPublisher<>(basicExecutor, 4);
917 dl 1.1 TestSubscriber s1 = new TestSubscriber();
918     s1.request = false;
919     TestSubscriber s2 = new TestSubscriber();
920     s2.request = false;
921     p.subscribe(s1);
922     p.subscribe(s2);
923     s2.awaitSubscribe();
924     s1.awaitSubscribe();
925     int n = 0;
926 dl 1.9 long delay = timeoutMillis();
927     long startTime = System.nanoTime();
928     for (int i = 1; i <= 6; ++i) {
929     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
930 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
931     }
932 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
933 dl 1.1 p.close();
934     s2.awaitComplete();
935     s1.awaitComplete();
936 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
937 dl 1.1 assertTrue(calls.get() >= 2);
938     }
939    
940 dl 1.10 /**
941     * consume returns a CompletableFuture that is done when
942     * publisher completes
943     */
944     public void testConsume() {
945     AtomicInteger sum = new AtomicInteger();
946     SubmissionPublisher<Integer> p = basicPublisher();
947 jsr166 1.11 CompletableFuture<Void> f =
948 jsr166 1.15 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
949 dl 1.10 int n = 20;
950     for (int i = 1; i <= n; ++i)
951     p.submit(i);
952     p.close();
953     f.join();
954     assertEquals((n * (n + 1)) / 2, sum.get());
955     }
956    
957     /**
958     * consume(null) throws NPE
959     */
960     public void testConsumeNPE() {
961     SubmissionPublisher<Integer> p = basicPublisher();
962     try {
963     CompletableFuture<Void> f = p.consume(null);
964     shouldThrow();
965 jsr166 1.11 } catch (NullPointerException success) {}
966 dl 1.10 }
967    
968     /**
969     * consume eventually stops processing published items if cancelled
970     */
971     public void testCancelledConsume() {
972     AtomicInteger count = new AtomicInteger();
973     SubmissionPublisher<Integer> p = basicPublisher();
974     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
975     f.cancel(true);
976     int n = 1000000; // arbitrary limit
977     for (int i = 1; i <= n; ++i)
978     p.submit(i);
979     assertTrue(count.get() < n);
980     }
981 jsr166 1.11
982 dl 1.1 }