ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.16
Committed: Sun Dec 11 22:11:45 2016 UTC (7 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.15: +3 -0 lines
Log Message:
Test that onSubscribe has precedence over onError

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8 dl 1.10 import java.util.concurrent.CompletableFuture;
9 dl 1.1 import java.util.concurrent.Executor;
10     import java.util.concurrent.Executors;
11     import java.util.concurrent.Flow;
12 dl 1.10 import java.util.concurrent.ForkJoinPool;
13 dl 1.1 import java.util.concurrent.SubmissionPublisher;
14     import java.util.concurrent.atomic.AtomicInteger;
15     import junit.framework.Test;
16     import junit.framework.TestSuite;
17    
18 dl 1.10 import static java.util.concurrent.Flow.Subscriber;
19     import static java.util.concurrent.Flow.Subscription;
20     import static java.util.concurrent.TimeUnit.MILLISECONDS;
21    
22 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
23    
24     public static void main(String[] args) {
25     main(suite(), args);
26     }
27     public static Test suite() {
28     return new TestSuite(SubmissionPublisherTest.class);
29     }
30    
31 dl 1.12 final Executor basicExecutor = basicPublisher().getExecutor();
32 jsr166 1.13
33 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
34 dl 1.12 return new SubmissionPublisher<Integer>();
35 dl 1.1 }
36 jsr166 1.2
37 dl 1.1 static class SPException extends RuntimeException {}
38    
39     class TestSubscriber implements Subscriber<Integer> {
40     volatile Subscription sn;
41     int last; // Requires that onNexts are in numeric order
42     volatile int nexts;
43     volatile int errors;
44     volatile int completes;
45     volatile boolean throwOnCall = false;
46     volatile boolean request = true;
47     volatile Throwable lastError;
48    
49     public synchronized void onSubscribe(Subscription s) {
50     threadAssertTrue(sn == null);
51     sn = s;
52     notifyAll();
53     if (throwOnCall)
54     throw new SPException();
55     if (request)
56     sn.request(1L);
57     }
58     public synchronized void onNext(Integer t) {
59     ++nexts;
60     notifyAll();
61     int current = t.intValue();
62     threadAssertTrue(current >= last);
63     last = current;
64     if (request)
65     sn.request(1L);
66     if (throwOnCall)
67     throw new SPException();
68     }
69     public synchronized void onError(Throwable t) {
70     threadAssertTrue(completes == 0);
71     threadAssertTrue(errors == 0);
72     lastError = t;
73     ++errors;
74     notifyAll();
75     }
76     public synchronized void onComplete() {
77     threadAssertTrue(completes == 0);
78     ++completes;
79     notifyAll();
80     }
81    
82     synchronized void awaitSubscribe() {
83     while (sn == null) {
84     try {
85     wait();
86     } catch (Exception ex) {
87     threadUnexpectedException(ex);
88     break;
89     }
90     }
91     }
92     synchronized void awaitNext(int n) {
93     while (nexts < n) {
94     try {
95     wait();
96     } catch (Exception ex) {
97     threadUnexpectedException(ex);
98     break;
99     }
100     }
101     }
102     synchronized void awaitComplete() {
103     while (completes == 0 && errors == 0) {
104     try {
105     wait();
106     } catch (Exception ex) {
107     threadUnexpectedException(ex);
108     break;
109     }
110     }
111     }
112     synchronized void awaitError() {
113     while (errors == 0) {
114     try {
115     wait();
116     } catch (Exception ex) {
117     threadUnexpectedException(ex);
118     break;
119     }
120     }
121     }
122    
123     }
124    
125     /**
126     * A new SubmissionPublisher has no subscribers, a non-null
127     * executor, a power-of-two capacity, is not closed, and reports
128     * zero demand and lag
129     */
130     void checkInitialState(SubmissionPublisher<?> p) {
131     assertFalse(p.hasSubscribers());
132 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
133 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
134     assertFalse(p.isClosed());
135     assertNull(p.getClosedException());
136     int n = p.getMaxBufferCapacity();
137     assertTrue((n & (n - 1)) == 0); // power of two
138     assertNotNull(p.getExecutor());
139 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
140     assertEquals(0, p.estimateMaximumLag());
141 dl 1.1 }
142    
143     /**
144     * A default-constructed SubmissionPublisher has no subscribers,
145     * is not closed, has default buffer size, and uses the
146 dl 1.12 * defaultExecutor
147 dl 1.1 */
148     public void testConstructor1() {
149     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
150     checkInitialState(p);
151     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 dl 1.12 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153     if (ForkJoinPool.getCommonPoolParallelism() > 1)
154     assertSame(e, c);
155     else
156     assertNotSame(e, c);
157 dl 1.1 }
158    
159     /**
160     * A new SubmissionPublisher has no subscribers, is not closed,
161     * has the given buffer size, and uses the given executor
162     */
163     public void testConstructor2() {
164     Executor e = Executors.newFixedThreadPool(1);
165     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
166     checkInitialState(p);
167     assertSame(p.getExecutor(), e);
168 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
169 dl 1.1 }
170    
171     /**
172     * A null Executor argument to SubmissionPublisher constructor throws NPE
173     */
174     public void testConstructor3() {
175     try {
176 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
177 dl 1.1 shouldThrow();
178 jsr166 1.2 } catch (NullPointerException success) {}
179 dl 1.1 }
180    
181     /**
182     * A negative capacity argument to SubmissionPublisher constructor
183     * throws IAE
184     */
185     public void testConstructor4() {
186     Executor e = Executors.newFixedThreadPool(1);
187     try {
188 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
189 dl 1.1 shouldThrow();
190 jsr166 1.2 } catch (IllegalArgumentException success) {}
191 dl 1.1 }
192    
193     /**
194     * A closed publisher reports isClosed with no closedException and
195     * throws ISE upon attempted submission; a subsequent close or
196     * closeExceptionally has no additional effect.
197     */
198     public void testClose() {
199     SubmissionPublisher<Integer> p = basicPublisher();
200     checkInitialState(p);
201     p.close();
202     assertTrue(p.isClosed());
203     assertNull(p.getClosedException());
204     try {
205     p.submit(1);
206     shouldThrow();
207 jsr166 1.2 } catch (IllegalStateException success) {}
208 dl 1.1 Throwable ex = new SPException();
209     p.closeExceptionally(ex);
210     assertTrue(p.isClosed());
211     assertNull(p.getClosedException());
212     }
213    
214     /**
215     * A publisher closedExceptionally reports isClosed with the
216     * closedException and throws ISE upon attempted submission; a
217     * subsequent close or closeExceptionally has no additional
218     * effect.
219     */
220     public void testCloseExceptionally() {
221     SubmissionPublisher<Integer> p = basicPublisher();
222     checkInitialState(p);
223     Throwable ex = new SPException();
224     p.closeExceptionally(ex);
225     assertTrue(p.isClosed());
226     assertSame(p.getClosedException(), ex);
227     try {
228     p.submit(1);
229     shouldThrow();
230 jsr166 1.2 } catch (IllegalStateException success) {}
231 dl 1.1 p.close();
232     assertTrue(p.isClosed());
233     assertSame(p.getClosedException(), ex);
234     }
235    
236     /**
237     * Upon subscription, the subscriber's onSubscribe is called, no
238     * other Subscriber methods are invoked, the publisher
239     * hasSubscribers, isSubscribed is true, and existing
240     * subscriptions are unaffected.
241     */
242     public void testSubscribe1() {
243     TestSubscriber s = new TestSubscriber();
244     SubmissionPublisher<Integer> p = basicPublisher();
245     p.subscribe(s);
246     assertTrue(p.hasSubscribers());
247 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
248 dl 1.1 assertTrue(p.getSubscribers().contains(s));
249     assertTrue(p.isSubscribed(s));
250     s.awaitSubscribe();
251     assertNotNull(s.sn);
252 jsr166 1.6 assertEquals(0, s.nexts);
253     assertEquals(0, s.errors);
254     assertEquals(0, s.completes);
255 dl 1.1 TestSubscriber s2 = new TestSubscriber();
256     p.subscribe(s2);
257     assertTrue(p.hasSubscribers());
258 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
259 dl 1.1 assertTrue(p.getSubscribers().contains(s));
260     assertTrue(p.getSubscribers().contains(s2));
261     assertTrue(p.isSubscribed(s));
262     assertTrue(p.isSubscribed(s2));
263     s2.awaitSubscribe();
264     assertNotNull(s2.sn);
265 jsr166 1.6 assertEquals(0, s2.nexts);
266     assertEquals(0, s2.errors);
267     assertEquals(0, s2.completes);
268 dl 1.9 p.close();
269 dl 1.1 }
270    
271     /**
272     * If closed, upon subscription, the subscriber's onComplete
273     * method is invoked
274     */
275     public void testSubscribe2() {
276     TestSubscriber s = new TestSubscriber();
277     SubmissionPublisher<Integer> p = basicPublisher();
278     p.close();
279     p.subscribe(s);
280     s.awaitComplete();
281 jsr166 1.6 assertEquals(0, s.nexts);
282     assertEquals(0, s.errors);
283     assertEquals(1, s.completes, 1);
284 dl 1.1 }
285    
286     /**
287     * If closedExceptionally, upon subscription, the subscriber's
288     * onError method is invoked
289     */
290     public void testSubscribe3() {
291     TestSubscriber s = new TestSubscriber();
292     SubmissionPublisher<Integer> p = basicPublisher();
293     Throwable ex = new SPException();
294     p.closeExceptionally(ex);
295     assertTrue(p.isClosed());
296     assertSame(p.getClosedException(), ex);
297     p.subscribe(s);
298     s.awaitError();
299 jsr166 1.6 assertEquals(0, s.nexts);
300     assertEquals(1, s.errors);
301 dl 1.1 }
302    
303     /**
304     * Upon attempted resubscription, the subscriber's onError is
305     * called and the subscription is cancelled.
306     */
307     public void testSubscribe4() {
308     TestSubscriber s = new TestSubscriber();
309     SubmissionPublisher<Integer> p = basicPublisher();
310     p.subscribe(s);
311     assertTrue(p.hasSubscribers());
312 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
313 dl 1.1 assertTrue(p.getSubscribers().contains(s));
314     assertTrue(p.isSubscribed(s));
315     s.awaitSubscribe();
316     assertNotNull(s.sn);
317 jsr166 1.6 assertEquals(0, s.nexts);
318     assertEquals(0, s.errors);
319     assertEquals(0, s.completes);
320 dl 1.1 p.subscribe(s);
321     s.awaitError();
322 jsr166 1.6 assertEquals(0, s.nexts);
323     assertEquals(1, s.errors);
324 dl 1.1 assertFalse(p.isSubscribed(s));
325     }
326    
327     /**
328     * An exception thrown in onSubscribe causes onError
329     */
330     public void testSubscribe5() {
331     TestSubscriber s = new TestSubscriber();
332     SubmissionPublisher<Integer> p = basicPublisher();
333     s.throwOnCall = true;
334     try {
335     p.subscribe(s);
336 jsr166 1.2 } catch (Exception ok) {}
337 dl 1.1 s.awaitError();
338 jsr166 1.6 assertEquals(0, s.nexts);
339     assertEquals(1, s.errors);
340     assertEquals(0, s.completes);
341 dl 1.1 }
342    
343     /**
344 jsr166 1.8 * subscribe(null) throws NPE
345 dl 1.1 */
346     public void testSubscribe6() {
347     SubmissionPublisher<Integer> p = basicPublisher();
348     try {
349     p.subscribe(null);
350 jsr166 1.2 shouldThrow();
351     } catch (NullPointerException success) {}
352 dl 1.1 checkInitialState(p);
353     }
354    
355     /**
356     * Closing a publisher causes onComplete to subscribers
357     */
358     public void testCloseCompletes() {
359     SubmissionPublisher<Integer> p = basicPublisher();
360     TestSubscriber s1 = new TestSubscriber();
361     TestSubscriber s2 = new TestSubscriber();
362     p.subscribe(s1);
363     p.subscribe(s2);
364     p.submit(1);
365     p.close();
366     assertTrue(p.isClosed());
367     assertNull(p.getClosedException());
368     s1.awaitComplete();
369 jsr166 1.6 assertEquals(1, s1.nexts);
370     assertEquals(1, s1.completes);
371 dl 1.1 s2.awaitComplete();
372 jsr166 1.6 assertEquals(1, s2.nexts);
373     assertEquals(1, s2.completes);
374 dl 1.1 }
375    
376     /**
377     * Closing a publisher exceptionally causes onError to subscribers
378 dl 1.16 * after they are subscribed
379 dl 1.1 */
380     public void testCloseExceptionallyError() {
381     SubmissionPublisher<Integer> p = basicPublisher();
382     TestSubscriber s1 = new TestSubscriber();
383     TestSubscriber s2 = new TestSubscriber();
384     p.subscribe(s1);
385     p.subscribe(s2);
386     p.submit(1);
387     p.closeExceptionally(new SPException());
388     assertTrue(p.isClosed());
389 dl 1.16 s1.awaitSubscribe();
390 dl 1.1 s1.awaitError();
391     assertTrue(s1.nexts <= 1);
392 jsr166 1.6 assertEquals(1, s1.errors);
393 dl 1.16 s1.awaitSubscribe();
394 dl 1.1 s2.awaitError();
395     assertTrue(s2.nexts <= 1);
396 jsr166 1.6 assertEquals(1, s2.errors);
397 dl 1.1 }
398    
399     /**
400     * Cancelling a subscription eventually causes no more onNexts to be issued
401     */
402     public void testCancel() {
403     SubmissionPublisher<Integer> p = basicPublisher();
404     TestSubscriber s1 = new TestSubscriber();
405     TestSubscriber s2 = new TestSubscriber();
406     p.subscribe(s1);
407     p.subscribe(s2);
408     s1.awaitSubscribe();
409     p.submit(1);
410     s1.sn.cancel();
411     for (int i = 2; i <= 20; ++i)
412     p.submit(i);
413     p.close();
414     s2.awaitComplete();
415 jsr166 1.6 assertEquals(20, s2.nexts);
416     assertEquals(1, s2.completes);
417 dl 1.1 assertTrue(s1.nexts < 20);
418     assertFalse(p.isSubscribed(s1));
419     }
420    
421     /**
422     * Throwing an exception in onNext causes onError
423     */
424     public void testThrowOnNext() {
425     SubmissionPublisher<Integer> p = basicPublisher();
426     TestSubscriber s1 = new TestSubscriber();
427     TestSubscriber s2 = new TestSubscriber();
428     p.subscribe(s1);
429     p.subscribe(s2);
430     s1.awaitSubscribe();
431     p.submit(1);
432     s1.throwOnCall = true;
433     p.submit(2);
434     p.close();
435     s2.awaitComplete();
436 jsr166 1.6 assertEquals(2, s2.nexts);
437 dl 1.1 s1.awaitComplete();
438 jsr166 1.6 assertEquals(1, s1.errors);
439 dl 1.1 }
440    
441     /**
442 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
443 dl 1.1 * subscriber throws an exception in onNext
444     */
445     public void testThrowOnNextHandler() {
446     AtomicInteger calls = new AtomicInteger();
447     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
448     (basicExecutor, 8,
449     (s, e) -> calls.getAndIncrement());
450     TestSubscriber s1 = new TestSubscriber();
451     TestSubscriber s2 = new TestSubscriber();
452     p.subscribe(s1);
453     p.subscribe(s2);
454     s1.awaitSubscribe();
455     p.submit(1);
456     s1.throwOnCall = true;
457     p.submit(2);
458     p.close();
459     s2.awaitComplete();
460 jsr166 1.6 assertEquals(2, s2.nexts);
461     assertEquals(1, s2.completes);
462 dl 1.1 s1.awaitError();
463 jsr166 1.6 assertEquals(1, s1.errors);
464     assertEquals(1, calls.get());
465 dl 1.1 }
466    
467     /**
468     * onNext items are issued in the same order to each subscriber
469     */
470     public void testOrder() {
471     SubmissionPublisher<Integer> p = basicPublisher();
472     TestSubscriber s1 = new TestSubscriber();
473     TestSubscriber s2 = new TestSubscriber();
474     p.subscribe(s1);
475     p.subscribe(s2);
476     for (int i = 1; i <= 20; ++i)
477     p.submit(i);
478     p.close();
479     s2.awaitComplete();
480     s1.awaitComplete();
481 jsr166 1.6 assertEquals(20, s2.nexts);
482     assertEquals(1, s2.completes);
483     assertEquals(20, s1.nexts);
484     assertEquals(1, s1.completes);
485 dl 1.1 }
486    
487     /**
488     * onNext is issued only if requested
489     */
490     public void testRequest1() {
491     SubmissionPublisher<Integer> p = basicPublisher();
492     TestSubscriber s1 = new TestSubscriber();
493     s1.request = false;
494     p.subscribe(s1);
495     s1.awaitSubscribe();
496     assertTrue(p.estimateMinimumDemand() == 0);
497     TestSubscriber s2 = new TestSubscriber();
498     p.subscribe(s2);
499     p.submit(1);
500     p.submit(2);
501     s2.awaitNext(1);
502 jsr166 1.6 assertEquals(0, s1.nexts);
503 dl 1.1 s1.sn.request(3);
504     p.submit(3);
505     p.close();
506     s2.awaitComplete();
507 jsr166 1.6 assertEquals(3, s2.nexts);
508     assertEquals(1, s2.completes);
509 dl 1.1 s1.awaitComplete();
510     assertTrue(s1.nexts > 0);
511 jsr166 1.6 assertEquals(1, s1.completes);
512 dl 1.1 }
513    
514     /**
515     * onNext is not issued when requests become zero
516     */
517     public void testRequest2() {
518     SubmissionPublisher<Integer> p = basicPublisher();
519     TestSubscriber s1 = new TestSubscriber();
520     TestSubscriber s2 = new TestSubscriber();
521     p.subscribe(s1);
522     p.subscribe(s2);
523     s2.awaitSubscribe();
524     s1.awaitSubscribe();
525     s1.request = false;
526     p.submit(1);
527     p.submit(2);
528     p.close();
529     s2.awaitComplete();
530 jsr166 1.6 assertEquals(2, s2.nexts);
531     assertEquals(1, s2.completes);
532 dl 1.1 s1.awaitNext(1);
533 jsr166 1.6 assertEquals(1, s1.nexts);
534 dl 1.1 }
535    
536     /**
537     * Negative request causes error
538     */
539     public void testRequest3() {
540     SubmissionPublisher<Integer> p = basicPublisher();
541     TestSubscriber s1 = new TestSubscriber();
542     TestSubscriber s2 = new TestSubscriber();
543     p.subscribe(s1);
544     p.subscribe(s2);
545     s2.awaitSubscribe();
546     s1.awaitSubscribe();
547     s1.sn.request(-1L);
548     p.submit(1);
549     p.submit(2);
550     p.close();
551     s2.awaitComplete();
552 jsr166 1.6 assertEquals(2, s2.nexts);
553     assertEquals(1, s2.completes);
554 dl 1.1 s1.awaitError();
555 jsr166 1.6 assertEquals(1, s1.errors);
556 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
557     }
558    
559     /**
560     * estimateMinimumDemand reports 0 until request, nonzero after
561     * request, and zero again after delivery
562     */
563     public void testEstimateMinimumDemand() {
564     TestSubscriber s = new TestSubscriber();
565     SubmissionPublisher<Integer> p = basicPublisher();
566     s.request = false;
567     p.subscribe(s);
568     s.awaitSubscribe();
569 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
570 dl 1.1 s.sn.request(1);
571 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
572 dl 1.1 p.submit(1);
573     s.awaitNext(1);
574 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
575 dl 1.1 }
576    
577     /**
578 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
579 dl 1.1 */
580     public void testEmptySubmit() {
581     SubmissionPublisher<Integer> p = basicPublisher();
582 jsr166 1.6 assertEquals(0, p.submit(1));
583 dl 1.1 }
584    
585     /**
586 jsr166 1.3 * submit(null) throws NPE
587 dl 1.1 */
588     public void testNullSubmit() {
589     SubmissionPublisher<Integer> p = basicPublisher();
590     try {
591     p.submit(null);
592 jsr166 1.2 shouldThrow();
593     } catch (NullPointerException success) {}
594 dl 1.1 }
595    
596     /**
597 jsr166 1.3 * submit returns number of lagged items, compatible with result
598 dl 1.1 * of estimateMaximumLag.
599     */
600     public void testLaggedSubmit() {
601     SubmissionPublisher<Integer> p = basicPublisher();
602     TestSubscriber s1 = new TestSubscriber();
603     s1.request = false;
604     TestSubscriber s2 = new TestSubscriber();
605     s2.request = false;
606     p.subscribe(s1);
607     p.subscribe(s2);
608     s2.awaitSubscribe();
609     s1.awaitSubscribe();
610 jsr166 1.6 assertEquals(1, p.submit(1));
611 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
612     assertTrue(p.submit(2) >= 2);
613     assertTrue(p.estimateMaximumLag() >= 2);
614     s1.sn.request(4);
615     assertTrue(p.submit(3) >= 3);
616     assertTrue(p.estimateMaximumLag() >= 3);
617     s2.sn.request(4);
618     p.submit(4);
619     p.close();
620     s2.awaitComplete();
621 jsr166 1.6 assertEquals(4, s2.nexts);
622 dl 1.1 s1.awaitComplete();
623 jsr166 1.6 assertEquals(4, s2.nexts);
624 dl 1.1 }
625    
626     /**
627     * submit eventually issues requested items when buffer capacity is 1
628     */
629     public void testCap1Submit() {
630     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
631     basicExecutor, 1);
632     TestSubscriber s1 = new TestSubscriber();
633     TestSubscriber s2 = new TestSubscriber();
634     p.subscribe(s1);
635     p.subscribe(s2);
636     for (int i = 1; i <= 20; ++i) {
637     assertTrue(p.estimateMinimumDemand() <= 1);
638     assertTrue(p.submit(i) >= 0);
639     }
640     p.close();
641     s2.awaitComplete();
642     s1.awaitComplete();
643 jsr166 1.6 assertEquals(20, s2.nexts);
644     assertEquals(1, s2.completes);
645     assertEquals(20, s1.nexts);
646     assertEquals(1, s1.completes);
647 dl 1.1 }
648 jsr166 1.2
649 dl 1.1 static boolean noopHandle(AtomicInteger count) {
650     count.getAndIncrement();
651     return false;
652     }
653    
654     static boolean reqHandle(AtomicInteger count, Subscriber s) {
655     count.getAndIncrement();
656     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
657     return true;
658     }
659    
660     /**
661 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
662 dl 1.1 */
663     public void testEmptyOffer() {
664     SubmissionPublisher<Integer> p = basicPublisher();
665 jsr166 1.3 assertEquals(0, p.offer(1, null));
666 dl 1.1 }
667    
668     /**
669 jsr166 1.3 * offer(null) throws NPE
670 dl 1.1 */
671     public void testNullOffer() {
672     SubmissionPublisher<Integer> p = basicPublisher();
673     try {
674     p.offer(null, null);
675 jsr166 1.2 shouldThrow();
676     } catch (NullPointerException success) {}
677 dl 1.1 }
678    
679     /**
680 jsr166 1.3 * offer returns number of lagged items if not saturated
681 dl 1.1 */
682     public void testLaggedOffer() {
683     SubmissionPublisher<Integer> p = basicPublisher();
684     TestSubscriber s1 = new TestSubscriber();
685     s1.request = false;
686     TestSubscriber s2 = new TestSubscriber();
687     s2.request = false;
688     p.subscribe(s1);
689     p.subscribe(s2);
690     s2.awaitSubscribe();
691     s1.awaitSubscribe();
692     assertTrue(p.offer(1, null) >= 1);
693     assertTrue(p.offer(2, null) >= 2);
694     s1.sn.request(4);
695     assertTrue(p.offer(3, null) >= 3);
696     s2.sn.request(4);
697     p.offer(4, null);
698     p.close();
699     s2.awaitComplete();
700 jsr166 1.6 assertEquals(4, s2.nexts);
701 dl 1.1 s1.awaitComplete();
702 jsr166 1.6 assertEquals(4, s2.nexts);
703 dl 1.1 }
704    
705     /**
706 jsr166 1.3 * offer reports drops if saturated
707 dl 1.1 */
708     public void testDroppedOffer() {
709     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
710     basicExecutor, 4);
711     TestSubscriber s1 = new TestSubscriber();
712     s1.request = false;
713     TestSubscriber s2 = new TestSubscriber();
714     s2.request = false;
715     p.subscribe(s1);
716     p.subscribe(s2);
717     s2.awaitSubscribe();
718     s1.awaitSubscribe();
719     for (int i = 1; i <= 4; ++i)
720     assertTrue(p.offer(i, null) >= 0);
721     p.offer(5, null);
722     assertTrue(p.offer(6, null) < 0);
723     s1.sn.request(64);
724     assertTrue(p.offer(7, null) < 0);
725     s2.sn.request(64);
726     p.close();
727     s2.awaitComplete();
728     assertTrue(s2.nexts >= 4);
729     s1.awaitComplete();
730     assertTrue(s1.nexts >= 4);
731     }
732    
733     /**
734 jsr166 1.3 * offer invokes drop handler if saturated
735 dl 1.1 */
736     public void testHandledDroppedOffer() {
737     AtomicInteger calls = new AtomicInteger();
738     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
739     basicExecutor, 4);
740     TestSubscriber s1 = new TestSubscriber();
741     s1.request = false;
742     TestSubscriber s2 = new TestSubscriber();
743     s2.request = false;
744     p.subscribe(s1);
745     p.subscribe(s2);
746     s2.awaitSubscribe();
747     s1.awaitSubscribe();
748     for (int i = 1; i <= 4; ++i)
749     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
750     p.offer(4, (s, x) -> noopHandle(calls));
751     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
752     s1.sn.request(64);
753     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
754     s2.sn.request(64);
755     p.close();
756     s2.awaitComplete();
757     s1.awaitComplete();
758     assertTrue(calls.get() >= 4);
759     }
760    
761     /**
762 jsr166 1.3 * offer succeeds if drop handler forces request
763 dl 1.1 */
764     public void testRecoveredHandledDroppedOffer() {
765     AtomicInteger calls = new AtomicInteger();
766     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
767     basicExecutor, 4);
768     TestSubscriber s1 = new TestSubscriber();
769     s1.request = false;
770     TestSubscriber s2 = new TestSubscriber();
771     s2.request = false;
772     p.subscribe(s1);
773     p.subscribe(s2);
774     s2.awaitSubscribe();
775     s1.awaitSubscribe();
776     int n = 0;
777     for (int i = 1; i <= 8; ++i) {
778     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
779     n = n + 2 + (d < 0 ? d : 0);
780     }
781     p.close();
782     s2.awaitComplete();
783     s1.awaitComplete();
784 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
785 dl 1.1 assertTrue(calls.get() >= 2);
786     }
787    
788     /**
789 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
790 dl 1.1 */
791     public void testEmptyTimedOffer() {
792     SubmissionPublisher<Integer> p = basicPublisher();
793 jsr166 1.7 long startTime = System.nanoTime();
794 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
795 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
796 dl 1.1 }
797    
798     /**
799 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
800 dl 1.1 */
801     public void testNullTimedOffer() {
802     SubmissionPublisher<Integer> p = basicPublisher();
803 jsr166 1.7 long startTime = System.nanoTime();
804 dl 1.1 try {
805 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
806 jsr166 1.2 shouldThrow();
807     } catch (NullPointerException success) {}
808 dl 1.1 try {
809 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
810 jsr166 1.2 shouldThrow();
811     } catch (NullPointerException success) {}
812 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
813 dl 1.1 }
814    
815     /**
816 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
817 dl 1.1 */
818     public void testLaggedTimedOffer() {
819     SubmissionPublisher<Integer> p = basicPublisher();
820     TestSubscriber s1 = new TestSubscriber();
821     s1.request = false;
822     TestSubscriber s2 = new TestSubscriber();
823     s2.request = false;
824     p.subscribe(s1);
825     p.subscribe(s2);
826     s2.awaitSubscribe();
827     s1.awaitSubscribe();
828 jsr166 1.7 long startTime = System.nanoTime();
829     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
830     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
831 dl 1.1 s1.sn.request(4);
832 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
833 dl 1.1 s2.sn.request(4);
834 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
835 dl 1.1 p.close();
836     s2.awaitComplete();
837 jsr166 1.6 assertEquals(4, s2.nexts);
838 dl 1.1 s1.awaitComplete();
839 jsr166 1.6 assertEquals(4, s2.nexts);
840 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
841 dl 1.1 }
842    
843     /**
844 jsr166 1.3 * Timed offer reports drops if saturated
845 dl 1.1 */
846     public void testDroppedTimedOffer() {
847     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
848     basicExecutor, 4);
849     TestSubscriber s1 = new TestSubscriber();
850     s1.request = false;
851     TestSubscriber s2 = new TestSubscriber();
852     s2.request = false;
853     p.subscribe(s1);
854     p.subscribe(s2);
855     s2.awaitSubscribe();
856     s1.awaitSubscribe();
857 dl 1.9 long delay = timeoutMillis();
858 dl 1.1 for (int i = 1; i <= 4; ++i)
859 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
860     long startTime = System.nanoTime();
861     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
862 dl 1.1 s1.sn.request(64);
863 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
864     // 2 * delay should elapse but check only 1 * delay to allow timer slop
865     assertTrue(millisElapsedSince(startTime) >= delay);
866 dl 1.1 s2.sn.request(64);
867     p.close();
868     s2.awaitComplete();
869     assertTrue(s2.nexts >= 2);
870     s1.awaitComplete();
871     assertTrue(s1.nexts >= 2);
872     }
873    
874     /**
875 jsr166 1.3 * Timed offer invokes drop handler if saturated
876 dl 1.1 */
877     public void testHandledDroppedTimedOffer() {
878     AtomicInteger calls = new AtomicInteger();
879     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
880     basicExecutor, 4);
881     TestSubscriber s1 = new TestSubscriber();
882     s1.request = false;
883     TestSubscriber s2 = new TestSubscriber();
884     s2.request = false;
885     p.subscribe(s1);
886     p.subscribe(s2);
887     s2.awaitSubscribe();
888     s1.awaitSubscribe();
889 dl 1.9 long delay = timeoutMillis();
890 dl 1.1 for (int i = 1; i <= 4; ++i)
891 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
892     long startTime = System.nanoTime();
893     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
894 dl 1.1 s1.sn.request(64);
895 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
896     assertTrue(millisElapsedSince(startTime) >= delay);
897 dl 1.1 s2.sn.request(64);
898     p.close();
899     s2.awaitComplete();
900     s1.awaitComplete();
901     assertTrue(calls.get() >= 2);
902     }
903    
904     /**
905 jsr166 1.3 * Timed offer succeeds if drop handler forces request
906 dl 1.1 */
907     public void testRecoveredHandledDroppedTimedOffer() {
908     AtomicInteger calls = new AtomicInteger();
909     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
910     basicExecutor, 4);
911     TestSubscriber s1 = new TestSubscriber();
912     s1.request = false;
913     TestSubscriber s2 = new TestSubscriber();
914     s2.request = false;
915     p.subscribe(s1);
916     p.subscribe(s2);
917     s2.awaitSubscribe();
918     s1.awaitSubscribe();
919     int n = 0;
920 dl 1.9 long delay = timeoutMillis();
921     long startTime = System.nanoTime();
922     for (int i = 1; i <= 6; ++i) {
923     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
924 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
925     }
926 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
927 dl 1.1 p.close();
928     s2.awaitComplete();
929     s1.awaitComplete();
930 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
931 dl 1.1 assertTrue(calls.get() >= 2);
932     }
933    
934 dl 1.10 /**
935     * consume returns a CompletableFuture that is done when
936     * publisher completes
937     */
938     public void testConsume() {
939     AtomicInteger sum = new AtomicInteger();
940     SubmissionPublisher<Integer> p = basicPublisher();
941 jsr166 1.11 CompletableFuture<Void> f =
942 jsr166 1.15 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
943 dl 1.10 int n = 20;
944     for (int i = 1; i <= n; ++i)
945     p.submit(i);
946     p.close();
947     f.join();
948     assertEquals((n * (n + 1)) / 2, sum.get());
949     }
950    
951     /**
952     * consume(null) throws NPE
953     */
954     public void testConsumeNPE() {
955     SubmissionPublisher<Integer> p = basicPublisher();
956     try {
957     CompletableFuture<Void> f = p.consume(null);
958     shouldThrow();
959 jsr166 1.11 } catch (NullPointerException success) {}
960 dl 1.10 }
961    
962     /**
963     * consume eventually stops processing published items if cancelled
964     */
965     public void testCancelledConsume() {
966     AtomicInteger count = new AtomicInteger();
967     SubmissionPublisher<Integer> p = basicPublisher();
968     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
969     f.cancel(true);
970     int n = 1000000; // arbitrary limit
971     for (int i = 1; i <= n; ++i)
972     p.submit(i);
973     assertTrue(count.get() < n);
974     }
975 jsr166 1.11
976 dl 1.1 }