--- jsr166/src/test/tck/ThreadPoolExecutorTest.java 2017/03/20 00:21:54 1.115 +++ jsr166/src/test/tck/ThreadPoolExecutorTest.java 2019/09/06 18:43:35 1.126 @@ -11,6 +11,8 @@ import static java.util.concurrent.TimeU import static java.util.concurrent.TimeUnit.SECONDS; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -26,8 +28,14 @@ import java.util.concurrent.RejectedExec import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy; +import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.Test; import junit.framework.TestSuite; @@ -90,7 +98,7 @@ public class ThreadPoolExecutorTest exte final Runnable task = new CheckedRunnable() { public void realRun() { done.countDown(); }}; p.execute(task); - assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS)); + await(done); } } @@ -184,13 +192,13 @@ public class ThreadPoolExecutorTest exte public void realRun() throws InterruptedException { threadStarted.countDown(); assertEquals(0, p.getCompletedTaskCount()); - threadProceed.await(); + await(threadProceed); threadDone.countDown(); }}); await(threadStarted); assertEquals(0, p.getCompletedTaskCount()); threadProceed.countDown(); - threadDone.await(); + await(threadDone); long startTime = System.nanoTime(); while (p.getCompletedTaskCount() != 1) { if (millisElapsedSince(startTime) > LONG_DELAY_MS) @@ -274,6 +282,19 @@ public class ThreadPoolExecutorTest exte } /** + * The default rejected execution handler is AbortPolicy. + */ + public void testDefaultRejectedExecutionHandler() { + final ThreadPoolExecutor p = + new ThreadPoolExecutor(1, 2, + LONG_DELAY_MS, MILLISECONDS, + new ArrayBlockingQueue(10)); + try (PoolCleaner cleaner = cleaner(p)) { + assertTrue(p.getRejectedExecutionHandler() instanceof AbortPolicy); + } + } + + /** * getRejectedExecutionHandler returns handler in constructor if not set */ public void testGetRejectedExecutionHandler() { @@ -456,8 +477,8 @@ public class ThreadPoolExecutorTest exte assertFalse(p.awaitTermination(Long.MIN_VALUE, MILLISECONDS)); assertFalse(p.awaitTermination(-1L, NANOSECONDS)); assertFalse(p.awaitTermination(-1L, MILLISECONDS)); - assertFalse(p.awaitTermination(0L, NANOSECONDS)); - assertFalse(p.awaitTermination(0L, MILLISECONDS)); + assertFalse(p.awaitTermination(randomExpiredTimeout(), + randomTimeUnit())); long timeoutNanos = 999999L; long startTime = System.nanoTime(); assertFalse(p.awaitTermination(timeoutNanos, NANOSECONDS)); @@ -651,7 +672,7 @@ public class ThreadPoolExecutorTest exte Runnable waiter = new CheckedRunnable() { public void realRun() { threadsStarted.countDown(); try { - MILLISECONDS.sleep(2 * LONG_DELAY_MS); + MILLISECONDS.sleep(LONGER_DELAY_MS); } catch (InterruptedException success) {} ran.getAndIncrement(); }}; @@ -1038,149 +1059,76 @@ public class ThreadPoolExecutorTest exte p.submit(task).get(); }}); - await(threadStarted); + await(threadStarted); // ensure quiescence t.interrupt(); awaitTermination(t); } } /** - * execute throws RejectedExecutionException if saturated. + * Submitted tasks are rejected when saturated or shutdown */ - public void testSaturatedExecute() { + public void testSubmittedTasksRejectedWhenSaturatedOrShutdown() throws InterruptedException { + final ThreadPoolExecutor p = new ThreadPoolExecutor( + 1, 1, 1, SECONDS, new ArrayBlockingQueue(1)); + final int saturatedSize = saturatedSize(p); + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final CountDownLatch threadsStarted = new CountDownLatch(p.getMaximumPoolSize()); final CountDownLatch done = new CountDownLatch(1); - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1)); - try (PoolCleaner cleaner = cleaner(p, done)) { - Runnable task = new CheckedRunnable() { - public void realRun() throws InterruptedException { - await(done); - }}; - for (int i = 0; i < 2; ++i) - p.execute(task); - for (int i = 0; i < 2; ++i) { + final Runnable r = () -> { + threadsStarted.countDown(); + for (;;) { try { - p.execute(task); - shouldThrow(); - } catch (RejectedExecutionException success) {} - assertTrue(p.getTaskCount() <= 2); - } - } - } - - /** - * submit(runnable) throws RejectedExecutionException if saturated. - */ - public void testSaturatedSubmitRunnable() { - final CountDownLatch done = new CountDownLatch(1); - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1)); - try (PoolCleaner cleaner = cleaner(p, done)) { - Runnable task = new CheckedRunnable() { - public void realRun() throws InterruptedException { - await(done); - }}; - for (int i = 0; i < 2; ++i) - p.submit(task); - for (int i = 0; i < 2; ++i) { + done.await(); + return; + } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} + }}; + final Callable c = () -> { + threadsStarted.countDown(); + for (;;) { try { - p.execute(task); - shouldThrow(); - } catch (RejectedExecutionException success) {} - assertTrue(p.getTaskCount() <= 2); - } - } - } + done.await(); + return Boolean.TRUE; + } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} + }}; + final boolean shutdownNow = rnd.nextBoolean(); - /** - * submit(callable) throws RejectedExecutionException if saturated. - */ - public void testSaturatedSubmitCallable() { - final CountDownLatch done = new CountDownLatch(1); - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1)); try (PoolCleaner cleaner = cleaner(p, done)) { - Runnable task = new CheckedRunnable() { - public void realRun() throws InterruptedException { - await(done); - }}; - for (int i = 0; i < 2; ++i) - p.execute(task); - for (int i = 0; i < 2; ++i) { - try { - p.execute(task); - shouldThrow(); - } catch (RejectedExecutionException success) {} - assertTrue(p.getTaskCount() <= 2); + // saturate + for (int i = saturatedSize; i--> 0; ) { + switch (rnd.nextInt(4)) { + case 0: p.execute(r); break; + case 1: assertFalse(p.submit(r).isDone()); break; + case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break; + case 3: assertFalse(p.submit(c).isDone()); break; + } } - } - } - /** - * executor using CallerRunsPolicy runs task if saturated. - */ - public void testSaturatedExecute2() { - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, - MILLISECONDS, - new ArrayBlockingQueue(1), - new ThreadPoolExecutor.CallerRunsPolicy()); - try (PoolCleaner cleaner = cleaner(p)) { - final CountDownLatch done = new CountDownLatch(1); - Runnable blocker = new CheckedRunnable() { - public void realRun() throws InterruptedException { - await(done); - }}; - p.execute(blocker); - TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5]; - for (int i = 0; i < tasks.length; i++) - tasks[i] = new TrackedNoOpRunnable(); - for (int i = 0; i < tasks.length; i++) - p.execute(tasks[i]); - for (int i = 1; i < tasks.length; i++) - assertTrue(tasks[i].done); - assertFalse(tasks[0].done); // waiting in queue - done.countDown(); - } - } + await(threadsStarted); + assertTaskSubmissionsAreRejected(p); - /** - * executor using DiscardPolicy drops task if saturated. - */ - public void testSaturatedExecute3() { - final CountDownLatch done = new CountDownLatch(1); - final TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5]; - for (int i = 0; i < tasks.length; ++i) - tasks[i] = new TrackedNoOpRunnable(); - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1), - new ThreadPoolExecutor.DiscardPolicy()); - try (PoolCleaner cleaner = cleaner(p, done)) { - p.execute(awaiter(done)); + if (shutdownNow) + p.shutdownNow(); + else + p.shutdown(); + // Pool is shutdown, but not yet terminated + assertTaskSubmissionsAreRejected(p); + assertFalse(p.isTerminated()); - for (TrackedNoOpRunnable task : tasks) - p.execute(task); - for (int i = 1; i < tasks.length; i++) - assertFalse(tasks[i].done); - } - for (int i = 1; i < tasks.length; i++) - assertFalse(tasks[i].done); - assertTrue(tasks[0].done); // was waiting in queue + done.countDown(); // release blocking tasks + assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); + + assertTaskSubmissionsAreRejected(p); + } + assertEquals(saturatedSize(p) + - (shutdownNow ? p.getQueue().remainingCapacity() : 0), + p.getCompletedTaskCount()); } /** * executor using DiscardOldestPolicy drops oldest task if saturated. */ - public void testSaturatedExecute4() { + public void testSaturatedExecute_DiscardOldestPolicy() { final CountDownLatch done = new CountDownLatch(1); LatchAwaiter r1 = awaiter(done); LatchAwaiter r2 = awaiter(done); @@ -1189,7 +1137,7 @@ public class ThreadPoolExecutorTest exte new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(1), - new ThreadPoolExecutor.DiscardOldestPolicy()); + new DiscardOldestPolicy()); try (PoolCleaner cleaner = cleaner(p, done)) { assertEquals(LatchAwaiter.NEW, r1.state); assertEquals(LatchAwaiter.NEW, r2.state); @@ -1207,59 +1155,6 @@ public class ThreadPoolExecutorTest exte } /** - * execute throws RejectedExecutionException if shutdown - */ - public void testRejectedExecutionExceptionOnShutdown() { - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1)); - try { p.shutdown(); } catch (SecurityException ok) { return; } - try (PoolCleaner cleaner = cleaner(p)) { - try { - p.execute(new NoOpRunnable()); - shouldThrow(); - } catch (RejectedExecutionException success) {} - } - } - - /** - * execute using CallerRunsPolicy drops task on shutdown - */ - public void testCallerRunsOnShutdown() { - RejectedExecutionHandler h = new ThreadPoolExecutor.CallerRunsPolicy(); - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1), h); - - try { p.shutdown(); } catch (SecurityException ok) { return; } - try (PoolCleaner cleaner = cleaner(p)) { - TrackedNoOpRunnable r = new TrackedNoOpRunnable(); - p.execute(r); - assertFalse(r.done); - } - } - - /** - * execute using DiscardPolicy drops task on shutdown - */ - public void testDiscardOnShutdown() { - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1), - new ThreadPoolExecutor.DiscardPolicy()); - - try { p.shutdown(); } catch (SecurityException ok) { return; } - try (PoolCleaner cleaner = cleaner(p)) { - TrackedNoOpRunnable r = new TrackedNoOpRunnable(); - p.execute(r); - assertFalse(r.done); - } - } - - /** * execute using DiscardOldestPolicy drops task on shutdown */ public void testDiscardOldestOnShutdown() { @@ -1267,7 +1162,7 @@ public class ThreadPoolExecutorTest exte new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(1), - new ThreadPoolExecutor.DiscardOldestPolicy()); + new DiscardOldestPolicy()); try { p.shutdown(); } catch (SecurityException ok) { return; } try (PoolCleaner cleaner = cleaner(p)) { @@ -1278,18 +1173,15 @@ public class ThreadPoolExecutorTest exte } /** - * execute(null) throws NPE + * Submitting null tasks throws NullPointerException */ - public void testExecuteNull() { + public void testNullTaskSubmission() { final ThreadPoolExecutor p = new ThreadPoolExecutor(1, 2, 1L, SECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(p)) { - try { - p.execute(null); - shouldThrow(); - } catch (NullPointerException success) {} + assertNullTaskSubmissionThrowsNullPointerException(p); } } @@ -1481,7 +1373,7 @@ public class ThreadPoolExecutorTest exte } /** - * invokeAny(empty collection) throws IAE + * invokeAny(empty collection) throws IllegalArgumentException */ public void testInvokeAny2() throws Exception { final ExecutorService e = @@ -1571,15 +1463,17 @@ public class ThreadPoolExecutorTest exte } /** - * invokeAll(empty collection) returns empty collection + * invokeAll(empty collection) returns empty list */ public void testInvokeAll2() throws InterruptedException { final ExecutorService e = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); + final Collection> emptyCollection + = Collections.emptyList(); try (PoolCleaner cleaner = cleaner(e)) { - List> r = e.invokeAll(new ArrayList>()); + List> r = e.invokeAll(emptyCollection); assertTrue(r.isEmpty()); } } @@ -1654,7 +1548,7 @@ public class ThreadPoolExecutorTest exte new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { try { - e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAny(null, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} } @@ -1672,14 +1566,14 @@ public class ThreadPoolExecutorTest exte List> l = new ArrayList<>(); l.add(new StringTask()); try { - e.invokeAny(l, MEDIUM_DELAY_MS, null); + e.invokeAny(l, randomTimeout(), null); shouldThrow(); } catch (NullPointerException success) {} } } /** - * timed invokeAny(empty collection) throws IAE + * timed invokeAny(empty collection) throws IllegalArgumentException */ public void testTimedInvokeAny2() throws Exception { final ExecutorService e = @@ -1689,14 +1583,14 @@ public class ThreadPoolExecutorTest exte try (PoolCleaner cleaner = cleaner(e)) { try { e.invokeAny(new ArrayList>(), - MEDIUM_DELAY_MS, MILLISECONDS); + randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (IllegalArgumentException success) {} } } /** - * timed invokeAny(c) throws NPE if c has null elements + * timed invokeAny(c) throws NullPointerException if c has null elements */ public void testTimedInvokeAny3() throws Exception { final CountDownLatch latch = new CountDownLatch(1); @@ -1709,7 +1603,7 @@ public class ThreadPoolExecutorTest exte l.add(latchAwaitingStringTask(latch)); l.add(null); try { - e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAny(l, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} latch.countDown(); @@ -1767,7 +1661,7 @@ public class ThreadPoolExecutorTest exte new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { try { - e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAll(null, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} } @@ -1785,23 +1679,25 @@ public class ThreadPoolExecutorTest exte List> l = new ArrayList<>(); l.add(new StringTask()); try { - e.invokeAll(l, MEDIUM_DELAY_MS, null); + e.invokeAll(l, randomTimeout(), null); shouldThrow(); } catch (NullPointerException success) {} } } /** - * timed invokeAll(empty collection) returns empty collection + * timed invokeAll(empty collection) returns empty list */ public void testTimedInvokeAll2() throws InterruptedException { final ExecutorService e = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); + final Collection> emptyCollection + = Collections.emptyList(); try (PoolCleaner cleaner = cleaner(e)) { - List> r = e.invokeAll(new ArrayList>(), - MEDIUM_DELAY_MS, MILLISECONDS); + List> r = + e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit()); assertTrue(r.isEmpty()); } } @@ -1819,7 +1715,7 @@ public class ThreadPoolExecutorTest exte l.add(new StringTask()); l.add(null); try { - e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAll(l, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} } @@ -1927,7 +1823,7 @@ public class ThreadPoolExecutorTest exte public void realRun() { done.countDown(); }}); - assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS)); + await(done); } } @@ -2020,7 +1916,7 @@ public class ThreadPoolExecutorTest exte } } // enough time to run all tasks - assertTrue(done.await(nTasks * SHORT_DELAY_MS, MILLISECONDS)); + await(done, nTasks * SHORT_DELAY_MS); } } @@ -2061,4 +1957,76 @@ public class ThreadPoolExecutorTest exte } } + /** Directly test simple ThreadPoolExecutor RejectedExecutionHandlers. */ + public void testStandardRejectedExecutionHandlers() { + final ThreadPoolExecutor p = + new ThreadPoolExecutor(1, 1, 1, SECONDS, + new ArrayBlockingQueue(1)); + final AtomicReference thread = new AtomicReference<>(); + final Runnable r = new Runnable() { public void run() { + thread.set(Thread.currentThread()); }}; + + try { + new AbortPolicy().rejectedExecution(r, p); + shouldThrow(); + } catch (RejectedExecutionException success) {} + assertNull(thread.get()); + + new DiscardPolicy().rejectedExecution(r, p); + assertNull(thread.get()); + + new CallerRunsPolicy().rejectedExecution(r, p); + assertSame(Thread.currentThread(), thread.get()); + + // check that pool was not perturbed by handlers + assertTrue(p.getRejectedExecutionHandler() instanceof AbortPolicy); + assertEquals(0, p.getTaskCount()); + assertTrue(p.getQueue().isEmpty()); + } + + public void testThreadFactoryReturnsTerminatedThread_shouldThrow() { + if (!testImplementationDetails) + return; + + ThreadFactory returnsTerminatedThread = runnableIgnored -> { + Thread thread = new Thread(() -> {}); + thread.start(); + try { thread.join(); } + catch (InterruptedException ex) { throw new Error(ex); } + return thread; + }; + ThreadPoolExecutor p = + new ThreadPoolExecutor(1, 1, 1, SECONDS, + new ArrayBlockingQueue(1), + returnsTerminatedThread); + try (PoolCleaner cleaner = cleaner(p)) { + assertThrows(IllegalThreadStateException.class, + () -> p.execute(() -> {})); + } + } + + public void testThreadFactoryReturnsStartedThread_shouldThrow() { + if (!testImplementationDetails) + return; + + CountDownLatch latch = new CountDownLatch(1); + Runnable awaitLatch = () -> { + try { latch.await(); } + catch (InterruptedException ex) { throw new Error(ex); }}; + ThreadFactory returnsStartedThread = runnable -> { + Thread thread = new Thread(awaitLatch); + thread.start(); + return thread; + }; + ThreadPoolExecutor p = + new ThreadPoolExecutor(1, 1, 1, SECONDS, + new ArrayBlockingQueue(1), + returnsStartedThread); + try (PoolCleaner cleaner = cleaner(p)) { + assertThrows(IllegalThreadStateException.class, + () -> p.execute(() -> {})); + latch.countDown(); + } + } + }