ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.1
Committed: Mon Sep 7 17:14:06 2015 UTC (8 years, 7 months ago) by dl
Branch: MAIN
Log Message:
Add SubmissionPublisher tests

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8     import static java.util.concurrent.TimeUnit.MILLISECONDS;
9     import static java.util.concurrent.TimeUnit.SECONDS;
10    
11     import java.util.concurrent.Executor;
12     import java.util.concurrent.Executors;
13     import java.util.concurrent.Flow;
14     import static java.util.concurrent.Flow.Publisher;
15     import static java.util.concurrent.Flow.Subscriber;
16     import static java.util.concurrent.Flow.Subscription;
17     import java.util.concurrent.LinkedBlockingQueue;
18     import java.util.concurrent.ForkJoinPool;
19     import java.util.concurrent.SubmissionPublisher;
20     import java.util.concurrent.ThreadFactory;
21     import java.util.concurrent.ThreadPoolExecutor;
22     import java.util.concurrent.TimeUnit;
23     import java.util.concurrent.atomic.AtomicInteger;
24     import java.util.function.BiConsumer;
25     import java.util.function.BiPredicate;
26     import java.util.function.BiFunction;
27    
28     import junit.framework.Test;
29     import junit.framework.TestSuite;
30    
31     public class SubmissionPublisherTest extends JSR166TestCase {
32    
33     public static void main(String[] args) {
34     main(suite(), args);
35     }
36     public static Test suite() {
37     return new TestSuite(SubmissionPublisherTest.class);
38     }
39    
40     // Factory for single thread pool in case commonPool parallelism is zero
41     static final class DaemonThreadFactory implements ThreadFactory {
42     public Thread newThread(Runnable r) {
43     Thread t = new Thread(r);
44     t.setDaemon(true);
45     return t;
46     }
47     }
48    
49     static final Executor basicExecutor =
50     (ForkJoinPool.getCommonPoolParallelism() > 0) ?
51     ForkJoinPool.commonPool() :
52     new ThreadPoolExecutor(1, 1, 60, SECONDS,
53     new LinkedBlockingQueue<Runnable>(),
54     new DaemonThreadFactory());
55    
56     static SubmissionPublisher<Integer> basicPublisher() {
57     return new SubmissionPublisher<Integer>(basicExecutor,
58     Flow.defaultBufferSize());
59     }
60    
61     static class SPException extends RuntimeException {}
62    
63     class TestSubscriber implements Subscriber<Integer> {
64     volatile Subscription sn;
65     int last; // Requires that onNexts are in numeric order
66     volatile int nexts;
67     volatile int errors;
68     volatile int completes;
69     volatile boolean throwOnCall = false;
70     volatile boolean request = true;
71     volatile Throwable lastError;
72    
73     public synchronized void onSubscribe(Subscription s) {
74     threadAssertTrue(sn == null);
75     sn = s;
76     notifyAll();
77     if (throwOnCall)
78     throw new SPException();
79     if (request)
80     sn.request(1L);
81     }
82     public synchronized void onNext(Integer t) {
83     ++nexts;
84     notifyAll();
85     int current = t.intValue();
86     threadAssertTrue(current >= last);
87     last = current;
88     if (request)
89     sn.request(1L);
90     if (throwOnCall)
91     throw new SPException();
92     }
93     public synchronized void onError(Throwable t) {
94     threadAssertTrue(completes == 0);
95     threadAssertTrue(errors == 0);
96     lastError = t;
97     ++errors;
98     notifyAll();
99     }
100     public synchronized void onComplete() {
101     threadAssertTrue(completes == 0);
102     ++completes;
103     notifyAll();
104     }
105    
106     synchronized void awaitSubscribe() {
107     while (sn == null) {
108     try {
109     wait();
110     } catch (Exception ex) {
111     threadUnexpectedException(ex);
112     break;
113     }
114     }
115     }
116     synchronized void awaitNext(int n) {
117     while (nexts < n) {
118     try {
119     wait();
120     } catch (Exception ex) {
121     threadUnexpectedException(ex);
122     break;
123     }
124     }
125     }
126     synchronized void awaitComplete() {
127     while (completes == 0 && errors == 0) {
128     try {
129     wait();
130     } catch (Exception ex) {
131     threadUnexpectedException(ex);
132     break;
133     }
134     }
135     }
136     synchronized void awaitError() {
137     while (errors == 0) {
138     try {
139     wait();
140     } catch (Exception ex) {
141     threadUnexpectedException(ex);
142     break;
143     }
144     }
145     }
146    
147     }
148    
149     /**
150     * A new SubmissionPublisher has no subscribers, a non-null
151     * executor, a power-of-two capacity, is not closed, and reports
152     * zero demand and lag
153     */
154     void checkInitialState(SubmissionPublisher<?> p) {
155     assertFalse(p.hasSubscribers());
156     assertEquals(p.getNumberOfSubscribers(), 0);
157     assertTrue(p.getSubscribers().isEmpty());
158     assertFalse(p.isClosed());
159     assertNull(p.getClosedException());
160     int n = p.getMaxBufferCapacity();
161     assertTrue((n & (n - 1)) == 0); // power of two
162     assertNotNull(p.getExecutor());
163     assertEquals(p.estimateMinimumDemand(), 0);
164     assertEquals(p.estimateMaximumLag(), 0);
165     }
166    
167     /**
168     * A default-constructed SubmissionPublisher has no subscribers,
169     * is not closed, has default buffer size, and uses the
170     * ForkJoinPool.commonPool executor
171     */
172     public void testConstructor1() {
173     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
174     checkInitialState(p);
175     assertSame(p.getExecutor(), ForkJoinPool.commonPool());
176     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
177     }
178    
179     /**
180     * A new SubmissionPublisher has no subscribers, is not closed,
181     * has the given buffer size, and uses the given executor
182     */
183     public void testConstructor2() {
184     Executor e = Executors.newFixedThreadPool(1);
185     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
186     checkInitialState(p);
187     assertSame(p.getExecutor(), e);
188     assertEquals(p.getMaxBufferCapacity(), 8);
189     }
190    
191     /**
192     * A null Executor argument to SubmissionPublisher constructor throws NPE
193     */
194     public void testConstructor3() {
195     try {
196     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(null, 8);
197     shouldThrow();
198     } catch (NullPointerException success) {
199     }
200     }
201    
202     /**
203     * A negative capacity argument to SubmissionPublisher constructor
204     * throws IAE
205     */
206     public void testConstructor4() {
207     Executor e = Executors.newFixedThreadPool(1);
208     try {
209     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, -1);
210     shouldThrow();
211     } catch (IllegalArgumentException success) {
212     }
213     }
214    
215     /**
216     * A closed publisher reports isClosed with no closedException and
217     * throws ISE upon attempted submission; a subsequent close or
218     * closeExceptionally has no additional effect.
219     */
220     public void testClose() {
221     SubmissionPublisher<Integer> p = basicPublisher();
222     checkInitialState(p);
223     p.close();
224     assertTrue(p.isClosed());
225     assertNull(p.getClosedException());
226     try {
227     p.submit(1);
228     shouldThrow();
229     }
230     catch(IllegalStateException success) {
231     }
232     Throwable ex = new SPException();
233     p.closeExceptionally(ex);
234     assertTrue(p.isClosed());
235     assertNull(p.getClosedException());
236     }
237    
238     /**
239     * A publisher closedExceptionally reports isClosed with the
240     * closedException and throws ISE upon attempted submission; a
241     * subsequent close or closeExceptionally has no additional
242     * effect.
243     */
244     public void testCloseExceptionally() {
245     SubmissionPublisher<Integer> p = basicPublisher();
246     checkInitialState(p);
247     Throwable ex = new SPException();
248     p.closeExceptionally(ex);
249     assertTrue(p.isClosed());
250     assertSame(p.getClosedException(), ex);
251     try {
252     p.submit(1);
253     shouldThrow();
254     }
255     catch(IllegalStateException success) {
256     }
257     p.close();
258     assertTrue(p.isClosed());
259     assertSame(p.getClosedException(), ex);
260     }
261    
262     /**
263     * Upon subscription, the subscriber's onSubscribe is called, no
264     * other Subscriber methods are invoked, the publisher
265     * hasSubscribers, isSubscribed is true, and existing
266     * subscriptions are unaffected.
267     */
268     public void testSubscribe1() {
269     TestSubscriber s = new TestSubscriber();
270     SubmissionPublisher<Integer> p = basicPublisher();
271     p.subscribe(s);
272     assertTrue(p.hasSubscribers());
273     assertEquals(p.getNumberOfSubscribers(), 1);
274     assertTrue(p.getSubscribers().contains(s));
275     assertTrue(p.isSubscribed(s));
276     s.awaitSubscribe();
277     assertNotNull(s.sn);
278     assertEquals(s.nexts, 0);
279     assertEquals(s.errors, 0);
280     assertEquals(s.completes, 0);
281     TestSubscriber s2 = new TestSubscriber();
282     p.subscribe(s2);
283     assertTrue(p.hasSubscribers());
284     assertEquals(p.getNumberOfSubscribers(), 2);
285     assertTrue(p.getSubscribers().contains(s));
286     assertTrue(p.getSubscribers().contains(s2));
287     assertTrue(p.isSubscribed(s));
288     assertTrue(p.isSubscribed(s2));
289     s2.awaitSubscribe();
290     assertNotNull(s2.sn);
291     assertEquals(s2.nexts, 0);
292     assertEquals(s2.errors, 0);
293     assertEquals(s2.completes, 0);
294     }
295    
296     /**
297     * If closed, upon subscription, the subscriber's onComplete
298     * method is invoked
299     */
300     public void testSubscribe2() {
301     TestSubscriber s = new TestSubscriber();
302     SubmissionPublisher<Integer> p = basicPublisher();
303     p.close();
304     p.subscribe(s);
305     s.awaitComplete();
306     assertEquals(s.nexts, 0);
307     assertEquals(s.errors, 0);
308     assertEquals(s.completes, 1);
309     }
310    
311     /**
312     * If closedExceptionally, upon subscription, the subscriber's
313     * onError method is invoked
314     */
315     public void testSubscribe3() {
316     TestSubscriber s = new TestSubscriber();
317     SubmissionPublisher<Integer> p = basicPublisher();
318     Throwable ex = new SPException();
319     p.closeExceptionally(ex);
320     assertTrue(p.isClosed());
321     assertSame(p.getClosedException(), ex);
322     p.subscribe(s);
323     s.awaitError();
324     assertEquals(s.nexts, 0);
325     assertEquals(s.errors, 1);
326     }
327    
328     /**
329     * Upon attempted resubscription, the subscriber's onError is
330     * called and the subscription is cancelled.
331     */
332     public void testSubscribe4() {
333     TestSubscriber s = new TestSubscriber();
334     SubmissionPublisher<Integer> p = basicPublisher();
335     p.subscribe(s);
336     assertTrue(p.hasSubscribers());
337     assertEquals(p.getNumberOfSubscribers(), 1);
338     assertTrue(p.getSubscribers().contains(s));
339     assertTrue(p.isSubscribed(s));
340     s.awaitSubscribe();
341     assertNotNull(s.sn);
342     assertEquals(s.nexts, 0);
343     assertEquals(s.errors, 0);
344     assertEquals(s.completes, 0);
345     p.subscribe(s);
346     s.awaitError();
347     assertEquals(s.nexts, 0);
348     assertEquals(s.errors, 1);
349     assertFalse(p.isSubscribed(s));
350     }
351    
352     /**
353     * An exception thrown in onSubscribe causes onError
354     */
355     public void testSubscribe5() {
356     TestSubscriber s = new TestSubscriber();
357     SubmissionPublisher<Integer> p = basicPublisher();
358     s.throwOnCall = true;
359     try {
360     p.subscribe(s);
361     } catch(Exception ok) {
362     }
363     s.awaitError();
364     assertEquals(s.nexts, 0);
365     assertEquals(s.errors, 1);
366     assertEquals(s.completes, 0);
367     }
368    
369     /**
370     * subscribe(null) thows NPE
371     */
372     public void testSubscribe6() {
373     SubmissionPublisher<Integer> p = basicPublisher();
374     try {
375     p.subscribe(null);
376     } catch(NullPointerException success) {
377     }
378     checkInitialState(p);
379     }
380    
381     /**
382     * Closing a publisher causes onComplete to subscribers
383     */
384     public void testCloseCompletes() {
385     SubmissionPublisher<Integer> p = basicPublisher();
386     TestSubscriber s1 = new TestSubscriber();
387     TestSubscriber s2 = new TestSubscriber();
388     p.subscribe(s1);
389     p.subscribe(s2);
390     p.submit(1);
391     p.close();
392     assertTrue(p.isClosed());
393     assertNull(p.getClosedException());
394     s1.awaitComplete();
395     assertEquals(s1.nexts, 1);
396     assertEquals(s1.completes, 1);
397     s2.awaitComplete();
398     assertEquals(s2.nexts, 1);
399     assertEquals(s2.completes, 1);
400     }
401    
402     /**
403     * Closing a publisher exceptionally causes onError to subscribers
404     */
405     public void testCloseExceptionallyError() {
406     SubmissionPublisher<Integer> p = basicPublisher();
407     TestSubscriber s1 = new TestSubscriber();
408     TestSubscriber s2 = new TestSubscriber();
409     p.subscribe(s1);
410     p.subscribe(s2);
411     p.submit(1);
412     p.closeExceptionally(new SPException());
413     assertTrue(p.isClosed());
414     s1.awaitError();
415     assertTrue(s1.nexts <= 1);
416     assertEquals(s1.errors, 1);
417     s2.awaitError();
418     assertTrue(s2.nexts <= 1);
419     assertEquals(s2.errors, 1);
420     }
421    
422     /**
423     * Cancelling a subscription eventually causes no more onNexts to be issued
424     */
425     public void testCancel() {
426     SubmissionPublisher<Integer> p = basicPublisher();
427     TestSubscriber s1 = new TestSubscriber();
428     TestSubscriber s2 = new TestSubscriber();
429     p.subscribe(s1);
430     p.subscribe(s2);
431     s1.awaitSubscribe();
432     p.submit(1);
433     s1.sn.cancel();
434     for (int i = 2; i <= 20; ++i)
435     p.submit(i);
436     p.close();
437     s2.awaitComplete();
438     assertEquals(s2.nexts, 20);
439     assertEquals(s2.completes, 1);
440     assertTrue(s1.nexts < 20);
441     assertFalse(p.isSubscribed(s1));
442     }
443    
444     /**
445     * Throwing an exception in onNext causes onError
446     */
447     public void testThrowOnNext() {
448     SubmissionPublisher<Integer> p = basicPublisher();
449     TestSubscriber s1 = new TestSubscriber();
450     TestSubscriber s2 = new TestSubscriber();
451     p.subscribe(s1);
452     p.subscribe(s2);
453     s1.awaitSubscribe();
454     p.submit(1);
455     s1.throwOnCall = true;
456     p.submit(2);
457     p.close();
458     s2.awaitComplete();
459     assertEquals(s2.nexts, 2);
460     s1.awaitComplete();
461     assertEquals(s1.errors, 1);
462     }
463    
464     /**
465     * If a handler is supplied in conctructor, it is invoked when
466     * subscriber throws an exception in onNext
467     */
468     public void testThrowOnNextHandler() {
469     AtomicInteger calls = new AtomicInteger();
470     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
471     (basicExecutor, 8,
472     (s, e) -> calls.getAndIncrement());
473     TestSubscriber s1 = new TestSubscriber();
474     TestSubscriber s2 = new TestSubscriber();
475     p.subscribe(s1);
476     p.subscribe(s2);
477     s1.awaitSubscribe();
478     p.submit(1);
479     s1.throwOnCall = true;
480     p.submit(2);
481     p.close();
482     s2.awaitComplete();
483     assertEquals(s2.nexts, 2);
484     assertEquals(s2.completes, 1);
485     s1.awaitError();
486     assertEquals(s1.errors, 1);
487     assertEquals(calls.get(), 1);
488     }
489    
490     /**
491     * onNext items are issued in the same order to each subscriber
492     */
493     public void testOrder() {
494     SubmissionPublisher<Integer> p = basicPublisher();
495     TestSubscriber s1 = new TestSubscriber();
496     TestSubscriber s2 = new TestSubscriber();
497     p.subscribe(s1);
498     p.subscribe(s2);
499     for (int i = 1; i <= 20; ++i)
500     p.submit(i);
501     p.close();
502     s2.awaitComplete();
503     s1.awaitComplete();
504     assertEquals(s2.nexts, 20);
505     assertEquals(s2.completes, 1);
506     assertEquals(s1.nexts, 20);
507     assertEquals(s1.completes, 1);
508     }
509    
510     /**
511     * onNext is issued only if requested
512     */
513     public void testRequest1() {
514     SubmissionPublisher<Integer> p = basicPublisher();
515     TestSubscriber s1 = new TestSubscriber();
516     s1.request = false;
517     p.subscribe(s1);
518     s1.awaitSubscribe();
519     assertTrue(p.estimateMinimumDemand() == 0);
520     TestSubscriber s2 = new TestSubscriber();
521     p.subscribe(s2);
522     p.submit(1);
523     p.submit(2);
524     s2.awaitNext(1);
525     assertEquals(s1.nexts, 0);
526     s1.sn.request(3);
527     p.submit(3);
528     p.close();
529     s2.awaitComplete();
530     assertEquals(s2.nexts, 3);
531     assertEquals(s2.completes, 1);
532     s1.awaitComplete();
533     assertTrue(s1.nexts > 0);
534     assertEquals(s1.completes, 1);
535     }
536    
537     /**
538     * onNext is not issued when requests become zero
539     */
540     public void testRequest2() {
541     SubmissionPublisher<Integer> p = basicPublisher();
542     TestSubscriber s1 = new TestSubscriber();
543     TestSubscriber s2 = new TestSubscriber();
544     p.subscribe(s1);
545     p.subscribe(s2);
546     s2.awaitSubscribe();
547     s1.awaitSubscribe();
548     s1.request = false;
549     p.submit(1);
550     p.submit(2);
551     p.close();
552     s2.awaitComplete();
553     assertEquals(s2.nexts, 2);
554     assertEquals(s2.completes, 1);
555     s1.awaitNext(1);
556     assertEquals(s1.nexts, 1);
557     }
558    
559     /**
560     * Negative request causes error
561     */
562     public void testRequest3() {
563     SubmissionPublisher<Integer> p = basicPublisher();
564     TestSubscriber s1 = new TestSubscriber();
565     TestSubscriber s2 = new TestSubscriber();
566     p.subscribe(s1);
567     p.subscribe(s2);
568     s2.awaitSubscribe();
569     s1.awaitSubscribe();
570     s1.sn.request(-1L);
571     p.submit(1);
572     p.submit(2);
573     p.close();
574     s2.awaitComplete();
575     assertEquals(s2.nexts, 2);
576     assertEquals(s2.completes, 1);
577     s1.awaitError();
578     assertEquals(s1.errors, 1);
579     assertTrue(s1.lastError instanceof IllegalArgumentException);
580     }
581    
582     /**
583     * estimateMinimumDemand reports 0 until request, nonzero after
584     * request, and zero again after delivery
585     */
586     public void testEstimateMinimumDemand() {
587     TestSubscriber s = new TestSubscriber();
588     SubmissionPublisher<Integer> p = basicPublisher();
589     s.request = false;
590     p.subscribe(s);
591     s.awaitSubscribe();
592     assertEquals(p.estimateMinimumDemand(), 0);
593     s.sn.request(1);
594     assertEquals(p.estimateMinimumDemand(), 1);
595     p.submit(1);
596     s.awaitNext(1);
597     assertEquals(p.estimateMinimumDemand(), 0);
598     }
599    
600     /**
601     * Submit to a publisher with no subscribers returns lag 0
602     */
603     public void testEmptySubmit() {
604     SubmissionPublisher<Integer> p = basicPublisher();
605     assertEquals(p.submit(1), 0);
606     }
607    
608     /**
609     * Submit(null) throws NPE
610     */
611     public void testNullSubmit() {
612     SubmissionPublisher<Integer> p = basicPublisher();
613     try {
614     p.submit(null);
615     } catch (NullPointerException success) {
616     }
617     }
618    
619     /**
620     * Submit returns number of lagged items, compatible with result
621     * of estimateMaximumLag.
622     */
623     public void testLaggedSubmit() {
624     SubmissionPublisher<Integer> p = basicPublisher();
625     TestSubscriber s1 = new TestSubscriber();
626     s1.request = false;
627     TestSubscriber s2 = new TestSubscriber();
628     s2.request = false;
629     p.subscribe(s1);
630     p.subscribe(s2);
631     s2.awaitSubscribe();
632     s1.awaitSubscribe();
633     assertEquals(p.submit(1), 1);
634     assertTrue(p.estimateMaximumLag() >= 1);
635     assertTrue(p.submit(2) >= 2);
636     assertTrue(p.estimateMaximumLag() >= 2);
637     s1.sn.request(4);
638     assertTrue(p.submit(3) >= 3);
639     assertTrue(p.estimateMaximumLag() >= 3);
640     s2.sn.request(4);
641     p.submit(4);
642     p.close();
643     s2.awaitComplete();
644     assertEquals(s2.nexts, 4);
645     s1.awaitComplete();
646     assertEquals(s2.nexts, 4);
647     }
648    
649     /**
650     * submit eventually issues requested items when buffer capacity is 1
651     */
652     public void testCap1Submit() {
653     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
654     basicExecutor, 1);
655     TestSubscriber s1 = new TestSubscriber();
656     TestSubscriber s2 = new TestSubscriber();
657     p.subscribe(s1);
658     p.subscribe(s2);
659     for (int i = 1; i <= 20; ++i) {
660     assertTrue(p.estimateMinimumDemand() <= 1);
661     assertTrue(p.submit(i) >= 0);
662     }
663     p.close();
664     s2.awaitComplete();
665     s1.awaitComplete();
666     assertEquals(s2.nexts, 20);
667     assertEquals(s2.completes, 1);
668     assertEquals(s1.nexts, 20);
669     assertEquals(s1.completes, 1);
670     }
671    
672     static boolean noopHandle(AtomicInteger count) {
673     count.getAndIncrement();
674     return false;
675     }
676    
677     static boolean reqHandle(AtomicInteger count, Subscriber s) {
678     count.getAndIncrement();
679     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
680     return true;
681     }
682    
683     /**
684     * Offer to a publisher with no subscribers returns lag 0
685     */
686     public void testEmptyOffer() {
687     SubmissionPublisher<Integer> p = basicPublisher();
688     assertEquals(p.offer(1, null), 0);
689     }
690    
691     /**
692     * Offer(null) throws NPE
693     */
694     public void testNullOffer() {
695     SubmissionPublisher<Integer> p = basicPublisher();
696     try {
697     p.offer(null, null);
698     } catch (NullPointerException success) {
699     }
700     }
701    
702     /**
703     * Offer returns number of lagged items if not saturated
704     */
705     public void testLaggedOffer() {
706     SubmissionPublisher<Integer> p = basicPublisher();
707     TestSubscriber s1 = new TestSubscriber();
708     s1.request = false;
709     TestSubscriber s2 = new TestSubscriber();
710     s2.request = false;
711     p.subscribe(s1);
712     p.subscribe(s2);
713     s2.awaitSubscribe();
714     s1.awaitSubscribe();
715     assertTrue(p.offer(1, null) >= 1);
716     assertTrue(p.offer(2, null) >= 2);
717     s1.sn.request(4);
718     assertTrue(p.offer(3, null) >= 3);
719     s2.sn.request(4);
720     p.offer(4, null);
721     p.close();
722     s2.awaitComplete();
723     assertEquals(s2.nexts, 4);
724     s1.awaitComplete();
725     assertEquals(s2.nexts, 4);
726     }
727    
728     /**
729     * Offer reports drops if saturated
730     */
731     public void testDroppedOffer() {
732     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
733     basicExecutor, 4);
734     TestSubscriber s1 = new TestSubscriber();
735     s1.request = false;
736     TestSubscriber s2 = new TestSubscriber();
737     s2.request = false;
738     p.subscribe(s1);
739     p.subscribe(s2);
740     s2.awaitSubscribe();
741     s1.awaitSubscribe();
742     for (int i = 1; i <= 4; ++i)
743     assertTrue(p.offer(i, null) >= 0);
744     p.offer(5, null);
745     assertTrue(p.offer(6, null) < 0);
746     s1.sn.request(64);
747     assertTrue(p.offer(7, null) < 0);
748     s2.sn.request(64);
749     p.close();
750     s2.awaitComplete();
751     assertTrue(s2.nexts >= 4);
752     s1.awaitComplete();
753     assertTrue(s1.nexts >= 4);
754     }
755    
756     /**
757     * Offer invokes drop handler if saturated
758     */
759     public void testHandledDroppedOffer() {
760     AtomicInteger calls = new AtomicInteger();
761     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
762     basicExecutor, 4);
763     TestSubscriber s1 = new TestSubscriber();
764     s1.request = false;
765     TestSubscriber s2 = new TestSubscriber();
766     s2.request = false;
767     p.subscribe(s1);
768     p.subscribe(s2);
769     s2.awaitSubscribe();
770     s1.awaitSubscribe();
771     for (int i = 1; i <= 4; ++i)
772     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
773     p.offer(4, (s, x) -> noopHandle(calls));
774     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
775     s1.sn.request(64);
776     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
777     s2.sn.request(64);
778     p.close();
779     s2.awaitComplete();
780     s1.awaitComplete();
781     assertTrue(calls.get() >= 4);
782     }
783    
784    
785     /**
786     * Offer succeeds if drop handler forces request
787     */
788     public void testRecoveredHandledDroppedOffer() {
789     AtomicInteger calls = new AtomicInteger();
790     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
791     basicExecutor, 4);
792     TestSubscriber s1 = new TestSubscriber();
793     s1.request = false;
794     TestSubscriber s2 = new TestSubscriber();
795     s2.request = false;
796     p.subscribe(s1);
797     p.subscribe(s2);
798     s2.awaitSubscribe();
799     s1.awaitSubscribe();
800     int n = 0;
801     for (int i = 1; i <= 8; ++i) {
802     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
803     n = n + 2 + (d < 0 ? d : 0);
804     }
805     p.close();
806     s2.awaitComplete();
807     s1.awaitComplete();
808     assertEquals(s1.nexts + s2.nexts, n);
809     assertTrue(calls.get() >= 2);
810     }
811    
812    
813     /**
814     * TimedOffer to a publisher with no subscribers returns lag 0
815     */
816     public void testEmptyTimedOffer() {
817     SubmissionPublisher<Integer> p = basicPublisher();
818     assertEquals(p.offer(1, null), 0);
819     }
820    
821     /**
822     * Timed Offer with null item or TimeUnit throws NPE
823     */
824     public void testNullTimedOffer() {
825     SubmissionPublisher<Integer> p = basicPublisher();
826     try {
827     p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null);
828     } catch (NullPointerException success) {
829     }
830     try {
831     p.offer(1, SHORT_DELAY_MS, null, null);
832     } catch (NullPointerException success) {
833     }
834     }
835    
836     /**
837     * Timed Offer returns number of lagged items if not saturated
838     */
839     public void testLaggedTimedOffer() {
840     SubmissionPublisher<Integer> p = basicPublisher();
841     TestSubscriber s1 = new TestSubscriber();
842     s1.request = false;
843     TestSubscriber s2 = new TestSubscriber();
844     s2.request = false;
845     p.subscribe(s1);
846     p.subscribe(s2);
847     s2.awaitSubscribe();
848     s1.awaitSubscribe();
849     assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1);
850     assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2);
851     s1.sn.request(4);
852     assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3);
853     s2.sn.request(4);
854     p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null);
855     p.close();
856     s2.awaitComplete();
857     assertEquals(s2.nexts, 4);
858     s1.awaitComplete();
859     assertEquals(s2.nexts, 4);
860     }
861    
862     /**
863     * Timed Offer reports drops if saturated
864     */
865     public void testDroppedTimedOffer() {
866     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
867     basicExecutor, 4);
868     TestSubscriber s1 = new TestSubscriber();
869     s1.request = false;
870     TestSubscriber s2 = new TestSubscriber();
871     s2.request = false;
872     p.subscribe(s1);
873     p.subscribe(s2);
874     s2.awaitSubscribe();
875     s1.awaitSubscribe();
876     for (int i = 1; i <= 4; ++i)
877     assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0);
878     p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null);
879     assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
880     s1.sn.request(64);
881     assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
882     s2.sn.request(64);
883     p.close();
884     s2.awaitComplete();
885     assertTrue(s2.nexts >= 2);
886     s1.awaitComplete();
887     assertTrue(s1.nexts >= 2);
888     }
889    
890     /**
891     * Timed Offer invokes drop handler if saturated
892     */
893     public void testHandledDroppedTimedOffer() {
894     AtomicInteger calls = new AtomicInteger();
895     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
896     basicExecutor, 4);
897     TestSubscriber s1 = new TestSubscriber();
898     s1.request = false;
899     TestSubscriber s2 = new TestSubscriber();
900     s2.request = false;
901     p.subscribe(s1);
902     p.subscribe(s2);
903     s2.awaitSubscribe();
904     s1.awaitSubscribe();
905     for (int i = 1; i <= 4; ++i)
906     assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
907     p.offer(5, (s, x) -> noopHandle(calls));
908     assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
909     s1.sn.request(64);
910     assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
911     s2.sn.request(64);
912     p.close();
913     s2.awaitComplete();
914     s1.awaitComplete();
915     assertTrue(calls.get() >= 2);
916     }
917    
918     /**
919     * Timed Offer succeeds if drop handler forces request
920     */
921     public void testRecoveredHandledDroppedTimedOffer() {
922     AtomicInteger calls = new AtomicInteger();
923     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
924     basicExecutor, 4);
925     TestSubscriber s1 = new TestSubscriber();
926     s1.request = false;
927     TestSubscriber s2 = new TestSubscriber();
928     s2.request = false;
929     p.subscribe(s1);
930     p.subscribe(s2);
931     s2.awaitSubscribe();
932     s1.awaitSubscribe();
933     int n = 0;
934     for (int i = 1; i <= 8; ++i) {
935     int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s));
936     n = n + 2 + (d < 0 ? d : 0);
937     }
938     p.close();
939     s2.awaitComplete();
940     s1.awaitComplete();
941     assertEquals(s1.nexts + s2.nexts, n);
942     assertTrue(calls.get() >= 2);
943     }
944    
945    
946     }