ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.10
Committed: Sat Sep 12 11:25:15 2015 UTC (8 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.9: +53 -9 lines
Log Message:
Remove Flow.stream for now; move consume to SubmissionPublisher

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8 dl 1.10 import java.util.concurrent.CompletableFuture;
9 dl 1.1 import java.util.concurrent.Executor;
10     import java.util.concurrent.Executors;
11     import java.util.concurrent.Flow;
12 dl 1.10 import java.util.concurrent.ForkJoinPool;
13 dl 1.1 import java.util.concurrent.LinkedBlockingQueue;
14     import java.util.concurrent.SubmissionPublisher;
15     import java.util.concurrent.ThreadFactory;
16     import java.util.concurrent.ThreadPoolExecutor;
17     import java.util.concurrent.TimeUnit;
18     import java.util.concurrent.atomic.AtomicInteger;
19     import java.util.function.BiConsumer;
20 dl 1.10 import java.util.function.BiFunction;
21 dl 1.1 import java.util.function.BiPredicate;
22 dl 1.10 import java.util.stream.Stream;
23 dl 1.1 import junit.framework.Test;
24     import junit.framework.TestSuite;
25    
26 dl 1.10 import static java.util.concurrent.Flow.Publisher;
27     import static java.util.concurrent.Flow.Subscriber;
28     import static java.util.concurrent.Flow.Subscription;
29     import static java.util.concurrent.TimeUnit.MILLISECONDS;
30     import static java.util.concurrent.TimeUnit.SECONDS;
31    
32 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
33    
34     public static void main(String[] args) {
35     main(suite(), args);
36     }
37     public static Test suite() {
38     return new TestSuite(SubmissionPublisherTest.class);
39     }
40    
41     // Factory for single thread pool in case commonPool parallelism is zero
42     static final class DaemonThreadFactory implements ThreadFactory {
43     public Thread newThread(Runnable r) {
44     Thread t = new Thread(r);
45     t.setDaemon(true);
46     return t;
47     }
48     }
49 jsr166 1.2
/**
 * Executor shared by the tests: the common pool when its parallelism
 * is greater than one, otherwise a dedicated pool with exactly one
 * daemon thread and an unbounded work queue.
 */
static final Executor basicExecutor;
static {
    if (ForkJoinPool.getCommonPoolParallelism() > 1) {
        basicExecutor = ForkJoinPool.commonPool();
    } else {
        basicExecutor = new ThreadPoolExecutor(
            1, 1, 60, SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new DaemonThreadFactory());
    }
}
56 jsr166 1.2
57 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
58     return new SubmissionPublisher<Integer>(basicExecutor,
59     Flow.defaultBufferSize());
60     }
61 jsr166 1.2
62 dl 1.1 static class SPException extends RuntimeException {}
63    
64     class TestSubscriber implements Subscriber<Integer> {
65     volatile Subscription sn;
66     int last; // Requires that onNexts are in numeric order
67     volatile int nexts;
68     volatile int errors;
69     volatile int completes;
70     volatile boolean throwOnCall = false;
71     volatile boolean request = true;
72     volatile Throwable lastError;
73    
74     public synchronized void onSubscribe(Subscription s) {
75     threadAssertTrue(sn == null);
76     sn = s;
77     notifyAll();
78     if (throwOnCall)
79     throw new SPException();
80     if (request)
81     sn.request(1L);
82     }
83     public synchronized void onNext(Integer t) {
84     ++nexts;
85     notifyAll();
86     int current = t.intValue();
87     threadAssertTrue(current >= last);
88     last = current;
89     if (request)
90     sn.request(1L);
91     if (throwOnCall)
92     throw new SPException();
93     }
94     public synchronized void onError(Throwable t) {
95     threadAssertTrue(completes == 0);
96     threadAssertTrue(errors == 0);
97     lastError = t;
98     ++errors;
99     notifyAll();
100     }
101     public synchronized void onComplete() {
102     threadAssertTrue(completes == 0);
103     ++completes;
104     notifyAll();
105     }
106    
107     synchronized void awaitSubscribe() {
108     while (sn == null) {
109     try {
110     wait();
111     } catch (Exception ex) {
112     threadUnexpectedException(ex);
113     break;
114     }
115     }
116     }
117     synchronized void awaitNext(int n) {
118     while (nexts < n) {
119     try {
120     wait();
121     } catch (Exception ex) {
122     threadUnexpectedException(ex);
123     break;
124     }
125     }
126     }
127     synchronized void awaitComplete() {
128     while (completes == 0 && errors == 0) {
129     try {
130     wait();
131     } catch (Exception ex) {
132     threadUnexpectedException(ex);
133     break;
134     }
135     }
136     }
137     synchronized void awaitError() {
138     while (errors == 0) {
139     try {
140     wait();
141     } catch (Exception ex) {
142     threadUnexpectedException(ex);
143     break;
144     }
145     }
146     }
147    
148     }
149    
150     /**
151     * A new SubmissionPublisher has no subscribers, a non-null
152     * executor, a power-of-two capacity, is not closed, and reports
153     * zero demand and lag
154     */
155     void checkInitialState(SubmissionPublisher<?> p) {
156     assertFalse(p.hasSubscribers());
157 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
158 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
159     assertFalse(p.isClosed());
160     assertNull(p.getClosedException());
161     int n = p.getMaxBufferCapacity();
162     assertTrue((n & (n - 1)) == 0); // power of two
163     assertNotNull(p.getExecutor());
164 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
165     assertEquals(0, p.estimateMaximumLag());
166 dl 1.1 }
167    
168     /**
169     * A default-constructed SubmissionPublisher has no subscribers,
170     * is not closed, has default buffer size, and uses the
171     * ForkJoinPool.commonPool executor
172     */
173     public void testConstructor1() {
174     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
175     checkInitialState(p);
176     assertSame(p.getExecutor(), ForkJoinPool.commonPool());
177     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
178     }
179    
180     /**
181     * A new SubmissionPublisher has no subscribers, is not closed,
182     * has the given buffer size, and uses the given executor
183     */
184     public void testConstructor2() {
185     Executor e = Executors.newFixedThreadPool(1);
186     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
187     checkInitialState(p);
188     assertSame(p.getExecutor(), e);
189 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
190 dl 1.1 }
191    
192     /**
193     * A null Executor argument to SubmissionPublisher constructor throws NPE
194     */
195     public void testConstructor3() {
196     try {
197 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
198 dl 1.1 shouldThrow();
199 jsr166 1.2 } catch (NullPointerException success) {}
200 dl 1.1 }
201    
202     /**
203     * A negative capacity argument to SubmissionPublisher constructor
204     * throws IAE
205     */
206     public void testConstructor4() {
207     Executor e = Executors.newFixedThreadPool(1);
208     try {
209 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
210 dl 1.1 shouldThrow();
211 jsr166 1.2 } catch (IllegalArgumentException success) {}
212 dl 1.1 }
213    
214     /**
215     * A closed publisher reports isClosed with no closedException and
216     * throws ISE upon attempted submission; a subsequent close or
217     * closeExceptionally has no additional effect.
218     */
219     public void testClose() {
220     SubmissionPublisher<Integer> p = basicPublisher();
221     checkInitialState(p);
222     p.close();
223     assertTrue(p.isClosed());
224     assertNull(p.getClosedException());
225     try {
226     p.submit(1);
227     shouldThrow();
228 jsr166 1.2 } catch (IllegalStateException success) {}
229 dl 1.1 Throwable ex = new SPException();
230     p.closeExceptionally(ex);
231     assertTrue(p.isClosed());
232     assertNull(p.getClosedException());
233     }
234    
235     /**
236     * A publisher closedExceptionally reports isClosed with the
237     * closedException and throws ISE upon attempted submission; a
238     * subsequent close or closeExceptionally has no additional
239     * effect.
240     */
241     public void testCloseExceptionally() {
242     SubmissionPublisher<Integer> p = basicPublisher();
243     checkInitialState(p);
244     Throwable ex = new SPException();
245     p.closeExceptionally(ex);
246     assertTrue(p.isClosed());
247     assertSame(p.getClosedException(), ex);
248     try {
249     p.submit(1);
250     shouldThrow();
251 jsr166 1.2 } catch (IllegalStateException success) {}
252 dl 1.1 p.close();
253     assertTrue(p.isClosed());
254     assertSame(p.getClosedException(), ex);
255     }
256    
257     /**
258     * Upon subscription, the subscriber's onSubscribe is called, no
259     * other Subscriber methods are invoked, the publisher
260     * hasSubscribers, isSubscribed is true, and existing
261     * subscriptions are unaffected.
262     */
263     public void testSubscribe1() {
264     TestSubscriber s = new TestSubscriber();
265     SubmissionPublisher<Integer> p = basicPublisher();
266     p.subscribe(s);
267     assertTrue(p.hasSubscribers());
268 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
269 dl 1.1 assertTrue(p.getSubscribers().contains(s));
270     assertTrue(p.isSubscribed(s));
271     s.awaitSubscribe();
272     assertNotNull(s.sn);
273 jsr166 1.6 assertEquals(0, s.nexts);
274     assertEquals(0, s.errors);
275     assertEquals(0, s.completes);
276 dl 1.1 TestSubscriber s2 = new TestSubscriber();
277     p.subscribe(s2);
278     assertTrue(p.hasSubscribers());
279 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
280 dl 1.1 assertTrue(p.getSubscribers().contains(s));
281     assertTrue(p.getSubscribers().contains(s2));
282     assertTrue(p.isSubscribed(s));
283     assertTrue(p.isSubscribed(s2));
284     s2.awaitSubscribe();
285     assertNotNull(s2.sn);
286 jsr166 1.6 assertEquals(0, s2.nexts);
287     assertEquals(0, s2.errors);
288     assertEquals(0, s2.completes);
289 dl 1.9 p.close();
290 dl 1.1 }
291    
292     /**
293     * If closed, upon subscription, the subscriber's onComplete
294     * method is invoked
295     */
296     public void testSubscribe2() {
297     TestSubscriber s = new TestSubscriber();
298     SubmissionPublisher<Integer> p = basicPublisher();
299     p.close();
300     p.subscribe(s);
301     s.awaitComplete();
302 jsr166 1.6 assertEquals(0, s.nexts);
303     assertEquals(0, s.errors);
304     assertEquals(1, s.completes, 1);
305 dl 1.1 }
306    
307     /**
308     * If closedExceptionally, upon subscription, the subscriber's
309     * onError method is invoked
310     */
311     public void testSubscribe3() {
312     TestSubscriber s = new TestSubscriber();
313     SubmissionPublisher<Integer> p = basicPublisher();
314     Throwable ex = new SPException();
315     p.closeExceptionally(ex);
316     assertTrue(p.isClosed());
317     assertSame(p.getClosedException(), ex);
318     p.subscribe(s);
319     s.awaitError();
320 jsr166 1.6 assertEquals(0, s.nexts);
321     assertEquals(1, s.errors);
322 dl 1.1 }
323    
324     /**
325     * Upon attempted resubscription, the subscriber's onError is
326     * called and the subscription is cancelled.
327     */
328     public void testSubscribe4() {
329     TestSubscriber s = new TestSubscriber();
330     SubmissionPublisher<Integer> p = basicPublisher();
331     p.subscribe(s);
332     assertTrue(p.hasSubscribers());
333 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
334 dl 1.1 assertTrue(p.getSubscribers().contains(s));
335     assertTrue(p.isSubscribed(s));
336     s.awaitSubscribe();
337     assertNotNull(s.sn);
338 jsr166 1.6 assertEquals(0, s.nexts);
339     assertEquals(0, s.errors);
340     assertEquals(0, s.completes);
341 dl 1.1 p.subscribe(s);
342     s.awaitError();
343 jsr166 1.6 assertEquals(0, s.nexts);
344     assertEquals(1, s.errors);
345 dl 1.1 assertFalse(p.isSubscribed(s));
346     }
347    
348     /**
349     * An exception thrown in onSubscribe causes onError
350     */
351     public void testSubscribe5() {
352     TestSubscriber s = new TestSubscriber();
353     SubmissionPublisher<Integer> p = basicPublisher();
354     s.throwOnCall = true;
355     try {
356     p.subscribe(s);
357 jsr166 1.2 } catch (Exception ok) {}
358 dl 1.1 s.awaitError();
359 jsr166 1.6 assertEquals(0, s.nexts);
360     assertEquals(1, s.errors);
361     assertEquals(0, s.completes);
362 dl 1.1 }
363    
364     /**
365 jsr166 1.8 * subscribe(null) throws NPE
366 dl 1.1 */
367     public void testSubscribe6() {
368     SubmissionPublisher<Integer> p = basicPublisher();
369     try {
370     p.subscribe(null);
371 jsr166 1.2 shouldThrow();
372     } catch (NullPointerException success) {}
373 dl 1.1 checkInitialState(p);
374     }
375    
376     /**
377     * Closing a publisher causes onComplete to subscribers
378     */
379     public void testCloseCompletes() {
380     SubmissionPublisher<Integer> p = basicPublisher();
381     TestSubscriber s1 = new TestSubscriber();
382     TestSubscriber s2 = new TestSubscriber();
383     p.subscribe(s1);
384     p.subscribe(s2);
385     p.submit(1);
386     p.close();
387     assertTrue(p.isClosed());
388     assertNull(p.getClosedException());
389     s1.awaitComplete();
390 jsr166 1.6 assertEquals(1, s1.nexts);
391     assertEquals(1, s1.completes);
392 dl 1.1 s2.awaitComplete();
393 jsr166 1.6 assertEquals(1, s2.nexts);
394     assertEquals(1, s2.completes);
395 dl 1.1 }
396    
397     /**
398     * Closing a publisher exceptionally causes onError to subscribers
399     */
400     public void testCloseExceptionallyError() {
401     SubmissionPublisher<Integer> p = basicPublisher();
402     TestSubscriber s1 = new TestSubscriber();
403     TestSubscriber s2 = new TestSubscriber();
404     p.subscribe(s1);
405     p.subscribe(s2);
406     p.submit(1);
407     p.closeExceptionally(new SPException());
408     assertTrue(p.isClosed());
409     s1.awaitError();
410     assertTrue(s1.nexts <= 1);
411 jsr166 1.6 assertEquals(1, s1.errors);
412 dl 1.1 s2.awaitError();
413     assertTrue(s2.nexts <= 1);
414 jsr166 1.6 assertEquals(1, s2.errors);
415 dl 1.1 }
416    
417     /**
418     * Cancelling a subscription eventually causes no more onNexts to be issued
419     */
420     public void testCancel() {
421     SubmissionPublisher<Integer> p = basicPublisher();
422     TestSubscriber s1 = new TestSubscriber();
423     TestSubscriber s2 = new TestSubscriber();
424     p.subscribe(s1);
425     p.subscribe(s2);
426     s1.awaitSubscribe();
427     p.submit(1);
428     s1.sn.cancel();
429     for (int i = 2; i <= 20; ++i)
430     p.submit(i);
431     p.close();
432     s2.awaitComplete();
433 jsr166 1.6 assertEquals(20, s2.nexts);
434     assertEquals(1, s2.completes);
435 dl 1.1 assertTrue(s1.nexts < 20);
436     assertFalse(p.isSubscribed(s1));
437     }
438    
439     /**
440     * Throwing an exception in onNext causes onError
441     */
442     public void testThrowOnNext() {
443     SubmissionPublisher<Integer> p = basicPublisher();
444     TestSubscriber s1 = new TestSubscriber();
445     TestSubscriber s2 = new TestSubscriber();
446     p.subscribe(s1);
447     p.subscribe(s2);
448     s1.awaitSubscribe();
449     p.submit(1);
450     s1.throwOnCall = true;
451     p.submit(2);
452     p.close();
453     s2.awaitComplete();
454 jsr166 1.6 assertEquals(2, s2.nexts);
455 dl 1.1 s1.awaitComplete();
456 jsr166 1.6 assertEquals(1, s1.errors);
457 dl 1.1 }
458    
459     /**
460 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
461 dl 1.1 * subscriber throws an exception in onNext
462     */
463     public void testThrowOnNextHandler() {
464     AtomicInteger calls = new AtomicInteger();
465     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
466     (basicExecutor, 8,
467     (s, e) -> calls.getAndIncrement());
468     TestSubscriber s1 = new TestSubscriber();
469     TestSubscriber s2 = new TestSubscriber();
470     p.subscribe(s1);
471     p.subscribe(s2);
472     s1.awaitSubscribe();
473     p.submit(1);
474     s1.throwOnCall = true;
475     p.submit(2);
476     p.close();
477     s2.awaitComplete();
478 jsr166 1.6 assertEquals(2, s2.nexts);
479     assertEquals(1, s2.completes);
480 dl 1.1 s1.awaitError();
481 jsr166 1.6 assertEquals(1, s1.errors);
482     assertEquals(1, calls.get());
483 dl 1.1 }
484    
485     /**
486     * onNext items are issued in the same order to each subscriber
487     */
488     public void testOrder() {
489     SubmissionPublisher<Integer> p = basicPublisher();
490     TestSubscriber s1 = new TestSubscriber();
491     TestSubscriber s2 = new TestSubscriber();
492     p.subscribe(s1);
493     p.subscribe(s2);
494     for (int i = 1; i <= 20; ++i)
495     p.submit(i);
496     p.close();
497     s2.awaitComplete();
498     s1.awaitComplete();
499 jsr166 1.6 assertEquals(20, s2.nexts);
500     assertEquals(1, s2.completes);
501     assertEquals(20, s1.nexts);
502     assertEquals(1, s1.completes);
503 dl 1.1 }
504    
505     /**
506     * onNext is issued only if requested
507     */
508     public void testRequest1() {
509     SubmissionPublisher<Integer> p = basicPublisher();
510     TestSubscriber s1 = new TestSubscriber();
511     s1.request = false;
512     p.subscribe(s1);
513     s1.awaitSubscribe();
514     assertTrue(p.estimateMinimumDemand() == 0);
515     TestSubscriber s2 = new TestSubscriber();
516     p.subscribe(s2);
517     p.submit(1);
518     p.submit(2);
519     s2.awaitNext(1);
520 jsr166 1.6 assertEquals(0, s1.nexts);
521 dl 1.1 s1.sn.request(3);
522     p.submit(3);
523     p.close();
524     s2.awaitComplete();
525 jsr166 1.6 assertEquals(3, s2.nexts);
526     assertEquals(1, s2.completes);
527 dl 1.1 s1.awaitComplete();
528     assertTrue(s1.nexts > 0);
529 jsr166 1.6 assertEquals(1, s1.completes);
530 dl 1.1 }
531    
532     /**
533     * onNext is not issued when requests become zero
534     */
535     public void testRequest2() {
536     SubmissionPublisher<Integer> p = basicPublisher();
537     TestSubscriber s1 = new TestSubscriber();
538     TestSubscriber s2 = new TestSubscriber();
539     p.subscribe(s1);
540     p.subscribe(s2);
541     s2.awaitSubscribe();
542     s1.awaitSubscribe();
543     s1.request = false;
544     p.submit(1);
545     p.submit(2);
546     p.close();
547     s2.awaitComplete();
548 jsr166 1.6 assertEquals(2, s2.nexts);
549     assertEquals(1, s2.completes);
550 dl 1.1 s1.awaitNext(1);
551 jsr166 1.6 assertEquals(1, s1.nexts);
552 dl 1.1 }
553    
554     /**
555     * Negative request causes error
556     */
557     public void testRequest3() {
558     SubmissionPublisher<Integer> p = basicPublisher();
559     TestSubscriber s1 = new TestSubscriber();
560     TestSubscriber s2 = new TestSubscriber();
561     p.subscribe(s1);
562     p.subscribe(s2);
563     s2.awaitSubscribe();
564     s1.awaitSubscribe();
565     s1.sn.request(-1L);
566     p.submit(1);
567     p.submit(2);
568     p.close();
569     s2.awaitComplete();
570 jsr166 1.6 assertEquals(2, s2.nexts);
571     assertEquals(1, s2.completes);
572 dl 1.1 s1.awaitError();
573 jsr166 1.6 assertEquals(1, s1.errors);
574 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
575     }
576    
577     /**
578     * estimateMinimumDemand reports 0 until request, nonzero after
579     * request, and zero again after delivery
580     */
581     public void testEstimateMinimumDemand() {
582     TestSubscriber s = new TestSubscriber();
583     SubmissionPublisher<Integer> p = basicPublisher();
584     s.request = false;
585     p.subscribe(s);
586     s.awaitSubscribe();
587 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
588 dl 1.1 s.sn.request(1);
589 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
590 dl 1.1 p.submit(1);
591     s.awaitNext(1);
592 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
593 dl 1.1 }
594    
595     /**
596 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
597 dl 1.1 */
598     public void testEmptySubmit() {
599     SubmissionPublisher<Integer> p = basicPublisher();
600 jsr166 1.6 assertEquals(0, p.submit(1));
601 dl 1.1 }
602    
603     /**
604 jsr166 1.3 * submit(null) throws NPE
605 dl 1.1 */
606     public void testNullSubmit() {
607     SubmissionPublisher<Integer> p = basicPublisher();
608     try {
609     p.submit(null);
610 jsr166 1.2 shouldThrow();
611     } catch (NullPointerException success) {}
612 dl 1.1 }
613    
614     /**
615 jsr166 1.3 * submit returns number of lagged items, compatible with result
616 dl 1.1 * of estimateMaximumLag.
617     */
618     public void testLaggedSubmit() {
619     SubmissionPublisher<Integer> p = basicPublisher();
620     TestSubscriber s1 = new TestSubscriber();
621     s1.request = false;
622     TestSubscriber s2 = new TestSubscriber();
623     s2.request = false;
624     p.subscribe(s1);
625     p.subscribe(s2);
626     s2.awaitSubscribe();
627     s1.awaitSubscribe();
628 jsr166 1.6 assertEquals(1, p.submit(1));
629 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
630     assertTrue(p.submit(2) >= 2);
631     assertTrue(p.estimateMaximumLag() >= 2);
632     s1.sn.request(4);
633     assertTrue(p.submit(3) >= 3);
634     assertTrue(p.estimateMaximumLag() >= 3);
635     s2.sn.request(4);
636     p.submit(4);
637     p.close();
638     s2.awaitComplete();
639 jsr166 1.6 assertEquals(4, s2.nexts);
640 dl 1.1 s1.awaitComplete();
641 jsr166 1.6 assertEquals(4, s2.nexts);
642 dl 1.1 }
643    
644     /**
645     * submit eventually issues requested items when buffer capacity is 1
646     */
647     public void testCap1Submit() {
648     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
649     basicExecutor, 1);
650     TestSubscriber s1 = new TestSubscriber();
651     TestSubscriber s2 = new TestSubscriber();
652     p.subscribe(s1);
653     p.subscribe(s2);
654     for (int i = 1; i <= 20; ++i) {
655     assertTrue(p.estimateMinimumDemand() <= 1);
656     assertTrue(p.submit(i) >= 0);
657     }
658     p.close();
659     s2.awaitComplete();
660     s1.awaitComplete();
661 jsr166 1.6 assertEquals(20, s2.nexts);
662     assertEquals(1, s2.completes);
663     assertEquals(20, s1.nexts);
664     assertEquals(1, s1.completes);
665 dl 1.1 }
666 jsr166 1.2
667 dl 1.1 static boolean noopHandle(AtomicInteger count) {
668     count.getAndIncrement();
669     return false;
670     }
671    
672     static boolean reqHandle(AtomicInteger count, Subscriber s) {
673     count.getAndIncrement();
674     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
675     return true;
676     }
677    
678     /**
679 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
680 dl 1.1 */
681     public void testEmptyOffer() {
682     SubmissionPublisher<Integer> p = basicPublisher();
683 jsr166 1.3 assertEquals(0, p.offer(1, null));
684 dl 1.1 }
685    
686     /**
687 jsr166 1.3 * offer(null) throws NPE
688 dl 1.1 */
689     public void testNullOffer() {
690     SubmissionPublisher<Integer> p = basicPublisher();
691     try {
692     p.offer(null, null);
693 jsr166 1.2 shouldThrow();
694     } catch (NullPointerException success) {}
695 dl 1.1 }
696    
697     /**
698 jsr166 1.3 * offer returns number of lagged items if not saturated
699 dl 1.1 */
700     public void testLaggedOffer() {
701     SubmissionPublisher<Integer> p = basicPublisher();
702     TestSubscriber s1 = new TestSubscriber();
703     s1.request = false;
704     TestSubscriber s2 = new TestSubscriber();
705     s2.request = false;
706     p.subscribe(s1);
707     p.subscribe(s2);
708     s2.awaitSubscribe();
709     s1.awaitSubscribe();
710     assertTrue(p.offer(1, null) >= 1);
711     assertTrue(p.offer(2, null) >= 2);
712     s1.sn.request(4);
713     assertTrue(p.offer(3, null) >= 3);
714     s2.sn.request(4);
715     p.offer(4, null);
716     p.close();
717     s2.awaitComplete();
718 jsr166 1.6 assertEquals(4, s2.nexts);
719 dl 1.1 s1.awaitComplete();
720 jsr166 1.6 assertEquals(4, s2.nexts);
721 dl 1.1 }
722    
723     /**
724 jsr166 1.3 * offer reports drops if saturated
725 dl 1.1 */
726     public void testDroppedOffer() {
727     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
728     basicExecutor, 4);
729     TestSubscriber s1 = new TestSubscriber();
730     s1.request = false;
731     TestSubscriber s2 = new TestSubscriber();
732     s2.request = false;
733     p.subscribe(s1);
734     p.subscribe(s2);
735     s2.awaitSubscribe();
736     s1.awaitSubscribe();
737     for (int i = 1; i <= 4; ++i)
738     assertTrue(p.offer(i, null) >= 0);
739     p.offer(5, null);
740     assertTrue(p.offer(6, null) < 0);
741     s1.sn.request(64);
742     assertTrue(p.offer(7, null) < 0);
743     s2.sn.request(64);
744     p.close();
745     s2.awaitComplete();
746     assertTrue(s2.nexts >= 4);
747     s1.awaitComplete();
748     assertTrue(s1.nexts >= 4);
749     }
750    
751     /**
752 jsr166 1.3 * offer invokes drop handler if saturated
753 dl 1.1 */
754     public void testHandledDroppedOffer() {
755     AtomicInteger calls = new AtomicInteger();
756     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
757     basicExecutor, 4);
758     TestSubscriber s1 = new TestSubscriber();
759     s1.request = false;
760     TestSubscriber s2 = new TestSubscriber();
761     s2.request = false;
762     p.subscribe(s1);
763     p.subscribe(s2);
764     s2.awaitSubscribe();
765     s1.awaitSubscribe();
766     for (int i = 1; i <= 4; ++i)
767     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
768     p.offer(4, (s, x) -> noopHandle(calls));
769     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
770     s1.sn.request(64);
771     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
772     s2.sn.request(64);
773     p.close();
774     s2.awaitComplete();
775     s1.awaitComplete();
776     assertTrue(calls.get() >= 4);
777     }
778    
779     /**
780 jsr166 1.3 * offer succeeds if drop handler forces request
781 dl 1.1 */
782     public void testRecoveredHandledDroppedOffer() {
783     AtomicInteger calls = new AtomicInteger();
784     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
785     basicExecutor, 4);
786     TestSubscriber s1 = new TestSubscriber();
787     s1.request = false;
788     TestSubscriber s2 = new TestSubscriber();
789     s2.request = false;
790     p.subscribe(s1);
791     p.subscribe(s2);
792     s2.awaitSubscribe();
793     s1.awaitSubscribe();
794     int n = 0;
795     for (int i = 1; i <= 8; ++i) {
796     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
797     n = n + 2 + (d < 0 ? d : 0);
798     }
799     p.close();
800     s2.awaitComplete();
801     s1.awaitComplete();
802 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
803 dl 1.1 assertTrue(calls.get() >= 2);
804     }
805    
806     /**
807 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
808 dl 1.1 */
809     public void testEmptyTimedOffer() {
810     SubmissionPublisher<Integer> p = basicPublisher();
811 jsr166 1.7 long startTime = System.nanoTime();
812 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
813 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
814 dl 1.1 }
815    
816     /**
817 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
818 dl 1.1 */
819     public void testNullTimedOffer() {
820     SubmissionPublisher<Integer> p = basicPublisher();
821 jsr166 1.7 long startTime = System.nanoTime();
822 dl 1.1 try {
823 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
824 jsr166 1.2 shouldThrow();
825     } catch (NullPointerException success) {}
826 dl 1.1 try {
827 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
828 jsr166 1.2 shouldThrow();
829     } catch (NullPointerException success) {}
830 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
831 dl 1.1 }
832    
833     /**
834 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
835 dl 1.1 */
836     public void testLaggedTimedOffer() {
837     SubmissionPublisher<Integer> p = basicPublisher();
838     TestSubscriber s1 = new TestSubscriber();
839     s1.request = false;
840     TestSubscriber s2 = new TestSubscriber();
841     s2.request = false;
842     p.subscribe(s1);
843     p.subscribe(s2);
844     s2.awaitSubscribe();
845     s1.awaitSubscribe();
846 jsr166 1.7 long startTime = System.nanoTime();
847     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
848     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
849 dl 1.1 s1.sn.request(4);
850 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
851 dl 1.1 s2.sn.request(4);
852 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
853 dl 1.1 p.close();
854     s2.awaitComplete();
855 jsr166 1.6 assertEquals(4, s2.nexts);
856 dl 1.1 s1.awaitComplete();
857 jsr166 1.6 assertEquals(4, s2.nexts);
858 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
859 dl 1.1 }
860    
861     /**
862 jsr166 1.3 * Timed offer reports drops if saturated
863 dl 1.1 */
864     public void testDroppedTimedOffer() {
865     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
866     basicExecutor, 4);
867     TestSubscriber s1 = new TestSubscriber();
868     s1.request = false;
869     TestSubscriber s2 = new TestSubscriber();
870     s2.request = false;
871     p.subscribe(s1);
872     p.subscribe(s2);
873     s2.awaitSubscribe();
874     s1.awaitSubscribe();
875 dl 1.9 long delay = timeoutMillis();
876 dl 1.1 for (int i = 1; i <= 4; ++i)
877 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
878     long startTime = System.nanoTime();
879     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
880 dl 1.1 s1.sn.request(64);
881 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
882     // 2 * delay should elapse but check only 1 * delay to allow timer slop
883     assertTrue(millisElapsedSince(startTime) >= delay);
884 dl 1.1 s2.sn.request(64);
885     p.close();
886     s2.awaitComplete();
887     assertTrue(s2.nexts >= 2);
888     s1.awaitComplete();
889     assertTrue(s1.nexts >= 2);
890     }
891    
892     /**
893 jsr166 1.3 * Timed offer invokes drop handler if saturated
894 dl 1.1 */
895     public void testHandledDroppedTimedOffer() {
896     AtomicInteger calls = new AtomicInteger();
897     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
898     basicExecutor, 4);
899     TestSubscriber s1 = new TestSubscriber();
900     s1.request = false;
901     TestSubscriber s2 = new TestSubscriber();
902     s2.request = false;
903     p.subscribe(s1);
904     p.subscribe(s2);
905     s2.awaitSubscribe();
906     s1.awaitSubscribe();
907 dl 1.9 long delay = timeoutMillis();
908 dl 1.1 for (int i = 1; i <= 4; ++i)
909 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
910     long startTime = System.nanoTime();
911     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
912 dl 1.1 s1.sn.request(64);
913 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
914     assertTrue(millisElapsedSince(startTime) >= delay);
915 dl 1.1 s2.sn.request(64);
916     p.close();
917     s2.awaitComplete();
918     s1.awaitComplete();
919     assertTrue(calls.get() >= 2);
920     }
921    
922     /**
923 jsr166 1.3 * Timed offer succeeds if drop handler forces request
924 dl 1.1 */
925     public void testRecoveredHandledDroppedTimedOffer() {
926     AtomicInteger calls = new AtomicInteger();
927     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
928     basicExecutor, 4);
929     TestSubscriber s1 = new TestSubscriber();
930     s1.request = false;
931     TestSubscriber s2 = new TestSubscriber();
932     s2.request = false;
933     p.subscribe(s1);
934     p.subscribe(s2);
935     s2.awaitSubscribe();
936     s1.awaitSubscribe();
937     int n = 0;
938 dl 1.9 long delay = timeoutMillis();
939     long startTime = System.nanoTime();
940     for (int i = 1; i <= 6; ++i) {
941     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
942 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
943     }
944 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
945 dl 1.1 p.close();
946     s2.awaitComplete();
947     s1.awaitComplete();
948 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
949 dl 1.1 assertTrue(calls.get() >= 2);
950     }
951    
952 dl 1.10 /**
953     * consume returns a CompletableFuture that is done when
954     * publisher completes
955     */
956     public void testConsume() {
957     AtomicInteger sum = new AtomicInteger();
958     SubmissionPublisher<Integer> p = basicPublisher();
959     CompletableFuture<Void> f =
960     p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
961     int n = 20;
962     for (int i = 1; i <= n; ++i)
963     p.submit(i);
964     p.close();
965     f.join();
966     assertEquals((n * (n + 1)) / 2, sum.get());
967     }
968    
969     /**
970     * consume(null) throws NPE
971     */
972     public void testConsumeNPE() {
973     SubmissionPublisher<Integer> p = basicPublisher();
974     try {
975     CompletableFuture<Void> f = p.consume(null);
976     shouldThrow();
977     } catch(NullPointerException success) {
978     }
979     }
980    
981     /**
982     * consume eventually stops processing published items if cancelled
983     */
984     public void testCancelledConsume() {
985     AtomicInteger count = new AtomicInteger();
986     SubmissionPublisher<Integer> p = basicPublisher();
987     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
988     f.cancel(true);
989     int n = 1000000; // arbitrary limit
990     for (int i = 1; i <= n; ++i)
991     p.submit(i);
992     assertTrue(count.get() < n);
993     }
994    
995 dl 1.1 }