--- jsr166/src/test/loops/ExchangeLoops.java 2005/05/02 19:19:38 1.1 +++ jsr166/src/test/loops/ExchangeLoops.java 2006/02/13 12:39:23 1.2 @@ -1,105 +1,145 @@ /* - * @test - * @summary checks to make sure a pipeline of exchangers passes data. - */ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain. Use, modify, and - * redistribute this code in any way without acknowledgement. + * Written by Bill Scherer and Doug Lea with assistance from members + * of JCP JSR-166 Expert Group and released to the public domain. Use, + * modify, and redistribute this code in any way without + * acknowledgement. */ import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; public class ExchangeLoops { - static final ExecutorService pool = Executors.newCachedThreadPool(); - static boolean print = false; + static final int NCPUS = Runtime.getRuntime().availableProcessors(); - static class Int { - public int value; - Int(int i) { value = i; } - } - + static final int DEFAULT_THREADS = NCPUS + 2; + static final long DEFAULT_TRIAL_MILLIS = 10000; public static void main(String[] args) throws Exception { - int maxStages = 100; - int iters = 100000; + int maxThreads = DEFAULT_THREADS; + long trialMillis = DEFAULT_TRIAL_MILLIS; + int nReps = 3; + + // Parse and check args + int argc = 0; + while (argc < args.length) { + String option = args[argc++]; + if (option.equals("-t")) + trialMillis = Integer.parseInt(args[argc]); + else if (option.equals("-r")) + nReps = Integer.parseInt(args[argc]); + else + maxThreads = Integer.parseInt(option); + argc++; + } - if (args.length > 0) - maxStages = Integer.parseInt(args[0]); + // Display runtime parameters + System.out.print("ExchangeTest"); + System.out.print(" -t " + trialMillis); + System.out.print(" -r " + nReps); + System.out.print(" max threads " + maxThreads); + System.out.println(); + long warmupTime = 2000; + long sleepTime = 100; + int nw = maxThreads >= 3? 3 : 2; + + System.out.println("Warmups.."); + oneRun(3, warmupTime); + Thread.sleep(sleepTime); + + for (int i = maxThreads; i >= 2; i -= 1) { + oneRun(i, warmupTime++); + // System.gc(); + Thread.sleep(sleepTime); + } - print = false; - System.out.println("Warmup..."); - oneRun(2, 100000); - print = true; - - for (int i = 2; i <= maxStages; i += (i+1) >>> 1) { - System.out.print("Threads: " + i + "\t: "); - oneRun(i, iters); - } - pool.shutdown(); - } - - static class Stage implements Runnable { - final int iters; - final Exchanger left; - final Exchanger right; - final CyclicBarrier barrier; - volatile int result; - Stage (Exchanger left, - Exchanger right, - CyclicBarrier b, int iters) { - this.left = left; - this.right = right; - barrier = b; - this.iters = iters; + /* + for (int i = maxThreads; i >= 2; i -= 1) { + oneRun(i, warmupTime++); } + */ - public void run() { - try { - barrier.await(); - Int item = new Int(hashCode()); - for (int i = 0; i < iters; ++i) { - if (left != null) { - item.value = LoopHelpers.compute1(item.value); - Int other = left.exchange(item); - if (other == item || other == null) - throw new Error("Failed Exchange"); - item = other; - - } - if (right != null) { - item.value = LoopHelpers.compute2(item.value); - Int other = right.exchange(item); - if (other == item || other == null) - throw new Error("Failed Exchange"); - item = other; - } - } - barrier.await(); - + for (int j = 0; j < nReps; ++j) { + System.out.println("Trial: " + j); + for (int i = 2; i <= maxThreads; i += 2) { + oneRun(i, trialMillis); + // System.gc(); + Thread.sleep(sleepTime); } - catch (Exception ie) { - ie.printStackTrace(); - return; + for (int i = maxThreads; i >= 2; i -= 2) { + oneRun(i, trialMillis); + // System.gc(); + Thread.sleep(sleepTime); } + Thread.sleep(sleepTime); } + + } - static void oneRun(int nthreads, int iters) throws Exception { - LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); - CyclicBarrier barrier = new CyclicBarrier(nthreads + 1, timer); - Exchanger l = null; - Exchanger r = new Exchanger(); - for (int i = 0; i < nthreads; ++i) { - pool.execute(new Stage(l, r, barrier, iters)); - l = r; - r = (i+2 < nthreads) ? new Exchanger() : null; - } - barrier.await(); - barrier.await(); - long time = timer.getTime(); - if (print) - System.out.println(LoopHelpers.rightJustify(time / (iters * nthreads + iters * (nthreads-2))) + " ns per transfer"); + static void oneRun(int nThreads, long trialMillis) throws Exception { + System.out.printf("%4d threads", nThreads); + System.out.printf("%9dms", trialMillis); + Exchanger x = new Exchanger(); + Runner[] runners = new Runner[nThreads]; + Thread[] threads = new Thread[nThreads]; + for (int i = 0; i < nThreads; ++i) { + runners[i] = new Runner(x); + threads[i] = new Thread(runners[i]); + // int h = System.identityHashCode(threads[i]); + // h ^= h << 1; + // h ^= h >>> 3; + // h ^= h << 10; + // System.out.printf("%10x\n", h); + } + + long startTime = System.nanoTime(); + for (int i = 0; i < nThreads; ++i) { + threads[i].start(); + } + Thread.sleep(trialMillis); + for (int i = 0; i < nThreads; ++i) + threads[i].interrupt(); + long elapsed = System.nanoTime() - startTime; + for (int i = 0; i < nThreads; ++i) + threads[i].join(); + int iters = 1; + // System.out.println(); + for (int i = 0; i < nThreads; ++i) { + int ipr = runners[i].iters; + // System.out.println(ipr); + iters += ipr; + } + long rate = iters * 1000L * 1000L * 1000L / elapsed; + long npt = elapsed / iters; + System.out.printf("%9d it/s ", rate); + System.out.printf("%9d ns/it", npt); + System.out.println(); + // x.printStats(); } + + static final class Runner implements Runnable { + final Exchanger exchanger; + final Object mine = new Integer(2688); + volatile int iters; + Runner(Exchanger x) { this.exchanger = x; } + public void run() { + Exchanger x = exchanger; + Object m = mine; + int i = 0; + try { + for (;;) { + Object e = x.exchange(m); + if (e == null || e == m) + throw new Error(); + m = e; + ++i; + } + } catch (InterruptedException ie) { + iters = i; + } + } + } } +