--- jsr166/src/test/tck/ExecutorCompletionServiceTest.java 2011/05/27 19:42:42 1.15 +++ jsr166/src/test/tck/ExecutorCompletionServiceTest.java 2021/01/26 13:33:06 1.30 @@ -6,146 +6,181 @@ * Pat Fisher, Mike Judd. */ -import junit.framework.*; -import java.util.*; -import java.util.concurrent.*; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import java.util.concurrent.atomic.*; -import java.math.BigInteger; -import java.security.*; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.Test; +import junit.framework.TestSuite; public class ExecutorCompletionServiceTest extends JSR166TestCase { public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); + main(suite(), args); } public static Test suite() { return new TestSuite(ExecutorCompletionServiceTest.class); } /** - * Creating a new ECS with null Executor throw NPE + * new ExecutorCompletionService(null) throws NullPointerException */ public void testConstructorNPE() { try { - ExecutorCompletionService ecs = new ExecutorCompletionService(null); + new ExecutorCompletionService(null); shouldThrow(); } catch (NullPointerException success) {} } /** - * Creating a new ECS with null queue throw NPE + * new ExecutorCompletionService(e, null) throws NullPointerException */ public void testConstructorNPE2() { try { - ExecutorService e = Executors.newCachedThreadPool(); - ExecutorCompletionService ecs = new ExecutorCompletionService(e, null); + new ExecutorCompletionService(cachedThreadPool, null); shouldThrow(); } catch (NullPointerException success) {} } /** - * Submitting a null callable throws NPE + * ecs.submit(null) throws NullPointerException */ - public void testSubmitNPE() { - ExecutorService e = Executors.newCachedThreadPool(); - ExecutorCompletionService ecs = new ExecutorCompletionService(e); + public void testSubmitNullCallable() { + CompletionService cs = new ExecutorCompletionService(cachedThreadPool); try { - Callable c = null; - ecs.submit(c); + cs.submit((Callable) null); shouldThrow(); - } catch (NullPointerException success) { - } finally { - joinPool(e); - } + } catch (NullPointerException success) {} } /** - * Submitting a null runnable throws NPE + * ecs.submit(null, val) throws NullPointerException */ - public void testSubmitNPE2() { - ExecutorService e = Executors.newCachedThreadPool(); - ExecutorCompletionService ecs = new ExecutorCompletionService(e); + public void testSubmitNullRunnable() { + CompletionService cs = new ExecutorCompletionService(cachedThreadPool); try { - Runnable r = null; - ecs.submit(r, Boolean.TRUE); + cs.submit((Runnable) null, Boolean.TRUE); shouldThrow(); - } catch (NullPointerException success) { - } finally { - joinPool(e); - } + } catch (NullPointerException success) {} } /** * A taken submitted task is completed */ - public void testTake() throws InterruptedException { - ExecutorService e = Executors.newCachedThreadPool(); - ExecutorCompletionService ecs = new ExecutorCompletionService(e); - try { - Callable c = new StringTask(); - ecs.submit(c); - Future f = ecs.take(); - assertTrue(f.isDone()); - } finally { - joinPool(e); - } + public void testTake() throws Exception { + CompletionService cs = new ExecutorCompletionService(cachedThreadPool); + cs.submit(new StringTask()); + Future f = cs.take(); + assertTrue(f.isDone()); + assertSame(TEST_STRING, f.get()); } /** * Take returns the same future object returned by submit */ public void testTake2() throws InterruptedException { - ExecutorService e = Executors.newCachedThreadPool(); - ExecutorCompletionService ecs = new ExecutorCompletionService(e); - try { - Callable c = new StringTask(); - Future f1 = ecs.submit(c); - Future f2 = ecs.take(); - assertSame(f1, f2); - } finally { - joinPool(e); - } - } - - /** - * If poll returns non-null, the returned task is completed - */ - public void testPoll1() throws InterruptedException { - ExecutorService e = Executors.newCachedThreadPool(); - ExecutorCompletionService ecs = new ExecutorCompletionService(e); - try { - assertNull(ecs.poll()); - Callable c = new StringTask(); - ecs.submit(c); - delay(SHORT_DELAY_MS); - for (;;) { - Future f = ecs.poll(); - if (f != null) { - assertTrue(f.isDone()); - break; - } + CompletionService cs = new ExecutorCompletionService(cachedThreadPool); + Future f1 = cs.submit(new StringTask()); + Future f2 = cs.take(); + assertSame(f1, f2); + } + + /** + * poll returns non-null when the returned task is completed + */ + public void testPoll1() throws Exception { + CompletionService cs = new ExecutorCompletionService(cachedThreadPool); + assertNull(cs.poll()); + cs.submit(new StringTask()); + + long startTime = System.nanoTime(); + Future f; + while ((f = cs.poll()) == null) { + if (millisElapsedSince(startTime) > LONG_DELAY_MS) + fail("timed out"); + Thread.yield(); + } + assertTrue(f.isDone()); + assertSame(TEST_STRING, f.get()); + } + + /** + * timed poll returns non-null when the returned task is completed + */ + public void testPoll2() throws Exception { + CompletionService cs = new ExecutorCompletionService(cachedThreadPool); + assertNull(cs.poll()); + cs.submit(new StringTask()); + + long startTime = System.nanoTime(); + Future f; + while ((f = cs.poll(timeoutMillis(), MILLISECONDS)) == null) { + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + if (millisElapsedSince(startTime) > LONG_DELAY_MS) + fail("timed out"); + Thread.yield(); + } + assertTrue(f.isDone()); + assertSame(TEST_STRING, f.get()); + } + + /** + * poll returns null before the returned task is completed + */ + public void testPollReturnsNullBeforeCompletion() throws Exception { + CompletionService cs = new ExecutorCompletionService(cachedThreadPool); + final CountDownLatch proceed = new CountDownLatch(1); + cs.submit(new Callable() { public String call() throws Exception { + await(proceed); + return TEST_STRING; + }}); + assertNull(cs.poll()); + assertNull(cs.poll(0L, MILLISECONDS)); + assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS)); + long startTime = System.nanoTime(); + assertNull(cs.poll(timeoutMillis(), MILLISECONDS)); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + proceed.countDown(); + assertSame(TEST_STRING, cs.take().get()); + } + + /** + * successful and failed tasks are both returned + */ + public void testTaskAssortment() throws Exception { + CompletionService cs = new ExecutorCompletionService(cachedThreadPool); + ArithmeticException ex = new ArithmeticException(); + final int rounds = 2; + for (int i = rounds; i--> 0; ) { + cs.submit(new StringTask()); + cs.submit(callableThrowing(ex)); + cs.submit(runnableThrowing(ex), null); + } + int normalCompletions = 0; + int exceptionalCompletions = 0; + for (int i = 3 * rounds; i--> 0; ) { + try { + assertSame(TEST_STRING, cs.take().get()); + normalCompletions++; + } catch (ExecutionException expected) { + assertSame(ex, expected.getCause()); + exceptionalCompletions++; } - } finally { - joinPool(e); - } - } - - /** - * If timed poll returns non-null, the returned task is completed - */ - public void testPoll2() throws InterruptedException { - ExecutorService e = Executors.newCachedThreadPool(); - ExecutorCompletionService ecs = new ExecutorCompletionService(e); - try { - assertNull(ecs.poll()); - Callable c = new StringTask(); - ecs.submit(c); - Future f = ecs.poll(SHORT_DELAY_MS, MILLISECONDS); - if (f != null) - assertTrue(f.isDone()); - } finally { - joinPool(e); } + assertEquals(1 * rounds, normalCompletions); + assertEquals(2 * rounds, exceptionalCompletions); + assertNull(cs.poll()); } /** @@ -156,27 +191,25 @@ public class ExecutorCompletionServiceTe final AtomicBoolean done = new AtomicBoolean(false); class MyCallableFuture extends FutureTask { MyCallableFuture(Callable c) { super(c); } - protected void done() { done.set(true); } + @Override protected void done() { done.set(true); } } - ExecutorService e = new ThreadPoolExecutor( - 1, 1, 30L, TimeUnit.SECONDS, - new ArrayBlockingQueue(1)) { - protected RunnableFuture newTaskFor(Callable c) { - return new MyCallableFuture(c); - }}; - ExecutorCompletionService ecs = - new ExecutorCompletionService(e); - try { - assertNull(ecs.poll()); + final ExecutorService e = + new ThreadPoolExecutor(1, 1, + 30L, TimeUnit.SECONDS, + new ArrayBlockingQueue(1)) { + protected RunnableFuture newTaskFor(Callable c) { + return new MyCallableFuture(c); + }}; + CompletionService cs = new ExecutorCompletionService<>(e); + try (PoolCleaner cleaner = cleaner(e)) { + assertNull(cs.poll()); Callable c = new StringTask(); - Future f1 = ecs.submit(c); + Future f1 = cs.submit(c); assertTrue("submit must return MyCallableFuture", f1 instanceof MyCallableFuture); - Future f2 = ecs.take(); + Future f2 = cs.take(); assertSame("submit and take must return same objects", f1, f2); assertTrue("completed task must have set done", done.get()); - } finally { - joinPool(e); } } @@ -188,27 +221,25 @@ public class ExecutorCompletionServiceTe final AtomicBoolean done = new AtomicBoolean(false); class MyRunnableFuture extends FutureTask { MyRunnableFuture(Runnable t, V r) { super(t, r); } - protected void done() { done.set(true); } + @Override protected void done() { done.set(true); } } - ExecutorService e = new ThreadPoolExecutor( - 1, 1, 30L, TimeUnit.SECONDS, - new ArrayBlockingQueue(1)) { - protected RunnableFuture newTaskFor(Runnable t, T r) { - return new MyRunnableFuture(t, r); - }}; - ExecutorCompletionService ecs = - new ExecutorCompletionService(e); - try { - assertNull(ecs.poll()); + final ExecutorService e = + new ThreadPoolExecutor(1, 1, + 30L, TimeUnit.SECONDS, + new ArrayBlockingQueue(1)) { + protected RunnableFuture newTaskFor(Runnable t, T r) { + return new MyRunnableFuture(t, r); + }}; + CompletionService cs = new ExecutorCompletionService<>(e); + try (PoolCleaner cleaner = cleaner(e)) { + assertNull(cs.poll()); Runnable r = new NoOpRunnable(); - Future f1 = ecs.submit(r, null); + Future f1 = cs.submit(r, null); assertTrue("submit must return MyRunnableFuture", f1 instanceof MyRunnableFuture); - Future f2 = ecs.take(); + Future f2 = cs.take(); assertSame("submit and take must return same objects", f1, f2); assertTrue("completed task must have set done", done.get()); - } finally { - joinPool(e); } }