ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/ArrayPrefixUtil.java
Revision: 1.4
Committed: Wed Jan 2 03:26:36 2013 UTC (11 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +4 -0 lines
Log Message:
suppress [serial] warnings

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util;
8 import java.util.concurrent.ForkJoinPool;
9 import java.util.concurrent.CountedCompleter;
10 import java.util.function.BinaryOperator;
11 import java.util.function.IntBinaryOperator;
12 import java.util.function.LongBinaryOperator;
13 import java.util.function.DoubleBinaryOperator;
14
15 /**
16 * ForkJoin tasks to perform Arrays.parallelPrefix operations.
17 *
18 * @author Doug Lea
19 * @since 1.8
20 */
21 class ArrayPrefixUtil {
/** Private constructor: this class only hosts constants and static nested task classes. */
22 private ArrayPrefixUtil() {}; // non-instantiable
23
24 /*
25 * Parallel prefix (aka cumulate, scan) task classes
26 * are based loosely on Guy Blelloch's original
27 * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
28 * Keep dividing by two to threshold segment size, and then:
29 * Pass 1: Create tree of partial sums for each segment
30 * Pass 2: For each segment, cumulate with offset of left sibling
31 *
32 * This version improves performance within FJ framework mainly by
33 * allowing the second pass of ready left-hand sides to proceed
34 * even if some right-hand side first passes are still executing.
35 * It also combines first and second pass for leftmost segment,
36 * and skips the first pass for rightmost segment (whose result is
37 * not needed for second pass). It similarly manages to avoid
38 * requiring that users supply an identity basis for accumulations
39 * by tracking those segments/subtasks for which the first
40 * existing element is used as base.
41 *
42 * Managing this relies on ORing some bits in the pendingCount for
43 * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
44 * main phase bit. When false, segments compute only their sum.
45 * When true, they cumulate array elements. CUMULATE is set at
46 * root at beginning of second pass and then propagated down. But
47 * it may also be set earlier for subtrees with lo==0 (the left
48 * spine of tree). SUMMED is a one bit join count. For leafs, it
49 * is set when summed. For internal nodes, it becomes true when
50 * one child is summed. When the second child finishes summing,
51 * it then moves up tree to trigger the cumulate phase. FINISHED
52 * is also a one bit join count. For leafs, it is set when
53 * cumulated. For internal nodes, it becomes true when one child
54 * is cumulated. When the second child finishes cumulating, it
55 * then moves up tree, completing at the root.
56 *
57 * To better exploit locality and reduce overhead, the compute
58 * method loops starting with the current task, moving if possible
59 * to one of its subtasks rather than forking.
60 *
61 * As usual for this sort of utility, there are 4 versions, that
62 * are simple copy/paste/adapt variants of each other. (The
63 * double and int versions differ from long version solely by
64 * replacing "long" (with case-matching)).
65 */
66
67 // see above
// Phase/state bits that the task classes OR into their CountedCompleter
// pending count (via compareAndSetPendingCount).
// CUMULATE: main phase bit; when clear a segment only computes its sum,
// when set it cumulates its array elements.
68 static final int CUMULATE = 1;
// SUMMED: one-bit join count for the sum pass (set on a leaf once summed,
// on an internal node once one child is summed).
69 static final int SUMMED = 2;
// FINISHED: one-bit join count for the cumulate pass (set on a leaf once
// cumulated, on an internal node once one child is cumulated).
70 static final int FINISHED = 4;
71
72 /** The smallest subtask array partition size to use as threshold */
// The root task constructors use this value as the lower bound when they
// compute their threshold field.
73 static final int MIN_PARTITION = 16;
74
/**
 * Fork/join task that cumulates (prefix-combines) the elements of an object
 * array in place using a BinaryOperator. Each task instance covers the
 * sub-range [lo, hi) of the overall range [origin, fence). Ranges longer than
 * threshold are split at their midpoint into left and right child tasks.
 * The CUMULATE, SUMMED and FINISHED bits are kept in the CountedCompleter
 * pending count and are only changed with compareAndSetPendingCount.
 */
75 static final class CumulateTask<T> extends CountedCompleter<Void> {
76 static final long serialVersionUID = 5293554502939613543L;
// Array that is read and overwritten, and the combining function.
77 final T[] array;
78 final BinaryOperator<T> function;
// Child tasks; null until compute() first visits this node as an internal node.
79 CumulateTask<T> left, right;
// in: value accumulated from the elements before lo; assigned by the parent
// before this task is switched to the cumulate phase.
// out: value recorded by this task at the end of a pass. After the sum pass
// it is the combination of the segment's elements, except on the right edge
// (hi == fence) where no sum is computed.
80 T in, out;
// [lo, hi): this task's range. [origin, fence): the whole range being
// processed. threshold: largest range length handled as a leaf.
81 final int lo, hi, origin, fence, threshold;
82
83 /** Root task constructor: covers the whole range, so origin/fence equal lo/hi. */
84 public CumulateTask(CumulateTask<T> parent,
85 BinaryOperator<T> function,
86 T[] array, int lo, int hi) {
87 super(parent);
88 this.function = function; this.array = array;
89 this.lo = this.origin = lo; this.hi = this.fence = hi;
90 int p;
// threshold = range length / (8 * common-pool parallelism), but never
// less than MIN_PARTITION.
91 this.threshold =
92 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
93 <= MIN_PARTITION ? MIN_PARTITION : p;
94 }
95
96 /** Subtask constructor */
// Copies origin, fence and threshold from the caller unchanged; only
// lo and hi differ between parent and child.
97 CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
98 T[] array, int origin, int fence, int threshold,
99 int lo, int hi) {
100 super(parent);
101 this.function = function; this.array = array;
102 this.origin = origin; this.fence = fence;
103 this.threshold = threshold;
104 this.lo = lo; this.hi = hi;
105 }
106
/**
 * Runs the phase state machine outlined in the file-level comment, starting
 * at this task and then moving to a child or parent task inside one loop
 * instead of recursing. Internal nodes either split (first visit) or hand
 * inputs to their children and try to switch them to the cumulate phase
 * (later visits). Leaves sum and/or cumulate their segment and then
 * propagate SUMMED/FINISHED up the completer chain.
 *
 * @throws NullPointerException if function or array is null
 */
107 public final void compute() {
108 final BinaryOperator<T> fn;
109 final T[] a;
110 if ((fn = this.function) == null || (a = this.array) == null)
111 throw new NullPointerException(); // hoist checks
112 int th = threshold, org = origin, fnc = fence, l, h;
113 CumulateTask<T> t = this;
// The loop condition also checks t's range against the array bounds.
114 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
115 if (h - l > th) {
116 CumulateTask<T> lt = t.left, rt = t.right, f;
117 if (lt == null) { // first pass
118 int mid = (l + h) >>> 1;
// The right half is remembered in f and forked below; this thread
// continues with the left half.
119 f = rt = t.right =
120 new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
121 t = lt = t.left =
122 new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
123 }
124 else { // possibly refork
// Children already exist: give each child its input value and try to
// set its CUMULATE bit.
125 T pin = t.in;
126 lt.in = pin;
127 f = t = null;
128 if (rt != null) {
129 T lout = lt.out;
// When this node starts at origin there is no incoming value, so the
// right child's input is just the left child's result.
130 rt.in = (l == org ? lout :
131 fn.apply(pin, lout));
// Claim rt by setting CUMULATE; if the bit is already set, rt is left alone.
132 for (int c;;) {
133 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
134 break;
135 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
136 t = rt;
137 break;
138 }
139 }
140 }
// Same claim attempt for lt. If both children were claimed, the right
// one moves to f (forked below) and this thread continues with the left.
141 for (int c;;) {
142 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
143 break;
144 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
145 if (t != null)
146 f = t;
147 t = lt;
148 break;
149 }
150 }
// Neither child could be claimed: leave the outer loop.
151 if (t == null)
152 break;
153 }
154 if (f != null)
155 f.fork();
156 }
157 else {
158 int state; // Transition to sum, cumulate, or both
159 for (int b;;) {
160 if (((b = t.getPendingCount()) & FINISHED) != 0)
161 break outer; // already done
// CUMULATE already set: this visit cumulates (FINISHED). Otherwise sum
// only (SUMMED), except that a segment starting at origin needs no
// input and does both in one visit (SUMMED|FINISHED).
162 state = ((b & CUMULATE) != 0 ? FINISHED :
163 (l > org) ? SUMMED : (SUMMED|FINISHED));
164 if (t.compareAndSetPendingCount(b, b|state))
165 break;
166 }
167
168 T sum;
// Any state other than plain SUMMED includes FINISHED, i.e. cumulate now.
169 if (state != SUMMED) {
170 int first;
171 if (l == org) { // leftmost; no in
172 sum = a[org];
173 first = org + 1;
174 }
175 else {
176 sum = t.in;
177 first = l;
178 }
179 for (int i = first; i < h; ++i) // cumulate
180 a[i] = sum = fn.apply(sum, a[i]);
181 }
182 else if (h < fnc) { // skip rightmost
183 sum = a[l];
184 for (int i = l + 1; i < h; ++i) // sum only
185 sum = fn.apply(sum, a[i]);
186 }
// Right-edge segment in the sum pass: no sum is computed, out just
// receives the current value of in.
187 else
188 sum = t.in;
189 t.out = sum;
190 for (CumulateTask<T> par;;) { // propagate
// No completer means t is the root of the task tree.
191 if ((par = (CumulateTask<T>)t.getCompleter()) == null) {
192 if ((state & FINISHED) != 0) // enable join
193 t.quietlyComplete();
194 break outer;
195 }
196 int b = par.getPendingCount();
197 if ((b & state & FINISHED) != 0)
198 t = par; // both done
199 else if ((b & state & SUMMED) != 0) { // both summed
200 int nextState; CumulateTask<T> lt, rt;
201 if ((lt = par.left) != null &&
202 (rt = par.right) != null) {
203 T lout = lt.out;
// Parent's out combines both children; the right child's value is
// left out when that child ends at fence.
204 par.out = (rt.hi == fnc ? lout :
205 fn.apply(lout, rt.out));
206 }
// A parent that starts at origin and is not yet cumulating gets
// CUMULATE set here and is forked again after the CAS succeeds.
207 int refork = (((b & CUMULATE) == 0 &&
208 par.lo == org) ? CUMULATE : 0);
209 if ((nextState = b|state|refork) == b ||
210 par.compareAndSetPendingCount(b, nextState)) {
211 state = SUMMED; // drop finished
212 t = par;
213 if (refork != 0)
214 par.fork();
215 }
216 }
// Parent does not yet carry the matching bit: record ours and stop here.
// If the CAS fails the loop re-reads the parent's count and retries.
217 else if (par.compareAndSetPendingCount(b, b|state))
218 break outer; // sib not ready
219 }
220 }
221 }
222 }
223 }
224
/**
 * Fork/join task that cumulates (prefix-combines) the elements of a long
 * array in place using a LongBinaryOperator. Each task instance covers the
 * sub-range [lo, hi) of the overall range [origin, fence). Ranges longer than
 * threshold are split at their midpoint into left and right child tasks.
 * The CUMULATE, SUMMED and FINISHED bits are kept in the CountedCompleter
 * pending count and are only changed with compareAndSetPendingCount.
 */
