ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/ArrayPrefixUtil.java
Revision: 1.6
Committed: Wed Jan 16 19:01:22 2013 UTC (11 years, 3 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +0 -0 lines
State: FILE REMOVED
Log Message:
rename file

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util;
8 import java.util.concurrent.ForkJoinPool;
9 import java.util.concurrent.CountedCompleter;
10 import java.util.function.BinaryOperator;
11 import java.util.function.IntBinaryOperator;
12 import java.util.function.LongBinaryOperator;
13 import java.util.function.DoubleBinaryOperator;
14
15 /**
16 * ForkJoin tasks to perform Arrays.parallelPrefix operations.
17 *
18 * @author Doug Lea
19 * @since 1.8
20 */
21 class ArrayPrefixUtil {
22 private ArrayPrefixUtil() {}; // non-instantiable
23
24 /*
25 * Parallel prefix (aka cumulate, scan) task classes
26 * are based loosely on Guy Blelloch's original
27 * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
28 * Keep dividing by two to threshold segment size, and then:
29 * Pass 1: Create tree of partial sums for each segment
30 * Pass 2: For each segment, cumulate with offset of left sibling
31 *
32 * This version improves performance within FJ framework mainly by
33 * allowing the second pass of ready left-hand sides to proceed
34 * even if some right-hand side first passes are still executing.
35 * It also combines first and second pass for leftmost segment,
36 * and skips the first pass for rightmost segment (whose result is
37 * not needed for second pass). It similarly manages to avoid
38 * requiring that users supply an identity basis for accumulations
39 * by tracking those segments/subtasks for which the first
40 * existing element is used as base.
41 *
42 * Managing this relies on ORing some bits in the pendingCount for
43 * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
44 * main phase bit. When false, segments compute only their sum.
45 * When true, they cumulate array elements. CUMULATE is set at
46 * root at beginning of second pass and then propagated down. But
47 * it may also be set earlier for subtrees with lo==0 (the left
48 * spine of tree). SUMMED is a one bit join count. For leafs, it
49 * is set when summed. For internal nodes, it becomes true when
50 * one child is summed. When the second child finishes summing,
51 * we then moves up tree to trigger the cumulate phase. FINISHED
52 * is also a one bit join count. For leafs, it is set when
53 * cumulated. For internal nodes, it becomes true when one child
54 * is cumulated. When the second child finishes cumulating, it
55 * then moves up tree, completing at the root.
56 *
57 * To better exploit locality and reduce overhead, the compute
58 * method loops starting with the current task, moving if possible
59 * to one of its subtasks rather than forking.
60 *
61 * As usual for this sort of utility, there are 4 versions, that
62 * are simple copy/paste/adapt variants of each other. (The
63 * double and int versions differ from long version solely by
64 * replacing "long" (with case-matching)).
65 */
66
67 // see above
68 static final int CUMULATE = 1;
69 static final int SUMMED = 2;
70 static final int FINISHED = 4;
71
72 /** The smallest subtask array partition size to use as threshold */
73 static final int MIN_PARTITION = 16;
74
75 static final class CumulateTask<T> extends CountedCompleter<Void> {
76 final T[] array;
77 final BinaryOperator<T> function;
78 CumulateTask<T> left, right;
79 T in, out;
80 final int lo, hi, origin, fence, threshold;
81
82 /** Root task constructor */
83 public CumulateTask(CumulateTask<T> parent,
84 BinaryOperator<T> function,
85 T[] array, int lo, int hi) {
86 super(parent);
87 this.function = function; this.array = array;
88 this.lo = this.origin = lo; this.hi = this.fence = hi;
89 int p;
90 this.threshold =
91 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
92 <= MIN_PARTITION ? MIN_PARTITION : p;
93 }
94
95 /** Subtask constructor */
96 CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
97 T[] array, int origin, int fence, int threshold,
98 int lo, int hi) {
99 super(parent);
100 this.function = function; this.array = array;
101 this.origin = origin; this.fence = fence;
102 this.threshold = threshold;
103 this.lo = lo; this.hi = hi;
104 }
105
106 public final void compute() {
107 final BinaryOperator<T> fn;
108 final T[] a;
109 if ((fn = this.function) == null || (a = this.array) == null)
110 throw new NullPointerException(); // hoist checks
111 int th = threshold, org = origin, fnc = fence, l, h;
112 CumulateTask<T> t = this;
113 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
114 if (h - l > th) {
115 CumulateTask<T> lt = t.left, rt = t.right, f;
116 if (lt == null) { // first pass
117 int mid = (l + h) >>> 1;
118 f = rt = t.right =
119 new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
120 t = lt = t.left =
121 new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
122 }
123 else { // possibly refork
124 T pin = t.in;
125 lt.in = pin;
126 f = t = null;
127 if (rt != null) {
128 T lout = lt.out;
129 rt.in = (l == org ? lout :
130 fn.apply(pin, lout));
131 for (int c;;) {
132 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
133 break;
134 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
135 t = rt;
136 break;
137 }
138 }
139 }
140 for (int c;;) {
141 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
142 break;
143 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
144 if (t != null)
145 f = t;
146 t = lt;
147 break;
148 }
149 }
150 if (t == null)
151 break;
152 }
153 if (f != null)
154 f.fork();
155 }
156 else {
157 int state; // Transition to sum, cumulate, or both
158 for (int b;;) {
159 if (((b = t.getPendingCount()) & FINISHED) != 0)
160 break outer; // already done
161 state = ((b & CUMULATE) != 0 ? FINISHED :
162 (l > org) ? SUMMED : (SUMMED|FINISHED));
163 if (t.compareAndSetPendingCount(b, b|state))
164 break;
165 }
166
167 T sum;
168 if (state != SUMMED) {
169 int first;
170 if (l == org) { // leftmost; no in
171 sum = a[org];
172 first = org + 1;
173 }
174 else {
175 sum = t.in;
176 first = l;
177 }
178 for (int i = first; i < h; ++i) // cumulate
179 a[i] = sum = fn.apply(sum, a[i]);
180 }
181 else if (h < fnc) { // skip rightmost
182 sum = a[l];
183 for (int i = l + 1; i < h; ++i) // sum only
184 sum = fn.apply(sum, a[i]);
185 }
186 else
187 sum = t.in;
188 t.out = sum;
189 for (CumulateTask<T> par;;) { // propagate
190 if ((par = (CumulateTask<T>)t.getCompleter()) == null) {
191 if ((state & FINISHED) != 0) // enable join
192 t.quietlyComplete();
193 break outer;
194 }
195 int b = par.getPendingCount();
196 if ((b & state & FINISHED) != 0)
197 t = par; // both done
198 else if ((b & state & SUMMED) != 0) { // both summed
199 int nextState; CumulateTask<T> lt, rt;
200 if ((lt = par.left) != null &&
201 (rt = par.right) != null) {
202 T lout = lt.out;
203 par.out = (rt.hi == fnc ? lout :
204 fn.apply(lout, rt.out));
205 }
206 int refork = (((b & CUMULATE) == 0 &&
207 par.lo == org) ? CUMULATE : 0);
208 if ((nextState = b|state|refork) == b ||
209 par.compareAndSetPendingCount(b, nextState)) {
210 state = SUMMED; // drop finished
211 t = par;
212 if (refork != 0)
213 par.fork();
214 }
215 }
216 else if (par.compareAndSetPendingCount(b, b|state))
217 break outer; // sib not ready
218 }
219 }
220 }
221 }
222 private static final long serialVersionUID = 5293554502939613543L;
223 }
224
225 static final class LongCumulateTask extends CountedCompleter<Void> {
226 final long[] array;
227 final LongBinaryOperator function;
228 LongCumulateTask left, right;
229 long in, out;
230 final int lo, hi, origin, fence, threshold;
231
232 /** Root task constructor */
233 public LongCumulateTask(LongCumulateTask parent,
234 LongBinaryOperator function,
235 long[] array, int lo, int hi) {
236 super(parent);
237 this.function = function; this.array = array;
238 this.lo = this.origin = lo; this.hi = this.fence = hi;
239 int p;
240 this.threshold =
241 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
242 <= MIN_PARTITION ? MIN_PARTITION : p;
243 }
244
245 /** Subtask constructor */
246 LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
247 long[] array, int origin, int fence, int threshold,
248 int lo, int hi) {
249 super(parent);
250 this.function = function; this.array = array;
251 this.origin = origin; this.fence = fence;
252 this.threshold = threshold;
253 this.lo = lo; this.hi = hi;
254 }
255
256 public final void compute() {
257 final LongBinaryOperator fn;
258 final long[] a;
259 if ((fn = this.function) == null || (a = this.array) == null)
260 throw new NullPointerException(); // hoist checks
261 int th = threshold, org = origin, fnc = fence, l, h;
262 LongCumulateTask t = this;
263 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
264 if (h - l > th) {
265 LongCumulateTask lt = t.left, rt = t.right, f;
266 if (lt == null) { // first pass
267 int mid = (l + h) >>> 1;
268 f = rt = t.right =
269 new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
270 t = lt = t.left =
271 new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
272 }
273 else { // possibly refork
274 long pin = t.in;
275 lt.in = pin;
276 f = t = null;
277 if (rt != null) {
278 long lout = lt.out;
279 rt.in = (l == org ? lout :
280 fn.applyAsLong(pin, lout));
281 for (int c;;) {
282 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
283 break;
284 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
285 t = rt;
286 break;
287 }
288 }
289 }
290 for (int c;;) {
291 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
292 break;
293 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
294 if (t != null)
295 f = t;
296 t = lt;
297 break;
298 }
299 }
300 if (t == null)
301 break;
302 }
303 if (f != null)
304 f.fork();
305 }
306 else {
307 int state; // Transition to sum, cumulate, or both
308 for (int b;;) {
309 if (((b = t.getPendingCount()) & FINISHED) != 0)
310 break outer; // already done
311 state = ((b & CUMULATE) != 0 ? FINISHED :
312 (l > org) ? SUMMED : (SUMMED|FINISHED));
313 if (t.compareAndSetPendingCount(b, b|state))
314 break;
315 }
316
317 long sum;
318 if (state != SUMMED) {
319 int first;
320 if (l == org) { // leftmost; no in
321 sum = a[org];
322 first = org + 1;
323 }
324 else {
325 sum = t.in;
326 first = l;
327 }
328 for (int i = first; i < h; ++i) // cumulate
329 a[i] = sum = fn.applyAsLong(sum, a[i]);
330 }
331 else if (h < fnc) { // skip rightmost
332 sum = a[l];
333 for (int i = l + 1; i < h; ++i) // sum only
334 sum = fn.applyAsLong(sum, a[i]);
335 }
336 else
337 sum = t.in;
338 t.out = sum;
339 for (LongCumulateTask par;;) { // propagate
340 if ((par = (LongCumulateTask)t.getCompleter()) == null) {
341 if ((state & FINISHED) != 0) // enable join
342 t.quietlyComplete();
343 break outer;
344 }
345 int b = par.getPendingCount();
346 if ((b & state & FINISHED) != 0)
347 t = par; // both done
348 else if ((b & state & SUMMED) != 0) { // both summed
349 int nextState; LongCumulateTask lt, rt;
350 if ((lt = par.left) != null &&
351 (rt = par.right) != null) {
352 long lout = lt.out;
353 par.out = (rt.hi == fnc ? lout :
354 fn.applyAsLong(lout, rt.out));
355 }
356 int refork = (((b & CUMULATE) == 0 &&
357 par.lo == org) ? CUMULATE : 0);
358 if ((nextState = b|state|refork) == b ||
359 par.compareAndSetPendingCount(b, nextState)) {
360 state = SUMMED; // drop finished
361 t = par;
362 if (refork != 0)
363 par.fork();
364 }
365 }
366 else if (par.compareAndSetPendingCount(b, b|state))
367 break outer; // sib not ready
368 }
369 }
370 }
371 }
372 private static final long serialVersionUID = -5074099945909284273L;
373 }
374
375 static final class DoubleCumulateTask extends CountedCompleter<Void> {
376 final double[] array;
377 final DoubleBinaryOperator function;
378 DoubleCumulateTask left, right;
379 double in, out;
380 final int lo, hi, origin, fence, threshold;
381
382 /** Root task constructor */
383 public DoubleCumulateTask(DoubleCumulateTask parent,
384 DoubleBinaryOperator function,
385 double[] array, int lo, int hi) {
386 super(parent);
387 this.function = function; this.array = array;
388 this.lo = this.origin = lo; this.hi = this.fence = hi;
389 int p;
390 this.threshold =
391 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
392 <= MIN_PARTITION ? MIN_PARTITION : p;
393 }
394
395 /** Subtask constructor */
396 DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
397 double[] array, int origin, int fence, int threshold,
398 int lo, int hi) {
399 super(parent);
400 this.function = function; this.array = array;
401 this.origin = origin; this.fence = fence;
402 this.threshold = threshold;
403 this.lo = lo; this.hi = hi;
404 }
405
406 public final void compute() {
407 final DoubleBinaryOperator fn;
408 final double[] a;
409 if ((fn = this.function) == null || (a = this.array) == null)
410 throw new NullPointerException(); // hoist checks
411 int th = threshold, org = origin, fnc = fence, l, h;
412 DoubleCumulateTask t = this;
413 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
414 if (h - l > th) {
415 DoubleCumulateTask lt = t.left, rt = t.right, f;
416 if (lt == null) { // first pass
417 int mid = (l + h) >>> 1;
418 f = rt = t.right =
419 new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
420 t = lt = t.left =
421 new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
422 }
423 else { // possibly refork
424 double pin = t.in;
425 lt.in = pin;
426 f = t = null;
427 if (rt != null) {
428 double lout = lt.out;
429 rt.in = (l == org ? lout :
430 fn.applyAsDouble(pin, lout));
431 for (int c;;) {
432 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
433 break;
434 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
435 t = rt;
436 break;
437 }
438 }
439 }
440 for (int c;;) {
441 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
442 break;
443 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
444 if (t != null)
445 f = t;
446 t = lt;
447 break;
448 }
449 }
450 if (t == null)
451 break;
452 }
453 if (f != null)
454 f.fork();
455 }
456 else {
457 int state; // Transition to sum, cumulate, or both
458 for (int b;;) {
459 if (((b = t.getPendingCount()) & FINISHED) != 0)
460 break outer; // already done
461 state = ((b & CUMULATE) != 0 ? FINISHED :
462 (l > org) ? SUMMED : (SUMMED|FINISHED));
463 if (t.compareAndSetPendingCount(b, b|state))
464 break;
465 }
466
467 double sum;
468 if (state != SUMMED) {
469 int first;
470 if (l == org) { // leftmost; no in
471 sum = a[org];
472 first = org + 1;
473 }
474 else {
475 sum = t.in;
476 first = l;
477 }
478 for (int i = first; i < h; ++i) // cumulate
479 a[i] = sum = fn.applyAsDouble(sum, a[i]);
480 }
481 else if (h < fnc) { // skip rightmost
482 sum = a[l];
483 for (int i = l + 1; i < h; ++i) // sum only
484 sum = fn.applyAsDouble(sum, a[i]);
485 }
486 else
487 sum = t.in;
488 t.out = sum;
489 for (DoubleCumulateTask par;;) { // propagate
490 if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
491 if ((state & FINISHED) != 0) // enable join
492 t.quietlyComplete();
493 break outer;
494 }
495 int b = par.getPendingCount();
496 if ((b & state & FINISHED) != 0)
497 t = par; // both done
498 else if ((b & state & SUMMED) != 0) { // both summed
499 int nextState; DoubleCumulateTask lt, rt;
500 if ((lt = par.left) != null &&
501 (rt = par.right) != null) {
502 double lout = lt.out;
503 par.out = (rt.hi == fnc ? lout :
504 fn.applyAsDouble(lout, rt.out));
505 }
506 int refork = (((b & CUMULATE) == 0 &&
507 par.lo == org) ? CUMULATE : 0);
508 if ((nextState = b|state|refork) == b ||
509 par.compareAndSetPendingCount(b, nextState)) {
510 state = SUMMED; // drop finished
511 t = par;
512 if (refork != 0)
513 par.fork();
514 }
515 }
516 else if (par.compareAndSetPendingCount(b, b|state))
517 break outer; // sib not ready
518 }
519 }
520 }
521 }
522 private static final long serialVersionUID = -586947823794232033L;
523 }
524
525 static final class IntCumulateTask extends CountedCompleter<Void> {
526 final int[] array;
527 final IntBinaryOperator function;
528 IntCumulateTask left, right;
529 int in, out;
530 final int lo, hi, origin, fence, threshold;
531
532 /** Root task constructor */
533 public IntCumulateTask(IntCumulateTask parent,
534 IntBinaryOperator function,
535 int[] array, int lo, int hi) {
536 super(parent);
537 this.function = function; this.array = array;
538 this.lo = this.origin = lo; this.hi = this.fence = hi;
539 int p;
540 this.threshold =
541 (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
542 <= MIN_PARTITION ? MIN_PARTITION : p;
543 }
544
545 /** Subtask constructor */
546 IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
547 int[] array, int origin, int fence, int threshold,
548 int lo, int hi) {
549 super(parent);
550 this.function = function; this.array = array;
551 this.origin = origin; this.fence = fence;
552 this.threshold = threshold;
553 this.lo = lo; this.hi = hi;
554 }
555
556 public final void compute() {
557 final IntBinaryOperator fn;
558 final int[] a;
559 if ((fn = this.function) == null || (a = this.array) == null)
560 throw new NullPointerException(); // hoist checks
561 int th = threshold, org = origin, fnc = fence, l, h;
562 IntCumulateTask t = this;
563 outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
564 if (h - l > th) {
565 IntCumulateTask lt = t.left, rt = t.right, f;
566 if (lt == null) { // first pass
567 int mid = (l + h) >>> 1;
568 f = rt = t.right =
569 new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
570 t = lt = t.left =
571 new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
572 }
573 else { // possibly refork
574 int pin = t.in;
575 lt.in = pin;
576 f = t = null;
577 if (rt != null) {
578 int lout = lt.out;
579 rt.in = (l == org ? lout :
580 fn.applyAsInt(pin, lout));
581 for (int c;;) {
582 if (((c = rt.getPendingCount()) & CUMULATE) != 0)
583 break;
584 if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
585 t = rt;
586 break;
587 }
588 }
589 }
590 for (int c;;) {
591 if (((c = lt.getPendingCount()) & CUMULATE) != 0)
592 break;
593 if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
594 if (t != null)
595 f = t;
596 t = lt;
597 break;
598 }
599 }
600 if (t == null)
601 break;
602 }
603 if (f != null)
604 f.fork();
605 }
606 else {
607 int state; // Transition to sum, cumulate, or both
608 for (int b;;) {
609 if (((b = t.getPendingCount()) & FINISHED) != 0)
610 break outer; // already done
611 state = ((b & CUMULATE) != 0 ? FINISHED :
612 (l > org) ? SUMMED : (SUMMED|FINISHED));
613 if (t.compareAndSetPendingCount(b, b|state))
614 break;
615 }
616
617 int sum;
618 if (state != SUMMED) {
619 int first;
620 if (l == org) { // leftmost; no in
621 sum = a[org];
622 first = org + 1;
623 }
624 else {
625 sum = t.in;
626 first = l;
627 }
628 for (int i = first; i < h; ++i) // cumulate
629 a[i] = sum = fn.applyAsInt(sum, a[i]);
630 }
631 else if (h < fnc) { // skip rightmost
632 sum = a[l];
633 for (int i = l + 1; i < h; ++i) // sum only
634 sum = fn.applyAsInt(sum, a[i]);
635 }
636 else
637 sum = t.in;
638 t.out = sum;
639 for (IntCumulateTask par;;) { // propagate
640 if ((par = (IntCumulateTask)t.getCompleter()) == null) {
641 if ((state & FINISHED) != 0) // enable join
642 t.quietlyComplete();
643 break outer;
644 }
645 int b = par.getPendingCount();
646 if ((b & state & FINISHED) != 0)
647 t = par; // both done
648 else if ((b & state & SUMMED) != 0) { // both summed
649 int nextState; IntCumulateTask lt, rt;
650 if ((lt = par.left) != null &&
651 (rt = par.right) != null) {
652 int lout = lt.out;
653 par.out = (rt.hi == fnc ? lout :
654 fn.applyAsInt(lout, rt.out));
655 }
656 int refork = (((b & CUMULATE) == 0 &&
657 par.lo == org) ? CUMULATE : 0);
658 if ((nextState = b|state|refork) == b ||
659 par.compareAndSetPendingCount(b, nextState)) {
660 state = SUMMED; // drop finished
661 t = par;
662 if (refork != 0)
663 par.fork();
664 }
665 }
666 else if (par.compareAndSetPendingCount(b, b|state))
667 break outer; // sib not ready
668 }
669 }
670 }
671 }
672 private static final long serialVersionUID = 3731755594596840961L;
673 }
674
675 }