ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/extra166y/MapReduceDemo.java
Revision: 1.6
Committed: Fri Sep 27 17:07:44 2013 UTC (10 years, 6 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +4 -4 lines
Log Message:
No spaces separating one-character type parameters

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import jsr166y.*;
8 import extra166y.*;
9 import java.util.*;
10 import java.util.concurrent.*;
11
12 public class MapReduceDemo {
13 static final int NCPU = Runtime.getRuntime().availableProcessors();
14 static final Random rng = new Random();
15
16 /**
17 * Sequential version, for performance comparison
18 */
19 static <T,U> U seqMapReduce(T[] array,
20 Ops.Op<T,U> mapper,
21 Ops.Reducer<U> reducer,
22 U base) {
23 int n = array.length;
24 U x = base;
25 for (int i = 0; i < n; ++i)
26 x = reducer.op(x, mapper.op(array[i]));
27 return x;
28 }
29
30
31
32 // sample functions
33 static final class GetNext implements Ops.Op<Rand, Long> {
34 public Long op(Rand x) {
35 return x.next();
36 }
37 }
38
39 static final class Accum implements Ops.Reducer<Long> {
40 public Long op(Long a, Long b) {
41 long x = a;
42 long y = b;
43 return x + y;
44 }
45 }
46
47 /** for time conversion */
48 static final long NPS = (1000L * 1000 * 1000);
49
50 public static void main(String[] args) throws Exception {
51 int n = 1 << 18;
52 int reps = 1 << 8;
53 Rand[] array = new Rand[n];
54 for (int i = 0; i < n; ++i)
55 array[i] = new Rand(i+1);
56 ForkJoinPool fjp = new ForkJoinPool();
57 ParallelArray<Rand> pa = ParallelArray.createUsingHandoff(array, fjp);
58 final GetNext getNext = new GetNext();
59 final Accum accum = new Accum();
60 final Long zero = Long.valueOf(0);
61 long last, now;
62 double elapsed;
63 for (int j = 0; j < 2; ++j) {
64 long rseed = rng.nextLong();
65 resetSeeds(array, rseed);
66 long seqsum = 0;
67 last = System.nanoTime();
68 for (int k = 0; k < reps; ++k) {
69 seqsum += seqMapReduce(array, getNext, accum, zero);
70 Rand tmp = array[k];
71 array[k] = array[n - k - 1];
72 array[n - k - 1] = tmp;
73 }
74 now = System.nanoTime();
75 elapsed = (double)(now - last) / NPS;
76 last = now;
77 System.out.printf("sequential: %7.3f\n", elapsed);
78 for (int i = 2; i <= NCPU; i <<= 1) {
79 resetSeeds(array, rseed);
80 long sum = 0;
81 // fjp.setParallelism(i);
82 last = System.nanoTime();
83 for (int k = 0; k < reps; ++k) {
84 sum += pa.withMapping(getNext).reduce(accum, zero);
85 Rand tmp = array[k];
86 array[k] = array[n - k - 1];
87 array[n - k - 1] = tmp;
88 }
89 now = System.nanoTime();
90 elapsed = (double)(now - last) / NPS;
91 last = now;
92 System.out.printf("poolSize %3d: %7.3f\n", fjp.getParallelism(), elapsed);
93 if (sum != seqsum) throw new Error("checksum");
94 }
95 for (int i = NCPU; i >= 1; i >>>= 1) {
96 resetSeeds(array, rseed);
97 long sum = 0;
98 // fjp.setParallelism(i);
99 last = System.nanoTime();
100 for (int k = 0; k < reps; ++k) {
101 sum += pa.withMapping(getNext).reduce(accum, zero);
102 Rand tmp = array[k];
103 array[k] = array[n - k - 1];
104 array[n - k - 1] = tmp;
105 }
106 now = System.nanoTime();
107 elapsed = (double)(now - last) / NPS;
108 last = now;
109 System.out.printf("poolSize %3d: %7.3f\n", fjp.getParallelism(), elapsed);
110 if (sum != seqsum) throw new Error("checksum");
111 }
112 }
113 fjp.shutdownNow();
114 fjp.awaitTermination(1, TimeUnit.SECONDS);
115 Thread.sleep(100);
116 }
117
118 static void resetSeeds(Rand[] array, long s) {
119 for (int i = 0; i < array.length; ++i)
120 array[i].seed = s++;
121 }
122
123 /**
124 * Xorshift Random algorithm.
125 */
126 public static final class Rand {
127 private long seed;
128 Rand(long s) {
129 seed = s;
130 }
131 public long next() {
132 long x = seed;
133 x ^= x << 13;
134 x ^= x >>> 7;
135 x ^= (x << 17);
136 seed = x;
137 return x;
138 }
139 }
140
141 }