ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SubmissionPublisherTest.java
Revision: 1.13
Committed: Sat Sep 12 18:19:57 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.12: +1 -1 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea and Martin Buchholz with assistance from
3     * members of JCP JSR-166 Expert Group and released to the public
4     * domain, as explained at
5     * http://creativecommons.org/publicdomain/zero/1.0/
6     */
7    
8 dl 1.10 import java.util.concurrent.CompletableFuture;
9 dl 1.1 import java.util.concurrent.Executor;
10     import java.util.concurrent.Executors;
11     import java.util.concurrent.Flow;
12 dl 1.10 import java.util.concurrent.ForkJoinPool;
13 dl 1.1 import java.util.concurrent.LinkedBlockingQueue;
14     import java.util.concurrent.SubmissionPublisher;
15     import java.util.concurrent.ThreadFactory;
16     import java.util.concurrent.ThreadPoolExecutor;
17     import java.util.concurrent.TimeUnit;
18     import java.util.concurrent.atomic.AtomicInteger;
19     import java.util.function.BiConsumer;
20 dl 1.10 import java.util.function.BiFunction;
21 dl 1.1 import java.util.function.BiPredicate;
22 dl 1.10 import java.util.stream.Stream;
23 dl 1.1 import junit.framework.Test;
24     import junit.framework.TestSuite;
25    
26 dl 1.10 import static java.util.concurrent.Flow.Publisher;
27     import static java.util.concurrent.Flow.Subscriber;
28     import static java.util.concurrent.Flow.Subscription;
29     import static java.util.concurrent.TimeUnit.MILLISECONDS;
30     import static java.util.concurrent.TimeUnit.SECONDS;
31    
32 dl 1.1 public class SubmissionPublisherTest extends JSR166TestCase {
33    
34     public static void main(String[] args) {
35     main(suite(), args);
36     }
37     public static Test suite() {
38     return new TestSuite(SubmissionPublisherTest.class);
39     }
40    
41 dl 1.12 final Executor basicExecutor = basicPublisher().getExecutor();
42 jsr166 1.13
43 dl 1.1 static SubmissionPublisher<Integer> basicPublisher() {
44 dl 1.12 return new SubmissionPublisher<Integer>();
45 dl 1.1 }
46 jsr166 1.2
47 dl 1.1 static class SPException extends RuntimeException {}
48    
49     class TestSubscriber implements Subscriber<Integer> {
50     volatile Subscription sn;
51     int last; // Requires that onNexts are in numeric order
52     volatile int nexts;
53     volatile int errors;
54     volatile int completes;
55     volatile boolean throwOnCall = false;
56     volatile boolean request = true;
57     volatile Throwable lastError;
58    
59     public synchronized void onSubscribe(Subscription s) {
60     threadAssertTrue(sn == null);
61     sn = s;
62     notifyAll();
63     if (throwOnCall)
64     throw new SPException();
65     if (request)
66     sn.request(1L);
67     }
68     public synchronized void onNext(Integer t) {
69     ++nexts;
70     notifyAll();
71     int current = t.intValue();
72     threadAssertTrue(current >= last);
73     last = current;
74     if (request)
75     sn.request(1L);
76     if (throwOnCall)
77     throw new SPException();
78     }
79     public synchronized void onError(Throwable t) {
80     threadAssertTrue(completes == 0);
81     threadAssertTrue(errors == 0);
82     lastError = t;
83     ++errors;
84     notifyAll();
85     }
86     public synchronized void onComplete() {
87     threadAssertTrue(completes == 0);
88     ++completes;
89     notifyAll();
90     }
91    
92     synchronized void awaitSubscribe() {
93     while (sn == null) {
94     try {
95     wait();
96     } catch (Exception ex) {
97     threadUnexpectedException(ex);
98     break;
99     }
100     }
101     }
102     synchronized void awaitNext(int n) {
103     while (nexts < n) {
104     try {
105     wait();
106     } catch (Exception ex) {
107     threadUnexpectedException(ex);
108     break;
109     }
110     }
111     }
112     synchronized void awaitComplete() {
113     while (completes == 0 && errors == 0) {
114     try {
115     wait();
116     } catch (Exception ex) {
117     threadUnexpectedException(ex);
118     break;
119     }
120     }
121     }
122     synchronized void awaitError() {
123     while (errors == 0) {
124     try {
125     wait();
126     } catch (Exception ex) {
127     threadUnexpectedException(ex);
128     break;
129     }
130     }
131     }
132    
133     }
134    
135     /**
136     * A new SubmissionPublisher has no subscribers, a non-null
137     * executor, a power-of-two capacity, is not closed, and reports
138     * zero demand and lag
139     */
140     void checkInitialState(SubmissionPublisher<?> p) {
141     assertFalse(p.hasSubscribers());
142 jsr166 1.6 assertEquals(0, p.getNumberOfSubscribers());
143 dl 1.1 assertTrue(p.getSubscribers().isEmpty());
144     assertFalse(p.isClosed());
145     assertNull(p.getClosedException());
146     int n = p.getMaxBufferCapacity();
147     assertTrue((n & (n - 1)) == 0); // power of two
148     assertNotNull(p.getExecutor());
149 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
150     assertEquals(0, p.estimateMaximumLag());
151 dl 1.1 }
152    
153     /**
154     * A default-constructed SubmissionPublisher has no subscribers,
155     * is not closed, has default buffer size, and uses the
156 dl 1.12 * defaultExecutor
157 dl 1.1 */
158     public void testConstructor1() {
159     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
160     checkInitialState(p);
161     assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
162 dl 1.12 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
163     if (ForkJoinPool.getCommonPoolParallelism() > 1)
164     assertSame(e, c);
165     else
166     assertNotSame(e, c);
167 dl 1.1 }
168    
169     /**
170     * A new SubmissionPublisher has no subscribers, is not closed,
171     * has the given buffer size, and uses the given executor
172     */
173     public void testConstructor2() {
174     Executor e = Executors.newFixedThreadPool(1);
175     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
176     checkInitialState(p);
177     assertSame(p.getExecutor(), e);
178 jsr166 1.6 assertEquals(8, p.getMaxBufferCapacity());
179 dl 1.1 }
180    
181     /**
182     * A null Executor argument to SubmissionPublisher constructor throws NPE
183     */
184     public void testConstructor3() {
185     try {
186 jsr166 1.2 new SubmissionPublisher<Integer>(null, 8);
187 dl 1.1 shouldThrow();
188 jsr166 1.2 } catch (NullPointerException success) {}
189 dl 1.1 }
190    
191     /**
192     * A negative capacity argument to SubmissionPublisher constructor
193     * throws IAE
194     */
195     public void testConstructor4() {
196     Executor e = Executors.newFixedThreadPool(1);
197     try {
198 jsr166 1.2 new SubmissionPublisher<Integer>(e, -1);
199 dl 1.1 shouldThrow();
200 jsr166 1.2 } catch (IllegalArgumentException success) {}
201 dl 1.1 }
202    
203     /**
204     * A closed publisher reports isClosed with no closedException and
205     * throws ISE upon attempted submission; a subsequent close or
206     * closeExceptionally has no additional effect.
207     */
208     public void testClose() {
209     SubmissionPublisher<Integer> p = basicPublisher();
210     checkInitialState(p);
211     p.close();
212     assertTrue(p.isClosed());
213     assertNull(p.getClosedException());
214     try {
215     p.submit(1);
216     shouldThrow();
217 jsr166 1.2 } catch (IllegalStateException success) {}
218 dl 1.1 Throwable ex = new SPException();
219     p.closeExceptionally(ex);
220     assertTrue(p.isClosed());
221     assertNull(p.getClosedException());
222     }
223    
224     /**
225     * A publisher closedExceptionally reports isClosed with the
226     * closedException and throws ISE upon attempted submission; a
227     * subsequent close or closeExceptionally has no additional
228     * effect.
229     */
230     public void testCloseExceptionally() {
231     SubmissionPublisher<Integer> p = basicPublisher();
232     checkInitialState(p);
233     Throwable ex = new SPException();
234     p.closeExceptionally(ex);
235     assertTrue(p.isClosed());
236     assertSame(p.getClosedException(), ex);
237     try {
238     p.submit(1);
239     shouldThrow();
240 jsr166 1.2 } catch (IllegalStateException success) {}
241 dl 1.1 p.close();
242     assertTrue(p.isClosed());
243     assertSame(p.getClosedException(), ex);
244     }
245    
246     /**
247     * Upon subscription, the subscriber's onSubscribe is called, no
248     * other Subscriber methods are invoked, the publisher
249     * hasSubscribers, isSubscribed is true, and existing
250     * subscriptions are unaffected.
251     */
252     public void testSubscribe1() {
253     TestSubscriber s = new TestSubscriber();
254     SubmissionPublisher<Integer> p = basicPublisher();
255     p.subscribe(s);
256     assertTrue(p.hasSubscribers());
257 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
258 dl 1.1 assertTrue(p.getSubscribers().contains(s));
259     assertTrue(p.isSubscribed(s));
260     s.awaitSubscribe();
261     assertNotNull(s.sn);
262 jsr166 1.6 assertEquals(0, s.nexts);
263     assertEquals(0, s.errors);
264     assertEquals(0, s.completes);
265 dl 1.1 TestSubscriber s2 = new TestSubscriber();
266     p.subscribe(s2);
267     assertTrue(p.hasSubscribers());
268 jsr166 1.6 assertEquals(2, p.getNumberOfSubscribers());
269 dl 1.1 assertTrue(p.getSubscribers().contains(s));
270     assertTrue(p.getSubscribers().contains(s2));
271     assertTrue(p.isSubscribed(s));
272     assertTrue(p.isSubscribed(s2));
273     s2.awaitSubscribe();
274     assertNotNull(s2.sn);
275 jsr166 1.6 assertEquals(0, s2.nexts);
276     assertEquals(0, s2.errors);
277     assertEquals(0, s2.completes);
278 dl 1.9 p.close();
279 dl 1.1 }
280    
281     /**
282     * If closed, upon subscription, the subscriber's onComplete
283     * method is invoked
284     */
285     public void testSubscribe2() {
286     TestSubscriber s = new TestSubscriber();
287     SubmissionPublisher<Integer> p = basicPublisher();
288     p.close();
289     p.subscribe(s);
290     s.awaitComplete();
291 jsr166 1.6 assertEquals(0, s.nexts);
292     assertEquals(0, s.errors);
293     assertEquals(1, s.completes, 1);
294 dl 1.1 }
295    
296     /**
297     * If closedExceptionally, upon subscription, the subscriber's
298     * onError method is invoked
299     */
300     public void testSubscribe3() {
301     TestSubscriber s = new TestSubscriber();
302     SubmissionPublisher<Integer> p = basicPublisher();
303     Throwable ex = new SPException();
304     p.closeExceptionally(ex);
305     assertTrue(p.isClosed());
306     assertSame(p.getClosedException(), ex);
307     p.subscribe(s);
308     s.awaitError();
309 jsr166 1.6 assertEquals(0, s.nexts);
310     assertEquals(1, s.errors);
311 dl 1.1 }
312    
313     /**
314     * Upon attempted resubscription, the subscriber's onError is
315     * called and the subscription is cancelled.
316     */
317     public void testSubscribe4() {
318     TestSubscriber s = new TestSubscriber();
319     SubmissionPublisher<Integer> p = basicPublisher();
320     p.subscribe(s);
321     assertTrue(p.hasSubscribers());
322 jsr166 1.6 assertEquals(1, p.getNumberOfSubscribers());
323 dl 1.1 assertTrue(p.getSubscribers().contains(s));
324     assertTrue(p.isSubscribed(s));
325     s.awaitSubscribe();
326     assertNotNull(s.sn);
327 jsr166 1.6 assertEquals(0, s.nexts);
328     assertEquals(0, s.errors);
329     assertEquals(0, s.completes);
330 dl 1.1 p.subscribe(s);
331     s.awaitError();
332 jsr166 1.6 assertEquals(0, s.nexts);
333     assertEquals(1, s.errors);
334 dl 1.1 assertFalse(p.isSubscribed(s));
335     }
336    
337     /**
338     * An exception thrown in onSubscribe causes onError
339     */
340     public void testSubscribe5() {
341     TestSubscriber s = new TestSubscriber();
342     SubmissionPublisher<Integer> p = basicPublisher();
343     s.throwOnCall = true;
344     try {
345     p.subscribe(s);
346 jsr166 1.2 } catch (Exception ok) {}
347 dl 1.1 s.awaitError();
348 jsr166 1.6 assertEquals(0, s.nexts);
349     assertEquals(1, s.errors);
350     assertEquals(0, s.completes);
351 dl 1.1 }
352    
353     /**
354 jsr166 1.8 * subscribe(null) throws NPE
355 dl 1.1 */
356     public void testSubscribe6() {
357     SubmissionPublisher<Integer> p = basicPublisher();
358     try {
359     p.subscribe(null);
360 jsr166 1.2 shouldThrow();
361     } catch (NullPointerException success) {}
362 dl 1.1 checkInitialState(p);
363     }
364    
365     /**
366     * Closing a publisher causes onComplete to subscribers
367     */
368     public void testCloseCompletes() {
369     SubmissionPublisher<Integer> p = basicPublisher();
370     TestSubscriber s1 = new TestSubscriber();
371     TestSubscriber s2 = new TestSubscriber();
372     p.subscribe(s1);
373     p.subscribe(s2);
374     p.submit(1);
375     p.close();
376     assertTrue(p.isClosed());
377     assertNull(p.getClosedException());
378     s1.awaitComplete();
379 jsr166 1.6 assertEquals(1, s1.nexts);
380     assertEquals(1, s1.completes);
381 dl 1.1 s2.awaitComplete();
382 jsr166 1.6 assertEquals(1, s2.nexts);
383     assertEquals(1, s2.completes);
384 dl 1.1 }
385    
386     /**
387     * Closing a publisher exceptionally causes onError to subscribers
388     */
389     public void testCloseExceptionallyError() {
390     SubmissionPublisher<Integer> p = basicPublisher();
391     TestSubscriber s1 = new TestSubscriber();
392     TestSubscriber s2 = new TestSubscriber();
393     p.subscribe(s1);
394     p.subscribe(s2);
395     p.submit(1);
396     p.closeExceptionally(new SPException());
397     assertTrue(p.isClosed());
398     s1.awaitError();
399     assertTrue(s1.nexts <= 1);
400 jsr166 1.6 assertEquals(1, s1.errors);
401 dl 1.1 s2.awaitError();
402     assertTrue(s2.nexts <= 1);
403 jsr166 1.6 assertEquals(1, s2.errors);
404 dl 1.1 }
405    
406     /**
407     * Cancelling a subscription eventually causes no more onNexts to be issued
408     */
409     public void testCancel() {
410     SubmissionPublisher<Integer> p = basicPublisher();
411     TestSubscriber s1 = new TestSubscriber();
412     TestSubscriber s2 = new TestSubscriber();
413     p.subscribe(s1);
414     p.subscribe(s2);
415     s1.awaitSubscribe();
416     p.submit(1);
417     s1.sn.cancel();
418     for (int i = 2; i <= 20; ++i)
419     p.submit(i);
420     p.close();
421     s2.awaitComplete();
422 jsr166 1.6 assertEquals(20, s2.nexts);
423     assertEquals(1, s2.completes);
424 dl 1.1 assertTrue(s1.nexts < 20);
425     assertFalse(p.isSubscribed(s1));
426     }
427    
428     /**
429     * Throwing an exception in onNext causes onError
430     */
431     public void testThrowOnNext() {
432     SubmissionPublisher<Integer> p = basicPublisher();
433     TestSubscriber s1 = new TestSubscriber();
434     TestSubscriber s2 = new TestSubscriber();
435     p.subscribe(s1);
436     p.subscribe(s2);
437     s1.awaitSubscribe();
438     p.submit(1);
439     s1.throwOnCall = true;
440     p.submit(2);
441     p.close();
442     s2.awaitComplete();
443 jsr166 1.6 assertEquals(2, s2.nexts);
444 dl 1.1 s1.awaitComplete();
445 jsr166 1.6 assertEquals(1, s1.errors);
446 dl 1.1 }
447    
448     /**
449 jsr166 1.8 * If a handler is supplied in constructor, it is invoked when
450 dl 1.1 * subscriber throws an exception in onNext
451     */
452     public void testThrowOnNextHandler() {
453     AtomicInteger calls = new AtomicInteger();
454     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
455     (basicExecutor, 8,
456     (s, e) -> calls.getAndIncrement());
457     TestSubscriber s1 = new TestSubscriber();
458     TestSubscriber s2 = new TestSubscriber();
459     p.subscribe(s1);
460     p.subscribe(s2);
461     s1.awaitSubscribe();
462     p.submit(1);
463     s1.throwOnCall = true;
464     p.submit(2);
465     p.close();
466     s2.awaitComplete();
467 jsr166 1.6 assertEquals(2, s2.nexts);
468     assertEquals(1, s2.completes);
469 dl 1.1 s1.awaitError();
470 jsr166 1.6 assertEquals(1, s1.errors);
471     assertEquals(1, calls.get());
472 dl 1.1 }
473    
474     /**
475     * onNext items are issued in the same order to each subscriber
476     */
477     public void testOrder() {
478     SubmissionPublisher<Integer> p = basicPublisher();
479     TestSubscriber s1 = new TestSubscriber();
480     TestSubscriber s2 = new TestSubscriber();
481     p.subscribe(s1);
482     p.subscribe(s2);
483     for (int i = 1; i <= 20; ++i)
484     p.submit(i);
485     p.close();
486     s2.awaitComplete();
487     s1.awaitComplete();
488 jsr166 1.6 assertEquals(20, s2.nexts);
489     assertEquals(1, s2.completes);
490     assertEquals(20, s1.nexts);
491     assertEquals(1, s1.completes);
492 dl 1.1 }
493    
494     /**
495     * onNext is issued only if requested
496     */
497     public void testRequest1() {
498     SubmissionPublisher<Integer> p = basicPublisher();
499     TestSubscriber s1 = new TestSubscriber();
500     s1.request = false;
501     p.subscribe(s1);
502     s1.awaitSubscribe();
503     assertTrue(p.estimateMinimumDemand() == 0);
504     TestSubscriber s2 = new TestSubscriber();
505     p.subscribe(s2);
506     p.submit(1);
507     p.submit(2);
508     s2.awaitNext(1);
509 jsr166 1.6 assertEquals(0, s1.nexts);
510 dl 1.1 s1.sn.request(3);
511     p.submit(3);
512     p.close();
513     s2.awaitComplete();
514 jsr166 1.6 assertEquals(3, s2.nexts);
515     assertEquals(1, s2.completes);
516 dl 1.1 s1.awaitComplete();
517     assertTrue(s1.nexts > 0);
518 jsr166 1.6 assertEquals(1, s1.completes);
519 dl 1.1 }
520    
521     /**
522     * onNext is not issued when requests become zero
523     */
524     public void testRequest2() {
525     SubmissionPublisher<Integer> p = basicPublisher();
526     TestSubscriber s1 = new TestSubscriber();
527     TestSubscriber s2 = new TestSubscriber();
528     p.subscribe(s1);
529     p.subscribe(s2);
530     s2.awaitSubscribe();
531     s1.awaitSubscribe();
532     s1.request = false;
533     p.submit(1);
534     p.submit(2);
535     p.close();
536     s2.awaitComplete();
537 jsr166 1.6 assertEquals(2, s2.nexts);
538     assertEquals(1, s2.completes);
539 dl 1.1 s1.awaitNext(1);
540 jsr166 1.6 assertEquals(1, s1.nexts);
541 dl 1.1 }
542    
543     /**
544     * Negative request causes error
545     */
546     public void testRequest3() {
547     SubmissionPublisher<Integer> p = basicPublisher();
548     TestSubscriber s1 = new TestSubscriber();
549     TestSubscriber s2 = new TestSubscriber();
550     p.subscribe(s1);
551     p.subscribe(s2);
552     s2.awaitSubscribe();
553     s1.awaitSubscribe();
554     s1.sn.request(-1L);
555     p.submit(1);
556     p.submit(2);
557     p.close();
558     s2.awaitComplete();
559 jsr166 1.6 assertEquals(2, s2.nexts);
560     assertEquals(1, s2.completes);
561 dl 1.1 s1.awaitError();
562 jsr166 1.6 assertEquals(1, s1.errors);
563 dl 1.1 assertTrue(s1.lastError instanceof IllegalArgumentException);
564     }
565    
566     /**
567     * estimateMinimumDemand reports 0 until request, nonzero after
568     * request, and zero again after delivery
569     */
570     public void testEstimateMinimumDemand() {
571     TestSubscriber s = new TestSubscriber();
572     SubmissionPublisher<Integer> p = basicPublisher();
573     s.request = false;
574     p.subscribe(s);
575     s.awaitSubscribe();
576 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
577 dl 1.1 s.sn.request(1);
578 jsr166 1.6 assertEquals(1, p.estimateMinimumDemand());
579 dl 1.1 p.submit(1);
580     s.awaitNext(1);
581 jsr166 1.6 assertEquals(0, p.estimateMinimumDemand());
582 dl 1.1 }
583    
584     /**
585 jsr166 1.3 * submit to a publisher with no subscribers returns lag 0
586 dl 1.1 */
587     public void testEmptySubmit() {
588     SubmissionPublisher<Integer> p = basicPublisher();
589 jsr166 1.6 assertEquals(0, p.submit(1));
590 dl 1.1 }
591    
592     /**
593 jsr166 1.3 * submit(null) throws NPE
594 dl 1.1 */
595     public void testNullSubmit() {
596     SubmissionPublisher<Integer> p = basicPublisher();
597     try {
598     p.submit(null);
599 jsr166 1.2 shouldThrow();
600     } catch (NullPointerException success) {}
601 dl 1.1 }
602    
603     /**
604 jsr166 1.3 * submit returns number of lagged items, compatible with result
605 dl 1.1 * of estimateMaximumLag.
606     */
607     public void testLaggedSubmit() {
608     SubmissionPublisher<Integer> p = basicPublisher();
609     TestSubscriber s1 = new TestSubscriber();
610     s1.request = false;
611     TestSubscriber s2 = new TestSubscriber();
612     s2.request = false;
613     p.subscribe(s1);
614     p.subscribe(s2);
615     s2.awaitSubscribe();
616     s1.awaitSubscribe();
617 jsr166 1.6 assertEquals(1, p.submit(1));
618 dl 1.1 assertTrue(p.estimateMaximumLag() >= 1);
619     assertTrue(p.submit(2) >= 2);
620     assertTrue(p.estimateMaximumLag() >= 2);
621     s1.sn.request(4);
622     assertTrue(p.submit(3) >= 3);
623     assertTrue(p.estimateMaximumLag() >= 3);
624     s2.sn.request(4);
625     p.submit(4);
626     p.close();
627     s2.awaitComplete();
628 jsr166 1.6 assertEquals(4, s2.nexts);
629 dl 1.1 s1.awaitComplete();
630 jsr166 1.6 assertEquals(4, s2.nexts);
631 dl 1.1 }
632    
633     /**
634     * submit eventually issues requested items when buffer capacity is 1
635     */
636     public void testCap1Submit() {
637     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
638     basicExecutor, 1);
639     TestSubscriber s1 = new TestSubscriber();
640     TestSubscriber s2 = new TestSubscriber();
641     p.subscribe(s1);
642     p.subscribe(s2);
643     for (int i = 1; i <= 20; ++i) {
644     assertTrue(p.estimateMinimumDemand() <= 1);
645     assertTrue(p.submit(i) >= 0);
646     }
647     p.close();
648     s2.awaitComplete();
649     s1.awaitComplete();
650 jsr166 1.6 assertEquals(20, s2.nexts);
651     assertEquals(1, s2.completes);
652     assertEquals(20, s1.nexts);
653     assertEquals(1, s1.completes);
654 dl 1.1 }
655 jsr166 1.2
656 dl 1.1 static boolean noopHandle(AtomicInteger count) {
657     count.getAndIncrement();
658     return false;
659     }
660    
661     static boolean reqHandle(AtomicInteger count, Subscriber s) {
662     count.getAndIncrement();
663     ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
664     return true;
665     }
666    
667     /**
668 jsr166 1.3 * offer to a publisher with no subscribers returns lag 0
669 dl 1.1 */
670     public void testEmptyOffer() {
671     SubmissionPublisher<Integer> p = basicPublisher();
672 jsr166 1.3 assertEquals(0, p.offer(1, null));
673 dl 1.1 }
674    
675     /**
676 jsr166 1.3 * offer(null) throws NPE
677 dl 1.1 */
678     public void testNullOffer() {
679     SubmissionPublisher<Integer> p = basicPublisher();
680     try {
681     p.offer(null, null);
682 jsr166 1.2 shouldThrow();
683     } catch (NullPointerException success) {}
684 dl 1.1 }
685    
686     /**
687 jsr166 1.3 * offer returns number of lagged items if not saturated
688 dl 1.1 */
689     public void testLaggedOffer() {
690     SubmissionPublisher<Integer> p = basicPublisher();
691     TestSubscriber s1 = new TestSubscriber();
692     s1.request = false;
693     TestSubscriber s2 = new TestSubscriber();
694     s2.request = false;
695     p.subscribe(s1);
696     p.subscribe(s2);
697     s2.awaitSubscribe();
698     s1.awaitSubscribe();
699     assertTrue(p.offer(1, null) >= 1);
700     assertTrue(p.offer(2, null) >= 2);
701     s1.sn.request(4);
702     assertTrue(p.offer(3, null) >= 3);
703     s2.sn.request(4);
704     p.offer(4, null);
705     p.close();
706     s2.awaitComplete();
707 jsr166 1.6 assertEquals(4, s2.nexts);
708 dl 1.1 s1.awaitComplete();
709 jsr166 1.6 assertEquals(4, s2.nexts);
710 dl 1.1 }
711    
712     /**
713 jsr166 1.3 * offer reports drops if saturated
714 dl 1.1 */
715     public void testDroppedOffer() {
716     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
717     basicExecutor, 4);
718     TestSubscriber s1 = new TestSubscriber();
719     s1.request = false;
720     TestSubscriber s2 = new TestSubscriber();
721     s2.request = false;
722     p.subscribe(s1);
723     p.subscribe(s2);
724     s2.awaitSubscribe();
725     s1.awaitSubscribe();
726     for (int i = 1; i <= 4; ++i)
727     assertTrue(p.offer(i, null) >= 0);
728     p.offer(5, null);
729     assertTrue(p.offer(6, null) < 0);
730     s1.sn.request(64);
731     assertTrue(p.offer(7, null) < 0);
732     s2.sn.request(64);
733     p.close();
734     s2.awaitComplete();
735     assertTrue(s2.nexts >= 4);
736     s1.awaitComplete();
737     assertTrue(s1.nexts >= 4);
738     }
739    
740     /**
741 jsr166 1.3 * offer invokes drop handler if saturated
742 dl 1.1 */
743     public void testHandledDroppedOffer() {
744     AtomicInteger calls = new AtomicInteger();
745     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
746     basicExecutor, 4);
747     TestSubscriber s1 = new TestSubscriber();
748     s1.request = false;
749     TestSubscriber s2 = new TestSubscriber();
750     s2.request = false;
751     p.subscribe(s1);
752     p.subscribe(s2);
753     s2.awaitSubscribe();
754     s1.awaitSubscribe();
755     for (int i = 1; i <= 4; ++i)
756     assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
757     p.offer(4, (s, x) -> noopHandle(calls));
758     assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
759     s1.sn.request(64);
760     assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
761     s2.sn.request(64);
762     p.close();
763     s2.awaitComplete();
764     s1.awaitComplete();
765     assertTrue(calls.get() >= 4);
766     }
767    
768     /**
769 jsr166 1.3 * offer succeeds if drop handler forces request
770 dl 1.1 */
771     public void testRecoveredHandledDroppedOffer() {
772     AtomicInteger calls = new AtomicInteger();
773     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
774     basicExecutor, 4);
775     TestSubscriber s1 = new TestSubscriber();
776     s1.request = false;
777     TestSubscriber s2 = new TestSubscriber();
778     s2.request = false;
779     p.subscribe(s1);
780     p.subscribe(s2);
781     s2.awaitSubscribe();
782     s1.awaitSubscribe();
783     int n = 0;
784     for (int i = 1; i <= 8; ++i) {
785     int d = p.offer(i, (s, x) -> reqHandle(calls, s));
786     n = n + 2 + (d < 0 ? d : 0);
787     }
788     p.close();
789     s2.awaitComplete();
790     s1.awaitComplete();
791 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
792 dl 1.1 assertTrue(calls.get() >= 2);
793     }
794    
795     /**
796 jsr166 1.4 * Timed offer to a publisher with no subscribers returns lag 0
797 dl 1.1 */
798     public void testEmptyTimedOffer() {
799     SubmissionPublisher<Integer> p = basicPublisher();
800 jsr166 1.7 long startTime = System.nanoTime();
801 jsr166 1.4 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
802 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
803 dl 1.1 }
804    
805     /**
806 jsr166 1.3 * Timed offer with null item or TimeUnit throws NPE
807 dl 1.1 */
808     public void testNullTimedOffer() {
809     SubmissionPublisher<Integer> p = basicPublisher();
810 jsr166 1.7 long startTime = System.nanoTime();
811 dl 1.1 try {
812 jsr166 1.7 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
813 jsr166 1.2 shouldThrow();
814     } catch (NullPointerException success) {}
815 dl 1.1 try {
816 jsr166 1.7 p.offer(1, LONG_DELAY_MS, null, null);
817 jsr166 1.2 shouldThrow();
818     } catch (NullPointerException success) {}
819 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
820 dl 1.1 }
821    
822     /**
823 jsr166 1.3 * Timed offer returns number of lagged items if not saturated
824 dl 1.1 */
825     public void testLaggedTimedOffer() {
826     SubmissionPublisher<Integer> p = basicPublisher();
827     TestSubscriber s1 = new TestSubscriber();
828     s1.request = false;
829     TestSubscriber s2 = new TestSubscriber();
830     s2.request = false;
831     p.subscribe(s1);
832     p.subscribe(s2);
833     s2.awaitSubscribe();
834     s1.awaitSubscribe();
835 jsr166 1.7 long startTime = System.nanoTime();
836     assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
837     assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
838 dl 1.1 s1.sn.request(4);
839 jsr166 1.7 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
840 dl 1.1 s2.sn.request(4);
841 jsr166 1.7 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
842 dl 1.1 p.close();
843     s2.awaitComplete();
844 jsr166 1.6 assertEquals(4, s2.nexts);
845 dl 1.1 s1.awaitComplete();
846 jsr166 1.6 assertEquals(4, s2.nexts);
847 jsr166 1.7 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
848 dl 1.1 }
849    
850     /**
851 jsr166 1.3 * Timed offer reports drops if saturated
852 dl 1.1 */
853     public void testDroppedTimedOffer() {
854     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
855     basicExecutor, 4);
856     TestSubscriber s1 = new TestSubscriber();
857     s1.request = false;
858     TestSubscriber s2 = new TestSubscriber();
859     s2.request = false;
860     p.subscribe(s1);
861     p.subscribe(s2);
862     s2.awaitSubscribe();
863     s1.awaitSubscribe();
864 dl 1.9 long delay = timeoutMillis();
865 dl 1.1 for (int i = 1; i <= 4; ++i)
866 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
867     long startTime = System.nanoTime();
868     assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
869 dl 1.1 s1.sn.request(64);
870 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
871     // 2 * delay should elapse but check only 1 * delay to allow timer slop
872     assertTrue(millisElapsedSince(startTime) >= delay);
873 dl 1.1 s2.sn.request(64);
874     p.close();
875     s2.awaitComplete();
876     assertTrue(s2.nexts >= 2);
877     s1.awaitComplete();
878     assertTrue(s1.nexts >= 2);
879     }
880    
881     /**
882 jsr166 1.3 * Timed offer invokes drop handler if saturated
883 dl 1.1 */
884     public void testHandledDroppedTimedOffer() {
885     AtomicInteger calls = new AtomicInteger();
886     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
887     basicExecutor, 4);
888     TestSubscriber s1 = new TestSubscriber();
889     s1.request = false;
890     TestSubscriber s2 = new TestSubscriber();
891     s2.request = false;
892     p.subscribe(s1);
893     p.subscribe(s2);
894     s2.awaitSubscribe();
895     s1.awaitSubscribe();
896 dl 1.9 long delay = timeoutMillis();
897 dl 1.1 for (int i = 1; i <= 4; ++i)
898 dl 1.9 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
899     long startTime = System.nanoTime();
900     assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
901 dl 1.1 s1.sn.request(64);
902 dl 1.9 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
903     assertTrue(millisElapsedSince(startTime) >= delay);
904 dl 1.1 s2.sn.request(64);
905     p.close();
906     s2.awaitComplete();
907     s1.awaitComplete();
908     assertTrue(calls.get() >= 2);
909     }
910    
911     /**
912 jsr166 1.3 * Timed offer succeeds if drop handler forces request
913 dl 1.1 */
914     public void testRecoveredHandledDroppedTimedOffer() {
915     AtomicInteger calls = new AtomicInteger();
916     SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
917     basicExecutor, 4);
918     TestSubscriber s1 = new TestSubscriber();
919     s1.request = false;
920     TestSubscriber s2 = new TestSubscriber();
921     s2.request = false;
922     p.subscribe(s1);
923     p.subscribe(s2);
924     s2.awaitSubscribe();
925     s1.awaitSubscribe();
926     int n = 0;
927 dl 1.9 long delay = timeoutMillis();
928     long startTime = System.nanoTime();
929     for (int i = 1; i <= 6; ++i) {
930     int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
931 dl 1.1 n = n + 2 + (d < 0 ? d : 0);
932     }
933 dl 1.9 assertTrue(millisElapsedSince(startTime) >= delay);
934 dl 1.1 p.close();
935     s2.awaitComplete();
936     s1.awaitComplete();
937 jsr166 1.6 assertEquals(n, s1.nexts + s2.nexts);
938 dl 1.1 assertTrue(calls.get() >= 2);
939     }
940    
941 dl 1.10 /**
942     * consume returns a CompletableFuture that is done when
943     * publisher completes
944     */
945     public void testConsume() {
946     AtomicInteger sum = new AtomicInteger();
947     SubmissionPublisher<Integer> p = basicPublisher();
948 jsr166 1.11 CompletableFuture<Void> f =
949 dl 1.10 p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
950     int n = 20;
951     for (int i = 1; i <= n; ++i)
952     p.submit(i);
953     p.close();
954     f.join();
955     assertEquals((n * (n + 1)) / 2, sum.get());
956     }
957    
958     /**
959     * consume(null) throws NPE
960     */
961     public void testConsumeNPE() {
962     SubmissionPublisher<Integer> p = basicPublisher();
963     try {
964     CompletableFuture<Void> f = p.consume(null);
965     shouldThrow();
966 jsr166 1.11 } catch (NullPointerException success) {}
967 dl 1.10 }
968    
969     /**
970     * consume eventually stops processing published items if cancelled
971     */
972     public void testCancelledConsume() {
973     AtomicInteger count = new AtomicInteger();
974     SubmissionPublisher<Integer> p = basicPublisher();
975     CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
976     f.cancel(true);
977     int n = 1000000; // arbitrary limit
978     for (int i = 1; i <= n; ++i)
979     p.submit(i);
980     assertTrue(count.get() < n);
981     }
982 jsr166 1.11
983 dl 1.1 }