ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/ArrayPrefixUtil.java
Revision: 1.1
Committed: Sat Dec 22 18:42:44 2012 UTC (11 years, 4 months ago) by dl
Branch: MAIN
Log Message:
Initial version

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util;
8 import java.util.concurrent.ForkJoinPool;
9 import java.util.concurrent.CountedCompleter;
10 import java.util.function.BinaryOperator;
11 import java.util.function.IntBinaryOperator;
12 import java.util.function.LongBinaryOperator;
13 import java.util.function.DoubleBinaryOperator;
14
15 /**
16 * ForkJoin tasks to perform Arrays.parallelPrefix operations.
17 *
18 * @author Doug Lea
19 * @since 1.8
20 */
21 class ArrayPrefixUtil {
22 private ArrayPrefixUtil() {}; // non-instantiable: private constructor; this class only holds static constants and nested task classes
23
24 /*
25 * Parallel prefix (aka cumulate, scan) task classes
26 * are based loosely on Guy Blelloch's original
27 * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
28 * Keep dividing by two to threshold segment size, and then:
29 * Pass 1: Create tree of partial sums for each segment
30 * Pass 2: For each segment, cumulate with offset of left sibling
31 *
32 * This version improves performance within FJ framework mainly by
33 * allowing the second pass of ready left-hand sides to proceed
34 * even if some right-hand side first passes are still executing.
35 * It also combines first and second pass for leftmost segment,
36 * and skips the first pass for rightmost segment (whose result is
37 * not needed for second pass). It similarly manages to avoid
38 * requiring that users supply an identity basis for accumulations
39 * by tracking those segments/subtasks for which the first
40 * existing element is used as base.
41 *
42 * Managing this relies on ORing some bits in the pendingCount for
43 * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
44 * main phase bit. When false, segments compute only their sum.
45 * When true, they cumulate array elements. CUMULATE is set at
46 * root at beginning of second pass and then propagated down. But
47 * it may also be set earlier for subtrees with lo==0 (the left
48 * spine of tree). SUMMED is a one bit join count. For leafs, it
49 * is set when summed. For internal nodes, it becomes true when
50 * one child is summed. When the second child finishes summing,
51 * it then moves up tree to trigger the cumulate phase. FINISHED
52 * is also a one bit join count. For leafs, it is set when
53 * cumulated. For internal nodes, it becomes true when one child
54 * is cumulated. When the second child finishes cumulating, it
55 * then moves up tree, completing at the root.
56 *
57 * To better exploit locality and reduce overhead, the compute
58 * method loops starting with the current task, moving if possible
59 * to one of its subtasks rather than forking.
60 *
61 * As usual for this sort of utility, there are 4 versions, that
62 * are simple copy/paste/adapt variants of each other. (The
63 * double and int versions differ from long version solely by
64 * replacing "long" (with case-matching)).
65 */
66
67 // Phase/state bits that the task classes OR into their pending count
68 static final int CUMULATE = 1; // main phase bit: set once a task should cumulate (write) rather than only sum
69 static final int SUMMED = 2; // one-bit join count for the summing pass
70 static final int FINISHED = 4; // one-bit join count for the cumulating pass
71
72 /** The smallest subtask array partition size to use as threshold */
73 static final int MIN_PARTITION = 16;
74
/**
 * Parallel prefix (cumulate) task for object arrays. Each task covers
 * [lo, hi) within the overall scan range [origin, fence); ranges wider
 * than threshold are split at the midpoint into left and right subtasks.
 * The CountedCompleter pending count is used here to hold the CUMULATE,
 * SUMMED and FINISHED bits.
 */
75 static final class CumulateTask<T> extends CountedCompleter<Void> {
76 final T[] array; // scanned and overwritten in place
77 final BinaryOperator<T> function; // combiner; the left operand is always the earlier (lower-index) value
78 CumulateTask<T> left, right; // subtasks; null until this node is split
79 T in, out; // in: value carried in from elements before lo; out: result of this node's latest pass
80 final int lo, hi, origin, fence, threshold; // own range, whole scan range, split cutoff
81
82 /**
 * Root task constructor: the task covers the whole scan, so lo/origin and
 * hi/fence coincide. The threshold is (hi - lo) divided by eight times the
 * common pool parallelism, but never less than MIN_PARTITION.
 *
 * @param parent completer handed to the superclass constructor
 *        (presumably null for a root task; confirm against callers)
 * @param function combining function
 * @param array array to cumulate in place
 * @param lo inclusive start index of the scan
 * @param hi exclusive end index of the scan
 */
83 public CumulateTask(CumulateTask<T> parent,
84 BinaryOperator<T> function,
85 T[] array, int lo, int hi) {
86 super(parent);
87 this.function = function; this.array = array;
88 this.lo = this.origin = lo; this.hi = this.fence = hi;
89 int p;
90 this.threshold =
91 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
92 <= MIN_PARTITION ? MIN_PARTITION : p;
93 }
94
95 /**
 * Subtask constructor: origin, fence and threshold are passed through
 * unchanged from the creating task; [lo, hi) is this subtask's own range.
 */
96 CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
97 T[] array, int origin, int fence, int threshold,
98 int lo, int hi) {
99 super(parent);
100 this.function = function; this.array = array;
101 this.origin = origin; this.fence = fence;
102 this.threshold = threshold;
103 this.lo = lo; this.hi = hi;
104 }
105
/**
 * Drives the scan starting from this task. The method loops instead of
 * recursing: at an internal node it either splits (first visit) or hands
 * "in" values to the children and claims them for cumulation (revisit),
 * continuing with one child and forking the other; at a leaf it sums
 * and/or cumulates its segment and then walks up the completer chain,
 * merging sums and recording SUMMED/FINISHED bits on the parents.
 *
 * @throws NullPointerException if the function or the array is null
 */
106 public final void compute() {
107 final BinaryOperator<T> fn;
108 final T[] a;
109 if ((fn = this.function) == null || (a = this.array) == null)
110 throw new NullPointerException(); // hoist checks
111 int th = threshold, org = origin, fnc = fence, l, h;
112 CumulateTask<T> t = this; // task currently being processed by this loop
113 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) { // loop also ends if t's range is not within a
114 if (h - l > th) { // internal node: split or re-dispatch
115 CumulateTask<T> lt = t.left, rt = t.right, f; // f: task to fork, if any
116 if (lt == null) { // first pass
117 int mid = (l + h) >>> 1;
118 f = rt = t.right = // right half is forked below
119 new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
120 t = lt = t.left = // left half is continued in this loop
121 new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
122 }
123 else { // possibly refork
124 T pin = t.in;
125 lt.in = pin; // left child starts where its parent starts, so it shares the carry-in
126 f = t = null; // nothing claimed yet
127 if (rt != null) {
128 T lout = lt.out; // a subtree starting at origin has no carry-in, so right's is just left's result
129 rt.in = (l == org ? lout :
130 fn.apply(pin, lout));
131 for (int c;;) { // CAS loop: claim rt for cumulation unless CUMULATE is already set
132 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
133 break;
134 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
135 t = rt;
136 break;
137 }
138 }
139 }
140 for (int c;;) { // same claim for lt; if both were claimed, continue with lt and fork rt
141 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
142 break;
143 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
144 if (t != null)
145 f = t;
146 t = lt;
147 break;
148 }
149 }
150 if (t == null) // neither child was newly claimed: leave the compute loop
151 break;
152 }
153 if (f != null)
154 f.fork();
155 }
156 else {
157 int state; // Transition to sum, cumulate, or both
158 for (int b;;) {
159 if (((b = t.getPendingCount()) & FINISHED) != 0)
160 break outer; // already done
// CUMULATE already set: cumulate only. Otherwise sum only, except the
// leaf starting at origin, which sums and cumulates in a single pass.
161 state = ((b & CUMULATE) != 0? FINISHED :
162 (l > org) ? SUMMED : (SUMMED|FINISHED));
163 if (t.compareAndSetPendingCount(b, b|state))
164 break;
165 }
166
167 T sum;
168 if (state != SUMMED) {
169 int first;
170 if (l == org) { // leftmost; no in
171 sum = a[org];
172 first = org + 1;
173 }
174 else {
175 sum = t.in;
176 first = l;
177 }
178 for (int i = first; i < h; ++i) // cumulate
179 a[i] = sum = fn.apply(sum, a[i]);
180 }
181 else if (h < fnc) { // skip rightmost
182 sum = a[l];
183 for (int i = l + 1; i < h; ++i) // sum only
184 sum = fn.apply(sum, a[i]);
185 }
186 else
187 sum = t.in; // rightmost segment: no sum computed; the propagate step below does not read rt.out when rt.hi == fnc
188 t.out = sum;
189 for (CumulateTask<T> par;;) { // propagate
// NOTE(review): the cast below is unchecked for the generic T; consider a narrowly scoped @SuppressWarnings("unchecked")
190 if ((par = (CumulateTask<T>)t.getCompleter()) == null) {
191 if ((state & FINISHED) != 0) // enable join
192 t.quietlyComplete();
193 break outer;
194 }
195 int b = par.getPendingCount();
196 if ((b & state & FINISHED) != 0)
197 t = par; // both done
198 else if ((b & state & SUMMED) != 0) { // both summed
199 int nextState; CumulateTask<T> lt, rt;
200 if ((lt = par.left) != null &&
201 (rt = par.right) != null) {
202 T lout = lt.out;
203 par.out = (rt.hi == fnc ? lout : // right sum is skipped when rt ends at fence
204 fn.apply(lout, rt.out));
205 }
// A parent that starts at origin and is not yet cumulating is switched
// to CUMULATE and re-forked so its second pass can begin right away.
206 int refork = (((b & CUMULATE) == 0 &&
207 par.lo == org) ? CUMULATE : 0);
208 if ((nextState = b|state|refork) == b || // no CAS needed when no bit would change
209 par.compareAndSetPendingCount(b, nextState)) {
210 state = SUMMED; // drop finished
211 t = par;
212 if (refork != 0)
213 par.fork();
214 }
215 }
216 else if (par.compareAndSetPendingCount(b, b|state)) // first child to arrive records its bits and stops
217 break outer; // sib not ready
218 }
219 }
220 }
221 }
222 }
223
/**
 * Parallel prefix (cumulate) task for long arrays. Each task covers
 * [lo, hi) within the overall scan range [origin, fence); ranges wider
 * than threshold are split at the midpoint into left and right subtasks.
 * The CountedCompleter pending count is used here to hold the CUMULATE,
 * SUMMED and FINISHED bits.
 */
