ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops2.java
Revision: 1.2
Committed: Thu Jul 23 14:58:13 2015 UTC (8 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.1: +9 -9 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7
8 import java.util.*;
9 import java.util.concurrent.*;
10
/**
 * One FJ publisher, many subscribers.
 *
 * <p>Each repetition forks a {@link Pub} task that subscribes
 * {@link #CONSUMERS} subscribers to a single {@link SubmissionPublisher},
 * submits {@link #ITEMS} items and then closes the publisher. The main
 * thread waits on {@link #phaser} until every subscriber has reached a
 * terminal signal and prints the elapsed wall-clock time.
 */
public class SubmissionPublisherLoops2 {
    /** Number of items submitted by the publisher in each repetition. */
    static final int ITEMS = 1 << 20;
    /** Number of subscribers attached to the publisher in each repetition. */
    static final int CONSUMERS = 64;
    /** Publisher buffer capacity; also the size of each subscriber request. */
    static final int CAP = Flow.defaultBufferSize();
    /** Default number of repetitions when none is given on the command line. */
    static final int REPS = 9;

    /** One party per subscriber plus one for the main thread. */
    static final Phaser phaser = new Phaser(CONSUMERS + 1);

    /**
     * Subscriber that counts received items and arrives at {@link #phaser}
     * once it receives a terminal signal.
     */
    static class Sub implements Flow.Subscriber<Boolean> {
        Flow.Subscription sn;
        int count;

        /** Remembers the subscription and requests the first batch of items. */
        @Override
        public void onSubscribe(Flow.Subscription s) {
            (sn = s).request(CAP);
        }

        /**
         * Counts the item and, once per CAP items (when the low bits of the
         * count reach CAP / 2), requests another CAP items. The mask
         * arithmetic assumes CAP is a power of two.
         */
        @Override
        public void onNext(Boolean t) {
            if ((++count & (CAP - 1)) == (CAP >>> 1))
                sn.request(CAP);
        }

        /**
         * Reports the failure and arrives at the phaser. This method never
         * calls onComplete, so arriving here is what lets the main thread
         * proceed instead of waiting on a party that would never arrive.
         */
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            phaser.arrive();
        }

        /** Reports any shortfall in received items, then arrives at the phaser. */
        @Override
        public void onComplete() {
            if (count != ITEMS)
                System.out.println("Error: remaining " + (ITEMS - count));
            phaser.arrive();
        }
    }

    /**
     * Task that owns one publisher, attaches all subscribers, submits all
     * items and closes the publisher.
     */
    static final class Pub extends RecursiveAction {
        final SubmissionPublisher<Boolean> pub =
            new SubmissionPublisher<Boolean>(ForkJoinPool.commonPool(), CAP);

        @Override
        public void compute() {
            SubmissionPublisher<Boolean> p = pub;
            for (int i = 0; i < CONSUMERS; ++i)
                p.subscribe(new Sub());
            for (int i = 0; i < ITEMS; ++i)
                p.submit(Boolean.TRUE);
            p.close();
        }
    }

    /** Nanoseconds per second. */
    static final long NPS = (1000L * 1000 * 1000);

    /**
     * Runs the timing loop.
     *
     * @param args optional first element: number of repetitions
     *        (defaults to {@link #REPS})
     * @throws Exception if the repetition count cannot be parsed
     */
    public static void main(String[] args) throws Exception {
        int reps = REPS;
        if (args.length > 0)
            reps = Integer.parseInt(args[0]);

        System.out.println("ITEMS: " + ITEMS +
                           " CONSUMERS: " + CONSUMERS +
                           " CAP: " + CAP);

        for (int rep = 0; rep < reps; ++rep) {
            long startTime = System.nanoTime();
            new Pub().fork();
            // Advances only after all CONSUMERS subscribers have arrived.
            phaser.arriveAndAwaitAdvance();
            long elapsed = System.nanoTime() - startTime;
            double secs = ((double)elapsed) / NPS;
            System.out.printf("\tTime: %7.3f\n", secs);
        }
    }
}