ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/ArrayPrefixHelpers.java
Revision: 1.3
Committed: Wed Dec 31 04:53:37 2014 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.2: +1 -1 lines
Log Message:
delete stray semicolon

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util;
8 import java.util.concurrent.ForkJoinPool;
9 import java.util.concurrent.CountedCompleter;
10 import java.util.function.BinaryOperator;
11 import java.util.function.IntBinaryOperator;
12 import java.util.function.LongBinaryOperator;
13 import java.util.function.DoubleBinaryOperator;
14
/**
 * ForkJoin tasks to perform Arrays.parallelPrefix operations.
 *
 * <p>Each nested task class cumulates the elements of an array range in
 * place: after the root task completes, element {@code i} of the range
 * holds the combination (via the supplied function) of the original
 * elements from the start of the range through {@code i}.
 *
 * @author Doug Lea
 * @since 1.8
 */
class ArrayPrefixHelpers {
    private ArrayPrefixHelpers() {} // non-instantiable

    /*
     * Parallel prefix (aka cumulate, scan) task classes
     * are based loosely on Guy Blelloch's original
     * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
     *  Keep dividing by two to threshold segment size, and then:
     *   Pass 1: Create tree of partial sums for each segment
     *   Pass 2: For each segment, cumulate with offset of left sibling
     *
     * This version improves performance within FJ framework mainly by
     * allowing the second pass of ready left-hand sides to proceed
     * even if some right-hand side first passes are still executing.
     * It also combines first and second pass for leftmost segment,
     * and skips the first pass for rightmost segment (whose result is
     * not needed for second pass).  It similarly manages to avoid
     * requiring that users supply an identity basis for accumulations
     * by tracking those segments/subtasks for which the first
     * existing element is used as base.
     *
     * Managing this relies on ORing some bits in the pendingCount for
     * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
     * main phase bit. When false, segments compute only their sum.
     * When true, they cumulate array elements. CUMULATE is set at
     * root at beginning of second pass and then propagated down. But
     * it may also be set earlier for subtrees with lo==0 (the left
     * spine of tree). SUMMED is a one bit join count. For leafs, it
     * is set when summed. For internal nodes, it becomes true when
     * one child is summed.  When the second child finishes summing,
     * it then moves up tree to trigger the cumulate phase. FINISHED
     * is also a one bit join count. For leafs, it is set when
     * cumulated. For internal nodes, it becomes true when one child
     * is cumulated.  When the second child finishes cumulating, it
     * then moves up tree, completing at the root.
     *
     * To better exploit locality and reduce overhead, the compute
     * method loops starting with the current task, moving if possible
     * to one of its subtasks rather than forking.
     *
     * As usual for this sort of utility, there are 4 versions, that
     * are simple copy/paste/adapt variants of each other.  (The
     * double and int versions differ from long version solely by
     * replacing "long" (with case-matching)).
     */

    // Phase/state bits ORed into each task's pendingCount; see above.
    /** Phase bit: when set, the task cumulates elements instead of only summing. */
    static final int CUMULATE = 1;
    /** One-bit join count for the summing pass. */
    static final int SUMMED   = 2;
    /** One-bit join count for the cumulating pass. */
    static final int FINISHED = 4;

    /** The smallest subtask array partition size to use as threshold */
    static final int MIN_PARTITION = 16;

