ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/ArrayPrefixHelpers.java
Revision: 1.1
Committed: Wed Jan 16 19:01:22 2013 UTC (11 years, 3 months ago) by dl
Branch: MAIN
Log Message:
rename file

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util;
8     import java.util.concurrent.ForkJoinPool;
9     import java.util.concurrent.CountedCompleter;
10     import java.util.function.BinaryOperator;
11     import java.util.function.IntBinaryOperator;
12     import java.util.function.LongBinaryOperator;
13     import java.util.function.DoubleBinaryOperator;
14    
/**
 * ForkJoin tasks to perform Arrays.parallelPrefix operations.
 *
 * <p>Each nested task class cumulates, in place, the elements of an
 * array range {@code [lo, hi)} using a caller-supplied binary
 * function. There is one task class for object arrays and one each
 * for {@code long}, {@code double} and {@code int} arrays.
 *
 * @author Doug Lea
 * @since 1.8
 */
class ArrayPrefixHelpers {
    private ArrayPrefixHelpers() {} // non-instantiable

    /*
     * Parallel prefix (aka cumulate, scan) task classes
     * are based loosely on Guy Blelloch's original
     * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
     *  Keep dividing by two to threshold segment size, and then:
     *   Pass 1: Create tree of partial sums for each segment
     *   Pass 2: For each segment, cumulate with offset of left sibling
     *
     * This version improves performance within FJ framework mainly by
     * allowing the second pass of ready left-hand sides to proceed
     * even if some right-hand side first passes are still executing.
     * It also combines first and second pass for leftmost segment,
     * and skips the first pass for rightmost segment (whose result is
     * not needed for second pass).  It similarly manages to avoid
     * requiring that users supply an identity basis for accumulations
     * by tracking those segments/subtasks for which the first
     * existing element is used as base.
     *
     * Managing this relies on ORing some bits in the pendingCount for
     * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
     * main phase bit. When false, segments compute only their sum.
     * When true, they cumulate array elements. CUMULATE is set at
     * root at beginning of second pass and then propagated down. But
     * it may also be set earlier for subtrees with lo==origin (the
     * left spine of tree). SUMMED is a one bit join count. For leaves,
     * it is set when summed. For internal nodes, it becomes true when
     * one child is summed.  When the second child finishes summing,
     * it then moves up the tree to trigger the cumulate phase.
     * FINISHED is also a one bit join count. For leaves, it is set
     * when cumulated. For internal nodes, it becomes true when one
     * child is cumulated.  When the second child finishes cumulating,
     * it then moves up the tree, completing at the root.
     *
     * To better exploit locality and reduce overhead, the compute
     * method loops starting with the current task, moving if possible
     * to one of its subtasks rather than forking.
     *
     * As usual for this sort of utility, there are 4 versions, that
     * are simple copy/paste/adapt variants of each other.  (The
     * double and int versions differ from long version solely by
     * replacing "long" (with case-matching)).
     */

    // Phase/state bits ORed into each task's pendingCount; see above
    static final int CUMULATE = 1;
    static final int SUMMED   = 2;
    static final int FINISHED = 4;

    /** The smallest subtask array partition size to use as threshold */
    static final int MIN_PARTITION = 16;

