ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.9
Committed: Tue Sep 8 19:44:10 2015 UTC (8 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.8: +20 -11 lines
Log Message:
Faster timeout checks

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8     import static java.util.concurrent.TimeUnit.MILLISECONDS;
9     import static java.util.concurrent.TimeUnit.SECONDS;
10    
11     import java.util.concurrent.Executor;
12     import java.util.concurrent.Executors;
13     import java.util.concurrent.Flow;
14     import static java.util.concurrent.Flow.Publisher;
15     import static java.util.concurrent.Flow.Subscriber;
16     import static java.util.concurrent.Flow.Subscription;
17     import java.util.concurrent.LinkedBlockingQueue;
18     import java.util.concurrent.ForkJoinPool;
19     import java.util.concurrent.SubmissionPublisher;
20     import java.util.concurrent.ThreadFactory;
21     import java.util.concurrent.ThreadPoolExecutor;
22     import java.util.concurrent.TimeUnit;
23     import java.util.concurrent.atomic.AtomicInteger;
24     import java.util.function.BiConsumer;
25     import java.util.function.BiPredicate;
26     import java.util.function.BiFunction;
27    
28     import junit.framework.Test;
29     import junit.framework.TestSuite;
30    
31     public class SubmissionPublisherTest extends JSR166TestCase {
32    
33     public static void main(String[] args) {
34     main(suite(), args);
35     }
36     public static Test suite() {
37     return new TestSuite(SubmissionPublisherTest.class);
38     }
39    
40     // Factory for single thread pool in case commonPool parallelism is zero
41     static final class DaemonThreadFactory implements ThreadFactory {
42     public Thread newThread(Runnable r) {
43     Thread t = new Thread(r);
44     t.setDaemon(true);
45     return t;
46     }
47     }
48 jsr166 1.2
49 dl 1.1 static final Executor basicExecutor =
50 dl 1.9 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
51 dl 1.1 ForkJoinPool.commonPool() :
52     new ThreadPoolExecutor(1, 1, 60, SECONDS,
53     new LinkedBlockingQueue<Runnable>(),
54     new DaemonThreadFactory());
55 jsr166 1.2
56 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
57     return new SubmissionPublisher<Integer>(basicExecutor,
58     Flow.defaultBufferSize());
59     }
60 jsr166 1.2
61 dl 1.1 static class SPException extends RuntimeException {}
62    
63     class TestSubscriber implements Subscriber<Integer> {
64     volatile Subscription sn;
65     int last; // Requires that onNexts are in numeric order
66     volatile int nexts;
67     volatile int errors;
68     volatile int completes;
69     volatile boolean throwOnCall = false;
70     volatile boolean request = true;
71     volatile Throwable lastError;
72    
73     public synchronized void onSubscribe(Subscription s) {
74     threadAssertTrue(sn == null);
75     sn = s;
76     notifyAll();
77     if (throwOnCall)
78     throw new SPException();
79     if (request)
80     sn.request(1L);
81     }
82     public synchronized void onNext(Integer t) {
83     ++nexts;
84     notifyAll();
85     int current = t.intValue();
86     threadAssertTrue(current >= last);
87     last = current;
88     if (request)
89     sn.request(1L);
90     if (throwOnCall)
91     throw new SPException();
92     }
93     public synchronized void onError(Throwable t) {
94     threadAssertTrue(completes == 0);
95     threadAssertTrue(errors == 0);
96     lastError = t;
97     ++errors;
98     notifyAll();
99     }
100     public synchronized void onComplete() {
101     threadAssertTrue(completes == 0);
102     ++completes;
103     notifyAll();
104     }
105    
106     synchronized void awaitSubscribe() {
107     while (sn == null) {
108     try {
109     wait();
110     } catch (Exception ex) {
111     threadUnexpectedException(ex);
112     break;
113     }
114     }
115     }
116     synchronized void awaitNext(int n) {
117     while (nexts < n) {
118     try {
119     wait();
120     } catch (Exception ex) {
121     threadUnexpectedException(ex);
122     break;
123     }
124     }
125     }
126     synchronized void awaitComplete() {
127     while (completes == 0 && errors == 0) {
128     try {
129     wait();
130     } catch (Exception ex) {
131     threadUnexpectedException(ex);
132     break;
133     }
134     }
135     }
136     synchronized void awaitError() {
137     while (errors == 0) {
138     try {
139     wait();
140     } catch (Exception ex) {
141     threadUnexpectedException(ex);
142     break;
143     }
144     }
145     }
146    
147     }
148    
149     /**
150     * A new SubmissionPublisher has no subscribers, a non-null
151     * executor, a power-of-two capacity, is not closed, and reports
152     * zero demand and lag
153     */
154     void checkInitialState(SubmissionPublisher<?> p) {
155     assertFalse(p.hasSubscribers());
156 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
157 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
158     assertFalse(p.isClosed());
159     assertNull(p.getClosedException());
160     int n = p.getMaxBufferCapacity();
161     assertTrue((n & (n - 1)) == 0); // power of two
162     assertNotNull(p.getExecutor());
163 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
164     assertEquals(0, p.estimateMaximumLag());
165 dl 1.1 }
166    
167     /**
168     * A default-constructed SubmissionPublisher has no subscribers,
169     * is not closed, has default buffer size, and uses the
170     * ForkJoinPool.commonPool executor
171     */
172     public void testConstructor1() {
173     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
174     checkInitialState(p);
175     assertSame(p.getExecutor(), ForkJoinPool.commonPool());
176     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
177     }
178    
179     /**
180     * A new SubmissionPublisher has no subscribers, is not closed,
181     * has the given buffer size, and uses the given executor
182     */
183     public void testConstructor2() {
184     Executor e = Executors.newFixedThreadPool(1);
185     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
186     checkInitialState(p);
187     assertSame(p.getExecutor(), e);
188 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
189 dl 1.1 }
190    
191     /**
192     * A null Executor argument to SubmissionPublisher constructor throws NPE
193     */
194     public void testConstructor3() {
195     try {
196 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
197 dl 1.1 shouldThrow();
198 jsr166 1.2 } catch (NullPointerException success) {}
199 dl 1.1 }
200    
201     /**
202     * A negative capacity argument to SubmissionPublisher constructor
203     * throws IAE
204     */
205     public void testConstructor4() {
206     Executor e = Executors.newFixedThreadPool(1);
207     try {
208 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
209 dl 1.1 shouldThrow();
210 jsr166 1.2 } catch (IllegalArgumentException success) {}
211 dl 1.1 }
212    
213     /**
214     * A closed publisher reports isClosed with no closedException and
215     * throws ISE upon attempted submission; a subsequent close or
216     * closeExceptionally has no additional effect.
217     */
218     public void testClose() {
219     SubmissionPublisher<Integer> p = basicPublisher();
220     checkInitialState(p);
221     p.close();
222     assertTrue(p.isClosed());
223     assertNull(p.getClosedException());
224     try {
225     p.submit(1);
226     shouldThrow();
227 jsr166 1.2 } catch (IllegalStateException success) {}
228 dl 1.1 Throwable ex = new SPException();
229     p.closeExceptionally(ex);
230     assertTrue(p.isClosed());
231     assertNull(p.getClosedException());
232     }
233    
234     /**
235     * A publisher closedExceptionally reports isClosed with the
236     * closedException and throws ISE upon attempted submission; a
237     * subsequent close or closeExceptionally has no additional
238     * effect.
239     */
240     public void testCloseExceptionally() {
241     SubmissionPublisher<Integer> p = basicPublisher();
242     checkInitialState(p);
243     Throwable ex = new SPException();
244     p.closeExceptionally(ex);
245     assertTrue(p.isClosed());
246     assertSame(p.getClosedException(), ex);
247     try {
248     p.submit(1);
249     shouldThrow();
250 jsr166 1.2 } catch (IllegalStateException success) {}
251 dl 1.1 p.close();
252     assertTrue(p.isClosed());
253     assertSame(p.getClosedException(), ex);
254     }
255    
256     /**
257     * Upon subscription, the subscriber's onSubscribe is called, no
258     * other Subscriber methods are invoked, the publisher
259     * hasSubscribers, isSubscribed is true, and existing
260     * subscriptions are unaffected.
261     */
262     public void testSubscribe1() {
263     TestSubscriber s = new TestSubscriber();
264     SubmissionPublisher<Integer> p = basicPublisher();
265     p.subscribe(s);
266     assertTrue(p.hasSubscribers());
267 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
268 dl 1.1 assertTrue(p.getSubscribers().contains(s));
269     assertTrue(p.isSubscribed(s));
270     s.awaitSubscribe();
271     assertNotNull(s.sn);
272 jsr166 1.6 assertEquals(0, s.nexts);
273     assertEquals(0, s.errors);
274     assertEquals(0, s.completes);
275 dl 1.1 TestSubscriber s2 = new TestSubscriber();
276     p.subscribe(s2);
277     assertTrue(p.hasSubscribers());
278 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
279 dl 1.1 assertTrue(p.getSubscribers().contains(s));
280     assertTrue(p.getSubscribers().contains(s2));
281     assertTrue(p.isSubscribed(s));
282     assertTrue(p.isSubscribed(s2));
283     s2.awaitSubscribe();
284     assertNotNull(s2.sn);
285 jsr166 1.6 assertEquals(0, s2.nexts);
286     assertEquals(0, s2.errors);
287     assertEquals(0, s2.completes);
288 dl 1.9 p.close();
289 dl 1.1 }
290    
291     /**
292     * If closed, upon subscription, the subscriber's onComplete
293     * method is invoked
294     */
295     public void testSubscribe2() {
296     TestSubscriber s = new TestSubscriber();
297     SubmissionPublisher<Integer> p = basicPublisher();
298     p.close();
299     p.subscribe(s);
300     s.awaitComplete();
301 jsr166 1.6 assertEquals(0, s.nexts);
302     assertEquals(0, s.errors);
303     assertEquals(1, s.completes, 1);
304 dl 1.1 }
305    
306     /**
307     * If closedExceptionally, upon subscription, the subscriber's
308     * onError method is invoked
309     */
310     public void testSubscribe3() {
311     TestSubscriber s = new TestSubscriber();
312     SubmissionPublisher<Integer> p = basicPublisher();
313     Throwable ex = new SPException();
314     p.closeExceptionally(ex);
315     assertTrue(p.isClosed());
316     assertSame(p.getClosedException(), ex);
317     p.subscribe(s);
318     s.awaitError();
319 jsr166 1.6 assertEquals(0, s.nexts);
320     assertEquals(1, s.errors);
321 dl 1.1 }
322    
323     /**
324     * Upon attempted resubscription, the subscriber's onError is
325     * called and the subscription is cancelled.
326     */
327     public void testSubscribe4() {
328     TestSubscriber s = new TestSubscriber();
329     SubmissionPublisher<Integer> p = basicPublisher();
330     p.subscribe(s);
331     assertTrue(p.hasSubscribers());
332 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
333 dl 1.1 assertTrue(p.getSubscribers().contains(s));
334     assertTrue(p.isSubscribed(s));
335     s.awaitSubscribe();
336     assertNotNull(s.sn);
337 jsr166 1.6 assertEquals(0, s.nexts);
338     assertEquals(0, s.errors);
339     assertEquals(0, s.completes);
340 dl 1.1 p.subscribe(s);
341     s.awaitError();
342 jsr166 1.6 assertEquals(0, s.nexts);
343     assertEquals(1, s.errors);
344 dl 1.1 assertFalse(p.isSubscribed(s));
345     }
346    
347     /**
348     * An exception thrown in onSubscribe causes onError
349     */
350     public void testSubscribe5() {
351     TestSubscriber s = new TestSubscriber();
352     SubmissionPublisher<Integer> p = basicPublisher();
353     s.throwOnCall = true;
354     try {
355     p.subscribe(s);
356 jsr166 1.2 } catch (Exception ok) {}
357 dl 1.1 s.awaitError();
358 jsr166 1.6 assertEquals(0, s.nexts);
359     assertEquals(1, s.errors);
360     assertEquals(0, s.completes);
361 dl 1.1 }
362    
363     /**
364 jsr166 1.8 * subscribe(null) throws NPE
365 dl 1.1 */
366     public void testSubscribe6() {
367     SubmissionPublisher<Integer> p = basicPublisher();
368     try {
369     p.subscribe(null);
370 jsr166 1.2 shouldThrow();
371     } catch (NullPointerException success) {}
372 dl 1.1 checkInitialState(p);
373     }
374    
375     /**
376     * Closing a publisher causes onComplete to subscribers
377     */
378     public void testCloseCompletes() {
379     SubmissionPublisher<Integer> p = basicPublisher();
380     TestSubscriber s1 = new TestSubscriber();
381     TestSubscriber s2 = new TestSubscriber();
382     p.subscribe(s1);
383     p.subscribe(s2);
384     p.submit(1);
385     p.close();
386     assertTrue(p.isClosed());
387     assertNull(p.getClosedException());
388     s1.awaitComplete();
389 jsr166 1.6 assertEquals(1, s1.nexts);
390     assertEquals(1, s1.completes);
391 dl 1.1 s2.awaitComplete();
392 jsr166 1.6 assertEquals(1, s2.nexts);
393     assertEquals(1, s2.completes);
394 dl 1.1 }
395    
396     /**
397     * Closing a publisher exceptionally causes onError to subscribers
398     */
399     public void testCloseExceptionallyError() {
400     SubmissionPublisher<Integer> p = basicPublisher();
401     TestSubscriber s1 = new TestSubscriber();
402     TestSubscriber s2 = new TestSubscriber();
403     p.subscribe(s1);
404     p.subscribe(s2);
405     p.submit(1);
406     p.closeExceptionally(new SPException());
407     assertTrue(p.isClosed());
408     s1.awaitError();
409     assertTrue(s1.nexts <= 1);
410 jsr166 1.6 assertEquals(1, s1.errors);
411 dl 1.1 s2.awaitError();
412     assertTrue(s2.nexts <= 1);
413 jsr166 1.6 assertEquals(1, s2.errors);
414 dl 1.1 }
415    
416     /**
417     * Cancelling a subscription eventually causes no more onNexts to be issued
418     */
419     public void testCancel() {
420     SubmissionPublisher<Integer> p = basicPublisher();
421     TestSubscriber s1 = new TestSubscriber();
422     TestSubscriber s2 = new TestSubscriber();
423     p.subscribe(s1);
424     p.subscribe(s2);
425     s1.awaitSubscribe();
426     p.submit(1);
427     s1.sn.cancel();
428     for (int i = 2; i <= 20; ++i)
429     p.submit(i);
430     p.close();
431     s2.awaitComplete();
432 jsr166 1.6 assertEquals(20, s2.nexts);
433     assertEquals(1, s2.completes);
434 dl 1.1 assertTrue(s1.nexts < 20);
435     assertFalse(p.isSubscribed(s1));
436     }
437    
438     /**
439     * Throwing an exception in onNext causes onError
440     */
441     public void testThrowOnNext() {
442     SubmissionPublisher<Integer> p = basicPublisher();
443     TestSubscriber s1 = new TestSubscriber();
444     TestSubscriber s2 = new TestSubscriber();
445     p.subscribe(s1);
446     p.subscribe(s2);
447     s1.awaitSubscribe();
448     p.submit(1);
449     s1.throwOnCall = true;
450     p.submit(2);
451     p.close();
452     s2.awaitComplete();
453 jsr166 1.6 assertEquals(2, s2.nexts);
454 dl 1.1 s1.awaitComplete();
455 jsr166 1.6 assertEquals(1, s1.errors);
456 dl 1.1 }
457    
458     /**
459 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
460 dl 1.1 * subscriber throws an exception in onNext
461     */
462     public void testThrowOnNextHandler() {
463     AtomicInteger calls = new AtomicInteger();
464     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
465     (basicExecutor, 8,
466     (s, e) -> calls.getAndIncrement());
467     TestSubscriber s1 = new TestSubscriber();
468     TestSubscriber s2 = new TestSubscriber();
469     p.subscribe(s1);
470     p.subscribe(s2);
471     s1.awaitSubscribe();
472     p.submit(1);
473     s1.throwOnCall = true;
474     p.submit(2);
475     p.close();
476     s2.awaitComplete();
477 jsr166 1.6 assertEquals(2, s2.nexts);
478     assertEquals(1, s2.completes);
479 dl 1.1 s1.awaitError();
480 jsr166 1.6 assertEquals(1, s1.errors);
481     assertEquals(1, calls.get());
482 dl 1.1 }
483    
484     /**
485     * onNext items are issued in the same order to each subscriber
486     */
487     public void testOrder() {
488     SubmissionPublisher<Integer> p = basicPublisher();
489     TestSubscriber s1 = new TestSubscriber();
490     TestSubscriber s2 = new TestSubscriber();
491     p.subscribe(s1);
492     p.subscribe(s2);
493     for (int i = 1; i <= 20; ++i)
494     p.submit(i);
495     p.close();
496     s2.awaitComplete();
497     s1.awaitComplete();
498 jsr166 1.6 assertEquals(20, s2.nexts);
499     assertEquals(1, s2.completes);
500     assertEquals(20, s1.nexts);
501     assertEquals(1, s1.completes);
502 dl 1.1 }
503    
504     /**
505     * onNext is issued only if requested
506     */
507     public void testRequest1() {
508     SubmissionPublisher<Integer> p = basicPublisher();
509     TestSubscriber s1 = new TestSubscriber();
510     s1.request = false;
511     p.subscribe(s1);
512     s1.awaitSubscribe();
513     assertTrue(p.estimateMinimumDemand() == 0);
514     TestSubscriber s2 = new TestSubscriber();
515     p.subscribe(s2);
516     p.submit(1);
517     p.submit(2);
518     s2.awaitNext(1);
519 jsr166 1.6 assertEquals(0, s1.nexts);
520 dl 1.1 s1.sn.request(3);
521     p.submit(3);
522     p.close();
523     s2.awaitComplete();
524 jsr166 1.6 assertEquals(3, s2.nexts);
525     assertEquals(1, s2.completes);
526 dl 1.1 s1.awaitComplete();
527     assertTrue(s1.nexts > 0);
528 jsr166 1.6 assertEquals(1, s1.completes);
529 dl 1.1 }
530    
531     /**
532     * onNext is not issued when requests become zero
533     */
534     public void testRequest2() {
535     SubmissionPublisher<Integer> p = basicPublisher();
536     TestSubscriber s1 = new TestSubscriber();
537     TestSubscriber s2 = new TestSubscriber();
538     p.subscribe(s1);
539     p.subscribe(s2);
540     s2.awaitSubscribe();
541     s1.awaitSubscribe();
542     s1.request = false;
543     p.submit(1);
544     p.submit(2);
545     p.close();
546     s2.awaitComplete();
547 jsr166 1.6 assertEquals(2, s2.nexts);
548     assertEquals(1, s2.completes);
549 dl 1.1 s1.awaitNext(1);
550 jsr166 1.6 assertEquals(1, s1.nexts);
551 dl 1.1 }
552    
553     /**
554     * Negative request causes error
555     */
556     public void testRequest3() {
557     SubmissionPublisher<Integer> p = basicPublisher();
558     TestSubscriber s1 = new TestSubscriber();
559     TestSubscriber s2 = new TestSubscriber();
560     p.subscribe(s1);
561     p.subscribe(s2);
562     s2.awaitSubscribe();
563     s1.awaitSubscribe();
564     s1.sn.request(-1L);
565     p.submit(1);
566     p.submit(2);
567     p.close();
568     s2.awaitComplete();
569 jsr166 1.6 assertEquals(2, s2.nexts);
570     assertEquals(1, s2.completes);
571 dl 1.1 s1.awaitError();
572 jsr166 1.6 assertEquals(1, s1.errors);
573 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
574     }
575    
576     /**
577     * estimateMinimumDemand reports 0 until request, nonzero after
578     * request, and zero again after delivery
579     */
580     public void testEstimateMinimumDemand() {
581     TestSubscriber s = new TestSubscriber();
582     SubmissionPublisher<Integer> p = basicPublisher();
583     s.request = false;
584     p.subscribe(s);
585     s.awaitSubscribe();
586 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
587 dl 1.1 s.sn.request(1);
588 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
589 dl 1.1 p.submit(1);
590     s.awaitNext(1);
591 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
592 dl 1.1 }
593    
594     /**
595 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
596 dl 1.1 */
597     public void testEmptySubmit() {
598     SubmissionPublisher<Integer> p = basicPublisher();
599 jsr166 1.6 assertEquals(0, p.submit(1));
600 dl 1.1 }
601    
602     /**
603 jsr166 1.3 * submit(null) throws NPE
604 dl 1.1 */
605     public void testNullSubmit() {
606     SubmissionPublisher<Integer> p = basicPublisher();
607     try {
608     p.submit(null);
609 jsr166 1.2 shouldThrow();
610     } catch (NullPointerException success) {}
611 dl 1.1 }
612    
613     /**
614 jsr166 1.3 * submit returns number of lagged items, compatible with result
615 dl 1.1 * of estimateMaximumLag.
616     */
617     public void testLaggedSubmit() {
618     SubmissionPublisher<Integer> p = basicPublisher();
619     TestSubscriber s1 = new TestSubscriber();
620     s1.request = false;
621     TestSubscriber s2 = new TestSubscriber();
622     s2.request = false;
623     p.subscribe(s1);
624     p.subscribe(s2);
625     s2.awaitSubscribe();
626     s1.awaitSubscribe();
627 jsr166 1.6 assertEquals(1, p.submit(1));
628 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
629     assertTrue(p.submit(2) >= 2);
630     assertTrue(p.estimateMaximumLag() >= 2);
631     s1.sn.request(4);
632     assertTrue(p.submit(3) >= 3);
633     assertTrue(p.estimateMaximumLag() >= 3);
634     s2.sn.request(4);
635     p.submit(4);
636     p.close();
637     s2.awaitComplete();
638 jsr166 1.6 assertEquals(4, s2.nexts);
639 dl 1.1 s1.awaitComplete();
640 jsr166 1.6 assertEquals(4, s2.nexts);
641 dl 1.1 }
642    
643     /**
644     * submit eventually issues requested items when buffer capacity is 1
645     */
646     public void testCap1Submit() {
647     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
648     basicExecutor, 1);
649     TestSubscriber s1 = new TestSubscriber();
650     TestSubscriber s2 = new TestSubscriber();
651     p.subscribe(s1);
652     p.subscribe(s2);
653     for (int i = 1; i <= 20; ++i) {
654     assertTrue(p.estimateMinimumDemand() <= 1);
655     assertTrue(p.submit(i) >= 0);
656     }
657     p.close();
658     s2.awaitComplete();
659     s1.awaitComplete();
660 jsr166 1.6 assertEquals(20, s2.nexts);
661     assertEquals(1, s2.completes);
662     assertEquals(20, s1.nexts);
663     assertEquals(1, s1.completes);
664 dl 1.1 }
665 jsr166 1.2
666 dl 1.1 static boolean noopHandle(AtomicInteger count) {
667     count.getAndIncrement();
668     return false;
669     }
670    
671     static boolean reqHandle(AtomicInteger count, Subscriber s) {
672     count.getAndIncrement();
673     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
674     return true;
675     }
676    
677     /**
678 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
679 dl 1.1 */
680     public void testEmptyOffer() {
681     SubmissionPublisher<Integer> p = basicPublisher();
682 jsr166 1.3 assertEquals(0, p.offer(1, null));
683 dl 1.1 }
684    
685     /**
686 jsr166 1.3 * offer(null) throws NPE
687 dl 1.1 */
688     public void testNullOffer() {
689     SubmissionPublisher<Integer> p = basicPublisher();
690     try {
691     p.offer(null, null);
692 jsr166 1.2 shouldThrow();
693     } catch (NullPointerException success) {}
694 dl 1.1 }
695    
696     /**
697 jsr166 1.3 * offer returns number of lagged items if not saturated
698 dl 1.1 */
699     public void testLaggedOffer() {
700     SubmissionPublisher<Integer> p = basicPublisher();
701     TestSubscriber s1 = new TestSubscriber();
702     s1.request = false;
703     TestSubscriber s2 = new TestSubscriber();
704     s2.request = false;
705     p.subscribe(s1);
706     p.subscribe(s2);
707     s2.awaitSubscribe();
708     s1.awaitSubscribe();
709     assertTrue(p.offer(1, null) >= 1);
710     assertTrue(p.offer(2, null) >= 2);
711     s1.sn.request(4);
712     assertTrue(p.offer(3, null) >= 3);
713     s2.sn.request(4);
714     p.offer(4, null);
715     p.close();
716     s2.awaitComplete();
717 jsr166 1.6 assertEquals(4, s2.nexts);
718 dl 1.1 s1.awaitComplete();
719 jsr166 1.6 assertEquals(4, s2.nexts);
720 dl 1.1 }
721    
722     /**
723 jsr166 1.3 * offer reports drops if saturated
724 dl 1.1 */
725     public void testDroppedOffer() {
726     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
727     basicExecutor, 4);
728     TestSubscriber s1 = new TestSubscriber();
729     s1.request = false;
730     TestSubscriber s2 = new TestSubscriber();
731     s2.request = false;
732     p.subscribe(s1);
733     p.subscribe(s2);
734     s2.awaitSubscribe();
735     s1.awaitSubscribe();
736     for (int i = 1; i <= 4; ++i)
737     assertTrue(p.offer(i, null) >= 0);
738     p.offer(5, null);
739     assertTrue(p.offer(6, null) < 0);
740     s1.sn.request(64);
741     assertTrue(p.offer(7, null) < 0);
742     s2.sn.request(64);
743     p.close();
744     s2.awaitComplete();
745     assertTrue(s2.nexts >= 4);
746     s1.awaitComplete();
747     assertTrue(s1.nexts >= 4);
748     }
749    
750     /**
751 jsr166 1.3 * offer invokes drop handler if saturated
752 dl 1.1 */
753     public void testHandledDroppedOffer() {
754     AtomicInteger calls = new AtomicInteger();
755     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
756     basicExecutor, 4);
757     TestSubscriber s1 = new TestSubscriber();
758     s1.request = false;
759     TestSubscriber s2 = new TestSubscriber();
760     s2.request = false;
761     p.subscribe(s1);
762     p.subscribe(s2);
763     s2.awaitSubscribe();
764     s1.awaitSubscribe();
765     for (int i = 1; i <= 4; ++i)
766     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
767     p.offer(4, (s, x) -> noopHandle(calls));
768     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
769     s1.sn.request(64);
770     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
771     s2.sn.request(64);
772     p.close();
773     s2.awaitComplete();
774     s1.awaitComplete();
775     assertTrue(calls.get() >= 4);
776     }
777    
778     /**
779 jsr166 1.3 * offer succeeds if drop handler forces request
780 dl 1.1 */
781     public void testRecoveredHandledDroppedOffer() {
782     AtomicInteger calls = new AtomicInteger();
783     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
784     basicExecutor, 4);
785     TestSubscriber s1 = new TestSubscriber();
786     s1.request = false;
787     TestSubscriber s2 = new TestSubscriber();
788     s2.request = false;
789     p.subscribe(s1);
790     p.subscribe(s2);
791     s2.awaitSubscribe();
792     s1.awaitSubscribe();
793     int n = 0;
794     for (int i = 1; i <= 8; ++i) {
795     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
796     n = n + 2 + (d < 0 ? d : 0);
797     }
798     p.close();
799     s2.awaitComplete();
800     s1.awaitComplete();
801 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
802 dl 1.1 assertTrue(calls.get() >= 2);
803     }
804    
805     /**
806 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
807 dl 1.1 */
808     public void testEmptyTimedOffer() {
809     SubmissionPublisher<Integer> p = basicPublisher();
810 jsr166 1.7 long startTime = System.nanoTime();
811 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
812 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
813 dl 1.1 }
814    
815     /**
816 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
817 dl 1.1 */
818     public void testNullTimedOffer() {
819     SubmissionPublisher<Integer> p = basicPublisher();
820 jsr166 1.7 long startTime = System.nanoTime();
821 dl 1.1 try {
822 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
823 jsr166 1.2 shouldThrow();
824     } catch (NullPointerException success) {}
825 dl 1.1 try {
826 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
827 jsr166 1.2 shouldThrow();
828     } catch (NullPointerException success) {}
829 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
830 dl 1.1 }
831    
832     /**
833 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
834 dl 1.1 */
835     public void testLaggedTimedOffer() {
836     SubmissionPublisher<Integer> p = basicPublisher();
837     TestSubscriber s1 = new TestSubscriber();
838     s1.request = false;
839     TestSubscriber s2 = new TestSubscriber();
840     s2.request = false;
841     p.subscribe(s1);
842     p.subscribe(s2);
843     s2.awaitSubscribe();
844     s1.awaitSubscribe();
845 jsr166 1.7 long startTime = System.nanoTime();
846     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
847     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
848 dl 1.1 s1.sn.request(4);
849 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
850 dl 1.1 s2.sn.request(4);
851 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
852 dl 1.1 p.close();
853     s2.awaitComplete();
854 jsr166 1.6 assertEquals(4, s2.nexts);
855 dl 1.1 s1.awaitComplete();
856 jsr166 1.6 assertEquals(4, s2.nexts);
857 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
858 dl 1.1 }
859    
860     /**
861 jsr166 1.3 * Timed offer reports drops if saturated
862 dl 1.1 */
863     public void testDroppedTimedOffer() {
864     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
865     basicExecutor, 4);
866     TestSubscriber s1 = new TestSubscriber();
867     s1.request = false;
868     TestSubscriber s2 = new TestSubscriber();
869     s2.request = false;
870     p.subscribe(s1);
871     p.subscribe(s2);
872     s2.awaitSubscribe();
873     s1.awaitSubscribe();
874 dl 1.9 long delay = timeoutMillis();
875 dl 1.1 for (int i = 1; i <= 4; ++i)
876 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
877     long startTime = System.nanoTime();
878     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
879 dl 1.1 s1.sn.request(64);
880 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
881     // 2 * delay should elapse but check only 1 * delay to allow timer slop
882     assertTrue(millisElapsedSince(startTime) >= delay);
883 dl 1.1 s2.sn.request(64);
884     p.close();
885     s2.awaitComplete();
886     assertTrue(s2.nexts >= 2);
887     s1.awaitComplete();
888     assertTrue(s1.nexts >= 2);
889     }
890    
891     /**
892 jsr166 1.3 * Timed offer invokes drop handler if saturated
893 dl 1.1 */
894     public void testHandledDroppedTimedOffer() {
895     AtomicInteger calls = new AtomicInteger();
896     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
897     basicExecutor, 4);
898     TestSubscriber s1 = new TestSubscriber();
899     s1.request = false;
900     TestSubscriber s2 = new TestSubscriber();
901     s2.request = false;
902     p.subscribe(s1);
903     p.subscribe(s2);
904     s2.awaitSubscribe();
905     s1.awaitSubscribe();
906 dl 1.9 long delay = timeoutMillis();
907 dl 1.1 for (int i = 1; i <= 4; ++i)
908 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
909     long startTime = System.nanoTime();
910     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
911 dl 1.1 s1.sn.request(64);
912 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
913     assertTrue(millisElapsedSince(startTime) >= delay);
914 dl 1.1 s2.sn.request(64);
915     p.close();
916     s2.awaitComplete();
917     s1.awaitComplete();
918     assertTrue(calls.get() >= 2);
919     }
920    
921     /**
922 jsr166 1.3 * Timed offer succeeds if drop handler forces request
923 dl 1.1 */
924     public void testRecoveredHandledDroppedTimedOffer() {
925     AtomicInteger calls = new AtomicInteger();
926     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
927     basicExecutor, 4);
928     TestSubscriber s1 = new TestSubscriber();
929     s1.request = false;
930     TestSubscriber s2 = new TestSubscriber();
931     s2.request = false;
932     p.subscribe(s1);
933     p.subscribe(s2);
934     s2.awaitSubscribe();
935     s1.awaitSubscribe();
936     int n = 0;
937 dl 1.9 long delay = timeoutMillis();
938     long startTime = System.nanoTime();
939     for (int i = 1; i <= 6; ++i) {
940     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
941 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
942     }
943 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
944 dl 1.1 p.close();
945     s2.awaitComplete();
946     s1.awaitComplete();
947 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
948 dl 1.1 assertTrue(calls.get() >= 2);
949     }
950    
951     }