ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.4
Committed: Mon Sep 7 20:33:41 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +2 -2 lines
Log Message:
testEmptyTimedOffer: actually use timed offer

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8     import static java.util.concurrent.TimeUnit.MILLISECONDS;
9     import static java.util.concurrent.TimeUnit.SECONDS;
10    
11     import java.util.concurrent.Executor;
12     import java.util.concurrent.Executors;
13     import java.util.concurrent.Flow;
14     import static java.util.concurrent.Flow.Publisher;
15     import static java.util.concurrent.Flow.Subscriber;
16     import static java.util.concurrent.Flow.Subscription;
17     import java.util.concurrent.LinkedBlockingQueue;
18     import java.util.concurrent.ForkJoinPool;
19     import java.util.concurrent.SubmissionPublisher;
20     import java.util.concurrent.ThreadFactory;
21     import java.util.concurrent.ThreadPoolExecutor;
22     import java.util.concurrent.TimeUnit;
23     import java.util.concurrent.atomic.AtomicInteger;
24     import java.util.function.BiConsumer;
25     import java.util.function.BiPredicate;
26     import java.util.function.BiFunction;
27    
28     import junit.framework.Test;
29     import junit.framework.TestSuite;
30    
31     public class SubmissionPublisherTest extends JSR166TestCase {
32    
33     public static void main(String[] args) {
34     main(suite(), args);
35     }
36     public static Test suite() {
37     return new TestSuite(SubmissionPublisherTest.class);
38     }
39    
40     // Factory for single thread pool in case commonPool parallelism is zero
41     static final class DaemonThreadFactory implements ThreadFactory {
42     public Thread newThread(Runnable r) {
43     Thread t = new Thread(r);
44     t.setDaemon(true);
45     return t;
46     }
47     }
48 jsr166 1.2
49 dl 1.1 static final Executor basicExecutor =
50     (ForkJoinPool.getCommonPoolParallelism() > 0) ?
51     ForkJoinPool.commonPool() :
52     new ThreadPoolExecutor(1, 1, 60, SECONDS,
53     new LinkedBlockingQueue<Runnable>(),
54     new DaemonThreadFactory());
55 jsr166 1.2
56 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
57     return new SubmissionPublisher<Integer>(basicExecutor,
58     Flow.defaultBufferSize());
59     }
60 jsr166 1.2
61 dl 1.1 static class SPException extends RuntimeException {}
62    
63     class TestSubscriber implements Subscriber<Integer> {
64     volatile Subscription sn;
65     int last; // Requires that onNexts are in numeric order
66     volatile int nexts;
67     volatile int errors;
68     volatile int completes;
69     volatile boolean throwOnCall = false;
70     volatile boolean request = true;
71     volatile Throwable lastError;
72    
73     public synchronized void onSubscribe(Subscription s) {
74     threadAssertTrue(sn == null);
75     sn = s;
76     notifyAll();
77     if (throwOnCall)
78     throw new SPException();
79     if (request)
80     sn.request(1L);
81     }
82     public synchronized void onNext(Integer t) {
83     ++nexts;
84     notifyAll();
85     int current = t.intValue();
86     threadAssertTrue(current >= last);
87     last = current;
88     if (request)
89     sn.request(1L);
90     if (throwOnCall)
91     throw new SPException();
92     }
93     public synchronized void onError(Throwable t) {
94     threadAssertTrue(completes == 0);
95     threadAssertTrue(errors == 0);
96     lastError = t;
97     ++errors;
98     notifyAll();
99     }
100     public synchronized void onComplete() {
101     threadAssertTrue(completes == 0);
102     ++completes;
103     notifyAll();
104     }
105    
106     synchronized void awaitSubscribe() {
107     while (sn == null) {
108     try {
109     wait();
110     } catch (Exception ex) {
111     threadUnexpectedException(ex);
112     break;
113     }
114     }
115     }
116     synchronized void awaitNext(int n) {
117     while (nexts < n) {
118     try {
119     wait();
120     } catch (Exception ex) {
121     threadUnexpectedException(ex);
122     break;
123     }
124     }
125     }
126     synchronized void awaitComplete() {
127     while (completes == 0 && errors == 0) {
128     try {
129     wait();
130     } catch (Exception ex) {
131     threadUnexpectedException(ex);
132     break;
133     }
134     }
135     }
136     synchronized void awaitError() {
137     while (errors == 0) {
138     try {
139     wait();
140     } catch (Exception ex) {
141     threadUnexpectedException(ex);
142     break;
143     }
144     }
145     }
146    
147     }
148    
149     /**
150     * A new SubmissionPublisher has no subscribers, a non-null
151     * executor, a power-of-two capacity, is not closed, and reports
152     * zero demand and lag
153     */
154     void checkInitialState(SubmissionPublisher<?> p) {
155     assertFalse(p.hasSubscribers());
156     assertEquals(p.getNumberOfSubscribers(), 0);
157     assertTrue(p.getSubscribers().isEmpty());
158     assertFalse(p.isClosed());
159     assertNull(p.getClosedException());
160     int n = p.getMaxBufferCapacity();
161     assertTrue((n & (n - 1)) == 0); // power of two
162     assertNotNull(p.getExecutor());
163     assertEquals(p.estimateMinimumDemand(), 0);
164     assertEquals(p.estimateMaximumLag(), 0);
165     }
166    
167     /**
168     * A default-constructed SubmissionPublisher has no subscribers,
169     * is not closed, has default buffer size, and uses the
170     * ForkJoinPool.commonPool executor
171     */
172     public void testConstructor1() {
173     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
174     checkInitialState(p);
175     assertSame(p.getExecutor(), ForkJoinPool.commonPool());
176     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
177     }
178    
179     /**
180     * A new SubmissionPublisher has no subscribers, is not closed,
181     * has the given buffer size, and uses the given executor
182     */
183     public void testConstructor2() {
184     Executor e = Executors.newFixedThreadPool(1);
185     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
186     checkInitialState(p);
187     assertSame(p.getExecutor(), e);
188     assertEquals(p.getMaxBufferCapacity(), 8);
189     }
190    
191     /**
192     * A null Executor argument to SubmissionPublisher constructor throws NPE
193     */
194     public void testConstructor3() {
195     try {
196 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
197 dl 1.1 shouldThrow();
198 jsr166 1.2 } catch (NullPointerException success) {}
199 dl 1.1 }
200    
201     /**
202     * A negative capacity argument to SubmissionPublisher constructor
203     * throws IAE
204     */
205     public void testConstructor4() {
206     Executor e = Executors.newFixedThreadPool(1);
207     try {
208 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
209 dl 1.1 shouldThrow();
210 jsr166 1.2 } catch (IllegalArgumentException success) {}
211 dl 1.1 }
212    
213     /**
214     * A closed publisher reports isClosed with no closedException and
215     * throws ISE upon attempted submission; a subsequent close or
216     * closeExceptionally has no additional effect.
217     */
218     public void testClose() {
219     SubmissionPublisher<Integer> p = basicPublisher();
220     checkInitialState(p);
221     p.close();
222     assertTrue(p.isClosed());
223     assertNull(p.getClosedException());
224     try {
225     p.submit(1);
226     shouldThrow();
227 jsr166 1.2 } catch (IllegalStateException success) {}
228 dl 1.1 Throwable ex = new SPException();
229     p.closeExceptionally(ex);
230     assertTrue(p.isClosed());
231     assertNull(p.getClosedException());
232     }
233    
234     /**
235     * A publisher closedExceptionally reports isClosed with the
236     * closedException and throws ISE upon attempted submission; a
237     * subsequent close or closeExceptionally has no additional
238     * effect.
239     */
240     public void testCloseExceptionally() {
241     SubmissionPublisher<Integer> p = basicPublisher();
242     checkInitialState(p);
243     Throwable ex = new SPException();
244     p.closeExceptionally(ex);
245     assertTrue(p.isClosed());
246     assertSame(p.getClosedException(), ex);
247     try {
248     p.submit(1);
249     shouldThrow();
250 jsr166 1.2 } catch (IllegalStateException success) {}
251 dl 1.1 p.close();
252     assertTrue(p.isClosed());
253     assertSame(p.getClosedException(), ex);
254     }
255    
256     /**
257     * Upon subscription, the subscriber's onSubscribe is called, no
258     * other Subscriber methods are invoked, the publisher
259     * hasSubscribers, isSubscribed is true, and existing
260     * subscriptions are unaffected.
261     */
262     public void testSubscribe1() {
263     TestSubscriber s = new TestSubscriber();
264     SubmissionPublisher<Integer> p = basicPublisher();
265     p.subscribe(s);
266     assertTrue(p.hasSubscribers());
267     assertEquals(p.getNumberOfSubscribers(), 1);
268     assertTrue(p.getSubscribers().contains(s));
269     assertTrue(p.isSubscribed(s));
270     s.awaitSubscribe();
271     assertNotNull(s.sn);
272     assertEquals(s.nexts, 0);
273     assertEquals(s.errors, 0);
274     assertEquals(s.completes, 0);
275     TestSubscriber s2 = new TestSubscriber();
276     p.subscribe(s2);
277     assertTrue(p.hasSubscribers());
278     assertEquals(p.getNumberOfSubscribers(), 2);
279     assertTrue(p.getSubscribers().contains(s));
280     assertTrue(p.getSubscribers().contains(s2));
281     assertTrue(p.isSubscribed(s));
282     assertTrue(p.isSubscribed(s2));
283     s2.awaitSubscribe();
284     assertNotNull(s2.sn);
285     assertEquals(s2.nexts, 0);
286     assertEquals(s2.errors, 0);
287     assertEquals(s2.completes, 0);
288     }
289    
290     /**
291     * If closed, upon subscription, the subscriber's onComplete
292     * method is invoked
293     */
294     public void testSubscribe2() {
295     TestSubscriber s = new TestSubscriber();
296     SubmissionPublisher<Integer> p = basicPublisher();
297     p.close();
298     p.subscribe(s);
299     s.awaitComplete();
300     assertEquals(s.nexts, 0);
301     assertEquals(s.errors, 0);
302     assertEquals(s.completes, 1);
303     }
304    
305     /**
306     * If closedExceptionally, upon subscription, the subscriber's
307     * onError method is invoked
308     */
309     public void testSubscribe3() {
310     TestSubscriber s = new TestSubscriber();
311     SubmissionPublisher<Integer> p = basicPublisher();
312     Throwable ex = new SPException();
313     p.closeExceptionally(ex);
314     assertTrue(p.isClosed());
315     assertSame(p.getClosedException(), ex);
316     p.subscribe(s);
317     s.awaitError();
318     assertEquals(s.nexts, 0);
319     assertEquals(s.errors, 1);
320     }
321    
322     /**
323     * Upon attempted resubscription, the subscriber's onError is
324     * called and the subscription is cancelled.
325     */
326     public void testSubscribe4() {
327     TestSubscriber s = new TestSubscriber();
328     SubmissionPublisher<Integer> p = basicPublisher();
329     p.subscribe(s);
330     assertTrue(p.hasSubscribers());
331     assertEquals(p.getNumberOfSubscribers(), 1);
332     assertTrue(p.getSubscribers().contains(s));
333     assertTrue(p.isSubscribed(s));
334     s.awaitSubscribe();
335     assertNotNull(s.sn);
336     assertEquals(s.nexts, 0);
337     assertEquals(s.errors, 0);
338     assertEquals(s.completes, 0);
339     p.subscribe(s);
340     s.awaitError();
341     assertEquals(s.nexts, 0);
342     assertEquals(s.errors, 1);
343     assertFalse(p.isSubscribed(s));
344     }
345    
346     /**
347     * An exception thrown in onSubscribe causes onError
348     */
349     public void testSubscribe5() {
350     TestSubscriber s = new TestSubscriber();
351     SubmissionPublisher<Integer> p = basicPublisher();
352     s.throwOnCall = true;
353     try {
354     p.subscribe(s);
355 jsr166 1.2 } catch (Exception ok) {}
356 dl 1.1 s.awaitError();
357     assertEquals(s.nexts, 0);
358     assertEquals(s.errors, 1);
359     assertEquals(s.completes, 0);
360     }
361    
362     /**
363     * subscribe(null) thows NPE
364     */
365     public void testSubscribe6() {
366     SubmissionPublisher<Integer> p = basicPublisher();
367     try {
368     p.subscribe(null);
369 jsr166 1.2 shouldThrow();
370     } catch (NullPointerException success) {}
371 dl 1.1 checkInitialState(p);
372     }
373    
374     /**
375     * Closing a publisher causes onComplete to subscribers
376     */
377     public void testCloseCompletes() {
378     SubmissionPublisher<Integer> p = basicPublisher();
379     TestSubscriber s1 = new TestSubscriber();
380     TestSubscriber s2 = new TestSubscriber();
381     p.subscribe(s1);
382     p.subscribe(s2);
383     p.submit(1);
384     p.close();
385     assertTrue(p.isClosed());
386     assertNull(p.getClosedException());
387     s1.awaitComplete();
388     assertEquals(s1.nexts, 1);
389     assertEquals(s1.completes, 1);
390     s2.awaitComplete();
391     assertEquals(s2.nexts, 1);
392     assertEquals(s2.completes, 1);
393     }
394    
395     /**
396     * Closing a publisher exceptionally causes onError to subscribers
397     */
398     public void testCloseExceptionallyError() {
399     SubmissionPublisher<Integer> p = basicPublisher();
400     TestSubscriber s1 = new TestSubscriber();
401     TestSubscriber s2 = new TestSubscriber();
402     p.subscribe(s1);
403     p.subscribe(s2);
404     p.submit(1);
405     p.closeExceptionally(new SPException());
406     assertTrue(p.isClosed());
407     s1.awaitError();
408     assertTrue(s1.nexts <= 1);
409     assertEquals(s1.errors, 1);
410     s2.awaitError();
411     assertTrue(s2.nexts <= 1);
412     assertEquals(s2.errors, 1);
413     }
414    
415     /**
416     * Cancelling a subscription eventually causes no more onNexts to be issued
417     */
418     public void testCancel() {
419     SubmissionPublisher<Integer> p = basicPublisher();
420     TestSubscriber s1 = new TestSubscriber();
421     TestSubscriber s2 = new TestSubscriber();
422     p.subscribe(s1);
423     p.subscribe(s2);
424     s1.awaitSubscribe();
425     p.submit(1);
426     s1.sn.cancel();
427     for (int i = 2; i <= 20; ++i)
428     p.submit(i);
429     p.close();
430     s2.awaitComplete();
431     assertEquals(s2.nexts, 20);
432     assertEquals(s2.completes, 1);
433     assertTrue(s1.nexts < 20);
434     assertFalse(p.isSubscribed(s1));
435     }
436    
437     /**
438     * Throwing an exception in onNext causes onError
439     */
440     public void testThrowOnNext() {
441     SubmissionPublisher<Integer> p = basicPublisher();
442     TestSubscriber s1 = new TestSubscriber();
443     TestSubscriber s2 = new TestSubscriber();
444     p.subscribe(s1);
445     p.subscribe(s2);
446     s1.awaitSubscribe();
447     p.submit(1);
448     s1.throwOnCall = true;
449     p.submit(2);
450     p.close();
451     s2.awaitComplete();
452     assertEquals(s2.nexts, 2);
453     s1.awaitComplete();
454     assertEquals(s1.errors, 1);
455     }
456    
457     /**
458     * If a handler is supplied in conctructor, it is invoked when
459     * subscriber throws an exception in onNext
460     */
461     public void testThrowOnNextHandler() {
462     AtomicInteger calls = new AtomicInteger();
463     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
464     (basicExecutor, 8,
465     (s, e) -> calls.getAndIncrement());
466     TestSubscriber s1 = new TestSubscriber();
467     TestSubscriber s2 = new TestSubscriber();
468     p.subscribe(s1);
469     p.subscribe(s2);
470     s1.awaitSubscribe();
471     p.submit(1);
472     s1.throwOnCall = true;
473     p.submit(2);
474     p.close();
475     s2.awaitComplete();
476     assertEquals(s2.nexts, 2);
477     assertEquals(s2.completes, 1);
478     s1.awaitError();
479     assertEquals(s1.errors, 1);
480     assertEquals(calls.get(), 1);
481     }
482    
483     /**
484     * onNext items are issued in the same order to each subscriber
485     */
486     public void testOrder() {
487     SubmissionPublisher<Integer> p = basicPublisher();
488     TestSubscriber s1 = new TestSubscriber();
489     TestSubscriber s2 = new TestSubscriber();
490     p.subscribe(s1);
491     p.subscribe(s2);
492     for (int i = 1; i <= 20; ++i)
493     p.submit(i);
494     p.close();
495     s2.awaitComplete();
496     s1.awaitComplete();
497     assertEquals(s2.nexts, 20);
498     assertEquals(s2.completes, 1);
499     assertEquals(s1.nexts, 20);
500     assertEquals(s1.completes, 1);
501     }
502    
503     /**
504     * onNext is issued only if requested
505     */
506     public void testRequest1() {
507     SubmissionPublisher<Integer> p = basicPublisher();
508     TestSubscriber s1 = new TestSubscriber();
509     s1.request = false;
510     p.subscribe(s1);
511     s1.awaitSubscribe();
512     assertTrue(p.estimateMinimumDemand() == 0);
513     TestSubscriber s2 = new TestSubscriber();
514     p.subscribe(s2);
515     p.submit(1);
516     p.submit(2);
517     s2.awaitNext(1);
518     assertEquals(s1.nexts, 0);
519     s1.sn.request(3);
520     p.submit(3);
521     p.close();
522     s2.awaitComplete();
523     assertEquals(s2.nexts, 3);
524     assertEquals(s2.completes, 1);
525     s1.awaitComplete();
526     assertTrue(s1.nexts > 0);
527     assertEquals(s1.completes, 1);
528     }
529    
530     /**
531     * onNext is not issued when requests become zero
532     */
533     public void testRequest2() {
534     SubmissionPublisher<Integer> p = basicPublisher();
535     TestSubscriber s1 = new TestSubscriber();
536     TestSubscriber s2 = new TestSubscriber();
537     p.subscribe(s1);
538     p.subscribe(s2);
539     s2.awaitSubscribe();
540     s1.awaitSubscribe();
541     s1.request = false;
542     p.submit(1);
543     p.submit(2);
544     p.close();
545     s2.awaitComplete();
546     assertEquals(s2.nexts, 2);
547     assertEquals(s2.completes, 1);
548     s1.awaitNext(1);
549     assertEquals(s1.nexts, 1);
550     }
551    
552     /**
553     * Negative request causes error
554     */
555     public void testRequest3() {
556     SubmissionPublisher<Integer> p = basicPublisher();
557     TestSubscriber s1 = new TestSubscriber();
558     TestSubscriber s2 = new TestSubscriber();
559     p.subscribe(s1);
560     p.subscribe(s2);
561     s2.awaitSubscribe();
562     s1.awaitSubscribe();
563     s1.sn.request(-1L);
564     p.submit(1);
565     p.submit(2);
566     p.close();
567     s2.awaitComplete();
568     assertEquals(s2.nexts, 2);
569     assertEquals(s2.completes, 1);
570     s1.awaitError();
571     assertEquals(s1.errors, 1);
572     assertTrue(s1.lastError instanceof IllegalArgumentException);
573     }
574    
575     /**
576     * estimateMinimumDemand reports 0 until request, nonzero after
577     * request, and zero again after delivery
578     */
579     public void testEstimateMinimumDemand() {
580     TestSubscriber s = new TestSubscriber();
581     SubmissionPublisher<Integer> p = basicPublisher();
582     s.request = false;
583     p.subscribe(s);
584     s.awaitSubscribe();
585     assertEquals(p.estimateMinimumDemand(), 0);
586     s.sn.request(1);
587     assertEquals(p.estimateMinimumDemand(), 1);
588     p.submit(1);
589     s.awaitNext(1);
590     assertEquals(p.estimateMinimumDemand(), 0);
591     }
592    
593     /**
594 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
595 dl 1.1 */
596     public void testEmptySubmit() {
597     SubmissionPublisher<Integer> p = basicPublisher();
598     assertEquals(p.submit(1), 0);
599     }
600    
601     /**
602 jsr166 1.3 * submit(null) throws NPE
603 dl 1.1 */
604     public void testNullSubmit() {
605     SubmissionPublisher<Integer> p = basicPublisher();
606     try {
607     p.submit(null);
608 jsr166 1.2 shouldThrow();
609     } catch (NullPointerException success) {}
610 dl 1.1 }
611    
612     /**
613 jsr166 1.3 * submit returns number of lagged items, compatible with result
614 dl 1.1 * of estimateMaximumLag.
615     */
616     public void testLaggedSubmit() {
617     SubmissionPublisher<Integer> p = basicPublisher();
618     TestSubscriber s1 = new TestSubscriber();
619     s1.request = false;
620     TestSubscriber s2 = new TestSubscriber();
621     s2.request = false;
622     p.subscribe(s1);
623     p.subscribe(s2);
624     s2.awaitSubscribe();
625     s1.awaitSubscribe();
626     assertEquals(p.submit(1), 1);
627     assertTrue(p.estimateMaximumLag() >= 1);
628     assertTrue(p.submit(2) >= 2);
629     assertTrue(p.estimateMaximumLag() >= 2);
630     s1.sn.request(4);
631     assertTrue(p.submit(3) >= 3);
632     assertTrue(p.estimateMaximumLag() >= 3);
633     s2.sn.request(4);
634     p.submit(4);
635     p.close();
636     s2.awaitComplete();
637     assertEquals(s2.nexts, 4);
638     s1.awaitComplete();
639     assertEquals(s2.nexts, 4);
640     }
641    
642     /**
643     * submit eventually issues requested items when buffer capacity is 1
644     */
645     public void testCap1Submit() {
646     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
647     basicExecutor, 1);
648     TestSubscriber s1 = new TestSubscriber();
649     TestSubscriber s2 = new TestSubscriber();
650     p.subscribe(s1);
651     p.subscribe(s2);
652     for (int i = 1; i <= 20; ++i) {
653     assertTrue(p.estimateMinimumDemand() <= 1);
654     assertTrue(p.submit(i) >= 0);
655     }
656     p.close();
657     s2.awaitComplete();
658     s1.awaitComplete();
659     assertEquals(s2.nexts, 20);
660     assertEquals(s2.completes, 1);
661     assertEquals(s1.nexts, 20);
662     assertEquals(s1.completes, 1);
663     }
664 jsr166 1.2
665 dl 1.1 static boolean noopHandle(AtomicInteger count) {
666     count.getAndIncrement();
667     return false;
668     }
669    
670     static boolean reqHandle(AtomicInteger count, Subscriber s) {
671     count.getAndIncrement();
672     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
673     return true;
674     }
675    
676     /**
677 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
678 dl 1.1 */
679     public void testEmptyOffer() {
680     SubmissionPublisher<Integer> p = basicPublisher();
681 jsr166 1.3 assertEquals(0, p.offer(1, null));
682 dl 1.1 }
683    
684     /**
685 jsr166 1.3 * offer(null) throws NPE
686 dl 1.1 */
687     public void testNullOffer() {
688     SubmissionPublisher<Integer> p = basicPublisher();
689     try {
690     p.offer(null, null);
691 jsr166 1.2 shouldThrow();
692     } catch (NullPointerException success) {}
693 dl 1.1 }
694    
695     /**
696 jsr166 1.3 * offer returns number of lagged items if not saturated
697 dl 1.1 */
698     public void testLaggedOffer() {
699     SubmissionPublisher<Integer> p = basicPublisher();
700     TestSubscriber s1 = new TestSubscriber();
701     s1.request = false;
702     TestSubscriber s2 = new TestSubscriber();
703     s2.request = false;
704     p.subscribe(s1);
705     p.subscribe(s2);
706     s2.awaitSubscribe();
707     s1.awaitSubscribe();
708     assertTrue(p.offer(1, null) >= 1);
709     assertTrue(p.offer(2, null) >= 2);
710     s1.sn.request(4);
711     assertTrue(p.offer(3, null) >= 3);
712     s2.sn.request(4);
713     p.offer(4, null);
714     p.close();
715     s2.awaitComplete();
716     assertEquals(s2.nexts, 4);
717     s1.awaitComplete();
718     assertEquals(s2.nexts, 4);
719     }
720    
721     /**
722 jsr166 1.3 * offer reports drops if saturated
723 dl 1.1 */
724     public void testDroppedOffer() {
725     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
726     basicExecutor, 4);
727     TestSubscriber s1 = new TestSubscriber();
728     s1.request = false;
729     TestSubscriber s2 = new TestSubscriber();
730     s2.request = false;
731     p.subscribe(s1);
732     p.subscribe(s2);
733     s2.awaitSubscribe();
734     s1.awaitSubscribe();
735     for (int i = 1; i <= 4; ++i)
736     assertTrue(p.offer(i, null) >= 0);
737     p.offer(5, null);
738     assertTrue(p.offer(6, null) < 0);
739     s1.sn.request(64);
740     assertTrue(p.offer(7, null) < 0);
741     s2.sn.request(64);
742     p.close();
743     s2.awaitComplete();
744     assertTrue(s2.nexts >= 4);
745     s1.awaitComplete();
746     assertTrue(s1.nexts >= 4);
747     }
748    
749     /**
750 jsr166 1.3 * offer invokes drop handler if saturated
751 dl 1.1 */
752     public void testHandledDroppedOffer() {
753     AtomicInteger calls = new AtomicInteger();
754     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
755     basicExecutor, 4);
756     TestSubscriber s1 = new TestSubscriber();
757     s1.request = false;
758     TestSubscriber s2 = new TestSubscriber();
759     s2.request = false;
760     p.subscribe(s1);
761     p.subscribe(s2);
762     s2.awaitSubscribe();
763     s1.awaitSubscribe();
764     for (int i = 1; i <= 4; ++i)
765     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
766     p.offer(4, (s, x) -> noopHandle(calls));
767     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
768     s1.sn.request(64);
769     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
770     s2.sn.request(64);
771     p.close();
772     s2.awaitComplete();
773     s1.awaitComplete();
774     assertTrue(calls.get() >= 4);
775     }
776    
777    
778     /**
779 jsr166 1.3 * offer succeeds if drop handler forces request
780 dl 1.1 */
781     public void testRecoveredHandledDroppedOffer() {
782     AtomicInteger calls = new AtomicInteger();
783     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
784     basicExecutor, 4);
785     TestSubscriber s1 = new TestSubscriber();
786     s1.request = false;
787     TestSubscriber s2 = new TestSubscriber();
788     s2.request = false;
789     p.subscribe(s1);
790     p.subscribe(s2);
791     s2.awaitSubscribe();
792     s1.awaitSubscribe();
793     int n = 0;
794     for (int i = 1; i <= 8; ++i) {
795     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
796     n = n + 2 + (d < 0 ? d : 0);
797     }
798     p.close();
799     s2.awaitComplete();
800     s1.awaitComplete();
801     assertEquals(s1.nexts + s2.nexts, n);
802     assertTrue(calls.get() >= 2);
803     }
804    
805    
806     /**
807 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
808 dl 1.1 */
809     public void testEmptyTimedOffer() {
810     SubmissionPublisher<Integer> p = basicPublisher();
811 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
812 dl 1.1 }
813    
814     /**
815 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
816 dl 1.1 */
817     public void testNullTimedOffer() {
818     SubmissionPublisher<Integer> p = basicPublisher();
819     try {
820     p.offer(null, SHORT_DELAY_MS, MILLISECONDS, null);
821 jsr166 1.2 shouldThrow();
822     } catch (NullPointerException success) {}
823 dl 1.1 try {
824     p.offer(1, SHORT_DELAY_MS, null, null);
825 jsr166 1.2 shouldThrow();
826     } catch (NullPointerException success) {}
827 dl 1.1 }
828    
829     /**
830 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
831 dl 1.1 */
832     public void testLaggedTimedOffer() {
833     SubmissionPublisher<Integer> p = basicPublisher();
834     TestSubscriber s1 = new TestSubscriber();
835     s1.request = false;
836     TestSubscriber s2 = new TestSubscriber();
837     s2.request = false;
838     p.subscribe(s1);
839     p.subscribe(s2);
840     s2.awaitSubscribe();
841     s1.awaitSubscribe();
842     assertTrue(p.offer(1, SHORT_DELAY_MS, MILLISECONDS, null) >= 1);
843     assertTrue(p.offer(2, SHORT_DELAY_MS, MILLISECONDS, null) >= 2);
844     s1.sn.request(4);
845     assertTrue(p.offer(3, SHORT_DELAY_MS, MILLISECONDS, null) >= 3);
846     s2.sn.request(4);
847     p.offer(4, SHORT_DELAY_MS, MILLISECONDS, null);
848     p.close();
849     s2.awaitComplete();
850     assertEquals(s2.nexts, 4);
851     s1.awaitComplete();
852     assertEquals(s2.nexts, 4);
853     }
854    
855     /**
856 jsr166 1.3 * Timed offer reports drops if saturated
857 dl 1.1 */
858     public void testDroppedTimedOffer() {
859     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
860     basicExecutor, 4);
861     TestSubscriber s1 = new TestSubscriber();
862     s1.request = false;
863     TestSubscriber s2 = new TestSubscriber();
864     s2.request = false;
865     p.subscribe(s1);
866     p.subscribe(s2);
867     s2.awaitSubscribe();
868     s1.awaitSubscribe();
869     for (int i = 1; i <= 4; ++i)
870     assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, null) >= 0);
871     p.offer(5, SHORT_DELAY_MS, MILLISECONDS, null);
872     assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
873     s1.sn.request(64);
874     assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, null) < 0);
875     s2.sn.request(64);
876     p.close();
877     s2.awaitComplete();
878     assertTrue(s2.nexts >= 2);
879     s1.awaitComplete();
880     assertTrue(s1.nexts >= 2);
881     }
882    
883     /**
884 jsr166 1.3 * Timed offer invokes drop handler if saturated
885 dl 1.1 */
886     public void testHandledDroppedTimedOffer() {
887     AtomicInteger calls = new AtomicInteger();
888     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
889     basicExecutor, 4);
890     TestSubscriber s1 = new TestSubscriber();
891     s1.request = false;
892     TestSubscriber s2 = new TestSubscriber();
893     s2.request = false;
894     p.subscribe(s1);
895     p.subscribe(s2);
896     s2.awaitSubscribe();
897     s1.awaitSubscribe();
898     for (int i = 1; i <= 4; ++i)
899     assertTrue(p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
900     p.offer(5, (s, x) -> noopHandle(calls));
901     assertTrue(p.offer(6, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
902     s1.sn.request(64);
903     assertTrue(p.offer(7, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
904     s2.sn.request(64);
905     p.close();
906     s2.awaitComplete();
907     s1.awaitComplete();
908     assertTrue(calls.get() >= 2);
909     }
910    
911     /**
912 jsr166 1.3 * Timed offer succeeds if drop handler forces request
913 dl 1.1 */
914     public void testRecoveredHandledDroppedTimedOffer() {
915     AtomicInteger calls = new AtomicInteger();
916     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
917     basicExecutor, 4);
918     TestSubscriber s1 = new TestSubscriber();
919     s1.request = false;
920     TestSubscriber s2 = new TestSubscriber();
921     s2.request = false;
922     p.subscribe(s1);
923     p.subscribe(s2);
924     s2.awaitSubscribe();
925     s1.awaitSubscribe();
926     int n = 0;
927     for (int i = 1; i <= 8; ++i) {
928     int d = p.offer(i, SHORT_DELAY_MS, MILLISECONDS, (s, x) -> reqHandle(calls, s));
929     n = n + 2 + (d < 0 ? d : 0);
930     }
931     p.close();
932     s2.awaitComplete();
933     s1.awaitComplete();
934     assertEquals(s1.nexts + s2.nexts, n);
935     assertTrue(calls.get() >= 2);
936     }
937    
938    
939     }