225 static final class LongCumulateTask extends CountedCompleter<Void> {
226 static final long serialVersionUID = -5074099945909284273L;
// Array that is read and overwritten, and the combining function.
227 final long[] array;
228 final LongBinaryOperator function;
// Child tasks; null until compute() first visits this node as an internal node.
229 LongCumulateTask left, right;
// in: value accumulated from the elements before lo; assigned by the parent
// before this task is switched to the cumulate phase.
// out: value recorded by this task at the end of a pass. After the sum pass
// it is the combination of the segment's elements, except on the right edge
// (hi == fence) where no sum is computed.
230 long in, out;
// [lo, hi): this task's range. [origin, fence): the whole range being
// processed. threshold: largest range length handled as a leaf.
231 final int lo, hi, origin, fence, threshold;
232
233 /** Root task constructor: covers the whole range, so origin/fence equal lo/hi. */
234 public LongCumulateTask(LongCumulateTask parent,
235 LongBinaryOperator function,
236 long[] array, int lo, int hi) {
237 super(parent);
238 this.function = function; this.array = array;
239 this.lo = this.origin = lo; this.hi = this.fence = hi;
240 int p;
// threshold = range length / (8 * common-pool parallelism), but never
// less than MIN_PARTITION.
241 this.threshold =
242 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
243 <= MIN_PARTITION ? MIN_PARTITION : p;
244 }
245
246 /** Subtask constructor */
// Copies origin, fence and threshold from the caller unchanged; only
// lo and hi differ between parent and child.
247 LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
248 long[] array, int origin, int fence, int threshold,
249 int lo, int hi) {
250 super(parent);
251 this.function = function; this.array = array;
252 this.origin = origin; this.fence = fence;
253 this.threshold = threshold;
254 this.lo = lo; this.hi = hi;
255 }
256
/**
 * Runs the phase state machine outlined in the file-level comment, starting
 * at this task and then moving to a child or parent task inside one loop
 * instead of recursing. Internal nodes either split (first visit) or hand
 * inputs to their children and try to switch them to the cumulate phase
 * (later visits). Leaves sum and/or cumulate their segment and then
 * propagate SUMMED/FINISHED up the completer chain.
 *
 * @throws NullPointerException if function or array is null
 */
