ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.15
Committed: Sun Nov 6 22:50:32 2016 UTC (7 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.14: +1 -1 lines
Log Message:
elide unnecessary braces

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8 dl 1.10 import java.util.concurrent.CompletableFuture;
9 dl 1.1 import java.util.concurrent.Executor;
10     import java.util.concurrent.Executors;
11     import java.util.concurrent.Flow;
12 dl 1.10 import java.util.concurrent.ForkJoinPool;
13 dl 1.1 import java.util.concurrent.SubmissionPublisher;
14     import java.util.concurrent.atomic.AtomicInteger;
15     import junit.framework.Test;
16     import junit.framework.TestSuite;
17    
18 dl 1.10 import static java.util.concurrent.Flow.Subscriber;
19     import static java.util.concurrent.Flow.Subscription;
20     import static java.util.concurrent.TimeUnit.MILLISECONDS;
21    
22 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
23    
24     public static void main(String[] args) {
25     main(suite(), args);
26     }
27     public static Test suite() {
28     return new TestSuite(SubmissionPublisherTest.class);
29     }
30    
31 dl 1.12 final Executor basicExecutor = basicPublisher().getExecutor();
32 jsr166 1.13
33 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
34 dl 1.12 return new SubmissionPublisher<Integer>();
35 dl 1.1 }
36 jsr166 1.2
37 dl 1.1 static class SPException extends RuntimeException {}
38    
39     class TestSubscriber implements Subscriber<Integer> {
40     volatile Subscription sn;
41     int last; // Requires that onNexts are in numeric order
42     volatile int nexts;
43     volatile int errors;
44     volatile int completes;
45     volatile boolean throwOnCall = false;
46     volatile boolean request = true;
47     volatile Throwable lastError;
48    
49     public synchronized void onSubscribe(Subscription s) {
50     threadAssertTrue(sn == null);
51     sn = s;
52     notifyAll();
53     if (throwOnCall)
54     throw new SPException();
55     if (request)
56     sn.request(1L);
57     }
58     public synchronized void onNext(Integer t) {
59     ++nexts;
60     notifyAll();
61     int current = t.intValue();
62     threadAssertTrue(current >= last);
63     last = current;
64     if (request)
65     sn.request(1L);
66     if (throwOnCall)
67     throw new SPException();
68     }
69     public synchronized void onError(Throwable t) {
70     threadAssertTrue(completes == 0);
71     threadAssertTrue(errors == 0);
72     lastError = t;
73     ++errors;
74     notifyAll();
75     }
76     public synchronized void onComplete() {
77     threadAssertTrue(completes == 0);
78     ++completes;
79     notifyAll();
80     }
81    
82     synchronized void awaitSubscribe() {
83     while (sn == null) {
84     try {
85     wait();
86     } catch (Exception ex) {
87     threadUnexpectedException(ex);
88     break;
89     }
90     }
91     }
92     synchronized void awaitNext(int n) {
93     while (nexts < n) {
94     try {
95     wait();
96     } catch (Exception ex) {
97     threadUnexpectedException(ex);
98     break;
99     }
100     }
101     }
102     synchronized void awaitComplete() {
103     while (completes == 0 && errors == 0) {
104     try {
105     wait();
106     } catch (Exception ex) {
107     threadUnexpectedException(ex);
108     break;
109     }
110     }
111     }
112     synchronized void awaitError() {
113     while (errors == 0) {
114     try {
115     wait();
116     } catch (Exception ex) {
117     threadUnexpectedException(ex);
118     break;
119     }
120     }
121     }
122    
123     }
124    
125     /**
126     * A new SubmissionPublisher has no subscribers, a non-null
127     * executor, a power-of-two capacity, is not closed, and reports
128     * zero demand and lag
129     */
130     void checkInitialState(SubmissionPublisher<?> p) {
131     assertFalse(p.hasSubscribers());
132 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
133 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
134     assertFalse(p.isClosed());
135     assertNull(p.getClosedException());
136     int n = p.getMaxBufferCapacity();
137     assertTrue((n & (n - 1)) == 0); // power of two
138     assertNotNull(p.getExecutor());
139 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
140     assertEquals(0, p.estimateMaximumLag());
141 dl 1.1 }
142    
143     /**
144     * A default-constructed SubmissionPublisher has no subscribers,
145     * is not closed, has default buffer size, and uses the
146 dl 1.12 * defaultExecutor
147 dl 1.1 */
148     public void testConstructor1() {
149     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
150     checkInitialState(p);
151     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 dl 1.12 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153     if (ForkJoinPool.getCommonPoolParallelism() > 1)
154     assertSame(e, c);
155     else
156     assertNotSame(e, c);
157 dl 1.1 }
158    
159     /**
160     * A new SubmissionPublisher has no subscribers, is not closed,
161     * has the given buffer size, and uses the given executor
162     */
163     public void testConstructor2() {
164     Executor e = Executors.newFixedThreadPool(1);
165     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
166     checkInitialState(p);
167     assertSame(p.getExecutor(), e);
168 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
169 dl 1.1 }
170    
171     /**
172     * A null Executor argument to SubmissionPublisher constructor throws NPE
173     */
174     public void testConstructor3() {
175     try {
176 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
177 dl 1.1 shouldThrow();
178 jsr166 1.2 } catch (NullPointerException success) {}
179 dl 1.1 }
180    
181     /**
182     * A negative capacity argument to SubmissionPublisher constructor
183     * throws IAE
184     */
185     public void testConstructor4() {
186     Executor e = Executors.newFixedThreadPool(1);
187     try {
188 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
189 dl 1.1 shouldThrow();
190 jsr166 1.2 } catch (IllegalArgumentException success) {}
191 dl 1.1 }
192    
193     /**
194     * A closed publisher reports isClosed with no closedException and
195     * throws ISE upon attempted submission; a subsequent close or
196     * closeExceptionally has no additional effect.
197     */
198     public void testClose() {
199     SubmissionPublisher<Integer> p = basicPublisher();
200     checkInitialState(p);
201     p.close();
202     assertTrue(p.isClosed());
203     assertNull(p.getClosedException());
204     try {
205     p.submit(1);
206     shouldThrow();
207 jsr166 1.2 } catch (IllegalStateException success) {}
208 dl 1.1 Throwable ex = new SPException();
209     p.closeExceptionally(ex);
210     assertTrue(p.isClosed());
211     assertNull(p.getClosedException());
212     }
213    
214     /**
215     * A publisher closedExceptionally reports isClosed with the
216     * closedException and throws ISE upon attempted submission; a
217     * subsequent close or closeExceptionally has no additional
218     * effect.
219     */
220     public void testCloseExceptionally() {
221     SubmissionPublisher<Integer> p = basicPublisher();
222     checkInitialState(p);
223     Throwable ex = new SPException();
224     p.closeExceptionally(ex);
225     assertTrue(p.isClosed());
226     assertSame(p.getClosedException(), ex);
227     try {
228     p.submit(1);
229     shouldThrow();
230 jsr166 1.2 } catch (IllegalStateException success) {}
231 dl 1.1 p.close();
232     assertTrue(p.isClosed());
233     assertSame(p.getClosedException(), ex);
234     }
235    
236     /**
237     * Upon subscription, the subscriber's onSubscribe is called, no
238     * other Subscriber methods are invoked, the publisher
239     * hasSubscribers, isSubscribed is true, and existing
240     * subscriptions are unaffected.
241     */
242     public void testSubscribe1() {
243     TestSubscriber s = new TestSubscriber();
244     SubmissionPublisher<Integer> p = basicPublisher();
245     p.subscribe(s);
246     assertTrue(p.hasSubscribers());
247 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
248 dl 1.1 assertTrue(p.getSubscribers().contains(s));
249     assertTrue(p.isSubscribed(s));
250     s.awaitSubscribe();
251     assertNotNull(s.sn);
252 jsr166 1.6 assertEquals(0, s.nexts);
253     assertEquals(0, s.errors);
254     assertEquals(0, s.completes);
255 dl 1.1 TestSubscriber s2 = new TestSubscriber();
256     p.subscribe(s2);
257     assertTrue(p.hasSubscribers());
258 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
259 dl 1.1 assertTrue(p.getSubscribers().contains(s));
260     assertTrue(p.getSubscribers().contains(s2));
261     assertTrue(p.isSubscribed(s));
262     assertTrue(p.isSubscribed(s2));
263     s2.awaitSubscribe();
264     assertNotNull(s2.sn);
265 jsr166 1.6 assertEquals(0, s2.nexts);
266     assertEquals(0, s2.errors);
267     assertEquals(0, s2.completes);
268 dl 1.9 p.close();
269 dl 1.1 }
270    
271     /**
272     * If closed, upon subscription, the subscriber's onComplete
273     * method is invoked
274     */
275     public void testSubscribe2() {
276     TestSubscriber s = new TestSubscriber();
277     SubmissionPublisher<Integer> p = basicPublisher();
278     p.close();
279     p.subscribe(s);
280     s.awaitComplete();
281 jsr166 1.6 assertEquals(0, s.nexts);
282     assertEquals(0, s.errors);
283     assertEquals(1, s.completes, 1);
284 dl 1.1 }
285    
286     /**
287     * If closedExceptionally, upon subscription, the subscriber's
288     * onError method is invoked
289     */
290     public void testSubscribe3() {
291     TestSubscriber s = new TestSubscriber();
292     SubmissionPublisher<Integer> p = basicPublisher();
293     Throwable ex = new SPException();
294     p.closeExceptionally(ex);
295     assertTrue(p.isClosed());
296     assertSame(p.getClosedException(), ex);
297     p.subscribe(s);
298     s.awaitError();
299 jsr166 1.6 assertEquals(0, s.nexts);
300     assertEquals(1, s.errors);
301 dl 1.1 }
302    
303     /**
304     * Upon attempted resubscription, the subscriber's onError is
305     * called and the subscription is cancelled.
306     */
307     public void testSubscribe4() {
308     TestSubscriber s = new TestSubscriber();
309     SubmissionPublisher<Integer> p = basicPublisher();
310     p.subscribe(s);
311     assertTrue(p.hasSubscribers());
312 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
313 dl 1.1 assertTrue(p.getSubscribers().contains(s));
314     assertTrue(p.isSubscribed(s));
315     s.awaitSubscribe();
316     assertNotNull(s.sn);
317 jsr166 1.6 assertEquals(0, s.nexts);
318     assertEquals(0, s.errors);
319     assertEquals(0, s.completes);
320 dl 1.1 p.subscribe(s);
321     s.awaitError();
322 jsr166 1.6 assertEquals(0, s.nexts);
323     assertEquals(1, s.errors);
324 dl 1.1 assertFalse(p.isSubscribed(s));
325     }
326    
327     /**
328     * An exception thrown in onSubscribe causes onError
329     */
330     public void testSubscribe5() {
331     TestSubscriber s = new TestSubscriber();
332     SubmissionPublisher<Integer> p = basicPublisher();
333     s.throwOnCall = true;
334     try {
335     p.subscribe(s);
336 jsr166 1.2 } catch (Exception ok) {}
337 dl 1.1 s.awaitError();
338 jsr166 1.6 assertEquals(0, s.nexts);
339     assertEquals(1, s.errors);
340     assertEquals(0, s.completes);
341 dl 1.1 }
342    
343     /**
344 jsr166 1.8 * subscribe(null) throws NPE
345 dl 1.1 */
346     public void testSubscribe6() {
347     SubmissionPublisher<Integer> p = basicPublisher();
348     try {
349     p.subscribe(null);
350 jsr166 1.2 shouldThrow();
351     } catch (NullPointerException success) {}
352 dl 1.1 checkInitialState(p);
353     }
354    
355     /**
356     * Closing a publisher causes onComplete to subscribers
357     */
358     public void testCloseCompletes() {
359     SubmissionPublisher<Integer> p = basicPublisher();
360     TestSubscriber s1 = new TestSubscriber();
361     TestSubscriber s2 = new TestSubscriber();
362     p.subscribe(s1);
363     p.subscribe(s2);
364     p.submit(1);
365     p.close();
366     assertTrue(p.isClosed());
367     assertNull(p.getClosedException());
368     s1.awaitComplete();
369 jsr166 1.6 assertEquals(1, s1.nexts);
370     assertEquals(1, s1.completes);
371 dl 1.1 s2.awaitComplete();
372 jsr166 1.6 assertEquals(1, s2.nexts);
373     assertEquals(1, s2.completes);
374 dl 1.1 }
375    
376     /**
377     * Closing a publisher exceptionally causes onError to subscribers
378     */
379     public void testCloseExceptionallyError() {
380     SubmissionPublisher<Integer> p = basicPublisher();
381     TestSubscriber s1 = new TestSubscriber();
382     TestSubscriber s2 = new TestSubscriber();
383     p.subscribe(s1);
384     p.subscribe(s2);
385     p.submit(1);
386     p.closeExceptionally(new SPException());
387     assertTrue(p.isClosed());
388     s1.awaitError();
389     assertTrue(s1.nexts <= 1);
390 jsr166 1.6 assertEquals(1, s1.errors);
391 dl 1.1 s2.awaitError();
392     assertTrue(s2.nexts <= 1);
393 jsr166 1.6 assertEquals(1, s2.errors);
394 dl 1.1 }
395    
396     /**
397     * Cancelling a subscription eventually causes no more onNexts to be issued
398     */
399     public void testCancel() {
400     SubmissionPublisher<Integer> p = basicPublisher();
401     TestSubscriber s1 = new TestSubscriber();
402     TestSubscriber s2 = new TestSubscriber();
403     p.subscribe(s1);
404     p.subscribe(s2);
405     s1.awaitSubscribe();
406     p.submit(1);
407     s1.sn.cancel();
408     for (int i = 2; i <= 20; ++i)
409     p.submit(i);
410     p.close();
411     s2.awaitComplete();
412 jsr166 1.6 assertEquals(20, s2.nexts);
413     assertEquals(1, s2.completes);
414 dl 1.1 assertTrue(s1.nexts < 20);
415     assertFalse(p.isSubscribed(s1));
416     }
417    
418     /**
419     * Throwing an exception in onNext causes onError
420     */
421     public void testThrowOnNext() {
422     SubmissionPublisher<Integer> p = basicPublisher();
423     TestSubscriber s1 = new TestSubscriber();
424     TestSubscriber s2 = new TestSubscriber();
425     p.subscribe(s1);
426     p.subscribe(s2);
427     s1.awaitSubscribe();
428     p.submit(1);
429     s1.throwOnCall = true;
430     p.submit(2);
431     p.close();
432     s2.awaitComplete();
433 jsr166 1.6 assertEquals(2, s2.nexts);
434 dl 1.1 s1.awaitComplete();
435 jsr166 1.6 assertEquals(1, s1.errors);
436 dl 1.1 }
437    
438     /**
439 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
440 dl 1.1 * subscriber throws an exception in onNext
441     */
442     public void testThrowOnNextHandler() {
443     AtomicInteger calls = new AtomicInteger();
444     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
445     (basicExecutor, 8,
446     (s, e) -> calls.getAndIncrement());
447     TestSubscriber s1 = new TestSubscriber();
448     TestSubscriber s2 = new TestSubscriber();
449     p.subscribe(s1);
450     p.subscribe(s2);
451     s1.awaitSubscribe();
452     p.submit(1);
453     s1.throwOnCall = true;
454     p.submit(2);
455     p.close();
456     s2.awaitComplete();
457 jsr166 1.6 assertEquals(2, s2.nexts);
458     assertEquals(1, s2.completes);
459 dl 1.1 s1.awaitError();
460 jsr166 1.6 assertEquals(1, s1.errors);
461     assertEquals(1, calls.get());
462 dl 1.1 }
463    
464     /**
465     * onNext items are issued in the same order to each subscriber
466     */
467     public void testOrder() {
468     SubmissionPublisher<Integer> p = basicPublisher();
469     TestSubscriber s1 = new TestSubscriber();
470     TestSubscriber s2 = new TestSubscriber();
471     p.subscribe(s1);
472     p.subscribe(s2);
473     for (int i = 1; i <= 20; ++i)
474     p.submit(i);
475     p.close();
476     s2.awaitComplete();
477     s1.awaitComplete();
478 jsr166 1.6 assertEquals(20, s2.nexts);
479     assertEquals(1, s2.completes);
480     assertEquals(20, s1.nexts);
481     assertEquals(1, s1.completes);
482 dl 1.1 }
483    
484     /**
485     * onNext is issued only if requested
486     */
487     public void testRequest1() {
488     SubmissionPublisher<Integer> p = basicPublisher();
489     TestSubscriber s1 = new TestSubscriber();
490     s1.request = false;
491     p.subscribe(s1);
492     s1.awaitSubscribe();
493     assertTrue(p.estimateMinimumDemand() == 0);
494     TestSubscriber s2 = new TestSubscriber();
495     p.subscribe(s2);
496     p.submit(1);
497     p.submit(2);
498     s2.awaitNext(1);
499 jsr166 1.6 assertEquals(0, s1.nexts);
500 dl 1.1 s1.sn.request(3);
501     p.submit(3);
502     p.close();
503     s2.awaitComplete();
504 jsr166 1.6 assertEquals(3, s2.nexts);
505     assertEquals(1, s2.completes);
506 dl 1.1 s1.awaitComplete();
507     assertTrue(s1.nexts > 0);
508 jsr166 1.6 assertEquals(1, s1.completes);
509 dl 1.1 }
510    
511     /**
512     * onNext is not issued when requests become zero
513     */
514     public void testRequest2() {
515     SubmissionPublisher<Integer> p = basicPublisher();
516     TestSubscriber s1 = new TestSubscriber();
517     TestSubscriber s2 = new TestSubscriber();
518     p.subscribe(s1);
519     p.subscribe(s2);
520     s2.awaitSubscribe();
521     s1.awaitSubscribe();
522     s1.request = false;
523     p.submit(1);
524     p.submit(2);
525     p.close();
526     s2.awaitComplete();
527 jsr166 1.6 assertEquals(2, s2.nexts);
528     assertEquals(1, s2.completes);
529 dl 1.1 s1.awaitNext(1);
530 jsr166 1.6 assertEquals(1, s1.nexts);
531 dl 1.1 }
532    
533     /**
534     * Negative request causes error
535     */
536     public void testRequest3() {
537     SubmissionPublisher<Integer> p = basicPublisher();
538     TestSubscriber s1 = new TestSubscriber();
539     TestSubscriber s2 = new TestSubscriber();
540     p.subscribe(s1);
541     p.subscribe(s2);
542     s2.awaitSubscribe();
543     s1.awaitSubscribe();
544     s1.sn.request(-1L);
545     p.submit(1);
546     p.submit(2);
547     p.close();
548     s2.awaitComplete();
549 jsr166 1.6 assertEquals(2, s2.nexts);
550     assertEquals(1, s2.completes);
551 dl 1.1 s1.awaitError();
552 jsr166 1.6 assertEquals(1, s1.errors);
553 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
554     }
555    
556     /**
557     * estimateMinimumDemand reports 0 until request, nonzero after
558     * request, and zero again after delivery
559     */
560     public void testEstimateMinimumDemand() {
561     TestSubscriber s = new TestSubscriber();
562     SubmissionPublisher<Integer> p = basicPublisher();
563     s.request = false;
564     p.subscribe(s);
565     s.awaitSubscribe();
566 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
567 dl 1.1 s.sn.request(1);
568 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
569 dl 1.1 p.submit(1);
570     s.awaitNext(1);
571 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
572 dl 1.1 }
573    
574     /**
575 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
576 dl 1.1 */
577     public void testEmptySubmit() {
578     SubmissionPublisher<Integer> p = basicPublisher();
579 jsr166 1.6 assertEquals(0, p.submit(1));
580 dl 1.1 }
581    
582     /**
583 jsr166 1.3 * submit(null) throws NPE
584 dl 1.1 */
585     public void testNullSubmit() {
586     SubmissionPublisher<Integer> p = basicPublisher();
587     try {
588     p.submit(null);
589 jsr166 1.2 shouldThrow();
590     } catch (NullPointerException success) {}
591 dl 1.1 }
592    
593     /**
594 jsr166 1.3 * submit returns number of lagged items, compatible with result
595 dl 1.1 * of estimateMaximumLag.
596     */
597     public void testLaggedSubmit() {
598     SubmissionPublisher<Integer> p = basicPublisher();
599     TestSubscriber s1 = new TestSubscriber();
600     s1.request = false;
601     TestSubscriber s2 = new TestSubscriber();
602     s2.request = false;
603     p.subscribe(s1);
604     p.subscribe(s2);
605     s2.awaitSubscribe();
606     s1.awaitSubscribe();
607 jsr166 1.6 assertEquals(1, p.submit(1));
608 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
609     assertTrue(p.submit(2) >= 2);
610     assertTrue(p.estimateMaximumLag() >= 2);
611     s1.sn.request(4);
612     assertTrue(p.submit(3) >= 3);
613     assertTrue(p.estimateMaximumLag() >= 3);
614     s2.sn.request(4);
615     p.submit(4);
616     p.close();
617     s2.awaitComplete();
618 jsr166 1.6 assertEquals(4, s2.nexts);
619 dl 1.1 s1.awaitComplete();
620 jsr166 1.6 assertEquals(4, s2.nexts);
621 dl 1.1 }
622    
623     /**
624     * submit eventually issues requested items when buffer capacity is 1
625     */
626     public void testCap1Submit() {
627     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
628     basicExecutor, 1);
629     TestSubscriber s1 = new TestSubscriber();
630     TestSubscriber s2 = new TestSubscriber();
631     p.subscribe(s1);
632     p.subscribe(s2);
633     for (int i = 1; i <= 20; ++i) {
634     assertTrue(p.estimateMinimumDemand() <= 1);
635     assertTrue(p.submit(i) >= 0);
636     }
637     p.close();
638     s2.awaitComplete();
639     s1.awaitComplete();
640 jsr166 1.6 assertEquals(20, s2.nexts);
641     assertEquals(1, s2.completes);
642     assertEquals(20, s1.nexts);
643     assertEquals(1, s1.completes);
644 dl 1.1 }
645 jsr166 1.2
646 dl 1.1 static boolean noopHandle(AtomicInteger count) {
647     count.getAndIncrement();
648     return false;
649     }
650    
651     static boolean reqHandle(AtomicInteger count, Subscriber s) {
652     count.getAndIncrement();
653     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
654     return true;
655     }
656    
657     /**
658 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
659 dl 1.1 */
660     public void testEmptyOffer() {
661     SubmissionPublisher<Integer> p = basicPublisher();
662 jsr166 1.3 assertEquals(0, p.offer(1, null));
663 dl 1.1 }
664    
665     /**
666 jsr166 1.3 * offer(null) throws NPE
667 dl 1.1 */
668     public void testNullOffer() {
669     SubmissionPublisher<Integer> p = basicPublisher();
670     try {
671     p.offer(null, null);
672 jsr166 1.2 shouldThrow();
673     } catch (NullPointerException success) {}
674 dl 1.1 }
675    
676     /**
677 jsr166 1.3 * offer returns number of lagged items if not saturated
678 dl 1.1 */
679     public void testLaggedOffer() {
680     SubmissionPublisher<Integer> p = basicPublisher();
681     TestSubscriber s1 = new TestSubscriber();
682     s1.request = false;
683     TestSubscriber s2 = new TestSubscriber();
684     s2.request = false;
685     p.subscribe(s1);
686     p.subscribe(s2);
687     s2.awaitSubscribe();
688     s1.awaitSubscribe();
689     assertTrue(p.offer(1, null) >= 1);
690     assertTrue(p.offer(2, null) >= 2);
691     s1.sn.request(4);
692     assertTrue(p.offer(3, null) >= 3);
693     s2.sn.request(4);
694     p.offer(4, null);
695     p.close();
696     s2.awaitComplete();
697 jsr166 1.6 assertEquals(4, s2.nexts);
698 dl 1.1 s1.awaitComplete();
699 jsr166 1.6 assertEquals(4, s2.nexts);
700 dl 1.1 }
701    
702     /**
703 jsr166 1.3 * offer reports drops if saturated
704 dl 1.1 */
705     public void testDroppedOffer() {
706     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
707     basicExecutor, 4);
708     TestSubscriber s1 = new TestSubscriber();
709     s1.request = false;
710     TestSubscriber s2 = new TestSubscriber();
711     s2.request = false;
712     p.subscribe(s1);
713     p.subscribe(s2);
714     s2.awaitSubscribe();
715     s1.awaitSubscribe();
716     for (int i = 1; i <= 4; ++i)
717     assertTrue(p.offer(i, null) >= 0);
718     p.offer(5, null);
719     assertTrue(p.offer(6, null) < 0);
720     s1.sn.request(64);
721     assertTrue(p.offer(7, null) < 0);
722     s2.sn.request(64);
723     p.close();
724     s2.awaitComplete();
725     assertTrue(s2.nexts >= 4);
726     s1.awaitComplete();
727     assertTrue(s1.nexts >= 4);
728     }
729    
730     /**
731 jsr166 1.3 * offer invokes drop handler if saturated
732 dl 1.1 */
733     public void testHandledDroppedOffer() {
734     AtomicInteger calls = new AtomicInteger();
735     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
736     basicExecutor, 4);
737     TestSubscriber s1 = new TestSubscriber();
738     s1.request = false;
739     TestSubscriber s2 = new TestSubscriber();
740     s2.request = false;
741     p.subscribe(s1);
742     p.subscribe(s2);
743     s2.awaitSubscribe();
744     s1.awaitSubscribe();
745     for (int i = 1; i <= 4; ++i)
746     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
747     p.offer(4, (s, x) -> noopHandle(calls));
748     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
749     s1.sn.request(64);
750     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
751     s2.sn.request(64);
752     p.close();
753     s2.awaitComplete();
754     s1.awaitComplete();
755     assertTrue(calls.get() >= 4);
756     }
757    
758     /**
759 jsr166 1.3 * offer succeeds if drop handler forces request
760 dl 1.1 */
761     public void testRecoveredHandledDroppedOffer() {
762     AtomicInteger calls = new AtomicInteger();
763     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
764     basicExecutor, 4);
765     TestSubscriber s1 = new TestSubscriber();
766     s1.request = false;
767     TestSubscriber s2 = new TestSubscriber();
768     s2.request = false;
769     p.subscribe(s1);
770     p.subscribe(s2);
771     s2.awaitSubscribe();
772     s1.awaitSubscribe();
773     int n = 0;
774     for (int i = 1; i <= 8; ++i) {
775     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
776     n = n + 2 + (d < 0 ? d : 0);
777     }
778     p.close();
779     s2.awaitComplete();
780     s1.awaitComplete();
781 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
782 dl 1.1 assertTrue(calls.get() >= 2);
783     }
784    
785     /**
786 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
787 dl 1.1 */
788     public void testEmptyTimedOffer() {
789     SubmissionPublisher<Integer> p = basicPublisher();
790 jsr166 1.7 long startTime = System.nanoTime();
791 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
792 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
793 dl 1.1 }
794    
795     /**
796 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
797 dl 1.1 */
798     public void testNullTimedOffer() {
799     SubmissionPublisher<Integer> p = basicPublisher();
800 jsr166 1.7 long startTime = System.nanoTime();
801 dl 1.1 try {
802 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
803 jsr166 1.2 shouldThrow();
804     } catch (NullPointerException success) {}
805 dl 1.1 try {
806 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
807 jsr166 1.2 shouldThrow();
808     } catch (NullPointerException success) {}
809 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
810 dl 1.1 }
811    
812     /**
813 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
814 dl 1.1 */
815     public void testLaggedTimedOffer() {
816     SubmissionPublisher<Integer> p = basicPublisher();
817     TestSubscriber s1 = new TestSubscriber();
818     s1.request = false;
819     TestSubscriber s2 = new TestSubscriber();
820     s2.request = false;
821     p.subscribe(s1);
822     p.subscribe(s2);
823     s2.awaitSubscribe();
824     s1.awaitSubscribe();
825 jsr166 1.7 long startTime = System.nanoTime();
826     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
827     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
828 dl 1.1 s1.sn.request(4);
829 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
830 dl 1.1 s2.sn.request(4);
831 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
832 dl 1.1 p.close();
833     s2.awaitComplete();
834 jsr166 1.6 assertEquals(4, s2.nexts);
835 dl 1.1 s1.awaitComplete();
836 jsr166 1.6 assertEquals(4, s2.nexts);
837 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
838 dl 1.1 }
839    
840     /**
841 jsr166 1.3 * Timed offer reports drops if saturated
842 dl 1.1 */
843     public void testDroppedTimedOffer() {
844     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
845     basicExecutor, 4);
846     TestSubscriber s1 = new TestSubscriber();
847     s1.request = false;
848     TestSubscriber s2 = new TestSubscriber();
849     s2.request = false;
850     p.subscribe(s1);
851     p.subscribe(s2);
852     s2.awaitSubscribe();
853     s1.awaitSubscribe();
854 dl 1.9 long delay = timeoutMillis();
855 dl 1.1 for (int i = 1; i <= 4; ++i)
856 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
857     long startTime = System.nanoTime();
858     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
859 dl 1.1 s1.sn.request(64);
860 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
861     // 2 * delay should elapse but check only 1 * delay to allow timer slop
862     assertTrue(millisElapsedSince(startTime) >= delay);
863 dl 1.1 s2.sn.request(64);
864     p.close();
865     s2.awaitComplete();
866     assertTrue(s2.nexts >= 2);
867     s1.awaitComplete();
868     assertTrue(s1.nexts >= 2);
869     }
870    
871     /**
872 jsr166 1.3 * Timed offer invokes drop handler if saturated
873 dl 1.1 */
874     public void testHandledDroppedTimedOffer() {
875     AtomicInteger calls = new AtomicInteger();
876     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
877     basicExecutor, 4);
878     TestSubscriber s1 = new TestSubscriber();
879     s1.request = false;
880     TestSubscriber s2 = new TestSubscriber();
881     s2.request = false;
882     p.subscribe(s1);
883     p.subscribe(s2);
884     s2.awaitSubscribe();
885     s1.awaitSubscribe();
886 dl 1.9 long delay = timeoutMillis();
887 dl 1.1 for (int i = 1; i <= 4; ++i)
888 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
889     long startTime = System.nanoTime();
890     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
891 dl 1.1 s1.sn.request(64);
892 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
893     assertTrue(millisElapsedSince(startTime) >= delay);
894 dl 1.1 s2.sn.request(64);
895     p.close();
896     s2.awaitComplete();
897     s1.awaitComplete();
898     assertTrue(calls.get() >= 2);
899     }
900    
901     /**
902 jsr166 1.3 * Timed offer succeeds if drop handler forces request
903 dl 1.1 */
904     public void testRecoveredHandledDroppedTimedOffer() {
905     AtomicInteger calls = new AtomicInteger();
906     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
907     basicExecutor, 4);
908     TestSubscriber s1 = new TestSubscriber();
909     s1.request = false;
910     TestSubscriber s2 = new TestSubscriber();
911     s2.request = false;
912     p.subscribe(s1);
913     p.subscribe(s2);
914     s2.awaitSubscribe();
915     s1.awaitSubscribe();
916     int n = 0;
917 dl 1.9 long delay = timeoutMillis();
918     long startTime = System.nanoTime();
919     for (int i = 1; i <= 6; ++i) {
920     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
921 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
922     }
923 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
924 dl 1.1 p.close();
925     s2.awaitComplete();
926     s1.awaitComplete();
927 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
928 dl 1.1 assertTrue(calls.get() >= 2);
929     }
930    
931 dl 1.10 /**
932     * consume returns a CompletableFuture that is done when
933     * publisher completes
934     */
935     public void testConsume() {
936     AtomicInteger sum = new AtomicInteger();
937     SubmissionPublisher<Integer> p = basicPublisher();
938 jsr166 1.11 CompletableFuture<Void> f =
939 jsr166 1.15 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
940 dl 1.10 int n = 20;
941     for (int i = 1; i <= n; ++i)
942     p.submit(i);
943     p.close();
944     f.join();
945     assertEquals((n * (n + 1)) / 2, sum.get());
946     }
947    
948     /**
949     * consume(null) throws NPE
950     */
951     public void testConsumeNPE() {
952     SubmissionPublisher<Integer> p = basicPublisher();
953     try {
954     CompletableFuture<Void> f = p.consume(null);
955     shouldThrow();
956 jsr166 1.11 } catch (NullPointerException success) {}
957 dl 1.10 }
958    
959     /**
960     * consume eventually stops processing published items if cancelled
961     */
962     public void testCancelledConsume() {
963     AtomicInteger count = new AtomicInteger();
964     SubmissionPublisher<Integer> p = basicPublisher();
965     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
966     f.cancel(true);
967     int n = 1000000; // arbitrary limit
968     for (int i = 1; i <= n; ++i)
969     p.submit(i);
970     assertTrue(count.get() < n);
971     }
972 jsr166 1.11
973 dl 1.1 }