3 |
|
* Expert Group and released to the public domain, as explained at |
4 |
|
* http://creativecommons.org/publicdomain/zero/1.0/ |
5 |
|
*/ |
6 |
< |
import java.util.concurrent.ExecutionException; |
6 |
> |
|
7 |
> |
import static java.util.concurrent.TimeUnit.MILLISECONDS; |
8 |
> |
import static java.util.concurrent.TimeUnit.SECONDS; |
9 |
> |
|
10 |
> |
import java.util.HashSet; |
11 |
|
import java.util.concurrent.CancellationException; |
12 |
+ |
import java.util.concurrent.CountedCompleter; |
13 |
+ |
import java.util.concurrent.ExecutionException; |
14 |
|
import java.util.concurrent.ForkJoinPool; |
15 |
|
import java.util.concurrent.ForkJoinTask; |
16 |
< |
import java.util.concurrent.CountedCompleter; |
11 |
< |
import java.util.concurrent.ForkJoinWorkerThread; |
12 |
< |
import java.util.concurrent.RecursiveAction; |
13 |
< |
import java.util.concurrent.TimeUnit; |
16 |
> |
import java.util.concurrent.ThreadLocalRandom; |
17 |
|
import java.util.concurrent.TimeoutException; |
18 |
|
import java.util.concurrent.atomic.AtomicInteger; |
16 |
– |
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
19 |
|
import java.util.concurrent.atomic.AtomicReference; |
20 |
< |
import static java.util.concurrent.TimeUnit.MILLISECONDS; |
21 |
< |
import static java.util.concurrent.TimeUnit.SECONDS; |
22 |
< |
import java.util.HashSet; |
23 |
< |
import junit.framework.*; |
20 |
> |
import java.util.function.BiConsumer; |
21 |
> |
import java.util.function.Consumer; |
22 |
> |
|
23 |
> |
import junit.framework.Test; |
24 |
> |
import junit.framework.TestSuite; |
25 |
|
|
26 |
|
public class CountedCompleterTest extends JSR166TestCase { |
27 |
|
|
28 |
|
public static void main(String[] args) { |
29 |
< |
junit.textui.TestRunner.run(suite()); |
29 |
> |
main(suite(), args); |
30 |
|
} |
31 |
|
|
32 |
|
public static Test suite() { |
52 |
|
} |
53 |
|
|
54 |
|
private void testInvokeOnPool(ForkJoinPool pool, ForkJoinTask a) { |
55 |
< |
try { |
55 |
> |
try (PoolCleaner cleaner = cleaner(pool)) { |
56 |
|
assertFalse(a.isDone()); |
57 |
|
assertFalse(a.isCompletedNormally()); |
58 |
|
assertFalse(a.isCompletedAbnormally()); |
68 |
|
assertFalse(a.isCancelled()); |
69 |
|
assertNull(a.getException()); |
70 |
|
assertNull(a.getRawResult()); |
68 |
– |
} finally { |
69 |
– |
joinPool(pool); |
71 |
|
} |
72 |
|
} |
73 |
|
|
96 |
|
|
97 |
|
{ |
98 |
|
Thread.currentThread().interrupt(); |
99 |
< |
long t0 = System.nanoTime(); |
99 |
> |
long startTime = System.nanoTime(); |
100 |
|
assertNull(a.join()); |
101 |
< |
assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS); |
101 |
> |
assertTrue(millisElapsedSince(startTime) < SMALL_DELAY_MS); |
102 |
|
Thread.interrupted(); |
103 |
|
} |
104 |
|
|
105 |
|
{ |
106 |
|
Thread.currentThread().interrupt(); |
107 |
< |
long t0 = System.nanoTime(); |
107 |
> |
long startTime = System.nanoTime(); |
108 |
|
a.quietlyJoin(); // should be no-op |
109 |
< |
assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS); |
109 |
> |
assertTrue(millisElapsedSince(startTime) < SMALL_DELAY_MS); |
110 |
|
Thread.interrupted(); |
111 |
|
} |
112 |
|
|
139 |
|
Thread.interrupted(); |
140 |
|
|
141 |
|
{ |
142 |
< |
long t0 = System.nanoTime(); |
142 |
> |
long startTime = System.nanoTime(); |
143 |
|
a.quietlyJoin(); // should be no-op |
144 |
< |
assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS); |
144 |
> |
assertTrue(millisElapsedSince(startTime) < SMALL_DELAY_MS); |
145 |
|
} |
146 |
|
|
147 |
|
try { |
177 |
|
Thread.interrupted(); |
178 |
|
|
179 |
|
{ |
180 |
< |
long t0 = System.nanoTime(); |
180 |
> |
long startTime = System.nanoTime(); |
181 |
|
a.quietlyJoin(); // should be no-op |
182 |
< |
assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS); |
182 |
> |
assertTrue(millisElapsedSince(startTime) < SMALL_DELAY_MS); |
183 |
|
} |
184 |
|
|
185 |
|
try { |
199 |
|
try { |
200 |
|
a.invoke(); |
201 |
|
shouldThrow(); |
202 |
< |
} catch (Throwable ex) { |
203 |
< |
assertSame(t, ex); |
202 |
> |
} catch (Throwable success) { |
203 |
> |
assertSame(t, success); |
204 |
|
} |
205 |
|
} |
206 |
|
|
213 |
|
final AtomicInteger onCompletionN = new AtomicInteger(0); |
214 |
|
final AtomicInteger onExceptionalCompletionN = new AtomicInteger(0); |
215 |
|
final AtomicInteger setRawResultN = new AtomicInteger(0); |
216 |
< |
final AtomicReference<Object> rawResult = new AtomicReference<>(null); |
216 |
> |
final AtomicReference<Object> rawResult = new AtomicReference<Object>(null); |
217 |
|
int computeN() { return computeN.get(); } |
218 |
|
int onCompletionN() { return onCompletionN.get(); } |
219 |
|
int onExceptionalCompletionN() { return onExceptionalCompletionN.get(); } |
253 |
|
} |
254 |
|
void checkCompletes(Object rawResult) { |
255 |
|
checkIncomplete(); |
256 |
+ |
int pendingCount = getPendingCount(); |
257 |
|
complete(rawResult); |
258 |
+ |
assertEquals(pendingCount, getPendingCount()); |
259 |
|
assertEquals(0, computeN()); |
260 |
|
assertEquals(1, onCompletionN()); |
261 |
|
assertEquals(0, onExceptionalCompletionN()); |
281 |
|
final class NoopCC extends CheckedCC { |
282 |
|
NoopCC() { super(); } |
283 |
|
NoopCC(CountedCompleter p) { super(p); } |
284 |
+ |
NoopCC(CountedCompleter p, int initialPendingCount) { |
285 |
+ |
super(p, initialPendingCount); |
286 |
+ |
} |
287 |
|
protected void realCompute() {} |
288 |
|
} |
289 |
|
|
290 |
|
/** |
291 |
|
* A newly constructed CountedCompleter is not completed; |
292 |
< |
* complete() causes completion. |
292 |
> |
* complete() causes completion. pendingCount is ignored. |
293 |
|
*/ |
294 |
|
public void testComplete() { |
295 |
|
for (Object x : new Object[] { Boolean.TRUE, null }) { |
296 |
< |
new NoopCC() |
297 |
< |
.checkCompletes(x); |
298 |
< |
new NoopCC(new NoopCC()) |
299 |
< |
.checkCompletes(x); |
296 |
> |
for (int pendingCount : new int[] { 0, 42 }) { |
297 |
> |
testComplete(new NoopCC(), x, pendingCount); |
298 |
> |
testComplete(new NoopCC(new NoopCC()), x, pendingCount); |
299 |
> |
} |
300 |
|
} |
301 |
|
} |
302 |
+ |
void testComplete(NoopCC cc, Object x, int pendingCount) { |
303 |
+ |
cc.setPendingCount(pendingCount); |
304 |
+ |
cc.checkCompletes(x); |
305 |
+ |
assertEquals(pendingCount, cc.getPendingCount()); |
306 |
+ |
} |
307 |
|
|
308 |
|
/** |
309 |
|
* completeExceptionally completes exceptionally |
316 |
|
} |
317 |
|
|
318 |
|
/** |
319 |
< |
* completeExceptionally(null) throws NullPointerException |
319 |
> |
* completeExceptionally(null) surprisingly has the same effect as |
320 |
> |
* completeExceptionally(new RuntimeException()) |
321 |
|
*/ |
322 |
|
public void testCompleteExceptionally_null() { |
323 |
+ |
NoopCC a = new NoopCC(); |
324 |
+ |
a.completeExceptionally(null); |
325 |
|
try { |
326 |
< |
new NoopCC() |
313 |
< |
.checkCompletesExceptionally(null); |
326 |
> |
a.invoke(); |
327 |
|
shouldThrow(); |
328 |
< |
} catch (NullPointerException success) {} |
328 |
> |
} catch (RuntimeException success) { |
329 |
> |
assertSame(success.getClass(), RuntimeException.class); |
330 |
> |
assertNull(success.getCause()); |
331 |
> |
a.checkCompletedExceptionally(success); |
332 |
> |
} |
333 |
|
} |
334 |
|
|
335 |
|
/** |
338 |
|
public void testSetPendingCount() { |
339 |
|
NoopCC a = new NoopCC(); |
340 |
|
assertEquals(0, a.getPendingCount()); |
341 |
< |
a.setPendingCount(1); |
342 |
< |
assertEquals(1, a.getPendingCount()); |
343 |
< |
a.setPendingCount(27); |
344 |
< |
assertEquals(27, a.getPendingCount()); |
341 |
> |
int[] vals = { |
342 |
> |
-1, 0, 1, |
343 |
> |
Integer.MIN_VALUE, |
344 |
> |
Integer.MAX_VALUE, |
345 |
> |
}; |
346 |
> |
for (int val : vals) { |
347 |
> |
a.setPendingCount(val); |
348 |
> |
assertEquals(val, a.getPendingCount()); |
349 |
> |
} |
350 |
|
} |
351 |
|
|
352 |
|
/** |
359 |
|
assertEquals(1, a.getPendingCount()); |
360 |
|
a.addToPendingCount(27); |
361 |
|
assertEquals(28, a.getPendingCount()); |
362 |
+ |
a.addToPendingCount(-28); |
363 |
+ |
assertEquals(0, a.getPendingCount()); |
364 |
|
} |
365 |
|
|
366 |
|
/** |
367 |
|
* decrementPendingCountUnlessZero decrements reported pending |
368 |
|
* count unless zero |
369 |
|
*/ |
370 |
< |
public void testDecrementPendingCount() { |
371 |
< |
NoopCC a = new NoopCC(); |
372 |
< |
assertEquals(0, a.getPendingCount()); |
373 |
< |
a.addToPendingCount(1); |
370 |
> |
public void testDecrementPendingCountUnlessZero() { |
371 |
> |
NoopCC a = new NoopCC(null, 2); |
372 |
> |
assertEquals(2, a.getPendingCount()); |
373 |
> |
assertEquals(2, a.decrementPendingCountUnlessZero()); |
374 |
|
assertEquals(1, a.getPendingCount()); |
375 |
< |
a.decrementPendingCountUnlessZero(); |
375 |
> |
assertEquals(1, a.decrementPendingCountUnlessZero()); |
376 |
|
assertEquals(0, a.getPendingCount()); |
377 |
< |
a.decrementPendingCountUnlessZero(); |
377 |
> |
assertEquals(0, a.decrementPendingCountUnlessZero()); |
378 |
|
assertEquals(0, a.getPendingCount()); |
379 |
+ |
a.setPendingCount(-1); |
380 |
+ |
assertEquals(-1, a.decrementPendingCountUnlessZero()); |
381 |
+ |
assertEquals(-2, a.getPendingCount()); |
382 |
|
} |
383 |
|
|
384 |
|
/** |
502 |
|
} |
503 |
|
|
504 |
|
/** |
505 |
< |
* quietlyCompleteRoot completes root task |
505 |
> |
* quietlyCompleteRoot completes root task and only root task |
506 |
|
*/ |
507 |
|
public void testQuietlyCompleteRoot() { |
508 |
|
NoopCC a = new NoopCC(); |
520 |
|
// Invocation tests use some interdependent task classes |
521 |
|
// to better test propagation etc |
522 |
|
|
523 |
< |
|
524 |
< |
// Version of Fibonacci with different classes for left vs right forks |
523 |
> |
/** |
524 |
> |
* Version of Fibonacci with different classes for left vs right forks |
525 |
> |
*/ |
526 |
|
abstract class CCF extends CheckedCC { |
527 |
|
int number; |
528 |
|
int rnumber; |
1158 |
|
} |
1159 |
|
|
1160 |
|
/** |
1161 |
< |
* invokeAll(collection) throws exception if any task does |
1161 |
> |
* invokeAll(collection) throws exception if any task does |
1162 |
|
*/ |
1163 |
|
public void testAbnormalInvokeAllCollection() { |
1164 |
|
ForkJoinTask a = new CheckedRecursiveAction() { |
1823 |
|
} |
1824 |
|
|
1825 |
|
/** |
1826 |
< |
* invokeAll(collection) throws exception if any task does |
1826 |
> |
* invokeAll(collection) throws exception if any task does |
1827 |
|
*/ |
1828 |
|
public void testAbnormalInvokeAllCollectionSingleton() { |
1829 |
|
ForkJoinTask a = new CheckedRecursiveAction() { |
1845 |
|
testInvokeOnPool(singletonPool(), a); |
1846 |
|
} |
1847 |
|
|
1848 |
+ |
/** CountedCompleter class javadoc code sample, version 1. */ |
1849 |
+ |
public static <E> void forEach1(E[] array, Consumer<E> action) { |
1850 |
+ |
class Task extends CountedCompleter<Void> { |
1851 |
+ |
final int lo, hi; |
1852 |
+ |
Task(Task parent, int lo, int hi) { |
1853 |
+ |
super(parent); this.lo = lo; this.hi = hi; |
1854 |
+ |
} |
1855 |
+ |
|
1856 |
+ |
public void compute() { |
1857 |
+ |
if (hi - lo >= 2) { |
1858 |
+ |
int mid = (lo + hi) >>> 1; |
1859 |
+ |
// must set pending count before fork |
1860 |
+ |
setPendingCount(2); |
1861 |
+ |
new Task(this, mid, hi).fork(); // right child |
1862 |
+ |
new Task(this, lo, mid).fork(); // left child |
1863 |
+ |
} |
1864 |
+ |
else if (hi > lo) |
1865 |
+ |
action.accept(array[lo]); |
1866 |
+ |
tryComplete(); |
1867 |
+ |
} |
1868 |
+ |
} |
1869 |
+ |
new Task(null, 0, array.length).invoke(); |
1870 |
+ |
} |
1871 |
+ |
|
1872 |
+ |
/** CountedCompleter class javadoc code sample, version 2. */ |
1873 |
+ |
public static <E> void forEach2(E[] array, Consumer<E> action) { |
1874 |
+ |
class Task extends CountedCompleter<Void> { |
1875 |
+ |
final int lo, hi; |
1876 |
+ |
Task(Task parent, int lo, int hi) { |
1877 |
+ |
super(parent); this.lo = lo; this.hi = hi; |
1878 |
+ |
} |
1879 |
+ |
|
1880 |
+ |
public void compute() { |
1881 |
+ |
if (hi - lo >= 2) { |
1882 |
+ |
int mid = (lo + hi) >>> 1; |
1883 |
+ |
setPendingCount(1); // looks off by one, but correct! |
1884 |
+ |
new Task(this, mid, hi).fork(); // right child |
1885 |
+ |
new Task(this, lo, mid).compute(); // direct invoke |
1886 |
+ |
} else { |
1887 |
+ |
if (hi > lo) |
1888 |
+ |
action.accept(array[lo]); |
1889 |
+ |
tryComplete(); |
1890 |
+ |
} |
1891 |
+ |
} |
1892 |
+ |
} |
1893 |
+ |
new Task(null, 0, array.length).invoke(); |
1894 |
+ |
} |
1895 |
+ |
|
1896 |
+ |
/** CountedCompleter class javadoc code sample, version 3. */ |
1897 |
+ |
public static <E> void forEach3(E[] array, Consumer<E> action) { |
1898 |
+ |
class Task extends CountedCompleter<Void> { |
1899 |
+ |
final int lo, hi; |
1900 |
+ |
Task(Task parent, int lo, int hi) { |
1901 |
+ |
super(parent); this.lo = lo; this.hi = hi; |
1902 |
+ |
} |
1903 |
+ |
|
1904 |
+ |
public void compute() { |
1905 |
+ |
int n = hi - lo; |
1906 |
+ |
for (; n >= 2; n /= 2) { |
1907 |
+ |
addToPendingCount(1); |
1908 |
+ |
new Task(this, lo + n/2, lo + n).fork(); |
1909 |
+ |
} |
1910 |
+ |
if (n > 0) |
1911 |
+ |
action.accept(array[lo]); |
1912 |
+ |
propagateCompletion(); |
1913 |
+ |
} |
1914 |
+ |
} |
1915 |
+ |
new Task(null, 0, array.length).invoke(); |
1916 |
+ |
} |
1917 |
+ |
|
1918 |
+ |
/** CountedCompleter class javadoc code sample, version 4. */ |
1919 |
+ |
public static <E> void forEach4(E[] array, Consumer<E> action) { |
1920 |
+ |
class Task extends CountedCompleter<Void> { |
1921 |
+ |
final int lo, hi; |
1922 |
+ |
Task(Task parent, int lo, int hi) { |
1923 |
+ |
super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo)); |
1924 |
+ |
this.lo = lo; this.hi = hi; |
1925 |
+ |
} |
1926 |
+ |
|
1927 |
+ |
public void compute() { |
1928 |
+ |
for (int n = hi - lo; n >= 2; n /= 2) |
1929 |
+ |
new Task(this, lo + n/2, lo + n).fork(); |
1930 |
+ |
action.accept(array[lo]); |
1931 |
+ |
propagateCompletion(); |
1932 |
+ |
} |
1933 |
+ |
} |
1934 |
+ |
if (array.length > 0) |
1935 |
+ |
new Task(null, 0, array.length).invoke(); |
1936 |
+ |
} |
1937 |
+ |
|
1938 |
+ |
void testRecursiveDecomposition( |
1939 |
+ |
BiConsumer<Integer[], Consumer<Integer>> action) { |
1940 |
+ |
int n = ThreadLocalRandom.current().nextInt(8); |
1941 |
+ |
Integer[] a = new Integer[n]; |
1942 |
+ |
for (int i = 0; i < n; i++) a[i] = i + 1; |
1943 |
+ |
AtomicInteger ai = new AtomicInteger(0); |
1944 |
+ |
action.accept(a, (x) -> ai.addAndGet(x)); |
1945 |
+ |
assertEquals(n * (n + 1) / 2, ai.get()); |
1946 |
+ |
} |
1947 |
+ |
|
1948 |
+ |
/** |
1949 |
+ |
* Variants of divide-by-two recursive decomposition into leaf tasks, |
1950 |
+ |
* as described in the CountedCompleter class javadoc code samples |
1951 |
+ |
*/ |
1952 |
+ |
public void testRecursiveDecomposition() { |
1953 |
+ |
testRecursiveDecomposition(CountedCompleterTest::forEach1); |
1954 |
+ |
testRecursiveDecomposition(CountedCompleterTest::forEach2); |
1955 |
+ |
testRecursiveDecomposition(CountedCompleterTest::forEach3); |
1956 |
+ |
testRecursiveDecomposition(CountedCompleterTest::forEach4); |
1957 |
+ |
} |
1958 |
+ |
|
1959 |
|
} |