257 public final void compute() {
258 final LongBinaryOperator fn;
259 final long[] a;
260 if ((fn = this.function) == null || (a = this.array) == null)
261 throw new NullPointerException(); // hoist checks
262 int th = threshold, org = origin, fnc = fence, l, h;
263 LongCumulateTask t = this;
// The loop condition also checks t's range against the array bounds.
264 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
265 if (h - l > th) {
266 LongCumulateTask lt = t.left, rt = t.right, f;
267 if (lt == null) { // first pass
268 int mid = (l + h) >>> 1;
// The right half is remembered in f and forked below; this thread
// continues with the left half.
269 f = rt = t.right =
270 new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
271 t = lt = t.left =
272 new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
273 }
274 else { // possibly refork
// Children already exist: give each child its input value and try to
// set its CUMULATE bit.
275 long pin = t.in;
276 lt.in = pin;
277 f = t = null;
278 if (rt != null) {
279 long lout = lt.out;
// When this node starts at origin there is no incoming value, so the
// right child's input is just the left child's result.
280 rt.in = (l == org ? lout :
281 fn.applyAsLong(pin, lout));
// Claim rt by setting CUMULATE; if the bit is already set, rt is left alone.
282 for (int c;;) {
283 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
284 break;
285 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
286 t = rt;
287 break;
288 }
289 }
290 }
// Same claim attempt for lt. If both children were claimed, the right
// one moves to f (forked below) and this thread continues with the left.
291 for (int c;;) {
292 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
293 break;
294 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
295 if (t != null)
296 f = t;
297 t = lt;
298 break;
299 }
300 }
// Neither child could be claimed: leave the outer loop.
301 if (t == null)
302 break;
303 }
304 if (f != null)
305 f.fork();
306 }
307 else {
308 int state; // Transition to sum, cumulate, or both
309 for (int b;;) {
310 if (((b = t.getPendingCount()) & FINISHED) != 0)
311 break outer; // already done
// CUMULATE already set: this visit cumulates (FINISHED). Otherwise sum
// only (SUMMED), except that a segment starting at origin needs no
// input and does both in one visit (SUMMED|FINISHED).
312 state = ((b & CUMULATE) != 0 ? FINISHED :
313 (l > org) ? SUMMED : (SUMMED|FINISHED));
314 if (t.compareAndSetPendingCount(b, b|state))
315 break;
316 }
317
318 long sum;
// Any state other than plain SUMMED includes FINISHED, i.e. cumulate now.
319 if (state != SUMMED) {
320 int first;
321 if (l == org) { // leftmost; no in
322 sum = a[org];
323 first = org + 1;
324 }
325 else {
326 sum = t.in;
327 first = l;
328 }
329 for (int i = first; i < h; ++i) // cumulate
330 a[i] = sum = fn.applyAsLong(sum, a[i]);
331 }
332 else if (h < fnc) { // skip rightmost
333 sum = a[l];
334 for (int i = l + 1; i < h; ++i) // sum only
335 sum = fn.applyAsLong(sum, a[i]);
336 }
// Right-edge segment in the sum pass: no sum is computed, out just
// receives the current value of in.
337 else
338 sum = t.in;
339 t.out = sum;
340 for (LongCumulateTask par;;) { // propagate
// No completer means t is the root of the task tree.
341 if ((par = (LongCumulateTask)t.getCompleter()) == null) {
342 if ((state & FINISHED) != 0) // enable join
343 t.quietlyComplete();
344 break outer;
345 }
346 int b = par.getPendingCount();
347 if ((b & state & FINISHED) != 0)
348 t = par; // both done
349 else if ((b & state & SUMMED) != 0) { // both summed
350 int nextState; LongCumulateTask lt, rt;
351 if ((lt = par.left) != null &&
352 (rt = par.right) != null) {
353 long lout = lt.out;
// Parent's out combines both children; the right child's value is
// left out when that child ends at fence.
354 par.out = (rt.hi == fnc ? lout :
355 fn.applyAsLong(lout, rt.out));
356 }
// A parent that starts at origin and is not yet cumulating gets
// CUMULATE set here and is forked again after the CAS succeeds.
357 int refork = (((b & CUMULATE) == 0 &&
358 par.lo == org) ? CUMULATE : 0);
359 if ((nextState = b|state|refork) == b ||
360 par.compareAndSetPendingCount(b, nextState)) {
361 state = SUMMED; // drop finished
362 t = par;
363 if (refork != 0)
364 par.fork();
365 }
366 }
// Parent does not yet carry the matching bit: record ours and stop here.
// If the CAS fails the loop re-reads the parent's count and retries.
367 else if (par.compareAndSetPendingCount(b, b|state))
368 break outer; // sib not ready
369 }
370 }
371 }
372 }
373 }
374
/**
 * Fork/join task that cumulates (prefix-combines) the elements of a double
 * array in place using a DoubleBinaryOperator. Each task instance covers the
 * sub-range [lo, hi) of the overall range [origin, fence). Ranges longer than
 * threshold are split at their midpoint into left and right child tasks.
 * The CUMULATE, SUMMED and FINISHED bits are kept in the CountedCompleter
 * pending count and are only changed with compareAndSetPendingCount.
 */
