ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops1.java
Revision: 1.5
Committed: Mon Apr 18 11:04:14 2016 UTC (8 years ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.4: +1 -0 lines
Log Message:
print pool status

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.*;
8 import java.util.concurrent.*;
9
/**
 * Timing loop for {@link SubmissionPublisher}: one publisher, many subscribers.
 *
 * <p>Each run creates a publisher with {@link #CONSUMERS} subscribers,
 * submits {@link #ITEMS} items, closes the publisher, waits on a shared
 * {@link Phaser} until every subscriber has received a terminal signal,
 * and prints the elapsed wall-clock time.
 *
 * <p>Usage: {@code java SubmissionPublisherLoops1 [reps]}
 */
public class SubmissionPublisherLoops1 {
    /** Number of items submitted per run. */
    static final int ITEMS = 1 << 20;

    /** Number of subscribers attached to the publisher in each run. */
    static final int CONSUMERS = 64;

    /**
     * Publisher buffer capacity, also used as each subscriber's demand batch.
     * The mask test in {@link Sub#onNext} assumes this is a power of two
     * (NOTE(review): confirm if the value is ever changed from
     * {@code Flow.defaultBufferSize()}).
     */
    static final int CAP = Flow.defaultBufferSize();

    /** Number of runs when no command-line argument is supplied. */
    static final int REPS = 9;

    /**
     * Barrier shared by the main thread and the subscribers of a run:
     * registered for {@code CONSUMERS + 1} parties, so a phase advances once
     * every subscriber has arrived and the main thread has joined them.
     */
    static final Phaser phaser = new Phaser(CONSUMERS + 1);

    /**
     * Subscriber that counts received items, replenishes demand in batches
     * of {@link #CAP}, and arrives at {@link #phaser} on termination.
     */
    static final class Sub implements Flow.Subscriber<Boolean> {
        /** Subscription handed over in {@link #onSubscribe}. */
        Flow.Subscription sn;

        /** Number of items received so far. */
        int count;

        /** Stores the subscription and requests the first batch of items. */
        @Override
        public void onSubscribe(Flow.Subscription s) {
            (sn = s).request(CAP);
        }

        /**
         * Counts the item and, once per {@code CAP} items (when the count
         * modulo {@code CAP} reaches {@code CAP / 2}), requests another batch.
         */
        @Override
        public void onNext(Boolean t) {
            if ((++count & (CAP - 1)) == (CAP >>> 1)) {
                sn.request(CAP);
            }
        }

        /**
         * Reports the failure and arrives at the phaser. Arriving here is
         * required: a subscriber that ends with {@code onError} is not sent
         * {@code onComplete}, so without it the main thread would wait in
         * {@code arriveAndAwaitAdvance} forever.
         */
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            phaser.arrive();
        }

        /**
         * Reports a shortfall if fewer (or more) than {@link #ITEMS} items
         * were received, then arrives at the phaser.
         */
        @Override
        public void onComplete() {
            if (count != ITEMS) {
                System.out.println("Error: remaining " + (ITEMS - count));
            }
            phaser.arrive();
        }
    }

    /** Nanoseconds per second. */
    static final long NPS = (1000L * 1000 * 1000);

    /**
     * Runs the timing loop.
     *
     * @param args optional: {@code args[0]} is the number of runs
     *             (defaults to {@link #REPS})
     * @throws Exception if a run fails or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        int reps = REPS;
        if (args.length > 0) {
            reps = Integer.parseInt(args[0]);
        }

        System.out.println("ITEMS: " + ITEMS +
                           " CONSUMERS: " + CONSUMERS +
                           " CAP: " + CAP);
        ExecutorService exec = ForkJoinPool.commonPool();
        for (int rep = 0; rep < reps; ++rep) {
            oneRun(exec);
            System.out.println(exec); // print pool status after each run
            Thread.sleep(1000);
        }
        // Only shut down an executor this program owns; the guard matters
        // when exec is swapped for something other than the common pool.
        if (exec != ForkJoinPool.commonPool()) {
            exec.shutdown();
        }
    }

    /**
     * Performs one timed run: subscribes {@link #CONSUMERS} subscribers,
     * submits {@link #ITEMS} items, closes the publisher, waits for all
     * subscribers to terminate, and prints the elapsed seconds.
     *
     * @param exec executor used by the publisher for delivery to subscribers
     * @throws Exception if publishing fails
     */
    static void oneRun(ExecutorService exec) throws Exception {
        long startTime = System.nanoTime();
        // try-with-resources closes the publisher even if subscribe/submit
        // throws; on the normal path it closes exactly where the explicit
        // close() call used to be, before waiting on the phaser.
        try (SubmissionPublisher<Boolean> pub =
                 new SubmissionPublisher<>(exec, CAP)) {
            for (int i = 0; i < CONSUMERS; ++i) {
                pub.subscribe(new Sub());
            }
            for (int i = 0; i < ITEMS; ++i) {
                pub.submit(Boolean.TRUE);
            }
        }
        phaser.arriveAndAwaitAdvance();
        long elapsed = System.nanoTime() - startTime;
        double secs = ((double) elapsed) / NPS;
        System.out.printf("\tTime: %7.3f\n", secs);
    }
}