ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/ArrayPrefixUtil.java
Revision: 1.1
Committed: Sat Dec 22 18:42:44 2012 UTC (11 years, 4 months ago) by dl
Branch: MAIN
Log Message:
Initial version

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util;
8     import java.util.concurrent.ForkJoinPool;
9     import java.util.concurrent.CountedCompleter;
10     import java.util.function.BinaryOperator;
11     import java.util.function.IntBinaryOperator;
12     import java.util.function.LongBinaryOperator;
13     import java.util.function.DoubleBinaryOperator;
14    
15     /**
16     * ForkJoin tasks to perform Arrays.parallelPrefix operations.
17     *
18     * @author Doug Lea
19     * @since 1.8
20     */
21     class ArrayPrefixUtil {
22     private ArrayPrefixUtil() {}; // private constructor: this class only holds static nested task classes and constants, so it is non-instantiable
23    
24     /*
25     * Parallel prefix (aka cumulate, scan) task classes
26     * are based loosely on Guy Blelloch's original
27     * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
28     * Keep dividing by two to threshold segment size, and then:
29     * Pass 1: Create tree of partial sums for each segment
30     * Pass 2: For each segment, cumulate with offset of left sibling
31     *
32     * This version improves performance within FJ framework mainly by
33     * allowing the second pass of ready left-hand sides to proceed
34     * even if some right-hand side first passes are still executing.
35     * It also combines first and second pass for leftmost segment,
36     * and skips the first pass for rightmost segment (whose result is
37     * not needed for second pass). It similarly manages to avoid
38     * requiring that users supply an identity basis for accumulations
39     * by tracking those segments/subtasks for which the first
40     * existing element is used as base.
41     *
42     * Managing this relies on ORing some bits in the pendingCount for
43     * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
44     * main phase bit. When false, segments compute only their sum.
45     * When true, they cumulate array elements. CUMULATE is set at
46     * root at beginning of second pass and then propagated down. But
47     * it may also be set earlier for subtrees with lo==0 (the left
48     * spine of tree). SUMMED is a one bit join count. For leafs, it
49     * is set when summed. For internal nodes, it becomes true when
50     * one child is summed. When the second child finishes summing,
51     * we then move up the tree to trigger the cumulate phase. FINISHED
52     * is also a one bit join count. For leafs, it is set when
53     * cumulated. For internal nodes, it becomes true when one child
54     * is cumulated. When the second child finishes cumulating, it
55     * then moves up tree, completing at the root.
56     *
57     * To better exploit locality and reduce overhead, the compute
58     * method loops starting with the current task, moving if possible
59     * to one of its subtasks rather than forking.
60     *
61     * As usual for this sort of utility, there are 4 versions, that
62     * are simple copy/paste/adapt variants of each other. (The
63     * double and int versions differ from long version solely by
64     * replacing "long" (with case-matching)).
65     */
66    
67     // Phase/state bits that the task classes OR into their pendingCount (see above)
68     static final int CUMULATE = 1; // main phase bit: when set a segment cumulates array elements, when clear it only computes its sum
69     static final int SUMMED = 2; // one-bit join count for the summing pass
70     static final int FINISHED = 4; // one-bit join count for the cumulating pass
71    
72     /** The smallest subtask array partition size to use as threshold */
73     static final int MIN_PARTITION = 16;
74    
75     static final class CumulateTask<T> extends CountedCompleter<Void> {
        // Fork/join task that replaces, in place, each element of
        // array[origin, fence) after the first with the result of applying
        // function to the running value and that element. The pending count
        // inherited from CountedCompleter is not used as a count here: it
        // holds the CUMULATE / SUMMED / FINISHED bits.
76     final T[] array;
77     final BinaryOperator<T> function;
        // Children covering [lo, mid) and [mid, hi); created by compute()
        // the first time an above-threshold task is visited.
78     CumulateTask<T> left, right;
        // in: value accumulated from everything to the left of this segment,
        // used as the starting base when cumulating.
        // out: result recorded for this segment; the parent combines its
        // children's out values into its own out.
79     T in, out;
        // [lo, hi) is this task's segment; [origin, fence) is the range of
        // the whole operation; a segment of at most threshold elements is
        // processed sequentially instead of being split.
80     final int lo, hi, origin, fence, threshold;
81    
82     /** Root task constructor */
83     public CumulateTask(CumulateTask<T> parent,
84     BinaryOperator<T> function,
85     T[] array, int lo, int hi) {
86     super(parent);
87     this.function = function; this.array = array;
        // The root's segment is the whole range, so lo/hi double as origin/fence.
88     this.lo = this.origin = lo; this.hi = this.fence = hi;
89     int p;
        // Threshold is the range length divided by 8x the common pool
        // parallelism, but never less than MIN_PARTITION.
90     this.threshold =
91     (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
92     <= MIN_PARTITION ? MIN_PARTITION : p;
93     }
94    
95     /** Subtask constructor */
        // origin, fence and threshold are stored as given by the creating
        // task; lo and hi delimit this subtask's own segment.
96     CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
97     T[] array, int origin, int fence, int threshold,
98     int lo, int hi) {
99     super(parent);
100     this.function = function; this.array = array;
101     this.origin = origin; this.fence = fence;
102     this.threshold = threshold;
103     this.lo = lo; this.hi = hi;
104     }
105    
        // Performs steps of the prefix computation starting at this task.
        // The loop moves the current task reference t down to a child or up
        // to a parent instead of forking a new task for every step.
        // Throws NullPointerException if function or array is null.
106     public final void compute() {
107     final BinaryOperator<T> fn;
108     final T[] a;
109     if ((fn = this.function) == null || (a = this.array) == null)
110     throw new NullPointerException(); // hoist checks
111     int th = threshold, org = origin, fnc = fence, l, h;
112     CumulateTask<T> t = this;
        // Each iteration handles task t; loading its bounds in the loop
        // condition also range-checks them against the array.
113     outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
        // Segment larger than threshold: t is an internal node.
114     if (h - l > th) {
115     CumulateTask<T> lt = t.left, rt = t.right, f;
        // No children yet: split at the midpoint, mark the right half to
        // be forked (f) and continue this loop in the left half (t).
116     if (lt == null) { // first pass
117     int mid = (l + h) >>> 1;
118     f = rt = t.right =
119     new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
120     t = lt = t.left =
121     new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
122     }
        // Children already exist: hand each child its in value and try to
        // set its CUMULATE bit.
123     else { // possibly refork
124     T pin = t.in;
        // The left child starts where this task starts, so it gets in unchanged.
125     lt.in = pin;
126     f = t = null;
127     if (rt != null) {
128     T lout = lt.out;
        // Right child's base is the left child's out, combined with this
        // task's in unless this task starts at origin (then there is no in).
129     rt.in = (l == org ? lout :
130     fn.apply(pin, lout));
        // CAS loop: stop if CUMULATE is already set; otherwise the caller
        // whose CAS sets the bit takes the right child as next task.
131     for (int c;;) {
132     if (((c = rt.getPendingCount()) & CUMULATE) != 0)
133     break;
134     if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
135     t = rt;
136     break;
137     }
138     }
139     }
        // Same for the left child. If the right child was also taken above,
        // it becomes the task to fork and the loop continues with the left.
140     for (int c;;) {
141     if (((c = lt.getPendingCount()) & CUMULATE) != 0)
142     break;
143     if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
144     if (t != null)
145     f = t;
146     t = lt;
147     break;
148     }
149     }
        // Neither child was taken here: leave the while loop and return.
150     if (t == null)
151     break;
152     }
153     if (f != null)
154     f.fork();
155     }
        // Segment within threshold: t is a leaf, processed sequentially.
156     else {
157     int state; // Transition to sum, cumulate, or both
        // Choose the bits to add from the leaf's current bits:
        //   CUMULATE set      -> FINISHED (cumulate starting from in)
        //   else l > origin   -> SUMMED (sum only)
        //   else (leftmost)   -> SUMMED|FINISHED (cumulate right away)
        // and retry until the CAS installs them.
158     for (int b;;) {
159     if (((b = t.getPendingCount()) & FINISHED) != 0)
160     break outer; // already done
161     state = ((b & CUMULATE) != 0? FINISHED :
162     (l > org) ? SUMMED : (SUMMED|FINISHED));
163     if (t.compareAndSetPendingCount(b, b|state))
164     break;
165     }
166    
167     T sum;
        // Any state containing FINISHED writes cumulated values into the array.
168     if (state != SUMMED) {
169     int first;
170     if (l == org) { // leftmost; no in
171     sum = a[org];
172     first = org + 1;
173     }
174     else {
175     sum = t.in;
176     first = l;
177     }
178     for (int i = first; i < h; ++i) // cumulate
179     a[i] = sum = fn.apply(sum, a[i]);
180     }
        // Sum-only pass for a leaf that does not end at fence: the array
        // is read but not written.
181     else if (h < fnc) { // skip rightmost
182     sum = a[l];
183     for (int i = l + 1; i < h; ++i) // sum only
184     sum = fn.apply(sum, a[i]);
185     }
        // Sum-only pass for the leaf ending at fence: no summing is done;
        // out is just set from in.
186     else
187     sum = t.in;
188     t.out = sum;
        // Walk up the tree, merging this subtree's state into each parent.
189     for (CumulateTask<T> par;;) { // propagate
        // No completer: t is the root.
190     if ((par = (CumulateTask<T>)t.getCompleter()) == null) {
191     if ((state & FINISHED) != 0) // enable join
192     t.quietlyComplete();
193     break outer;
194     }
195     int b = par.getPendingCount();
        // Parent already has FINISHED from the sibling and this subtree
        // is finished too: keep climbing without touching the parent's bits.
196     if ((b & state & FINISHED) != 0)
197     t = par; // both done
198     else if ((b & state & SUMMED) != 0) { // both summed
199     int nextState; CumulateTask<T> lt, rt;
200     if ((lt = par.left) != null &&
201     (rt = par.right) != null) {
202     T lout = lt.out;
        // Parent's out combines both children, except that the out of a
        // right child ending at fence is not used.
203     par.out = (rt.hi == fnc ? lout :
204     fn.apply(lout, rt.out));
205     }
        // A parent starting at origin whose CUMULATE bit is still clear
        // gets CUMULATE set here and is forked below.
206     int refork = (((b & CUMULATE) == 0 &&
207     par.lo == org) ? CUMULATE : 0);
        // If nextState equals b there is nothing to write; otherwise the
        // CAS must succeed, else the propagate loop retries with a fresh b.
208     if ((nextState = b|state|refork) == b ||
209     par.compareAndSetPendingCount(b, nextState)) {
210     state = SUMMED; // drop finished
211     t = par;
212     if (refork != 0)
213     par.fork();
214     }
215     }
        // Parent has no matching bit yet: record this subtree's state on
        // it and stop; on CAS failure the propagate loop retries.
216     else if (par.compareAndSetPendingCount(b, b|state))
217     break outer; // sib not ready
218     }
219     }
220     }
221     }
222     }
223    
224     static final class LongCumulateTask extends CountedCompleter<Void> {
        // Fork/join task that replaces, in place, each element of the long
        // array range [origin, fence) after the first with the result of
        // applying function to the running value and that element. The
        // pending count inherited from CountedCompleter is not used as a
        // count here: it holds the CUMULATE / SUMMED / FINISHED bits.
225     final long[] array;
226     final LongBinaryOperator function;
        // Children covering [lo, mid) and [mid, hi); created by compute()
        // the first time an above-threshold task is visited.
227     LongCumulateTask left, right;
        // in: value accumulated from everything to the left of this segment,
        // used as the starting base when cumulating.
        // out: result recorded for this segment; the parent combines its
        // children's out values into its own out.
228     long in, out;
        // [lo, hi) is this task's segment; [origin, fence) is the range of
        // the whole operation; a segment of at most threshold elements is
        // processed sequentially instead of being split.
229     final int lo, hi, origin, fence, threshold;
230    
231     /** Root task constructor */
232     public LongCumulateTask(LongCumulateTask parent,
233     LongBinaryOperator function,
234     long[] array, int lo, int hi) {
235     super(parent);
236     this.function = function; this.array = array;
        // The root's segment is the whole range, so lo/hi double as origin/fence.
237     this.lo = this.origin = lo; this.hi = this.fence = hi;
238     int p;
        // Threshold is the range length divided by 8x the common pool
        // parallelism, but never less than MIN_PARTITION.
239     this.threshold =
240     (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
241     <= MIN_PARTITION ? MIN_PARTITION : p;
242     }
243    
244     /** Subtask constructor */
        // origin, fence and threshold are stored as given by the creating
        // task; lo and hi delimit this subtask's own segment.
245     LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
246     long[] array, int origin, int fence, int threshold,
247     int lo, int hi) {
248     super(parent);
249     this.function = function; this.array = array;
250     this.origin = origin; this.fence = fence;
251     this.threshold = threshold;
252     this.lo = lo; this.hi = hi;
253     }
254    
        // Performs steps of the prefix computation starting at this task.
        // The loop moves the current task reference t down to a child or up
        // to a parent instead of forking a new task for every step.
        // Throws NullPointerException if function or array is null.
255     public final void compute() {
256     final LongBinaryOperator fn;
257     final long[] a;
258     if ((fn = this.function) == null || (a = this.array) == null)
259     throw new NullPointerException(); // hoist checks
260     int th = threshold, org = origin, fnc = fence, l, h;
261     LongCumulateTask t = this;
        // Each iteration handles task t; loading its bounds in the loop
        // condition also range-checks them against the array.
262     outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
        // Segment larger than threshold: t is an internal node.
263     if (h - l > th) {
264     LongCumulateTask lt = t.left, rt = t.right, f;
        // No children yet: split at the midpoint, mark the right half to
        // be forked (f) and continue this loop in the left half (t).
265     if (lt == null) { // first pass
266     int mid = (l + h) >>> 1;
267     f = rt = t.right =
268     new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
269     t = lt = t.left =
270     new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
271     }
        // Children already exist: hand each child its in value and try to
        // set its CUMULATE bit.
272     else { // possibly refork
273     long pin = t.in;
        // The left child starts where this task starts, so it gets in unchanged.
274     lt.in = pin;
275     f = t = null;
276     if (rt != null) {
277     long lout = lt.out;
        // Right child's base is the left child's out, combined with this
        // task's in unless this task starts at origin (then there is no in).
278     rt.in = (l == org ? lout :
279     fn.applyAsLong(pin, lout));
        // CAS loop: stop if CUMULATE is already set; otherwise the caller
        // whose CAS sets the bit takes the right child as next task.
280     for (int c;;) {
281     if (((c = rt.getPendingCount()) & CUMULATE) != 0)
282     break;
283     if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
284     t = rt;
285     break;
286     }
287     }
288     }
        // Same for the left child. If the right child was also taken above,
        // it becomes the task to fork and the loop continues with the left.
289     for (int c;;) {
290     if (((c = lt.getPendingCount()) & CUMULATE) != 0)
291     break;
292     if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
293     if (t != null)
294     f = t;
295     t = lt;
296     break;
297     }
298     }
        // Neither child was taken here: leave the while loop and return.
299     if (t == null)
300     break;
301     }
302     if (f != null)
303     f.fork();
304     }
        // Segment within threshold: t is a leaf, processed sequentially.