375 static final class DoubleCumulateTask extends CountedCompleter<Void> {
376 static final long serialVersionUID = -586947823794232033L;
// Array that is read and overwritten, and the combining function.
377 final double[] array;
378 final DoubleBinaryOperator function;
// Child tasks; null until compute() first visits this node as an internal node.
379 DoubleCumulateTask left, right;
// in: value accumulated from the elements before lo; assigned by the parent
// before this task is switched to the cumulate phase.
// out: value recorded by this task at the end of a pass. After the sum pass
// it is the combination of the segment's elements, except on the right edge
// (hi == fence) where no sum is computed.
380 double in, out;
// [lo, hi): this task's range. [origin, fence): the whole range being
// processed. threshold: largest range length handled as a leaf.
381 final int lo, hi, origin, fence, threshold;
382
383 /** Root task constructor: covers the whole range, so origin/fence equal lo/hi. */
384 public DoubleCumulateTask(DoubleCumulateTask parent,
385 DoubleBinaryOperator function,
386 double[] array, int lo, int hi) {
387 super(parent);
388 this.function = function; this.array = array;
389 this.lo = this.origin = lo; this.hi = this.fence = hi;
390 int p;
// threshold = range length / (8 * common-pool parallelism), but never
// less than MIN_PARTITION.
391 this.threshold =
392 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
393 <= MIN_PARTITION ? MIN_PARTITION : p;
394 }
395
396 /** Subtask constructor */
// Copies origin, fence and threshold from the caller unchanged; only
// lo and hi differ between parent and child.
397 DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
398 double[] array, int origin, int fence, int threshold,
399 int lo, int hi) {
400 super(parent);
401 this.function = function; this.array = array;
402 this.origin = origin; this.fence = fence;
403 this.threshold = threshold;
404 this.lo = lo; this.hi = hi;
405 }
406
/**
 * Runs the phase state machine outlined in the file-level comment, starting
 * at this task and then moving to a child or parent task inside one loop
 * instead of recursing. Internal nodes either split (first visit) or hand
 * inputs to their children and try to switch them to the cumulate phase
 * (later visits). Leaves sum and/or cumulate their segment and then
 * propagate SUMMED/FINISHED up the completer chain.
 *
 * @throws NullPointerException if function or array is null
 */