224 static final class LongCumulateTask extends CountedCompleter<Void> {
225 final long[] array; // scanned and overwritten in place
226 final LongBinaryOperator function; // combiner; the left operand is always the earlier (lower-index) value
227 LongCumulateTask left, right; // subtasks; null until this node is split
228 long in, out; // in: value carried in from elements before lo; out: result of this node's latest pass
229 final int lo, hi, origin, fence, threshold; // own range, whole scan range, split cutoff
230
231 /**
 * Root task constructor: the task covers the whole scan, so lo/origin and
 * hi/fence coincide. The threshold is (hi - lo) divided by eight times the
 * common pool parallelism, but never less than MIN_PARTITION.
 *
 * @param parent completer handed to the superclass constructor
 *        (presumably null for a root task; confirm against callers)
 * @param function combining function
 * @param array array to cumulate in place
 * @param lo inclusive start index of the scan
 * @param hi exclusive end index of the scan
 */
232 public LongCumulateTask(LongCumulateTask parent,
233 LongBinaryOperator function,
234 long[] array, int lo, int hi) {
235 super(parent);
236 this.function = function; this.array = array;
237 this.lo = this.origin = lo; this.hi = this.fence = hi;
238 int p;
239 this.threshold =
240 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
241 <= MIN_PARTITION ? MIN_PARTITION : p;
242 }
243
244 /**
 * Subtask constructor: origin, fence and threshold are passed through
 * unchanged from the creating task; [lo, hi) is this subtask's own range.
 */
245 LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
246 long[] array, int origin, int fence, int threshold,
247 int lo, int hi) {
248 super(parent);
249 this.function = function; this.array = array;
250 this.origin = origin; this.fence = fence;
251 this.threshold = threshold;
252 this.lo = lo; this.hi = hi;
253 }
254
/**
 * Drives the scan starting from this task. The method loops instead of
 * recursing: at an internal node it either splits (first visit) or hands
 * "in" values to the children and claims them for cumulation (revisit),
 * continuing with one child and forking the other; at a leaf it sums
 * and/or cumulates its segment and then walks up the completer chain,
 * merging sums and recording SUMMED/FINISHED bits on the parents.
 *
 * @throws NullPointerException if the function or the array is null
 */
