/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ * Other contributors include Andrew Wright, Jeffrey Hayes, * Pat Fisher, Mike Judd. */ import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Test; import junit.framework.TestSuite; public class ExecutorCompletionServiceTest extends JSR166TestCase { public static void main(String[] args) { main(suite(), args); } public static Test suite() { return new TestSuite(ExecutorCompletionServiceTest.class); } /** * new ExecutorCompletionService(null) throws NullPointerException */ public void testConstructorNPE() { try { new ExecutorCompletionService(null); shouldThrow(); } catch (NullPointerException success) {} } /** * new ExecutorCompletionService(e, null) throws NullPointerException */ public void testConstructorNPE2() { try { new ExecutorCompletionService(cachedThreadPool, null); shouldThrow(); } catch (NullPointerException success) {} } /** * ecs.submit(null) throws NullPointerException */ public void testSubmitNullCallable() { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); try { cs.submit((Callable) null); shouldThrow(); } catch (NullPointerException success) {} } /** * ecs.submit(null, val) throws NullPointerException */ public void testSubmitNullRunnable() { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); try { cs.submit((Runnable) null, Boolean.TRUE); shouldThrow(); } catch (NullPointerException success) {} } /** * A taken submitted task is completed */ public void testTake() throws InterruptedException, ExecutionException { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); cs.submit(new StringTask()); Future f = cs.take(); assertTrue(f.isDone()); assertSame(TEST_STRING, f.get()); } /** * Take returns the same future object returned by submit */ public void testTake2() throws InterruptedException { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); Future f1 = cs.submit(new StringTask()); Future f2 = cs.take(); assertSame(f1, f2); } /** * poll returns non-null when the returned task is completed */ public void testPoll1() throws InterruptedException, ExecutionException { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); assertNull(cs.poll()); cs.submit(new StringTask()); long startTime = System.nanoTime(); Future f; while ((f = cs.poll()) == null) { if (millisElapsedSince(startTime) > LONG_DELAY_MS) fail("timed out"); Thread.yield(); } assertTrue(f.isDone()); assertSame(TEST_STRING, f.get()); } /** * timed poll returns non-null when the returned task is completed */ public void testPoll2() throws InterruptedException, ExecutionException { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); assertNull(cs.poll()); cs.submit(new StringTask()); long startTime = System.nanoTime(); Future f; while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) { if (millisElapsedSince(startTime) > LONG_DELAY_MS) fail("timed out"); Thread.yield(); } assertTrue(f.isDone()); assertSame(TEST_STRING, f.get()); } /** * poll returns null before the returned task is completed */ public void testPollReturnsNull() throws InterruptedException, ExecutionException { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); final CountDownLatch proceed = new CountDownLatch(1); cs.submit(new Callable() { public String call() throws Exception { proceed.await(); return TEST_STRING; }}); assertNull(cs.poll()); assertNull(cs.poll(0L, MILLISECONDS)); assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS)); long startTime = System.nanoTime(); assertNull(cs.poll(timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); proceed.countDown(); assertSame(TEST_STRING, cs.take().get()); } /** * successful and failed tasks are both returned */ public void testTaskAssortment() throws InterruptedException, ExecutionException { CompletionService cs = new ExecutorCompletionService(cachedThreadPool); ArithmeticException ex = new ArithmeticException(); for (int i = 0; i < 2; i++) { cs.submit(new StringTask()); cs.submit(callableThrowing(ex)); cs.submit(runnableThrowing(ex), null); } int normalCompletions = 0; int exceptionalCompletions = 0; for (int i = 0; i < 3 * 2; i++) { try { if (cs.take().get() == TEST_STRING) normalCompletions++; } catch (ExecutionException expected) { assertTrue(expected.getCause() instanceof ArithmeticException); exceptionalCompletions++; } } assertEquals(2 * 1, normalCompletions); assertEquals(2 * 2, exceptionalCompletions); assertNull(cs.poll()); } /** * Submitting to underlying AES that overrides newTaskFor(Callable) * returns and eventually runs Future returned by newTaskFor. */ public void testNewTaskForCallable() throws InterruptedException { final AtomicBoolean done = new AtomicBoolean(false); class MyCallableFuture extends FutureTask { MyCallableFuture(Callable c) { super(c); } @Override protected void done() { done.set(true); } } final ExecutorService e = new ThreadPoolExecutor(1, 1, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue(1)) { protected RunnableFuture newTaskFor(Callable c) { return new MyCallableFuture(c); }}; CompletionService cs = new ExecutorCompletionService<>(e); try (PoolCleaner cleaner = cleaner(e)) { assertNull(cs.poll()); Callable c = new StringTask(); Future f1 = cs.submit(c); assertTrue("submit must return MyCallableFuture", f1 instanceof MyCallableFuture); Future f2 = cs.take(); assertSame("submit and take must return same objects", f1, f2); assertTrue("completed task must have set done", done.get()); } } /** * Submitting to underlying AES that overrides newTaskFor(Runnable,T) * returns and eventually runs Future returned by newTaskFor. */ public void testNewTaskForRunnable() throws InterruptedException { final AtomicBoolean done = new AtomicBoolean(false); class MyRunnableFuture extends FutureTask { MyRunnableFuture(Runnable t, V r) { super(t, r); } @Override protected void done() { done.set(true); } } final ExecutorService e = new ThreadPoolExecutor(1, 1, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue(1)) { protected RunnableFuture newTaskFor(Runnable t, T r) { return new MyRunnableFuture(t, r); }}; CompletionService cs = new ExecutorCompletionService<>(e); try (PoolCleaner cleaner = cleaner(e)) { assertNull(cs.poll()); Runnable r = new NoOpRunnable(); Future f1 = cs.submit(r, null); assertTrue("submit must return MyRunnableFuture", f1 instanceof MyRunnableFuture); Future f2 = cs.take(); assertSame("submit and take must return same objects", f1, f2); assertTrue("completed task must have set done", done.get()); } } }