ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops2.java
Revision: 1.2
Committed: Thu Jul 23 14:58:13 2015 UTC (8 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.1: +9 -9 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7    
8     import java.util.*;
9     import java.util.concurrent.*;
10    
/**
 * One FJ publisher, many subscribers.
 *
 * <p>Each repetition forks a single {@link Pub} task that subscribes
 * {@link #CONSUMERS} subscribers to one {@link SubmissionPublisher},
 * submits {@link #ITEMS} items and closes the publisher. The main thread
 * waits on {@link #phaser} until every subscriber has reached a terminal
 * callback, then prints the elapsed wall-clock time.
 */
public class SubmissionPublisherLoops2 {
    /** Number of items submitted per repetition. */
    static final int ITEMS = 1 << 20;
    /** Number of subscribers attached to the publisher per repetition. */
    static final int CONSUMERS = 64;
    /** Publisher buffer capacity, also used as the subscriber request batch size. */
    static final int CAP = Flow.defaultBufferSize();
    /** Default number of repetitions when no command-line argument is given. */
    static final int REPS = 9;

    /** Registered for every subscriber plus the main thread. */
    static final Phaser phaser = new Phaser(CONSUMERS + 1);

    /**
     * Subscriber that counts received items and tops up its demand in
     * batches of {@link #CAP}.
     */
    static class Sub implements Flow.Subscriber<Boolean> {
        Flow.Subscription sn;
        int count;

        /** Stores the subscription and requests the first batch of {@code CAP} items. */
        @Override
        public void onSubscribe(Flow.Subscription s) {
            (sn = s).request(CAP);
        }

        /**
         * Counts the item and requests another {@code CAP} items each time
         * the low bits of the running count equal {@code CAP >>> 1}.
         */
        @Override
        public void onNext(Boolean t) {
            // The mask only acts as a modulus if CAP is a power of two,
            // which this code assumes.
            if ((++count & (CAP - 1)) == (CAP >>> 1)) {
                sn.request(CAP);
            }
        }

        /**
         * Reports the failure and arrives at the phaser.
         *
         * <p>Per the {@link Flow.Subscriber} contract no further callbacks
         * (in particular no {@code onComplete}) follow {@code onError}, so
         * without arriving here the main thread would wait forever.
         */
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            phaser.arrive();
        }

        /** Reports any shortfall against {@code ITEMS}, then arrives at the phaser. */
        @Override
        public void onComplete() {
            if (count != ITEMS) {
                System.out.println("Error: remaining " + (ITEMS - count));
            }
            phaser.arrive();
        }
    }

    /**
     * Task that subscribes all consumers, submits all items and closes
     * its publisher.
     */
    static final class Pub extends RecursiveAction {
        final SubmissionPublisher<Boolean> pub =
            new SubmissionPublisher<>(ForkJoinPool.commonPool(), CAP);

        @Override
        public void compute() {
            SubmissionPublisher<Boolean> p = pub;
            for (int i = 0; i < CONSUMERS; ++i) {
                p.subscribe(new Sub());
            }
            for (int i = 0; i < ITEMS; ++i) {
                p.submit(Boolean.TRUE);
            }
            p.close();
        }
    }

    /** Nanoseconds per second. */
    static final long NPS = (1000L * 1000 * 1000);

    /**
     * Runs the benchmark and prints one timing line per repetition.
     *
     * @param args optional; {@code args[0]} overrides the repetition count
     * @throws Exception if the repetition count cannot be parsed
     */
    public static void main(String[] args) throws Exception {
        int reps = REPS;
        if (args.length > 0) {
            reps = Integer.parseInt(args[0]);
        }

        System.out.println("ITEMS: " + ITEMS +
                           " CONSUMERS: " + CONSUMERS +
                           " CAP: " + CAP);

        for (int rep = 0; rep < reps; ++rep) {
            long startTime = System.nanoTime();
            new Pub().fork();
            // Advances once every subscriber has arrived from a terminal callback.
            phaser.arriveAndAwaitAdvance();
            long elapsed = System.nanoTime() - startTime;
            double secs = ((double) elapsed) / NPS;
            System.out.printf("\tTime: %7.3f\n", secs);
        }
    }
}