    /**
     * Parallel prefix task over an object array, combining elements with a
     * {@link BinaryOperator}. The pendingCount of each task is used as a
     * bit set of {@link #CUMULATE}, {@link #SUMMED} and {@link #FINISHED}
     * rather than as a count.
     *
     * @param <T> the array element type
     */
    static final class CumulateTask<T> extends CountedCompleter<Void> {
        /** Array whose range is cumulated in place. */
        final T[] array;
        /** Combining function, applied as {@code function.apply(accumulated, next)}. */
        final BinaryOperator<T> function;
        /** Child subtasks; null until this node is split on its first pass. */
        CumulateTask<T> left, right;
        /**
         * {@code in} is the value carried in from the elements preceding
         * {@code lo} (assigned by the parent before a cumulate pass);
         * {@code out} is the value this task publishes after processing
         * its segment.
         */
        T in, out;
        /**
         * {@code [lo, hi)} is this task's segment; {@code [origin, fence)}
         * is the range of the root task; {@code threshold} is the segment
         * size at or below which a task is processed as a leaf.
         */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor. The leaf threshold is the larger of
         * {@link #MIN_PARTITION} and
         * {@code (hi - lo) / (8 * common pool parallelism)}.
         *
         * @param parent the completer, or null for a top-level task
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo inclusive start of the range
         * @param hi exclusive end of the range
         */
        public CumulateTask(CumulateTask<T> parent,
                            BinaryOperator<T> function,
                            T[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor: inherits origin, fence and threshold from the root. */
        CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
                     T[] array, int origin, int fence, int threshold,
                     int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs the sum and/or cumulate phase for this task, looping onto
         * a subtask or parent where possible instead of forking.
         *
         * @throws NullPointerException if the function or array is null
         */
        @Override
        public final void compute() {
            final BinaryOperator<T> fn;
            final T[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException();    // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            CumulateTask<T> t = this;                // task currently being processed
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {                    // internal node
                    CumulateTask<T> lt = t.left, rt = t.right, f;
                    if (lt == null) {                // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                           // possibly refork
                        T pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            T lout = lt.out;
                            // right child starts from everything to its left
                            rt.in = (l == org ? lout :
                                     fn.apply(pin, lout));
                            for (int c;;) {          // claim right child for cumulate
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {              // claim left child for cumulate
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;           // fork right, continue with left
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)               // both children already claimed
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {                               // leaf
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;                      // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    T sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {                       // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i)       // cumulate
                            a[i] = sum = fn.apply(sum, a[i]);
                    }
                    else if (h < fnc) {                       // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i)       // sum only
                            sum = fn.apply(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (CumulateTask<T> par;;) {             // propagate
                        @SuppressWarnings("unchecked") CumulateTask<T> partmp
                            = (CumulateTask<T>)t.getCompleter();
                        if ((par = partmp) == null) {
                            if ((state & FINISHED) != 0)      // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                          // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; CumulateTask<T> lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                T lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.apply(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;               // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;                      // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = 5293554502939613543L;
    }

    /**
     * Parallel prefix task over a {@code long[]}, combining elements with a
     * {@link LongBinaryOperator}. Control flow is the same as in
     * {@link CumulateTask}.
     */
    static final class LongCumulateTask extends CountedCompleter<Void> {
        /** Array whose range is cumulated in place. */
        final long[] array;
        /** Combining function, applied as {@code applyAsLong(accumulated, next)}. */
        final LongBinaryOperator function;
        /** Child subtasks; null until this node is split on its first pass. */
        LongCumulateTask left, right;
        /** Carried-in value from the left, and value published by this task. */
        long in, out;
        /** Segment {@code [lo, hi)}, root range {@code [origin, fence)}, leaf threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor. The leaf threshold is the larger of
         * {@link #MIN_PARTITION} and
         * {@code (hi - lo) / (8 * common pool parallelism)}.
         *
         * @param parent the completer, or null for a top-level task
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo inclusive start of the range
         * @param hi exclusive end of the range
         */
        public LongCumulateTask(LongCumulateTask parent,
                                LongBinaryOperator function,
                                long[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor: inherits origin, fence and threshold from the root. */
        LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
                         long[] array, int origin, int fence, int threshold,
                         int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs the sum and/or cumulate phase for this task, looping onto
         * a subtask or parent where possible instead of forking.
         *
         * @throws NullPointerException if the function or array is null
         */
        @Override
        public final void compute() {
            final LongBinaryOperator fn;
            final long[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException();    // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            LongCumulateTask t = this;               // task currently being processed
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {                    // internal node
                    LongCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                           // possibly refork
                        long pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            long lout = lt.out;
                            // right child starts from everything to its left
                            rt.in = (l == org ? lout :
                                     fn.applyAsLong(pin, lout));
                            for (int c;;) {          // claim right child for cumulate
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {              // claim left child for cumulate
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;           // fork right, continue with left
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)               // both children already claimed
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {                               // leaf
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;                      // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    long sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {                       // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i)       // cumulate
                            a[i] = sum = fn.applyAsLong(sum, a[i]);
                    }
                    else if (h < fnc) {                       // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i)       // sum only
                            sum = fn.applyAsLong(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (LongCumulateTask par;;) {            // propagate
                        if ((par = (LongCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0)      // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                          // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; LongCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                long lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsLong(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;               // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;                      // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = -5074099945909284273L;
    }

    /**
     * Parallel prefix task over a {@code double[]}, combining elements with
     * a {@link DoubleBinaryOperator}. Control flow is the same as in
     * {@link CumulateTask}.
     */
    static final class DoubleCumulateTask extends CountedCompleter<Void> {
        /** Array whose range is cumulated in place. */
        final double[] array;
        /** Combining function, applied as {@code applyAsDouble(accumulated, next)}. */
        final DoubleBinaryOperator function;
        /** Child subtasks; null until this node is split on its first pass. */
        DoubleCumulateTask left, right;
        /** Carried-in value from the left, and value published by this task. */
        double in, out;
        /** Segment {@code [lo, hi)}, root range {@code [origin, fence)}, leaf threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor. The leaf threshold is the larger of
         * {@link #MIN_PARTITION} and
         * {@code (hi - lo) / (8 * common pool parallelism)}.
         *
         * @param parent the completer, or null for a top-level task
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo inclusive start of the range
         * @param hi exclusive end of the range
         */
        public DoubleCumulateTask(DoubleCumulateTask parent,
                                  DoubleBinaryOperator function,
                                  double[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor: inherits origin, fence and threshold from the root. */
        DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
                           double[] array, int origin, int fence, int threshold,
                           int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs the sum and/or cumulate phase for this task, looping onto
         * a subtask or parent where possible instead of forking.
         *
         * @throws NullPointerException if the function or array is null
         */
        @Override
        public final void compute() {
            final DoubleBinaryOperator fn;
            final double[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException();    // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            DoubleCumulateTask t = this;             // task currently being processed
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {                    // internal node
                    DoubleCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                           // possibly refork
                        double pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            double lout = lt.out;
                            // right child starts from everything to its left
                            rt.in = (l == org ? lout :
                                     fn.applyAsDouble(pin, lout));
                            for (int c;;) {          // claim right child for cumulate
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {              // claim left child for cumulate
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;           // fork right, continue with left
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)               // both children already claimed
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {                               // leaf
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;                      // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    double sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {                       // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i)       // cumulate
                            a[i] = sum = fn.applyAsDouble(sum, a[i]);
                    }
                    else if (h < fnc) {                       // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i)       // sum only
                            sum = fn.applyAsDouble(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (DoubleCumulateTask par;;) {          // propagate
                        if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0)      // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                          // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; DoubleCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                double lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsDouble(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;               // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;                      // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = -586947823794232033L;
    }

    /**
     * Parallel prefix task over an {@code int[]}, combining elements with
     * an {@link IntBinaryOperator}. Control flow is the same as in
     * {@link CumulateTask}.
     */
    static final class IntCumulateTask extends CountedCompleter<Void> {
        /** Array whose range is cumulated in place. */
        final int[] array;
        /** Combining function, applied as {@code applyAsInt(accumulated, next)}. */
        final IntBinaryOperator function;
        /** Child subtasks; null until this node is split on its first pass. */
        IntCumulateTask left, right;
        /** Carried-in value from the left, and value published by this task. */
        int in, out;
        /** Segment {@code [lo, hi)}, root range {@code [origin, fence)}, leaf threshold. */
        final int lo, hi, origin, fence, threshold;

        /**
         * Root task constructor. The leaf threshold is the larger of
         * {@link #MIN_PARTITION} and
         * {@code (hi - lo) / (8 * common pool parallelism)}.
         *
         * @param parent the completer, or null for a top-level task
         * @param function the combining function
         * @param array the array to cumulate in place
         * @param lo inclusive start of the range
         * @param hi exclusive end of the range
         */
        public IntCumulateTask(IntCumulateTask parent,
                               IntBinaryOperator function,
                               int[] array, int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.lo = this.origin = lo; this.hi = this.fence = hi;
            int p;
            this.threshold =
                (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
                <= MIN_PARTITION ? MIN_PARTITION : p;
        }

        /** Subtask constructor: inherits origin, fence and threshold from the root. */
        IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
                        int[] array, int origin, int fence, int threshold,
                        int lo, int hi) {
            super(parent);
            this.function = function; this.array = array;
            this.origin = origin; this.fence = fence;
            this.threshold = threshold;
            this.lo = lo; this.hi = hi;
        }

        /**
         * Runs the sum and/or cumulate phase for this task, looping onto
         * a subtask or parent where possible instead of forking.
         *
         * @throws NullPointerException if the function or array is null
         */
        @Override
        public final void compute() {
            final IntBinaryOperator fn;
            final int[] a;
            if ((fn = this.function) == null || (a = this.array) == null)
                throw new NullPointerException();    // hoist checks
            int th = threshold, org = origin, fnc = fence, l, h;
            IntCumulateTask t = this;                // task currently being processed
            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
                if (h - l > th) {                    // internal node
                    IntCumulateTask lt = t.left, rt = t.right, f;
                    if (lt == null) {                // first pass
                        int mid = (l + h) >>> 1;
                        f = rt = t.right =
                            new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
                        t = lt = t.left =
                            new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
                    }
                    else {                           // possibly refork
                        int pin = t.in;
                        lt.in = pin;
                        f = t = null;
                        if (rt != null) {
                            int lout = lt.out;
                            // right child starts from everything to its left
                            rt.in = (l == org ? lout :
                                     fn.applyAsInt(pin, lout));
                            for (int c;;) {          // claim right child for cumulate
                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
                                    break;
                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
                                    t = rt;
                                    break;
                                }
                            }
                        }
                        for (int c;;) {              // claim left child for cumulate
                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
                                break;
                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
                                if (t != null)
                                    f = t;           // fork right, continue with left
                                t = lt;
                                break;
                            }
                        }
                        if (t == null)               // both children already claimed
                            break;
                    }
                    if (f != null)
                        f.fork();
                }
                else {                               // leaf
                    int state; // Transition to sum, cumulate, or both
                    for (int b;;) {
                        if (((b = t.getPendingCount()) & FINISHED) != 0)
                            break outer;                      // already done
                        state = ((b & CUMULATE) != 0 ? FINISHED :
                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
                        if (t.compareAndSetPendingCount(b, b|state))
                            break;
                    }

                    int sum;
                    if (state != SUMMED) {
                        int first;
                        if (l == org) {                       // leftmost; no in
                            sum = a[org];
                            first = org + 1;
                        }
                        else {
                            sum = t.in;
                            first = l;
                        }
                        for (int i = first; i < h; ++i)       // cumulate
                            a[i] = sum = fn.applyAsInt(sum, a[i]);
                    }
                    else if (h < fnc) {                       // skip rightmost
                        sum = a[l];
                        for (int i = l + 1; i < h; ++i)       // sum only
                            sum = fn.applyAsInt(sum, a[i]);
                    }
                    else
                        sum = t.in;
                    t.out = sum;
                    for (IntCumulateTask par;;) {             // propagate
                        if ((par = (IntCumulateTask)t.getCompleter()) == null) {
                            if ((state & FINISHED) != 0)      // enable join
                                t.quietlyComplete();
                            break outer;
                        }
                        int b = par.getPendingCount();
                        if ((b & state & FINISHED) != 0)
                            t = par;                          // both done
                        else if ((b & state & SUMMED) != 0) { // both summed
                            int nextState; IntCumulateTask lt, rt;
                            if ((lt = par.left) != null &&
                                (rt = par.right) != null) {
                                int lout = lt.out;
                                par.out = (rt.hi == fnc ? lout :
                                           fn.applyAsInt(lout, rt.out));
                            }
                            int refork = (((b & CUMULATE) == 0 &&
                                           par.lo == org) ? CUMULATE : 0);
                            if ((nextState = b|state|refork) == b ||
                                par.compareAndSetPendingCount(b, nextState)) {
                                state = SUMMED;               // drop finished
                                t = par;
                                if (refork != 0)
                                    par.fork();
                            }
                        }
                        else if (par.compareAndSetPendingCount(b, b|state))
                            break outer;                      // sib not ready
                    }
                }
            }
        }
        private static final long serialVersionUID = 3731755594596840961L;
    }

}