ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/jsr166y/MapReduceDemo.java
Revision: 1.6
Committed: Sun Nov 1 21:50:49 2009 UTC (14 years, 7 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +0 -0 lines
State: FILE REMOVED
Log Message:
move tests for unshipped classes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 import jsr166y.forkjoin.*;
8 import java.util.*;
9 import java.util.concurrent.*;
10
11 public class MapReduceDemo {
12 static final int NCPU = Runtime.getRuntime().availableProcessors();
13 static final Random rng = new Random();
14
15 /**
16 * Sequential version, for performance comparison
17 */
18 static <T, U> U seqMapReduce(T[] array,
19 Ops.Op<T, U> mapper,
20 Ops.Reducer<U> reducer,
21 U base) {
22 int n = array.length;
23 U x = base;
24 for (int i = 0; i < n; ++i)
25 x = reducer.op(x, mapper.op(array[i]));
26 return x;
27 }
28
29
30
31 // sample functions
32 static final class GetNext implements Ops.Op<Rand, Long> {
33 public Long op(Rand x) {
34 return x.next();
35 }
36 }
37
38 static final class Accum implements Ops.Reducer<Long> {
39 public Long op(Long a, Long b) {
40 long x = a;
41 long y = b;
42 return x + y;
43 }
44 }
45
46 /** for time conversion */
47 static final long NPS = (1000L * 1000 * 1000);
48
49 public static void main(String[] args) throws Exception {
50 int n = 1 << 18;
51 int reps = 1 << 8;
52 Rand[] array = new Rand[n];
53 for (int i = 0; i < n; ++i)
54 array[i] = new Rand(i+1);
55 ForkJoinPool fjp = new ForkJoinPool(1);
56 ParallelArray<Rand> pa = ParallelArray.createUsingHandoff(array, fjp);
57 final GetNext getNext = new GetNext();
58 final Accum accum = new Accum();
59 final Long zero = Long.valueOf(0);
60 long last, now;
61 double elapsed;
62 for (int j = 0; j < 2; ++j) {
63 long rseed = rng.nextLong();
64 resetSeeds(array, rseed);
65 long seqsum = 0;
66 last = System.nanoTime();
67 for (int k = 0; k < reps; ++k) {
68 seqsum += seqMapReduce(array, getNext, accum, zero);
69 Rand tmp = array[k];
70 array[k] = array[n - k - 1];
71 array[n - k - 1] = tmp;
72 }
73 now = System.nanoTime();
74 elapsed = (double)(now - last) / NPS;
75 last = now;
76 System.out.printf("sequential: %7.3f\n", elapsed);
77 for (int i = 2; i <= NCPU; i <<= 1) {
78 resetSeeds(array, rseed);
79 long sum = 0;
80 fjp.setPoolSize(i);
81 last = System.nanoTime();
82 for (int k = 0; k < reps; ++k) {
83 sum += pa.withMapping(getNext).reduce(accum, zero);
84 Rand tmp = array[k];
85 array[k] = array[n - k - 1];
86 array[n - k - 1] = tmp;
87 }
88 now = System.nanoTime();
89 elapsed = (double)(now - last) / NPS;
90 last = now;
91 System.out.printf("poolSize %3d: %7.3f\n", i, elapsed);
92 if (sum != seqsum) throw new Error("checksum");
93 }
94 for (int i = NCPU; i >= 1; i >>>= 1) {
95 resetSeeds(array, rseed);
96 long sum = 0;
97 fjp.setPoolSize(i);
98 last = System.nanoTime();
99 for (int k = 0; k < reps; ++k) {
100 sum += pa.withMapping(getNext).reduce(accum, zero);
101 Rand tmp = array[k];
102 array[k] = array[n - k - 1];
103 array[n - k - 1] = tmp;
104 }
105 now = System.nanoTime();
106 elapsed = (double)(now - last) / NPS;
107 last = now;
108 System.out.printf("poolSize %3d: %7.3f\n", i, elapsed);
109 if (sum != seqsum) throw new Error("checksum");
110 }
111 }
112 fjp.shutdownNow();
113 fjp.awaitTermination(1, TimeUnit.SECONDS);
114 Thread.sleep(100);
115 }
116
117 static void resetSeeds(Rand[] array, long s) {
118 for (int i = 0; i < array.length; ++i)
119 array[i].seed = s++;
120 }
121
122 /**
123 * Xorshift Random algorithm.
124 */
125 public static final class Rand {
126 private long seed;
127 Rand(long s) {
128 seed = s;
129 }
130 public long next() {
131 long x = seed;
132 x ^= x << 13;
133 x ^= x >>> 7;
134 x ^= (x << 17);
135 seed = x;
136 return x;
137 }
138 }
139
140 }