255 public final void compute() {
256 final LongBinaryOperator fn;
257 final long[] a;
258 if ((fn = this.function) == null || (a = this.array) == null)
259 throw new NullPointerException(); // hoist checks
260 int th = threshold, org = origin, fnc = fence, l, h;
261 LongCumulateTask t = this; // task currently being processed by this loop
262 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) { // loop also ends if t's range is not within a
263 if (h - l > th) { // internal node: split or re-dispatch
264 LongCumulateTask lt = t.left, rt = t.right, f; // f: task to fork, if any
265 if (lt == null) { // first pass
266 int mid = (l + h) >>> 1;
267 f = rt = t.right = // right half is forked below
268 new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
269 t = lt = t.left = // left half is continued in this loop
270 new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
271 }
272 else { // possibly refork
273 long pin = t.in;
274 lt.in = pin; // left child starts where its parent starts, so it shares the carry-in
275 f = t = null; // nothing claimed yet
276 if (rt != null) {
277 long lout = lt.out; // a subtree starting at origin has no carry-in, so right's is just left's result
278 rt.in = (l == org ? lout :
279 fn.applyAsLong(pin, lout));
280 for (int c;;) { // CAS loop: claim rt for cumulation unless CUMULATE is already set
281 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
282 break;
283 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
284 t = rt;
285 break;
286 }
287 }
288 }
289 for (int c;;) { // same claim for lt; if both were claimed, continue with lt and fork rt
290 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
291 break;
292 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
293 if (t != null)
294 f = t;
295 t = lt;
296 break;
297 }
298 }
299 if (t == null) // neither child was newly claimed: leave the compute loop
300 break;
301 }
302 if (f != null)
303 f.fork();
304 }
305 else {
306 int state; // Transition to sum, cumulate, or both
307 for (int b;;) {
308 if (((b = t.getPendingCount()) & FINISHED) != 0)
309 break outer; // already done
// CUMULATE already set: cumulate only. Otherwise sum only, except the
// leaf starting at origin, which sums and cumulates in a single pass.
310 state = ((b & CUMULATE) != 0? FINISHED :
311 (l > org) ? SUMMED : (SUMMED|FINISHED));
312 if (t.compareAndSetPendingCount(b, b|state))
313 break;
314 }
315
316 long sum;
317 if (state != SUMMED) {
318 int first;
319 if (l == org) { // leftmost; no in
320 sum = a[org];
321 first = org + 1;
322 }
323 else {
324 sum = t.in;
325 first = l;
326 }
327 for (int i = first; i < h; ++i) // cumulate
328 a[i] = sum = fn.applyAsLong(sum, a[i]);
329 }
330 else if (h < fnc) { // skip rightmost
331 sum = a[l];
332 for (int i = l + 1; i < h; ++i) // sum only
333 sum = fn.applyAsLong(sum, a[i]);
334 }
335 else
336 sum = t.in; // rightmost segment: no sum computed; the propagate step below does not read rt.out when rt.hi == fnc
337 t.out = sum;
338 for (LongCumulateTask par;;) { // propagate
339 if ((par = (LongCumulateTask)t.getCompleter()) == null) {
340 if ((state & FINISHED) != 0) // enable join
341 t.quietlyComplete();
342 break outer;
343 }
344 int b = par.getPendingCount();
345 if ((b & state & FINISHED) != 0)
346 t = par; // both done
347 else if ((b & state & SUMMED) != 0) { // both summed
348 int nextState; LongCumulateTask lt, rt;
349 if ((lt = par.left) != null &&
350 (rt = par.right) != null) {
351 long lout = lt.out;
352 par.out = (rt.hi == fnc ? lout : // right sum is skipped when rt ends at fence
353 fn.applyAsLong(lout, rt.out));
354 }
// A parent that starts at origin and is not yet cumulating is switched
// to CUMULATE and re-forked so its second pass can begin right away.
355 int refork = (((b & CUMULATE) == 0 &&
356 par.lo == org) ? CUMULATE : 0);
357 if ((nextState = b|state|refork) == b || // no CAS needed when no bit would change
358 par.compareAndSetPendingCount(b, nextState)) {
359 state = SUMMED; // drop finished
360 t = par;
361 if (refork != 0)
362 par.fork();
363 }
364 }
365 else if (par.compareAndSetPendingCount(b, b|state)) // first child to arrive records its bits and stops
366 break outer; // sib not ready
367 }
368 }
369 }
370 }
371 }
372
/**
 * Parallel prefix (cumulate) task for double arrays. Each task covers
 * [lo, hi) within the overall scan range [origin, fence); ranges wider
 * than threshold are split at the midpoint into left and right subtasks.
 * The CountedCompleter pending count is used here to hold the CUMULATE,
 * SUMMED and FINISHED bits.
 */
