ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/forkjoin/LongTasks.java
Revision: 1.4
Committed: Thu Aug 16 12:35:16 2007 UTC (16 years, 9 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.3: +0 -0 lines
State: FILE REMOVED
Log Message:
Refactor *Tasks into Parallel*

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y.forkjoin;
8 import static jsr166y.forkjoin.TaskTypes.*;
9 import java.util.*;
10 import java.util.concurrent.atomic.*;
11
12 /**
13 * Parallel long operations on collections and arrays. This class is a
14 * stand-in for functionality that will probably be supported in some
15 * other way in Java 7.
16 */
17 public class LongTasks {
18 /**
19 * default granularity for divide-by-two tasks. Provides about
20 * four times as many finest-grained tasks as there are CPUs.
21 */
22 static int defaultGranularity(ForkJoinExecutor ex, int n) {
23 int threads = ex.getParallelismLevel();
24 return 1 + n / ((threads << 2) - 3);
25 }
26
27 /**
28 * Applies the given procedure to each element of the array.
29 * @param ex the executor
30 * @param array the array
31 * @param proc the procedure
32 */
33 public static <T> void apply(ForkJoinExecutor ex,
34 long[] array,
35 LongProcedure proc) {
36 int n = array.length;
37 ex.invoke(new FJApplyer(array, proc, 0, n-1, defaultGranularity(ex, n)));
38 }
39
40 /**
41 * Returns reduction of given array
42 * @param ex the executor
43 * @param array the array
44 * @param reducer the reducer
45 * @param base the result for an empty array
46 */
47 public static long reduce(ForkJoinExecutor ex,
48 long[] array,
49 LongReducer reducer,
50 long base) {
51 int n = array.length;
52 FJReducer r = new FJReducer(array,reducer, base,
53 0, n-1, defaultGranularity(ex, n));
54 ex.invoke(r);
55 return r.result;
56 }
57
58 /**
59 * Applies mapper to each element of list and reduces result
60 * @param ex the executor
61 * @param list the list
62 * @param mapper the mapper
63 * @param reducer the reducer
64 * @param base the result for an empty list
65 */
66 public static <T> long reduce(ForkJoinExecutor ex,
67 List<T> list,
68 MapperToLong<T> mapper,
69 LongReducer reducer,
70 long base) {
71 int n = list.size();
72 FJMapReducer<T> r =
73 new FJMapReducer<T>(list, mapper, reducer, base,
74 0, n-1, defaultGranularity(ex, n));
75 ex.invoke(r);
76 return r.result;
77 }
78
79 /**
80 * Applies mapper to each element of array and reduces result
81 * @param ex the executor
82 * @param array the array
83 * @param mapper the mapper
84 * @param reducer the reducer
85 * @param base the result for an empty array
86 */
87 public static <T> long reduce(ForkJoinExecutor ex,
88 long[] array,
89 LongTransformer mapper,
90 LongReducer reducer,
91 long base) {
92 int n = array.length;
93 FJTransformReducer r =
94 new FJTransformReducer(array, mapper, reducer, base,
95 0, n-1, defaultGranularity(ex, n));
96 ex.invoke(r);
97 return r.result;
98 }
99
100 /**
101 * Returns a array mapping each element of given array using mapper
102 * @param ex the executor
103 * @param array the array
104 * @param mapper the mapper
105 */
106 public static long[] map(ForkJoinExecutor ex,
107 long[] array,
108 LongTransformer mapper) {
109 int n = array.length;
110 long[] dest = new long[n];
111 ex.invoke(new FJMapper(array, dest, mapper,
112 0, n-1, defaultGranularity(ex, n)));
113 return dest;
114 }
115
116 /**
117 * Returns an element of the array matching the given predicate, or
118 * missing if none
119 * @param ex the executor
120 * @param array the array
121 * @param pred the predicate
122 * @param missing the value to return if no such element exists
123 */
124 public static long findAny(ForkJoinExecutor ex,
125 long[] array,
126 LongPredicate pred,
127 long missing) {
128 int n = array.length;
129 VolatileLong result = new VolatileLong(missing);
130 ex.invoke(new FJFindAny(array, pred, result, missing,
131 0, n-1, defaultGranularity(ex, n)));
132 return result.value;
133 }
134
135 static final class VolatileLong {
136 volatile long value;
137 VolatileLong(long v) { value = v; }
138 }
139
140 /**
141 * Returns a list of all elements of the array matching pred
142 * @param ex the executor
143 * @param array the array
144 * @param pred the predicate
145 */
146 public static List<Long> findAll(ForkJoinExecutor ex,
147 long[] array,
148 LongPredicate pred) {
149 int n = array.length;
150 Vector<Long> dest = new Vector<Long>(); // todo: use smarter list
151 ex.invoke(new FJFindAll(array, pred, dest,
152 0, n-1, defaultGranularity(ex, n)));
153 return dest;
154 }
155
156
157 /**
158 * Sorts the given array
159 * @param ex the executor
160 * @param array the array
161 */
162 public static void sort(ForkJoinExecutor ex, long[] array) {
163 int n = array.length;
164 long[] workSpace = new long[n];
165 ex.invoke(new FJSorter(array, 0, workSpace, 0, n));
166 }
167
168
169 /**
170 * Returns the sum of all elements
171 * @param ex the executor
172 * @param array the array
173 */
174 public static long sum(ForkJoinExecutor ex,
175 long[] array) {
176 int n = array.length;
177 FJSum r = new FJSum(array, 0, n-1, defaultGranularity(ex, n));
178 ex.invoke(r);
179 return r.result;
180 }
181
182 /**
183 * Returns the sum of all mapped elements
184 * @param ex the executor
185 * @param array the array
186 * @param mapper the mapper
187 */
188 public static long sum(ForkJoinExecutor ex,
189 long[] array,
190 LongTransformer mapper) {
191 int n = array.length;
192 FJTransformSum r =
193 new FJTransformSum(array, mapper, 0, n-1, defaultGranularity(ex, n));
194 ex.invoke(r);
195 return r.result;
196 }
197
198 /**
199 * Replaces each element with running cumulative sum.
200 * @param ex the executor
201 * @param array the array
202 * @return the sum of all elements
203 */
204 public static long cumulate(ForkJoinExecutor ex, long[] array) {
205 int n = array.length;
206 if (n == 0)
207 return 0;
208 if (n == 1)
209 return array[0];
210 int gran;
211 int threads = ex.getParallelismLevel();
212 if (threads == 1)
213 gran = n;
214 else
215 gran = n / (threads << 3);
216 if (gran < 1024)
217 gran = 1024;
218 FJCumulator.Ctl ctl = new FJCumulator.Ctl(array, gran);
219 FJCumulator r = new FJCumulator(null, ctl, 0, n);
220 ex.invoke(r);
221 return array[n-1];
222 }
223
224
225 /**
226 * Returns the minimum of all elements, or MAX_VALUE if empty
227 * @param ex the executor
228 * @param array the array
229 */
230 public static long min(ForkJoinExecutor ex,
231 long[] array) {
232 int n = array.length;
233 FJMin r = new FJMin(array, 0, n-1, defaultGranularity(ex, n));
234 ex.invoke(r);
235 return r.result;
236 }
237
238 /**
239 * Returns the maximum of all elements, or MIN_VALUE if empty
240 * @param ex the executor
241 * @param array the array
242 */
243 public static long max(ForkJoinExecutor ex,
244 long[] array) {
245 int n = array.length;
246 FJMax r = new FJMax(array, 0, n-1, defaultGranularity(ex, n));
247 ex.invoke(r);
248 return r.result;
249 }
250
251
252 /**
253 * Fork/Join version of apply
254 */
255 static final class FJApplyer extends RecursiveAction {
256 final long[] array;
257 final LongProcedure f;
258 final int lo;
259 final int hi;
260 final int gran;
261 FJApplyer next;
262
263 FJApplyer(long[] array, LongProcedure f, int lo, int hi, int gran){
264 this.array = array;
265 this.f = f;
266 this.lo = lo;
267 this.hi = hi;
268 this.gran = gran;
269 }
270
271 protected void compute() {
272 FJApplyer right = null;
273 int l = lo;
274 int h = hi;
275 int g = gran;
276 while (h - l > g) {
277 int mid = (l + h) >>> 1;
278 FJApplyer r = new FJApplyer(array, f, mid+1, h, g);
279 r.fork();
280 r.next = right;
281
282 right = r;
283 h = mid;
284 }
285 for (int i = l; i <= h; ++i)
286 f.apply(array[i]);
287 while (right != null) {
288 right.join();
289 right = right.next;
290 }
291 }
292 }
293
294 /**
295 * Fork/Join version of MapReduce
296 */
297 static final class FJMapReducer<T> extends RecursiveAction {
298 final List<T> list;
299 final MapperToLong<T> mapper;
300 final LongReducer reducer;
301 final long base;
302 final int lo;
303 final int hi;
304 final int gran;
305 long result;
306 FJMapReducer<T> next;
307
308 FJMapReducer(List<T> list,
309 MapperToLong<T> mapper,
310 LongReducer reducer,
311 long base,
312 int lo,
313 int hi,
314 int gran) {
315 this.list = list;
316 this.mapper = mapper;
317 this.reducer = reducer;
318 this.base = base;
319 this.lo = lo;
320 this.hi = hi;
321 this.gran = gran;
322 }
323
324
325 protected void compute() {
326 FJMapReducer<T> right = null;
327 int l = lo;
328 int h = hi;
329 int g = gran;
330 while (h - l > g) {
331 int mid = (l + h) >>> 1;
332 FJMapReducer<T> r =
333 new FJMapReducer<T>(list, mapper, reducer,
334 base, mid + 1, h, g);
335 r.fork();
336 r.next = right;
337
338 right = r;
339 h = mid;
340 }
341 long x = base;
342 for (int i = l; i <= h; ++i)
343 x = reducer.combine(x, mapper.map(list.get(i)));
344 while (right != null) {
345 right.join();
346 x = reducer.combine(x, right.result);
347 right = right.next;
348 }
349 result = x;
350 }
351 }
352
353 /**
354 * Fork/Join version of TransformReduce
355 */
356 static final class FJTransformReducer extends RecursiveAction {
357 final long[] array;
358 final LongTransformer mapper;
359 final LongReducer reducer;
360 final long base;
361 final int lo;
362 final int hi;
363 final int gran;
364 long result;
365 FJTransformReducer next;
366
367 FJTransformReducer(long[] array,
368 LongTransformer mapper,
369 LongReducer reducer,
370 long base,
371 int lo,
372 int hi,
373 int gran) {
374 this.array = array;
375 this.mapper = mapper;
376 this.reducer = reducer;
377 this.base = base;
378 this.lo = lo;
379 this.hi = hi;
380 this.gran = gran;
381 }
382
383 protected void compute() {
384 FJTransformReducer right = null;
385 int l = lo;
386 int h = hi;
387 int g = gran;
388 while (h - l > g) {
389 int mid = (l + h) >>> 1;
390 FJTransformReducer r =
391 new FJTransformReducer(array, mapper, reducer,
392 base, mid + 1, h, g);
393 r.fork();
394 r.next = right;
395
396 right = r;
397 h = mid;
398 }
399 long x = base;
400 for (int i = l; i <= h; ++i)
401 x = reducer.combine(x, mapper.map(array[i]));
402 while (right != null) {
403 right.join();
404 x = reducer.combine(x, right.result);
405 right = right.next;
406 }
407 result = x;
408 }
409
410 }
411
412 /**
413 * Fork/Join version of Map
414 */
415 static final class FJMapper extends RecursiveAction {
416 final long[] array;
417 final long[] dest;
418 final LongTransformer mapper;
419 final int lo;
420 final int hi;
421 final int gran;
422 FJMapper next;
423
424 FJMapper(long[] array,
425 long[] dest,
426 LongTransformer mapper,
427 int lo,
428 int hi,
429 int gran) {
430 this.array = array;
431 this.dest = dest;
432 this.mapper = mapper;
433 this.lo = lo;
434 this.hi = hi;
435 this.gran = gran;
436 }
437
438
439 protected void compute() {
440 FJMapper right = null;
441 int l = lo;
442 int h = hi;
443 int g = gran;
444 while (h - l > g) {
445 int mid = (l + h) >>> 1;
446 FJMapper r =
447 new FJMapper(array, dest, mapper, mid + 1, h, g);
448 r.fork();
449 r.next = right;
450
451 right = r;
452 h = mid;
453 }
454 for (int i = l; i <= h; ++i)
455 dest[i] = mapper.map(array[i]);
456 while (right != null) {
457 right.join();
458 right = right.next;
459 }
460 }
461 }
462
463 /**
464 * Fork/Join version of Reduce
465 */
466 static final class FJReducer extends RecursiveAction {
467 final long[] array;
468 final LongReducer reducer;
469 final long base;
470 final int lo;
471 final int hi;
472 final int gran;
473 long result;
474 FJReducer next;
475
476 FJReducer(long[] array,
477 LongReducer reducer,
478 long base,
479 int lo,
480 int hi,
481 int gran) {
482 this.array = array;
483 this.reducer = reducer;
484 this.base = base;
485 this.lo = lo;
486 this.hi = hi;
487 this.gran = gran;
488 }
489
490 protected void compute() {
491 FJReducer right = null;
492 int l = lo;
493 int h = hi;
494 int g = gran;
495 while (h - l > g) {
496 int mid = (l + h) >>> 1;
497 FJReducer r =
498 new FJReducer(array, reducer, base, mid + 1, h, g);
499 r.fork();
500 r.next = right;
501
502 right = r;
503 h = mid;
504 }
505 long x = base;
506 for (int i = l; i <= h; ++i)
507 x = reducer.combine(x, array[i]);
508 while (right != null) {
509 right.join();
510 x = reducer.combine(x, right.result);
511 right = right.next;
512 }
513 result = x;
514 }
515 }
516
517 /**
518 * Fork/Join version of sum
519 */
520 static final class FJSum extends RecursiveAction {
521 final long[] array;
522 final int lo;
523 final int hi;
524 final int gran;
525 long result;
526 FJSum next;
527
528 FJSum(long[] array,
529 int lo,
530 int hi,
531 int gran) {
532 this.array = array;
533 this.lo = lo;
534 this.hi = hi;
535 this.gran = gran;
536 }
537
538 protected void compute() {
539 FJSum right = null;
540 int l = lo;
541 int h = hi;
542 int g = gran;
543 while (h - l > g) {
544 int mid = (l + h) >>> 1;
545 FJSum r =
546 new FJSum(array, mid + 1, h, g);
547 r.fork();
548 r.next = right;
549
550 right = r;
551 h = mid;
552 }
553 long x = 0;
554 for (int i = l; i <= h; ++i)
555 x += array[i];
556 while (right != null) {
557 right.join();
558 x += right.result;
559 right = right.next;
560 }
561 result = x;
562 }
563 }
564
565 /**
566 * Fork/Join version of min
567 */
568 static final class FJMin extends RecursiveAction {
569 final long[] array;
570 final int lo;
571 final int hi;
572 final int gran;
573 long result;
574 FJMin next;
575
576 FJMin(long[] array,
577 int lo,
578 int hi,
579 int gran) {
580 this.array = array;
581 this.lo = lo;
582 this.hi = hi;
583 this.gran = gran;
584 }
585
586 protected void compute() {
587 FJMin right = null;
588 int l = lo;
589 int h = hi;
590 int g = gran;
591 while (h - l > g) {
592 int mid = (l + h) >>> 1;
593 FJMin r =
594 new FJMin(array, mid + 1, h, g);
595 r.fork();
596 r.next = right;
597
598 right = r;
599 h = mid;
600 }
601 long x = Long.MAX_VALUE;
602 for (int i = l; i <= h; ++i) {
603 long y = array[i];
604 if (y < x)
605 x = y;
606 }
607 while (right != null) {
608 right.join();
609 long y = right.result;
610 if (y < x)
611 x = y;
612 right = right.next;
613 }
614 result = x;
615 }
616 }
617
618 /**
619 * Fork/Join version of max
620 */
621 static final class FJMax extends RecursiveAction {
622 final long[] array;
623 final int lo;
624 final int hi;
625 final int gran;
626 long result;
627 FJMax next;
628
629 FJMax(long[] array,
630 int lo,
631 int hi,
632 int gran) {
633 this.array = array;
634 this.lo = lo;
635 this.hi = hi;
636 this.gran = gran;
637 }
638
639 protected void compute() {
640 FJMax right = null;
641 int l = lo;
642 int h = hi;
643 int g = gran;
644 while (h - l > g) {
645 int mid = (l + h) >>> 1;
646 FJMax r =
647 new FJMax(array, mid + 1, h, g);
648 r.fork();
649 r.next = right;
650
651 right = r;
652 h = mid;
653 }
654 long x = Long.MAX_VALUE;
655 for (int i = l; i <= h; ++i) {
656 long y = array[i];
657 if (y > x)
658 x = y;
659 }
660 while (right != null) {
661 right.join();
662 long y = right.result;
663 if (y > x)
664 x = y;
665 right = right.next;
666 }
667 result = x;
668 }
669 }
670
671 /**
672 * Fork/Join version of TransformSum
673 */
674 static final class FJTransformSum extends RecursiveAction {
675 final long[] array;
676 final LongTransformer mapper;
677 final int lo;
678 final int hi;
679 final int gran;
680 long result;
681 FJTransformSum next;
682
683 FJTransformSum(long[] array,
684 LongTransformer mapper,
685 int lo,
686 int hi,
687 int gran) {
688 this.array = array;
689 this.mapper = mapper;
690 this.lo = lo;
691 this.hi = hi;
692 this.gran = gran;
693 }
694
695 protected void compute() {
696 FJTransformSum right = null;
697 int l = lo;
698 int h = hi;
699 int g = gran;
700 while (h - l > g) {
701 int mid = (l + h) >>> 1;
702 FJTransformSum r =
703 new FJTransformSum(array, mapper, mid + 1, h, g);
704 r.fork();
705 r.next = right;
706
707 right = r;
708 h = mid;
709 }
710 long x = 0;
711 for (int i = l; i <= h; ++i)
712 x += mapper.map(array[i]);
713 while (right != null) {
714 right.join();
715 x += right.result;
716 right = right.next;
717 }
718 result = x;
719 }
720
721 }
722
723 /**
724 * Fork/Join version of scan
725 *
726 * A basic version of scan is straightforward.
727 * Keep dividing by two to threshold segment size, and then:
728 * Pass 1: Create tree of partial sums for each segment
729 * Pass 2: For each segment, cumulate with offset of left sibling
730 * See G. Blelloch's http://www.cs.cmu.edu/~scandal/alg/scan.html
731 *
732 * This version improves performance within FJ framework:
733 * a) It allows second pass of ready left-hand sides to proceed even
734 * if some right-hand side first passes are still executing.
735 * b) It collapses the first and second passes of segments for which
736 * incoming cumulations are ready before summing.
737 * c) It skips first pass for rightmost segment (whose
738 * result is not needed for second pass).
739 *
740 */
741 static final class FJCumulator extends AsyncAction {
742
743 /**
744 * Shared control across nodes
745 */
746 static final class Ctl {
747 final long[] array;
748 final int granularity;
749 /**
750 * The index of the max current consecutive
751 * cumulation starting from leftmost. Initially zero.
752 */
753 volatile int consecutiveIndex;
754 /**
755 * The current consecutive cumulation
756 */
757 long consecutiveSum;
758
759 Ctl(long[] array, int granularity) {
760 this.array = array;
761 this.granularity = granularity;
762 }
763 }
764
765 final FJCumulator parent;
766 final FJCumulator.Ctl ctl;
767 FJCumulator left, right;
768 final int lo;
769 final int hi;
770
771 /** Incoming cumulative sum */
772 long in;
773
774 /** Sum of this subtree */
775 long out;
776
777 /**
778 * Phase/state control, updated only via transitionTo, for
779 * CUMULATE, SUMMED, and FINISHED bits.
780 */
781 volatile int phase;
782
783 /**
784 * Phase bit. When false, segments compute only their sum.
785 * When true, they cumulate array elements. CUMULATE is set at
786 * root at beginning of second pass and then propagated
787 * down. But it may also be set earlier in two cases when
788 * cumulations are known to be ready: (1) For subtrees with
789 * lo==0 (the left spine of tree) (2) Leaf nodes with
790 * completed predecessors.
791 */
792 static final int CUMULATE = 1;
793
794 /**
795 * One bit join count. For leafs, set when summed. For
796 * internal nodes, becomes true when one child is summed.
797 * When second child finishes summing, it then moves up tree
798 * to trigger cumulate phase.
799 */
800 static final int SUMMED = 2;
801
802 /**
803 * One bit join count. For leafs, set when cumulated. For
804 * internal nodes, becomes true when one child is cumulated.
805 * When second child finishes cumulating, it then moves up
806 * tree, excecuting finish() at the root.
807 */
808 static final int FINISHED = 4;
809
810 static final AtomicIntegerFieldUpdater<FJCumulator> phaseUpdater =
811 AtomicIntegerFieldUpdater.newUpdater(FJCumulator.class, "phase");
812
813 FJCumulator(FJCumulator parent,
814 FJCumulator.Ctl ctl,
815 int lo,
816 int hi) {
817 this.parent = parent;
818 this.ctl = ctl;
819 this.lo = lo;
820 this.hi = hi;
821 }
822
823 public void compute() {
824 if (hi - lo <= ctl.granularity) {
825 int cb = establishLeafPhase();
826 leafSum(cb);
827 propagateLeafPhase(cb);
828 }
829 else
830 spawnTasks();
831 }
832
833 /**
834 * decide which leaf action to take - sum, cumulate, or both
835 * @return associated bit s
836 */
837 int establishLeafPhase() {
838 for (;;) {
839 int b = phase;
840 if ((b & FINISHED) != 0) // already done
841 return 0;
842 int cb;
843 if ((b & CUMULATE) != 0)
844 cb = FINISHED;
845 else if (lo == ctl.consecutiveIndex)
846 cb = (SUMMED|FINISHED);
847 else
848 cb = SUMMED;
849 if (phaseUpdater.compareAndSet(this, b, b|cb))
850 return cb;
851 }
852 }
853
854 void propagateLeafPhase(int cb) {
855 FJCumulator c = this;
856 FJCumulator p = parent;
857 for (;;) {
858 if (p == null) {
859 if ((cb & FINISHED) != 0)
860 c.finish();
861 break;
862 }
863 int pb = p.phase;
864 if ((pb & cb & FINISHED) != 0) { // both finished
865 c = p;
866 p = p.parent;
867 }
868 else if ((pb & cb & SUMMED) != 0) { // both summed
869 int refork = 0;
870 if ((pb & CUMULATE) == 0 && p.lo == 0)
871 refork = CUMULATE;
872 int next = pb|cb|refork;
873 if (pb == next ||
874 phaseUpdater.compareAndSet(p, pb, next)) {
875 if (refork != 0)
876 p.fork();
877 cb = SUMMED; // drop finished bit
878 c = p;
879 p = p.parent;
880 }
881 }
882 else if (phaseUpdater.compareAndSet(p, pb, pb|cb))
883 break;
884 }
885 }
886
887 void leafSum(int cb) {
888 long[] array = ctl.array;
889 if (cb == SUMMED) {
890 if (hi < array.length) { // skip rightmost
891 long sum = 0;
892 for (int i = lo; i < hi; ++i)
893 sum += array[i];
894 out = sum;
895 }
896 }
897 else if (cb == FINISHED) {
898 long sum = in;
899 for (int i = lo; i < hi; ++i)
900 sum = array[i] += sum;
901 }
902 else if (cb == (SUMMED|FINISHED)) {
903 long cin = ctl.consecutiveSum;
904 long sum = cin;
905 for (int i = lo; i < hi; ++i)
906 sum = array[i] += sum;
907 out = sum - cin;
908 ctl.consecutiveSum = sum;
909 ctl.consecutiveIndex = hi;
910 }
911 }
912
913 /**
914 * Returns true if can CAS CUMULATE bit true
915 */
916 boolean transitionToCumulate() {
917 int c;
918 while (((c = phase) & CUMULATE) == 0)
919 if (phaseUpdater.compareAndSet(this, c, c | CUMULATE))
920 return true;
921 return false;
922 }
923
924 void spawnTasks() {
925 if (left == null) {
926 int mid = (lo + hi) >>> 1;
927 left = new FJCumulator(this, ctl, lo, mid);
928 right = new FJCumulator(this, ctl, mid, hi);
929 }
930
931 boolean cumulate = (phase & CUMULATE) != 0;
932 if (cumulate) { // push down sums
933 long cin = in;
934 left.in = cin;
935 right.in = cin + left.out;
936 }
937
938 if (!cumulate || right.transitionToCumulate())
939 right.fork();
940 if (!cumulate || left.transitionToCumulate())
941 left.compute();
942 }
943
944 }
945
946 /**
947 * Fork/Join version of FindAny
948 */
949 static final class FJFindAny extends RecursiveAction {
950 final long[] array;
951 final LongPredicate pred;
952 final VolatileLong result;
953 final long missing;
954 final int lo;
955 final int hi;
956 final int gran;
957
958 FJFindAny(long[] array,
959 LongPredicate pred,
960 VolatileLong result,
961 long missing,
962 int lo,
963 int hi,
964 int gran) {
965 this.array = array;
966 this.pred = pred;
967 this.result = result;
968 this.missing = missing;
969 this.lo = lo;
970 this.hi = hi;
971 this.gran = gran;
972 }
973
974 void seqCompute() {
975 for (int i = lo; i <= hi; ++i) {
976 long x = array[i];
977 if (pred.evaluate(x) && result.value == missing) {
978 result.value = x;
979 break;
980 }
981 }
982 }
983
984 protected void compute() {
985 if (result.value != missing)
986 return;
987 if (hi - lo <= gran) {
988 seqCompute();
989 return;
990 }
991 int mid = (lo + hi) >>> 1;
992 FJFindAny left =
993 new FJFindAny(array, pred, result, missing, lo, mid, gran);
994 left.fork();
995 FJFindAny right =
996 new FJFindAny(array, pred, result, missing, mid + 1, hi, gran);
997 right.invoke();
998 if (result.value != missing)
999 left.cancel();
1000 else
1001 left.join();
1002 }
1003 }
1004
1005 /**
1006 * Fork/Join version of FindAll
1007 */
1008 static final class FJFindAll extends RecursiveAction {
1009 final long[] array;
1010 final LongPredicate pred;
1011 final List<Long> result;
1012 final int lo;
1013 final int hi;
1014 final int gran;
1015
1016 FJFindAll(long[] array,
1017 LongPredicate pred,
1018 List<Long> result,
1019 int lo,
1020 int hi,
1021 int gran) {
1022 this.array = array;
1023 this.pred = pred;
1024 this.result = result;
1025 this.lo = lo;
1026 this.hi = hi;
1027 this.gran = gran;
1028 }
1029
1030
1031 void seqCompute() {
1032 for (int i = lo; i <= hi; ++i) {
1033 long x = array[i];
1034 if (pred.evaluate(x))
1035 result.add(x);
1036 }
1037 }
1038
1039 protected void compute() {
1040 if (hi - lo <= gran) {
1041 seqCompute();
1042 return;
1043 }
1044 int mid = (lo + hi) >>> 1;
1045 FJFindAll left =
1046 new FJFindAll(array, pred, result, lo, mid, gran);
1047 FJFindAll right =
1048 new FJFindAll(array, pred, result, mid + 1, hi, gran);
1049 coInvoke(left, right);
1050 }
1051 }
1052
1053 /*
1054 * Sort algorithm based mainly on CilkSort
1055 * <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A>:
1056 * if array size is small, just use a sequential quicksort
1057 * Otherwise:
1058 * 1. Break array in half.
1059 * 2. For each half,
1060 * a. break the half in half (i.e., quarters),
1061 * b. sort the quarters
1062 * c. merge them together
1063 * 3. merge together the two halves.
1064 *
1065 * One reason for splitting in quarters is that this guarantees
1066 * that the final sort is in the main array, not the workspace array.
1067 * (workspace and main swap roles on each subsort step.)
1068 *
1069 */
1070
1071 // Cutoff for when to do sequential versus parallel sorts and merges
1072 static final int SEQUENTIAL_THRESHOLD = 256; // == 16 * 16
1073 // Todo: check for #cpu sensitivity
1074
1075
1076 static class FJSorter extends RecursiveAction {
1077 final long[] a; // Array to be sorted.
1078 final int ao; // origin of the part of array we deal with
1079 final long[] w; // workspace array for merge
1080 final int wo; // its origin
1081 final int n; // Number of elements in (sub)arrays.
1082
1083 FJSorter (long[] a, int ao, long[] w, int wo, int n) {
1084 this.a = a; this.ao = ao; this.w = w; this.wo = wo; this.n = n;
1085 }
1086
1087 protected void compute() {
1088 if (n <= SEQUENTIAL_THRESHOLD)
1089 quickSort(a, ao, ao+n-1);
1090 else {
1091 int q = n >>> 2; // lower quarter index
1092 int h = n >>> 1; // half
1093 int u = h + q; // upper quarter
1094
1095 coInvoke(new SubSorter(new FJSorter(a, ao, w, wo, q),
1096 new FJSorter(a, ao+q, w, wo+q, q),
1097 new FJMerger(a, ao, q, ao+q, q,
1098 w, wo)),
1099 new SubSorter(new FJSorter(a, ao+h, w, wo+h, q),
1100 new FJSorter(a, ao+u, w, wo+u, n-u),
1101 new FJMerger(a, ao+h, q, ao+u, n-u,
1102 w, wo+h)));
1103 new FJMerger(w, wo, h, wo+h, n-h, a, ao).compute();
1104 }
1105 }
1106
1107 }
1108
1109 /**
1110 * A boring class to run two given sorts in parallel, then merge them.
1111 */
1112 static class SubSorter extends RecursiveAction {
1113 final FJSorter left;
1114 final FJSorter right;
1115 final FJMerger merger;
1116 SubSorter(FJSorter left, FJSorter right, FJMerger merger) {
1117 this.left = left; this.right = right; this.merger = merger;
1118 }
1119 protected void compute() {
1120 coInvoke(left, right);
1121 merger.invoke();
1122 }
1123 }
1124
1125 static class FJMerger extends RecursiveAction {
1126 final long[] a; // partitioned array.
1127 final int lo; // relative origin of left side
1128 final int ln; // number of elements on left
1129 final int ro; // relative origin of right side
1130 final int rn; // number of elements on right
1131
1132 final long[] w; // Output array.
1133 final int wo;
1134
1135 FJMerger (long[] a, int lo, int ln, int ro, int rn, long[] w, int wo) {
1136 this.a = a;
1137 this.w = w;
1138 this.wo = wo;
1139 // Left side should be largest of the two for fiding split.
1140 // Swap now, since left/right doesn't otherwise matter
1141 if (ln >= rn) {
1142 this.lo = lo; this.ln = ln;
1143 this.ro = ro; this.rn = rn;
1144 }
1145 else {
1146 this.lo = ro; this.ln = rn;
1147 this.ro = lo; this.rn = ln;
1148 }
1149 }
1150
1151 protected void compute() {
1152 /*
1153 If partiions are small, then just sequentially merge.
1154 Otherwise:
1155 1. Split Left partition in half.
1156 2. Find the greatest point in Right partition
1157 less than the beginning of the second half of left,
1158 via binary search.
1159 3. In parallel:
1160 merge left half of L with elements of R up to split point
1161 merge right half of L with elements of R past split point
1162 */
1163
1164 if (ln <= SEQUENTIAL_THRESHOLD)
1165 merge();
1166 else {
1167 int lh = ln >>> 1;
1168 int ls = lo + lh; // index of split
1169 long split = a[ls];
1170 int rl = 0;
1171 int rh = rn;
1172 while (rl < rh) {
1173 int mid = (rl + rh) >>> 1;
1174 if (split <= a[ro + mid])
1175 rh = mid;
1176 else
1177 rl = mid + 1;
1178 }
1179 coInvoke(new FJMerger(a, lo, lh, ro, rh, w, wo),
1180 new FJMerger(a, ls, ln-lh, ro+rh, rn-rh, w, wo+lh+rh));
1181 }
1182 }
1183
1184 /** a standard sequential merge */
1185 void merge() {
1186 int l = lo;
1187 int lFence = lo+ln;
1188 int r = ro;
1189 int rFence = ro+rn;
1190 int k = wo;
1191 while (l < lFence && r < rFence)
1192 w[k++] = (a[l] <= a[r])? a[l++] : a[r++];
1193 while (l < lFence)
1194 w[k++] = a[l++];
1195 while (r < rFence)
1196 w[k++] = a[r++];
1197 }
1198 }
1199
1200 // Cutoff for when to use insertion-sort instead of quicksort
1201 static final int INSERTION_SORT_THRESHOLD = 16;
1202
1203 /** A standard sequential quicksort */
1204 static void quickSort(long[] a, int lo, int hi) {
1205 // If under threshold, use insertion sort
1206 if (hi - lo <= INSERTION_SORT_THRESHOLD) {
1207 for (int i = lo + 1; i <= hi; i++) {
1208 long t = a[i];
1209 int j = i - 1;
1210 while (j >= lo && t < a[j]) {
1211 a[j+1] = a[j];
1212 --j;
1213 }
1214 a[j+1] = t;
1215 }
1216 return;
1217 }
1218
1219 // Use median-of-three(lo, mid, hi) to pick a partition.
1220 // Also swap them into relative order while we are at it.
1221 int mid = (lo + hi) >>> 1;
1222 if (a[lo] > a[mid]) {
1223 long t = a[lo]; a[lo] = a[mid]; a[mid] = t;
1224 }
1225 if (a[mid] > a[hi]) {
1226 long t = a[mid]; a[mid] = a[hi]; a[hi] = t;
1227 if (a[lo] > a[mid]) {
1228 t = a[lo]; a[lo] = a[mid]; a[mid] = t;
1229 }
1230 }
1231
1232 long pivot = a[mid];
1233 int left = lo+1;
1234 int right = hi-1;
1235 for (;;) {
1236 while (pivot < a[right])
1237 --right;
1238 while (left < right && pivot >= a[left])
1239 ++left;
1240 if (left < right) {
1241 long t = a[left]; a[left] = a[right]; a[right] = t;
1242 --right;
1243 }
1244 else break;
1245 }
1246 quickSort(a, lo, left);
1247 quickSort(a, left+1, hi);
1248 }
1249
1250
1251
1252 }