ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.21
Committed: Mon May 29 22:44:27 2017 UTC (6 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.20: +9 -7 lines
Log Message:
more timeout handling rework; remove most uses of MEDIUM_DELAY_MS; randomize timeouts and TimeUnits; write out IAE and ISE

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8 dl 1.10 import java.util.concurrent.CompletableFuture;
9 dl 1.1 import java.util.concurrent.Executor;
10     import java.util.concurrent.Executors;
11     import java.util.concurrent.Flow;
12 dl 1.10 import java.util.concurrent.ForkJoinPool;
13 dl 1.1 import java.util.concurrent.SubmissionPublisher;
14     import java.util.concurrent.atomic.AtomicInteger;
15     import junit.framework.Test;
16     import junit.framework.TestSuite;
17    
18 dl 1.10 import static java.util.concurrent.Flow.Subscriber;
19     import static java.util.concurrent.Flow.Subscription;
20     import static java.util.concurrent.TimeUnit.MILLISECONDS;
21    
22 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
23    
24     public static void main(String[] args) {
25     main(suite(), args);
26     }
27     public static Test suite() {
28     return new TestSuite(SubmissionPublisherTest.class);
29     }
30    
31 dl 1.12 final Executor basicExecutor = basicPublisher().getExecutor();
32 jsr166 1.13
33 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
34 dl 1.12 return new SubmissionPublisher<Integer>();
35 dl 1.1 }
36 jsr166 1.2
37 dl 1.1 static class SPException extends RuntimeException {}
38    
39     class TestSubscriber implements Subscriber<Integer> {
40     volatile Subscription sn;
41     int last; // Requires that onNexts are in numeric order
42     volatile int nexts;
43     volatile int errors;
44     volatile int completes;
45     volatile boolean throwOnCall = false;
46     volatile boolean request = true;
47     volatile Throwable lastError;
48    
49     public synchronized void onSubscribe(Subscription s) {
50     threadAssertTrue(sn == null);
51     sn = s;
52     notifyAll();
53     if (throwOnCall)
54     throw new SPException();
55     if (request)
56     sn.request(1L);
57     }
58     public synchronized void onNext(Integer t) {
59     ++nexts;
60     notifyAll();
61     int current = t.intValue();
62     threadAssertTrue(current >= last);
63     last = current;
64     if (request)
65     sn.request(1L);
66     if (throwOnCall)
67     throw new SPException();
68     }
69     public synchronized void onError(Throwable t) {
70     threadAssertTrue(completes == 0);
71     threadAssertTrue(errors == 0);
72     lastError = t;
73     ++errors;
74     notifyAll();
75     }
76     public synchronized void onComplete() {
77     threadAssertTrue(completes == 0);
78     ++completes;
79     notifyAll();
80     }
81    
82     synchronized void awaitSubscribe() {
83     while (sn == null) {
84     try {
85     wait();
86     } catch (Exception ex) {
87     threadUnexpectedException(ex);
88     break;
89     }
90     }
91     }
92     synchronized void awaitNext(int n) {
93     while (nexts < n) {
94     try {
95     wait();
96     } catch (Exception ex) {
97     threadUnexpectedException(ex);
98     break;
99     }
100     }
101     }
102     synchronized void awaitComplete() {
103     while (completes == 0 && errors == 0) {
104     try {
105     wait();
106     } catch (Exception ex) {
107     threadUnexpectedException(ex);
108     break;
109     }
110     }
111     }
112     synchronized void awaitError() {
113     while (errors == 0) {
114     try {
115     wait();
116     } catch (Exception ex) {
117     threadUnexpectedException(ex);
118     break;
119     }
120     }
121     }
122    
123     }
124    
125     /**
126     * A new SubmissionPublisher has no subscribers, a non-null
127     * executor, a power-of-two capacity, is not closed, and reports
128     * zero demand and lag
129     */
130     void checkInitialState(SubmissionPublisher<?> p) {
131     assertFalse(p.hasSubscribers());
132 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
133 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
134     assertFalse(p.isClosed());
135     assertNull(p.getClosedException());
136     int n = p.getMaxBufferCapacity();
137     assertTrue((n & (n - 1)) == 0); // power of two
138     assertNotNull(p.getExecutor());
139 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
140     assertEquals(0, p.estimateMaximumLag());
141 dl 1.1 }
142    
143     /**
144     * A default-constructed SubmissionPublisher has no subscribers,
145     * is not closed, has default buffer size, and uses the
146 dl 1.12 * defaultExecutor
147 dl 1.1 */
148     public void testConstructor1() {
149 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
150 dl 1.1 checkInitialState(p);
151     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 dl 1.12 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153     if (ForkJoinPool.getCommonPoolParallelism() > 1)
154     assertSame(e, c);
155     else
156     assertNotSame(e, c);
157 dl 1.1 }
158    
159     /**
160     * A new SubmissionPublisher has no subscribers, is not closed,
161     * has the given buffer size, and uses the given executor
162     */
163     public void testConstructor2() {
164     Executor e = Executors.newFixedThreadPool(1);
165 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
166 dl 1.1 checkInitialState(p);
167     assertSame(p.getExecutor(), e);
168 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
169 dl 1.1 }
170    
171     /**
172 jsr166 1.21 * A null Executor argument to SubmissionPublisher constructor
173     * throws NullPointerException
174 dl 1.1 */
175     public void testConstructor3() {
176     try {
177 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
178 dl 1.1 shouldThrow();
179 jsr166 1.2 } catch (NullPointerException success) {}
180 dl 1.1 }
181    
182     /**
183     * A negative capacity argument to SubmissionPublisher constructor
184 jsr166 1.21 * throws IllegalArgumentException
185 dl 1.1 */
186     public void testConstructor4() {
187     Executor e = Executors.newFixedThreadPool(1);
188     try {
189 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
190 dl 1.1 shouldThrow();
191 jsr166 1.2 } catch (IllegalArgumentException success) {}
192 dl 1.1 }
193    
194     /**
195     * A closed publisher reports isClosed with no closedException and
196 jsr166 1.21 * throws IllegalStateException upon attempted submission; a
197     * subsequent close or closeExceptionally has no additional
198     * effect.
199 dl 1.1 */
200     public void testClose() {
201     SubmissionPublisher<Integer> p = basicPublisher();
202     checkInitialState(p);
203     p.close();
204     assertTrue(p.isClosed());
205     assertNull(p.getClosedException());
206     try {
207     p.submit(1);
208     shouldThrow();
209 jsr166 1.2 } catch (IllegalStateException success) {}
210 dl 1.1 Throwable ex = new SPException();
211     p.closeExceptionally(ex);
212     assertTrue(p.isClosed());
213     assertNull(p.getClosedException());
214     }
215    
216     /**
217     * A publisher closedExceptionally reports isClosed with the
218 jsr166 1.21 * closedException and throws IllegalStateException upon attempted
219     * submission; a subsequent close or closeExceptionally has no
220     * additional effect.
221 dl 1.1 */
222     public void testCloseExceptionally() {
223     SubmissionPublisher<Integer> p = basicPublisher();
224     checkInitialState(p);
225     Throwable ex = new SPException();
226     p.closeExceptionally(ex);
227     assertTrue(p.isClosed());
228     assertSame(p.getClosedException(), ex);
229     try {
230     p.submit(1);
231     shouldThrow();
232 jsr166 1.2 } catch (IllegalStateException success) {}
233 dl 1.1 p.close();
234     assertTrue(p.isClosed());
235     assertSame(p.getClosedException(), ex);
236     }
237    
238     /**
239     * Upon subscription, the subscriber's onSubscribe is called, no
240     * other Subscriber methods are invoked, the publisher
241     * hasSubscribers, isSubscribed is true, and existing
242     * subscriptions are unaffected.
243     */
244     public void testSubscribe1() {
245     TestSubscriber s = new TestSubscriber();
246     SubmissionPublisher<Integer> p = basicPublisher();
247     p.subscribe(s);
248     assertTrue(p.hasSubscribers());
249 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
250 dl 1.1 assertTrue(p.getSubscribers().contains(s));
251     assertTrue(p.isSubscribed(s));
252     s.awaitSubscribe();
253     assertNotNull(s.sn);
254 jsr166 1.6 assertEquals(0, s.nexts);
255     assertEquals(0, s.errors);
256     assertEquals(0, s.completes);
257 dl 1.1 TestSubscriber s2 = new TestSubscriber();
258     p.subscribe(s2);
259     assertTrue(p.hasSubscribers());
260 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
261 dl 1.1 assertTrue(p.getSubscribers().contains(s));
262     assertTrue(p.getSubscribers().contains(s2));
263     assertTrue(p.isSubscribed(s));
264     assertTrue(p.isSubscribed(s2));
265     s2.awaitSubscribe();
266     assertNotNull(s2.sn);
267 jsr166 1.6 assertEquals(0, s2.nexts);
268     assertEquals(0, s2.errors);
269     assertEquals(0, s2.completes);
270 dl 1.9 p.close();
271 dl 1.1 }
272    
273     /**
274     * If closed, upon subscription, the subscriber's onComplete
275     * method is invoked
276     */
277     public void testSubscribe2() {
278     TestSubscriber s = new TestSubscriber();
279     SubmissionPublisher<Integer> p = basicPublisher();
280     p.close();
281     p.subscribe(s);
282     s.awaitComplete();
283 jsr166 1.6 assertEquals(0, s.nexts);
284     assertEquals(0, s.errors);
285     assertEquals(1, s.completes, 1);
286 dl 1.1 }
287    
288     /**
289     * If closedExceptionally, upon subscription, the subscriber's
290     * onError method is invoked
291     */
292     public void testSubscribe3() {
293     TestSubscriber s = new TestSubscriber();
294     SubmissionPublisher<Integer> p = basicPublisher();
295     Throwable ex = new SPException();
296     p.closeExceptionally(ex);
297     assertTrue(p.isClosed());
298     assertSame(p.getClosedException(), ex);
299     p.subscribe(s);
300     s.awaitError();
301 jsr166 1.6 assertEquals(0, s.nexts);
302     assertEquals(1, s.errors);
303 dl 1.1 }
304    
305     /**
306     * Upon attempted resubscription, the subscriber's onError is
307     * called and the subscription is cancelled.
308     */
309     public void testSubscribe4() {
310     TestSubscriber s = new TestSubscriber();
311     SubmissionPublisher<Integer> p = basicPublisher();
312     p.subscribe(s);
313     assertTrue(p.hasSubscribers());
314 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
315 dl 1.1 assertTrue(p.getSubscribers().contains(s));
316     assertTrue(p.isSubscribed(s));
317     s.awaitSubscribe();
318     assertNotNull(s.sn);
319 jsr166 1.6 assertEquals(0, s.nexts);
320     assertEquals(0, s.errors);
321     assertEquals(0, s.completes);
322 dl 1.1 p.subscribe(s);
323     s.awaitError();
324 jsr166 1.6 assertEquals(0, s.nexts);
325     assertEquals(1, s.errors);
326 dl 1.1 assertFalse(p.isSubscribed(s));
327     }
328    
329     /**
330     * An exception thrown in onSubscribe causes onError
331     */
332     public void testSubscribe5() {
333     TestSubscriber s = new TestSubscriber();
334     SubmissionPublisher<Integer> p = basicPublisher();
335     s.throwOnCall = true;
336     try {
337     p.subscribe(s);
338 jsr166 1.2 } catch (Exception ok) {}
339 dl 1.1 s.awaitError();
340 jsr166 1.6 assertEquals(0, s.nexts);
341     assertEquals(1, s.errors);
342     assertEquals(0, s.completes);
343 dl 1.1 }
344    
345     /**
346 jsr166 1.8 * subscribe(null) throws NPE
347 dl 1.1 */
348     public void testSubscribe6() {
349     SubmissionPublisher<Integer> p = basicPublisher();
350     try {
351     p.subscribe(null);
352 jsr166 1.2 shouldThrow();
353     } catch (NullPointerException success) {}
354 dl 1.1 checkInitialState(p);
355     }
356    
357     /**
358     * Closing a publisher causes onComplete to subscribers
359     */
360     public void testCloseCompletes() {
361     SubmissionPublisher<Integer> p = basicPublisher();
362     TestSubscriber s1 = new TestSubscriber();
363     TestSubscriber s2 = new TestSubscriber();
364     p.subscribe(s1);
365     p.subscribe(s2);
366     p.submit(1);
367     p.close();
368     assertTrue(p.isClosed());
369     assertNull(p.getClosedException());
370     s1.awaitComplete();
371 jsr166 1.6 assertEquals(1, s1.nexts);
372     assertEquals(1, s1.completes);
373 dl 1.1 s2.awaitComplete();
374 jsr166 1.6 assertEquals(1, s2.nexts);
375     assertEquals(1, s2.completes);
376 dl 1.1 }
377    
378     /**
379     * Closing a publisher exceptionally causes onError to subscribers
380 dl 1.16 * after they are subscribed
381 dl 1.1 */
382     public void testCloseExceptionallyError() {
383     SubmissionPublisher<Integer> p = basicPublisher();
384     TestSubscriber s1 = new TestSubscriber();
385     TestSubscriber s2 = new TestSubscriber();
386     p.subscribe(s1);
387     p.subscribe(s2);
388     p.submit(1);
389     p.closeExceptionally(new SPException());
390     assertTrue(p.isClosed());
391 dl 1.16 s1.awaitSubscribe();
392 dl 1.1 s1.awaitError();
393     assertTrue(s1.nexts <= 1);
394 jsr166 1.6 assertEquals(1, s1.errors);
395 dl 1.17 s2.awaitSubscribe();
396 dl 1.1 s2.awaitError();
397     assertTrue(s2.nexts <= 1);
398 jsr166 1.6 assertEquals(1, s2.errors);
399 dl 1.1 }
400    
401     /**
402     * Cancelling a subscription eventually causes no more onNexts to be issued
403     */
404     public void testCancel() {
405     SubmissionPublisher<Integer> p = basicPublisher();
406     TestSubscriber s1 = new TestSubscriber();
407     TestSubscriber s2 = new TestSubscriber();
408     p.subscribe(s1);
409     p.subscribe(s2);
410     s1.awaitSubscribe();
411     p.submit(1);
412     s1.sn.cancel();
413     for (int i = 2; i <= 20; ++i)
414     p.submit(i);
415     p.close();
416     s2.awaitComplete();
417 jsr166 1.6 assertEquals(20, s2.nexts);
418     assertEquals(1, s2.completes);
419 dl 1.1 assertTrue(s1.nexts < 20);
420     assertFalse(p.isSubscribed(s1));
421     }
422    
423     /**
424     * Throwing an exception in onNext causes onError
425     */
426     public void testThrowOnNext() {
427     SubmissionPublisher<Integer> p = basicPublisher();
428     TestSubscriber s1 = new TestSubscriber();
429     TestSubscriber s2 = new TestSubscriber();
430     p.subscribe(s1);
431     p.subscribe(s2);
432     s1.awaitSubscribe();
433     p.submit(1);
434     s1.throwOnCall = true;
435     p.submit(2);
436     p.close();
437     s2.awaitComplete();
438 jsr166 1.6 assertEquals(2, s2.nexts);
439 dl 1.1 s1.awaitComplete();
440 jsr166 1.6 assertEquals(1, s1.errors);
441 dl 1.1 }
442    
443     /**
444 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
445 dl 1.1 * subscriber throws an exception in onNext
446     */
447     public void testThrowOnNextHandler() {
448     AtomicInteger calls = new AtomicInteger();
449 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
450     basicExecutor, 8, (s, e) -> calls.getAndIncrement());
451 dl 1.1 TestSubscriber s1 = new TestSubscriber();
452     TestSubscriber s2 = new TestSubscriber();
453     p.subscribe(s1);
454     p.subscribe(s2);
455     s1.awaitSubscribe();
456     p.submit(1);
457     s1.throwOnCall = true;
458     p.submit(2);
459     p.close();
460     s2.awaitComplete();
461 jsr166 1.6 assertEquals(2, s2.nexts);
462     assertEquals(1, s2.completes);
463 dl 1.1 s1.awaitError();
464 jsr166 1.6 assertEquals(1, s1.errors);
465     assertEquals(1, calls.get());
466 dl 1.1 }
467    
468     /**
469     * onNext items are issued in the same order to each subscriber
470     */
471     public void testOrder() {
472     SubmissionPublisher<Integer> p = basicPublisher();
473     TestSubscriber s1 = new TestSubscriber();
474     TestSubscriber s2 = new TestSubscriber();
475     p.subscribe(s1);
476     p.subscribe(s2);
477     for (int i = 1; i <= 20; ++i)
478     p.submit(i);
479     p.close();
480     s2.awaitComplete();
481     s1.awaitComplete();
482 jsr166 1.6 assertEquals(20, s2.nexts);
483     assertEquals(1, s2.completes);
484     assertEquals(20, s1.nexts);
485     assertEquals(1, s1.completes);
486 dl 1.1 }
487    
488     /**
489     * onNext is issued only if requested
490     */
491     public void testRequest1() {
492     SubmissionPublisher<Integer> p = basicPublisher();
493     TestSubscriber s1 = new TestSubscriber();
494     s1.request = false;
495     p.subscribe(s1);
496     s1.awaitSubscribe();
497 jsr166 1.20 assertEquals(0, p.estimateMinimumDemand());
498 dl 1.1 TestSubscriber s2 = new TestSubscriber();
499     p.subscribe(s2);
500     p.submit(1);
501     p.submit(2);
502     s2.awaitNext(1);
503 jsr166 1.6 assertEquals(0, s1.nexts);
504 dl 1.1 s1.sn.request(3);
505     p.submit(3);
506     p.close();
507     s2.awaitComplete();
508 jsr166 1.6 assertEquals(3, s2.nexts);
509     assertEquals(1, s2.completes);
510 dl 1.1 s1.awaitComplete();
511     assertTrue(s1.nexts > 0);
512 jsr166 1.6 assertEquals(1, s1.completes);
513 dl 1.1 }
514    
515     /**
516     * onNext is not issued when requests become zero
517     */
518     public void testRequest2() {
519     SubmissionPublisher<Integer> p = basicPublisher();
520     TestSubscriber s1 = new TestSubscriber();
521     TestSubscriber s2 = new TestSubscriber();
522     p.subscribe(s1);
523     p.subscribe(s2);
524     s2.awaitSubscribe();
525     s1.awaitSubscribe();
526     s1.request = false;
527     p.submit(1);
528     p.submit(2);
529     p.close();
530     s2.awaitComplete();
531 jsr166 1.6 assertEquals(2, s2.nexts);
532     assertEquals(1, s2.completes);
533 dl 1.1 s1.awaitNext(1);
534 jsr166 1.6 assertEquals(1, s1.nexts);
535 dl 1.1 }
536    
537     /**
538 dl 1.19 * Non-positive request causes error
539 dl 1.1 */
540     public void testRequest3() {
541     SubmissionPublisher<Integer> p = basicPublisher();
542     TestSubscriber s1 = new TestSubscriber();
543     TestSubscriber s2 = new TestSubscriber();
544 dl 1.19 TestSubscriber s3 = new TestSubscriber();
545 dl 1.1 p.subscribe(s1);
546     p.subscribe(s2);
547 dl 1.19 p.subscribe(s3);
548     s3.awaitSubscribe();
549 dl 1.1 s2.awaitSubscribe();
550     s1.awaitSubscribe();
551     s1.sn.request(-1L);
552 dl 1.19 s3.sn.request(0L);
553 dl 1.1 p.submit(1);
554     p.submit(2);
555     p.close();
556     s2.awaitComplete();
557 jsr166 1.6 assertEquals(2, s2.nexts);
558     assertEquals(1, s2.completes);
559 dl 1.1 s1.awaitError();
560 jsr166 1.6 assertEquals(1, s1.errors);
561 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
562 dl 1.19 s3.awaitError();
563     assertEquals(1, s3.errors);
564     assertTrue(s3.lastError instanceof IllegalArgumentException);
565 dl 1.1 }
566    
567     /**
568     * estimateMinimumDemand reports 0 until request, nonzero after
569     * request, and zero again after delivery
570     */
571     public void testEstimateMinimumDemand() {
572     TestSubscriber s = new TestSubscriber();
573     SubmissionPublisher<Integer> p = basicPublisher();
574     s.request = false;
575     p.subscribe(s);
576     s.awaitSubscribe();
577 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
578 dl 1.1 s.sn.request(1);
579 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
580 dl 1.1 p.submit(1);
581     s.awaitNext(1);
582 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
583 dl 1.1 }
584    
585     /**
586 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
587 dl 1.1 */
588     public void testEmptySubmit() {
589     SubmissionPublisher<Integer> p = basicPublisher();
590 jsr166 1.6 assertEquals(0, p.submit(1));
591 dl 1.1 }
592    
593     /**
594 jsr166 1.3 * submit(null) throws NPE
595 dl 1.1 */
596     public void testNullSubmit() {
597     SubmissionPublisher<Integer> p = basicPublisher();
598     try {
599     p.submit(null);
600 jsr166 1.2 shouldThrow();
601     } catch (NullPointerException success) {}
602 dl 1.1 }
603    
604     /**
605 jsr166 1.3 * submit returns number of lagged items, compatible with result
606 dl 1.1 * of estimateMaximumLag.
607     */
608     public void testLaggedSubmit() {
609     SubmissionPublisher<Integer> p = basicPublisher();
610     TestSubscriber s1 = new TestSubscriber();
611     s1.request = false;
612     TestSubscriber s2 = new TestSubscriber();
613     s2.request = false;
614     p.subscribe(s1);
615     p.subscribe(s2);
616     s2.awaitSubscribe();
617     s1.awaitSubscribe();
618 jsr166 1.6 assertEquals(1, p.submit(1));
619 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
620     assertTrue(p.submit(2) >= 2);
621     assertTrue(p.estimateMaximumLag() >= 2);
622     s1.sn.request(4);
623     assertTrue(p.submit(3) >= 3);
624     assertTrue(p.estimateMaximumLag() >= 3);
625     s2.sn.request(4);
626     p.submit(4);
627     p.close();
628     s2.awaitComplete();
629 jsr166 1.6 assertEquals(4, s2.nexts);
630 dl 1.1 s1.awaitComplete();
631 jsr166 1.6 assertEquals(4, s2.nexts);
632 dl 1.1 }
633    
634     /**
635     * submit eventually issues requested items when buffer capacity is 1
636     */
637     public void testCap1Submit() {
638 jsr166 1.18 SubmissionPublisher<Integer> p
639     = new SubmissionPublisher<>(basicExecutor, 1);
640 dl 1.1 TestSubscriber s1 = new TestSubscriber();
641     TestSubscriber s2 = new TestSubscriber();
642     p.subscribe(s1);
643     p.subscribe(s2);
644     for (int i = 1; i <= 20; ++i) {
645     assertTrue(p.estimateMinimumDemand() <= 1);
646     assertTrue(p.submit(i) >= 0);
647     }
648     p.close();
649     s2.awaitComplete();
650     s1.awaitComplete();
651 jsr166 1.6 assertEquals(20, s2.nexts);
652     assertEquals(1, s2.completes);
653     assertEquals(20, s1.nexts);
654     assertEquals(1, s1.completes);
655 dl 1.1 }
656 jsr166 1.2
657 dl 1.1 static boolean noopHandle(AtomicInteger count) {
658     count.getAndIncrement();
659     return false;
660     }
661    
662     static boolean reqHandle(AtomicInteger count, Subscriber s) {
663     count.getAndIncrement();
664     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
665     return true;
666     }
667    
668     /**
669 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
670 dl 1.1 */
671     public void testEmptyOffer() {
672     SubmissionPublisher<Integer> p = basicPublisher();
673 jsr166 1.3 assertEquals(0, p.offer(1, null));
674 dl 1.1 }
675    
676     /**
677 jsr166 1.3 * offer(null) throws NPE
678 dl 1.1 */
679     public void testNullOffer() {
680     SubmissionPublisher<Integer> p = basicPublisher();
681     try {
682     p.offer(null, null);
683 jsr166 1.2 shouldThrow();
684     } catch (NullPointerException success) {}
685 dl 1.1 }
686    
687     /**
688 jsr166 1.3 * offer returns number of lagged items if not saturated
689 dl 1.1 */
690     public void testLaggedOffer() {
691     SubmissionPublisher<Integer> p = basicPublisher();
692     TestSubscriber s1 = new TestSubscriber();
693     s1.request = false;
694     TestSubscriber s2 = new TestSubscriber();
695     s2.request = false;
696     p.subscribe(s1);
697     p.subscribe(s2);
698     s2.awaitSubscribe();
699     s1.awaitSubscribe();
700     assertTrue(p.offer(1, null) >= 1);
701     assertTrue(p.offer(2, null) >= 2);
702     s1.sn.request(4);
703     assertTrue(p.offer(3, null) >= 3);
704     s2.sn.request(4);
705     p.offer(4, null);
706     p.close();
707     s2.awaitComplete();
708 jsr166 1.6 assertEquals(4, s2.nexts);
709 dl 1.1 s1.awaitComplete();
710 jsr166 1.6 assertEquals(4, s2.nexts);
711 dl 1.1 }
712    
713     /**
714 jsr166 1.3 * offer reports drops if saturated
715 dl 1.1 */
716     public void testDroppedOffer() {
717 jsr166 1.18 SubmissionPublisher<Integer> p
718     = new SubmissionPublisher<>(basicExecutor, 4);
719 dl 1.1 TestSubscriber s1 = new TestSubscriber();
720     s1.request = false;
721     TestSubscriber s2 = new TestSubscriber();
722     s2.request = false;
723     p.subscribe(s1);
724     p.subscribe(s2);
725     s2.awaitSubscribe();
726     s1.awaitSubscribe();
727     for (int i = 1; i <= 4; ++i)
728     assertTrue(p.offer(i, null) >= 0);
729     p.offer(5, null);
730     assertTrue(p.offer(6, null) < 0);
731     s1.sn.request(64);
732     assertTrue(p.offer(7, null) < 0);
733     s2.sn.request(64);
734     p.close();
735     s2.awaitComplete();
736     assertTrue(s2.nexts >= 4);
737     s1.awaitComplete();
738     assertTrue(s1.nexts >= 4);
739     }
740    
741     /**
742 jsr166 1.3 * offer invokes drop handler if saturated
743 dl 1.1 */
744     public void testHandledDroppedOffer() {
745     AtomicInteger calls = new AtomicInteger();
746 jsr166 1.18 SubmissionPublisher<Integer> p
747     = new SubmissionPublisher<>(basicExecutor, 4);
748 dl 1.1 TestSubscriber s1 = new TestSubscriber();
749     s1.request = false;
750     TestSubscriber s2 = new TestSubscriber();
751     s2.request = false;
752     p.subscribe(s1);
753     p.subscribe(s2);
754     s2.awaitSubscribe();
755     s1.awaitSubscribe();
756     for (int i = 1; i <= 4; ++i)
757     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
758     p.offer(4, (s, x) -> noopHandle(calls));
759     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
760     s1.sn.request(64);
761     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
762     s2.sn.request(64);
763     p.close();
764     s2.awaitComplete();
765     s1.awaitComplete();
766     assertTrue(calls.get() >= 4);
767     }
768    
769     /**
770 jsr166 1.3 * offer succeeds if drop handler forces request
771 dl 1.1 */
772     public void testRecoveredHandledDroppedOffer() {
773     AtomicInteger calls = new AtomicInteger();
774 jsr166 1.18 SubmissionPublisher<Integer> p
775     = new SubmissionPublisher<>(basicExecutor, 4);
776 dl 1.1 TestSubscriber s1 = new TestSubscriber();
777     s1.request = false;
778     TestSubscriber s2 = new TestSubscriber();
779     s2.request = false;
780     p.subscribe(s1);
781     p.subscribe(s2);
782     s2.awaitSubscribe();
783     s1.awaitSubscribe();
784     int n = 0;
785     for (int i = 1; i <= 8; ++i) {
786     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
787     n = n + 2 + (d < 0 ? d : 0);
788     }
789     p.close();
790     s2.awaitComplete();
791     s1.awaitComplete();
792 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
793 dl 1.1 assertTrue(calls.get() >= 2);
794     }
795    
796     /**
797 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
798 dl 1.1 */
799     public void testEmptyTimedOffer() {
800     SubmissionPublisher<Integer> p = basicPublisher();
801 jsr166 1.7 long startTime = System.nanoTime();
802 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
803 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
804 dl 1.1 }
805    
806     /**
807 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
808 dl 1.1 */
809     public void testNullTimedOffer() {
810     SubmissionPublisher<Integer> p = basicPublisher();
811 jsr166 1.7 long startTime = System.nanoTime();
812 dl 1.1 try {
813 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
814 jsr166 1.2 shouldThrow();
815     } catch (NullPointerException success) {}
816 dl 1.1 try {
817 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
818 jsr166 1.2 shouldThrow();
819     } catch (NullPointerException success) {}
820 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
821 dl 1.1 }
822    
823     /**
824 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
825 dl 1.1 */
826     public void testLaggedTimedOffer() {
827     SubmissionPublisher<Integer> p = basicPublisher();
828     TestSubscriber s1 = new TestSubscriber();
829     s1.request = false;
830     TestSubscriber s2 = new TestSubscriber();
831     s2.request = false;
832     p.subscribe(s1);
833     p.subscribe(s2);
834     s2.awaitSubscribe();
835     s1.awaitSubscribe();
836 jsr166 1.7 long startTime = System.nanoTime();
837     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
838     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
839 dl 1.1 s1.sn.request(4);
840 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
841 dl 1.1 s2.sn.request(4);
842 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
843 dl 1.1 p.close();
844     s2.awaitComplete();
845 jsr166 1.6 assertEquals(4, s2.nexts);
846 dl 1.1 s1.awaitComplete();
847 jsr166 1.6 assertEquals(4, s2.nexts);
848 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
849 dl 1.1 }
850    
851     /**
852 jsr166 1.3 * Timed offer reports drops if saturated
853 dl 1.1 */
854     public void testDroppedTimedOffer() {
855 jsr166 1.18 SubmissionPublisher<Integer> p
856     = new SubmissionPublisher<>(basicExecutor, 4);
857 dl 1.1 TestSubscriber s1 = new TestSubscriber();
858     s1.request = false;
859     TestSubscriber s2 = new TestSubscriber();
860     s2.request = false;
861     p.subscribe(s1);
862     p.subscribe(s2);
863     s2.awaitSubscribe();
864     s1.awaitSubscribe();
865 dl 1.9 long delay = timeoutMillis();
866 dl 1.1 for (int i = 1; i <= 4; ++i)
867 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
868     long startTime = System.nanoTime();
869     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
870 dl 1.1 s1.sn.request(64);
871 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
872     // 2 * delay should elapse but check only 1 * delay to allow timer slop
873     assertTrue(millisElapsedSince(startTime) >= delay);
874 dl 1.1 s2.sn.request(64);
875     p.close();
876     s2.awaitComplete();
877     assertTrue(s2.nexts >= 2);
878     s1.awaitComplete();
879     assertTrue(s1.nexts >= 2);
880     }
881    
882     /**
883 jsr166 1.3 * Timed offer invokes drop handler if saturated
884 dl 1.1 */
885     public void testHandledDroppedTimedOffer() {
886     AtomicInteger calls = new AtomicInteger();
887 jsr166 1.18 SubmissionPublisher<Integer> p
888     = new SubmissionPublisher<>(basicExecutor, 4);
889 dl 1.1 TestSubscriber s1 = new TestSubscriber();
890     s1.request = false;
891     TestSubscriber s2 = new TestSubscriber();
892     s2.request = false;
893     p.subscribe(s1);
894     p.subscribe(s2);
895     s2.awaitSubscribe();
896     s1.awaitSubscribe();
897 dl 1.9 long delay = timeoutMillis();
898 dl 1.1 for (int i = 1; i <= 4; ++i)
899 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
900     long startTime = System.nanoTime();
901     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902 dl 1.1 s1.sn.request(64);
903 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
904     assertTrue(millisElapsedSince(startTime) >= delay);
905 dl 1.1 s2.sn.request(64);
906     p.close();
907     s2.awaitComplete();
908     s1.awaitComplete();
909     assertTrue(calls.get() >= 2);
910     }
911    
912     /**
913 jsr166 1.3 * Timed offer succeeds if drop handler forces request
914 dl 1.1 */
915     public void testRecoveredHandledDroppedTimedOffer() {
916     AtomicInteger calls = new AtomicInteger();
917 jsr166 1.18 SubmissionPublisher<Integer> p
918     = new SubmissionPublisher<>(basicExecutor, 4);
919 dl 1.1 TestSubscriber s1 = new TestSubscriber();
920     s1.request = false;
921     TestSubscriber s2 = new TestSubscriber();
922     s2.request = false;
923     p.subscribe(s1);
924     p.subscribe(s2);
925     s2.awaitSubscribe();
926     s1.awaitSubscribe();
927     int n = 0;
928 dl 1.9 long delay = timeoutMillis();
929     long startTime = System.nanoTime();
930     for (int i = 1; i <= 6; ++i) {
931     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
932 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
933     }
934 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
935 dl 1.1 p.close();
936     s2.awaitComplete();
937     s1.awaitComplete();
938 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
939 dl 1.1 assertTrue(calls.get() >= 2);
940     }
941    
942 dl 1.10 /**
943     * consume returns a CompletableFuture that is done when
944     * publisher completes
945     */
946     public void testConsume() {
947     AtomicInteger sum = new AtomicInteger();
948     SubmissionPublisher<Integer> p = basicPublisher();
949 jsr166 1.11 CompletableFuture<Void> f =
950 jsr166 1.15 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
951 dl 1.10 int n = 20;
952     for (int i = 1; i <= n; ++i)
953     p.submit(i);
954     p.close();
955     f.join();
956     assertEquals((n * (n + 1)) / 2, sum.get());
957     }
958    
959     /**
960     * consume(null) throws NPE
961     */
962     public void testConsumeNPE() {
963     SubmissionPublisher<Integer> p = basicPublisher();
964     try {
965     CompletableFuture<Void> f = p.consume(null);
966     shouldThrow();
967 jsr166 1.11 } catch (NullPointerException success) {}
968 dl 1.10 }
969    
970     /**
971     * consume eventually stops processing published items if cancelled
972     */
973     public void testCancelledConsume() {
974     AtomicInteger count = new AtomicInteger();
975     SubmissionPublisher<Integer> p = basicPublisher();
976     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
977     f.cancel(true);
978     int n = 1000000; // arbitrary limit
979     for (int i = 1; i <= n; ++i)
980     p.submit(i);
981     assertTrue(count.get() < n);
982     }
983 jsr166 1.11
984 dl 1.1 }