373 static final class DoubleCumulateTask extends CountedCompleter<Void> {
374 final double[] array; // scanned and overwritten in place
375 final DoubleBinaryOperator function; // combiner; the left operand is always the earlier (lower-index) value
376 DoubleCumulateTask left, right; // subtasks; null until this node is split
377 double in, out; // in: value carried in from elements before lo; out: result of this node's latest pass
378 final int lo, hi, origin, fence, threshold; // own range, whole scan range, split cutoff
379
380 /**
 * Root task constructor: the task covers the whole scan, so lo/origin and
 * hi/fence coincide. The threshold is (hi - lo) divided by eight times the
 * common pool parallelism, but never less than MIN_PARTITION.
 *
 * @param parent completer handed to the superclass constructor
 *        (presumably null for a root task; confirm against callers)
 * @param function combining function
 * @param array array to cumulate in place
 * @param lo inclusive start index of the scan
 * @param hi exclusive end index of the scan
 */
381 public DoubleCumulateTask(DoubleCumulateTask parent,
382 DoubleBinaryOperator function,
383 double[] array, int lo, int hi) {
384 super(parent);
385 this.function = function; this.array = array;
386 this.lo = this.origin = lo; this.hi = this.fence = hi;
387 int p;
388 this.threshold =
389 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
390 <= MIN_PARTITION ? MIN_PARTITION : p;
391 }
392
393 /**
 * Subtask constructor: origin, fence and threshold are passed through
 * unchanged from the creating task; [lo, hi) is this subtask's own range.
 */
394 DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
395 double[] array, int origin, int fence, int threshold,
396 int lo, int hi) {
397 super(parent);
398 this.function = function; this.array = array;
399 this.origin = origin; this.fence = fence;
400 this.threshold = threshold;
401 this.lo = lo; this.hi = hi;
402 }
403
/**
 * Drives the scan starting from this task. The method loops instead of
 * recursing: at an internal node it either splits (first visit) or hands
 * "in" values to the children and claims them for cumulation (revisit),
 * continuing with one child and forking the other; at a leaf it sums
 * and/or cumulates its segment and then walks up the completer chain,
 * merging sums and recording SUMMED/FINISHED bits on the parents.
 *
 * @throws NullPointerException if the function or the array is null
 */
