ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.18
Committed: Wed Jan 4 06:09:58 2017 UTC (7 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.17: +18 -19 lines
Log Message:
convert to Diamond

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Test;
import junit.framework.TestSuite;

import static java.util.concurrent.Flow.Subscriber;
import static java.util.concurrent.Flow.Subscription;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
21    
22 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
23    
24     public static void main(String[] args) {
25     main(suite(), args);
26     }
27     public static Test suite() {
28     return new TestSuite(SubmissionPublisherTest.class);
29     }
30    
31 dl 1.12 final Executor basicExecutor = basicPublisher().getExecutor();
32 jsr166 1.13
33 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
34 dl 1.12 return new SubmissionPublisher<Integer>();
35 dl 1.1 }
36 jsr166 1.2
37 dl 1.1 static class SPException extends RuntimeException {}
38    
39     class TestSubscriber implements Subscriber<Integer> {
40     volatile Subscription sn;
41     int last; // Requires that onNexts are in numeric order
42     volatile int nexts;
43     volatile int errors;
44     volatile int completes;
45     volatile boolean throwOnCall = false;
46     volatile boolean request = true;
47     volatile Throwable lastError;
48    
49     public synchronized void onSubscribe(Subscription s) {
50     threadAssertTrue(sn == null);
51     sn = s;
52     notifyAll();
53     if (throwOnCall)
54     throw new SPException();
55     if (request)
56     sn.request(1L);
57     }
58     public synchronized void onNext(Integer t) {
59     ++nexts;
60     notifyAll();
61     int current = t.intValue();
62     threadAssertTrue(current >= last);
63     last = current;
64     if (request)
65     sn.request(1L);
66     if (throwOnCall)
67     throw new SPException();
68     }
69     public synchronized void onError(Throwable t) {
70     threadAssertTrue(completes == 0);
71     threadAssertTrue(errors == 0);
72     lastError = t;
73     ++errors;
74     notifyAll();
75     }
76     public synchronized void onComplete() {
77     threadAssertTrue(completes == 0);
78     ++completes;
79     notifyAll();
80     }
81    
82     synchronized void awaitSubscribe() {
83     while (sn == null) {
84     try {
85     wait();
86     } catch (Exception ex) {
87     threadUnexpectedException(ex);
88     break;
89     }
90     }
91     }
92     synchronized void awaitNext(int n) {
93     while (nexts < n) {
94     try {
95     wait();
96     } catch (Exception ex) {
97     threadUnexpectedException(ex);
98     break;
99     }
100     }
101     }
102     synchronized void awaitComplete() {
103     while (completes == 0 && errors == 0) {
104     try {
105     wait();
106     } catch (Exception ex) {
107     threadUnexpectedException(ex);
108     break;
109     }
110     }
111     }
112     synchronized void awaitError() {
113     while (errors == 0) {
114     try {
115     wait();
116     } catch (Exception ex) {
117     threadUnexpectedException(ex);
118     break;
119     }
120     }
121     }
122    
123     }
124    
125     /**
126     * A new SubmissionPublisher has no subscribers, a non-null
127     * executor, a power-of-two capacity, is not closed, and reports
128     * zero demand and lag
129     */
130     void checkInitialState(SubmissionPublisher<?> p) {
131     assertFalse(p.hasSubscribers());
132 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
133 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
134     assertFalse(p.isClosed());
135     assertNull(p.getClosedException());
136     int n = p.getMaxBufferCapacity();
137     assertTrue((n & (n - 1)) == 0); // power of two
138     assertNotNull(p.getExecutor());
139 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
140     assertEquals(0, p.estimateMaximumLag());
141 dl 1.1 }
142    
143     /**
144     * A default-constructed SubmissionPublisher has no subscribers,
145     * is not closed, has default buffer size, and uses the
146 dl 1.12 * defaultExecutor
147 dl 1.1 */
148     public void testConstructor1() {
149 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
150 dl 1.1 checkInitialState(p);
151     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
152 dl 1.12 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
153     if (ForkJoinPool.getCommonPoolParallelism() > 1)
154     assertSame(e, c);
155     else
156     assertNotSame(e, c);
157 dl 1.1 }
158    
159     /**
160     * A new SubmissionPublisher has no subscribers, is not closed,
161     * has the given buffer size, and uses the given executor
162     */
163     public void testConstructor2() {
164     Executor e = Executors.newFixedThreadPool(1);
165 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
166 dl 1.1 checkInitialState(p);
167     assertSame(p.getExecutor(), e);
168 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
169 dl 1.1 }
170    
171     /**
172     * A null Executor argument to SubmissionPublisher constructor throws NPE
173     */
174     public void testConstructor3() {
175     try {
176 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
177 dl 1.1 shouldThrow();
178 jsr166 1.2 } catch (NullPointerException success) {}
179 dl 1.1 }
180    
181     /**
182     * A negative capacity argument to SubmissionPublisher constructor
183     * throws IAE
184     */
185     public void testConstructor4() {
186     Executor e = Executors.newFixedThreadPool(1);
187     try {
188 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
189 dl 1.1 shouldThrow();
190 jsr166 1.2 } catch (IllegalArgumentException success) {}
191 dl 1.1 }
192    
193     /**
194     * A closed publisher reports isClosed with no closedException and
195     * throws ISE upon attempted submission; a subsequent close or
196     * closeExceptionally has no additional effect.
197     */
198     public void testClose() {
199     SubmissionPublisher<Integer> p = basicPublisher();
200     checkInitialState(p);
201     p.close();
202     assertTrue(p.isClosed());
203     assertNull(p.getClosedException());
204     try {
205     p.submit(1);
206     shouldThrow();
207 jsr166 1.2 } catch (IllegalStateException success) {}
208 dl 1.1 Throwable ex = new SPException();
209     p.closeExceptionally(ex);
210     assertTrue(p.isClosed());
211     assertNull(p.getClosedException());
212     }
213    
214     /**
215     * A publisher closedExceptionally reports isClosed with the
216     * closedException and throws ISE upon attempted submission; a
217     * subsequent close or closeExceptionally has no additional
218     * effect.
219     */
220     public void testCloseExceptionally() {
221     SubmissionPublisher<Integer> p = basicPublisher();
222     checkInitialState(p);
223     Throwable ex = new SPException();
224     p.closeExceptionally(ex);
225     assertTrue(p.isClosed());
226     assertSame(p.getClosedException(), ex);
227     try {
228     p.submit(1);
229     shouldThrow();
230 jsr166 1.2 } catch (IllegalStateException success) {}
231 dl 1.1 p.close();
232     assertTrue(p.isClosed());
233     assertSame(p.getClosedException(), ex);
234     }
235    
236     /**
237     * Upon subscription, the subscriber's onSubscribe is called, no
238     * other Subscriber methods are invoked, the publisher
239     * hasSubscribers, isSubscribed is true, and existing
240     * subscriptions are unaffected.
241     */
242     public void testSubscribe1() {
243     TestSubscriber s = new TestSubscriber();
244     SubmissionPublisher<Integer> p = basicPublisher();
245     p.subscribe(s);
246     assertTrue(p.hasSubscribers());
247 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
248 dl 1.1 assertTrue(p.getSubscribers().contains(s));
249     assertTrue(p.isSubscribed(s));
250     s.awaitSubscribe();
251     assertNotNull(s.sn);
252 jsr166 1.6 assertEquals(0, s.nexts);
253     assertEquals(0, s.errors);
254     assertEquals(0, s.completes);
255 dl 1.1 TestSubscriber s2 = new TestSubscriber();
256     p.subscribe(s2);
257     assertTrue(p.hasSubscribers());
258 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
259 dl 1.1 assertTrue(p.getSubscribers().contains(s));
260     assertTrue(p.getSubscribers().contains(s2));
261     assertTrue(p.isSubscribed(s));
262     assertTrue(p.isSubscribed(s2));
263     s2.awaitSubscribe();
264     assertNotNull(s2.sn);
265 jsr166 1.6 assertEquals(0, s2.nexts);
266     assertEquals(0, s2.errors);
267     assertEquals(0, s2.completes);
268 dl 1.9 p.close();
269 dl 1.1 }
270    
271     /**
272     * If closed, upon subscription, the subscriber's onComplete
273     * method is invoked
274     */
275     public void testSubscribe2() {
276     TestSubscriber s = new TestSubscriber();
277     SubmissionPublisher<Integer> p = basicPublisher();
278     p.close();
279     p.subscribe(s);
280     s.awaitComplete();
281 jsr166 1.6 assertEquals(0, s.nexts);
282     assertEquals(0, s.errors);
283     assertEquals(1, s.completes, 1);
284 dl 1.1 }
285    
286     /**
287     * If closedExceptionally, upon subscription, the subscriber's
288     * onError method is invoked
289     */
290     public void testSubscribe3() {
291     TestSubscriber s = new TestSubscriber();
292     SubmissionPublisher<Integer> p = basicPublisher();
293     Throwable ex = new SPException();
294     p.closeExceptionally(ex);
295     assertTrue(p.isClosed());
296     assertSame(p.getClosedException(), ex);
297     p.subscribe(s);
298     s.awaitError();
299 jsr166 1.6 assertEquals(0, s.nexts);
300     assertEquals(1, s.errors);
301 dl 1.1 }
302    
303     /**
304     * Upon attempted resubscription, the subscriber's onError is
305     * called and the subscription is cancelled.
306     */
307     public void testSubscribe4() {
308     TestSubscriber s = new TestSubscriber();
309     SubmissionPublisher<Integer> p = basicPublisher();
310     p.subscribe(s);
311     assertTrue(p.hasSubscribers());
312 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
313 dl 1.1 assertTrue(p.getSubscribers().contains(s));
314     assertTrue(p.isSubscribed(s));
315     s.awaitSubscribe();
316     assertNotNull(s.sn);
317 jsr166 1.6 assertEquals(0, s.nexts);
318     assertEquals(0, s.errors);
319     assertEquals(0, s.completes);
320 dl 1.1 p.subscribe(s);
321     s.awaitError();
322 jsr166 1.6 assertEquals(0, s.nexts);
323     assertEquals(1, s.errors);
324 dl 1.1 assertFalse(p.isSubscribed(s));
325     }
326    
327     /**
328     * An exception thrown in onSubscribe causes onError
329     */
330     public void testSubscribe5() {
331     TestSubscriber s = new TestSubscriber();
332     SubmissionPublisher<Integer> p = basicPublisher();
333     s.throwOnCall = true;
334     try {
335     p.subscribe(s);
336 jsr166 1.2 } catch (Exception ok) {}
337 dl 1.1 s.awaitError();
338 jsr166 1.6 assertEquals(0, s.nexts);
339     assertEquals(1, s.errors);
340     assertEquals(0, s.completes);
341 dl 1.1 }
342    
343     /**
344 jsr166 1.8 * subscribe(null) throws NPE
345 dl 1.1 */
346     public void testSubscribe6() {
347     SubmissionPublisher<Integer> p = basicPublisher();
348     try {
349     p.subscribe(null);
350 jsr166 1.2 shouldThrow();
351     } catch (NullPointerException success) {}
352 dl 1.1 checkInitialState(p);
353     }
354    
355     /**
356     * Closing a publisher causes onComplete to subscribers
357     */
358     public void testCloseCompletes() {
359     SubmissionPublisher<Integer> p = basicPublisher();
360     TestSubscriber s1 = new TestSubscriber();
361     TestSubscriber s2 = new TestSubscriber();
362     p.subscribe(s1);
363     p.subscribe(s2);
364     p.submit(1);
365     p.close();
366     assertTrue(p.isClosed());
367     assertNull(p.getClosedException());
368     s1.awaitComplete();
369 jsr166 1.6 assertEquals(1, s1.nexts);
370     assertEquals(1, s1.completes);
371 dl 1.1 s2.awaitComplete();
372 jsr166 1.6 assertEquals(1, s2.nexts);
373     assertEquals(1, s2.completes);
374 dl 1.1 }
375    
376     /**
377     * Closing a publisher exceptionally causes onError to subscribers
378 dl 1.16 * after they are subscribed
379 dl 1.1 */
380     public void testCloseExceptionallyError() {
381     SubmissionPublisher<Integer> p = basicPublisher();
382     TestSubscriber s1 = new TestSubscriber();
383     TestSubscriber s2 = new TestSubscriber();
384     p.subscribe(s1);
385     p.subscribe(s2);
386     p.submit(1);
387     p.closeExceptionally(new SPException());
388     assertTrue(p.isClosed());
389 dl 1.16 s1.awaitSubscribe();
390 dl 1.1 s1.awaitError();
391     assertTrue(s1.nexts <= 1);
392 jsr166 1.6 assertEquals(1, s1.errors);
393 dl 1.17 s2.awaitSubscribe();
394 dl 1.1 s2.awaitError();
395     assertTrue(s2.nexts <= 1);
396 jsr166 1.6 assertEquals(1, s2.errors);
397 dl 1.1 }
398    
399     /**
400     * Cancelling a subscription eventually causes no more onNexts to be issued
401     */
402     public void testCancel() {
403     SubmissionPublisher<Integer> p = basicPublisher();
404     TestSubscriber s1 = new TestSubscriber();
405     TestSubscriber s2 = new TestSubscriber();
406     p.subscribe(s1);
407     p.subscribe(s2);
408     s1.awaitSubscribe();
409     p.submit(1);
410     s1.sn.cancel();
411     for (int i = 2; i <= 20; ++i)
412     p.submit(i);
413     p.close();
414     s2.awaitComplete();
415 jsr166 1.6 assertEquals(20, s2.nexts);
416     assertEquals(1, s2.completes);
417 dl 1.1 assertTrue(s1.nexts < 20);
418     assertFalse(p.isSubscribed(s1));
419     }
420    
421     /**
422     * Throwing an exception in onNext causes onError
423     */
424     public void testThrowOnNext() {
425     SubmissionPublisher<Integer> p = basicPublisher();
426     TestSubscriber s1 = new TestSubscriber();
427     TestSubscriber s2 = new TestSubscriber();
428     p.subscribe(s1);
429     p.subscribe(s2);
430     s1.awaitSubscribe();
431     p.submit(1);
432     s1.throwOnCall = true;
433     p.submit(2);
434     p.close();
435     s2.awaitComplete();
436 jsr166 1.6 assertEquals(2, s2.nexts);
437 dl 1.1 s1.awaitComplete();
438 jsr166 1.6 assertEquals(1, s1.errors);
439 dl 1.1 }
440    
441     /**
442 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
443 dl 1.1 * subscriber throws an exception in onNext
444     */
445     public void testThrowOnNextHandler() {
446     AtomicInteger calls = new AtomicInteger();
447 jsr166 1.18 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
448     basicExecutor, 8, (s, e) -> calls.getAndIncrement());
449 dl 1.1 TestSubscriber s1 = new TestSubscriber();
450     TestSubscriber s2 = new TestSubscriber();
451     p.subscribe(s1);
452     p.subscribe(s2);
453     s1.awaitSubscribe();
454     p.submit(1);
455     s1.throwOnCall = true;
456     p.submit(2);
457     p.close();
458     s2.awaitComplete();
459 jsr166 1.6 assertEquals(2, s2.nexts);
460     assertEquals(1, s2.completes);
461 dl 1.1 s1.awaitError();
462 jsr166 1.6 assertEquals(1, s1.errors);
463     assertEquals(1, calls.get());
464 dl 1.1 }
465    
466     /**
467     * onNext items are issued in the same order to each subscriber
468     */
469     public void testOrder() {
470     SubmissionPublisher<Integer> p = basicPublisher();
471     TestSubscriber s1 = new TestSubscriber();
472     TestSubscriber s2 = new TestSubscriber();
473     p.subscribe(s1);
474     p.subscribe(s2);
475     for (int i = 1; i <= 20; ++i)
476     p.submit(i);
477     p.close();
478     s2.awaitComplete();
479     s1.awaitComplete();
480 jsr166 1.6 assertEquals(20, s2.nexts);
481     assertEquals(1, s2.completes);
482     assertEquals(20, s1.nexts);
483     assertEquals(1, s1.completes);
484 dl 1.1 }
485    
486     /**
487     * onNext is issued only if requested
488     */
489     public void testRequest1() {
490     SubmissionPublisher<Integer> p = basicPublisher();
491     TestSubscriber s1 = new TestSubscriber();
492     s1.request = false;
493     p.subscribe(s1);
494     s1.awaitSubscribe();
495     assertTrue(p.estimateMinimumDemand() == 0);
496     TestSubscriber s2 = new TestSubscriber();
497     p.subscribe(s2);
498     p.submit(1);
499     p.submit(2);
500     s2.awaitNext(1);
501 jsr166 1.6 assertEquals(0, s1.nexts);
502 dl 1.1 s1.sn.request(3);
503     p.submit(3);
504     p.close();
505     s2.awaitComplete();
506 jsr166 1.6 assertEquals(3, s2.nexts);
507     assertEquals(1, s2.completes);
508 dl 1.1 s1.awaitComplete();
509     assertTrue(s1.nexts > 0);
510 jsr166 1.6 assertEquals(1, s1.completes);
511 dl 1.1 }
512    
513     /**
514     * onNext is not issued when requests become zero
515     */
516     public void testRequest2() {
517     SubmissionPublisher<Integer> p = basicPublisher();
518     TestSubscriber s1 = new TestSubscriber();
519     TestSubscriber s2 = new TestSubscriber();
520     p.subscribe(s1);
521     p.subscribe(s2);
522     s2.awaitSubscribe();
523     s1.awaitSubscribe();
524     s1.request = false;
525     p.submit(1);
526     p.submit(2);
527     p.close();
528     s2.awaitComplete();
529 jsr166 1.6 assertEquals(2, s2.nexts);
530     assertEquals(1, s2.completes);
531 dl 1.1 s1.awaitNext(1);
532 jsr166 1.6 assertEquals(1, s1.nexts);
533 dl 1.1 }
534    
535     /**
536     * Negative request causes error
537     */
538     public void testRequest3() {
539     SubmissionPublisher<Integer> p = basicPublisher();
540     TestSubscriber s1 = new TestSubscriber();
541     TestSubscriber s2 = new TestSubscriber();
542     p.subscribe(s1);
543     p.subscribe(s2);
544     s2.awaitSubscribe();
545     s1.awaitSubscribe();
546     s1.sn.request(-1L);
547     p.submit(1);
548     p.submit(2);
549     p.close();
550     s2.awaitComplete();
551 jsr166 1.6 assertEquals(2, s2.nexts);
552     assertEquals(1, s2.completes);
553 dl 1.1 s1.awaitError();
554 jsr166 1.6 assertEquals(1, s1.errors);
555 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
556     }
557    
558     /**
559     * estimateMinimumDemand reports 0 until request, nonzero after
560     * request, and zero again after delivery
561     */
562     public void testEstimateMinimumDemand() {
563     TestSubscriber s = new TestSubscriber();
564     SubmissionPublisher<Integer> p = basicPublisher();
565     s.request = false;
566     p.subscribe(s);
567     s.awaitSubscribe();
568 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
569 dl 1.1 s.sn.request(1);
570 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
571 dl 1.1 p.submit(1);
572     s.awaitNext(1);
573 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
574 dl 1.1 }
575    
576     /**
577 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
578 dl 1.1 */
579     public void testEmptySubmit() {
580     SubmissionPublisher<Integer> p = basicPublisher();
581 jsr166 1.6 assertEquals(0, p.submit(1));
582 dl 1.1 }
583    
584     /**
585 jsr166 1.3 * submit(null) throws NPE
586 dl 1.1 */
587     public void testNullSubmit() {
588     SubmissionPublisher<Integer> p = basicPublisher();
589     try {
590     p.submit(null);
591 jsr166 1.2 shouldThrow();
592     } catch (NullPointerException success) {}
593 dl 1.1 }
594    
595     /**
596 jsr166 1.3 * submit returns number of lagged items, compatible with result
597 dl 1.1 * of estimateMaximumLag.
598     */
599     public void testLaggedSubmit() {
600     SubmissionPublisher<Integer> p = basicPublisher();
601     TestSubscriber s1 = new TestSubscriber();
602     s1.request = false;
603     TestSubscriber s2 = new TestSubscriber();
604     s2.request = false;
605     p.subscribe(s1);
606     p.subscribe(s2);
607     s2.awaitSubscribe();
608     s1.awaitSubscribe();
609 jsr166 1.6 assertEquals(1, p.submit(1));
610 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
611     assertTrue(p.submit(2) >= 2);
612     assertTrue(p.estimateMaximumLag() >= 2);
613     s1.sn.request(4);
614     assertTrue(p.submit(3) >= 3);
615     assertTrue(p.estimateMaximumLag() >= 3);
616     s2.sn.request(4);
617     p.submit(4);
618     p.close();
619     s2.awaitComplete();
620 jsr166 1.6 assertEquals(4, s2.nexts);
621 dl 1.1 s1.awaitComplete();
622 jsr166 1.6 assertEquals(4, s2.nexts);
623 dl 1.1 }
624    
625     /**
626     * submit eventually issues requested items when buffer capacity is 1
627     */
628     public void testCap1Submit() {
629 jsr166 1.18 SubmissionPublisher<Integer> p
630     = new SubmissionPublisher<>(basicExecutor, 1);
631 dl 1.1 TestSubscriber s1 = new TestSubscriber();
632     TestSubscriber s2 = new TestSubscriber();
633     p.subscribe(s1);
634     p.subscribe(s2);
635     for (int i = 1; i <= 20; ++i) {
636     assertTrue(p.estimateMinimumDemand() <= 1);
637     assertTrue(p.submit(i) >= 0);
638     }
639     p.close();
640     s2.awaitComplete();
641     s1.awaitComplete();
642 jsr166 1.6 assertEquals(20, s2.nexts);
643     assertEquals(1, s2.completes);
644     assertEquals(20, s1.nexts);
645     assertEquals(1, s1.completes);
646 dl 1.1 }
647 jsr166 1.2
648 dl 1.1 static boolean noopHandle(AtomicInteger count) {
649     count.getAndIncrement();
650     return false;
651     }
652    
653     static boolean reqHandle(AtomicInteger count, Subscriber s) {
654     count.getAndIncrement();
655     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
656     return true;
657     }
658    
659     /**
660 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
661 dl 1.1 */
662     public void testEmptyOffer() {
663     SubmissionPublisher<Integer> p = basicPublisher();
664 jsr166 1.3 assertEquals(0, p.offer(1, null));
665 dl 1.1 }
666    
667     /**
668 jsr166 1.3 * offer(null) throws NPE
669 dl 1.1 */
670     public void testNullOffer() {
671     SubmissionPublisher<Integer> p = basicPublisher();
672     try {
673     p.offer(null, null);
674 jsr166 1.2 shouldThrow();
675     } catch (NullPointerException success) {}
676 dl 1.1 }
677    
678     /**
679 jsr166 1.3 * offer returns number of lagged items if not saturated
680 dl 1.1 */
681     public void testLaggedOffer() {
682     SubmissionPublisher<Integer> p = basicPublisher();
683     TestSubscriber s1 = new TestSubscriber();
684     s1.request = false;
685     TestSubscriber s2 = new TestSubscriber();
686     s2.request = false;
687     p.subscribe(s1);
688     p.subscribe(s2);
689     s2.awaitSubscribe();
690     s1.awaitSubscribe();
691     assertTrue(p.offer(1, null) >= 1);
692     assertTrue(p.offer(2, null) >= 2);
693     s1.sn.request(4);
694     assertTrue(p.offer(3, null) >= 3);
695     s2.sn.request(4);
696     p.offer(4, null);
697     p.close();
698     s2.awaitComplete();
699 jsr166 1.6 assertEquals(4, s2.nexts);
700 dl 1.1 s1.awaitComplete();
701 jsr166 1.6 assertEquals(4, s2.nexts);
702 dl 1.1 }
703    
704     /**
705 jsr166 1.3 * offer reports drops if saturated
706 dl 1.1 */
707     public void testDroppedOffer() {
708 jsr166 1.18 SubmissionPublisher<Integer> p
709     = new SubmissionPublisher<>(basicExecutor, 4);
710 dl 1.1 TestSubscriber s1 = new TestSubscriber();
711     s1.request = false;
712     TestSubscriber s2 = new TestSubscriber();
713     s2.request = false;
714     p.subscribe(s1);
715     p.subscribe(s2);
716     s2.awaitSubscribe();
717     s1.awaitSubscribe();
718     for (int i = 1; i <= 4; ++i)
719     assertTrue(p.offer(i, null) >= 0);
720     p.offer(5, null);
721     assertTrue(p.offer(6, null) < 0);
722     s1.sn.request(64);
723     assertTrue(p.offer(7, null) < 0);
724     s2.sn.request(64);
725     p.close();
726     s2.awaitComplete();
727     assertTrue(s2.nexts >= 4);
728     s1.awaitComplete();
729     assertTrue(s1.nexts >= 4);
730     }
731    
732     /**
733 jsr166 1.3 * offer invokes drop handler if saturated
734 dl 1.1 */
735     public void testHandledDroppedOffer() {
736     AtomicInteger calls = new AtomicInteger();
737 jsr166 1.18 SubmissionPublisher<Integer> p
738     = new SubmissionPublisher<>(basicExecutor, 4);
739 dl 1.1 TestSubscriber s1 = new TestSubscriber();
740     s1.request = false;
741     TestSubscriber s2 = new TestSubscriber();
742     s2.request = false;
743     p.subscribe(s1);
744     p.subscribe(s2);
745     s2.awaitSubscribe();
746     s1.awaitSubscribe();
747     for (int i = 1; i <= 4; ++i)
748     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
749     p.offer(4, (s, x) -> noopHandle(calls));
750     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
751     s1.sn.request(64);
752     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
753     s2.sn.request(64);
754     p.close();
755     s2.awaitComplete();
756     s1.awaitComplete();
757     assertTrue(calls.get() >= 4);
758     }
759    
760     /**
761 jsr166 1.3 * offer succeeds if drop handler forces request
762 dl 1.1 */
763     public void testRecoveredHandledDroppedOffer() {
764     AtomicInteger calls = new AtomicInteger();
765 jsr166 1.18 SubmissionPublisher<Integer> p
766     = new SubmissionPublisher<>(basicExecutor, 4);
767 dl 1.1 TestSubscriber s1 = new TestSubscriber();
768     s1.request = false;
769     TestSubscriber s2 = new TestSubscriber();
770     s2.request = false;
771     p.subscribe(s1);
772     p.subscribe(s2);
773     s2.awaitSubscribe();
774     s1.awaitSubscribe();
775     int n = 0;
776     for (int i = 1; i <= 8; ++i) {
777     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
778     n = n + 2 + (d < 0 ? d : 0);
779     }
780     p.close();
781     s2.awaitComplete();
782     s1.awaitComplete();
783 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
784 dl 1.1 assertTrue(calls.get() >= 2);
785     }
786    
787     /**
788 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
789 dl 1.1 */
790     public void testEmptyTimedOffer() {
791     SubmissionPublisher<Integer> p = basicPublisher();
792 jsr166 1.7 long startTime = System.nanoTime();
793 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
794 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
795 dl 1.1 }
796    
797     /**
798 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
799 dl 1.1 */
800     public void testNullTimedOffer() {
801     SubmissionPublisher<Integer> p = basicPublisher();
802 jsr166 1.7 long startTime = System.nanoTime();
803 dl 1.1 try {
804 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
805 jsr166 1.2 shouldThrow();
806     } catch (NullPointerException success) {}
807 dl 1.1 try {
808 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
809 jsr166 1.2 shouldThrow();
810     } catch (NullPointerException success) {}
811 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
812 dl 1.1 }
813    
814     /**
815 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
816 dl 1.1 */
817     public void testLaggedTimedOffer() {
818     SubmissionPublisher<Integer> p = basicPublisher();
819     TestSubscriber s1 = new TestSubscriber();
820     s1.request = false;
821     TestSubscriber s2 = new TestSubscriber();
822     s2.request = false;
823     p.subscribe(s1);
824     p.subscribe(s2);
825     s2.awaitSubscribe();
826     s1.awaitSubscribe();
827 jsr166 1.7 long startTime = System.nanoTime();
828     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
829     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
830 dl 1.1 s1.sn.request(4);
831 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
832 dl 1.1 s2.sn.request(4);
833 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
834 dl 1.1 p.close();
835     s2.awaitComplete();
836 jsr166 1.6 assertEquals(4, s2.nexts);
837 dl 1.1 s1.awaitComplete();
838 jsr166 1.6 assertEquals(4, s2.nexts);
839 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
840 dl 1.1 }
841    
842     /**
843 jsr166 1.3 * Timed offer reports drops if saturated
844 dl 1.1 */
845     public void testDroppedTimedOffer() {
846 jsr166 1.18 SubmissionPublisher<Integer> p
847     = new SubmissionPublisher<>(basicExecutor, 4);
848 dl 1.1 TestSubscriber s1 = new TestSubscriber();
849     s1.request = false;
850     TestSubscriber s2 = new TestSubscriber();
851     s2.request = false;
852     p.subscribe(s1);
853     p.subscribe(s2);
854     s2.awaitSubscribe();
855     s1.awaitSubscribe();
856 dl 1.9 long delay = timeoutMillis();
857 dl 1.1 for (int i = 1; i <= 4; ++i)
858 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
859     long startTime = System.nanoTime();
860     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
861 dl 1.1 s1.sn.request(64);
862 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
863     // 2 * delay should elapse but check only 1 * delay to allow timer slop
864     assertTrue(millisElapsedSince(startTime) >= delay);
865 dl 1.1 s2.sn.request(64);
866     p.close();
867     s2.awaitComplete();
868     assertTrue(s2.nexts >= 2);
869     s1.awaitComplete();
870     assertTrue(s1.nexts >= 2);
871     }
872    
873     /**
874 jsr166 1.3 * Timed offer invokes drop handler if saturated
875 dl 1.1 */
876     public void testHandledDroppedTimedOffer() {
877     AtomicInteger calls = new AtomicInteger();
878 jsr166 1.18 SubmissionPublisher<Integer> p
879     = new SubmissionPublisher<>(basicExecutor, 4);
880 dl 1.1 TestSubscriber s1 = new TestSubscriber();
881     s1.request = false;
882     TestSubscriber s2 = new TestSubscriber();
883     s2.request = false;
884     p.subscribe(s1);
885     p.subscribe(s2);
886     s2.awaitSubscribe();
887     s1.awaitSubscribe();
888 dl 1.9 long delay = timeoutMillis();
889 dl 1.1 for (int i = 1; i <= 4; ++i)
890 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
891     long startTime = System.nanoTime();
892     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
893 dl 1.1 s1.sn.request(64);
894 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
895     assertTrue(millisElapsedSince(startTime) >= delay);
896 dl 1.1 s2.sn.request(64);
897     p.close();
898     s2.awaitComplete();
899     s1.awaitComplete();
900     assertTrue(calls.get() >= 2);
901     }
902    
903     /**
904 jsr166 1.3 * Timed offer succeeds if drop handler forces request
905 dl 1.1 */
906     public void testRecoveredHandledDroppedTimedOffer() {
907     AtomicInteger calls = new AtomicInteger();
908 jsr166 1.18 SubmissionPublisher<Integer> p
909     = new SubmissionPublisher<>(basicExecutor, 4);
910 dl 1.1 TestSubscriber s1 = new TestSubscriber();
911     s1.request = false;
912     TestSubscriber s2 = new TestSubscriber();
913     s2.request = false;
914     p.subscribe(s1);
915     p.subscribe(s2);
916     s2.awaitSubscribe();
917     s1.awaitSubscribe();
918     int n = 0;
919 dl 1.9 long delay = timeoutMillis();
920     long startTime = System.nanoTime();
921     for (int i = 1; i <= 6; ++i) {
922     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
923 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
924     }
925 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
926 dl 1.1 p.close();
927     s2.awaitComplete();
928     s1.awaitComplete();
929 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
930 dl 1.1 assertTrue(calls.get() >= 2);
931     }
932    
933 dl 1.10 /**
934     * consume returns a CompletableFuture that is done when
935     * publisher completes
936     */
937     public void testConsume() {
938     AtomicInteger sum = new AtomicInteger();
939     SubmissionPublisher<Integer> p = basicPublisher();
940 jsr166 1.11 CompletableFuture<Void> f =
941 jsr166 1.15 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
942 dl 1.10 int n = 20;
943     for (int i = 1; i <= n; ++i)
944     p.submit(i);
945     p.close();
946     f.join();
947     assertEquals((n * (n + 1)) / 2, sum.get());
948     }
949    
950     /**
951     * consume(null) throws NPE
952     */
953     public void testConsumeNPE() {
954     SubmissionPublisher<Integer> p = basicPublisher();
955     try {
956     CompletableFuture<Void> f = p.consume(null);
957     shouldThrow();
958 jsr166 1.11 } catch (NullPointerException success) {}
959 dl 1.10 }
960    
961     /**
962     * consume eventually stops processing published items if cancelled
963     */
964     public void testCancelledConsume() {
965     AtomicInteger count = new AtomicInteger();
966     SubmissionPublisher<Integer> p = basicPublisher();
967     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
968     f.cancel(true);
969     int n = 1000000; // arbitrary limit
970     for (int i = 1; i <= n; ++i)
971     p.submit(i);
972     assertTrue(count.get() < n);
973     }
974 jsr166 1.11
975 dl 1.1 }