ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/TSPExchangerTest.java
(Generate patch)

Comparing jsr166/src/test/loops/TSPExchangerTest.java (file contents):
Revision 1.3 by dl, Mon Dec 12 20:06:20 2005 UTC vs.
Revision 1.4 by dl, Mon Feb 13 12:39:23 2006 UTC

# Line 11 | Line 11 | import java.util.concurrent.locks.*;
11  
12   /**
13   * A parallel Traveling Salesperson Problem (TSP) program based on a
14 < * genetic algorithm using an Exchanger.  A population of chromosomes
15 < * is distributed among "pools".  The chromosomes represent tours, and
16 < * their fitness is the total tour length. A Task is associated with
17 < * each pool.  Each task repeatedly does, for a fixed number of
18 < * iterations (generations):
14 > * genetic algorithm using an Exchanger.  A population of chromosomes is
15 > * distributed among "subpops".  Each chromosomes represents a tour,
16 > * and its fitness is the total tour length.
17 > *
18 > * A set of worker threads perform updates on subpops. The basic
19 > * update step is:
20   * <ol>
21 < *   <li> Select a breeder b from the pool
21 > *   <li> Select a breeder b from the subpop
22   *   <li> Create a strand of its tour with a random starting point and length
23   *   <li> Offer the strand to the exchanger, receiving a strand from
24 < *        another pool
24 > *        another subpop
25   *   <li> Combine b and the received strand using crossing function to
26   *        create new chromosome c.
27 < *   <li> Replace a chromosome in the pool with c.
27 > *   <li> Replace a chromosome in the subpop with c.
28   * </ol>
29   *
30 + * This continues for a given number of generations per subpop.
31 + * Because there are normally more subpops than threads, each worker
32 + * thread performs small (randomly sized) run of updates for one
33 + * subpop and then selects another. A run continues until there is at
34 + * most one remaining thread performing updates.
35 + *
36   * See below for more details.
30 * <p>
31 *
37   */
38   public class TSPExchangerTest {
39      static final int NCPUS = Runtime.getRuntime().availableProcessors();
40  
41 <    static final int DEFAULT_MAX_THREADS  = NCPUS + 6;
41 >    /** Runs start with two threads, increasing by two through max */
42 >    static final int DEFAULT_MAX_THREADS  = Math.max(4, NCPUS + NCPUS/2);
43 >
44 >    /** The number of replication runs per thread value */
45 >    static final int DEFAULT_REPLICATIONS = 3;
46 >
47 >    /** If true, print statistics in SNAPSHOT_RATE intervals */
48 >    static boolean verbose = true;
49 >    static final long SNAPSHOT_RATE = 10000; // in milliseconds
50  
51      /**
52       * The problem size. Each city is a random point. The goal is to
# Line 44 | Line 57 | public class TSPExchangerTest {
57      // Tuning parameters.
58  
59      /**
60 <     * The number of chromosomes per pool. Must be a power of two.
60 >     * The number of chromosomes per subpop. Must be a power of two.
61       *
62       * Smaller values lead to faster iterations but poorer quality
63       * results
64       */
65 <    static final int DEFAULT_POOL_SIZE = 32;
65 >    static final int DEFAULT_SUBPOP_SIZE = 32;
66  
67      /**
68 <     * The number of iterations per task. Convergence appears
68 >     * The number of iterations per subpop. Convergence appears
69       * to be roughly proportional to #cities-squared
70       */
71      static final int DEFAULT_GENERATIONS = DEFAULT_CITIES * DEFAULT_CITIES;
72  
73      /**
74 <     * The number of pools. The total population is #pools * poolSize,
74 >     * The number of subpops. The total population is #subpops * subpopSize,
75       * which should be roughly on the order of #cities-squared
76       *
77       * Smaller values lead to faster total runs but poorer quality
78       * results
79       */
80 <    static final int DEFAULT_NPOOLS = DEFAULT_GENERATIONS / DEFAULT_POOL_SIZE;
80 >    static final int DEFAULT_NSUBPOPS = DEFAULT_GENERATIONS / DEFAULT_SUBPOP_SIZE;
81  
82      /**
83       * The minimum length for a random chromosome strand.
# Line 84 | Line 97 | public class TSPExchangerTest {
97       * Probablility control for selecting breeders.
98       * Breeders are selected starting at the best-fitness chromosome,
99       * with exponentially decaying probablility
100 <     * 1 / (poolSize >>> BREEDER_DECAY).
100 >     * 1 / (subpopSize >>> BREEDER_DECAY).
101       *
102       * Larger values usually cause faster convergence but poorer
103       * quality results
# Line 95 | Line 108 | public class TSPExchangerTest {
108       * Probablility control for selecting dyers.
109       * Dyers are selected starting at the worst-fitness chromosome,
110       * with exponentially decaying probablility
111 <     * 1 / (poolSize >>> DYER_DECAY)
111 >     * 1 / (subpopSize >>> DYER_DECAY)
112       *
113       * Larger values usually cause faster convergence but poorer
114       * quality results
115       */
116      static final int DYER_DECAY = 1;
117  
105    static final boolean verbose = false;
106    static final long SNAPSHOT_RATE = 10000; // in milliseconds
107
118      /**
119       * The set of cities. Created once per program run, to
120       * make it easier to compare solutions across different runs.
# Line 114 | Line 124 | public class TSPExchangerTest {
124      public static void main(String[] args) throws Exception {
125          int maxThreads = DEFAULT_MAX_THREADS;
126          int nCities = DEFAULT_CITIES;
127 <        int poolSize = DEFAULT_POOL_SIZE;
127 >        int subpopSize = DEFAULT_SUBPOP_SIZE;
128          int nGen = nCities * nCities;
129 <        int nPools = nCities * nCities / poolSize;
129 >        int nSubpops = nCities * nCities / subpopSize;
130 >        int nReps = DEFAULT_REPLICATIONS;
131  
132          try {
133              int argc = 0;
# Line 125 | Line 136 | public class TSPExchangerTest {
136                  if (option.equals("-c")) {
137                      nCities = Integer.parseInt(args[argc]);
138                      nGen = nCities * nCities;
139 <                    nPools = nCities * nCities / poolSize;
139 >                    nSubpops = nCities * nCities / subpopSize;
140                  }
141                  else if (option.equals("-p"))
142 <                    poolSize = Integer.parseInt(args[argc]);
142 >                    subpopSize = Integer.parseInt(args[argc]);
143                  else if (option.equals("-g"))
144                      nGen = Integer.parseInt(args[argc]);
145                  else if (option.equals("-n"))
146 <                    nPools = Integer.parseInt(args[argc]);
146 >                    nSubpops = Integer.parseInt(args[argc]);
147 >                else if (option.equals("-q")) {
148 >                    verbose = false;
149 >                    argc--;
150 >                }
151 >                else if (option.equals("-r"))
152 >                    nReps = Integer.parseInt(args[argc]);
153                  else
154                      maxThreads = Integer.parseInt(option);
155                  argc++;
# Line 145 | Line 162 | public class TSPExchangerTest {
162          System.out.print("TSPExchangerTest");
163          System.out.print(" -c " + nCities);
164          System.out.print(" -g " + nGen);
165 <        System.out.print(" -p " + poolSize);
166 <        System.out.print(" -n " + nPools);
165 >        System.out.print(" -p " + subpopSize);
166 >        System.out.print(" -n " + nSubpops);
167 >        System.out.print(" -r " + nReps);
168          System.out.print(" max threads " + maxThreads);
169          System.out.println();
170  
171          cities = new CitySet(nCities);
172  
173 <        for (int i = 2; i <= maxThreads; i += 2)
174 <            oneRun(i, nPools, poolSize, nGen);
173 >        if (false && NCPUS > 4) {
174 >            int h = NCPUS/2;
175 >            System.out.printf("Threads: %4d Warmup\n", h);
176 >            oneRun(h, nSubpops, subpopSize, nGen);
177 >            Thread.sleep(500);
178 >        }
179 >
180 >        int maxt = (maxThreads < nSubpops) ? maxThreads : nSubpops;
181 >        for (int j = 0; j < nReps; ++j) {
182 >            for (int i = 2; i <= maxt; i += 2) {
183 >                System.out.printf("Threads: %4d Replication: %2d\n", i, j);
184 >                oneRun(i, nSubpops, subpopSize, nGen);
185 >                Thread.sleep(500);
186 >            }
187 >        }
188      }
189  
190      static void reportUsageErrorAndDie() {
191          System.out.print("usage: TSPExchangerTest");
192          System.out.print(" [-c #cities]");
193 <        System.out.print(" [-p #poolSize]");
193 >        System.out.print(" [-p #subpopSize]");
194          System.out.print(" [-g #generations]");
195 <        System.out.print(" [-n #pools]");
195 >        System.out.print(" [-n #subpops]");
196 >        System.out.print(" [-r #replications]");
197 >        System.out.print(" [-q <quiet>]");
198          System.out.print(" #threads]");
199          System.out.println();
200          System.exit(0);
201      }
202  
203      /**
204 <     * Perform one run with the given parameters.  Each run completes
205 <     * when there are fewer than nThreads-2 tasks remaining.  This
206 <     * avoids measuring termination effects, as well as cases where
207 <     * the one last remaining task has no one left to exchange with,
175 <     * so the pool is abruptly terminated.
204 >     * Perform one run with the given parameters.  Each run complete
205 >     * when there are fewer than 2 active threads.  When there is
206 >     * only one remaining thread, it will have no one to exchange
207 >     * with, so it is terminated (via interrupt).
208       */
209 <    static void oneRun(int nThreads, int nPools, int poolSize, int nGen)
209 >    static void oneRun(int nThreads, int nSubpops, int subpopSize, int nGen)
210          throws InterruptedException {
211 <        Population p = new Population(nThreads, nPools, poolSize, nGen);
211 >        Population p = new Population(nThreads, nSubpops, subpopSize, nGen);
212          ProgressMonitor mon = null;
213          if (verbose) {
214 +            p.printSnapshot(0);
215              mon = new ProgressMonitor(p);
216              mon.start();
217          }
185        p.printSnapshot(0);
218          long startTime = System.nanoTime();
219          p.start();
220 <        p.awaitTasks();
220 >        p.awaitDone();
221          long stopTime = System.nanoTime();
222          if (mon != null)
223              mon.interrupt();
224          p.shutdown();
225 <        Thread.sleep(100);
225 >        //        Thread.sleep(100);
226  
227          long elapsed = stopTime - startTime;
196        long rate = elapsed / (nPools * nGen);
228          double secs = (double)elapsed / 1000000000.0;
229          p.printSnapshot(secs);
199        System.out.printf("%10d ns per transfer\n", rate);
230      }
231  
232  
233      /**
234 <     * A Population creates the pools, tasks, and threads for a run
234 >     * A Population creates the subpops, subpops, and threads for a run
235       * and has control methods to start, stop, and report progress.
236       */
237      static final class Population {
238 <        final Task[] tasks;
238 >        final Worker[] threads;
239 >        final Subpop[] subpops;
240          final Exchanger<Strand> exchanger;
210        final ThreadPoolExecutor exec;
241          final CountDownLatch done;
242          final int nGen;
243 <        final int poolSize;
243 >        final int subpopSize;
244          final int nThreads;
245  
246 <        Population(int nThreads, int nPools, int poolSize, int nGen) {
246 >        Population(int nThreads, int nSubpops, int subpopSize, int nGen) {
247              this.nThreads = nThreads;
248              this.nGen = nGen;
249 <            this.poolSize = poolSize;
249 >            this.subpopSize = subpopSize;
250              this.exchanger = new Exchanger<Strand>();
251 <            this.done = new CountDownLatch(Math.max(1, nPools - nThreads - 2));
252 <            this.tasks = new Task[nPools];
253 <            for (int i = 0; i < nPools; i++)
254 <                tasks[i] = new Task(this);
255 <            BlockingQueue<Runnable> tq =
256 <                new LinkedBlockingQueue<Runnable>();
257 <            this.exec = new ThreadPoolExecutor(nThreads, nThreads,
258 <                                               0L, TimeUnit.MILLISECONDS,
259 <                                               tq);
260 <            exec.prestartAllCoreThreads();
251 >            this.done = new CountDownLatch(nThreads - 1);
252 >
253 >            this.subpops = new Subpop[nSubpops];
254 >            for (int i = 0; i < nSubpops; i++)
255 >                subpops[i] = new Subpop(this);
256 >
257 >            this.threads = new Worker[nThreads];
258 >            int maxExchanges = nGen * nSubpops / nThreads;
259 >            for (int i = 0; i < nThreads; ++i) {
260 >                threads[i] = new Worker(this, maxExchanges);
261 >            }
262 >
263          }
264  
233        /** Start the tasks */
265          void start() {
266 <            for (int i = 0; i < tasks.length; i++)
267 <                exec.execute(tasks[i]);
266 >            for (int i = 0; i < nThreads; ++i) {
267 >                threads[i].start();                
268 >            }
269          }
270  
271          /** Stop the tasks */
272          void shutdown() {
273 <            exec.shutdownNow();
273 >            for (int i = 0; i < threads.length; ++ i)
274 >                threads[i].interrupt();
275          }
276  
277 <        /** Called by task upon terminations */
245 <        void taskDone() {
277 >        void threadDone() {
278              done.countDown();
279          }
280  
281 <        /** Wait for (all but one) task to complete */
282 <        void awaitTasks() throws InterruptedException {
281 >        /** Wait for tasks to complete */
282 >        void awaitDone() throws InterruptedException {
283              done.await();
284          }
285  
286 <        /**
287 <         * Called by a task to resubmit itself after completing
288 <         * fewer than nGen iterations.
289 <         */
290 <        void resubmit(Task task) {
259 <            try {
260 <                exec.execute(task);
261 <            } catch(RejectedExecutionException ignore) {}
286 >        int totalExchanges() {
287 >            int xs = 0;
288 >            for (int i = 0; i < threads.length; ++i)
289 >                xs += threads[i].exchanges;
290 >            return xs;
291          }
292  
293 +        /**
294 +         * Prints statistics, including best and worst tour lengths
295 +         * for points scaled in [0,1), scaled by the square root of
296 +         * number of points. This simplifies checking results.  The
297 +         * expected optimal TSP for random points is believed to be
298 +         * around 0.76 * sqrt(N). For papers discussing this, see
299 +         * http://www.densis.fee.unicamp.br/~moscato/TSPBIB_home.html
300 +         */
301          void printSnapshot(double secs) {
302 <            int gens = 0;
303 <            Chromosome bestc = tasks[0].chromosomes[0];
302 >            int xs = totalExchanges();
303 >            long rate = (xs == 0)? 0L : (long)((secs * 1000000000.0) / xs);
304 >            Chromosome bestc = subpops[0].chromosomes[0];
305              Chromosome worstc = bestc;
306 <            for (int k = 0; k < tasks.length; ++k) {
307 <                gens += tasks[k].gen;
270 <                Chromosome[] cs = tasks[k].chromosomes;
306 >            for (int k = 0; k < subpops.length; ++k) {
307 >                Chromosome[] cs = subpops[k].chromosomes;
308                  if (cs[0].fitness < bestc.fitness)
309                      bestc = cs[0];
310                  int w = cs[cs.length-1].fitness;
# Line 277 | Line 314 | public class TSPExchangerTest {
314              double sqrtn = Math.sqrt(cities.length);
315              double best = bestc.unitTourLength() / sqrtn;
316              double worst = worstc.unitTourLength() / sqrtn;
317 <            int avegen = (done.getCount() == 0)? nGen : gens / tasks.length;
318 <            System.out.printf("Time:%9.3f Best:%7.3f Worst:%7.3f Gen:%6d Threads:%4d\n",
319 <                              secs, best, worst, avegen, nThreads);
317 >            System.out.printf("N:%4d T:%8.3f B:%6.3f W:%6.3f X:%9d R:%7d\n",
318 >                              nThreads, secs, best, worst, xs, rate);
319 >            //            exchanger.printStats();
320 >            //            System.out.print(" s: " + exchanger.aveSpins());
321 >            //            System.out.print(" p: " + exchanger.aveParks());
322          }
284        
323      }
324  
325      /**
326 <     * A Task updates its pool of chromosomes..
326 >     * Worker threads perform updates on subpops.
327       */
328 <    static final class Task implements Runnable {
329 <        /** The pool of chromosomes, kept in sorted order */
328 >    static final class Worker extends Thread {
329 >        final Population pop;
330 >        final int maxExchanges;
331 >        int exchanges;
332 >        final RNG rng = new RNG();
333 >
334 >        Worker(Population pop, int maxExchanges) {
335 >            this.pop = pop;
336 >            this.maxExchanges = maxExchanges;
337 >        }
338 >
339 >        /**
340 >         * Repeatedly, find a subpop that is not being updated by
341 >         * another thread, and run a random number of updates on it.
342 >         */
343 >        public void run() {
344 >            try {
345 >                int len = pop.subpops.length;
346 >                int pos = (rng.next() & 0x7FFFFFFF) % len;
347 >                while (exchanges < maxExchanges) {
348 >                    Subpop s = pop.subpops[pos];
349 >                    AtomicBoolean busy = s.busy;
350 >                    if (!busy.get() && busy.compareAndSet(false, true)) {
351 >                        exchanges += s.runUpdates();
352 >                        busy.set(false);
353 >                        pos = (rng.next() & 0x7FFFFFFF) % len;
354 >                    }
355 >                    else if (++pos >= len)
356 >                        pos = 0;
357 >                }
358 >                pop.threadDone();
359 >            } catch (InterruptedException fallthrough) {
360 >            }
361 >        }
362 >    }
363 >
364 >    /**
365 >     * A Subpop maintains a set of chromosomes..
366 >     */
367 >    static final class Subpop {
368 >        /** The chromosomes, kept in sorted order */
369          final Chromosome[] chromosomes;
370 +        /** The parent population */
371          final Population pop;
372 <        /** The common exchanger, same for all tasks */
372 >        /** Reservation bit for worker threads */
373 >        final AtomicBoolean busy;
374 >        /** The common exchanger, same for all subpops */
375          final Exchanger<Strand> exchanger;
376          /** The current strand being exchanged */
377          Strand strand;
378          /** Bitset used in cross */
379          final int[] inTour;
380          final RNG rng;
381 <        final int poolSize;
302 <        final int nGen;
303 <        final int genPerRun;
304 <        int gen;
381 >        final int subpopSize;
382  
383 <        Task(Population pop) {
383 >        Subpop(Population pop) {
384              this.pop = pop;
385 <            this.nGen = pop.nGen;
309 <            this.gen = 0;
310 <            this.poolSize = pop.poolSize;
311 <            this.genPerRun = 4 * poolSize * Math.min(NCPUS, pop.nThreads);
385 >            this.subpopSize = pop.subpopSize;
386              this.exchanger = pop.exchanger;
387 +            this.busy = new AtomicBoolean(false);
388              this.rng = new RNG();
389              int length = cities.length;
390              this.strand = new Strand(length);
391              this.inTour = new int[(length >>> 5) + 1];
392 <            this.chromosomes = new Chromosome[poolSize];
393 <            for (int j = 0; j < poolSize; ++j)
392 >            this.chromosomes = new Chromosome[subpopSize];
393 >            for (int j = 0; j < subpopSize; ++j)
394                  chromosomes[j] = new Chromosome(length, rng);
395              Arrays.sort(chromosomes);
396          }
397  
398          /**
399 <         * Run one or more update cycles.  An average of genPerRun
400 <         * iterations are performed per run, and then the task is
401 <         * resubmitted. The rate is proportional to both pool size and
402 <         * number of threads.  This keeps average rate of breeding
403 <         * across pools approximately constant across different test
404 <         * runs.
399 >         * Run a random number of updates.  The number of updates is
400 >         * at least 1 and no more than subpopSize.  This
401 >         * controls the granularity of multiplexing subpop updates on
402 >         * to threads. It is small enough to balance out updates
403 >         * across tasks, but large enough to avoid having runs
404 >         * dominated by subpop selection. It is randomized to avoid
405 >         * long runs where pairs of subpops exchange only with each
406 >         * other.  It is hardwired because small variations of it
407 >         * don't matter much.
408 >         *
409 >         * @param g the first generation to run.
410           */
411 <        public void run() {
412 <            try {
413 <                int maxGen = gen + 1 + rng.next() % genPerRun;
414 <                if (maxGen > nGen)
415 <                    maxGen = nGen;
336 <                while (gen++ < maxGen)
337 <                    update();
338 <                if (maxGen < nGen)
339 <                    pop.resubmit(this);
340 <                else
341 <                    pop.taskDone();
342 <            } catch (InterruptedException ie) {
343 <                pop.taskDone();
344 <            }
411 >        int runUpdates() throws InterruptedException {
412 >            int n = 1 + (rng.next() & ((subpopSize << 1) - 1));
413 >            for (int i = 0; i < n; ++i)
414 >                update();
415 >            return n;
416          }
417  
418          /**
419 <         * Choose a breeder, exchange strand with another pool, and
419 >         * Choose a breeder, exchange strand with another subpop, and
420           * cross them to create new chromosome to replace a chosen
421           * dyer.
422           */
# Line 366 | Line 437 | public class TSPExchangerTest {
437           * @return index of selected breeder
438           */
439          int chooseBreeder() {
440 <            int mask = (poolSize >>> BREEDER_DECAY) - 1;
440 >            int mask = (subpopSize >>> BREEDER_DECAY) - 1;
441              int b = 0;
442              while ((rng.next() & mask) != mask) {
443 <                if (++b >= poolSize)
443 >                if (++b >= subpopSize)
444                      b = 0;
445              }
446              return b;
# Line 383 | Line 454 | public class TSPExchangerTest {
454           * @return index of selected dyer
455           */
456          int chooseDyer(int exclude) {
457 <            int mask = (poolSize >>> DYER_DECAY)  - 1;
458 <            int d = poolSize - 1;
457 >            int mask = (subpopSize >>> DYER_DECAY)  - 1;
458 >            int d = subpopSize - 1;
459              while (d == exclude || (rng.next() & mask) != mask) {
460                  if (--d < 0)
461 <                    d = poolSize - 1;
461 >                    d = subpopSize - 1;
462              }
463              return d;
464          }
# Line 524 | Line 595 | public class TSPExchangerTest {
595              fitness = (int)(f / len);
596          }
597  
598 +        /**
599 +         * Return tour length for points scaled in [0, 1).
600 +         */
601          double unitTourLength() {
602              int[] a = alleles;
603              int len = a.length;
# Line 537 | Line 611 | public class TSPExchangerTest {
611              return f;
612          }
613  
614 <        void validate() { // Ensure that this is a valid tour.
614 >        /**
615 >         * Check that this tour visits each city
616 >         */
617 >        void validate() {
618              int len = alleles.length;
619              boolean[] used = new boolean[len];
620              for (int i = 0; i < len; ++i)
# Line 550 | Line 627 | public class TSPExchangerTest {
627      }
628  
629      /**
630 <     * A Strand is a random sub-sequence of a Chromosome.  Each task
630 >     * A Strand is a random sub-sequence of a Chromosome.  Each subpop
631       * creates only one strand, and then trades it with others,
632       * refilling it on each iteration.
633       */

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines