--- jsr166/src/test/tck/CompletableFutureTest.java 2014/06/17 20:50:01 1.92 +++ jsr166/src/test/tck/CompletableFutureTest.java 2015/09/03 11:45:34 1.103 @@ -5,35 +5,38 @@ * http://creativecommons.org/publicdomain/zero/1.0/ */ -import junit.framework.*; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.CancellationException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import java.util.*; -import java.util.function.Supplier; -import java.util.function.Consumer; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.Function; import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import junit.framework.Test; +import junit.framework.TestSuite; public class CompletableFutureTest extends JSR166TestCase { public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); + main(suite(), args); } public static Test suite() { return new TestSuite(CompletableFutureTest.class); @@ -44,7 +47,7 @@ public class CompletableFutureTest exten void checkIncomplete(CompletableFuture f) { assertFalse(f.isDone()); assertFalse(f.isCancelled()); - assertTrue(f.toString().contains("[Not completed]")); + assertTrue(f.toString().contains("Not completed")); try { assertNull(f.getNow(null)); } catch (Throwable fail) { threadUnexpectedException(fail); } @@ -57,9 +60,8 @@ public class CompletableFutureTest exten } void checkCompletedNormally(CompletableFuture f, T value) { - try { - assertEquals(value, f.get(LONG_DELAY_MS, MILLISECONDS)); - } catch (Throwable fail) { threadUnexpectedException(fail); } + checkTimedGet(f, value); + try { assertEquals(value, f.join()); } catch (Throwable fail) { threadUnexpectedException(fail); } @@ -76,12 +78,16 @@ public class CompletableFutureTest exten } void checkCompletedWithWrappedCFException(CompletableFuture f) { + long startTime = System.nanoTime(); + long timeoutMillis = LONG_DELAY_MS; try { - f.get(LONG_DELAY_MS, MILLISECONDS); + f.get(timeoutMillis, MILLISECONDS); shouldThrow(); } catch (ExecutionException success) { assertTrue(success.getCause() instanceof CFException); } catch (Throwable fail) { threadUnexpectedException(fail); } + assertTrue(millisElapsedSince(startTime) < timeoutMillis/2); + try { f.join(); shouldThrow(); @@ -107,12 +113,16 @@ public class CompletableFutureTest exten void checkCompletedExceptionallyWithRootCause(CompletableFuture f, Throwable ex) { + long startTime = System.nanoTime(); + long timeoutMillis = LONG_DELAY_MS; try { - f.get(LONG_DELAY_MS, MILLISECONDS); + f.get(timeoutMillis, MILLISECONDS); shouldThrow(); } catch (ExecutionException success) { assertSame(ex, success.getCause()); } catch (Throwable fail) { threadUnexpectedException(fail); } + assertTrue(millisElapsedSince(startTime) < timeoutMillis/2); + try { f.join(); shouldThrow(); @@ -137,6 +147,43 @@ public class CompletableFutureTest exten assertTrue(f.toString().contains("[Completed exceptionally]")); } + void checkCompletedExceptionallyWithTimeout(CompletableFuture f) { + long startTime = System.nanoTime(); + long timeoutMillis = LONG_DELAY_MS; + try { + f.get(timeoutMillis, MILLISECONDS); + shouldThrow(); + } catch (ExecutionException ex) { + assertTrue(ex.getCause() instanceof TimeoutException); + } catch (Throwable fail) { threadUnexpectedException(fail); } + assertTrue(millisElapsedSince(startTime) < timeoutMillis/2); + + try { + f.join(); + shouldThrow(); + } catch (Throwable ex) { + assertTrue(ex.getCause() instanceof TimeoutException); + } + + try { + f.getNow(null); + shouldThrow(); + } catch (Throwable ex) { + assertTrue(ex.getCause() instanceof TimeoutException); + } + + try { + f.get(); + shouldThrow(); + } catch (ExecutionException ex) { + assertTrue(ex.getCause() instanceof TimeoutException); + } catch (Throwable fail) { threadUnexpectedException(fail); } + + assertTrue(f.isDone()); + assertFalse(f.isCancelled()); + assertTrue(f.toString().contains("[Completed exceptionally]")); + } + void checkCompletedWithWrappedException(CompletableFuture f, Throwable ex) { checkCompletedExceptionallyWithRootCause(f, ex); @@ -158,11 +205,15 @@ public class CompletableFutureTest exten } void checkCancelled(CompletableFuture f) { + long startTime = System.nanoTime(); + long timeoutMillis = LONG_DELAY_MS; try { - f.get(LONG_DELAY_MS, MILLISECONDS); + f.get(timeoutMillis, MILLISECONDS); shouldThrow(); } catch (CancellationException success) { } catch (Throwable fail) { threadUnexpectedException(fail); } + assertTrue(millisElapsedSince(startTime) < timeoutMillis/2); + try { f.join(); shouldThrow(); @@ -183,12 +234,16 @@ public class CompletableFutureTest exten } void checkCompletedWithWrappedCancellationException(CompletableFuture f) { + long startTime = System.nanoTime(); + long timeoutMillis = LONG_DELAY_MS; try { - f.get(LONG_DELAY_MS, MILLISECONDS); + f.get(timeoutMillis, MILLISECONDS); shouldThrow(); } catch (ExecutionException success) { assertTrue(success.getCause() instanceof CancellationException); } catch (Throwable fail) { threadUnexpectedException(fail); } + assertTrue(millisElapsedSince(startTime) < timeoutMillis/2); + try { f.join(); shouldThrow(); @@ -257,8 +312,9 @@ public class CompletableFutureTest exten { CompletableFuture f = new CompletableFuture<>(); checkIncomplete(f); - assertTrue(f.cancel(true)); - assertTrue(f.cancel(true)); + assertTrue(f.cancel(mayInterruptIfRunning)); + assertTrue(f.cancel(mayInterruptIfRunning)); + assertTrue(f.cancel(!mayInterruptIfRunning)); checkCancelled(f); }} @@ -530,7 +586,6 @@ public class CompletableFutureTest exten } } - class CompletableFutureInc extends CheckedIntegerAction implements Function> { @@ -569,12 +624,15 @@ public class CompletableFutureTest exten } } + static final boolean defaultExecutorIsCommonPool + = ForkJoinPool.getCommonPoolParallelism() > 1; + /** * Permits the testing of parallel code for the 3 different * execution modes without copy/pasting all the test methods. */ enum ExecutionMode { - DEFAULT { + SYNC { public void checkExecutionMode() { assertFalse(ThreadExecutor.startedCurrentThread()); assertNull(ForkJoinTask.getPool()); @@ -650,8 +708,8 @@ public class CompletableFutureTest exten ASYNC { public void checkExecutionMode() { - assertSame(ForkJoinPool.commonPool(), - ForkJoinTask.getPool()); + assertEquals(defaultExecutorIsCommonPool, + (ForkJoinPool.commonPool() == ForkJoinTask.getPool())); } public CompletableFuture runAsync(Runnable a) { return CompletableFuture.runAsync(a); @@ -875,7 +933,7 @@ public class CompletableFutureTest exten if (!createIncomplete) f.completeExceptionally(ex); final CompletableFuture g = f.exceptionally ((Throwable t) -> { - ExecutionMode.DEFAULT.checkExecutionMode(); + ExecutionMode.SYNC.checkExecutionMode(); threadAssertSame(t, ex); a.getAndIncrement(); return v1; @@ -888,7 +946,6 @@ public class CompletableFutureTest exten public void testExceptionally_exceptionalCompletionActionFailed() { for (boolean createIncomplete : new boolean[] { true, false }) - for (Integer v1 : new Integer[] { 1, null }) { final AtomicInteger a = new AtomicInteger(0); final CFException ex1 = new CFException(); @@ -897,7 +954,7 @@ public class CompletableFutureTest exten if (!createIncomplete) f.completeExceptionally(ex1); final CompletableFuture g = f.exceptionally ((Throwable t) -> { - ExecutionMode.DEFAULT.checkExecutionMode(); + ExecutionMode.SYNC.checkExecutionMode(); threadAssertSame(t, ex1); a.getAndIncrement(); throw ex2; @@ -942,7 +999,6 @@ public class CompletableFutureTest exten public void testWhenComplete_exceptionalCompletion() { for (ExecutionMode m : ExecutionMode.values()) for (boolean createIncomplete : new boolean[] { true, false }) - for (Integer v1 : new Integer[] { 1, null }) { final AtomicInteger a = new AtomicInteger(0); final CFException ex = new CFException(); @@ -1027,7 +1083,6 @@ public class CompletableFutureTest exten public void testWhenComplete_actionFailedSourceFailed() { for (boolean createIncomplete : new boolean[] { true, false }) for (ExecutionMode m : ExecutionMode.values()) - for (Integer v1 : new Integer[] { 1, null }) { final AtomicInteger a = new AtomicInteger(0); final CFException ex1 = new CFException(); @@ -1600,31 +1655,35 @@ public class CompletableFutureTest exten { final CompletableFuture f = new CompletableFuture<>(); final CompletableFuture g = new CompletableFuture<>(); - final SubtractFunction r1 = new SubtractFunction(m); - final SubtractFunction r2 = new SubtractFunction(m); - final SubtractFunction r3 = new SubtractFunction(m); + final SubtractFunction[] rs = new SubtractFunction[6]; + for (int i = 0; i < rs.length; i++) rs[i] = new SubtractFunction(m); final CompletableFuture fst = fFirst ? f : g; final CompletableFuture snd = !fFirst ? f : g; final Integer w1 = fFirst ? v1 : v2; final Integer w2 = !fFirst ? v1 : v2; - final CompletableFuture h1 = m.thenCombine(f, g, r1); + final CompletableFuture h0 = m.thenCombine(f, g, rs[0]); + final CompletableFuture h1 = m.thenCombine(fst, fst, rs[1]); assertTrue(fst.complete(w1)); - final CompletableFuture h2 = m.thenCombine(f, g, r2); - checkIncomplete(h1); - checkIncomplete(h2); - r1.assertNotInvoked(); - r2.assertNotInvoked(); + final CompletableFuture h2 = m.thenCombine(f, g, rs[2]); + final CompletableFuture h3 = m.thenCombine(fst, fst, rs[3]); + checkIncomplete(h0); rs[0].assertNotInvoked(); + checkIncomplete(h2); rs[2].assertNotInvoked(); + checkCompletedNormally(h1, subtract(w1, w1)); + checkCompletedNormally(h3, subtract(w1, w1)); + rs[1].assertValue(subtract(w1, w1)); + rs[3].assertValue(subtract(w1, w1)); assertTrue(snd.complete(w2)); - final CompletableFuture h3 = m.thenCombine(f, g, r3); + final CompletableFuture h4 = m.thenCombine(f, g, rs[4]); - checkCompletedNormally(h1, subtract(v1, v2)); + checkCompletedNormally(h0, subtract(v1, v2)); checkCompletedNormally(h2, subtract(v1, v2)); - checkCompletedNormally(h3, subtract(v1, v2)); - r1.assertValue(subtract(v1, v2)); - r2.assertValue(subtract(v1, v2)); - r3.assertValue(subtract(v1, v2)); + checkCompletedNormally(h4, subtract(v1, v2)); + rs[0].assertValue(subtract(v1, v2)); + rs[2].assertValue(subtract(v1, v2)); + rs[4].assertValue(subtract(v1, v2)); + checkCompletedNormally(f, v1); checkCompletedNormally(g, v2); }} @@ -2971,6 +3030,58 @@ public class CompletableFutureTest exten checkCancelled(f); }} + /** + * thenCompose result completes exceptionally if the result of the action does + */ + public void testThenCompose_actionReturnsFailingFuture() { + for (ExecutionMode m : ExecutionMode.values()) + for (int order = 0; order < 6; order++) + for (Integer v1 : new Integer[] { 1, null }) + { + final CFException ex = new CFException(); + final CompletableFuture f = new CompletableFuture<>(); + final CompletableFuture g = new CompletableFuture<>(); + final CompletableFuture h; + // Test all permutations of orders + switch (order) { + case 0: + assertTrue(f.complete(v1)); + assertTrue(g.completeExceptionally(ex)); + h = m.thenCompose(f, (x -> g)); + break; + case 1: + assertTrue(f.complete(v1)); + h = m.thenCompose(f, (x -> g)); + assertTrue(g.completeExceptionally(ex)); + break; + case 2: + assertTrue(g.completeExceptionally(ex)); + assertTrue(f.complete(v1)); + h = m.thenCompose(f, (x -> g)); + break; + case 3: + assertTrue(g.completeExceptionally(ex)); + h = m.thenCompose(f, (x -> g)); + assertTrue(f.complete(v1)); + break; + case 4: + h = m.thenCompose(f, (x -> g)); + assertTrue(f.complete(v1)); + assertTrue(g.completeExceptionally(ex)); + break; + case 5: + h = m.thenCompose(f, (x -> g)); + assertTrue(f.complete(v1)); + assertTrue(g.completeExceptionally(ex)); + break; + default: throw new AssertionError(); + } + + checkCompletedExceptionally(g, ex); + checkCompletedWithWrappedException(h, ex); + checkCompletedNormally(f, v1); + }} + // other static methods /** @@ -3138,13 +3249,12 @@ public class CompletableFutureTest exten CompletableFuture f = new CompletableFuture<>(); CompletableFuture g = new CompletableFuture<>(); CompletableFuture nullFuture = (CompletableFuture)null; - CompletableFuture h; ThreadExecutor exec = new ThreadExecutor(); Runnable[] throwingActions = { () -> CompletableFuture.supplyAsync(null), () -> CompletableFuture.supplyAsync(null, exec), - () -> CompletableFuture.supplyAsync(new IntegerSupplier(ExecutionMode.DEFAULT, 42), null), + () -> CompletableFuture.supplyAsync(new IntegerSupplier(ExecutionMode.SYNC, 42), null), () -> CompletableFuture.runAsync(null), () -> CompletableFuture.runAsync(null, exec), @@ -3223,7 +3333,6 @@ public class CompletableFutureTest exten () -> f.exceptionally(null), () -> f.handle(null), - () -> CompletableFuture.allOf((CompletableFuture)null), () -> CompletableFuture.allOf((CompletableFuture[])null), () -> CompletableFuture.allOf(f, null), @@ -3249,6 +3358,266 @@ public class CompletableFutureTest exten assertSame(f, f.toCompletableFuture()); } + // jdk9 + + /** + * newIncompleteFuture returns an incomplete CompletableFuture + */ + public void testNewIncompleteFuture() { + CompletableFuture f = new CompletableFuture<>(); + CompletableFuture g = f.newIncompleteFuture(); + checkIncomplete(f); + checkIncomplete(g); + } + + /** + * completedStage returns a completed CompletionStage + */ + public void testCompletedStage() { + AtomicInteger x = new AtomicInteger(); + AtomicReference r = new AtomicReference(); + CompletionStage f = CompletableFuture.completedStage(1); + f.whenComplete((v, e) -> {if (e != null) r.set(e); else x.set(v);}); + assertEquals(x.get(), 1); + assertNull(r.get()); + } + + /** + * defaultExecutor by default returns the commonPool if + * it supports at least one thread. + */ + public void testDefaultExecutor() { + CompletableFuture f = new CompletableFuture<>(); + Executor e = f.defaultExecutor(); + Executor c = ForkJoinPool.commonPool(); + if (ForkJoinPool.getCommonPoolParallelism() > 0) + assertSame(e, c); + } + + /** + * failedFuture returns a CompletableFuture completed + * exceptionally with the given Exception + */ + public void testFailedFuture() { + CFException ex = new CFException(); + CompletableFuture f = CompletableFuture.failedFuture(ex); + checkCompletedExceptionallyWithRootCause(f, ex); + } + + /** + * failedFuture(null) throws NPE + */ + public void testFailedFuture2() { + try { + CompletableFuture f = CompletableFuture.failedFuture(null); + } catch(NullPointerException success) { + } + } + + /** + * copy returns a CompletableFuture that is completed normally, + * with the same value, when source is. + */ + public void testCopy() { + CompletableFuture f = new CompletableFuture<>(); + CompletableFuture g = f.copy(); + checkIncomplete(f); + checkIncomplete(g); + f.complete(1); + checkCompletedNormally(f, 1); + checkCompletedNormally(g, 1); + } + + /** + * copy returns a CompletableFuture that is completed exceptionally + * when source is. + */ + public void testCopy2() { + CompletableFuture f = new CompletableFuture<>(); + CompletableFuture g = f.copy(); + checkIncomplete(f); + checkIncomplete(g); + CFException ex = new CFException(); + f.completeExceptionally(ex); + checkCompletedExceptionally(f, ex); + checkCompletedWithWrappedCFException(g); + } + + /** + * minimalCompletionStage returns a CompletableFuture that is + * completed normally, with the same value, when source is. + */ + public void testMinimalCompletionStage() { + CompletableFuture f = new CompletableFuture<>(); + CompletionStage g = f.minimalCompletionStage(); + AtomicInteger x = new AtomicInteger(); + AtomicReference r = new AtomicReference(); + checkIncomplete(f); + g.whenComplete((v, e) -> {if (e != null) r.set(e); else x.set(v);}); + f.complete(1); + checkCompletedNormally(f, 1); + assertEquals(x.get(), 1); + assertNull(r.get()); + } + + /** + * minimalCompletionStage returns a CompletableFuture that is + * completed exceptionally when source is. + */ + public void testMinimalCompletionStage2() { + CompletableFuture f = new CompletableFuture<>(); + CompletionStage g = f.minimalCompletionStage(); + AtomicInteger x = new AtomicInteger(); + AtomicReference r = new AtomicReference(); + g.whenComplete((v, e) -> {if (e != null) r.set(e); else x.set(v);}); + checkIncomplete(f); + CFException ex = new CFException(); + f.completeExceptionally(ex); + checkCompletedExceptionally(f, ex); + assertEquals(x.get(), 0); + assertEquals(r.get().getCause(), ex); + } + + /** + * failedStage returns a Completionstage completed + * exceptionally with the given Exception + */ + public void testFailedStage() { + CFException ex = new CFException(); + CompletionStage f = CompletableFuture.failedStage(ex); + AtomicInteger x = new AtomicInteger(); + AtomicReference r = new AtomicReference(); + f.whenComplete((v, e) -> {if (e != null) r.set(e); else x.set(v);}); + assertEquals(x.get(), 0); + assertEquals(r.get().getCause(), ex); + } + + /** + * completeAsync completes with value of given supplier + */ + public void testCompleteAsync() { + CompletableFuture f = new CompletableFuture<>(); + f.completeAsync(() -> 1); + f.join(); + checkCompletedNormally(f, 1); + } + + /** + * completeAsync completes exceptionally if given supplier throws + */ + public void testCompleteAsync2() { + CompletableFuture f = new CompletableFuture<>(); + CFException ex = new CFException(); + f.completeAsync(() -> {if (true) throw ex; return 1;}); + try { + f.join(); + shouldThrow(); + } catch(Exception success) { + } + checkCompletedWithWrappedCFException(f); + } + + /** + * completeAsync with given executor completes with value of given supplier + */ + public void testCompleteAsync3() { + CompletableFuture f = new CompletableFuture<>(); + f.completeAsync(() -> 1, new ThreadExecutor()); + f.join(); + checkCompletedNormally(f, 1); + } + + /** + * completeAsync with given executor completes exceptionally if + * given supplier throws + */ + public void testCompleteAsync4() { + CompletableFuture f = new CompletableFuture<>(); + CFException ex = new CFException(); + f.completeAsync(() -> {if (true) throw ex; return 1;}, new ThreadExecutor()); + try { + f.join(); + shouldThrow(); + } catch(Exception success) { + } + checkCompletedWithWrappedCFException(f); + } + + /** + * orTimeout completes with TimeoutException if not complete + */ + public void testOrTimeout() { + CompletableFuture f = new CompletableFuture<>(); + f.orTimeout(SHORT_DELAY_MS, TimeUnit.MILLISECONDS); + checkCompletedExceptionallyWithTimeout(f); + } + + /** + * orTimeout completes normally if completed before timeout + */ + public void testOrTimeout2() { + CompletableFuture f = new CompletableFuture<>(); + f.complete(1); + f.orTimeout(SHORT_DELAY_MS, TimeUnit.MILLISECONDS); + checkCompletedNormally(f, 1); + } + + /** + * completeOnTimeout completes with given value if not complete + */ + public void testCompleteOnTimeout() { + CompletableFuture f = new CompletableFuture<>(); + f.completeOnTimeout(-1, SHORT_DELAY_MS, TimeUnit.MILLISECONDS); + f.join(); + checkCompletedNormally(f, -1); + } + + /** + * completeOnTimeout has no effect if completed within timeout + */ + public void testCompleteOnTimeout2() { + CompletableFuture f = new CompletableFuture<>(); + f.complete(1); + f.completeOnTimeout(-1, SHORT_DELAY_MS, TimeUnit.MILLISECONDS); + checkCompletedNormally(f, 1); + } + + /** + * delayedExecutor returns an executor that delays submission + */ + public void testDelayedExecutor() { + long timeoutMillis = SMALL_DELAY_MS; + Executor d = CompletableFuture.delayedExecutor(timeoutMillis, + MILLISECONDS); + long startTime = System.nanoTime(); + CompletableFuture f = CompletableFuture.supplyAsync(() -> 1, d); + assertNull(f.getNow(null)); + try { + f.get(LONG_DELAY_MS, MILLISECONDS); + } catch (Throwable fail) { threadUnexpectedException(fail); } + assertTrue(millisElapsedSince(startTime) > timeoutMillis/2); + checkCompletedNormally(f, 1); + } + + /** + * delayedExecutor for a given executor returns an executor that + * delays submission + */ + public void testDelayedExecutor2() { + long timeoutMillis = SMALL_DELAY_MS; + Executor d = CompletableFuture.delayedExecutor(timeoutMillis, + MILLISECONDS, + new ThreadExecutor()); + long startTime = System.nanoTime(); + CompletableFuture f = CompletableFuture.supplyAsync(() -> 1, d); + assertNull(f.getNow(null)); + try { + f.get(LONG_DELAY_MS, MILLISECONDS); + } catch (Throwable fail) { threadUnexpectedException(fail); } + assertTrue(millisElapsedSince(startTime) > timeoutMillis/2); + checkCompletedNormally(f, 1); + } + //--- tests of implementation details; not part of official tck --- Object resultOf(CompletableFuture f) {