ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/FJSums.java
Revision: 1.8
Committed: Sun Oct 21 06:14:12 2012 UTC (11 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.7: +0 -1 lines
Log Message:
delete trailing empty lines of javadoc

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.4 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     import java.util.*;
8     import java.util.concurrent.*;
9     import java.util.concurrent.atomic.*;
10    
11     // parallel sums and cumulations
12    
13     public class FJSums {
14     static final long NPS = (1000L * 1000 * 1000);
15     static int THRESHOLD;
16    
17 jsr166 1.5 public static void main(String[] args) throws Exception {
18 dl 1.1 int procs = 0;
19     int n = 1 << 25;
20     int reps = 10;
21     try {
22     if (args.length > 0)
23     procs = Integer.parseInt(args[0]);
24     if (args.length > 1)
25     n = Integer.parseInt(args[1]);
26     if (args.length > 2)
27 dl 1.6 reps = Integer.parseInt(args[2]);
28 dl 1.1 }
29     catch (Exception e) {
30     System.out.println("Usage: java FJSums threads n reps");
31     return;
32     }
33 jsr166 1.3 ForkJoinPool g = (procs == 0) ? new ForkJoinPool() :
34 dl 1.1 new ForkJoinPool(procs);
35     System.out.println("Number of procs=" + g.getParallelism());
36     // for now hardwire Cumulate threshold to 8 * #CPUs leaf tasks
37     THRESHOLD = 1 + ((n + 7) >>> 3) / g.getParallelism();
38 jsr166 1.2
39 dl 1.1 long[] a = new long[n];
40     for (int i = 0; i < n; ++i)
41     a[i] = i;
42     long expected = ((long)n * (long)(n - 1)) / 2;
43     for (int i = 0; i < 2; ++i) {
44     System.out.print("Seq: ");
45     long last = System.nanoTime();
46     long ss = seqSum(a, 0, n);
47     double elapsed = elapsedTime(last);
48     System.out.printf("sum = %24d time: %7.3f\n", ss, elapsed);
49 jsr166 1.2 if (ss != expected)
50 dl 1.1 throw new Error("expected " + expected + " != " + ss);
51     }
52     for (int i = 0; i < reps; ++i) {
53     System.out.print("Par: ");
54     long last = System.nanoTime();
55     Summer s = new Summer(a, 0, a.length, null);
56     g.invoke(s);
57     long ss = s.result;
58     double elapsed = elapsedTime(last);
59     System.out.printf("sum = %24d time: %7.3f\n", ss, elapsed);
60 jsr166 1.2 if (i == 0 && ss != expected)
61 dl 1.1 throw new Error("expected " + expected + " != " + ss);
62     System.out.print("Cum: ");
63     last = System.nanoTime();
64     g.invoke(new Cumulater(null, a, 0, n));
65     long sc = a[n - 1];
66     elapsed = elapsedTime(last);
67     System.out.printf("sum = %24d time: %7.3f\n", ss, elapsed);
68     if (sc != ss)
69     throw new Error("expected " + ss + " != " + sc);
70     }
71     System.out.println(g);
72     g.shutdown();
73     }
74    
75     static double elapsedTime(long startTime) {
76     return (double)(System.nanoTime() - startTime) / NPS;
77     }
78    
79     static long seqSum(long[] array, int l, int h) {
80     long sum = 0;
81 jsr166 1.2 for (int i = l; i < h; ++i)
82 dl 1.1 sum += array[i];
83     return sum;
84     }
85    
86     static long seqCumulate(long[] array, int lo, int hi, long base) {
87     long sum = base;
88     for (int i = lo; i < hi; ++i)
89     array[i] = sum += array[i];
90     return sum;
91     }
92 jsr166 1.2
93 dl 1.1 /**
94     * Adapted from Applyer demo in RecursiveAction docs
95     */
96     static final class Summer extends RecursiveAction {
97     final long[] array;
98     final int lo, hi;
99     long result;
100     Summer next; // keeps track of right-hand-side tasks
101     Summer(long[] array, int lo, int hi, Summer next) {
102     this.array = array; this.lo = lo; this.hi = hi;
103     this.next = next;
104     }
105 jsr166 1.2
106 dl 1.1 protected void compute() {
107     int l = lo;
108     int h = hi;
109     Summer right = null;
110     while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
111     int mid = (l + h) >>> 1;
112     right = new Summer(array, mid, h, right);
113     right.fork();
114     h = mid;
115     }
116     long sum = seqSum(array, l, h);
117     while (right != null) {
118     if (right.tryUnfork()) // directly calculate if not stolen
119     sum += seqSum(array, right.lo, right.hi);
120     else {
121     right.join();
122     sum += right.result;
123     }
124     right = right.next;
125     }
126     result = sum;
127     }
128     }
129    
130     /**
131     * Cumulative scan, adapted from ParallelArray code
132     *
133     * A basic version of scan is straightforward.
134     * Keep dividing by two to threshold segment size, and then:
135     * Pass 1: Create tree of partial sums for each segment
136     * Pass 2: For each segment, cumulate with offset of left sibling
137     * See G. Blelloch's http://www.cs.cmu.edu/~scandal/alg/scan.html
138     *
139     * This version improves performance within FJ framework mainly by
140     * allowing second pass of ready left-hand sides to proceed even
141     * if some right-hand side first passes are still executing. It
142     * also combines first and second pass for leftmost segment, and
143     * for cumulate (not precumulate) also skips first pass for
144     * rightmost segment (whose result is not needed for second pass).
145     *
146     * To manage this, it relies on "phase" phase/state control field
147     * maintaining bits CUMULATE, SUMMED, and FINISHED. CUMULATE is
148     * main phase bit. When false, segments compute only their sum.
149     * When true, they cumulate array elements. CUMULATE is set at
150     * root at beginning of second pass and then propagated down. But
151     * it may also be set earlier for subtrees with lo==0 (the
152     * left spine of tree). SUMMED is a one bit join count. For leafs,
153     * set when summed. For internal nodes, becomes true when one
154     * child is summed. When second child finishes summing, it then
155     * moves up tree to trigger cumulate phase. FINISHED is also a one
156     * bit join count. For leafs, it is set when cumulated. For
157     * internal nodes, it becomes true when one child is cumulated.
158     * When second child finishes cumulating, it then moves up tree,
159     * executing complete() at the root.
160     */
161     static final class Cumulater extends ForkJoinTask<Void> {
162     static final short CUMULATE = (short)1;
163     static final short SUMMED = (short)2;
164     static final short FINISHED = (short)4;
165    
166     final Cumulater parent;
167     final long[] array;
168     Cumulater left, right;
169     final int lo;
170     final int hi;
171     volatile int phase; // phase/state
172     long in, out; // initially zero
173    
174     static final AtomicIntegerFieldUpdater<Cumulater> phaseUpdater =
175     AtomicIntegerFieldUpdater.newUpdater(Cumulater.class, "phase");
176    
177     Cumulater(Cumulater parent, long[] array, int lo, int hi) {
178     this.parent = parent;
179     this.array = array;
180     this.lo = lo;
181     this.hi = hi;
182     }
183    
184     public final Void getRawResult() { return null; }
185     protected final void setRawResult(Void mustBeNull) { }
186    
187     /** Returns true if can CAS CUMULATE bit true */
188     final boolean transitionToCumulate() {
189     int c;
190     while (((c = phase) & CUMULATE) == 0)
191     if (phaseUpdater.compareAndSet(this, c, c | CUMULATE))
192     return true;
193     return false;
194     }
195    
196     public final boolean exec() {
197     if (hi - lo > THRESHOLD) {
198     if (left == null) { // first pass
199     int mid = (lo + hi) >>> 1;
200     left = new Cumulater(this, array, lo, mid);
201     right = new Cumulater(this, array, mid, hi);
202     }
203    
204     boolean cumulate = (phase & CUMULATE) != 0;
205     if (cumulate) {
206     long pin = in;
207     left.in = pin;
208     right.in = pin + left.out;
209     }
210    
211     if (!cumulate || right.transitionToCumulate())
212     right.fork();
213     if (!cumulate || left.transitionToCumulate())
214     left.exec();
215     }
216     else {
217     int cb;
218     for (;;) { // Establish action: sum, cumulate, or both
219     int b = phase;
220     if ((b & FINISHED) != 0) // already done
221     return false;
222     if ((b & CUMULATE) != 0)
223     cb = FINISHED;
224     else if (lo == 0) // combine leftmost
225     cb = (SUMMED|FINISHED);
226     else
227     cb = SUMMED;
228     if (phaseUpdater.compareAndSet(this, b, b|cb))
229     break;
230     }
231    
232     if (cb == SUMMED)
233     out = seqSum(array, lo, hi);
234     else if (cb == FINISHED)
235     seqCumulate(array, lo, hi, in);
236     else if (cb == (SUMMED|FINISHED))
237     out = seqCumulate(array, lo, hi, 0L);
238    
239     // propagate up
240     Cumulater ch = this;
241     Cumulater par = parent;
242     for (;;) {
243     if (par == null) {
244     if ((cb & FINISHED) != 0)
245     ch.complete(null);
246     break;
247     }
248     int pb = par.phase;
249     if ((pb & cb & FINISHED) != 0) { // both finished
250     ch = par;
251     par = par.parent;
252     }
253     else if ((pb & cb & SUMMED) != 0) { // both summed
254     par.out = par.left.out + par.right.out;
255     int refork =
256     ((pb & CUMULATE) == 0 &&
257 jsr166 1.3 par.lo == 0) ? CUMULATE : 0;
258 dl 1.1 int nextPhase = pb|cb|refork;
259     if (pb == nextPhase ||
260     phaseUpdater.compareAndSet(par, pb, nextPhase)) {
261     if (refork != 0)
262     par.fork();
263     cb = SUMMED; // drop finished bit
264     ch = par;
265     par = par.parent;
266     }
267     }
268     else if (phaseUpdater.compareAndSet(par, pb, pb|cb))
269     break;
270     }
271     }
272     return false;
273     }
274    
275     }
276    
277     }