404 public final void compute() {
405 final DoubleBinaryOperator fn;
406 final double[] a;
407 if ((fn = this.function) == null || (a = this.array) == null)
408 throw new NullPointerException(); // hoist checks
409 int th = threshold, org = origin, fnc = fence, l, h;
410 DoubleCumulateTask t = this; // task currently being processed by this loop
411 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) { // loop also ends if t's range is not within a
412 if (h - l > th) { // internal node: split or re-dispatch
413 DoubleCumulateTask lt = t.left, rt = t.right, f; // f: task to fork, if any
414 if (lt == null) { // first pass
415 int mid = (l + h) >>> 1;
416 f = rt = t.right = // right half is forked below
417 new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
418 t = lt = t.left = // left half is continued in this loop
419 new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
420 }
421 else { // possibly refork
422 double pin = t.in;
423 lt.in = pin; // left child starts where its parent starts, so it shares the carry-in
424 f = t = null; // nothing claimed yet
425 if (rt != null) {
426 double lout = lt.out; // a subtree starting at origin has no carry-in, so right's is just left's result
427 rt.in = (l == org ? lout :
428 fn.applyAsDouble(pin, lout));
429 for (int c;;) { // CAS loop: claim rt for cumulation unless CUMULATE is already set
430 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
431 break;
432 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
433 t = rt;
434 break;
435 }
436 }
437 }
438 for (int c;;) { // same claim for lt; if both were claimed, continue with lt and fork rt
439 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
440 break;
441 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
442 if (t != null)
443 f = t;
444 t = lt;
445 break;
446 }
447 }
448 if (t == null) // neither child was newly claimed: leave the compute loop
449 break;
450 }
451 if (f != null)
452 f.fork();
453 }
454 else {
455 int state; // Transition to sum, cumulate, or both
456 for (int b;;) {
457 if (((b = t.getPendingCount()) & FINISHED) != 0)
458 break outer; // already done
// CUMULATE already set: cumulate only. Otherwise sum only, except the
// leaf starting at origin, which sums and cumulates in a single pass.
459 state = ((b & CUMULATE) != 0? FINISHED :
460 (l > org) ? SUMMED : (SUMMED|FINISHED));
461 if (t.compareAndSetPendingCount(b, b|state))
462 break;
463 }
464
465 double sum;
466 if (state != SUMMED) {
467 int first;
468 if (l == org) { // leftmost; no in
469 sum = a[org];
470 first = org + 1;
471 }
472 else {
473 sum = t.in;
474 first = l;
475 }
476 for (int i = first; i < h; ++i) // cumulate
477 a[i] = sum = fn.applyAsDouble(sum, a[i]);
478 }
479 else if (h < fnc) { // skip rightmost
480 sum = a[l];
481 for (int i = l + 1; i < h; ++i) // sum only
482 sum = fn.applyAsDouble(sum, a[i]);
483 }
484 else
485 sum = t.in; // rightmost segment: no sum computed; the propagate step below does not read rt.out when rt.hi == fnc
486 t.out = sum;
487 for (DoubleCumulateTask par;;) { // propagate
488 if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
489 if ((state & FINISHED) != 0) // enable join
490 t.quietlyComplete();
491 break outer;
492 }
493 int b = par.getPendingCount();
494 if ((b & state & FINISHED) != 0)
495 t = par; // both done
496 else if ((b & state & SUMMED) != 0) { // both summed
497 int nextState; DoubleCumulateTask lt, rt;
498 if ((lt = par.left) != null &&
499 (rt = par.right) != null) {
500 double lout = lt.out;
501 par.out = (rt.hi == fnc ? lout : // right sum is skipped when rt ends at fence
502 fn.applyAsDouble(lout, rt.out));
503 }
// A parent that starts at origin and is not yet cumulating is switched
// to CUMULATE and re-forked so its second pass can begin right away.
504 int refork = (((b & CUMULATE) == 0 &&
505 par.lo == org) ? CUMULATE : 0);
506 if ((nextState = b|state|refork) == b || // no CAS needed when no bit would change
507 par.compareAndSetPendingCount(b, nextState)) {
508 state = SUMMED; // drop finished
509 t = par;
510 if (refork != 0)
511 par.fork();
512 }
513 }
514 else if (par.compareAndSetPendingCount(b, b|state)) // first child to arrive records its bits and stops
515 break outer; // sib not ready
516 }
517 }
518 }
519 }
520 }
521
/**
 * Parallel prefix (cumulate) task for int arrays. Each task covers
 * [lo, hi) within the overall scan range [origin, fence); ranges wider
 * than threshold are split at the midpoint into left and right subtasks.
 * The CountedCompleter pending count is used here to hold the CUMULATE,
 * SUMMED and FINISHED bits.
 */
