ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SubmissionPublisherLoops3.java
(Generate patch)

Comparing jsr166/src/test/loops/SubmissionPublisherLoops3.java (file contents):
Revision 1.1 by dl, Thu Jul 23 11:13:47 2015 UTC vs.
Revision 1.4 by jsr166, Sun Oct 11 01:02:20 2015 UTC

# Line 8 | Line 8 | import java.util.*;
8   import java.util.concurrent.*;
9  
10   /**
11 < * Create PRODUCERS publishers each with CONSUMERS subscribers,
12 < * each sent ITEMS items, with CAP buffering; repeat REPS times
11 > * Creates PRODUCERS publishers each with CONSUMERS subscribers,
12 > * each sent ITEMS items, with CAP buffering; repeats REPS times
13   */
14   public class SubmissionPublisherLoops3 {
15      static final int ITEMS      = 1 << 20;
# Line 24 | Line 24 | public class SubmissionPublisherLoops3 {
24          int reps = REPS;
25          if (args.length > 0)
26              reps = Integer.parseInt(args[0]);
27 <        
28 <        System.out.println("ITEMS: " + ITEMS +
29 <                           " PRODUCERS: " + PRODUCERS +
27 >
28 >        System.out.println("ITEMS: " + ITEMS +
29 >                           " PRODUCERS: " + PRODUCERS +
30                             " CONSUMERS: " + CONSUMERS +
31                             " CAP: " + CAP);
32        long nitems = (long)ITEMS * PRODUCERS * CONSUMERS;
32          for (int rep = 0; rep < reps; ++rep) {
33 <            long startTime = System.nanoTime();
34 <            for (int i = 0; i < PRODUCERS; ++i)
36 <                new Pub().fork();
37 <            phaser.arriveAndAwaitAdvance();
38 <            long elapsed = System.nanoTime() - startTime;
39 <            double secs = ((double)elapsed) / (1000L * 1000 * 1000);
40 <            double ips = nitems / secs;
41 <            System.out.printf("Time: %7.2f", secs);
42 <            System.out.printf(" items per sec: %14.2f\n", ips);
43 <            System.out.println(ForkJoinPool.commonPool());
33 >            oneRun();
34 >            Thread.sleep(1000);
35          }
36      }
37  
38 +    static void oneRun() throws Exception {
39 +        long nitems = (long)ITEMS * PRODUCERS * CONSUMERS;
40 +        long startTime = System.nanoTime();
41 +        for (int i = 0; i < PRODUCERS; ++i)
42 +            new Pub().fork();
43 +        phaser.arriveAndAwaitAdvance();
44 +        long elapsed = System.nanoTime() - startTime;
45 +        double secs = ((double)elapsed) / (1000L * 1000 * 1000);
46 +        double ips = nitems / secs;
47 +        System.out.printf("Time: %7.2f", secs);
48 +        System.out.printf(" items per sec: %14.2f\n", ips);
49 +        System.out.println(ForkJoinPool.commonPool());
50 +    }
51 +
52      static final class Sub implements Flow.Subscriber<Boolean> {
53          int count;
54          Flow.Subscription subscription;
55 <        public void onSubscribe(Flow.Subscription s) {
56 <            (subscription = s).request(CAP);
55 >        public void onSubscribe(Flow.Subscription s) {
56 >            (subscription = s).request(CAP);
57          }
58 <        public void onNext(Boolean b) {
58 >        public void onNext(Boolean b) {
59              if (b && (++count & ((CAP >>> 1) - 1)) == 0)
60                  subscription.request(CAP >>> 1);
61          }
62 <        public void onComplete() {
62 >        public void onComplete() {
63              if (count != ITEMS)
64                  System.out.println("Error: remaining " + (ITEMS - count));
65 <            phaser.arrive();
65 >            phaser.arrive();
66          }
67          public void onError(Throwable t) { t.printStackTrace(); }
68      }
69  
70      static final class Pub extends RecursiveAction {
71 <        final SubmissionPublisher<Boolean> pub =
71 >        final SubmissionPublisher<Boolean> pub =
72              new SubmissionPublisher<Boolean>(ForkJoinPool.commonPool(), CAP);
73          public void compute() {
74              SubmissionPublisher<Boolean> p = pub;
75              for (int i = 0; i < CONSUMERS; ++i)
76                  p.subscribe(new Sub());
77 <            for (int i = 0; i < ITEMS; ++i)
77 >            for (int i = 0; i < ITEMS; ++i)
78                  p.submit(Boolean.TRUE);
79              p.close();
80          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines