ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.28
Committed: Sun Jul 22 21:49:33 2018 UTC (5 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.27: +3 -1 lines
Log Message:
Fix errorprone warning [FutureReturnValueIgnored]

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8 dl 1.10 import java.util.concurrent.CompletableFuture;
9 jsr166 1.24 import java.util.concurrent.CountDownLatch;
10 dl 1.1 import java.util.concurrent.Executor;
11     import java.util.concurrent.Executors;
12     import java.util.concurrent.Flow;
13 dl 1.10 import java.util.concurrent.ForkJoinPool;
14 dl 1.1 import java.util.concurrent.SubmissionPublisher;
15     import java.util.concurrent.atomic.AtomicInteger;
16     import junit.framework.Test;
17     import junit.framework.TestSuite;
18    
19 dl 1.10 import static java.util.concurrent.Flow.Subscriber;
20     import static java.util.concurrent.Flow.Subscription;
21     import static java.util.concurrent.TimeUnit.MILLISECONDS;
22    
23 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
24    
25     public static void main(String[] args) {
26     main(suite(), args);
27     }
28     public static Test suite() {
29     return new TestSuite(SubmissionPublisherTest.class);
30     }
31    
32 dl 1.12 final Executor basicExecutor = basicPublisher().getExecutor();
33 jsr166 1.13
34 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
35 dl 1.12 return new SubmissionPublisher<Integer>();
36 dl 1.1 }
37 jsr166 1.2
38 dl 1.1 static class SPException extends RuntimeException {}
39    
40     class TestSubscriber implements Subscriber<Integer> {
41     volatile Subscription sn;
42     int last; // Requires that onNexts are in numeric order
43     volatile int nexts;
44     volatile int errors;
45     volatile int completes;
46     volatile boolean throwOnCall = false;
47     volatile boolean request = true;
48     volatile Throwable lastError;
49    
50     public synchronized void onSubscribe(Subscription s) {
51     threadAssertTrue(sn == null);
52     sn = s;
53     notifyAll();
54     if (throwOnCall)
55     throw new SPException();
56     if (request)
57     sn.request(1L);
58     }
59     public synchronized void onNext(Integer t) {
60     ++nexts;
61     notifyAll();
62     int current = t.intValue();
63     threadAssertTrue(current >= last);
64     last = current;
65     if (request)
66     sn.request(1L);
67     if (throwOnCall)
68     throw new SPException();
69     }
70     public synchronized void onError(Throwable t) {
71     threadAssertTrue(completes == 0);
72     threadAssertTrue(errors == 0);
73     lastError = t;
74     ++errors;
75     notifyAll();
76     }
77     public synchronized void onComplete() {
78     threadAssertTrue(completes == 0);
79     ++completes;
80     notifyAll();
81     }
82    
83     synchronized void awaitSubscribe() {
84     while (sn == null) {
85     try {
86     wait();
87     } catch (Exception ex) {
88     threadUnexpectedException(ex);
89     break;
90     }
91     }
92     }
93     synchronized void awaitNext(int n) {
94     while (nexts < n) {
95     try {
96     wait();
97     } catch (Exception ex) {
98     threadUnexpectedException(ex);
99     break;
100     }
101     }
102     }
103     synchronized void awaitComplete() {
104     while (completes == 0 && errors == 0) {
105     try {
106     wait();
107     } catch (Exception ex) {
108     threadUnexpectedException(ex);
109     break;
110     }
111     }
112     }
113     synchronized void awaitError() {
114     while (errors == 0) {
115     try {
116     wait();
117     } catch (Exception ex) {
118     threadUnexpectedException(ex);
119     break;
120     }
121     }
122     }
123    
124     }
125    
126     /**
127     * A new SubmissionPublisher has no subscribers, a non-null
128     * executor, a power-of-two capacity, is not closed, and reports
129     * zero demand and lag
130     */
131     void checkInitialState(SubmissionPublisher<?> p) {
132     assertFalse(p.hasSubscribers());
133 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
134 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
135     assertFalse(p.isClosed());
136     assertNull(p.getClosedException());
137     int n = p.getMaxBufferCapacity();
138     assertTrue((n & (n - 1)) == 0); // power of two
139     assertNotNull(p.getExecutor());
140 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
141     assertEquals(0, p.estimateMaximumLag());
142 dl 1.1 }
143    
144     /**
145     * A default-constructed SubmissionPublisher has no subscribers,
146     * is not closed, has default buffer size, and uses the
147 dl 1.12 * defaultExecutor
148 dl 1.1 */
149     public void testConstructor1() {
150 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
151 dl 1.1 checkInitialState(p);
152     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
153 dl 1.12 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
154     if (ForkJoinPool.getCommonPoolParallelism() > 1)
155     assertSame(e, c);
156     else
157     assertNotSame(e, c);
158 dl 1.1 }
159    
160     /**
161     * A new SubmissionPublisher has no subscribers, is not closed,
162     * has the given buffer size, and uses the given executor
163     */
164     public void testConstructor2() {
165     Executor e = Executors.newFixedThreadPool(1);
166 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
167 dl 1.1 checkInitialState(p);
168     assertSame(p.getExecutor(), e);
169 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
170 dl 1.1 }
171    
172     /**
173 jsr166 1.21 * A null Executor argument to SubmissionPublisher constructor
174     * throws NullPointerException
175 dl 1.1 */
176     public void testConstructor3() {
177     try {
178 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
179 dl 1.1 shouldThrow();
180 jsr166 1.2 } catch (NullPointerException success) {}
181 dl 1.1 }
182    
183     /**
184     * A negative capacity argument to SubmissionPublisher constructor
185 jsr166 1.21 * throws IllegalArgumentException
186 dl 1.1 */
187     public void testConstructor4() {
188     Executor e = Executors.newFixedThreadPool(1);
189     try {
190 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
191 dl 1.1 shouldThrow();
192 jsr166 1.2 } catch (IllegalArgumentException success) {}
193 dl 1.1 }
194    
195     /**
196     * A closed publisher reports isClosed with no closedException and
197 jsr166 1.21 * throws IllegalStateException upon attempted submission; a
198     * subsequent close or closeExceptionally has no additional
199     * effect.
200 dl 1.1 */
201     public void testClose() {
202     SubmissionPublisher<Integer> p = basicPublisher();
203     checkInitialState(p);
204     p.close();
205     assertTrue(p.isClosed());
206     assertNull(p.getClosedException());
207     try {
208     p.submit(1);
209     shouldThrow();
210 jsr166 1.2 } catch (IllegalStateException success) {}
211 dl 1.1 Throwable ex = new SPException();
212     p.closeExceptionally(ex);
213     assertTrue(p.isClosed());
214     assertNull(p.getClosedException());
215     }
216    
217     /**
218     * A publisher closedExceptionally reports isClosed with the
219 jsr166 1.21 * closedException and throws IllegalStateException upon attempted
220     * submission; a subsequent close or closeExceptionally has no
221     * additional effect.
222 dl 1.1 */
223     public void testCloseExceptionally() {
224     SubmissionPublisher<Integer> p = basicPublisher();
225     checkInitialState(p);
226     Throwable ex = new SPException();
227     p.closeExceptionally(ex);
228     assertTrue(p.isClosed());
229     assertSame(p.getClosedException(), ex);
230     try {
231     p.submit(1);
232     shouldThrow();
233 jsr166 1.2 } catch (IllegalStateException success) {}
234 dl 1.1 p.close();
235     assertTrue(p.isClosed());
236     assertSame(p.getClosedException(), ex);
237     }
238    
239     /**
240     * Upon subscription, the subscriber's onSubscribe is called, no
241     * other Subscriber methods are invoked, the publisher
242     * hasSubscribers, isSubscribed is true, and existing
243     * subscriptions are unaffected.
244     */
245     public void testSubscribe1() {
246     TestSubscriber s = new TestSubscriber();
247     SubmissionPublisher<Integer> p = basicPublisher();
248     p.subscribe(s);
249     assertTrue(p.hasSubscribers());
250 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
251 dl 1.1 assertTrue(p.getSubscribers().contains(s));
252     assertTrue(p.isSubscribed(s));
253     s.awaitSubscribe();
254     assertNotNull(s.sn);
255 jsr166 1.6 assertEquals(0, s.nexts);
256     assertEquals(0, s.errors);
257     assertEquals(0, s.completes);
258 dl 1.1 TestSubscriber s2 = new TestSubscriber();
259     p.subscribe(s2);
260     assertTrue(p.hasSubscribers());
261 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
262 dl 1.1 assertTrue(p.getSubscribers().contains(s));
263     assertTrue(p.getSubscribers().contains(s2));
264     assertTrue(p.isSubscribed(s));
265     assertTrue(p.isSubscribed(s2));
266     s2.awaitSubscribe();
267     assertNotNull(s2.sn);
268 jsr166 1.6 assertEquals(0, s2.nexts);
269     assertEquals(0, s2.errors);
270     assertEquals(0, s2.completes);
271 dl 1.9 p.close();
272 dl 1.1 }
273    
274     /**
275     * If closed, upon subscription, the subscriber's onComplete
276     * method is invoked
277     */
278     public void testSubscribe2() {
279     TestSubscriber s = new TestSubscriber();
280     SubmissionPublisher<Integer> p = basicPublisher();
281     p.close();
282     p.subscribe(s);
283     s.awaitComplete();
284 jsr166 1.6 assertEquals(0, s.nexts);
285     assertEquals(0, s.errors);
286     assertEquals(1, s.completes, 1);
287 dl 1.1 }
288    
289     /**
290     * If closedExceptionally, upon subscription, the subscriber's
291     * onError method is invoked
292     */
293     public void testSubscribe3() {
294     TestSubscriber s = new TestSubscriber();
295     SubmissionPublisher<Integer> p = basicPublisher();
296     Throwable ex = new SPException();
297     p.closeExceptionally(ex);
298     assertTrue(p.isClosed());
299     assertSame(p.getClosedException(), ex);
300     p.subscribe(s);
301     s.awaitError();
302 jsr166 1.6 assertEquals(0, s.nexts);
303     assertEquals(1, s.errors);
304 dl 1.1 }
305    
306     /**
307     * Upon attempted resubscription, the subscriber's onError is
308     * called and the subscription is cancelled.
309     */
310     public void testSubscribe4() {
311     TestSubscriber s = new TestSubscriber();
312     SubmissionPublisher<Integer> p = basicPublisher();
313     p.subscribe(s);
314     assertTrue(p.hasSubscribers());
315 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
316 dl 1.1 assertTrue(p.getSubscribers().contains(s));
317     assertTrue(p.isSubscribed(s));
318     s.awaitSubscribe();
319     assertNotNull(s.sn);
320 jsr166 1.6 assertEquals(0, s.nexts);
321     assertEquals(0, s.errors);
322     assertEquals(0, s.completes);
323 dl 1.1 p.subscribe(s);
324     s.awaitError();
325 jsr166 1.6 assertEquals(0, s.nexts);
326     assertEquals(1, s.errors);
327 dl 1.1 assertFalse(p.isSubscribed(s));
328     }
329    
330     /**
331     * An exception thrown in onSubscribe causes onError
332     */
333     public void testSubscribe5() {
334     TestSubscriber s = new TestSubscriber();
335     SubmissionPublisher<Integer> p = basicPublisher();
336     s.throwOnCall = true;
337 jsr166 1.27 p.subscribe(s);
338 dl 1.1 s.awaitError();
339 jsr166 1.6 assertEquals(0, s.nexts);
340     assertEquals(1, s.errors);
341     assertEquals(0, s.completes);
342 dl 1.1 }
343    
344     /**
345 jsr166 1.8 * subscribe(null) throws NPE
346 dl 1.1 */
347     public void testSubscribe6() {
348     SubmissionPublisher<Integer> p = basicPublisher();
349     try {
350     p.subscribe(null);
351 jsr166 1.2 shouldThrow();
352     } catch (NullPointerException success) {}
353 dl 1.1 checkInitialState(p);
354     }
355    
356     /**
357     * Closing a publisher causes onComplete to subscribers
358     */
359     public void testCloseCompletes() {
360     SubmissionPublisher<Integer> p = basicPublisher();
361     TestSubscriber s1 = new TestSubscriber();
362     TestSubscriber s2 = new TestSubscriber();
363     p.subscribe(s1);
364     p.subscribe(s2);
365     p.submit(1);
366     p.close();
367     assertTrue(p.isClosed());
368     assertNull(p.getClosedException());
369     s1.awaitComplete();
370 jsr166 1.6 assertEquals(1, s1.nexts);
371     assertEquals(1, s1.completes);
372 dl 1.1 s2.awaitComplete();
373 jsr166 1.6 assertEquals(1, s2.nexts);
374     assertEquals(1, s2.completes);
375 dl 1.1 }
376    
377     /**
378     * Closing a publisher exceptionally causes onError to subscribers
379 dl 1.16 * after they are subscribed
380 dl 1.1 */
381     public void testCloseExceptionallyError() {
382     SubmissionPublisher<Integer> p = basicPublisher();
383     TestSubscriber s1 = new TestSubscriber();
384     TestSubscriber s2 = new TestSubscriber();
385     p.subscribe(s1);
386     p.subscribe(s2);
387     p.submit(1);
388     p.closeExceptionally(new SPException());
389     assertTrue(p.isClosed());
390 dl 1.16 s1.awaitSubscribe();
391 dl 1.1 s1.awaitError();
392     assertTrue(s1.nexts <= 1);
393 jsr166 1.6 assertEquals(1, s1.errors);
394 dl 1.17 s2.awaitSubscribe();
395 dl 1.1 s2.awaitError();
396     assertTrue(s2.nexts <= 1);
397 jsr166 1.6 assertEquals(1, s2.errors);
398 dl 1.1 }
399    
400     /**
401     * Cancelling a subscription eventually causes no more onNexts to be issued
402     */
403     public void testCancel() {
404 dl 1.23 SubmissionPublisher<Integer> p =
405 jsr166 1.26 new SubmissionPublisher<>(basicExecutor, 4); // must be < 20
406 dl 1.1 TestSubscriber s1 = new TestSubscriber();
407     TestSubscriber s2 = new TestSubscriber();
408     p.subscribe(s1);
409     p.subscribe(s2);
410     s1.awaitSubscribe();
411     p.submit(1);
412     s1.sn.cancel();
413     for (int i = 2; i <= 20; ++i)
414     p.submit(i);
415     p.close();
416     s2.awaitComplete();
417 jsr166 1.6 assertEquals(20, s2.nexts);
418     assertEquals(1, s2.completes);
419 dl 1.1 assertTrue(s1.nexts < 20);
420     assertFalse(p.isSubscribed(s1));
421     }
422    
423     /**
424     * Throwing an exception in onNext causes onError
425     */
426     public void testThrowOnNext() {
427     SubmissionPublisher<Integer> p = basicPublisher();
428     TestSubscriber s1 = new TestSubscriber();
429     TestSubscriber s2 = new TestSubscriber();
430     p.subscribe(s1);
431     p.subscribe(s2);
432     s1.awaitSubscribe();
433     p.submit(1);
434     s1.throwOnCall = true;
435     p.submit(2);
436     p.close();
437     s2.awaitComplete();
438 jsr166 1.6 assertEquals(2, s2.nexts);
439 dl 1.1 s1.awaitComplete();
440 jsr166 1.6 assertEquals(1, s1.errors);
441 dl 1.1 }
442    
443     /**
444 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
445 dl 1.1 * subscriber throws an exception in onNext
446     */
447     public void testThrowOnNextHandler() {
448     AtomicInteger calls = new AtomicInteger();
449 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
450     basicExecutor, 8, (s, e) -> calls.getAndIncrement());
451 dl 1.1 TestSubscriber s1 = new TestSubscriber();
452     TestSubscriber s2 = new TestSubscriber();
453     p.subscribe(s1);
454     p.subscribe(s2);
455     s1.awaitSubscribe();
456     p.submit(1);
457     s1.throwOnCall = true;
458     p.submit(2);
459     p.close();
460     s2.awaitComplete();
461 jsr166 1.6 assertEquals(2, s2.nexts);
462     assertEquals(1, s2.completes);
463 dl 1.1 s1.awaitError();
464 jsr166 1.6 assertEquals(1, s1.errors);
465     assertEquals(1, calls.get());
466 dl 1.1 }
467    
468     /**
469     * onNext items are issued in the same order to each subscriber
470     */
471     public void testOrder() {
472     SubmissionPublisher<Integer> p = basicPublisher();
473     TestSubscriber s1 = new TestSubscriber();
474     TestSubscriber s2 = new TestSubscriber();
475     p.subscribe(s1);
476     p.subscribe(s2);
477     for (int i = 1; i <= 20; ++i)
478     p.submit(i);
479     p.close();
480     s2.awaitComplete();
481     s1.awaitComplete();
482 jsr166 1.6 assertEquals(20, s2.nexts);
483     assertEquals(1, s2.completes);
484     assertEquals(20, s1.nexts);
485     assertEquals(1, s1.completes);
486 dl 1.1 }
487    
488     /**
489     * onNext is issued only if requested
490     */
491     public void testRequest1() {
492     SubmissionPublisher<Integer> p = basicPublisher();
493     TestSubscriber s1 = new TestSubscriber();
494     s1.request = false;
495     p.subscribe(s1);
496     s1.awaitSubscribe();
497 jsr166 1.20 assertEquals(0, p.estimateMinimumDemand());
498 dl 1.1 TestSubscriber s2 = new TestSubscriber();
499     p.subscribe(s2);
500     p.submit(1);
501     p.submit(2);
502     s2.awaitNext(1);
503 jsr166 1.6 assertEquals(0, s1.nexts);
504 dl 1.1 s1.sn.request(3);
505     p.submit(3);
506     p.close();
507     s2.awaitComplete();
508 jsr166 1.6 assertEquals(3, s2.nexts);
509     assertEquals(1, s2.completes);
510 dl 1.1 s1.awaitComplete();
511     assertTrue(s1.nexts > 0);
512 jsr166 1.6 assertEquals(1, s1.completes);
513 dl 1.1 }
514    
515     /**
516     * onNext is not issued when requests become zero
517     */
518     public void testRequest2() {
519     SubmissionPublisher<Integer> p = basicPublisher();
520     TestSubscriber s1 = new TestSubscriber();
521     TestSubscriber s2 = new TestSubscriber();
522     p.subscribe(s1);
523     p.subscribe(s2);
524     s2.awaitSubscribe();
525     s1.awaitSubscribe();
526     s1.request = false;
527     p.submit(1);
528     p.submit(2);
529     p.close();
530     s2.awaitComplete();
531 jsr166 1.6 assertEquals(2, s2.nexts);
532     assertEquals(1, s2.completes);
533 dl 1.1 s1.awaitNext(1);
534 jsr166 1.6 assertEquals(1, s1.nexts);
535 dl 1.1 }
536    
537     /**
538 dl 1.19 * Non-positive request causes error
539 dl 1.1 */
540     public void testRequest3() {
541     SubmissionPublisher<Integer> p = basicPublisher();
542     TestSubscriber s1 = new TestSubscriber();
543     TestSubscriber s2 = new TestSubscriber();
544 dl 1.19 TestSubscriber s3 = new TestSubscriber();
545 dl 1.1 p.subscribe(s1);
546     p.subscribe(s2);
547 dl 1.19 p.subscribe(s3);
548     s3.awaitSubscribe();
549 dl 1.1 s2.awaitSubscribe();
550     s1.awaitSubscribe();
551     s1.sn.request(-1L);
552 dl 1.19 s3.sn.request(0L);
553 dl 1.1 p.submit(1);
554     p.submit(2);
555     p.close();
556     s2.awaitComplete();
557 jsr166 1.6 assertEquals(2, s2.nexts);
558     assertEquals(1, s2.completes);
559 dl 1.1 s1.awaitError();
560 jsr166 1.6 assertEquals(1, s1.errors);
561 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
562 dl 1.19 s3.awaitError();
563     assertEquals(1, s3.errors);
564     assertTrue(s3.lastError instanceof IllegalArgumentException);
565 dl 1.1 }
566    
567     /**
568     * estimateMinimumDemand reports 0 until request, nonzero after
569 dl 1.22 * request
570 dl 1.1 */
571     public void testEstimateMinimumDemand() {
572     TestSubscriber s = new TestSubscriber();
573     SubmissionPublisher<Integer> p = basicPublisher();
574     s.request = false;
575     p.subscribe(s);
576     s.awaitSubscribe();
577 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
578 dl 1.1 s.sn.request(1);
579 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
580 dl 1.1 }
581    
582     /**
583 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
584 dl 1.1 */
585     public void testEmptySubmit() {
586     SubmissionPublisher<Integer> p = basicPublisher();
587 jsr166 1.6 assertEquals(0, p.submit(1));
588 dl 1.1 }
589    
590     /**
591 jsr166 1.3 * submit(null) throws NPE
592 dl 1.1 */
593     public void testNullSubmit() {
594     SubmissionPublisher<Integer> p = basicPublisher();
595     try {
596     p.submit(null);
597 jsr166 1.2 shouldThrow();
598     } catch (NullPointerException success) {}
599 dl 1.1 }
600    
601     /**
602 jsr166 1.3 * submit returns number of lagged items, compatible with result
603 dl 1.1 * of estimateMaximumLag.
604     */
605     public void testLaggedSubmit() {
606     SubmissionPublisher<Integer> p = basicPublisher();
607     TestSubscriber s1 = new TestSubscriber();
608     s1.request = false;
609     TestSubscriber s2 = new TestSubscriber();
610     s2.request = false;
611     p.subscribe(s1);
612     p.subscribe(s2);
613     s2.awaitSubscribe();
614     s1.awaitSubscribe();
615 jsr166 1.6 assertEquals(1, p.submit(1));
616 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
617     assertTrue(p.submit(2) >= 2);
618     assertTrue(p.estimateMaximumLag() >= 2);
619     s1.sn.request(4);
620     assertTrue(p.submit(3) >= 3);
621     assertTrue(p.estimateMaximumLag() >= 3);
622     s2.sn.request(4);
623     p.submit(4);
624     p.close();
625     s2.awaitComplete();
626 jsr166 1.6 assertEquals(4, s2.nexts);
627 dl 1.1 s1.awaitComplete();
628 jsr166 1.6 assertEquals(4, s2.nexts);
629 dl 1.1 }
630    
631     /**
632     * submit eventually issues requested items when buffer capacity is 1
633     */
634     public void testCap1Submit() {
635 jsr166 1.18 SubmissionPublisher<Integer> p
636     = new SubmissionPublisher<>(basicExecutor, 1);
637 dl 1.1 TestSubscriber s1 = new TestSubscriber();
638     TestSubscriber s2 = new TestSubscriber();
639     p.subscribe(s1);
640     p.subscribe(s2);
641     for (int i = 1; i <= 20; ++i) {
642     assertTrue(p.submit(i) >= 0);
643     }
644     p.close();
645     s2.awaitComplete();
646     s1.awaitComplete();
647 jsr166 1.6 assertEquals(20, s2.nexts);
648     assertEquals(1, s2.completes);
649     assertEquals(20, s1.nexts);
650     assertEquals(1, s1.completes);
651 dl 1.1 }
652 jsr166 1.2
653 dl 1.1 static boolean noopHandle(AtomicInteger count) {
654     count.getAndIncrement();
655     return false;
656     }
657    
658     static boolean reqHandle(AtomicInteger count, Subscriber s) {
659     count.getAndIncrement();
660     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
661     return true;
662     }
663    
664     /**
665 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
666 dl 1.1 */
667     public void testEmptyOffer() {
668     SubmissionPublisher<Integer> p = basicPublisher();
669 jsr166 1.3 assertEquals(0, p.offer(1, null));
670 dl 1.1 }
671    
672     /**
673 jsr166 1.3 * offer(null) throws NPE
674 dl 1.1 */
675     public void testNullOffer() {
676     SubmissionPublisher<Integer> p = basicPublisher();
677     try {
678     p.offer(null, null);
679 jsr166 1.2 shouldThrow();
680     } catch (NullPointerException success) {}
681 dl 1.1 }
682    
683     /**
684 jsr166 1.3 * offer returns number of lagged items if not saturated
685 dl 1.1 */
686     public void testLaggedOffer() {
687     SubmissionPublisher<Integer> p = basicPublisher();
688     TestSubscriber s1 = new TestSubscriber();
689     s1.request = false;
690     TestSubscriber s2 = new TestSubscriber();
691     s2.request = false;
692     p.subscribe(s1);
693     p.subscribe(s2);
694     s2.awaitSubscribe();
695     s1.awaitSubscribe();
696     assertTrue(p.offer(1, null) >= 1);
697     assertTrue(p.offer(2, null) >= 2);
698     s1.sn.request(4);
699     assertTrue(p.offer(3, null) >= 3);
700     s2.sn.request(4);
701     p.offer(4, null);
702     p.close();
703     s2.awaitComplete();
704 jsr166 1.6 assertEquals(4, s2.nexts);
705 dl 1.1 s1.awaitComplete();
706 jsr166 1.6 assertEquals(4, s2.nexts);
707 dl 1.1 }
708    
709     /**
710 jsr166 1.3 * offer reports drops if saturated
711 dl 1.1 */
712     public void testDroppedOffer() {
713 jsr166 1.18 SubmissionPublisher<Integer> p
714     = new SubmissionPublisher<>(basicExecutor, 4);
715 dl 1.1 TestSubscriber s1 = new TestSubscriber();
716     s1.request = false;
717     TestSubscriber s2 = new TestSubscriber();
718     s2.request = false;
719     p.subscribe(s1);
720     p.subscribe(s2);
721     s2.awaitSubscribe();
722     s1.awaitSubscribe();
723     for (int i = 1; i <= 4; ++i)
724     assertTrue(p.offer(i, null) >= 0);
725     p.offer(5, null);
726     assertTrue(p.offer(6, null) < 0);
727     s1.sn.request(64);
728     assertTrue(p.offer(7, null) < 0);
729     s2.sn.request(64);
730     p.close();
731     s2.awaitComplete();
732     assertTrue(s2.nexts >= 4);
733     s1.awaitComplete();
734     assertTrue(s1.nexts >= 4);
735     }
736    
737     /**
738 jsr166 1.3 * offer invokes drop handler if saturated
739 dl 1.1 */
740     public void testHandledDroppedOffer() {
741     AtomicInteger calls = new AtomicInteger();
742 jsr166 1.18 SubmissionPublisher<Integer> p
743     = new SubmissionPublisher<>(basicExecutor, 4);
744 dl 1.1 TestSubscriber s1 = new TestSubscriber();
745     s1.request = false;
746     TestSubscriber s2 = new TestSubscriber();
747     s2.request = false;
748     p.subscribe(s1);
749     p.subscribe(s2);
750     s2.awaitSubscribe();
751     s1.awaitSubscribe();
752     for (int i = 1; i <= 4; ++i)
753     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
754     p.offer(4, (s, x) -> noopHandle(calls));
755     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
756     s1.sn.request(64);
757     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
758     s2.sn.request(64);
759     p.close();
760     s2.awaitComplete();
761     s1.awaitComplete();
762     assertTrue(calls.get() >= 4);
763     }
764    
765     /**
766 jsr166 1.3 * offer succeeds if drop handler forces request
767 dl 1.1 */
768     public void testRecoveredHandledDroppedOffer() {
769     AtomicInteger calls = new AtomicInteger();
770 jsr166 1.18 SubmissionPublisher<Integer> p
771     = new SubmissionPublisher<>(basicExecutor, 4);
772 dl 1.1 TestSubscriber s1 = new TestSubscriber();
773     s1.request = false;
774     TestSubscriber s2 = new TestSubscriber();
775     s2.request = false;
776     p.subscribe(s1);
777     p.subscribe(s2);
778     s2.awaitSubscribe();
779     s1.awaitSubscribe();
780     int n = 0;
781     for (int i = 1; i <= 8; ++i) {
782     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
783     n = n + 2 + (d < 0 ? d : 0);
784     }
785     p.close();
786     s2.awaitComplete();
787     s1.awaitComplete();
788 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
789 dl 1.1 assertTrue(calls.get() >= 2);
790     }
791    
792     /**
793 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
794 dl 1.1 */
795     public void testEmptyTimedOffer() {
796     SubmissionPublisher<Integer> p = basicPublisher();
797 jsr166 1.7 long startTime = System.nanoTime();
798 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
799 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
800 dl 1.1 }
801    
802     /**
803 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
804 dl 1.1 */
805     public void testNullTimedOffer() {
806     SubmissionPublisher<Integer> p = basicPublisher();
807 jsr166 1.7 long startTime = System.nanoTime();
808 dl 1.1 try {
809 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
810 jsr166 1.2 shouldThrow();
811     } catch (NullPointerException success) {}
812 dl 1.1 try {
813 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
814 jsr166 1.2 shouldThrow();
815     } catch (NullPointerException success) {}
816 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
817 dl 1.1 }
818    
819     /**
820 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
821 dl 1.1 */
822     public void testLaggedTimedOffer() {
823     SubmissionPublisher<Integer> p = basicPublisher();
824     TestSubscriber s1 = new TestSubscriber();
825     s1.request = false;
826     TestSubscriber s2 = new TestSubscriber();
827     s2.request = false;
828     p.subscribe(s1);
829     p.subscribe(s2);
830     s2.awaitSubscribe();
831     s1.awaitSubscribe();
832 jsr166 1.7 long startTime = System.nanoTime();
833     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
834     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
835 dl 1.1 s1.sn.request(4);
836 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
837 dl 1.1 s2.sn.request(4);
838 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
839 dl 1.1 p.close();
840     s2.awaitComplete();
841 jsr166 1.6 assertEquals(4, s2.nexts);
842 dl 1.1 s1.awaitComplete();
843 jsr166 1.6 assertEquals(4, s2.nexts);
844 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
845 dl 1.1 }
846    
847     /**
848 jsr166 1.3 * Timed offer reports drops if saturated
849 dl 1.1 */
850     public void testDroppedTimedOffer() {
851 jsr166 1.18 SubmissionPublisher<Integer> p
852     = new SubmissionPublisher<>(basicExecutor, 4);
853 dl 1.1 TestSubscriber s1 = new TestSubscriber();
854     s1.request = false;
855     TestSubscriber s2 = new TestSubscriber();
856     s2.request = false;
857     p.subscribe(s1);
858     p.subscribe(s2);
859     s2.awaitSubscribe();
860     s1.awaitSubscribe();
861 dl 1.9 long delay = timeoutMillis();
862 dl 1.1 for (int i = 1; i <= 4; ++i)
863 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
864     long startTime = System.nanoTime();
865     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
866 dl 1.1 s1.sn.request(64);
867 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
868     // 2 * delay should elapse but check only 1 * delay to allow timer slop
869     assertTrue(millisElapsedSince(startTime) >= delay);
870 dl 1.1 s2.sn.request(64);
871     p.close();
872     s2.awaitComplete();
873     assertTrue(s2.nexts >= 2);
874     s1.awaitComplete();
875     assertTrue(s1.nexts >= 2);
876     }
877    
878     /**
879 jsr166 1.3 * Timed offer invokes drop handler if saturated
880 dl 1.1 */
881     public void testHandledDroppedTimedOffer() {
882     AtomicInteger calls = new AtomicInteger();
883 jsr166 1.18 SubmissionPublisher<Integer> p
884     = new SubmissionPublisher<>(basicExecutor, 4);
885 dl 1.1 TestSubscriber s1 = new TestSubscriber();
886     s1.request = false;
887     TestSubscriber s2 = new TestSubscriber();
888     s2.request = false;
889     p.subscribe(s1);
890     p.subscribe(s2);
891     s2.awaitSubscribe();
892     s1.awaitSubscribe();
893 dl 1.9 long delay = timeoutMillis();
894 dl 1.1 for (int i = 1; i <= 4; ++i)
895 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
896     long startTime = System.nanoTime();
897     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
898 dl 1.1 s1.sn.request(64);
899 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
900     assertTrue(millisElapsedSince(startTime) >= delay);
901 dl 1.1 s2.sn.request(64);
902     p.close();
903     s2.awaitComplete();
904     s1.awaitComplete();
905     assertTrue(calls.get() >= 2);
906     }
907    
908     /**
909 jsr166 1.3 * Timed offer succeeds if drop handler forces request
910 dl 1.1 */
911     public void testRecoveredHandledDroppedTimedOffer() {
912     AtomicInteger calls = new AtomicInteger();
913 jsr166 1.18 SubmissionPublisher<Integer> p
914     = new SubmissionPublisher<>(basicExecutor, 4);
915 dl 1.1 TestSubscriber s1 = new TestSubscriber();
916     s1.request = false;
917     TestSubscriber s2 = new TestSubscriber();
918     s2.request = false;
919     p.subscribe(s1);
920     p.subscribe(s2);
921     s2.awaitSubscribe();
922     s1.awaitSubscribe();
923     int n = 0;
924 dl 1.9 long delay = timeoutMillis();
925     long startTime = System.nanoTime();
926     for (int i = 1; i <= 6; ++i) {
927     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
928 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
929     }
930 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
931 dl 1.1 p.close();
932     s2.awaitComplete();
933     s1.awaitComplete();
934 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
935 dl 1.1 assertTrue(calls.get() >= 2);
936     }
937    
938 dl 1.10 /**
939     * consume returns a CompletableFuture that is done when
940     * publisher completes
941     */
942     public void testConsume() {
943     AtomicInteger sum = new AtomicInteger();
944     SubmissionPublisher<Integer> p = basicPublisher();
945 jsr166 1.11 CompletableFuture<Void> f =
946 jsr166 1.15 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
947 dl 1.10 int n = 20;
948     for (int i = 1; i <= n; ++i)
949     p.submit(i);
950     p.close();
951     f.join();
952     assertEquals((n * (n + 1)) / 2, sum.get());
953     }
954    
955     /**
956     * consume(null) throws NPE
957     */
958     public void testConsumeNPE() {
959     SubmissionPublisher<Integer> p = basicPublisher();
960     try {
961     CompletableFuture<Void> f = p.consume(null);
962     shouldThrow();
963 jsr166 1.11 } catch (NullPointerException success) {}
964 dl 1.10 }
965    
966     /**
967     * consume eventually stops processing published items if cancelled
968     */
969     public void testCancelledConsume() {
970     AtomicInteger count = new AtomicInteger();
971     SubmissionPublisher<Integer> p = basicPublisher();
972     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
973     f.cancel(true);
974     int n = 1000000; // arbitrary limit
975     for (int i = 1; i <= n; ++i)
976     p.submit(i);
977     assertTrue(count.get() < n);
978     }
979 jsr166 1.11
980 jsr166 1.24 /**
981     * Tests scenario for
982     * JDK-8187947: A race condition in SubmissionPublisher
983     * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java
984     */
985     public void testMissedSignal_8187947() throws Exception {
986 jsr166 1.25 if (!atLeastJava9()) return; // backport to jdk8 too hard
987 jsr166 1.24 final int N = expensiveTests ? (1 << 20) : (1 << 10);
988     final CountDownLatch finished = new CountDownLatch(1);
989     final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
990     class Sub implements Subscriber<Boolean> {
991     int received;
992     public void onSubscribe(Subscription s) {
993     s.request(N);
994     }
995     public void onNext(Boolean item) {
996     if (++received == N)
997     finished.countDown();
998     else
999     CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1000     }
1001     public void onError(Throwable t) { throw new AssertionError(t); }
1002     public void onComplete() {}
1003     }
1004     pub.subscribe(new Sub());
1005 jsr166 1.28 checkTimedGet(
1006     CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)),
1007     null);
1008 jsr166 1.24 await(finished);
1009     }
1010 dl 1.1 }