305     else {
306     int state; // Transition to sum, cumulate, or both
        // Choose the bits to add from the leaf's current bits:
        //   CUMULATE set      -> FINISHED (cumulate starting from in)
        //   else l > origin   -> SUMMED (sum only)
        //   else (leftmost)   -> SUMMED|FINISHED (cumulate right away)
        // and retry until the CAS installs them.
307     for (int b;;) {
308     if (((b = t.getPendingCount()) & FINISHED) != 0)
309     break outer; // already done
310     state = ((b & CUMULATE) != 0? FINISHED :
311     (l > org) ? SUMMED : (SUMMED|FINISHED));
312     if (t.compareAndSetPendingCount(b, b|state))
313     break;
314     }
315    
316     long sum;
        // Any state containing FINISHED writes cumulated values into the array.
317     if (state != SUMMED) {
318     int first;
319     if (l == org) { // leftmost; no in
320     sum = a[org];
321     first = org + 1;
322     }
323     else {
324     sum = t.in;
325     first = l;
326     }
327     for (int i = first; i < h; ++i) // cumulate
328     a[i] = sum = fn.applyAsLong(sum, a[i]);
329     }
        // Sum-only pass for a leaf that does not end at fence: the array
        // is read but not written.
330     else if (h < fnc) { // skip rightmost
331     sum = a[l];
332     for (int i = l + 1; i < h; ++i) // sum only
333     sum = fn.applyAsLong(sum, a[i]);
334     }
        // Sum-only pass for the leaf ending at fence: no summing is done;
        // out is just set from in.
335     else
336     sum = t.in;
337     t.out = sum;
        // Walk up the tree, merging this subtree's state into each parent.
338     for (LongCumulateTask par;;) { // propagate
        // No completer: t is the root.
339     if ((par = (LongCumulateTask)t.getCompleter()) == null) {
340     if ((state & FINISHED) != 0) // enable join
341     t.quietlyComplete();
342     break outer;
343     }
344     int b = par.getPendingCount();
        // Parent already has FINISHED from the sibling and this subtree
        // is finished too: keep climbing without touching the parent's bits.
345     if ((b & state & FINISHED) != 0)
346     t = par; // both done
347     else if ((b & state & SUMMED) != 0) { // both summed
348     int nextState; LongCumulateTask lt, rt;
349     if ((lt = par.left) != null &&
350     (rt = par.right) != null) {
351     long lout = lt.out;
        // Parent's out combines both children, except that the out of a
        // right child ending at fence is not used.
352     par.out = (rt.hi == fnc ? lout :
353     fn.applyAsLong(lout, rt.out));
354     }
        // A parent starting at origin whose CUMULATE bit is still clear
        // gets CUMULATE set here and is forked below.
355     int refork = (((b & CUMULATE) == 0 &&
356     par.lo == org) ? CUMULATE : 0);
        // If nextState equals b there is nothing to write; otherwise the
        // CAS must succeed, else the propagate loop retries with a fresh b.
357     if ((nextState = b|state|refork) == b ||
358     par.compareAndSetPendingCount(b, nextState)) {
359     state = SUMMED; // drop finished
360     t = par;
361     if (refork != 0)
362     par.fork();
363     }
364     }
        // Parent has no matching bit yet: record this subtree's state on
        // it and stop; on CAS failure the propagate loop retries.
365     else if (par.compareAndSetPendingCount(b, b|state))
366     break outer; // sib not ready
367     }
368     }
369     }
370     }
371     }
372    
373     static final class DoubleCumulateTask extends CountedCompleter<Void> {
        // Fork/join task that replaces, in place, each element of the double
        // array range [origin, fence) after the first with the result of
        // applying function to the running value and that element. The
        // pending count inherited from CountedCompleter is not used as a
        // count here: it holds the CUMULATE / SUMMED / FINISHED bits.
374     final double[] array;
375     final DoubleBinaryOperator function;
        // Children covering [lo, mid) and [mid, hi); created by compute()
        // the first time an above-threshold task is visited.
376     DoubleCumulateTask left, right;
        // in: value accumulated from everything to the left of this segment,
        // used as the starting base when cumulating.
        // out: result recorded for this segment; the parent combines its
        // children's out values into its own out.
377     double in, out;
        // [lo, hi) is this task's segment; [origin, fence) is the range of
        // the whole operation; a segment of at most threshold elements is
        // processed sequentially instead of being split.
378     final int lo, hi, origin, fence, threshold;
379    
380     /** Root task constructor */
381     public DoubleCumulateTask(DoubleCumulateTask parent,
382     DoubleBinaryOperator function,
383     double[] array, int lo, int hi) {
384     super(parent);
385     this.function = function; this.array = array;
        // The root's segment is the whole range, so lo/hi double as origin/fence.
386     this.lo = this.origin = lo; this.hi = this.fence = hi;
387     int p;
        // Threshold is the range length divided by 8x the common pool
        // parallelism, but never less than MIN_PARTITION.
388     this.threshold =
389     (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
390     <= MIN_PARTITION ? MIN_PARTITION : p;
391     }
392    
393     /** Subtask constructor */
        // origin, fence and threshold are stored as given by the creating
        // task; lo and hi delimit this subtask's own segment.
394     DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
395     double[] array, int origin, int fence, int threshold,
396     int lo, int hi) {
397     super(parent);
398     this.function = function; this.array = array;
399     this.origin = origin; this.fence = fence;
400     this.threshold = threshold;
401     this.lo = lo; this.hi = hi;
402     }
403    
        // Performs steps of the prefix computation starting at this task.
        // The loop moves the current task reference t down to a child or up
        // to a parent instead of forking a new task for every step.
        // Throws NullPointerException if function or array is null.
404     public final void compute() {
405     final DoubleBinaryOperator fn;
406     final double[] a;
407     if ((fn = this.function) == null || (a = this.array) == null)
408     throw new NullPointerException(); // hoist checks
409     int th = threshold, org = origin, fnc = fence, l, h;
410     DoubleCumulateTask t = this;
        // Each iteration handles task t; loading its bounds in the loop
        // condition also range-checks them against the array.
411     outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
        // Segment larger than threshold: t is an internal node.
412     if (h - l > th) {
413     DoubleCumulateTask lt = t.left, rt = t.right, f;
        // No children yet: split at the midpoint, mark the right half to
        // be forked (f) and continue this loop in the left half (t).
414     if (lt == null) { // first pass
415     int mid = (l + h) >>> 1;
416     f = rt = t.right =
417     new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
418     t = lt = t.left =
419     new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
420     }
        // Children already exist: hand each child its in value and try to
        // set its CUMULATE bit.
421     else { // possibly refork
422     double pin = t.in;
        // The left child starts where this task starts, so it gets in unchanged.
423     lt.in = pin;
424     f = t = null;
425     if (rt != null) {
426     double lout = lt.out;
        // Right child's base is the left child's out, combined with this
        // task's in unless this task starts at origin (then there is no in).
427     rt.in = (l == org ? lout :
428     fn.applyAsDouble(pin, lout));
        // CAS loop: stop if CUMULATE is already set; otherwise the caller
        // whose CAS sets the bit takes the right child as next task.
429     for (int c;;) {
430     if (((c = rt.getPendingCount()) & CUMULATE) != 0)
431     break;
432     if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
433     t = rt;
434     break;
435     }
436     }
437     }
        // Same for the left child. If the right child was also taken above,
        // it becomes the task to fork and the loop continues with the left.
438     for (int c;;) {
439     if (((c = lt.getPendingCount()) & CUMULATE) != 0)
440     break;
441     if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
442     if (t != null)
443     f = t;
444     t = lt;
445     break;
446     }
447     }
        // Neither child was taken here: leave the while loop and return.
448     if (t == null)
449     break;
450     }
451     if (f != null)
452     f.fork();
453     }
        // Segment within threshold: t is a leaf, processed sequentially.
