ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/ArrayPrefixHelpers.java
Revision: 1.2
Committed: Sun Jan 20 04:25:11 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.1: +3 -1 lines
Log Message:
fix javac [unchecked] warning

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util;
8     import java.util.concurrent.ForkJoinPool;
9     import java.util.concurrent.CountedCompleter;
10     import java.util.function.BinaryOperator;
11     import java.util.function.IntBinaryOperator;
12     import java.util.function.LongBinaryOperator;
13     import java.util.function.DoubleBinaryOperator;
14    
/**
 * ForkJoin tasks to perform Arrays.parallelPrefix operations.
 *
 * <p>Each task cumulates, in place, the elements of a root array range
 * {@code [origin, fence)} with a caller-supplied combining function:
 * afterwards each element in the range holds the combination of all
 * elements of the range up to and including itself.
 *
 * @author Doug Lea
 * @since 1.8
 */
class ArrayPrefixHelpers {
    private ArrayPrefixHelpers() {} // non-instantiable

    /*
     * Parallel prefix (aka cumulate, scan) task classes
     * are based loosely on Guy Blelloch's original
     * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
     *  Keep dividing by two to threshold segment size, and then:
     *   Pass 1: Create tree of partial sums for each segment
     *   Pass 2: For each segment, cumulate with offset of left sibling
     *
     * This version improves performance within FJ framework mainly by
     * allowing the second pass of ready left-hand sides to proceed
     * even if some right-hand side first passes are still executing.
     * It also combines first and second pass for leftmost segment,
     * and skips the first pass for rightmost segment (whose result is
     * not needed for second pass). It similarly manages to avoid
     * requiring that users supply an identity basis for accumulations
     * by tracking those segments/subtasks for which the first
     * existing element is used as base.
     *
     * Managing this relies on ORing some bits in the pendingCount for
     * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
     * main phase bit. When false, segments compute only their sum.
     * When true, they cumulate array elements. CUMULATE is set at
     * root at beginning of second pass and then propagated down. But
     * it may also be set earlier for subtrees with lo==0 (the left
     * spine of tree). SUMMED is a one-bit join count. For leaves, it
     * is set when summed. For internal nodes, it becomes true when
     * one child is summed. When the second child finishes summing,
     * the task then moves up the tree to trigger the cumulate phase.
     * FINISHED is also a one-bit join count. For leaves, it is set
     * when cumulated. For internal nodes, it becomes true when one
     * child is cumulated. When the second child finishes cumulating,
     * it then moves up the tree, completing at the root.
     *
     * To better exploit locality and reduce overhead, the compute
     * method loops starting with the current task, moving if possible
     * to one of its subtasks rather than forking.
     *
     * The root range is validated once at the top of compute: a range
     * outside the array is rejected with ArrayIndexOutOfBoundsException
     * (instead of leaving the task forever incomplete), and an empty
     * range completes immediately (instead of reading array[origin]).
     * Subtasks always lie within the root range, so for them these
     * checks never fire; the per-iteration bounds test in the outer
     * loop is kept as in the original (presumably so the JIT can elide
     * array bounds checks).
     *
     * As usual for this sort of utility, there are 4 versions, that
     * are simple copy/paste/adapt variants of each other. (The
     * double and int versions differ from long version solely by
     * replacing "long" (with case-matching)).
     */

    // Phase/state bits ORed into each task's pending count (see above)
    static final int CUMULATE = 1;
    static final int SUMMED = 2;
    static final int FINISHED = 4;

    /** The smallest subtask array partition size to use as threshold */
    static final int MIN_PARTITION = 16;

    /**
     * Checks that a root range {@code [origin, fence)} lies within an
     * array of the given length. Shared by all four task variants.
     *
     * @throws ArrayIndexOutOfBoundsException if {@code origin} is
     *         negative or {@code fence} exceeds {@code length}
     */
    private static void checkRootRange(int origin, int fence, int length) {
        if (origin < 0 || fence > length)
            throw new ArrayIndexOutOfBoundsException(
                "origin: " + origin + ", fence: " + fence +
                ", length: " + length);
    }

