ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops4.java
Revision: 1.4
Committed: Sun Oct 11 01:02:20 2015 UTC (8 years, 6 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.3: +2 -2 lines
Log Message:
javadoc style

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.*;
8 import java.util.concurrent.*;
9
10 /**
11 * Creates PRODUCERS publishers each with PROCESSORS processors
12 * each with CONSUMERS subscribers, each sent ITEMS items, with
13 * max CAP buffering; repeats REPS times
14 */
15 public class SubmissionPublisherLoops4 {
16 static final int ITEMS = 1 << 20;
17 static final int PRODUCERS = 32;
18 static final int PROCESSORS = 32;
19 static final int CONSUMERS = 32;
20 static final int CAP = Flow.defaultBufferSize();
21
22 static final int REPS = 9;
23
24 static final int SINKS = PRODUCERS * PROCESSORS * CONSUMERS;
25 static final long NEXTS = (long)ITEMS * SINKS;
26 static final Phaser phaser = new Phaser(SINKS + 1);
27
28 public static void main(String[] args) throws Exception {
29 int reps = REPS;
30 if (args.length > 0)
31 reps = Integer.parseInt(args[0]);
32
33 System.out.println("ITEMS: " + ITEMS +
34 " PRODUCERS: " + PRODUCERS +
35 " PROCESSORS: " + PROCESSORS +
36 " CONSUMERS: " + CONSUMERS +
37 " CAP: " + CAP);
38 for (int rep = 0; rep < reps; ++rep) {
39 oneRun();
40 Thread.sleep(1000);
41 }
42 }
43
44 static void oneRun() throws Exception {
45 long startTime = System.nanoTime();
46 for (int i = 0; i < PRODUCERS; ++i)
47 new Pub().fork();
48 phaser.arriveAndAwaitAdvance();
49 long elapsed = System.nanoTime() - startTime;
50 double secs = ((double)elapsed) / (1000L * 1000 * 1000);
51 double ips = NEXTS / secs;
52 System.out.printf("Time: %7.2f", secs);
53 System.out.printf(" items per sec: %14.2f\n", ips);
54 System.out.println(ForkJoinPool.commonPool());
55 }
56
57 static final class Sub implements Flow.Subscriber<Boolean> {
58 int count;
59 Flow.Subscription subscription;
60 public void onSubscribe(Flow.Subscription s) {
61 (subscription = s).request(CAP);
62 }
63 public void onNext(Boolean b) {
64 if (b && (++count & ((CAP >>> 1) - 1)) == 0)
65 subscription.request(CAP >>> 1);
66 }
67 public void onComplete() {
68 if (count != ITEMS)
69 System.out.println("Error: remaining " + (ITEMS - count));
70 phaser.arrive();
71 }
72 public void onError(Throwable t) { t.printStackTrace(); }
73 }
74
75 static final class Proc extends SubmissionPublisher<Boolean>
76 implements Flow.Processor<Boolean,Boolean> {
77 Flow.Subscription subscription;
78 int count;
79 Proc(Executor executor, int maxBufferCapacity) {
80 super(executor, maxBufferCapacity);
81 }
82 public void onSubscribe(Flow.Subscription subscription) {
83 (this.subscription = subscription).request(CAP);
84 }
85 public void onNext(Boolean item) {
86 if ((++count & ((CAP >>> 1) - 1)) == 0)
87 subscription.request(CAP >>> 1);
88 submit(item);
89 }
90 public void onError(Throwable ex) { closeExceptionally(ex); }
91 public void onComplete() { close(); }
92 }
93
94 static final class Pub extends RecursiveAction {
95 final SubmissionPublisher<Boolean> pub =
96 new SubmissionPublisher<Boolean>(ForkJoinPool.commonPool(), CAP);
97 public void compute() {
98 SubmissionPublisher<Boolean> p = pub;
99 for (int j = 0; j < PROCESSORS; ++j) {
100 Proc t = new Proc(ForkJoinPool.commonPool(), CAP);
101 for (int i = 0; i < CONSUMERS; ++i)
102 t.subscribe(new Sub());
103 p.subscribe(t);
104 }
105 for (int i = 0; i < ITEMS; ++i)
106 p.submit(Boolean.TRUE);
107 p.close();
108 }
109 }
110 }