ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops3.java
Revision: 1.2
Committed: Thu Jul 23 14:58:13 2015 UTC (8 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.1: +10 -10 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.*;
8 import java.util.concurrent.*;
9
10 /**
11 * Create PRODUCERS publishers each with CONSUMERS subscribers,
12 * each sent ITEMS items, with CAP buffering; repeat REPS times
13 */
14 public class SubmissionPublisherLoops3 {
15 static final int ITEMS = 1 << 20;
16 static final int PRODUCERS = 32;
17 static final int CONSUMERS = 32;
18 static final int CAP = Flow.defaultBufferSize();
19 static final int REPS = 9;
20
21 static final Phaser phaser = new Phaser(PRODUCERS * CONSUMERS + 1);
22
23 public static void main(String[] args) throws Exception {
24 int reps = REPS;
25 if (args.length > 0)
26 reps = Integer.parseInt(args[0]);
27
28 System.out.println("ITEMS: " + ITEMS +
29 " PRODUCERS: " + PRODUCERS +
30 " CONSUMERS: " + CONSUMERS +
31 " CAP: " + CAP);
32 long nitems = (long)ITEMS * PRODUCERS * CONSUMERS;
33 for (int rep = 0; rep < reps; ++rep) {
34 long startTime = System.nanoTime();
35 for (int i = 0; i < PRODUCERS; ++i)
36 new Pub().fork();
37 phaser.arriveAndAwaitAdvance();
38 long elapsed = System.nanoTime() - startTime;
39 double secs = ((double)elapsed) / (1000L * 1000 * 1000);
40 double ips = nitems / secs;
41 System.out.printf("Time: %7.2f", secs);
42 System.out.printf(" items per sec: %14.2f\n", ips);
43 System.out.println(ForkJoinPool.commonPool());
44 }
45 }
46
47 static final class Sub implements Flow.Subscriber<Boolean> {
48 int count;
49 Flow.Subscription subscription;
50 public void onSubscribe(Flow.Subscription s) {
51 (subscription = s).request(CAP);
52 }
53 public void onNext(Boolean b) {
54 if (b && (++count & ((CAP >>> 1) - 1)) == 0)
55 subscription.request(CAP >>> 1);
56 }
57 public void onComplete() {
58 if (count != ITEMS)
59 System.out.println("Error: remaining " + (ITEMS - count));
60 phaser.arrive();
61 }
62 public void onError(Throwable t) { t.printStackTrace(); }
63 }
64
65 static final class Pub extends RecursiveAction {
66 final SubmissionPublisher<Boolean> pub =
67 new SubmissionPublisher<Boolean>(ForkJoinPool.commonPool(), CAP);
68 public void compute() {
69 SubmissionPublisher<Boolean> p = pub;
70 for (int i = 0; i < CONSUMERS; ++i)
71 p.subscribe(new Sub());
72 for (int i = 0; i < ITEMS; ++i)
73 p.submit(Boolean.TRUE);
74 p.close();
75 }
76 }
77
78 }