--- jsr166/src/test/loops/TSPExchangerTest.java 2005/12/12 20:06:20 1.3 +++ jsr166/src/test/loops/TSPExchangerTest.java 2006/02/13 12:39:23 1.4 @@ -11,29 +11,42 @@ import java.util.concurrent.locks.*; /** * A parallel Traveling Salesperson Problem (TSP) program based on a - * genetic algorithm using an Exchanger. A population of chromosomes - * is distributed among "pools". The chromosomes represent tours, and - * their fitness is the total tour length. A Task is associated with - * each pool. Each task repeatedly does, for a fixed number of - * iterations (generations): + * genetic algorithm using an Exchanger. A population of chromosomes is + * distributed among "subpops". Each chromosomes represents a tour, + * and its fitness is the total tour length. + * + * A set of worker threads perform updates on subpops. The basic + * update step is: *
    - *
  1. Select a breeder b from the pool + *
  2. Select a breeder b from the subpop *
  3. Create a strand of its tour with a random starting point and length *
  4. Offer the strand to the exchanger, receiving a strand from - * another pool + * another subpop *
  5. Combine b and the received strand using crossing function to * create new chromosome c. - *
  6. Replace a chromosome in the pool with c. + *
  7. Replace a chromosome in the subpop with c. *
* + * This continues for a given number of generations per subpop. + * Because there are normally more subpops than threads, each worker + * thread performs small (randomly sized) run of updates for one + * subpop and then selects another. A run continues until there is at + * most one remaining thread performing updates. + * * See below for more details. - *

- * */ public class TSPExchangerTest { static final int NCPUS = Runtime.getRuntime().availableProcessors(); - static final int DEFAULT_MAX_THREADS = NCPUS + 6; + /** Runs start with two threads, increasing by two through max */ + static final int DEFAULT_MAX_THREADS = Math.max(4, NCPUS + NCPUS/2); + + /** The number of replication runs per thread value */ + static final int DEFAULT_REPLICATIONS = 3; + + /** If true, print statistics in SNAPSHOT_RATE intervals */ + static boolean verbose = true; + static final long SNAPSHOT_RATE = 10000; // in milliseconds /** * The problem size. Each city is a random point. The goal is to @@ -44,27 +57,27 @@ public class TSPExchangerTest { // Tuning parameters. /** - * The number of chromosomes per pool. Must be a power of two. + * The number of chromosomes per subpop. Must be a power of two. * * Smaller values lead to faster iterations but poorer quality * results */ - static final int DEFAULT_POOL_SIZE = 32; + static final int DEFAULT_SUBPOP_SIZE = 32; /** - * The number of iterations per task. Convergence appears + * The number of iterations per subpop. Convergence appears * to be roughly proportional to #cities-squared */ static final int DEFAULT_GENERATIONS = DEFAULT_CITIES * DEFAULT_CITIES; /** - * The number of pools. The total population is #pools * poolSize, + * The number of subpops. The total population is #subpops * subpopSize, * which should be roughly on the order of #cities-squared * * Smaller values lead to faster total runs but poorer quality * results */ - static final int DEFAULT_NPOOLS = DEFAULT_GENERATIONS / DEFAULT_POOL_SIZE; + static final int DEFAULT_NSUBPOPS = DEFAULT_GENERATIONS / DEFAULT_SUBPOP_SIZE; /** * The minimum length for a random chromosome strand. @@ -84,7 +97,7 @@ public class TSPExchangerTest { * Probablility control for selecting breeders. * Breeders are selected starting at the best-fitness chromosome, * with exponentially decaying probablility - * 1 / (poolSize >>> BREEDER_DECAY). + * 1 / (subpopSize >>> BREEDER_DECAY). * * Larger values usually cause faster convergence but poorer * quality results @@ -95,16 +108,13 @@ public class TSPExchangerTest { * Probablility control for selecting dyers. * Dyers are selected starting at the worst-fitness chromosome, * with exponentially decaying probablility - * 1 / (poolSize >>> DYER_DECAY) + * 1 / (subpopSize >>> DYER_DECAY) * * Larger values usually cause faster convergence but poorer * quality results */ static final int DYER_DECAY = 1; - static final boolean verbose = false; - static final long SNAPSHOT_RATE = 10000; // in milliseconds - /** * The set of cities. Created once per program run, to * make it easier to compare solutions across different runs. @@ -114,9 +124,10 @@ public class TSPExchangerTest { public static void main(String[] args) throws Exception { int maxThreads = DEFAULT_MAX_THREADS; int nCities = DEFAULT_CITIES; - int poolSize = DEFAULT_POOL_SIZE; + int subpopSize = DEFAULT_SUBPOP_SIZE; int nGen = nCities * nCities; - int nPools = nCities * nCities / poolSize; + int nSubpops = nCities * nCities / subpopSize; + int nReps = DEFAULT_REPLICATIONS; try { int argc = 0; @@ -125,14 +136,20 @@ public class TSPExchangerTest { if (option.equals("-c")) { nCities = Integer.parseInt(args[argc]); nGen = nCities * nCities; - nPools = nCities * nCities / poolSize; + nSubpops = nCities * nCities / subpopSize; } else if (option.equals("-p")) - poolSize = Integer.parseInt(args[argc]); + subpopSize = Integer.parseInt(args[argc]); else if (option.equals("-g")) nGen = Integer.parseInt(args[argc]); else if (option.equals("-n")) - nPools = Integer.parseInt(args[argc]); + nSubpops = Integer.parseInt(args[argc]); + else if (option.equals("-q")) { + verbose = false; + argc--; + } + else if (option.equals("-r")) + nReps = Integer.parseInt(args[argc]); else maxThreads = Integer.parseInt(option); argc++; @@ -145,129 +162,149 @@ public class TSPExchangerTest { System.out.print("TSPExchangerTest"); System.out.print(" -c " + nCities); System.out.print(" -g " + nGen); - System.out.print(" -p " + poolSize); - System.out.print(" -n " + nPools); + System.out.print(" -p " + subpopSize); + System.out.print(" -n " + nSubpops); + System.out.print(" -r " + nReps); System.out.print(" max threads " + maxThreads); System.out.println(); cities = new CitySet(nCities); - for (int i = 2; i <= maxThreads; i += 2) - oneRun(i, nPools, poolSize, nGen); + if (false && NCPUS > 4) { + int h = NCPUS/2; + System.out.printf("Threads: %4d Warmup\n", h); + oneRun(h, nSubpops, subpopSize, nGen); + Thread.sleep(500); + } + + int maxt = (maxThreads < nSubpops) ? maxThreads : nSubpops; + for (int j = 0; j < nReps; ++j) { + for (int i = 2; i <= maxt; i += 2) { + System.out.printf("Threads: %4d Replication: %2d\n", i, j); + oneRun(i, nSubpops, subpopSize, nGen); + Thread.sleep(500); + } + } } static void reportUsageErrorAndDie() { System.out.print("usage: TSPExchangerTest"); System.out.print(" [-c #cities]"); - System.out.print(" [-p #poolSize]"); + System.out.print(" [-p #subpopSize]"); System.out.print(" [-g #generations]"); - System.out.print(" [-n #pools]"); + System.out.print(" [-n #subpops]"); + System.out.print(" [-r #replications]"); + System.out.print(" [-q ]"); System.out.print(" #threads]"); System.out.println(); System.exit(0); } /** - * Perform one run with the given parameters. Each run completes - * when there are fewer than nThreads-2 tasks remaining. This - * avoids measuring termination effects, as well as cases where - * the one last remaining task has no one left to exchange with, - * so the pool is abruptly terminated. + * Perform one run with the given parameters. Each run complete + * when there are fewer than 2 active threads. When there is + * only one remaining thread, it will have no one to exchange + * with, so it is terminated (via interrupt). */ - static void oneRun(int nThreads, int nPools, int poolSize, int nGen) + static void oneRun(int nThreads, int nSubpops, int subpopSize, int nGen) throws InterruptedException { - Population p = new Population(nThreads, nPools, poolSize, nGen); + Population p = new Population(nThreads, nSubpops, subpopSize, nGen); ProgressMonitor mon = null; if (verbose) { + p.printSnapshot(0); mon = new ProgressMonitor(p); mon.start(); } - p.printSnapshot(0); long startTime = System.nanoTime(); p.start(); - p.awaitTasks(); + p.awaitDone(); long stopTime = System.nanoTime(); if (mon != null) mon.interrupt(); p.shutdown(); - Thread.sleep(100); + // Thread.sleep(100); long elapsed = stopTime - startTime; - long rate = elapsed / (nPools * nGen); double secs = (double)elapsed / 1000000000.0; p.printSnapshot(secs); - System.out.printf("%10d ns per transfer\n", rate); } /** - * A Population creates the pools, tasks, and threads for a run + * A Population creates the subpops, subpops, and threads for a run * and has control methods to start, stop, and report progress. */ static final class Population { - final Task[] tasks; + final Worker[] threads; + final Subpop[] subpops; final Exchanger exchanger; - final ThreadPoolExecutor exec; final CountDownLatch done; final int nGen; - final int poolSize; + final int subpopSize; final int nThreads; - Population(int nThreads, int nPools, int poolSize, int nGen) { + Population(int nThreads, int nSubpops, int subpopSize, int nGen) { this.nThreads = nThreads; this.nGen = nGen; - this.poolSize = poolSize; + this.subpopSize = subpopSize; this.exchanger = new Exchanger(); - this.done = new CountDownLatch(Math.max(1, nPools - nThreads - 2)); - this.tasks = new Task[nPools]; - for (int i = 0; i < nPools; i++) - tasks[i] = new Task(this); - BlockingQueue tq = - new LinkedBlockingQueue(); - this.exec = new ThreadPoolExecutor(nThreads, nThreads, - 0L, TimeUnit.MILLISECONDS, - tq); - exec.prestartAllCoreThreads(); + this.done = new CountDownLatch(nThreads - 1); + + this.subpops = new Subpop[nSubpops]; + for (int i = 0; i < nSubpops; i++) + subpops[i] = new Subpop(this); + + this.threads = new Worker[nThreads]; + int maxExchanges = nGen * nSubpops / nThreads; + for (int i = 0; i < nThreads; ++i) { + threads[i] = new Worker(this, maxExchanges); + } + } - /** Start the tasks */ void start() { - for (int i = 0; i < tasks.length; i++) - exec.execute(tasks[i]); + for (int i = 0; i < nThreads; ++i) { + threads[i].start(); + } } /** Stop the tasks */ void shutdown() { - exec.shutdownNow(); + for (int i = 0; i < threads.length; ++ i) + threads[i].interrupt(); } - /** Called by task upon terminations */ - void taskDone() { + void threadDone() { done.countDown(); } - /** Wait for (all but one) task to complete */ - void awaitTasks() throws InterruptedException { + /** Wait for tasks to complete */ + void awaitDone() throws InterruptedException { done.await(); } - /** - * Called by a task to resubmit itself after completing - * fewer than nGen iterations. - */ - void resubmit(Task task) { - try { - exec.execute(task); - } catch(RejectedExecutionException ignore) {} + int totalExchanges() { + int xs = 0; + for (int i = 0; i < threads.length; ++i) + xs += threads[i].exchanges; + return xs; } + /** + * Prints statistics, including best and worst tour lengths + * for points scaled in [0,1), scaled by the square root of + * number of points. This simplifies checking results. The + * expected optimal TSP for random points is believed to be + * around 0.76 * sqrt(N). For papers discussing this, see + * http://www.densis.fee.unicamp.br/~moscato/TSPBIB_home.html + */ void printSnapshot(double secs) { - int gens = 0; - Chromosome bestc = tasks[0].chromosomes[0]; + int xs = totalExchanges(); + long rate = (xs == 0)? 0L : (long)((secs * 1000000000.0) / xs); + Chromosome bestc = subpops[0].chromosomes[0]; Chromosome worstc = bestc; - for (int k = 0; k < tasks.length; ++k) { - gens += tasks[k].gen; - Chromosome[] cs = tasks[k].chromosomes; + for (int k = 0; k < subpops.length; ++k) { + Chromosome[] cs = subpops[k].chromosomes; if (cs[0].fitness < bestc.fitness) bestc = cs[0]; int w = cs[cs.length-1].fitness; @@ -277,75 +314,109 @@ public class TSPExchangerTest { double sqrtn = Math.sqrt(cities.length); double best = bestc.unitTourLength() / sqrtn; double worst = worstc.unitTourLength() / sqrtn; - int avegen = (done.getCount() == 0)? nGen : gens / tasks.length; - System.out.printf("Time:%9.3f Best:%7.3f Worst:%7.3f Gen:%6d Threads:%4d\n", - secs, best, worst, avegen, nThreads); + System.out.printf("N:%4d T:%8.3f B:%6.3f W:%6.3f X:%9d R:%7d\n", + nThreads, secs, best, worst, xs, rate); + // exchanger.printStats(); + // System.out.print(" s: " + exchanger.aveSpins()); + // System.out.print(" p: " + exchanger.aveParks()); } - } /** - * A Task updates its pool of chromosomes.. + * Worker threads perform updates on subpops. */ - static final class Task implements Runnable { - /** The pool of chromosomes, kept in sorted order */ + static final class Worker extends Thread { + final Population pop; + final int maxExchanges; + int exchanges; + final RNG rng = new RNG(); + + Worker(Population pop, int maxExchanges) { + this.pop = pop; + this.maxExchanges = maxExchanges; + } + + /** + * Repeatedly, find a subpop that is not being updated by + * another thread, and run a random number of updates on it. + */ + public void run() { + try { + int len = pop.subpops.length; + int pos = (rng.next() & 0x7FFFFFFF) % len; + while (exchanges < maxExchanges) { + Subpop s = pop.subpops[pos]; + AtomicBoolean busy = s.busy; + if (!busy.get() && busy.compareAndSet(false, true)) { + exchanges += s.runUpdates(); + busy.set(false); + pos = (rng.next() & 0x7FFFFFFF) % len; + } + else if (++pos >= len) + pos = 0; + } + pop.threadDone(); + } catch (InterruptedException fallthrough) { + } + } + } + + /** + * A Subpop maintains a set of chromosomes.. + */ + static final class Subpop { + /** The chromosomes, kept in sorted order */ final Chromosome[] chromosomes; + /** The parent population */ final Population pop; - /** The common exchanger, same for all tasks */ + /** Reservation bit for worker threads */ + final AtomicBoolean busy; + /** The common exchanger, same for all subpops */ final Exchanger exchanger; /** The current strand being exchanged */ Strand strand; /** Bitset used in cross */ final int[] inTour; final RNG rng; - final int poolSize; - final int nGen; - final int genPerRun; - int gen; + final int subpopSize; - Task(Population pop) { + Subpop(Population pop) { this.pop = pop; - this.nGen = pop.nGen; - this.gen = 0; - this.poolSize = pop.poolSize; - this.genPerRun = 4 * poolSize * Math.min(NCPUS, pop.nThreads); + this.subpopSize = pop.subpopSize; this.exchanger = pop.exchanger; + this.busy = new AtomicBoolean(false); this.rng = new RNG(); int length = cities.length; this.strand = new Strand(length); this.inTour = new int[(length >>> 5) + 1]; - this.chromosomes = new Chromosome[poolSize]; - for (int j = 0; j < poolSize; ++j) + this.chromosomes = new Chromosome[subpopSize]; + for (int j = 0; j < subpopSize; ++j) chromosomes[j] = new Chromosome(length, rng); Arrays.sort(chromosomes); } /** - * Run one or more update cycles. An average of genPerRun - * iterations are performed per run, and then the task is - * resubmitted. The rate is proportional to both pool size and - * number of threads. This keeps average rate of breeding - * across pools approximately constant across different test - * runs. + * Run a random number of updates. The number of updates is + * at least 1 and no more than subpopSize. This + * controls the granularity of multiplexing subpop updates on + * to threads. It is small enough to balance out updates + * across tasks, but large enough to avoid having runs + * dominated by subpop selection. It is randomized to avoid + * long runs where pairs of subpops exchange only with each + * other. It is hardwired because small variations of it + * don't matter much. + * + * @param g the first generation to run. */ - public void run() { - try { - int maxGen = gen + 1 + rng.next() % genPerRun; - if (maxGen > nGen) - maxGen = nGen; - while (gen++ < maxGen) - update(); - if (maxGen < nGen) - pop.resubmit(this); - else - pop.taskDone(); - } catch (InterruptedException ie) { - pop.taskDone(); - } + int runUpdates() throws InterruptedException { + int n = 1 + (rng.next() & ((subpopSize << 1) - 1)); + for (int i = 0; i < n; ++i) + update(); + return n; } /** - * Choose a breeder, exchange strand with another pool, and + * Choose a breeder, exchange strand with another subpop, and * cross them to create new chromosome to replace a chosen * dyer. */ @@ -366,10 +437,10 @@ public class TSPExchangerTest { * @return index of selected breeder */ int chooseBreeder() { - int mask = (poolSize >>> BREEDER_DECAY) - 1; + int mask = (subpopSize >>> BREEDER_DECAY) - 1; int b = 0; while ((rng.next() & mask) != mask) { - if (++b >= poolSize) + if (++b >= subpopSize) b = 0; } return b; @@ -383,11 +454,11 @@ public class TSPExchangerTest { * @return index of selected dyer */ int chooseDyer(int exclude) { - int mask = (poolSize >>> DYER_DECAY) - 1; - int d = poolSize - 1; + int mask = (subpopSize >>> DYER_DECAY) - 1; + int d = subpopSize - 1; while (d == exclude || (rng.next() & mask) != mask) { if (--d < 0) - d = poolSize - 1; + d = subpopSize - 1; } return d; } @@ -524,6 +595,9 @@ public class TSPExchangerTest { fitness = (int)(f / len); } + /** + * Return tour length for points scaled in [0, 1). + */ double unitTourLength() { int[] a = alleles; int len = a.length; @@ -537,7 +611,10 @@ public class TSPExchangerTest { return f; } - void validate() { // Ensure that this is a valid tour. + /** + * Check that this tour visits each city + */ + void validate() { int len = alleles.length; boolean[] used = new boolean[len]; for (int i = 0; i < len; ++i) @@ -550,7 +627,7 @@ public class TSPExchangerTest { } /** - * A Strand is a random sub-sequence of a Chromosome. Each task + * A Strand is a random sub-sequence of a Chromosome. Each subpop * creates only one strand, and then trades it with others, * refilling it on each iteration. */