522 static final class IntCumulateTask extends CountedCompleter<Void> {
523 final int[] array; // scanned and overwritten in place
524 final IntBinaryOperator function; // combiner; the left operand is always the earlier (lower-index) value
525 IntCumulateTask left, right; // subtasks; null until this node is split
526 int in, out; // in: value carried in from elements before lo; out: result of this node's latest pass
527 final int lo, hi, origin, fence, threshold; // own range, whole scan range, split cutoff
528
529 /**
 * Root task constructor: the task covers the whole scan, so lo/origin and
 * hi/fence coincide. The threshold is (hi - lo) divided by eight times the
 * common pool parallelism, but never less than MIN_PARTITION.
 *
 * @param parent completer handed to the superclass constructor
 *        (presumably null for a root task; confirm against callers)
 * @param function combining function
 * @param array array to cumulate in place
 * @param lo inclusive start index of the scan
 * @param hi exclusive end index of the scan
 */
530 public IntCumulateTask(IntCumulateTask parent,
531 IntBinaryOperator function,
532 int[] array, int lo, int hi) {
533 super(parent);
534 this.function = function; this.array = array;
535 this.lo = this.origin = lo; this.hi = this.fence = hi;
536 int p;
537 this.threshold =
538 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
539 <= MIN_PARTITION ? MIN_PARTITION : p;
540 }
541
542 /**
 * Subtask constructor: origin, fence and threshold are passed through
 * unchanged from the creating task; [lo, hi) is this subtask's own range.
 */
543 IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
544 int[] array, int origin, int fence, int threshold,
545 int lo, int hi) {
546 super(parent);
547 this.function = function; this.array = array;
548 this.origin = origin; this.fence = fence;
549 this.threshold = threshold;
550 this.lo = lo; this.hi = hi;
551 }
552
/**
 * Drives the scan starting from this task. The method loops instead of
 * recursing: at an internal node it either splits (first visit) or hands
 * "in" values to the children and claims them for cumulation (revisit),
 * continuing with one child and forking the other; at a leaf it sums
 * and/or cumulates its segment and then walks up the completer chain,
 * merging sums and recording SUMMED/FINISHED bits on the parents.
 *
 * @throws NullPointerException if the function or the array is null
 */