407 public final void compute() {
408 final DoubleBinaryOperator fn;
409 final double[] a;
410 if ((fn = this.function) == null || (a = this.array) == null)
411 throw new NullPointerException(); // hoist checks
412 int th = threshold, org = origin, fnc = fence, l, h;
413 DoubleCumulateTask t = this;
// The loop condition also checks t's range against the array bounds.
414 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
415 if (h - l > th) {
416 DoubleCumulateTask lt = t.left, rt = t.right, f;
417 if (lt == null) { // first pass
418 int mid = (l + h) >>> 1;
// The right half is remembered in f and forked below; this thread
// continues with the left half.
419 f = rt = t.right =
420 new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
421 t = lt = t.left =
422 new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
423 }
424 else { // possibly refork
// Children already exist: give each child its input value and try to
// set its CUMULATE bit.
425 double pin = t.in;
426 lt.in = pin;
427 f = t = null;
428 if (rt != null) {
429 double lout = lt.out;
// When this node starts at origin there is no incoming value, so the
// right child's input is just the left child's result.
430 rt.in = (l == org ? lout :
431 fn.applyAsDouble(pin, lout));
// Claim rt by setting CUMULATE; if the bit is already set, rt is left alone.
432 for (int c;;) {
433 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
434 break;
435 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
436 t = rt;
437 break;
438 }
439 }
440 }
// Same claim attempt for lt. If both children were claimed, the right
// one moves to f (forked below) and this thread continues with the left.
441 for (int c;;) {
442 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
443 break;
444 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
445 if (t != null)
446 f = t;
447 t = lt;
448 break;
449 }
450 }
// Neither child could be claimed: leave the outer loop.
451 if (t == null)
452 break;
453 }
454 if (f != null)
455 f.fork();
456 }
457 else {
458 int state; // Transition to sum, cumulate, or both
459 for (int b;;) {
460 if (((b = t.getPendingCount()) & FINISHED) != 0)
461 break outer; // already done
// CUMULATE already set: this visit cumulates (FINISHED). Otherwise sum
// only (SUMMED), except that a segment starting at origin needs no
// input and does both in one visit (SUMMED|FINISHED).
462 state = ((b & CUMULATE) != 0 ? FINISHED :
463 (l > org) ? SUMMED : (SUMMED|FINISHED));
464 if (t.compareAndSetPendingCount(b, b|state))
465 break;
466 }
467
468 double sum;
// Any state other than plain SUMMED includes FINISHED, i.e. cumulate now.
469 if (state != SUMMED) {
470 int first;
471 if (l == org) { // leftmost; no in
472 sum = a[org];
473 first = org + 1;
474 }
475 else {
476 sum = t.in;
477 first = l;
478 }
479 for (int i = first; i < h; ++i) // cumulate
480 a[i] = sum = fn.applyAsDouble(sum, a[i]);
481 }
482 else if (h < fnc) { // skip rightmost
483 sum = a[l];
484 for (int i = l + 1; i < h; ++i) // sum only
485 sum = fn.applyAsDouble(sum, a[i]);
486 }
// Right-edge segment in the sum pass: no sum is computed, out just
// receives the current value of in.
487 else
488 sum = t.in;
489 t.out = sum;
490 for (DoubleCumulateTask par;;) { // propagate
// No completer means t is the root of the task tree.
491 if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
492 if ((state & FINISHED) != 0) // enable join
493 t.quietlyComplete();
494 break outer;
495 }
496 int b = par.getPendingCount();
497 if ((b & state & FINISHED) != 0)
498 t = par; // both done
499 else if ((b & state & SUMMED) != 0) { // both summed
500 int nextState; DoubleCumulateTask lt, rt;
501 if ((lt = par.left) != null &&
502 (rt = par.right) != null) {
503 double lout = lt.out;
// Parent's out combines both children; the right child's value is
// left out when that child ends at fence.
504 par.out = (rt.hi == fnc ? lout :
505 fn.applyAsDouble(lout, rt.out));
506 }
// A parent that starts at origin and is not yet cumulating gets
// CUMULATE set here and is forked again after the CAS succeeds.
507 int refork = (((b & CUMULATE) == 0 &&
508 par.lo == org) ? CUMULATE : 0);
509 if ((nextState = b|state|refork) == b ||
510 par.compareAndSetPendingCount(b, nextState)) {
511 state = SUMMED; // drop finished
512 t = par;
513 if (refork != 0)
514 par.fork();
515 }
516 }
// Parent does not yet carry the matching bit: record ours and stop here.
// If the CAS fails the loop re-reads the parent's count and retries.
517 else if (par.compareAndSetPendingCount(b, b|state))
518 break outer; // sib not ready
519 }
520 }
521 }
522 }
523 }
524
/**
 * Fork/join task that cumulates (prefix-combines) the elements of an int
 * array in place using an IntBinaryOperator. Each task instance covers the
 * sub-range [lo, hi) of the overall range [origin, fence). Ranges longer than
 * threshold are split at their midpoint into left and right child tasks.
 * The CUMULATE, SUMMED and FINISHED bits are kept in the CountedCompleter
 * pending count and are only changed with compareAndSetPendingCount.
 */
