ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops1.java
Revision: 1.1
Committed: Thu Jul 23 11:13:47 2015 UTC (8 years, 9 months ago) by dl
Branch: MAIN
Log Message:
initial commits of SubmissionPublisher tests

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     import java.util.*;
8     import java.util.concurrent.*;
9    
/**
 * Throughput loop test: one {@link SubmissionPublisher} feeding many
 * subscribers.
 *
 * <p>Each repetition creates a fresh publisher on the common pool,
 * attaches {@link #CONSUMERS} subscribers, submits {@link #ITEMS} items,
 * closes the publisher, waits for every subscriber to complete, and
 * prints the elapsed time in seconds.
 *
 * <p>Usage: {@code java SubmissionPublisherLoops1 [reps]}; when omitted,
 * the repetition count is {@link #REPS}.
 */
public class SubmissionPublisherLoops1 {
    /** Number of items submitted per repetition. */
    static final int ITEMS = 1 << 20;
    /** Number of subscribers attached to each publisher. */
    static final int CONSUMERS = 64;
    /** Publisher buffer capacity; also the size of each demand request. */
    static final int CAP = Flow.defaultBufferSize();
    /** Default number of repetitions. */
    static final int REPS = 9;

    /**
     * Registered with one party per subscriber plus one for the main
     * thread; a phase advances once all subscribers have completed and
     * main has arrived.
     */
    static final Phaser phaser = new Phaser(CONSUMERS + 1);

    /**
     * Subscriber that counts the items it receives and, on completion,
     * reports any shortfall against {@link #ITEMS} before arriving at
     * the shared phaser.
     */
    static class Sub implements Flow.Subscriber<Boolean> {
        Flow.Subscription sn;
        int count;

        /** Records the subscription and requests the first CAP items. */
        @Override
        public void onSubscribe(Flow.Subscription s) {
            (sn = s).request(CAP);
        }

        /**
         * Counts the item. Once per CAP items, when the low bits of the
         * count equal CAP/2, requests another CAP items so that demand
         * is replenished before the previous request is used up.
         */
        @Override
        public void onNext(Boolean t) {
            if ((++count & (CAP - 1)) == (CAP >>> 1)) {
                sn.request(CAP);
            }
        }

        /**
         * Prints the stack trace of the failure.
         * NOTE(review): this path does not arrive at the phaser, so main
         * would keep waiting if a subscriber terminated with an error.
         */
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        /** Reports any missing items, then arrives at the phaser. */
        @Override
        public void onComplete() {
            if (count != ITEMS) {
                System.out.println("Error: remaining " + (ITEMS - count));
            }
            phaser.arrive();
        }
    }

    /** Nanoseconds per second. */
    static final long NPS = (1000L * 1000 * 1000);

    /**
     * Runs the timing loop.
     *
     * @param args optional single argument: the number of repetitions
     * @throws Exception if the argument is not a parsable integer or the
     *         thread is interrupted while sleeping between repetitions
     */
    public static void main(String[] args) throws Exception {
        int reps = REPS;
        if (args.length > 0) {
            reps = Integer.parseInt(args[0]);
        }

        System.out.println("ITEMS: " + ITEMS +
                           " CONSUMERS: " + CONSUMERS +
                           " CAP: " + CAP);
        ExecutorService exec = ForkJoinPool.commonPool();
        // Advance the loop counter, not the bound: the original header
        // used ++reps, so the condition rep < reps never became false.
        for (int rep = 0; rep < reps; ++rep) {
            long startTime = System.nanoTime();
            final SubmissionPublisher<Boolean> pub =
                new SubmissionPublisher<Boolean>(exec, CAP);
            for (int i = 0; i < CONSUMERS; ++i) {
                pub.subscribe(new Sub());
            }
            for (int i = 0; i < ITEMS; ++i) {
                pub.submit(Boolean.TRUE);
            }
            pub.close();
            // Wait until every subscriber has reached onComplete.
            phaser.arriveAndAwaitAdvance();
            long elapsed = System.nanoTime() - startTime;
            double secs = ((double)elapsed) / NPS;
            System.out.printf("\tTime: %7.3f\n", secs);
            // Pause between repetitions.
            Thread.sleep(1000);
        }
        // Only shut down an executor this test owns; the common pool is
        // left alone.
        if (exec != ForkJoinPool.commonPool()) {
            exec.shutdown();
        }
    }
}