ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.24
Committed: Mon Nov 27 01:19:51 2017 UTC (6 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.23: +28 -0 lines
Log Message:
add test for JDK-8187947: A race condition in SubmissionPublisher

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8 dl 1.10 import java.util.concurrent.CompletableFuture;
9 jsr166 1.24 import java.util.concurrent.CountDownLatch;
10 dl 1.1 import java.util.concurrent.Executor;
11     import java.util.concurrent.Executors;
12     import java.util.concurrent.Flow;
13 dl 1.10 import java.util.concurrent.ForkJoinPool;
14 dl 1.1 import java.util.concurrent.SubmissionPublisher;
15     import java.util.concurrent.atomic.AtomicInteger;
16     import junit.framework.Test;
17     import junit.framework.TestSuite;
18    
19 dl 1.10 import static java.util.concurrent.Flow.Subscriber;
20     import static java.util.concurrent.Flow.Subscription;
21     import static java.util.concurrent.TimeUnit.MILLISECONDS;
22    
23 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
24    
25     public static void main(String[] args) {
26     main(suite(), args);
27     }
28     public static Test suite() {
29     return new TestSuite(SubmissionPublisherTest.class);
30     }
31    
32 dl 1.12 final Executor basicExecutor = basicPublisher().getExecutor();
33 jsr166 1.13
34 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
35 dl 1.12 return new SubmissionPublisher<Integer>();
36 dl 1.1 }
37 jsr166 1.2
38 dl 1.1 static class SPException extends RuntimeException {}
39    
40     class TestSubscriber implements Subscriber<Integer> {
41     volatile Subscription sn;
42     int last; // Requires that onNexts are in numeric order
43     volatile int nexts;
44     volatile int errors;
45     volatile int completes;
46     volatile boolean throwOnCall = false;
47     volatile boolean request = true;
48     volatile Throwable lastError;
49    
50     public synchronized void onSubscribe(Subscription s) {
51     threadAssertTrue(sn == null);
52     sn = s;
53     notifyAll();
54     if (throwOnCall)
55     throw new SPException();
56     if (request)
57     sn.request(1L);
58     }
59     public synchronized void onNext(Integer t) {
60     ++nexts;
61     notifyAll();
62     int current = t.intValue();
63     threadAssertTrue(current >= last);
64     last = current;
65     if (request)
66     sn.request(1L);
67     if (throwOnCall)
68     throw new SPException();
69     }
70     public synchronized void onError(Throwable t) {
71     threadAssertTrue(completes == 0);
72     threadAssertTrue(errors == 0);
73     lastError = t;
74     ++errors;
75     notifyAll();
76     }
77     public synchronized void onComplete() {
78     threadAssertTrue(completes == 0);
79     ++completes;
80     notifyAll();
81     }
82    
83     synchronized void awaitSubscribe() {
84     while (sn == null) {
85     try {
86     wait();
87     } catch (Exception ex) {
88     threadUnexpectedException(ex);
89     break;
90     }
91     }
92     }
93     synchronized void awaitNext(int n) {
94     while (nexts < n) {
95     try {
96     wait();
97     } catch (Exception ex) {
98     threadUnexpectedException(ex);
99     break;
100     }
101     }
102     }
103     synchronized void awaitComplete() {
104     while (completes == 0 && errors == 0) {
105     try {
106     wait();
107     } catch (Exception ex) {
108     threadUnexpectedException(ex);
109     break;
110     }
111     }
112     }
113     synchronized void awaitError() {
114     while (errors == 0) {
115     try {
116     wait();
117     } catch (Exception ex) {
118     threadUnexpectedException(ex);
119     break;
120     }
121     }
122     }
123    
124     }
125    
126     /**
127     * A new SubmissionPublisher has no subscribers, a non-null
128     * executor, a power-of-two capacity, is not closed, and reports
129     * zero demand and lag
130     */
131     void checkInitialState(SubmissionPublisher<?> p) {
132     assertFalse(p.hasSubscribers());
133 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
134 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
135     assertFalse(p.isClosed());
136     assertNull(p.getClosedException());
137     int n = p.getMaxBufferCapacity();
138     assertTrue((n & (n - 1)) == 0); // power of two
139     assertNotNull(p.getExecutor());
140 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
141     assertEquals(0, p.estimateMaximumLag());
142 dl 1.1 }
143    
144     /**
145     * A default-constructed SubmissionPublisher has no subscribers,
146     * is not closed, has default buffer size, and uses the
147 dl 1.12 * defaultExecutor
148 dl 1.1 */
149     public void testConstructor1() {
150 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
151 dl 1.1 checkInitialState(p);
152     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
153 dl 1.12 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
154     if (ForkJoinPool.getCommonPoolParallelism() > 1)
155     assertSame(e, c);
156     else
157     assertNotSame(e, c);
158 dl 1.1 }
159    
160     /**
161     * A new SubmissionPublisher has no subscribers, is not closed,
162     * has the given buffer size, and uses the given executor
163     */
164     public void testConstructor2() {
165     Executor e = Executors.newFixedThreadPool(1);
166 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
167 dl 1.1 checkInitialState(p);
168     assertSame(p.getExecutor(), e);
169 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
170 dl 1.1 }
171    
172     /**
173 jsr166 1.21 * A null Executor argument to SubmissionPublisher constructor
174     * throws NullPointerException
175 dl 1.1 */
176     public void testConstructor3() {
177     try {
178 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
179 dl 1.1 shouldThrow();
180 jsr166 1.2 } catch (NullPointerException success) {}
181 dl 1.1 }
182    
183     /**
184     * A negative capacity argument to SubmissionPublisher constructor
185 jsr166 1.21 * throws IllegalArgumentException
186 dl 1.1 */
187     public void testConstructor4() {
188     Executor e = Executors.newFixedThreadPool(1);
189     try {
190 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
191 dl 1.1 shouldThrow();
192 jsr166 1.2 } catch (IllegalArgumentException success) {}
193 dl 1.1 }
194    
195     /**
196     * A closed publisher reports isClosed with no closedException and
197 jsr166 1.21 * throws IllegalStateException upon attempted submission; a
198     * subsequent close or closeExceptionally has no additional
199     * effect.
200 dl 1.1 */
201     public void testClose() {
202     SubmissionPublisher<Integer> p = basicPublisher();
203     checkInitialState(p);
204     p.close();
205     assertTrue(p.isClosed());
206     assertNull(p.getClosedException());
207     try {
208     p.submit(1);
209     shouldThrow();
210 jsr166 1.2 } catch (IllegalStateException success) {}
211 dl 1.1 Throwable ex = new SPException();
212     p.closeExceptionally(ex);
213     assertTrue(p.isClosed());
214     assertNull(p.getClosedException());
215     }
216    
217     /**
218     * A publisher closedExceptionally reports isClosed with the
219 jsr166 1.21 * closedException and throws IllegalStateException upon attempted
220     * submission; a subsequent close or closeExceptionally has no
221     * additional effect.
222 dl 1.1 */
223     public void testCloseExceptionally() {
224     SubmissionPublisher<Integer> p = basicPublisher();
225     checkInitialState(p);
226     Throwable ex = new SPException();
227     p.closeExceptionally(ex);
228     assertTrue(p.isClosed());
229     assertSame(p.getClosedException(), ex);
230     try {
231     p.submit(1);
232     shouldThrow();
233 jsr166 1.2 } catch (IllegalStateException success) {}
234 dl 1.1 p.close();
235     assertTrue(p.isClosed());
236     assertSame(p.getClosedException(), ex);
237     }
238    
239     /**
240     * Upon subscription, the subscriber's onSubscribe is called, no
241     * other Subscriber methods are invoked, the publisher
242     * hasSubscribers, isSubscribed is true, and existing
243     * subscriptions are unaffected.
244     */
245     public void testSubscribe1() {
246     TestSubscriber s = new TestSubscriber();
247     SubmissionPublisher<Integer> p = basicPublisher();
248     p.subscribe(s);
249     assertTrue(p.hasSubscribers());
250 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
251 dl 1.1 assertTrue(p.getSubscribers().contains(s));
252     assertTrue(p.isSubscribed(s));
253     s.awaitSubscribe();
254     assertNotNull(s.sn);
255 jsr166 1.6 assertEquals(0, s.nexts);
256     assertEquals(0, s.errors);
257     assertEquals(0, s.completes);
258 dl 1.1 TestSubscriber s2 = new TestSubscriber();
259     p.subscribe(s2);
260     assertTrue(p.hasSubscribers());
261 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
262 dl 1.1 assertTrue(p.getSubscribers().contains(s));
263     assertTrue(p.getSubscribers().contains(s2));
264     assertTrue(p.isSubscribed(s));
265     assertTrue(p.isSubscribed(s2));
266     s2.awaitSubscribe();
267     assertNotNull(s2.sn);
268 jsr166 1.6 assertEquals(0, s2.nexts);
269     assertEquals(0, s2.errors);
270     assertEquals(0, s2.completes);
271 dl 1.9 p.close();
272 dl 1.1 }
273    
274     /**
275     * If closed, upon subscription, the subscriber's onComplete
276     * method is invoked
277     */
278     public void testSubscribe2() {
279     TestSubscriber s = new TestSubscriber();
280     SubmissionPublisher<Integer> p = basicPublisher();
281     p.close();
282     p.subscribe(s);
283     s.awaitComplete();
284 jsr166 1.6 assertEquals(0, s.nexts);
285     assertEquals(0, s.errors);
286     assertEquals(1, s.completes, 1);
287 dl 1.1 }
288    
289     /**
290     * If closedExceptionally, upon subscription, the subscriber's
291     * onError method is invoked
292     */
293     public void testSubscribe3() {
294     TestSubscriber s = new TestSubscriber();
295     SubmissionPublisher<Integer> p = basicPublisher();
296     Throwable ex = new SPException();
297     p.closeExceptionally(ex);
298     assertTrue(p.isClosed());
299     assertSame(p.getClosedException(), ex);
300     p.subscribe(s);
301     s.awaitError();
302 jsr166 1.6 assertEquals(0, s.nexts);
303     assertEquals(1, s.errors);
304 dl 1.1 }
305    
306     /**
307     * Upon attempted resubscription, the subscriber's onError is
308     * called and the subscription is cancelled.
309     */
310     public void testSubscribe4() {
311     TestSubscriber s = new TestSubscriber();
312     SubmissionPublisher<Integer> p = basicPublisher();
313     p.subscribe(s);
314     assertTrue(p.hasSubscribers());
315 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
316 dl 1.1 assertTrue(p.getSubscribers().contains(s));
317     assertTrue(p.isSubscribed(s));
318     s.awaitSubscribe();
319     assertNotNull(s.sn);
320 jsr166 1.6 assertEquals(0, s.nexts);
321     assertEquals(0, s.errors);
322     assertEquals(0, s.completes);
323 dl 1.1 p.subscribe(s);
324     s.awaitError();
325 jsr166 1.6 assertEquals(0, s.nexts);
326     assertEquals(1, s.errors);
327 dl 1.1 assertFalse(p.isSubscribed(s));
328     }
329    
330     /**
331     * An exception thrown in onSubscribe causes onError
332     */
333     public void testSubscribe5() {
334     TestSubscriber s = new TestSubscriber();
335     SubmissionPublisher<Integer> p = basicPublisher();
336     s.throwOnCall = true;
337     try {
338     p.subscribe(s);
339 jsr166 1.2 } catch (Exception ok) {}
340 dl 1.1 s.awaitError();
341 jsr166 1.6 assertEquals(0, s.nexts);
342     assertEquals(1, s.errors);
343     assertEquals(0, s.completes);
344 dl 1.1 }
345    
346     /**
347 jsr166 1.8 * subscribe(null) throws NPE
348 dl 1.1 */
349     public void testSubscribe6() {
350     SubmissionPublisher<Integer> p = basicPublisher();
351     try {
352     p.subscribe(null);
353 jsr166 1.2 shouldThrow();
354     } catch (NullPointerException success) {}
355 dl 1.1 checkInitialState(p);
356     }
357    
358     /**
359     * Closing a publisher causes onComplete to subscribers
360     */
361     public void testCloseCompletes() {
362     SubmissionPublisher<Integer> p = basicPublisher();
363     TestSubscriber s1 = new TestSubscriber();
364     TestSubscriber s2 = new TestSubscriber();
365     p.subscribe(s1);
366     p.subscribe(s2);
367     p.submit(1);
368     p.close();
369     assertTrue(p.isClosed());
370     assertNull(p.getClosedException());
371     s1.awaitComplete();
372 jsr166 1.6 assertEquals(1, s1.nexts);
373     assertEquals(1, s1.completes);
374 dl 1.1 s2.awaitComplete();
375 jsr166 1.6 assertEquals(1, s2.nexts);
376     assertEquals(1, s2.completes);
377 dl 1.1 }
378    
379     /**
380     * Closing a publisher exceptionally causes onError to subscribers
381 dl 1.16 * after they are subscribed
382 dl 1.1 */
383     public void testCloseExceptionallyError() {
384     SubmissionPublisher<Integer> p = basicPublisher();
385     TestSubscriber s1 = new TestSubscriber();
386     TestSubscriber s2 = new TestSubscriber();
387     p.subscribe(s1);
388     p.subscribe(s2);
389     p.submit(1);
390     p.closeExceptionally(new SPException());
391     assertTrue(p.isClosed());
392 dl 1.16 s1.awaitSubscribe();
393 dl 1.1 s1.awaitError();
394     assertTrue(s1.nexts <= 1);
395 jsr166 1.6 assertEquals(1, s1.errors);
396 dl 1.17 s2.awaitSubscribe();
397 dl 1.1 s2.awaitError();
398     assertTrue(s2.nexts <= 1);
399 jsr166 1.6 assertEquals(1, s2.errors);
400 dl 1.1 }
401    
402     /**
403     * Cancelling a subscription eventually causes no more onNexts to be issued
404     */
405     public void testCancel() {
406 dl 1.23 SubmissionPublisher<Integer> p =
407     new SubmissionPublisher<Integer>(basicExecutor, 4); // must be < 20
408 dl 1.1 TestSubscriber s1 = new TestSubscriber();
409     TestSubscriber s2 = new TestSubscriber();
410     p.subscribe(s1);
411     p.subscribe(s2);
412     s1.awaitSubscribe();
413     p.submit(1);
414     s1.sn.cancel();
415     for (int i = 2; i <= 20; ++i)
416     p.submit(i);
417     p.close();
418     s2.awaitComplete();
419 jsr166 1.6 assertEquals(20, s2.nexts);
420     assertEquals(1, s2.completes);
421 dl 1.1 assertTrue(s1.nexts < 20);
422     assertFalse(p.isSubscribed(s1));
423     }
424    
425     /**
426     * Throwing an exception in onNext causes onError
427     */
428     public void testThrowOnNext() {
429     SubmissionPublisher<Integer> p = basicPublisher();
430     TestSubscriber s1 = new TestSubscriber();
431     TestSubscriber s2 = new TestSubscriber();
432     p.subscribe(s1);
433     p.subscribe(s2);
434     s1.awaitSubscribe();
435     p.submit(1);
436     s1.throwOnCall = true;
437     p.submit(2);
438     p.close();
439     s2.awaitComplete();
440 jsr166 1.6 assertEquals(2, s2.nexts);
441 dl 1.1 s1.awaitComplete();
442 jsr166 1.6 assertEquals(1, s1.errors);
443 dl 1.1 }
444    
445     /**
446 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
447 dl 1.1 * subscriber throws an exception in onNext
448     */
449     public void testThrowOnNextHandler() {
450     AtomicInteger calls = new AtomicInteger();
451 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
452     basicExecutor, 8, (s, e) -> calls.getAndIncrement());
453 dl 1.1 TestSubscriber s1 = new TestSubscriber();
454     TestSubscriber s2 = new TestSubscriber();
455     p.subscribe(s1);
456     p.subscribe(s2);
457     s1.awaitSubscribe();
458     p.submit(1);
459     s1.throwOnCall = true;
460     p.submit(2);
461     p.close();
462     s2.awaitComplete();
463 jsr166 1.6 assertEquals(2, s2.nexts);
464     assertEquals(1, s2.completes);
465 dl 1.1 s1.awaitError();
466 jsr166 1.6 assertEquals(1, s1.errors);
467     assertEquals(1, calls.get());
468 dl 1.1 }
469    
470     /**
471     * onNext items are issued in the same order to each subscriber
472     */
473     public void testOrder() {
474     SubmissionPublisher<Integer> p = basicPublisher();
475     TestSubscriber s1 = new TestSubscriber();
476     TestSubscriber s2 = new TestSubscriber();
477     p.subscribe(s1);
478     p.subscribe(s2);
479     for (int i = 1; i <= 20; ++i)
480     p.submit(i);
481     p.close();
482     s2.awaitComplete();
483     s1.awaitComplete();
484 jsr166 1.6 assertEquals(20, s2.nexts);
485     assertEquals(1, s2.completes);
486     assertEquals(20, s1.nexts);
487     assertEquals(1, s1.completes);
488 dl 1.1 }
489    
490     /**
491     * onNext is issued only if requested
492     */
493     public void testRequest1() {
494     SubmissionPublisher<Integer> p = basicPublisher();
495     TestSubscriber s1 = new TestSubscriber();
496     s1.request = false;
497     p.subscribe(s1);
498     s1.awaitSubscribe();
499 jsr166 1.20 assertEquals(0, p.estimateMinimumDemand());
500 dl 1.1 TestSubscriber s2 = new TestSubscriber();
501     p.subscribe(s2);
502     p.submit(1);
503     p.submit(2);
504     s2.awaitNext(1);
505 jsr166 1.6 assertEquals(0, s1.nexts);
506 dl 1.1 s1.sn.request(3);
507     p.submit(3);
508     p.close();
509     s2.awaitComplete();
510 jsr166 1.6 assertEquals(3, s2.nexts);
511     assertEquals(1, s2.completes);
512 dl 1.1 s1.awaitComplete();
513     assertTrue(s1.nexts > 0);
514 jsr166 1.6 assertEquals(1, s1.completes);
515 dl 1.1 }
516    
517     /**
518     * onNext is not issued when requests become zero
519     */
520     public void testRequest2() {
521     SubmissionPublisher<Integer> p = basicPublisher();
522     TestSubscriber s1 = new TestSubscriber();
523     TestSubscriber s2 = new TestSubscriber();
524     p.subscribe(s1);
525     p.subscribe(s2);
526     s2.awaitSubscribe();
527     s1.awaitSubscribe();
528     s1.request = false;
529     p.submit(1);
530     p.submit(2);
531     p.close();
532     s2.awaitComplete();
533 jsr166 1.6 assertEquals(2, s2.nexts);
534     assertEquals(1, s2.completes);
535 dl 1.1 s1.awaitNext(1);
536 jsr166 1.6 assertEquals(1, s1.nexts);
537 dl 1.1 }
538    
539     /**
540 dl 1.19 * Non-positive request causes error
541 dl 1.1 */
542     public void testRequest3() {
543     SubmissionPublisher<Integer> p = basicPublisher();
544     TestSubscriber s1 = new TestSubscriber();
545     TestSubscriber s2 = new TestSubscriber();
546 dl 1.19 TestSubscriber s3 = new TestSubscriber();
547 dl 1.1 p.subscribe(s1);
548     p.subscribe(s2);
549 dl 1.19 p.subscribe(s3);
550     s3.awaitSubscribe();
551 dl 1.1 s2.awaitSubscribe();
552     s1.awaitSubscribe();
553     s1.sn.request(-1L);
554 dl 1.19 s3.sn.request(0L);
555 dl 1.1 p.submit(1);
556     p.submit(2);
557     p.close();
558     s2.awaitComplete();
559 jsr166 1.6 assertEquals(2, s2.nexts);
560     assertEquals(1, s2.completes);
561 dl 1.1 s1.awaitError();
562 jsr166 1.6 assertEquals(1, s1.errors);
563 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
564 dl 1.19 s3.awaitError();
565     assertEquals(1, s3.errors);
566     assertTrue(s3.lastError instanceof IllegalArgumentException);
567 dl 1.1 }
568    
569     /**
570     * estimateMinimumDemand reports 0 until request, nonzero after
571 dl 1.22 * request
572 dl 1.1 */
573     public void testEstimateMinimumDemand() {
574     TestSubscriber s = new TestSubscriber();
575     SubmissionPublisher<Integer> p = basicPublisher();
576     s.request = false;
577     p.subscribe(s);
578     s.awaitSubscribe();
579 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
580 dl 1.1 s.sn.request(1);
581 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
582 dl 1.1 }
583    
584     /**
585 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
586 dl 1.1 */
587     public void testEmptySubmit() {
588     SubmissionPublisher<Integer> p = basicPublisher();
589 jsr166 1.6 assertEquals(0, p.submit(1));
590 dl 1.1 }
591    
592     /**
593 jsr166 1.3 * submit(null) throws NPE
594 dl 1.1 */
595     public void testNullSubmit() {
596     SubmissionPublisher<Integer> p = basicPublisher();
597     try {
598     p.submit(null);
599 jsr166 1.2 shouldThrow();
600     } catch (NullPointerException success) {}
601 dl 1.1 }
602    
603     /**
604 jsr166 1.3 * submit returns number of lagged items, compatible with result
605 dl 1.1 * of estimateMaximumLag.
606     */
607     public void testLaggedSubmit() {
608     SubmissionPublisher<Integer> p = basicPublisher();
609     TestSubscriber s1 = new TestSubscriber();
610     s1.request = false;
611     TestSubscriber s2 = new TestSubscriber();
612     s2.request = false;
613     p.subscribe(s1);
614     p.subscribe(s2);
615     s2.awaitSubscribe();
616     s1.awaitSubscribe();
617 jsr166 1.6 assertEquals(1, p.submit(1));
618 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
619     assertTrue(p.submit(2) >= 2);
620     assertTrue(p.estimateMaximumLag() >= 2);
621     s1.sn.request(4);
622     assertTrue(p.submit(3) >= 3);
623     assertTrue(p.estimateMaximumLag() >= 3);
624     s2.sn.request(4);
625     p.submit(4);
626     p.close();
627     s2.awaitComplete();
628 jsr166 1.6 assertEquals(4, s2.nexts);
629 dl 1.1 s1.awaitComplete();
630 jsr166 1.6 assertEquals(4, s2.nexts);
631 dl 1.1 }
632    
633     /**
634     * submit eventually issues requested items when buffer capacity is 1
635     */
636     public void testCap1Submit() {
637 jsr166 1.18 SubmissionPublisher<Integer> p
638     = new SubmissionPublisher<>(basicExecutor, 1);
639 dl 1.1 TestSubscriber s1 = new TestSubscriber();
640     TestSubscriber s2 = new TestSubscriber();
641     p.subscribe(s1);
642     p.subscribe(s2);
643     for (int i = 1; i <= 20; ++i) {
644     assertTrue(p.submit(i) >= 0);
645     }
646     p.close();
647     s2.awaitComplete();
648     s1.awaitComplete();
649 jsr166 1.6 assertEquals(20, s2.nexts);
650     assertEquals(1, s2.completes);
651     assertEquals(20, s1.nexts);
652     assertEquals(1, s1.completes);
653 dl 1.1 }
654 jsr166 1.2
655 dl 1.1 static boolean noopHandle(AtomicInteger count) {
656     count.getAndIncrement();
657     return false;
658     }
659    
660     static boolean reqHandle(AtomicInteger count, Subscriber s) {
661     count.getAndIncrement();
662     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
663     return true;
664     }
665    
666     /**
667 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
668 dl 1.1 */
669     public void testEmptyOffer() {
670     SubmissionPublisher<Integer> p = basicPublisher();
671 jsr166 1.3 assertEquals(0, p.offer(1, null));
672 dl 1.1 }
673    
674     /**
675 jsr166 1.3 * offer(null) throws NPE
676 dl 1.1 */
677     public void testNullOffer() {
678     SubmissionPublisher<Integer> p = basicPublisher();
679     try {
680     p.offer(null, null);
681 jsr166 1.2 shouldThrow();
682     } catch (NullPointerException success) {}
683 dl 1.1 }
684    
685     /**
686 jsr166 1.3 * offer returns number of lagged items if not saturated
687 dl 1.1 */
688     public void testLaggedOffer() {
689     SubmissionPublisher<Integer> p = basicPublisher();
690     TestSubscriber s1 = new TestSubscriber();
691     s1.request = false;
692     TestSubscriber s2 = new TestSubscriber();
693     s2.request = false;
694     p.subscribe(s1);
695     p.subscribe(s2);
696     s2.awaitSubscribe();
697     s1.awaitSubscribe();
698     assertTrue(p.offer(1, null) >= 1);
699     assertTrue(p.offer(2, null) >= 2);
700     s1.sn.request(4);
701     assertTrue(p.offer(3, null) >= 3);
702     s2.sn.request(4);
703     p.offer(4, null);
704     p.close();
705     s2.awaitComplete();
706 jsr166 1.6 assertEquals(4, s2.nexts);
707 dl 1.1 s1.awaitComplete();
708 jsr166 1.6 assertEquals(4, s2.nexts);
709 dl 1.1 }
710    
711     /**
712 jsr166 1.3 * offer reports drops if saturated
713 dl 1.1 */
714     public void testDroppedOffer() {
715 jsr166 1.18 SubmissionPublisher<Integer> p
716     = new SubmissionPublisher<>(basicExecutor, 4);
717 dl 1.1 TestSubscriber s1 = new TestSubscriber();
718     s1.request = false;
719     TestSubscriber s2 = new TestSubscriber();
720     s2.request = false;
721     p.subscribe(s1);
722     p.subscribe(s2);
723     s2.awaitSubscribe();
724     s1.awaitSubscribe();
725     for (int i = 1; i <= 4; ++i)
726     assertTrue(p.offer(i, null) >= 0);
727     p.offer(5, null);
728     assertTrue(p.offer(6, null) < 0);
729     s1.sn.request(64);
730     assertTrue(p.offer(7, null) < 0);
731     s2.sn.request(64);
732     p.close();
733     s2.awaitComplete();
734     assertTrue(s2.nexts >= 4);
735     s1.awaitComplete();
736     assertTrue(s1.nexts >= 4);
737     }
738    
739     /**
740 jsr166 1.3 * offer invokes drop handler if saturated
741 dl 1.1 */
742     public void testHandledDroppedOffer() {
743     AtomicInteger calls = new AtomicInteger();
744 jsr166 1.18 SubmissionPublisher<Integer> p
745     = new SubmissionPublisher<>(basicExecutor, 4);
746 dl 1.1 TestSubscriber s1 = new TestSubscriber();
747     s1.request = false;
748     TestSubscriber s2 = new TestSubscriber();
749     s2.request = false;
750     p.subscribe(s1);
751     p.subscribe(s2);
752     s2.awaitSubscribe();
753     s1.awaitSubscribe();
754     for (int i = 1; i <= 4; ++i)
755     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
756     p.offer(4, (s, x) -> noopHandle(calls));
757     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
758     s1.sn.request(64);
759     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
760     s2.sn.request(64);
761     p.close();
762     s2.awaitComplete();
763     s1.awaitComplete();
764     assertTrue(calls.get() >= 4);
765     }
766    
767     /**
768 jsr166 1.3 * offer succeeds if drop handler forces request
769 dl 1.1 */
770     public void testRecoveredHandledDroppedOffer() {
771     AtomicInteger calls = new AtomicInteger();
772 jsr166 1.18 SubmissionPublisher<Integer> p
773     = new SubmissionPublisher<>(basicExecutor, 4);
774 dl 1.1 TestSubscriber s1 = new TestSubscriber();
775     s1.request = false;
776     TestSubscriber s2 = new TestSubscriber();
777     s2.request = false;
778     p.subscribe(s1);
779     p.subscribe(s2);
780     s2.awaitSubscribe();
781     s1.awaitSubscribe();
782     int n = 0;
783     for (int i = 1; i <= 8; ++i) {
784     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
785     n = n + 2 + (d < 0 ? d : 0);
786     }
787     p.close();
788     s2.awaitComplete();
789     s1.awaitComplete();
790 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
791 dl 1.1 assertTrue(calls.get() >= 2);
792     }
793    
794     /**
795 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
796 dl 1.1 */
797     public void testEmptyTimedOffer() {
798     SubmissionPublisher<Integer> p = basicPublisher();
799 jsr166 1.7 long startTime = System.nanoTime();
800 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
801 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
802 dl 1.1 }
803    
804     /**
805 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
806 dl 1.1 */
807     public void testNullTimedOffer() {
808     SubmissionPublisher<Integer> p = basicPublisher();
809 jsr166 1.7 long startTime = System.nanoTime();
810 dl 1.1 try {
811 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
812 jsr166 1.2 shouldThrow();
813     } catch (NullPointerException success) {}
814 dl 1.1 try {
815 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
816 jsr166 1.2 shouldThrow();
817     } catch (NullPointerException success) {}
818 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
819 dl 1.1 }
820    
821     /**
822 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
823 dl 1.1 */
824     public void testLaggedTimedOffer() {
825     SubmissionPublisher<Integer> p = basicPublisher();
826     TestSubscriber s1 = new TestSubscriber();
827     s1.request = false;
828     TestSubscriber s2 = new TestSubscriber();
829     s2.request = false;
830     p.subscribe(s1);
831     p.subscribe(s2);
832     s2.awaitSubscribe();
833     s1.awaitSubscribe();
834 jsr166 1.7 long startTime = System.nanoTime();
835     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
836     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
837 dl 1.1 s1.sn.request(4);
838 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
839 dl 1.1 s2.sn.request(4);
840 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
841 dl 1.1 p.close();
842     s2.awaitComplete();
843 jsr166 1.6 assertEquals(4, s2.nexts);
844 dl 1.1 s1.awaitComplete();
845 jsr166 1.6 assertEquals(4, s2.nexts);
846 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
847 dl 1.1 }
848    
849     /**
850 jsr166 1.3 * Timed offer reports drops if saturated
851 dl 1.1 */
852     public void testDroppedTimedOffer() {
853 jsr166 1.18 SubmissionPublisher<Integer> p
854     = new SubmissionPublisher<>(basicExecutor, 4);
855 dl 1.1 TestSubscriber s1 = new TestSubscriber();
856     s1.request = false;
857     TestSubscriber s2 = new TestSubscriber();
858     s2.request = false;
859     p.subscribe(s1);
860     p.subscribe(s2);
861     s2.awaitSubscribe();
862     s1.awaitSubscribe();
863 dl 1.9 long delay = timeoutMillis();
864 dl 1.1 for (int i = 1; i <= 4; ++i)
865 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
866     long startTime = System.nanoTime();
867     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
868 dl 1.1 s1.sn.request(64);
869 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
870     // 2 * delay should elapse but check only 1 * delay to allow timer slop
871     assertTrue(millisElapsedSince(startTime) >= delay);
872 dl 1.1 s2.sn.request(64);
873     p.close();
874     s2.awaitComplete();
875     assertTrue(s2.nexts >= 2);
876     s1.awaitComplete();
877     assertTrue(s1.nexts >= 2);
878     }
879    
880     /**
881 jsr166 1.3 * Timed offer invokes drop handler if saturated
882 dl 1.1 */
883     public void testHandledDroppedTimedOffer() {
884     AtomicInteger calls = new AtomicInteger();
885 jsr166 1.18 SubmissionPublisher<Integer> p
886     = new SubmissionPublisher<>(basicExecutor, 4);
887 dl 1.1 TestSubscriber s1 = new TestSubscriber();
888     s1.request = false;
889     TestSubscriber s2 = new TestSubscriber();
890     s2.request = false;
891     p.subscribe(s1);
892     p.subscribe(s2);
893     s2.awaitSubscribe();
894     s1.awaitSubscribe();
895 dl 1.9 long delay = timeoutMillis();
896 dl 1.1 for (int i = 1; i <= 4; ++i)
897 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
898     long startTime = System.nanoTime();
899     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
900 dl 1.1 s1.sn.request(64);
901 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902     assertTrue(millisElapsedSince(startTime) >= delay);
903 dl 1.1 s2.sn.request(64);
904     p.close();
905     s2.awaitComplete();
906     s1.awaitComplete();
907     assertTrue(calls.get() >= 2);
908     }
909    
910     /**
911 jsr166 1.3 * Timed offer succeeds if drop handler forces request
912 dl 1.1 */
913     public void testRecoveredHandledDroppedTimedOffer() {
914     AtomicInteger calls = new AtomicInteger();
915 jsr166 1.18 SubmissionPublisher<Integer> p
916     = new SubmissionPublisher<>(basicExecutor, 4);
917 dl 1.1 TestSubscriber s1 = new TestSubscriber();
918     s1.request = false;
919     TestSubscriber s2 = new TestSubscriber();
920     s2.request = false;
921     p.subscribe(s1);
922     p.subscribe(s2);
923     s2.awaitSubscribe();
924     s1.awaitSubscribe();
925     int n = 0;
926 dl 1.9 long delay = timeoutMillis();
927     long startTime = System.nanoTime();
928     for (int i = 1; i <= 6; ++i) {
929     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
930 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
931     }
932 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
933 dl 1.1 p.close();
934     s2.awaitComplete();
935     s1.awaitComplete();
936 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
937 dl 1.1 assertTrue(calls.get() >= 2);
938     }
939    
940 dl 1.10 /**
941     * consume returns a CompletableFuture that is done when
942     * publisher completes
943     */
944     public void testConsume() {
945     AtomicInteger sum = new AtomicInteger();
946     SubmissionPublisher<Integer> p = basicPublisher();
947 jsr166 1.11 CompletableFuture<Void> f =
948 jsr166 1.15 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
949 dl 1.10 int n = 20;
950     for (int i = 1; i <= n; ++i)
951     p.submit(i);
952     p.close();
953     f.join();
954     assertEquals((n * (n + 1)) / 2, sum.get());
955     }
956    
957     /**
958     * consume(null) throws NPE
959     */
960     public void testConsumeNPE() {
961     SubmissionPublisher<Integer> p = basicPublisher();
962     try {
963     CompletableFuture<Void> f = p.consume(null);
964     shouldThrow();
965 jsr166 1.11 } catch (NullPointerException success) {}
966 dl 1.10 }
967    
968     /**
969     * consume eventually stops processing published items if cancelled
970     */
971     public void testCancelledConsume() {
972     AtomicInteger count = new AtomicInteger();
973     SubmissionPublisher<Integer> p = basicPublisher();
974     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
975     f.cancel(true);
976     int n = 1000000; // arbitrary limit
977     for (int i = 1; i <= n; ++i)
978     p.submit(i);
979     assertTrue(count.get() < n);
980     }
981 jsr166 1.11
982 jsr166 1.24 /**
983     * Tests scenario for
984     * JDK-8187947: A race condition in SubmissionPublisher
985     * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java
986     */
987     public void testMissedSignal_8187947() throws Exception {
988     final int N = expensiveTests ? (1 << 20) : (1 << 10);
989     final CountDownLatch finished = new CountDownLatch(1);
990     final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
991     class Sub implements Subscriber<Boolean> {
992     int received;
993     public void onSubscribe(Subscription s) {
994     s.request(N);
995     }
996     public void onNext(Boolean item) {
997     if (++received == N)
998     finished.countDown();
999     else
1000     CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1001     }
1002     public void onError(Throwable t) { throw new AssertionError(t); }
1003     public void onComplete() {}
1004     }
1005     pub.subscribe(new Sub());
1006     CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1007     await(finished);
1008     }
1009 dl 1.1 }