    /**
     * Prefix task over an object array combined with a
     * {@link BinaryOperator}.
     */
    static final class CumulateTask<T> extends CountedCompleter<Void> {
        /** The array cumulated in place. */
        final T[] array;
        /**
         * The combining function. Partial results are combined in tree
         * order, so it presumably must be associative.
         */
        final BinaryOperator<T> function;
        /** Subtasks for the lower/upper halves; created lazily in pass 1. */
        CumulateTask<T> left, right;
        /**
         * {@code in}: combination of elements {@code [origin, lo)}, set in
         * pass 2 (unused when {@code lo == origin}). {@code out}: this
         * segment's result, read by the parent when combining sums.
         */
        T in, out;
        /**
         * This task's segment {@code [lo, hi)}, the root range
         * {@code [origin, fence)}, and the maximum unsplit segment size.
         */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor. Sets the split threshold to
         * {@code (hi - lo) / (8 * common pool parallelism)}, but at least
         * {@link #MIN_PARTITION}.
         *
         * @param parent the completer (null for a root task)
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo the first index of the range, inclusive
         * @param hi the last index of the range, exclusive
         */
        public CumulateTask(CumulateTask<T> parent,
                            BinaryOperator<T> function,
                            T[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor: inherits root range and threshold. */
        CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
                     T[] array, int origin, int fence, int threshold,
                     int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs this task's share of the two-pass scan described in the
         * ArrayPrefixHelpers design notes.
         *
         * @throws NullPointerException if the function or array is null
         * @throws ArrayIndexOutOfBoundsException if the root range lies
         *         outside the array
         */
        public final void compute() {
            final BinaryOperator<T> fn;
            final T[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException(); // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            checkRootRange(org, fnc, a.length);
            if (org >= fnc) {                         // empty: nothing to do
                quietlyComplete();
                return;
            }
            CumulateTask<T> t = this;
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {
                    CumulateTask<T> lt = t.left, rt = t.right, f;
                    if (lt == null) {                 // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                            // possibly refork
                        T pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            T lout = lt.out;
                            rt.in = (l == org ? lout :
                                     fn.apply(pin, lout));
                            for (int c;;) {
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;              // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    T sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {               // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i) // cumulate
                            a[i] = sum = fn.apply(sum, a[i]);
                    }
                    else if (h < fnc) {               // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i) // sum only
                            sum = fn.apply(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (CumulateTask<T> par;;) {     // propagate
                        // completers are always CumulateTasks built above
                        @SuppressWarnings("unchecked") CumulateTask<T> partmp
                            = (CumulateTask<T>)t.getCompleter();
                        if ((par = partmp) == null) {
                            if ((state & FINISHED) != 0) // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                  // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; CumulateTask<T> lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                T lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.apply(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;       // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;              // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = 5293554502939613543L;
    }

    /**
     * Prefix task over a {@code long} array combined with a
     * {@link LongBinaryOperator}; a copy of {@link CumulateTask}
     * adapted to {@code long}.
     */
    static final class LongCumulateTask extends CountedCompleter<Void> {
        /** The array cumulated in place. */
        final long[] array;
        /** The combining function (presumably associative). */
        final LongBinaryOperator function;
        /** Subtasks for the lower/upper halves; created lazily in pass 1. */
        LongCumulateTask left, right;
        /** Combination of {@code [origin, lo)} / this segment's result. */
        long in, out;
        /** Segment {@code [lo, hi)}, root range, and split threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor. Sets the split threshold to
         * {@code (hi - lo) / (8 * common pool parallelism)}, but at least
         * {@link #MIN_PARTITION}.
         */
        public LongCumulateTask(LongCumulateTask parent,
                                LongBinaryOperator function,
                                long[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor: inherits root range and threshold. */
        LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
                         long[] array, int origin, int fence, int threshold,
                         int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs this task's share of the two-pass scan.
         *
         * @throws NullPointerException if the function or array is null
         * @throws ArrayIndexOutOfBoundsException if the root range lies
         *         outside the array
         */
        public final void compute() {
            final LongBinaryOperator fn;
            final long[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException(); // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            checkRootRange(org, fnc, a.length);
            if (org >= fnc) {                         // empty: nothing to do
                quietlyComplete();
                return;
            }
            LongCumulateTask t = this;
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {
                    LongCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                 // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                            // possibly refork
                        long pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            long lout = lt.out;
                            rt.in = (l == org ? lout :
                                     fn.applyAsLong(pin, lout));
                            for (int c;;) {
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;              // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    long sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {               // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i) // cumulate
                            a[i] = sum = fn.applyAsLong(sum, a[i]);
                    }
                    else if (h < fnc) {               // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i) // sum only
                            sum = fn.applyAsLong(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (LongCumulateTask par;;) {    // propagate
                        if ((par = (LongCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0) // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                  // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; LongCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                long lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsLong(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;       // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;              // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = -5074099945909284273L;
    }

    /**
     * Prefix task over a {@code double} array combined with a
     * {@link DoubleBinaryOperator}; a copy of {@link LongCumulateTask}
     * adapted to {@code double}.
     */
    static final class DoubleCumulateTask extends CountedCompleter<Void> {
        /** The array cumulated in place. */
        final double[] array;
        /** The combining function (presumably associative). */
        final DoubleBinaryOperator function;
        /** Subtasks for the lower/upper halves; created lazily in pass 1. */
        DoubleCumulateTask left, right;
        /** Combination of {@code [origin, lo)} / this segment's result. */
        double in, out;
        /** Segment {@code [lo, hi)}, root range, and split threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor. Sets the split threshold to
         * {@code (hi - lo) / (8 * common pool parallelism)}, but at least
         * {@link #MIN_PARTITION}.
         */
        public DoubleCumulateTask(DoubleCumulateTask parent,
                                  DoubleBinaryOperator function,
                                  double[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor: inherits root range and threshold. */
        DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
                           double[] array, int origin, int fence, int threshold,
                           int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs this task's share of the two-pass scan.
         *
         * @throws NullPointerException if the function or array is null
         * @throws ArrayIndexOutOfBoundsException if the root range lies
         *         outside the array
         */
        public final void compute() {
            final DoubleBinaryOperator fn;
            final double[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException(); // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            checkRootRange(org, fnc, a.length);
            if (org >= fnc) {                         // empty: nothing to do
                quietlyComplete();
                return;
            }
            DoubleCumulateTask t = this;
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {
                    DoubleCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                 // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                            // possibly refork
                        double pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            double lout = lt.out;
                            rt.in = (l == org ? lout :
                                     fn.applyAsDouble(pin, lout));
                            for (int c;;) {
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;              // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    double sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {               // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i) // cumulate
                            a[i] = sum = fn.applyAsDouble(sum, a[i]);
                    }
                    else if (h < fnc) {               // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i) // sum only
                            sum = fn.applyAsDouble(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (DoubleCumulateTask par;;) {  // propagate
                        if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0) // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                  // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; DoubleCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                double lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsDouble(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;       // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;              // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = -586947823794232033L;
    }

    /**
     * Prefix task over an {@code int} array combined with an
     * {@link IntBinaryOperator}; a copy of {@link LongCumulateTask}
     * adapted to {@code int}.
     */
    static final class IntCumulateTask extends CountedCompleter<Void> {
        /** The array cumulated in place. */
        final int[] array;
        /** The combining function (presumably associative). */
        final IntBinaryOperator function;
        /** Subtasks for the lower/upper halves; created lazily in pass 1. */
        IntCumulateTask left, right;
        /** Combination of {@code [origin, lo)} / this segment's result. */
        int in, out;
        /** Segment {@code [lo, hi)}, root range, and split threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor. Sets the split threshold to
         * {@code (hi - lo) / (8 * common pool parallelism)}, but at least
         * {@link #MIN_PARTITION}.
         */
        public IntCumulateTask(IntCumulateTask parent,
                               IntBinaryOperator function,
                               int[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor: inherits root range and threshold. */
        IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
                        int[] array, int origin, int fence, int threshold,
                        int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs this task's share of the two-pass scan.
         *
         * @throws NullPointerException if the function or array is null
         * @throws ArrayIndexOutOfBoundsException if the root range lies
         *         outside the array
         */
        public final void compute() {
            final IntBinaryOperator fn;
            final int[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException(); // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            checkRootRange(org, fnc, a.length);
            if (org >= fnc) {                         // empty: nothing to do
                quietlyComplete();
                return;
            }
            IntCumulateTask t = this;
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {
                    IntCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                 // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                            // possibly refork
                        int pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            int lout = lt.out;
                            rt.in = (l == org ? lout :
                                     fn.applyAsInt(pin, lout));
                            for (int c;;) {
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;              // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    int sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {               // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i) // cumulate
                            a[i] = sum = fn.applyAsInt(sum, a[i]);
                    }
                    else if (h < fnc) {               // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i) // sum only
                            sum = fn.applyAsInt(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (IntCumulateTask par;;) {     // propagate
                        if ((par = (IntCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0) // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                  // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; IntCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                int lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsInt(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;       // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;              // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = 3731755594596840961L;
    }

}