454     else {
455     int state; // Transition to sum, cumulate, or both
        // Choose the bits to add from the leaf's current bits:
        //   CUMULATE set      -> FINISHED (cumulate starting from in)
        //   else l > origin   -> SUMMED (sum only)
        //   else (leftmost)   -> SUMMED|FINISHED (cumulate right away)
        // and retry until the CAS installs them.
456     for (int b;;) {
457     if (((b = t.getPendingCount()) & FINISHED) != 0)
458     break outer; // already done
459     state = ((b & CUMULATE) != 0? FINISHED :
460     (l > org) ? SUMMED : (SUMMED|FINISHED));
461     if (t.compareAndSetPendingCount(b, b|state))
462     break;
463     }
464    
465     double sum;
        // Any state containing FINISHED writes cumulated values into the array.
466     if (state != SUMMED) {
467     int first;
468     if (l == org) { // leftmost; no in
469     sum = a[org];
470     first = org + 1;
471     }
472     else {
473     sum = t.in;
474     first = l;
475     }
476     for (int i = first; i < h; ++i) // cumulate
477     a[i] = sum = fn.applyAsDouble(sum, a[i]);
478     }
        // Sum-only pass for a leaf that does not end at fence: the array
        // is read but not written.
479     else if (h < fnc) { // skip rightmost
480     sum = a[l];
481     for (int i = l + 1; i < h; ++i) // sum only
482     sum = fn.applyAsDouble(sum, a[i]);
483     }
        // Sum-only pass for the leaf ending at fence: no summing is done;
        // out is just set from in.
484     else
485     sum = t.in;
486     t.out = sum;
        // Walk up the tree, merging this subtree's state into each parent.
487     for (DoubleCumulateTask par;;) { // propagate
        // No completer: t is the root.
488     if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
489     if ((state & FINISHED) != 0) // enable join
490     t.quietlyComplete();
491     break outer;
492     }
493     int b = par.getPendingCount();
        // Parent already has FINISHED from the sibling and this subtree
        // is finished too: keep climbing without touching the parent's bits.
494     if ((b & state & FINISHED) != 0)
495     t = par; // both done
496     else if ((b & state & SUMMED) != 0) { // both summed
497     int nextState; DoubleCumulateTask lt, rt;
498     if ((lt = par.left) != null &&
499     (rt = par.right) != null) {
500     double lout = lt.out;
        // Parent's out combines both children, except that the out of a
        // right child ending at fence is not used.
501     par.out = (rt.hi == fnc ? lout :
502     fn.applyAsDouble(lout, rt.out));
503     }
        // A parent starting at origin whose CUMULATE bit is still clear
        // gets CUMULATE set here and is forked below.
504     int refork = (((b & CUMULATE) == 0 &&
505     par.lo == org) ? CUMULATE : 0);
        // If nextState equals b there is nothing to write; otherwise the
        // CAS must succeed, else the propagate loop retries with a fresh b.
506     if ((nextState = b|state|refork) == b ||
507     par.compareAndSetPendingCount(b, nextState)) {
508     state = SUMMED; // drop finished
509     t = par;
510     if (refork != 0)
511     par.fork();
512     }
513     }
        // Parent has no matching bit yet: record this subtree's state on
        // it and stop; on CAS failure the propagate loop retries.
514     else if (par.compareAndSetPendingCount(b, b|state))
515     break outer; // sib not ready
516     }
517     }
518     }
519     }
520     }
521    
522     static final class IntCumulateTask extends CountedCompleter<Void> {
        // Fork/join task that replaces, in place, each element of the int
        // array range [origin, fence) after the first with the result of
        // applying function to the running value and that element. The
        // pending count inherited from CountedCompleter is not used as a
        // count here: it holds the CUMULATE / SUMMED / FINISHED bits.
523     final int[] array;
524     final IntBinaryOperator function;
        // Children covering [lo, mid) and [mid, hi); created by compute()
        // the first time an above-threshold task is visited.
525     IntCumulateTask left, right;
        // in: value accumulated from everything to the left of this segment,
        // used as the starting base when cumulating.
        // out: result recorded for this segment; the parent combines its
        // children's out values into its own out.
526     int in, out;
        // [lo, hi) is this task's segment; [origin, fence) is the range of
        // the whole operation; a segment of at most threshold elements is
        // processed sequentially instead of being split.
527     final int lo, hi, origin, fence, threshold;
528    
529     /** Root task constructor */
530     public IntCumulateTask(IntCumulateTask parent,
531     IntBinaryOperator function,
532     int[] array, int lo, int hi) {
533     super(parent);
534     this.function = function; this.array = array;
        // The root's segment is the whole range, so lo/hi double as origin/fence.
535     this.lo = this.origin = lo; this.hi = this.fence = hi;
536     int p;
        // Threshold is the range length divided by 8x the common pool
        // parallelism, but never less than MIN_PARTITION.
537     this.threshold =
538     (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
539     <= MIN_PARTITION ? MIN_PARTITION : p;
540     }
541    
542     /** Subtask constructor */
        // origin, fence and threshold are stored as given by the creating
        // task; lo and hi delimit this subtask's own segment.
543     IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
544     int[] array, int origin, int fence, int threshold,
545     int lo, int hi) {
546     super(parent);
547     this.function = function; this.array = array;
548     this.origin = origin; this.fence = fence;
549     this.threshold = threshold;
550     this.lo = lo; this.hi = hi;
551     }
552    
        // Performs steps of the prefix computation starting at this task.
        // The loop moves the current task reference t down to a child or up
        // to a parent instead of forking a new task for every step.
        // Throws NullPointerException if function or array is null.
553     public final void compute() {
554     final IntBinaryOperator fn;
555     final int[] a;
556     if ((fn = this.function) == null || (a = this.array) == null)
557     throw new NullPointerException(); // hoist checks
558     int th = threshold, org = origin, fnc = fence, l, h;
559     IntCumulateTask t = this;
        // Each iteration handles task t; loading its bounds in the loop
        // condition also range-checks them against the array.
560     outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
        // Segment larger than threshold: t is an internal node.
561     if (h - l > th) {
562     IntCumulateTask lt = t.left, rt = t.right, f;
        // No children yet: split at the midpoint, mark the right half to
        // be forked (f) and continue this loop in the left half (t).
563     if (lt == null) { // first pass
564     int mid = (l + h) >>> 1;
565     f = rt = t.right =
566     new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
567     t = lt = t.left =
568     new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
569     }
        // Children already exist: hand each child its in value and try to
        // set its CUMULATE bit.
570     else { // possibly refork
571     int pin = t.in;
        // The left child starts where this task starts, so it gets in unchanged.
572     lt.in = pin;
573     f = t = null;
574     if (rt != null) {
575     int lout = lt.out;
        // Right child's base is the left child's out, combined with this
        // task's in unless this task starts at origin (then there is no in).
576     rt.in = (l == org ? lout :
577     fn.applyAsInt(pin, lout));
        // CAS loop: stop if CUMULATE is already set; otherwise the caller
        // whose CAS sets the bit takes the right child as next task.
578     for (int c;;) {
579     if (((c = rt.getPendingCount()) & CUMULATE) != 0)
580     break;
581     if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
582     t = rt;
583     break;
584     }
585     }
586     }
        // Same for the left child. If the right child was also taken above,
        // it becomes the task to fork and the loop continues with the left.
587     for (int c;;) {
588     if (((c = lt.getPendingCount()) & CUMULATE) != 0)
589     break;
590     if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
591     if (t != null)
592     f = t;
593     t = lt;
594     break;
595     }
596     }
        // Neither child was taken here: leave the while loop and return.
597     if (t == null)
598     break;
599     }
600     if (f != null)
601     f.fork();
602     }
        // Segment within threshold: t is a leaf, processed sequentially.
