ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops4.java
(Generate patch)

Comparing jsr166/src/test/loops/SubmissionPublisherLoops4.java (file contents):
Revision 1.1 by dl, Thu Jul 23 11:13:47 2015 UTC vs.
Revision 1.4 by jsr166, Sun Oct 11 01:02:20 2015 UTC

# Line 8 | Line 8 | import java.util.*;
8   import java.util.concurrent.*;
9  
10   /**
11 < * Create PRODUCERS publishers each with PROCESSORS processors
11 > * Creates PRODUCERS publishers each with PROCESSORS processors
12   * each with CONSUMERS subscribers, each sent ITEMS items, with
13 < * max CAP buffering; repeat REPS times
13 > * max CAP buffering; repeats REPS times
14   */
15   public class SubmissionPublisherLoops4 {
16      static final int  ITEMS      = 1 << 20;
# Line 30 | Line 30 | public class SubmissionPublisherLoops4 {
30          if (args.length > 0)
31              reps = Integer.parseInt(args[0]);
32  
33 <        System.out.println("ITEMS: " + ITEMS +
34 <                           " PRODUCERS: " + PRODUCERS +
35 <                           " PROCESSORS: " + PROCESSORS +
33 >        System.out.println("ITEMS: " + ITEMS +
34 >                           " PRODUCERS: " + PRODUCERS +
35 >                           " PROCESSORS: " + PROCESSORS +
36                             " CONSUMERS: " + CONSUMERS +
37                             " CAP: " + CAP);
38          for (int rep = 0; rep < reps; ++rep) {
39 <            long startTime = System.nanoTime();
40 <            for (int i = 0; i < PRODUCERS; ++i)
41 <                new Pub().fork();
42 <            phaser.arriveAndAwaitAdvance();
43 <            long elapsed = System.nanoTime() - startTime;
44 <            double secs = ((double)elapsed) / (1000L * 1000 * 1000);
45 <            double ips = NEXTS / secs;
46 <            System.out.printf("Time: %7.2f", secs);
47 <            System.out.printf(" items per sec: %14.2f\n", ips);
48 <            System.out.println(ForkJoinPool.commonPool());
39 >            oneRun();
40 >            Thread.sleep(1000);
41          }
42      }
43  
44 +    static void oneRun() throws Exception {
45 +        long startTime = System.nanoTime();
46 +        for (int i = 0; i < PRODUCERS; ++i)
47 +            new Pub().fork();
48 +        phaser.arriveAndAwaitAdvance();
49 +        long elapsed = System.nanoTime() - startTime;
50 +        double secs = ((double)elapsed) / (1000L * 1000 * 1000);
51 +        double ips = NEXTS / secs;
52 +        System.out.printf("Time: %7.2f", secs);
53 +        System.out.printf(" items per sec: %14.2f\n", ips);
54 +        System.out.println(ForkJoinPool.commonPool());
55 +    }
56 +
57      static final class Sub implements Flow.Subscriber<Boolean> {
58          int count;
59          Flow.Subscription subscription;
60 <        public void onSubscribe(Flow.Subscription s) {
61 <            (subscription = s).request(CAP);
60 >        public void onSubscribe(Flow.Subscription s) {
61 >            (subscription = s).request(CAP);
62          }
63 <        public void onNext(Boolean b) {
63 >        public void onNext(Boolean b) {
64              if (b && (++count & ((CAP >>> 1) - 1)) == 0)
65                  subscription.request(CAP >>> 1);
66          }
67 <        public void onComplete() {
67 >        public void onComplete() {
68              if (count != ITEMS)
69                  System.out.println("Error: remaining " + (ITEMS - count));
70 <            phaser.arrive();
70 >            phaser.arrive();
71          }
72          public void onError(Throwable t) { t.printStackTrace(); }
73      }
# Line 87 | Line 92 | public class SubmissionPublisherLoops4 {
92      }
93  
94      static final class Pub extends RecursiveAction {
95 <        final SubmissionPublisher<Boolean> pub =
95 >        final SubmissionPublisher<Boolean> pub =
96              new SubmissionPublisher<Boolean>(ForkJoinPool.commonPool(), CAP);
97          public void compute() {
98              SubmissionPublisher<Boolean> p = pub;
# Line 97 | Line 102 | public class SubmissionPublisherLoops4 {
102                      t.subscribe(new Sub());
103                  p.subscribe(t);
104              }
105 <            for (int i = 0; i < ITEMS; ++i)
105 >            for (int i = 0; i < ITEMS; ++i)
106                  p.submit(Boolean.TRUE);
107              p.close();
108          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines