--- jsr166/src/test/loops/SubmissionPublisherLoops1.java 2015/07/23 11:13:47 1.1 +++ jsr166/src/test/loops/SubmissionPublisherLoops1.java 2016/04/18 11:04:14 1.5 @@ -18,21 +18,21 @@ public class SubmissionPublisherLoops1 { static final Phaser phaser = new Phaser(CONSUMERS + 1); - static class Sub implements Flow.Subscriber { + static final class Sub implements Flow.Subscriber { Flow.Subscription sn; int count; - public void onSubscribe(Flow.Subscription s) { - (sn = s).request(CAP); + public void onSubscribe(Flow.Subscription s) { + (sn = s).request(CAP); } - public void onNext(Boolean t) { + public void onNext(Boolean t) { if ((++count & (CAP - 1)) == (CAP >>> 1)) sn.request(CAP); } public void onError(Throwable t) { t.printStackTrace(); } - public void onComplete() { + public void onComplete() { if (count != ITEMS) System.out.println("Error: remaining " + (ITEMS - count)); - phaser.arrive(); + phaser.arrive(); } } @@ -42,29 +42,33 @@ public class SubmissionPublisherLoops1 { int reps = REPS; if (args.length > 0) reps = Integer.parseInt(args[0]); - - System.out.println("ITEMS: " + ITEMS + + + System.out.println("ITEMS: " + ITEMS + " CONSUMERS: " + CONSUMERS + " CAP: " + CAP); ExecutorService exec = ForkJoinPool.commonPool(); - for (int rep = 0; rep < reps; ++reps) { - long startTime = System.nanoTime(); - final SubmissionPublisher pub = - new SubmissionPublisher(exec, CAP); - for (int i = 0; i < CONSUMERS; ++i) - pub.subscribe(new Sub()); - for (int i = 0; i < ITEMS; ++i) { - pub.submit(Boolean.TRUE); - } - pub.close(); - phaser.arriveAndAwaitAdvance(); - long elapsed = System.nanoTime() - startTime; - double secs = ((double)elapsed) / NPS; - System.out.printf("\tTime: %7.3f\n", secs); - // System.out.println(exec); + for (int rep = 0; rep < reps; ++rep) { + oneRun(exec); + System.out.println(exec); Thread.sleep(1000); } if (exec != ForkJoinPool.commonPool()) exec.shutdown(); } + + static void oneRun(ExecutorService exec) throws Exception { + long startTime = System.nanoTime(); + final SubmissionPublisher pub = + new SubmissionPublisher(exec, CAP); + for (int i = 0; i < CONSUMERS; ++i) + pub.subscribe(new Sub()); + for (int i = 0; i < ITEMS; ++i) { + pub.submit(Boolean.TRUE); + } + pub.close(); + phaser.arriveAndAwaitAdvance(); + long elapsed = System.nanoTime() - startTime; + double secs = ((double)elapsed) / NPS; + System.out.printf("\tTime: %7.3f\n", secs); + } }