ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/extra166y/MapReduceDemo.java
Revision: 1.6
Committed: Fri Sep 27 17:07:44 2013 UTC (10 years, 7 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +4 -4 lines
Log Message:
No spaces separating one-character type parameters

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.4 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     import jsr166y.*;
8     import extra166y.*;
9     import java.util.*;
10     import java.util.concurrent.*;
11    
12     public class MapReduceDemo {
/** Processor count reported by the runtime; used as the bound of the pool-size loops in main. */
13     static final int NCPU = Runtime.getRuntime().availableProcessors();
/** Generator from which main draws the base seed for each measurement round. */
14     static final Random rng = new Random();
15    
16     /**
17     * Sequential version, for performance comparison
18     */
19 jsr166 1.6 static <T,U> U seqMapReduce(T[] array,
20     Ops.Op<T,U> mapper,
21     Ops.Reducer<U> reducer,
22     U base) {
23 dl 1.1 int n = array.length;
24     U x = base;
25 jsr166 1.2 for (int i = 0; i < n; ++i)
26 dl 1.1 x = reducer.op(x, mapper.op(array[i]));
27     return x;
28     }
29    
30 jsr166 1.2
31 dl 1.1
32     // sample functions
33     static final class GetNext implements Ops.Op<Rand, Long> {
34 jsr166 1.2 public Long op(Rand x) {
35 dl 1.1 return x.next();
36     }
37     }
38    
39     static final class Accum implements Ops.Reducer<Long> {
40     public Long op(Long a, Long b) {
41     long x = a;
42     long y = b;
43     return x + y;
44     }
45     }
46    
47     /** Nanoseconds per second; divides System.nanoTime() differences to yield seconds. */
48     static final long NPS = (1000L * 1000 * 1000);
49    
50     public static void main(String[] args) throws Exception {
51     int n = 1 << 18;
52     int reps = 1 << 8;
53     Rand[] array = new Rand[n];
54 jsr166 1.2 for (int i = 0; i < n; ++i)
55 dl 1.1 array[i] = new Rand(i+1);
56 dl 1.3 ForkJoinPool fjp = new ForkJoinPool();
57 dl 1.1 ParallelArray<Rand> pa = ParallelArray.createUsingHandoff(array, fjp);
58     final GetNext getNext = new GetNext();
59     final Accum accum = new Accum();
60     final Long zero = Long.valueOf(0);
61     long last, now;
62     double elapsed;
63     for (int j = 0; j < 2; ++j) {
64     long rseed = rng.nextLong();
65     resetSeeds(array, rseed);
66     long seqsum = 0;
67     last = System.nanoTime();
68     for (int k = 0; k < reps; ++k) {
69     seqsum += seqMapReduce(array, getNext, accum, zero);
70     Rand tmp = array[k];
71     array[k] = array[n - k - 1];
72     array[n - k - 1] = tmp;
73     }
74     now = System.nanoTime();
75     elapsed = (double)(now - last) / NPS;
76     last = now;
77     System.out.printf("sequential: %7.3f\n", elapsed);
78     for (int i = 2; i <= NCPU; i <<= 1) {
79     resetSeeds(array, rseed);
80     long sum = 0;
81 dl 1.3 // fjp.setParallelism(i);
82 dl 1.1 last = System.nanoTime();
83     for (int k = 0; k < reps; ++k) {
84     sum += pa.withMapping(getNext).reduce(accum, zero);
85     Rand tmp = array[k];
86     array[k] = array[n - k - 1];
87     array[n - k - 1] = tmp;
88     }
89     now = System.nanoTime();
90     elapsed = (double)(now - last) / NPS;
91     last = now;
92 dl 1.3 System.out.printf("poolSize %3d: %7.3f\n", fjp.getParallelism(), elapsed);
93 dl 1.1 if (sum != seqsum) throw new Error("checksum");
94     }
95     for (int i = NCPU; i >= 1; i >>>= 1) {
96     resetSeeds(array, rseed);
97     long sum = 0;
98 dl 1.3 // fjp.setParallelism(i);
99 dl 1.1 last = System.nanoTime();
100     for (int k = 0; k < reps; ++k) {
101     sum += pa.withMapping(getNext).reduce(accum, zero);
102     Rand tmp = array[k];
103     array[k] = array[n - k - 1];
104     array[n - k - 1] = tmp;
105     }
106     now = System.nanoTime();
107     elapsed = (double)(now - last) / NPS;
108     last = now;
109 dl 1.3 System.out.printf("poolSize %3d: %7.3f\n", fjp.getParallelism(), elapsed);
110 dl 1.1 if (sum != seqsum) throw new Error("checksum");
111     }
112     }
113     fjp.shutdownNow();
114     fjp.awaitTermination(1, TimeUnit.SECONDS);
115     Thread.sleep(100);
116     }
117    
118     static void resetSeeds(Rand[] array, long s) {
119     for (int i = 0; i < array.length; ++i)
120     array[i].seed = s++;
121     }
122    
123     /**
124     * Xorshift Random algorithm.
125     */
126     public static final class Rand {
127     private long seed;
128     Rand(long s) {
129     seed = s;
130     }
131     public long next() {
132     long x = seed;
133 jsr166 1.2 x ^= x << 13;
134     x ^= x >>> 7;
135 dl 1.1 x ^= (x << 17);
136     seed = x;
137     return x;
138     }
139     }
140    
141     }