

import java.util.*;
import java.util.concurrent.*;

public class SPL2 {
    static final int N = 1 << 20;
    static final int P = 16;
    static final int C = 16;
    static final int REPS = 100;
    static final int BATCH = 1 << 8;

    static class BatchedSub implements Flow.Subscriber<Boolean> {
        final CountDownLatch done;
        BatchedSub(CountDownLatch latch) { this.done = latch; }
        Flow.Subscription sn;
        int count;
        public void onSubscribe(Flow.Subscription s) { 
            (sn = s).request(BATCH); 
        }
        public void onNext(Boolean t) { 
            if ((++count & (BATCH - 1)) == (BATCH >>> 1))
                sn.request(BATCH);
        }
        public void onError(Throwable t) { t.printStackTrace(); }
        public void onComplete() { done.countDown(); }
    }

    static class UnbatchedSub implements Flow.Subscriber<Boolean> {
        final CountDownLatch done;
        UnbatchedSub(CountDownLatch latch) { this.done = latch; }
        int count;
        public void onSubscribe(Flow.Subscription s) { 
            s.request(Long.MAX_VALUE); 
        }
        public void onNext(Boolean t) { ++count; }
        public void onError(Throwable t) { t.printStackTrace(); }
        public void onComplete() { done.countDown(); }
    }

    static final long NPS = (1000L * 1000 * 1000);

    static final class Pub extends RecursiveAction {
        public void compute() {
            final SubmissionPublisher<Boolean> pub = 
                new SubmissionPublisher<Boolean>(
                    ForkJoinPool.commonPool(), 1<<10);
            CountDownLatch done = new CountDownLatch(C);
            for (int i = 0; i < C; ++i)
                pub.subscribe(new BatchedSub(done));
            List<Flow.Subscriber<? super Boolean>> subs = pub.getSubscribers();
            for (int i = 0; i < N; ++i) 
                pub.submit(Boolean.TRUE);
            pub.close();
            try { done.await(); } catch(Exception ignore) {}
            int ns = 0;
            for (Flow.Subscriber<? super Boolean> sub : subs) {
                BatchedSub b = (BatchedSub)sub;
                if (b.count != N) throw new Error();
                ++ns;
            }
            if (ns != C) throw new Error();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("N: " + N + " P: " + P + " C: " + C);
        for (int reps = 0; reps < REPS; ++reps) {
            long startTime = System.nanoTime();
            Pub[] ps = new Pub[P];
            for (int i = 0; i < P; ++i) {
                ps[i] = new Pub();
            }
            ForkJoinTask.invokeAll(ps);
            long elapsed = System.nanoTime() - startTime;
            double secs = ((double)elapsed) / NPS;
            System.out.printf("Time: %7.3f\n", secs);
            System.out.println(ForkJoinPool.commonPool());
            Thread.sleep(1000);
        }
    }
}