525 static final class IntCumulateTask extends CountedCompleter<Void> {
526 static final long serialVersionUID = 3731755594596840961L;
// Array that is read and overwritten, and the combining function.
527 final int[] array;
528 final IntBinaryOperator function;
// Child tasks; null until compute() first visits this node as an internal node.
529 IntCumulateTask left, right;
// in: value accumulated from the elements before lo; assigned by the parent
// before this task is switched to the cumulate phase.
// out: value recorded by this task at the end of a pass. After the sum pass
// it is the combination of the segment's elements, except on the right edge
// (hi == fence) where no sum is computed.
530 int in, out;
// [lo, hi): this task's range. [origin, fence): the whole range being
// processed. threshold: largest range length handled as a leaf.
531 final int lo, hi, origin, fence, threshold;
532
533 /** Root task constructor: covers the whole range, so origin/fence equal lo/hi. */
534 public IntCumulateTask(IntCumulateTask parent,
535 IntBinaryOperator function,
536 int[] array, int lo, int hi) {
537 super(parent);
538 this.function = function; this.array = array;
539 this.lo = this.origin = lo; this.hi = this.fence = hi;
540 int p;
// threshold = range length / (8 * common-pool parallelism), but never
// less than MIN_PARTITION.
541 this.threshold =
542 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
543 <= MIN_PARTITION ? MIN_PARTITION : p;
544 }
545
546 /** Subtask constructor */
// Copies origin, fence and threshold from the caller unchanged; only
// lo and hi differ between parent and child.
547 IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
548 int[] array, int origin, int fence, int threshold,
549 int lo, int hi) {
550 super(parent);
551 this.function = function; this.array = array;
552 this.origin = origin; this.fence = fence;
553 this.threshold = threshold;
554 this.lo = lo; this.hi = hi;
555 }
556
/**
 * Runs the phase state machine outlined in the file-level comment, starting
 * at this task and then moving to a child or parent task inside one loop
 * instead of recursing. Internal nodes either split (first visit) or hand
 * inputs to their children and try to switch them to the cumulate phase
 * (later visits). Leaves sum and/or cumulate their segment and then
 * propagate SUMMED/FINISHED up the completer chain.
 *
 * @throws NullPointerException if function or array is null
 */