    /**
     * Prefix task for object arrays combined with a
     * {@link BinaryOperator}.
     *
     * @param <T> the array element type
     */
    static final class CumulateTask<T> extends CountedCompleter<Void> {
        /** The array being cumulated in place. */
        final T[] array;
        /** The combining function. */
        final BinaryOperator<T> function;
        /** Child tasks covering [lo, mid) and [mid, hi); null until split. */
        CumulateTask<T> left, right;
        /** in: base value fed into this segment; out: this segment's sum. */
        T in, out;
        /**
         * lo/hi bound this task's segment; origin/fence bound the
         * whole range being cumulated; threshold is the segment size
         * at or below which a task is processed as a leaf.
         */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor.
         *
         * @param parent the completer, or null if none
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo the first index of the range, inclusive
         * @param hi the last index of the range, exclusive
         */
        public CumulateTask(CumulateTask<T> parent,
                            BinaryOperator<T> function,
                            T[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            // range size / (8 * common pool parallelism), floored at
            // MIN_PARTITION
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor */
        CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
                     T[] array, int origin, int fence, int threshold,
                     int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs the sum and/or cumulate phase for this task, looping
         * into subtasks and up through completers as described in the
         * class-level design comment.
         *
         * @throws NullPointerException if the function or array is null
         */
        @Override
        public final void compute() {
            final BinaryOperator<T> fn;
            final T[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException();    // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            if (org >= fnc) {
                // Empty range: there is no first element to use as a
                // base (a[org] may not even exist), so just complete.
                quietlyComplete();
                return;
            }
            CumulateTask<T> t = this;
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {                    // internal node
                    CumulateTask<T> lt = t.left, rt = t.right, f;
                    if (lt == null) {                // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                           // possibly refork
                        T pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            T lout = lt.out;
                            // leftmost subtree has no incoming base
                            rt.in = (l == org ? lout :
                                     fn.apply(pin, lout));
                            for (int c;;) {
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;           // fork right, run left
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {                               // leaf
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;                      // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    T sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {                       // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i)       // cumulate
                            a[i] = sum = fn.apply(sum, a[i]);
                    }
                    else if (h < fnc) {                       // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i)       // sum only
                            sum = fn.apply(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (CumulateTask<T> par;;) {             // propagate
                        // Completers in this tree are always CumulateTask<T>
                        // instances created by compute() above.
                        @SuppressWarnings("unchecked") CumulateTask<T> partmp
                            = (CumulateTask<T>)t.getCompleter();
                        if ((par = partmp) == null) {
                            if ((state & FINISHED) != 0)      // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                          // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; CumulateTask<T> lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                T lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.apply(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;               // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;                      // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = 5293554502939613543L;
    }

    /**
     * Prefix task for {@code long} arrays combined with a
     * {@link LongBinaryOperator}. Same structure as
     * {@link CumulateTask}.
     */
    static final class LongCumulateTask extends CountedCompleter<Void> {
        /** The array being cumulated in place. */
        final long[] array;
        /** The combining function. */
        final LongBinaryOperator function;
        /** Child tasks covering [lo, mid) and [mid, hi); null until split. */
        LongCumulateTask left, right;
        /** in: base value fed into this segment; out: this segment's sum. */
        long in, out;
        /** Segment bounds, whole-range bounds, and leaf-size threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor.
         *
         * @param parent the completer, or null if none
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo the first index of the range, inclusive
         * @param hi the last index of the range, exclusive
         */
        public LongCumulateTask(LongCumulateTask parent,
                                LongBinaryOperator function,
                                long[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor */
        LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
                         long[] array, int origin, int fence, int threshold,
                         int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs the sum and/or cumulate phase for this task; see
         * {@link CumulateTask#compute()}.
         *
         * @throws NullPointerException if the function or array is null
         */
        @Override
        public final void compute() {
            final LongBinaryOperator fn;
            final long[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException();    // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            if (org >= fnc) {
                // Empty range: no first element to use as a base.
                quietlyComplete();
                return;
            }
            LongCumulateTask t = this;
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {                    // internal node
                    LongCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                           // possibly refork
                        long pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            long lout = lt.out;
                            rt.in = (l == org ? lout :
                                     fn.applyAsLong(pin, lout));
                            for (int c;;) {
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;           // fork right, run left
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {                               // leaf
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;                      // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    long sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {                       // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i)       // cumulate
                            a[i] = sum = fn.applyAsLong(sum, a[i]);
                    }
                    else if (h < fnc) {                       // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i)       // sum only
                            sum = fn.applyAsLong(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (LongCumulateTask par;;) {            // propagate
                        if ((par = (LongCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0)      // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                          // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; LongCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                long lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsLong(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;               // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;                      // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = -5074099945909284273L;
    }

    /**
     * Prefix task for {@code double} arrays combined with a
     * {@link DoubleBinaryOperator}. Same structure as
     * {@link CumulateTask}.
     */
    static final class DoubleCumulateTask extends CountedCompleter<Void> {
        /** The array being cumulated in place. */
        final double[] array;
        /** The combining function. */
        final DoubleBinaryOperator function;
        /** Child tasks covering [lo, mid) and [mid, hi); null until split. */
        DoubleCumulateTask left, right;
        /** in: base value fed into this segment; out: this segment's sum. */
        double in, out;
        /** Segment bounds, whole-range bounds, and leaf-size threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor.
         *
         * @param parent the completer, or null if none
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo the first index of the range, inclusive
         * @param hi the last index of the range, exclusive
         */
        public DoubleCumulateTask(DoubleCumulateTask parent,
                                  DoubleBinaryOperator function,
                                  double[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor */
        DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
                           double[] array, int origin, int fence, int threshold,
                           int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs the sum and/or cumulate phase for this task; see
         * {@link CumulateTask#compute()}.
         *
         * @throws NullPointerException if the function or array is null
         */
        @Override
        public final void compute() {
            final DoubleBinaryOperator fn;
            final double[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException();    // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            if (org >= fnc) {
                // Empty range: no first element to use as a base.
                quietlyComplete();
                return;
            }
            DoubleCumulateTask t = this;
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {                    // internal node
                    DoubleCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                           // possibly refork
                        double pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            double lout = lt.out;
                            rt.in = (l == org ? lout :
                                     fn.applyAsDouble(pin, lout));
                            for (int c;;) {
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;           // fork right, run left
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {                               // leaf
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;                      // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    double sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {                       // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i)       // cumulate
                            a[i] = sum = fn.applyAsDouble(sum, a[i]);
                    }
                    else if (h < fnc) {                       // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i)       // sum only
                            sum = fn.applyAsDouble(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (DoubleCumulateTask par;;) {          // propagate
                        if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0)      // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                          // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; DoubleCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                double lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsDouble(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;               // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;                      // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = -586947823794232033L;
    }

    /**
     * Prefix task for {@code int} arrays combined with an
     * {@link IntBinaryOperator}. Same structure as
     * {@link CumulateTask}.
     */
    static final class IntCumulateTask extends CountedCompleter<Void> {
        /** The array being cumulated in place. */
        final int[] array;
        /** The combining function. */
        final IntBinaryOperator function;
        /** Child tasks covering [lo, mid) and [mid, hi); null until split. */
        IntCumulateTask left, right;
        /** in: base value fed into this segment; out: this segment's sum. */
        int in, out;
        /** Segment bounds, whole-range bounds, and leaf-size threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor.
         *
         * @param parent the completer, or null if none
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo the first index of the range, inclusive
         * @param hi the last index of the range, exclusive
         */
        public IntCumulateTask(IntCumulateTask parent,
                               IntBinaryOperator function,
                               int[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor */
        IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
                        int[] array, int origin, int fence, int threshold,
                        int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs the sum and/or cumulate phase for this task; see
         * {@link CumulateTask#compute()}.
         *
         * @throws NullPointerException if the function or array is null
         */
        @Override
        public final void compute() {
            final IntBinaryOperator fn;
            final int[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException();    // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            if (org >= fnc) {
                // Empty range: no first element to use as a base.
                quietlyComplete();
                return;
            }
            IntCumulateTask t = this;
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {                    // internal node
                    IntCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                           // possibly refork
                        int pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            int lout = lt.out;
                            rt.in = (l == org ? lout :
                                     fn.applyAsInt(pin, lout));
                            for (int c;;) {
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;           // fork right, run left
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {                               // leaf
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;                      // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    int sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {                       // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i)       // cumulate
                            a[i] = sum = fn.applyAsInt(sum, a[i]);
                    }
                    else if (h < fnc) {                       // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i)       // sum only
                            sum = fn.applyAsInt(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (IntCumulateTask par;;) {             // propagate
                        if ((par = (IntCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0)      // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                          // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; IntCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                int lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsInt(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;               // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;                      // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = 3731755594596840961L;
    }

}