--- jsr166/src/test/loops/SubmissionPublisherLoops4.java 2015/07/23 11:13:47 1.1 +++ jsr166/src/test/loops/SubmissionPublisherLoops4.java 2015/10/11 01:02:20 1.4 @@ -8,9 +8,9 @@ import java.util.*; import java.util.concurrent.*; /** - * Create PRODUCERS publishers each with PROCESSORS processors + * Creates PRODUCERS publishers each with PROCESSORS processors * each with CONSUMERS subscribers, each sent ITEMS items, with - * max CAP buffering; repeat REPS times + * max CAP buffering; repeats REPS times */ public class SubmissionPublisherLoops4 { static final int ITEMS = 1 << 20; @@ -30,39 +30,44 @@ public class SubmissionPublisherLoops4 { if (args.length > 0) reps = Integer.parseInt(args[0]); - System.out.println("ITEMS: " + ITEMS + - " PRODUCERS: " + PRODUCERS + - " PROCESSORS: " + PROCESSORS + + System.out.println("ITEMS: " + ITEMS + + " PRODUCERS: " + PRODUCERS + + " PROCESSORS: " + PROCESSORS + " CONSUMERS: " + CONSUMERS + " CAP: " + CAP); for (int rep = 0; rep < reps; ++rep) { - long startTime = System.nanoTime(); - for (int i = 0; i < PRODUCERS; ++i) - new Pub().fork(); - phaser.arriveAndAwaitAdvance(); - long elapsed = System.nanoTime() - startTime; - double secs = ((double)elapsed) / (1000L * 1000 * 1000); - double ips = NEXTS / secs; - System.out.printf("Time: %7.2f", secs); - System.out.printf(" items per sec: %14.2f\n", ips); - System.out.println(ForkJoinPool.commonPool()); + oneRun(); + Thread.sleep(1000); } } + static void oneRun() throws Exception { + long startTime = System.nanoTime(); + for (int i = 0; i < PRODUCERS; ++i) + new Pub().fork(); + phaser.arriveAndAwaitAdvance(); + long elapsed = System.nanoTime() - startTime; + double secs = ((double)elapsed) / (1000L * 1000 * 1000); + double ips = NEXTS / secs; + System.out.printf("Time: %7.2f", secs); + System.out.printf(" items per sec: %14.2f\n", ips); + System.out.println(ForkJoinPool.commonPool()); + } + static final class Sub implements Flow.Subscriber { int count; Flow.Subscription subscription; - public void onSubscribe(Flow.Subscription s) { - (subscription = s).request(CAP); + public void onSubscribe(Flow.Subscription s) { + (subscription = s).request(CAP); } - public void onNext(Boolean b) { + public void onNext(Boolean b) { if (b && (++count & ((CAP >>> 1) - 1)) == 0) subscription.request(CAP >>> 1); } - public void onComplete() { + public void onComplete() { if (count != ITEMS) System.out.println("Error: remaining " + (ITEMS - count)); - phaser.arrive(); + phaser.arrive(); } public void onError(Throwable t) { t.printStackTrace(); } } @@ -87,7 +92,7 @@ public class SubmissionPublisherLoops4 { } static final class Pub extends RecursiveAction { - final SubmissionPublisher pub = + final SubmissionPublisher pub = new SubmissionPublisher(ForkJoinPool.commonPool(), CAP); public void compute() { SubmissionPublisher p = pub; @@ -97,7 +102,7 @@ public class SubmissionPublisherLoops4 { t.subscribe(new Sub()); p.subscribe(t); } - for (int i = 0; i < ITEMS; ++i) + for (int i = 0; i < ITEMS; ++i) p.submit(Boolean.TRUE); p.close(); }