603     else {
604     int state; // Transition to sum, cumulate, or both
        // Choose the bits to add from the leaf's current bits:
        //   CUMULATE set      -> FINISHED (cumulate starting from in)
        //   else l > origin   -> SUMMED (sum only)
        //   else (leftmost)   -> SUMMED|FINISHED (cumulate right away)
        // and retry until the CAS installs them.
605     for (int b;;) {
606     if (((b = t.getPendingCount()) & FINISHED) != 0)
607     break outer; // already done
608     state = ((b & CUMULATE) != 0? FINISHED :
609     (l > org) ? SUMMED : (SUMMED|FINISHED));
610     if (t.compareAndSetPendingCount(b, b|state))
611     break;
612     }
613    
614     int sum;
        // Any state containing FINISHED writes cumulated values into the array.
615     if (state != SUMMED) {
616     int first;
617     if (l == org) { // leftmost; no in
618     sum = a[org];
619     first = org + 1;
620     }
621     else {
622     sum = t.in;
623     first = l;
624     }
625     for (int i = first; i < h; ++i) // cumulate
626     a[i] = sum = fn.applyAsInt(sum, a[i]);
627     }
        // Sum-only pass for a leaf that does not end at fence: the array
        // is read but not written.
628     else if (h < fnc) { // skip rightmost
629     sum = a[l];
630     for (int i = l + 1; i < h; ++i) // sum only
631     sum = fn.applyAsInt(sum, a[i]);
632     }
        // Sum-only pass for the leaf ending at fence: no summing is done;
        // out is just set from in.
633     else
634     sum = t.in;
635     t.out = sum;
        // Walk up the tree, merging this subtree's state into each parent.
636     for (IntCumulateTask par;;) { // propagate
        // No completer: t is the root.
637     if ((par = (IntCumulateTask)t.getCompleter()) == null) {
638     if ((state & FINISHED) != 0) // enable join
639     t.quietlyComplete();
640     break outer;
641     }
642     int b = par.getPendingCount();
        // Parent already has FINISHED from the sibling and this subtree
        // is finished too: keep climbing without touching the parent's bits.
643     if ((b & state & FINISHED) != 0)
644     t = par; // both done
645     else if ((b & state & SUMMED) != 0) { // both summed
646     int nextState; IntCumulateTask lt, rt;
647     if ((lt = par.left) != null &&
648     (rt = par.right) != null) {
649     int lout = lt.out;
        // Parent's out combines both children, except that the out of a
        // right child ending at fence is not used.
650     par.out = (rt.hi == fnc ? lout :
651     fn.applyAsInt(lout, rt.out));
652     }
        // A parent starting at origin whose CUMULATE bit is still clear
        // gets CUMULATE set here and is forked below.
653     int refork = (((b & CUMULATE) == 0 &&
654     par.lo == org) ? CUMULATE : 0);
        // If nextState equals b there is nothing to write; otherwise the
        // CAS must succeed, else the propagate loop retries with a fresh b.
655     if ((nextState = b|state|refork) == b ||
656     par.compareAndSetPendingCount(b, nextState)) {
657     state = SUMMED; // drop finished
658     t = par;
659     if (refork != 0)
660     par.fork();
661     }
662     }
        // Parent has no matching bit yet: record this subtree's state on
        // it and stop; on CAS failure the propagate loop retries.
663     else if (par.compareAndSetPendingCount(b, b|state))
664     break outer; // sib not ready
665     }
666     }
667     }
668     }
669     }
670    
671    
672     }