ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops4.java
Revision: 1.3
Committed: Sat Sep 12 20:30:46 2015 UTC (8 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.2: +15 -10 lines
Log Message:
Use common conventions

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     import java.util.*;
8     import java.util.concurrent.*;
9    
/**
 * Create PRODUCERS publishers each with PROCESSORS processors
 * each with CONSUMERS subscribers, each sent ITEMS items, with
 * max CAP buffering; repeat REPS times.
 *
 * <p>Usage: {@code java SubmissionPublisherLoops4 [reps]}, where the
 * optional first argument overrides REPS.
 */
public class SubmissionPublisherLoops4 {
    /** Number of items each publisher submits per run. */
    static final int ITEMS = 1 << 20;
    /** Number of Pub tasks forked per run. */
    static final int PRODUCERS = 32;
    /** Number of Proc processors subscribed to each publisher. */
    static final int PROCESSORS = 32;
    /** Number of Sub subscribers attached to each processor. */
    static final int CONSUMERS = 32;
    /** Max buffer capacity, also used as the initial request size. */
    static final int CAP = Flow.defaultBufferSize();

    /** Default number of runs when no argument is given. */
    static final int REPS = 9;

    /** Total number of terminal subscribers created per run. */
    static final int SINKS = PRODUCERS * PROCESSORS * CONSUMERS;
    /** Expected number of onNext deliveries to sinks per run. */
    static final long NEXTS = (long) ITEMS * SINKS;
    /**
     * One party per sink plus one for the thread calling oneRun;
     * each run advances the phaser by one phase.
     */
    static final Phaser phaser = new Phaser(SINKS + 1);

    /**
     * Prints the configuration, then performs the requested number of
     * runs, sleeping one second after each.
     *
     * @param args optional; args[0] is parsed as the number of runs
     * @throws Exception if a run fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        int reps = REPS;
        if (args.length > 0) {
            reps = Integer.parseInt(args[0]);
        }

        System.out.println("ITEMS: " + ITEMS +
                           " PRODUCERS: " + PRODUCERS +
                           " PROCESSORS: " + PROCESSORS +
                           " CONSUMERS: " + CONSUMERS +
                           " CAP: " + CAP);
        for (int rep = 0; rep < reps; ++rep) {
            oneRun();
            Thread.sleep(1000);
        }
    }

    /**
     * Forks PRODUCERS publisher tasks, waits until every sink has
     * arrived at the phaser, then reports elapsed time, throughput
     * and the state of the common pool.
     *
     * @throws Exception declared for callers; kept from the original signature
     */
    static void oneRun() throws Exception {
        long startTime = System.nanoTime();
        for (int i = 0; i < PRODUCERS; ++i) {
            new Pub().fork();
        }
        // Blocks until all SINKS subscribers have terminated.
        phaser.arriveAndAwaitAdvance();
        long elapsed = System.nanoTime() - startTime;
        double secs = ((double) elapsed) / (1000L * 1000 * 1000);
        double ips = NEXTS / secs;
        System.out.printf("Time: %7.2f", secs);
        System.out.printf(" items per sec: %14.2f\n", ips);
        System.out.println(ForkJoinPool.commonPool());
    }

    /**
     * Terminal subscriber: counts the true items it receives and
     * arrives at the phaser when its stream terminates.
     */
    static final class Sub implements Flow.Subscriber<Boolean> {
        int count;
        Flow.Subscription subscription;

        /** Records the subscription and requests an initial CAP items. */
        @Override
        public void onSubscribe(Flow.Subscription s) {
            (subscription = s).request(CAP);
        }

        /**
         * Counts a true item; after every CAP/2 counted items,
         * requests another CAP/2.
         */
        @Override
        public void onNext(Boolean b) {
            if (b && (++count & ((CAP >>> 1) - 1)) == 0) {
                subscription.request(CAP >>> 1);
            }
        }

        /** Reports any shortfall against ITEMS, then arrives at the phaser. */
        @Override
        public void onComplete() {
            if (count != ITEMS) {
                System.out.println("Error: remaining " + (ITEMS - count));
            }
            phaser.arrive();
        }

        /**
         * Prints the failure and arrives at the phaser. Without this
         * arrival a sink that ends in error would never be counted and
         * oneRun would wait forever in arriveAndAwaitAdvance.
         */
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            phaser.arrive();
        }
    }

    /**
     * Pass-through processor: resubmits each received item to its own
     * subscribers and forwards termination to them.
     */
    static final class Proc extends SubmissionPublisher<Boolean>
        implements Flow.Processor<Boolean, Boolean> {
        Flow.Subscription subscription;
        int count;

        Proc(Executor executor, int maxBufferCapacity) {
            super(executor, maxBufferCapacity);
        }

        /** Records the subscription and requests an initial CAP items. */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            (this.subscription = subscription).request(CAP);
        }

        /**
         * After every CAP/2 received items requests another CAP/2,
         * then resubmits the item downstream.
         */
        @Override
        public void onNext(Boolean item) {
            if ((++count & ((CAP >>> 1) - 1)) == 0) {
                subscription.request(CAP >>> 1);
            }
            submit(item);
        }

        /** Closes this processor exceptionally with the upstream error. */
        @Override
        public void onError(Throwable ex) {
            closeExceptionally(ex);
        }

        /** Closes this processor when the upstream completes. */
        @Override
        public void onComplete() {
            close();
        }
    }

    /**
     * Producer task: wires PROCESSORS processors (each with CONSUMERS
     * sinks) to one publisher, submits ITEMS items and closes it.
     */
    static final class Pub extends RecursiveAction {
        final SubmissionPublisher<Boolean> pub =
            new SubmissionPublisher<>(ForkJoinPool.commonPool(), CAP);

        @Override
        public void compute() {
            SubmissionPublisher<Boolean> p = pub;
            for (int j = 0; j < PROCESSORS; ++j) {
                Proc t = new Proc(ForkJoinPool.commonPool(), CAP);
                // Attach the sinks before the processor starts receiving.
                for (int i = 0; i < CONSUMERS; ++i) {
                    t.subscribe(new Sub());
                }
                p.subscribe(t);
            }
            for (int i = 0; i < ITEMS; ++i) {
                p.submit(Boolean.TRUE);
            }
            p.close();
        }
    }
}