553 public final void compute() {
554 final IntBinaryOperator fn;
555 final int[] a;
556 if ((fn = this.function) == null || (a = this.array) == null)
557 throw new NullPointerException(); // hoist checks
558 int th = threshold, org = origin, fnc = fence, l, h;
559 IntCumulateTask t = this; // task currently being processed by this loop
560 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) { // loop also ends if t's range is not within a
561 if (h - l > th) { // internal node: split or re-dispatch
562 IntCumulateTask lt = t.left, rt = t.right, f; // f: task to fork, if any
563 if (lt == null) { // first pass
564 int mid = (l + h) >>> 1;
565 f = rt = t.right = // right half is forked below
566 new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
567 t = lt = t.left = // left half is continued in this loop
568 new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
569 }
570 else { // possibly refork
571 int pin = t.in;
572 lt.in = pin; // left child starts where its parent starts, so it shares the carry-in
573 f = t = null; // nothing claimed yet
574 if (rt != null) {
575 int lout = lt.out; // a subtree starting at origin has no carry-in, so right's is just left's result
576 rt.in = (l == org ? lout :
577 fn.applyAsInt(pin, lout));
578 for (int c;;) { // CAS loop: claim rt for cumulation unless CUMULATE is already set
579 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
580 break;
581 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
582 t = rt;
583 break;
584 }
585 }
586 }
587 for (int c;;) { // same claim for lt; if both were claimed, continue with lt and fork rt
588 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
589 break;
590 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
591 if (t != null)
592 f = t;
593 t = lt;
594 break;
595 }
596 }
597 if (t == null) // neither child was newly claimed: leave the compute loop
598 break;
599 }
600 if (f != null)
601 f.fork();
602 }
603 else {
604 int state; // Transition to sum, cumulate, or both
605 for (int b;;) {
606 if (((b = t.getPendingCount()) & FINISHED) != 0)
607 break outer; // already done
// CUMULATE already set: cumulate only. Otherwise sum only, except the
// leaf starting at origin, which sums and cumulates in a single pass.
608 state = ((b & CUMULATE) != 0? FINISHED :
609 (l > org) ? SUMMED : (SUMMED|FINISHED));
610 if (t.compareAndSetPendingCount(b, b|state))
611 break;
612 }
613
614 int sum;
615 if (state != SUMMED) {
616 int first;
617 if (l == org) { // leftmost; no in
618 sum = a[org];
619 first = org + 1;
620 }
621 else {
622 sum = t.in;
623 first = l;
624 }
625 for (int i = first; i < h; ++i) // cumulate
626 a[i] = sum = fn.applyAsInt(sum, a[i]);
627 }
628 else if (h < fnc) { // skip rightmost
629 sum = a[l];
630 for (int i = l + 1; i < h; ++i) // sum only
631 sum = fn.applyAsInt(sum, a[i]);
632 }
633 else
634 sum = t.in; // rightmost segment: no sum computed; the propagate step below does not read rt.out when rt.hi == fnc
635 t.out = sum;
636 for (IntCumulateTask par;;) { // propagate
637 if ((par = (IntCumulateTask)t.getCompleter()) == null) {
638 if ((state & FINISHED) != 0) // enable join
639 t.quietlyComplete();
640 break outer;
641 }
642 int b = par.getPendingCount();
643 if ((b & state & FINISHED) != 0)
644 t = par; // both done
645 else if ((b & state & SUMMED) != 0) { // both summed
646 int nextState; IntCumulateTask lt, rt;
647 if ((lt = par.left) != null &&
648 (rt = par.right) != null) {
649 int lout = lt.out;
650 par.out = (rt.hi == fnc ? lout : // right sum is skipped when rt ends at fence
651 fn.applyAsInt(lout, rt.out));
652 }
// A parent that starts at origin and is not yet cumulating is switched
// to CUMULATE and re-forked so its second pass can begin right away.
653 int refork = (((b & CUMULATE) == 0 &&
654 par.lo == org) ? CUMULATE : 0);
655 if ((nextState = b|state|refork) == b || // no CAS needed when no bit would change
656 par.compareAndSetPendingCount(b, nextState)) {
657 state = SUMMED; // drop finished
658 t = par;
659 if (refork != 0)
660 par.fork();
661 }
662 }
663 else if (par.compareAndSetPendingCount(b, b|state)) // first child to arrive records its bits and stops
664 break outer; // sib not ready
665 }
666 }
667 }
668 }
669 }
670
671
672 }