/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

import java.util.*;
import java.util.concurrent.*;

/**
 * Throughput benchmark for {@link SubmissionPublisher}.
 *
 * <p>Each repetition creates a publisher with buffer capacity {@link #CAP},
 * attaches {@link #CONSUMERS} subscribers, submits {@link #ITEMS} items,
 * closes the publisher, and then waits on {@link #phaser} until every
 * subscriber has terminated. The elapsed wall-clock time of each repetition
 * is printed.
 */
public class SPL6 {
    /** Number of items submitted per repetition. */
    static final int ITEMS = 1 << 20;
    /** Number of subscribers attached to the publisher per repetition. */
    static final int CONSUMERS  = 100;
    /** Publisher buffer capacity; also the request batch size used by {@link XSub}. */
    static final int CAP        = Flow.defaultBufferSize();

    /**
     * Barrier shared by all repetitions: one party per subscriber plus one
     * for the main thread. Each subscriber arrives exactly once when it
     * terminates (normally or with an error).
     */
    static final Phaser phaser = new Phaser(CONSUMERS + 1);

    /**
     * Subscriber that requests one item at a time and counts what it receives.
     */
    static class Sub implements Flow.Subscriber<Boolean> {
        Flow.Subscription sn;
        int count;

        /** Stores the subscription and requests the first item. */
        @Override
        public void onSubscribe(Flow.Subscription s) {
            (sn = s).request(1L);
        }

        /**
         * Requests the next item and counts the one just received.
         * No per-item work is done here; see {@link XSub} for a variant
         * that runs a short busywork loop for each item.
         */
        @Override
        public void onNext(Boolean t) {
            sn.request(1L);
            ++count;
        }

        /**
         * Reports the failure and arrives at the phaser. No
         * {@code onComplete} follows an {@code onError}, so without this
         * arrival the main thread would wait on the phaser forever.
         */
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            phaser.arrive();
        }

        /** Checks that every item was received, then arrives at the phaser. */
        @Override
        public void onComplete() {
            if (count != ITEMS) {
                System.out.println("Error: remaining " + (ITEMS - count));
            }
            phaser.arrive();
        }
    }

    /**
     * Subscriber that requests items in batches of {@link #CAP} and runs a
     * short busywork loop for each item. Not used by {@code main} as
     * written; substitute it in the {@code subscribe} call to measure this
     * variant.
     */
    static class XSub implements Flow.Subscriber<Boolean> {
        Flow.Subscription sn;
        int count;

        /** Stores the subscription and requests the first batch. */
        @Override
        public void onSubscribe(Flow.Subscription s) {
            (sn = s).request(CAP);
        }

        /**
         * Counts the item, requests another batch whenever the low bits of
         * the count reach half of {@code CAP}, then runs the busywork loop.
         */
        @Override
        public void onNext(Boolean t) {
            // The mask acts as "count mod CAP" only if CAP is a power of two
            // -- NOTE(review): confirm for Flow.defaultBufferSize().
            if ((++count & (CAP - 1)) == (CAP >>> 1)) {
                sn.request(CAP);
            }
            // Shift/xor busywork on an odd seed. The r == 0 test feeds the
            // result back into count, presumably so the loop is not
            // optimized away.
            int r = (ThreadLocalRandom.current().nextInt() | 1);
            for (int i = 1 << 8; i >= 0; --i) {
                r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                if (r == 0) {
                    ++count;
                }
            }
        }

        /**
         * Reports the failure and arrives at the phaser. No
         * {@code onComplete} follows an {@code onError}, so without this
         * arrival the main thread would wait on the phaser forever.
         */
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            phaser.arrive();
        }

        /** Checks that every item was counted, then arrives at the phaser. */
        @Override
        public void onComplete() {
            if (count != ITEMS) {
                System.out.println("Error: remaining " + (ITEMS - count));
            }
            phaser.arrive();
        }
    }

    /** Nanoseconds per second. */
    static final long NPS = (1000L * 1000 * 1000);

    /**
     * Runs nine timed repetitions of the benchmark on the common pool,
     * printing the elapsed seconds and the pool state after each one.
     *
     * @param args ignored
     * @throws Exception if the inter-repetition sleep is interrupted or the
     *         publisher rejects a submission
     */
    public static void main(String[] args) throws Exception {
        System.out.println("ITEMS " + ITEMS);
        // Alternative executors to benchmark against:
        //        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1);
        //        ExecutorService exec = new ForkJoinPool();
        ExecutorService exec = ForkJoinPool.commonPool();
        try {
            for (int reps = 0; reps < 9; ++reps) {
                long startTime = System.nanoTime();
                // try-with-resources closes the publisher (which triggers
                // onComplete for the subscribers) even if submit throws.
                try (SubmissionPublisher<Boolean> pub =
                         new SubmissionPublisher<>(exec, CAP)) {
                    for (int i = 0; i < CONSUMERS; ++i) {
                        pub.subscribe(new Sub());
                    }
                    for (int i = 0; i < ITEMS; ++i) {
                        pub.submit(Boolean.TRUE);
                    }
                }
                // Wait until every subscriber has terminated.
                phaser.arriveAndAwaitAdvance();
                long elapsed = System.nanoTime() - startTime;
                double secs = ((double)elapsed) / NPS;
                System.out.printf("\tTime: %7.3f\n", secs);
                System.out.println(ForkJoinPool.commonPool());
                Thread.sleep(1000);
            }
        } finally {
            // Only shut down an executor this program created.
            if (exec != ForkJoinPool.commonPool()) {
                exec.shutdown();
            }
        }
    }
}
