ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/ArrayPrefixUtil.java
Revision: 1.4
Committed: Wed Jan 2 03:26:36 2013 UTC (11 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +4 -0 lines
Log Message:
suppress [serial] warnings

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util;
8     import java.util.concurrent.ForkJoinPool;
9     import java.util.concurrent.CountedCompleter;
10     import java.util.function.BinaryOperator;
11     import java.util.function.IntBinaryOperator;
12     import java.util.function.LongBinaryOperator;
13     import java.util.function.DoubleBinaryOperator;
14    
15     /**
16     * ForkJoin tasks to perform Arrays.parallelPrefix operations.
17     *
18     * @author Doug Lea
19     * @since 1.8
20     */
21     class ArrayPrefixUtil {
22     private ArrayPrefixUtil() {}; // non-instantiable
23    
24     /*
25     * Parallel prefix (aka cumulate, scan) task classes
26     * are based loosely on Guy Blelloch's original
27     * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
28     * Keep dividing by two to threshold segment size, and then:
29     * Pass 1: Create tree of partial sums for each segment
30     * Pass 2: For each segment, cumulate with offset of left sibling
31     *
32     * This version improves performance within FJ framework mainly by
33     * allowing the second pass of ready left-hand sides to proceed
34     * even if some right-hand side first passes are still executing.
35     * It also combines first and second pass for leftmost segment,
36     * and skips the first pass for rightmost segment (whose result is
37     * not needed for second pass). It similarly manages to avoid
38     * requiring that users supply an identity basis for accumulations
39     * by tracking those segments/subtasks for which the first
40     * existing element is used as base.
41     *
42     * Managing this relies on ORing some bits in the pendingCount for
43     * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
44     * main phase bit. When false, segments compute only their sum.
45     * When true, they cumulate array elements. CUMULATE is set at
46     * root at beginning of second pass and then propagated down. But
47     * it may also be set earlier for subtrees with lo==0 (the left
48     * spine of tree). SUMMED is a one bit join count. For leafs, it
49     * is set when summed. For internal nodes, it becomes true when
50     * one child is summed. When the second child finishes summing,
51     * we then moves up tree to trigger the cumulate phase. FINISHED
52     * is also a one bit join count. For leafs, it is set when
53     * cumulated. For internal nodes, it becomes true when one child
54     * is cumulated. When the second child finishes cumulating, it
55     * then moves up tree, completing at the root.
56     *
57     * To better exploit locality and reduce overhead, the compute
58     * method loops starting with the current task, moving if possible
59     * to one of its subtasks rather than forking.
60     *
61     * As usual for this sort of utility, there are 4 versions, that
62     * are simple copy/paste/adapt variants of each other. (The
63 jsr166 1.3 * double and int versions differ from long version solely by
64 dl 1.1 * replacing "long" (with case-matching)).
65     */
66    
67     // see above
68     static final int CUMULATE = 1;
69     static final int SUMMED = 2;
70     static final int FINISHED = 4;
71    
72     /** The smallest subtask array partition size to use as threshold */
73     static final int MIN_PARTITION = 16;
74    
75     static final class CumulateTask<T> extends CountedCompleter<Void> {
76 jsr166 1.4 static final long serialVersionUID = 5293554502939613543L;
77 dl 1.1 final T[] array;
78     final BinaryOperator<T> function;
79     CumulateTask<T> left, right;
80     T in, out;
81     final int lo, hi, origin, fence, threshold;
82    
83     /** Root task constructor */
84     public CumulateTask(CumulateTask<T> parent,
85     BinaryOperator<T> function,
86     T[] array, int lo, int hi) {
87     super(parent);
88     this.function = function; this.array = array;
89     this.lo = this.origin = lo; this.hi = this.fence = hi;
90     int p;
91     this.threshold =
92     (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
93     <= MIN_PARTITION ? MIN_PARTITION : p;
94     }
95    
96     /** Subtask constructor */
97     CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
98     T[] array, int origin, int fence, int threshold,
99     int lo, int hi) {
100     super(parent);
101     this.function = function; this.array = array;
102     this.origin = origin; this.fence = fence;
103     this.threshold = threshold;
104     this.lo = lo; this.hi = hi;
105     }
106    
107     public final void compute() {
108     final BinaryOperator<T> fn;
109     final T[] a;
110     if ((fn = this.function) == null || (a = this.array) == null)
111     throw new NullPointerException(); // hoist checks
112     int th = threshold, org = origin, fnc = fence, l, h;
113     CumulateTask<T> t = this;
114     outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
115     if (h - l > th) {
116     CumulateTask<T> lt = t.left, rt = t.right, f;
117     if (lt == null) { // first pass
118     int mid = (l + h) >>> 1;
119     f = rt = t.right =
120     new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
121     t = lt = t.left =
122     new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
123     }
124     else { // possibly refork
125     T pin = t.in;
126     lt.in = pin;
127     f = t = null;
128     if (rt != null) {
129     T lout = lt.out;
130     rt.in = (l == org ? lout :
131     fn.apply(pin, lout));
132     for (int c;;) {
133     if (((c = rt.getPendingCount()) & CUMULATE) != 0)
134     break;
135     if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
136     t = rt;
137     break;
138     }
139     }
140     }
141     for (int c;;) {
142     if (((c = lt.getPendingCount()) & CUMULATE) != 0)
143     break;
144     if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
145     if (t != null)
146     f = t;
147     t = lt;
148     break;
149     }
150     }
151     if (t == null)
152     break;
153     }
154     if (f != null)
155     f.fork();
156     }
157     else {
158     int state; // Transition to sum, cumulate, or both
159     for (int b;;) {
160     if (((b = t.getPendingCount()) & FINISHED) != 0)
161     break outer; // already done
162 jsr166 1.2 state = ((b & CUMULATE) != 0 ? FINISHED :
163 dl 1.1 (l > org) ? SUMMED : (SUMMED|FINISHED));
164     if (t.compareAndSetPendingCount(b, b|state))
165     break;
166     }
167    
168     T sum;
169     if (state != SUMMED) {
170     int first;
171     if (l == org) { // leftmost; no in
172     sum = a[org];
173     first = org + 1;
174     }
175     else {
176     sum = t.in;
177     first = l;
178     }
179     for (int i = first; i < h; ++i) // cumulate
180     a[i] = sum = fn.apply(sum, a[i]);
181     }
182     else if (h < fnc) { // skip rightmost
183     sum = a[l];
184     for (int i = l + 1; i < h; ++i) // sum only
185     sum = fn.apply(sum, a[i]);
186     }
187     else
188     sum = t.in;
189     t.out = sum;
190     for (CumulateTask<T> par;;) { // propagate
191     if ((par = (CumulateTask<T>)t.getCompleter()) == null) {
192     if ((state & FINISHED) != 0) // enable join
193     t.quietlyComplete();
194     break outer;
195     }
196     int b = par.getPendingCount();
197     if ((b & state & FINISHED) != 0)
198     t = par; // both done
199     else if ((b & state & SUMMED) != 0) { // both summed
200     int nextState; CumulateTask<T> lt, rt;
201     if ((lt = par.left) != null &&
202     (rt = par.right) != null) {
203     T lout = lt.out;
204     par.out = (rt.hi == fnc ? lout :
205     fn.apply(lout, rt.out));
206     }
207     int refork = (((b & CUMULATE) == 0 &&
208     par.lo == org) ? CUMULATE : 0);
209     if ((nextState = b|state|refork) == b ||
210     par.compareAndSetPendingCount(b, nextState)) {
211     state = SUMMED; // drop finished
212     t = par;
213     if (refork != 0)
214     par.fork();
215     }
216     }
217     else if (par.compareAndSetPendingCount(b, b|state))
218     break outer; // sib not ready
219     }
220     }
221     }
222     }
223     }
224    
225     static final class LongCumulateTask extends CountedCompleter<Void> {
226 jsr166 1.4 static final long serialVersionUID = -5074099945909284273L;
227 dl 1.1 final long[] array;
228     final LongBinaryOperator function;
229     LongCumulateTask left, right;
230     long in, out;
231     final int lo, hi, origin, fence, threshold;
232    
233     /** Root task constructor */
234     public LongCumulateTask(LongCumulateTask parent,
235     LongBinaryOperator function,
236     long[] array, int lo, int hi) {
237     super(parent);
238     this.function = function; this.array = array;
239     this.lo = this.origin = lo; this.hi = this.fence = hi;
240     int p;
241     this.threshold =
242     (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
243     <= MIN_PARTITION ? MIN_PARTITION : p;
244     }
245    
246     /** Subtask constructor */
247     LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
248     long[] array, int origin, int fence, int threshold,
249     int lo, int hi) {
250     super(parent);
251     this.function = function; this.array = array;
252     this.origin = origin; this.fence = fence;
253     this.threshold = threshold;
254     this.lo = lo; this.hi = hi;
255     }
256    
257     public final void compute() {
258     final LongBinaryOperator fn;
259     final long[] a;
260     if ((fn = this.function) == null || (a = this.array) == null)
261     throw new NullPointerException(); // hoist checks
262     int th = threshold, org = origin, fnc = fence, l, h;
263     LongCumulateTask t = this;
264     outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
265     if (h - l > th) {
266     LongCumulateTask lt = t.left, rt = t.right, f;
267     if (lt == null) { // first pass
268     int mid = (l + h) >>> 1;
269     f = rt = t.right =
270     new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
271     t = lt = t.left =
272     new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
273     }
274     else { // possibly refork
275     long pin = t.in;
276     lt.in = pin;
277     f = t = null;
278     if (rt != null) {
279     long lout = lt.out;
280     rt.in = (l == org ? lout :
281     fn.applyAsLong(pin, lout));
282     for (int c;;) {
283     if (((c = rt.getPendingCount()) & CUMULATE) != 0)
284     break;
285     if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
286     t = rt;
287     break;
288     }
289     }
290     }
291     for (int c;;) {
292     if (((c = lt.getPendingCount()) & CUMULATE) != 0)
293     break;
294     if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
295     if (t != null)
296     f = t;
297     t = lt;
298     break;
299     }
300     }
301     if (t == null)
302     break;
303     }
304     if (f != null)
305     f.fork();
306     }
307     else {
308     int state; // Transition to sum, cumulate, or both
309     for (int b;;) {
310     if (((b = t.getPendingCount()) & FINISHED) != 0)
311     break outer; // already done
312 jsr166 1.2 state = ((b & CUMULATE) != 0 ? FINISHED :
313 dl 1.1 (l > org) ? SUMMED : (SUMMED|FINISHED));
314     if (t.compareAndSetPendingCount(b, b|state))
315     break;
316     }
317    
318     long sum;
319     if (state != SUMMED) {
320     int first;
321     if (l == org) { // leftmost; no in
322     sum = a[org];
323     first = org + 1;
324     }
325     else {
326     sum = t.in;
327     first = l;
328     }
329     for (int i = first; i < h; ++i) // cumulate
330     a[i] = sum = fn.applyAsLong(sum, a[i]);
331     }
332     else if (h < fnc) { // skip rightmost
333     sum = a[l];
334     for (int i = l + 1; i < h; ++i) // sum only
335     sum = fn.applyAsLong(sum, a[i]);
336     }
337     else
338     sum = t.in;
339     t.out = sum;
340     for (LongCumulateTask par;;) { // propagate
341     if ((par = (LongCumulateTask)t.getCompleter()) == null) {
342     if ((state & FINISHED) != 0) // enable join
343     t.quietlyComplete();
344     break outer;
345     }
346     int b = par.getPendingCount();
347     if ((b & state & FINISHED) != 0)
348     t = par; // both done
349     else if ((b & state & SUMMED) != 0) { // both summed
350     int nextState; LongCumulateTask lt, rt;
351     if ((lt = par.left) != null &&
352     (rt = par.right) != null) {
353     long lout = lt.out;
354     par.out = (rt.hi == fnc ? lout :
355     fn.applyAsLong(lout, rt.out));
356     }
357     int refork = (((b & CUMULATE) == 0 &&
358     par.lo == org) ? CUMULATE : 0);
359     if ((nextState = b|state|refork) == b ||
360     par.compareAndSetPendingCount(b, nextState)) {
361     state = SUMMED; // drop finished
362     t = par;
363     if (refork != 0)
364     par.fork();
365     }
366     }
367     else if (par.compareAndSetPendingCount(b, b|state))
368     break outer; // sib not ready
369     }
370     }
371     }
372     }
373     }
374    
375     static final class DoubleCumulateTask extends CountedCompleter<Void> {
376 jsr166 1.4 static final long serialVersionUID = -586947823794232033L;
377 dl 1.1 final double[] array;
378     final DoubleBinaryOperator function;
379     DoubleCumulateTask left, right;
380     double in, out;
381     final int lo, hi, origin, fence, threshold;
382    
383     /** Root task constructor */
384     public DoubleCumulateTask(DoubleCumulateTask parent,
385     DoubleBinaryOperator function,
386     double[] array, int lo, int hi) {
387     super(parent);
388     this.function = function; this.array = array;
389     this.lo = this.origin = lo; this.hi = this.fence = hi;
390     int p;
391     this.threshold =
392     (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
393     <= MIN_PARTITION ? MIN_PARTITION : p;
394     }
395    
396     /** Subtask constructor */
397     DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
398     double[] array, int origin, int fence, int threshold,
399     int lo, int hi) {
400     super(parent);
401     this.function = function; this.array = array;
402     this.origin = origin; this.fence = fence;
403     this.threshold = threshold;
404     this.lo = lo; this.hi = hi;
405     }
406    
407     public final void compute() {
408     final DoubleBinaryOperator fn;
409     final double[] a;
410     if ((fn = this.function) == null || (a = this.array) == null)
411     throw new NullPointerException(); // hoist checks
412     int th = threshold, org = origin, fnc = fence, l, h;
413     DoubleCumulateTask t = this;
414     outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
415     if (h - l > th) {
416     DoubleCumulateTask lt = t.left, rt = t.right, f;
417     if (lt == null) { // first pass
418     int mid = (l + h) >>> 1;
419     f = rt = t.right =
420     new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
421     t = lt = t.left =
422     new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
423     }
424     else { // possibly refork
425     double pin = t.in;
426     lt.in = pin;
427     f = t = null;
428     if (rt != null) {
429     double lout = lt.out;
430     rt.in = (l == org ? lout :
431     fn.applyAsDouble(pin, lout));
432     for (int c;;) {
433     if (((c = rt.getPendingCount()) & CUMULATE) != 0)
434     break;
435     if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
436     t = rt;
437     break;
438     }
439     }
440     }
441     for (int c;;) {
442     if (((c = lt.getPendingCount()) & CUMULATE) != 0)
443     break;
444     if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
445     if (t != null)
446     f = t;
447     t = lt;
448     break;
449     }
450     }
451     if (t == null)
452     break;
453     }
454     if (f != null)
455     f.fork();
456     }
457     else {
458     int state; // Transition to sum, cumulate, or both
459     for (int b;;) {
460     if (((b = t.getPendingCount()) & FINISHED) != 0)
461     break outer; // already done
462 jsr166 1.2 state = ((b & CUMULATE) != 0 ? FINISHED :
463 dl 1.1 (l > org) ? SUMMED : (SUMMED|FINISHED));
464     if (t.compareAndSetPendingCount(b, b|state))
465     break;
466     }
467    
468     double sum;
469     if (state != SUMMED) {
470     int first;
471     if (l == org) { // leftmost; no in
472     sum = a[org];
473     first = org + 1;
474     }
475     else {
476     sum = t.in;
477     first = l;
478     }
479     for (int i = first; i < h; ++i) // cumulate
480     a[i] = sum = fn.applyAsDouble(sum, a[i]);
481     }
482     else if (h < fnc) { // skip rightmost
483     sum = a[l];
484     for (int i = l + 1; i < h; ++i) // sum only
485     sum = fn.applyAsDouble(sum, a[i]);
486     }
487     else
488     sum = t.in;
489     t.out = sum;
490     for (DoubleCumulateTask par;;) { // propagate
491     if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
492     if ((state & FINISHED) != 0) // enable join
493     t.quietlyComplete();
494     break outer;
495     }
496     int b = par.getPendingCount();
497     if ((b & state & FINISHED) != 0)
498     t = par; // both done
499     else if ((b & state & SUMMED) != 0) { // both summed
500     int nextState; DoubleCumulateTask lt, rt;
501     if ((lt = par.left) != null &&
502     (rt = par.right) != null) {
503     double lout = lt.out;
504     par.out = (rt.hi == fnc ? lout :
505     fn.applyAsDouble(lout, rt.out));
506     }
507     int refork = (((b & CUMULATE) == 0 &&
508     par.lo == org) ? CUMULATE : 0);
509     if ((nextState = b|state|refork) == b ||
510     par.compareAndSetPendingCount(b, nextState)) {
511     state = SUMMED; // drop finished
512     t = par;
513     if (refork != 0)
514     par.fork();
515     }
516     }
517     else if (par.compareAndSetPendingCount(b, b|state))
518     break outer; // sib not ready
519     }
520     }
521     }
522     }
523     }
524    
525     static final class IntCumulateTask extends CountedCompleter<Void> {
526 jsr166 1.4 static final long serialVersionUID = 3731755594596840961L;
527 dl 1.1 final int[] array;
528     final IntBinaryOperator function;
529     IntCumulateTask left, right;
530     int in, out;
531     final int lo, hi, origin, fence, threshold;
532    
533     /** Root task constructor */
534     public IntCumulateTask(IntCumulateTask parent,
535     IntBinaryOperator function,
536     int[] array, int lo, int hi) {
537     super(parent);
538     this.function = function; this.array = array;
539     this.lo = this.origin = lo; this.hi = this.fence = hi;
540     int p;
541     this.threshold =
542     (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
543     <= MIN_PARTITION ? MIN_PARTITION : p;
544     }
545    
546     /** Subtask constructor */
547     IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
548     int[] array, int origin, int fence, int threshold,
549     int lo, int hi) {
550     super(parent);
551     this.function = function; this.array = array;
552     this.origin = origin; this.fence = fence;
553     this.threshold = threshold;
554     this.lo = lo; this.hi = hi;
555     }
556    
557     public final void compute() {
558     final IntBinaryOperator fn;
559     final int[] a;
560     if ((fn = this.function) == null || (a = this.array) == null)
561     throw new NullPointerException(); // hoist checks
562     int th = threshold, org = origin, fnc = fence, l, h;
563     IntCumulateTask t = this;
564     outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
565     if (h - l > th) {
566     IntCumulateTask lt = t.left, rt = t.right, f;
567     if (lt == null) { // first pass
568     int mid = (l + h) >>> 1;
569     f = rt = t.right =
570     new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
571     t = lt = t.left =
572     new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
573     }
574     else { // possibly refork
575     int pin = t.in;
576     lt.in = pin;
577     f = t = null;
578     if (rt != null) {
579     int lout = lt.out;
580     rt.in = (l == org ? lout :
581     fn.applyAsInt(pin, lout));
582     for (int c;;) {
583     if (((c = rt.getPendingCount()) & CUMULATE) != 0)
584     break;
585     if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
586     t = rt;
587     break;
588     }
589     }
590     }
591     for (int c;;) {
592     if (((c = lt.getPendingCount()) & CUMULATE) != 0)
593     break;
594     if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
595     if (t != null)
596     f = t;
597     t = lt;
598     break;
599     }
600     }
601     if (t == null)
602     break;
603     }
604     if (f != null)
605     f.fork();
606     }
607     else {
608     int state; // Transition to sum, cumulate, or both
609     for (int b;;) {
610     if (((b = t.getPendingCount()) & FINISHED) != 0)
611     break outer; // already done
612 jsr166 1.2 state = ((b & CUMULATE) != 0 ? FINISHED :
613 dl 1.1 (l > org) ? SUMMED : (SUMMED|FINISHED));
614     if (t.compareAndSetPendingCount(b, b|state))
615     break;
616     }
617    
618     int sum;
619     if (state != SUMMED) {
620     int first;
621     if (l == org) { // leftmost; no in
622     sum = a[org];
623     first = org + 1;
624     }
625     else {
626     sum = t.in;
627     first = l;
628     }
629     for (int i = first; i < h; ++i) // cumulate
630     a[i] = sum = fn.applyAsInt(sum, a[i]);
631     }
632     else if (h < fnc) { // skip rightmost
633     sum = a[l];
634     for (int i = l + 1; i < h; ++i) // sum only
635     sum = fn.applyAsInt(sum, a[i]);
636     }
637     else
638     sum = t.in;
639     t.out = sum;
640     for (IntCumulateTask par;;) { // propagate
641     if ((par = (IntCumulateTask)t.getCompleter()) == null) {
642     if ((state & FINISHED) != 0) // enable join
643     t.quietlyComplete();
644     break outer;
645     }
646     int b = par.getPendingCount();
647     if ((b & state & FINISHED) != 0)
648     t = par; // both done
649     else if ((b & state & SUMMED) != 0) { // both summed
650     int nextState; IntCumulateTask lt, rt;
651     if ((lt = par.left) != null &&
652     (rt = par.right) != null) {
653     int lout = lt.out;
654     par.out = (rt.hi == fnc ? lout :
655     fn.applyAsInt(lout, rt.out));
656     }
657     int refork = (((b & CUMULATE) == 0 &&
658     par.lo == org) ? CUMULATE : 0);
659     if ((nextState = b|state|refork) == b ||
660     par.compareAndSetPendingCount(b, nextState)) {
661     state = SUMMED; // drop finished
662     t = par;
663     if (refork != 0)
664     par.fork();
665     }
666     }
667     else if (par.compareAndSetPendingCount(b, b|state))
668     break outer; // sib not ready
669     }
670     }
671     }
672     }
673     }
674    
675    
676 jsr166 1.2 }