557 public final void compute() {
558 final IntBinaryOperator fn;
559 final int[] a;
560 if ((fn = this.function) == null || (a = this.array) == null)
561 throw new NullPointerException(); // hoist checks
562 int th = threshold, org = origin, fnc = fence, l, h;
563 IntCumulateTask t = this;
// The loop condition also checks t's range against the array bounds.
564 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
565 if (h - l > th) {
566 IntCumulateTask lt = t.left, rt = t.right, f;
567 if (lt == null) { // first pass
568 int mid = (l + h) >>> 1;
// The right half is remembered in f and forked below; this thread
// continues with the left half.
569 f = rt = t.right =
570 new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
571 t = lt = t.left =
572 new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
573 }
574 else { // possibly refork
// Children already exist: give each child its input value and try to
// set its CUMULATE bit.
575 int pin = t.in;
576 lt.in = pin;
577 f = t = null;
578 if (rt != null) {
579 int lout = lt.out;
// When this node starts at origin there is no incoming value, so the
// right child's input is just the left child's result.
580 rt.in = (l == org ? lout :
581 fn.applyAsInt(pin, lout));
// Claim rt by setting CUMULATE; if the bit is already set, rt is left alone.
582 for (int c;;) {
583 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
584 break;
585 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
586 t = rt;
587 break;
588 }
589 }
590 }
// Same claim attempt for lt. If both children were claimed, the right
// one moves to f (forked below) and this thread continues with the left.
591 for (int c;;) {
592 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
593 break;
594 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
595 if (t != null)
596 f = t;
597 t = lt;
598 break;
599 }
600 }
// Neither child could be claimed: leave the outer loop.
601 if (t == null)
602 break;
603 }
604 if (f != null)
605 f.fork();
606 }
607 else {
608 int state; // Transition to sum, cumulate, or both
609 for (int b;;) {
610 if (((b = t.getPendingCount()) & FINISHED) != 0)
611 break outer; // already done
// CUMULATE already set: this visit cumulates (FINISHED). Otherwise sum
// only (SUMMED), except that a segment starting at origin needs no
// input and does both in one visit (SUMMED|FINISHED).
612 state = ((b & CUMULATE) != 0 ? FINISHED :
613 (l > org) ? SUMMED : (SUMMED|FINISHED));
614 if (t.compareAndSetPendingCount(b, b|state))
615 break;
616 }
617
618 int sum;
// Any state other than plain SUMMED includes FINISHED, i.e. cumulate now.
619 if (state != SUMMED) {
620 int first;
621 if (l == org) { // leftmost; no in
622 sum = a[org];
623 first = org + 1;
624 }
625 else {
626 sum = t.in;
627 first = l;
628 }
629 for (int i = first; i < h; ++i) // cumulate
630 a[i] = sum = fn.applyAsInt(sum, a[i]);
631 }
632 else if (h < fnc) { // skip rightmost
633 sum = a[l];
634 for (int i = l + 1; i < h; ++i) // sum only
635 sum = fn.applyAsInt(sum, a[i]);
636 }
// Right-edge segment in the sum pass: no sum is computed, out just
// receives the current value of in.
637 else
638 sum = t.in;
639 t.out = sum;
640 for (IntCumulateTask par;;) { // propagate
// No completer means t is the root of the task tree.
641 if ((par = (IntCumulateTask)t.getCompleter()) == null) {
642 if ((state & FINISHED) != 0) // enable join
643 t.quietlyComplete();
644 break outer;
645 }
646 int b = par.getPendingCount();
647 if ((b & state & FINISHED) != 0)
648 t = par; // both done
649 else if ((b & state & SUMMED) != 0) { // both summed
650 int nextState; IntCumulateTask lt, rt;
651 if ((lt = par.left) != null &&
652 (rt = par.right) != null) {
653 int lout = lt.out;
// Parent's out combines both children; the right child's value is
// left out when that child ends at fence.
654 par.out = (rt.hi == fnc ? lout :
655 fn.applyAsInt(lout, rt.out));
656 }
// A parent that starts at origin and is not yet cumulating gets
// CUMULATE set here and is forked again after the CAS succeeds.
657 int refork = (((b & CUMULATE) == 0 &&
658 par.lo == org) ? CUMULATE : 0);
659 if ((nextState = b|state|refork) == b ||
660 par.compareAndSetPendingCount(b, nextState)) {
661 state = SUMMED; // drop finished
662 t = par;
663 if (refork != 0)
664 par.fork();
665 }
666 }
// Parent does not yet carry the matching bit: record ours and stop here.
// If the CAS fails the loop re-reads the parent's count and retries.
667 else if (par.compareAndSetPendingCount(b, b|state))
668 break outer; // sib not ready
669 }
670 }
671 }
672 }
673 }
674
675
676 }