1 |
/* |
2 |
* Written by Doug Lea with assistance from members of JCP JSR-166 |
3 |
* Expert Group and released to the public domain, as explained at |
4 |
* http://creativecommons.org/licenses/publicdomain |
5 |
*/ |
6 |
|
7 |
|
8 |
// MapReduce, specialized for ints |
9 |
|
10 |
import jsr166y.forkjoin.*; |
11 |
import static jsr166y.forkjoin.Ops.*; |
12 |
import java.util.*; |
13 |
import java.util.concurrent.*; |
14 |
|
15 |
public class IntMapReduceDemo { |
16 |
static final int NCPU = Runtime.getRuntime().availableProcessors(); |
17 |
/** |
18 |
* Sequential version, for performance comparison |
19 |
*/ |
20 |
static int seqMapReduce(int[] array, |
21 |
MapperFromIntToInt mapper, |
22 |
IntReducer reducer, |
23 |
int base) { |
24 |
int n = array.length; |
25 |
int x = base; |
26 |
for (int i = 0; i < n; ++i) |
27 |
x = reducer.combine(x, mapper.map(array[i])); |
28 |
return x; |
29 |
} |
30 |
|
31 |
// sample functions |
32 |
static final class NextRand implements MapperFromIntToInt { |
33 |
public int map(int seed) { |
34 |
int x = seed; |
35 |
x ^= x << 13; |
36 |
x ^= x >>> 7; |
37 |
x ^= (x << 17); |
38 |
return x; |
39 |
} |
40 |
} |
41 |
|
42 |
static final NextRand nextRand = new NextRand(); |
43 |
|
44 |
static final class Accum implements IntReducer { |
45 |
public int combine(int x, int y) { |
46 |
y ^= y << 6; |
47 |
y ^= y >>> 21; |
48 |
y ^= (y << 7); |
49 |
x ^= x << 6; |
50 |
x ^= x >>> 21; |
51 |
x ^= (x << 7); |
52 |
return x + y; |
53 |
} |
54 |
} |
55 |
|
56 |
static final Accum accum = new Accum(); |
57 |
|
58 |
/** for time conversion */ |
59 |
static final long NPS = (1000L * 1000 * 1000); |
60 |
|
61 |
public static void main(String[] args) throws Exception { |
62 |
int n = 1 << 20; |
63 |
int reps = 1 << 8; |
64 |
int[] array = new int[n]; |
65 |
ForkJoinPool fjp = new ForkJoinPool(1); |
66 |
ParallelIntArray pa = new ParallelIntArray(fjp, array); |
67 |
|
68 |
final int zero = 0; |
69 |
long last, now; |
70 |
double elapsed; |
71 |
last = System.nanoTime(); |
72 |
int sum = 0; |
73 |
for (int j = 0; j < 2; ++j) { |
74 |
pa.randomFill(); |
75 |
for (int k = 0; k < reps; ++k) { |
76 |
sum += seqMapReduce(array, nextRand, accum, zero); |
77 |
array[k] = sum; |
78 |
} |
79 |
now = System.nanoTime(); |
80 |
elapsed = (double)(now - last) / NPS; |
81 |
last = now; |
82 |
System.out.printf("sequential: %7.3f\n", elapsed); |
83 |
|
84 |
for (int i = 2; i <= NCPU; i <<= 1) { |
85 |
fjp.setPoolSize(i); |
86 |
last = System.nanoTime(); |
87 |
for (int k = 0; k < reps; ++k) { |
88 |
sum += pa.withMapping(nextRand).reduce(accum, zero); |
89 |
array[k] = sum; |
90 |
} |
91 |
now = System.nanoTime(); |
92 |
elapsed = (double)(now - last) / NPS; |
93 |
last = now; |
94 |
System.out.printf("poolSize %3d: %7.3f\n", i, elapsed); |
95 |
} |
96 |
for (int i = NCPU; i >= 1; i >>>= 1) { |
97 |
fjp.setPoolSize(i); |
98 |
last = System.nanoTime(); |
99 |
for (int k = 0; k < reps; ++k) { |
100 |
sum += pa.withMapping(nextRand).reduce(accum, zero); |
101 |
array[k] = sum; |
102 |
} |
103 |
now = System.nanoTime(); |
104 |
elapsed = (double)(now - last) / NPS; |
105 |
last = now; |
106 |
System.out.printf("poolSize %3d: %7.3f\n", i, elapsed); |
107 |
} |
108 |
|
109 |
} |
110 |
fjp.shutdown(); |
111 |
if (sum == 0) System.out.print(" "); |
112 |
} |
113 |
} |