--- jsr166/src/test/tck/ThreadPoolExecutorTest.java 2015/10/05 21:42:49 1.100 +++ jsr166/src/test/tck/ThreadPoolExecutorTest.java 2017/07/15 23:15:21 1.123 @@ -11,6 +11,8 @@ import static java.util.concurrent.TimeU import static java.util.concurrent.TimeUnit.SECONDS; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -18,7 +20,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -27,9 +28,14 @@ import java.util.concurrent.RejectedExec import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy; +import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.Test; import junit.framework.TestSuite; @@ -92,7 +98,7 @@ public class ThreadPoolExecutorTest exte final Runnable task = new CheckedRunnable() { public void realRun() { done.countDown(); }}; p.execute(task); - assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS)); + await(done); } } @@ -101,23 +107,22 @@ public class ThreadPoolExecutorTest exte * thread becomes active */ public void testGetActiveCount() throws InterruptedException { + final CountDownLatch done = new CountDownLatch(1); final ThreadPoolExecutor p = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); - try (PoolCleaner cleaner = cleaner(p)) { + try (PoolCleaner cleaner = cleaner(p, done)) { final CountDownLatch threadStarted = new CountDownLatch(1); - final CountDownLatch done = new CountDownLatch(1); assertEquals(0, p.getActiveCount()); p.execute(new CheckedRunnable() { public void realRun() throws InterruptedException { threadStarted.countDown(); assertEquals(1, p.getActiveCount()); - done.await(); + await(done); }}); - assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadStarted); assertEquals(1, p.getActiveCount()); - done.countDown(); } } @@ -187,13 +192,13 @@ public class ThreadPoolExecutorTest exte public void realRun() throws InterruptedException { threadStarted.countDown(); assertEquals(0, p.getCompletedTaskCount()); - threadProceed.await(); + await(threadProceed); threadDone.countDown(); }}); await(threadStarted); assertEquals(0, p.getCompletedTaskCount()); threadProceed.countDown(); - threadDone.await(); + await(threadDone); long startTime = System.nanoTime(); while (p.getCompletedTaskCount() != 1) { if (millisElapsedSince(startTime) > LONG_DELAY_MS) @@ -277,6 +282,19 @@ public class ThreadPoolExecutorTest exte } /** + * The default rejected execution handler is AbortPolicy. + */ + public void testDefaultRejectedExecutionHandler() { + final ThreadPoolExecutor p = + new ThreadPoolExecutor(1, 2, + LONG_DELAY_MS, MILLISECONDS, + new ArrayBlockingQueue(10)); + try (PoolCleaner cleaner = cleaner(p)) { + assertTrue(p.getRejectedExecutionHandler() instanceof AbortPolicy); + } + } + + /** * getRejectedExecutionHandler returns handler in constructor if not set */ public void testGetRejectedExecutionHandler() { @@ -329,24 +347,23 @@ public class ThreadPoolExecutorTest exte */ public void testGetLargestPoolSize() throws InterruptedException { final int THREADS = 3; + final CountDownLatch done = new CountDownLatch(1); final ThreadPoolExecutor p = new ThreadPoolExecutor(THREADS, THREADS, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); - try (PoolCleaner cleaner = cleaner(p)) { - final CountDownLatch threadsStarted = new CountDownLatch(THREADS); - final CountDownLatch done = new CountDownLatch(1); + try (PoolCleaner cleaner = cleaner(p, done)) { assertEquals(0, p.getLargestPoolSize()); + final CountDownLatch threadsStarted = new CountDownLatch(THREADS); for (int i = 0; i < THREADS; i++) p.execute(new CheckedRunnable() { public void realRun() throws InterruptedException { threadsStarted.countDown(); - done.await(); + await(done); assertEquals(THREADS, p.getLargestPoolSize()); }}); - assertTrue(threadsStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadsStarted); assertEquals(THREADS, p.getLargestPoolSize()); - done.countDown(); // release pool } assertEquals(THREADS, p.getLargestPoolSize()); } @@ -374,23 +391,22 @@ public class ThreadPoolExecutorTest exte * become active */ public void testGetPoolSize() throws InterruptedException { + final CountDownLatch done = new CountDownLatch(1); final ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); - try (PoolCleaner cleaner = cleaner(p)) { - final CountDownLatch threadStarted = new CountDownLatch(1); - final CountDownLatch done = new CountDownLatch(1); + try (PoolCleaner cleaner = cleaner(p, done)) { assertEquals(0, p.getPoolSize()); + final CountDownLatch threadStarted = new CountDownLatch(1); p.execute(new CheckedRunnable() { public void realRun() throws InterruptedException { threadStarted.countDown(); assertEquals(1, p.getPoolSize()); - done.await(); + await(done); }}); - assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadStarted); assertEquals(1, p.getPoolSize()); - done.countDown(); // release pool } } @@ -411,9 +427,9 @@ public class ThreadPoolExecutorTest exte p.execute(new CheckedRunnable() { public void realRun() throws InterruptedException { threadStarted.countDown(); - done.await(); + await(done); }}); - assertTrue(threadStarted.await(LONG_DELAY_MS, MILLISECONDS)); + await(threadStarted); assertEquals(1, p.getTaskCount()); assertEquals(0, p.getCompletedTaskCount()); for (int i = 0; i < TASKS; i++) { @@ -422,7 +438,7 @@ public class ThreadPoolExecutorTest exte public void realRun() throws InterruptedException { threadStarted.countDown(); assertEquals(1 + TASKS, p.getTaskCount()); - done.await(); + await(done); }}); } assertEquals(1 + TASKS, p.getTaskCount()); @@ -461,8 +477,8 @@ public class ThreadPoolExecutorTest exte assertFalse(p.awaitTermination(Long.MIN_VALUE, MILLISECONDS)); assertFalse(p.awaitTermination(-1L, NANOSECONDS)); assertFalse(p.awaitTermination(-1L, MILLISECONDS)); - assertFalse(p.awaitTermination(0L, NANOSECONDS)); - assertFalse(p.awaitTermination(0L, MILLISECONDS)); + assertFalse(p.awaitTermination(randomExpiredTimeout(), + randomTimeUnit())); long timeoutNanos = 999999L; long startTime = System.nanoTime(); assertFalse(p.awaitTermination(timeoutNanos, NANOSECONDS)); @@ -495,9 +511,9 @@ public class ThreadPoolExecutorTest exte public void realRun() throws InterruptedException { assertFalse(p.isTerminating()); threadStarted.countDown(); - done.await(); + await(done); }}); - assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadStarted); assertFalse(p.isTerminating()); done.countDown(); try { p.shutdown(); } catch (SecurityException ok) { return; } @@ -523,9 +539,9 @@ public class ThreadPoolExecutorTest exte public void realRun() throws InterruptedException { assertFalse(p.isTerminating()); threadStarted.countDown(); - done.await(); + await(done); }}); - assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadStarted); assertFalse(p.isTerminating()); done.countDown(); try { p.shutdown(); } catch (SecurityException ok) { return; } @@ -539,32 +555,31 @@ public class ThreadPoolExecutorTest exte * getQueue returns the work queue, which contains queued tasks */ public void testGetQueue() throws InterruptedException { - final BlockingQueue q = new ArrayBlockingQueue(10); + final CountDownLatch done = new CountDownLatch(1); + final BlockingQueue q = new ArrayBlockingQueue<>(10); final ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, q); - try (PoolCleaner cleaner = cleaner(p)) { + try (PoolCleaner cleaner = cleaner(p, done)) { final CountDownLatch threadStarted = new CountDownLatch(1); - final CountDownLatch done = new CountDownLatch(1); FutureTask[] tasks = new FutureTask[5]; for (int i = 0; i < tasks.length; i++) { Callable task = new CheckedCallable() { public Boolean realCall() throws InterruptedException { threadStarted.countDown(); assertSame(q, p.getQueue()); - done.await(); + await(done); return Boolean.TRUE; }}; tasks[i] = new FutureTask(task); p.execute(tasks[i]); } - assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadStarted); assertSame(q, p.getQueue()); assertFalse(q.contains(tasks[0])); assertTrue(q.contains(tasks[tasks.length - 1])); assertEquals(tasks.length - 1, q.size()); - done.countDown(); } } @@ -572,24 +587,24 @@ public class ThreadPoolExecutorTest exte * remove(task) removes queued task, and fails to remove active task */ public void testRemove() throws InterruptedException { - BlockingQueue q = new ArrayBlockingQueue(10); + final CountDownLatch done = new CountDownLatch(1); + BlockingQueue q = new ArrayBlockingQueue<>(10); final ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, q); - try (PoolCleaner cleaner = cleaner(p)) { + try (PoolCleaner cleaner = cleaner(p, done)) { Runnable[] tasks = new Runnable[6]; final CountDownLatch threadStarted = new CountDownLatch(1); - final CountDownLatch done = new CountDownLatch(1); for (int i = 0; i < tasks.length; i++) { tasks[i] = new CheckedRunnable() { public void realRun() throws InterruptedException { threadStarted.countDown(); - done.await(); + await(done); }}; p.execute(tasks[i]); } - assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadStarted); assertFalse(p.remove(tasks[0])); assertTrue(q.contains(tasks[4])); assertTrue(q.contains(tasks[3])); @@ -599,7 +614,6 @@ public class ThreadPoolExecutorTest exte assertTrue(q.contains(tasks[3])); assertTrue(p.remove(tasks[3])); assertFalse(q.contains(tasks[3])); - done.countDown(); } } @@ -609,24 +623,24 @@ public class ThreadPoolExecutorTest exte public void testPurge() throws InterruptedException { final CountDownLatch threadStarted = new CountDownLatch(1); final CountDownLatch done = new CountDownLatch(1); - final BlockingQueue q = new ArrayBlockingQueue(10); + final BlockingQueue q = new ArrayBlockingQueue<>(10); final ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, q); - try (PoolCleaner cleaner = cleaner(p)) { + try (PoolCleaner cleaner = cleaner(p, done)) { FutureTask[] tasks = new FutureTask[5]; for (int i = 0; i < tasks.length; i++) { Callable task = new CheckedCallable() { public Boolean realCall() throws InterruptedException { threadStarted.countDown(); - done.await(); + await(done); return Boolean.TRUE; }}; tasks[i] = new FutureTask(task); p.execute(tasks[i]); } - assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadStarted); assertEquals(tasks.length, p.getTaskCount()); assertEquals(tasks.length - 1, q.size()); assertEquals(1L, p.getActiveCount()); @@ -639,7 +653,6 @@ public class ThreadPoolExecutorTest exte p.purge(); // Nothing to do assertEquals(tasks.length - 3, q.size()); assertEquals(tasks.length - 2, p.getTaskCount()); - done.countDown(); } } @@ -665,7 +678,7 @@ public class ThreadPoolExecutorTest exte }}; for (int i = 0; i < count; i++) p.execute(waiter); - assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS)); + await(threadsStarted); assertEquals(poolSize, p.getActiveCount()); assertEquals(0, p.getCompletedTaskCount()); final List queuedTasks; @@ -1027,163 +1040,181 @@ public class ThreadPoolExecutorTest exte * get of submitted callable throws InterruptedException if interrupted */ public void testInterruptedSubmit() throws InterruptedException { + final CountDownLatch done = new CountDownLatch(1); final ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 60, SECONDS, new ArrayBlockingQueue(10)); - try (PoolCleaner cleaner = cleaner(p)) { + try (PoolCleaner cleaner = cleaner(p, done)) { final CountDownLatch threadStarted = new CountDownLatch(1); - final CountDownLatch done = new CountDownLatch(1); Thread t = newStartedThread(new CheckedInterruptedRunnable() { public void realRun() throws Exception { Callable task = new CheckedCallable() { public Boolean realCall() throws InterruptedException { threadStarted.countDown(); - done.await(); + await(done); return Boolean.TRUE; }}; p.submit(task).get(); }}); - assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS)); + await(threadStarted); t.interrupt(); - awaitTermination(t, MEDIUM_DELAY_MS); - done.countDown(); + awaitTermination(t); } } /** - * execute throws RejectedExecutionException if saturated. + * Submitted tasks are rejected when saturated. */ - public void testSaturatedExecute() { - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1)); - try (PoolCleaner cleaner = cleaner(p)) { - final CountDownLatch done = new CountDownLatch(1); - Runnable task = new CheckedRunnable() { - public void realRun() throws InterruptedException { - done.await(); - }}; - for (int i = 0; i < 2; ++i) - p.execute(task); - for (int i = 0; i < 2; ++i) { + @SuppressWarnings("FutureReturnValueIgnored") + public void testSubmittedTasksRejectedWhenSaturated() { + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final CountDownLatch done = new CountDownLatch(1); + final Runnable r = awaiter(done); + final Callable c = new CheckedCallable() { + public Boolean realCall() throws InterruptedException { + await(done); + return Boolean.TRUE; + }}; + final ThreadPoolExecutor p = new ThreadPoolExecutor( + 1, 1, 1, SECONDS, new ArrayBlockingQueue(1)); + + try (PoolCleaner cleaner = cleaner(p, done)) { + // saturate + for (int i = saturatedSize(p); i--> 0; ) { + switch (rnd.nextInt(3)) { + case 0: p.execute(r); break; + case 1: assertFalse(p.submit(r).isDone()); break; + case 2: assertFalse(p.submit(c).isDone()); break; + } + } + + // check default handler + assertTrue(p.getRejectedExecutionHandler() instanceof AbortPolicy); + for (int i = 2; i--> 0; ) { try { - p.execute(task); + p.execute(r); shouldThrow(); } catch (RejectedExecutionException success) {} - assertTrue(p.getTaskCount() <= 2); - } - done.countDown(); - } - } - - /** - * submit(runnable) throws RejectedExecutionException if saturated. - */ - public void testSaturatedSubmitRunnable() { - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1)); - try (PoolCleaner cleaner = cleaner(p)) { - final CountDownLatch done = new CountDownLatch(1); - Runnable task = new CheckedRunnable() { - public void realRun() throws InterruptedException { - done.await(); - }}; - for (int i = 0; i < 2; ++i) - p.submit(task); - for (int i = 0; i < 2; ++i) { try { - p.execute(task); + p.submit(r); shouldThrow(); } catch (RejectedExecutionException success) {} - assertTrue(p.getTaskCount() <= 2); - } - done.countDown(); - } - } - - /** - * submit(callable) throws RejectedExecutionException if saturated. - */ - public void testSaturatedSubmitCallable() { - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1)); - try (PoolCleaner cleaner = cleaner(p)) { - final CountDownLatch done = new CountDownLatch(1); - Runnable task = new CheckedRunnable() { - public void realRun() throws InterruptedException { - done.await(); - }}; - for (int i = 0; i < 2; ++i) - p.submit(Executors.callable(task)); - for (int i = 0; i < 2; ++i) { try { - p.execute(task); + p.submit(c); shouldThrow(); } catch (RejectedExecutionException success) {} - assertTrue(p.getTaskCount() <= 2); } - done.countDown(); + + // check CallerRunsPolicy runs task in caller thread + { + RejectedExecutionHandler handler = new CallerRunsPolicy(); + p.setRejectedExecutionHandler(handler); + assertSame(handler, p.getRejectedExecutionHandler()); + final AtomicReference thread = new AtomicReference<>(); + p.execute(new Runnable() { public void run() { + thread.set(Thread.currentThread()); }}); + assertSame(Thread.currentThread(), thread.get()); + } + + // check DiscardPolicy does nothing + { + RejectedExecutionHandler handler = new DiscardPolicy(); + p.setRejectedExecutionHandler(handler); + assertSame(handler, p.getRejectedExecutionHandler()); + final AtomicReference thread = new AtomicReference<>(); + p.execute(new Runnable() { public void run() { + thread.set(Thread.currentThread()); }}); + assertNull(thread.get()); + } + + class Recorder implements RejectedExecutionHandler { + public volatile Runnable r = null; + public volatile ThreadPoolExecutor p = null; + public void reset() { r = null; p = null; } + public void rejectedExecution(Runnable r, ThreadPoolExecutor p) { + assertNull(this.r); + assertNull(this.p); + this.r = r; + this.p = p; + } + } + + // check custom handler is invoked exactly once per task + Recorder recorder = new Recorder(); + p.setRejectedExecutionHandler(recorder); + assertSame(recorder, p.getRejectedExecutionHandler()); + for (int i = 2; i--> 0; ) { + recorder.reset(); + p.execute(r); + assertSame(r, recorder.r); + assertSame(p, recorder.p); + + recorder.reset(); + assertFalse(p.submit(r).isDone()); + assertTrue(recorder.r instanceof FutureTask); + assertSame(p, recorder.p); + + recorder.reset(); + assertFalse(p.submit(c).isDone()); + assertTrue(recorder.r instanceof FutureTask); + assertSame(p, recorder.p); + } + + // check that pool was not perturbed by handlers + assertEquals(2, p.getTaskCount()); + assertEquals(0, p.getCompletedTaskCount()); + assertEquals(0, p.getQueue().remainingCapacity()); } + assertEquals(saturatedSize(p), p.getCompletedTaskCount()); } /** * executor using CallerRunsPolicy runs task if saturated. */ public void testSaturatedExecute2() { - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, - MILLISECONDS, - new ArrayBlockingQueue(1), - new ThreadPoolExecutor.CallerRunsPolicy()); - try (PoolCleaner cleaner = cleaner(p)) { - final CountDownLatch done = new CountDownLatch(1); - Runnable blocker = new CheckedRunnable() { - public void realRun() throws InterruptedException { - done.await(); - }}; - p.execute(blocker); - TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5]; - for (int i = 0; i < tasks.length; i++) - tasks[i] = new TrackedNoOpRunnable(); + final RejectedExecutionHandler handler = new CallerRunsPolicy(); + final ThreadPoolExecutor p = new ThreadPoolExecutor( + 1, 1, LONG_DELAY_MS, SECONDS, new ArrayBlockingQueue(1), + handler); + assertSame(handler, p.getRejectedExecutionHandler()); + final TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5]; + final CountDownLatch done = new CountDownLatch(1); + try (PoolCleaner cleaner = cleaner(p, done)) { + p.execute(awaiter(done)); + for (int i = 0; i < tasks.length; i++) - p.execute(tasks[i]); + p.execute(tasks[i] = new TrackedNoOpRunnable()); + for (int i = 1; i < tasks.length; i++) assertTrue(tasks[i].done); assertFalse(tasks[0].done); // waiting in queue - done.countDown(); } + for (TrackedNoOpRunnable task : tasks) + assertTrue(task.done); } /** * executor using DiscardPolicy drops task if saturated. */ public void testSaturatedExecute3() { + final RejectedExecutionHandler handler = new DiscardPolicy(); + final ThreadPoolExecutor p = new ThreadPoolExecutor( + 1, 1, LONG_DELAY_MS, SECONDS, new ArrayBlockingQueue(1), + handler); + assertSame(handler, p.getRejectedExecutionHandler()); final TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5]; - for (int i = 0; i < tasks.length; ++i) - tasks[i] = new TrackedNoOpRunnable(); - final ThreadPoolExecutor p = - new ThreadPoolExecutor(1, 1, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(1), - new ThreadPoolExecutor.DiscardPolicy()); - try (PoolCleaner cleaner = cleaner(p)) { - final CountDownLatch done = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(1); + try (PoolCleaner cleaner = cleaner(p, done)) { p.execute(awaiter(done)); - for (TrackedNoOpRunnable task : tasks) - p.execute(task); + for (int i = 0; i < tasks.length; i++) + p.execute(tasks[i] = new TrackedNoOpRunnable()); + for (int i = 1; i < tasks.length; i++) assertFalse(tasks[i].done); - done.countDown(); } for (int i = 1; i < tasks.length; i++) assertFalse(tasks[i].done); @@ -1202,8 +1233,8 @@ public class ThreadPoolExecutorTest exte new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(1), - new ThreadPoolExecutor.DiscardOldestPolicy()); - try (PoolCleaner cleaner = cleaner(p)) { + new DiscardOldestPolicy()); + try (PoolCleaner cleaner = cleaner(p, done)) { assertEquals(LatchAwaiter.NEW, r1.state); assertEquals(LatchAwaiter.NEW, r2.state); assertEquals(LatchAwaiter.NEW, r3.state); @@ -1213,7 +1244,6 @@ public class ThreadPoolExecutorTest exte p.execute(r3); assertFalse(p.getQueue().contains(r2)); assertTrue(p.getQueue().contains(r3)); - done.countDown(); } assertEquals(LatchAwaiter.DONE, r1.state); assertEquals(LatchAwaiter.NEW, r2.state); @@ -1241,7 +1271,7 @@ public class ThreadPoolExecutorTest exte * execute using CallerRunsPolicy drops task on shutdown */ public void testCallerRunsOnShutdown() { - RejectedExecutionHandler h = new ThreadPoolExecutor.CallerRunsPolicy(); + RejectedExecutionHandler h = new CallerRunsPolicy(); final ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, @@ -1263,7 +1293,7 @@ public class ThreadPoolExecutorTest exte new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(1), - new ThreadPoolExecutor.DiscardPolicy()); + new DiscardPolicy()); try { p.shutdown(); } catch (SecurityException ok) { return; } try (PoolCleaner cleaner = cleaner(p)) { @@ -1281,7 +1311,7 @@ public class ThreadPoolExecutorTest exte new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(1), - new ThreadPoolExecutor.DiscardOldestPolicy()); + new DiscardOldestPolicy()); try { p.shutdown(); } catch (SecurityException ok) { return; } try (PoolCleaner cleaner = cleaner(p)) { @@ -1495,7 +1525,7 @@ public class ThreadPoolExecutorTest exte } /** - * invokeAny(empty collection) throws IAE + * invokeAny(empty collection) throws IllegalArgumentException */ public void testInvokeAny2() throws Exception { final ExecutorService e = @@ -1520,7 +1550,7 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(latchAwaitingStringTask(latch)); l.add(null); try { @@ -1540,7 +1570,7 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new NPETask()); try { e.invokeAny(l); @@ -1560,7 +1590,7 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new StringTask()); l.add(new StringTask()); String result = e.invokeAny(l); @@ -1585,15 +1615,17 @@ public class ThreadPoolExecutorTest exte } /** - * invokeAll(empty collection) returns empty collection + * invokeAll(empty collection) returns empty list */ public void testInvokeAll2() throws InterruptedException { final ExecutorService e = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); + final Collection> emptyCollection + = Collections.emptyList(); try (PoolCleaner cleaner = cleaner(e)) { - List> r = e.invokeAll(new ArrayList>()); + List> r = e.invokeAll(emptyCollection); assertTrue(r.isEmpty()); } } @@ -1607,7 +1639,7 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new StringTask()); l.add(null); try { @@ -1626,7 +1658,7 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new NPETask()); List> futures = e.invokeAll(l); assertEquals(1, futures.size()); @@ -1648,7 +1680,7 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new StringTask()); l.add(new StringTask()); List> futures = e.invokeAll(l); @@ -1668,7 +1700,7 @@ public class ThreadPoolExecutorTest exte new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { try { - e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAny(null, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} } @@ -1683,17 +1715,17 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new StringTask()); try { - e.invokeAny(l, MEDIUM_DELAY_MS, null); + e.invokeAny(l, randomTimeout(), null); shouldThrow(); } catch (NullPointerException success) {} } } /** - * timed invokeAny(empty collection) throws IAE + * timed invokeAny(empty collection) throws IllegalArgumentException */ public void testTimedInvokeAny2() throws Exception { final ExecutorService e = @@ -1703,14 +1735,14 @@ public class ThreadPoolExecutorTest exte try (PoolCleaner cleaner = cleaner(e)) { try { e.invokeAny(new ArrayList>(), - MEDIUM_DELAY_MS, MILLISECONDS); + randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (IllegalArgumentException success) {} } } /** - * timed invokeAny(c) throws NPE if c has null elements + * timed invokeAny(c) throws NullPointerException if c has null elements */ public void testTimedInvokeAny3() throws Exception { final CountDownLatch latch = new CountDownLatch(1); @@ -1719,11 +1751,11 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(latchAwaitingStringTask(latch)); l.add(null); try { - e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAny(l, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} latch.countDown(); @@ -1739,14 +1771,16 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + long startTime = System.nanoTime(); + List> l = new ArrayList<>(); l.add(new NPETask()); try { - e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (ExecutionException success) { assertTrue(success.getCause() instanceof NullPointerException); } + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); } } @@ -1759,11 +1793,13 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + long startTime = System.nanoTime(); + List> l = new ArrayList<>(); l.add(new StringTask()); l.add(new StringTask()); - String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS); + String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS); assertSame(TEST_STRING, result); + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); } } @@ -1777,7 +1813,7 @@ public class ThreadPoolExecutorTest exte new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { try { - e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAll(null, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} } @@ -1792,26 +1828,28 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new StringTask()); try { - e.invokeAll(l, MEDIUM_DELAY_MS, null); + e.invokeAll(l, randomTimeout(), null); shouldThrow(); } catch (NullPointerException success) {} } } /** - * timed invokeAll(empty collection) returns empty collection + * timed invokeAll(empty collection) returns empty list */ public void testTimedInvokeAll2() throws InterruptedException { final ExecutorService e = new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); + final Collection> emptyCollection + = Collections.emptyList(); try (PoolCleaner cleaner = cleaner(e)) { - List> r = e.invokeAll(new ArrayList>(), - MEDIUM_DELAY_MS, MILLISECONDS); + List> r = + e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit()); assertTrue(r.isEmpty()); } } @@ -1825,11 +1863,11 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new StringTask()); l.add(null); try { - e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAll(l, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} } @@ -1844,10 +1882,10 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new NPETask()); List> futures = - e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS); + e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS); assertEquals(1, futures.size()); try { futures.get(0).get(); @@ -1867,7 +1905,7 @@ public class ThreadPoolExecutorTest exte LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue(10)); try (PoolCleaner cleaner = cleaner(e)) { - List> l = new ArrayList>(); + List> l = new ArrayList<>(); l.add(new StringTask()); l.add(new StringTask()); List> futures = @@ -1882,19 +1920,25 @@ public class ThreadPoolExecutorTest exte * timed invokeAll(c) cancels tasks not completed by timeout */ public void testTimedInvokeAll6() throws Exception { - final ExecutorService e = - new ThreadPoolExecutor(2, 2, - LONG_DELAY_MS, MILLISECONDS, - new ArrayBlockingQueue(10)); - try (PoolCleaner cleaner = cleaner(e)) { - for (long timeout = timeoutMillis();;) { + for (long timeout = timeoutMillis();;) { + final CountDownLatch done = new CountDownLatch(1); + final Callable waiter = new CheckedCallable() { + public String realCall() { + try { done.await(LONG_DELAY_MS, MILLISECONDS); } + catch (InterruptedException ok) {} + return "1"; }}; + final ExecutorService p = + new ThreadPoolExecutor(2, 2, + LONG_DELAY_MS, MILLISECONDS, + new ArrayBlockingQueue(10)); + try (PoolCleaner cleaner = cleaner(p, done)) { List> tasks = new ArrayList<>(); tasks.add(new StringTask("0")); - tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING)); + tasks.add(waiter); tasks.add(new StringTask("2")); long startTime = System.nanoTime(); List> futures = - e.invokeAll(tasks, timeout, MILLISECONDS); + p.invokeAll(tasks, timeout, MILLISECONDS); assertEquals(tasks.size(), futures.size()); assertTrue(millisElapsedSince(startTime) >= timeout); for (Future future : futures) @@ -1931,7 +1975,7 @@ public class ThreadPoolExecutorTest exte public void realRun() { done.countDown(); }}); - assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS)); + await(done); } } @@ -2024,7 +2068,7 @@ public class ThreadPoolExecutorTest exte } } // enough time to run all tasks - assertTrue(done.await(nTasks * SHORT_DELAY_MS, MILLISECONDS)); + await(done, nTasks * SHORT_DELAY_MS); } } @@ -2032,13 +2076,13 @@ public class ThreadPoolExecutorTest exte * get(cancelled task) throws CancellationException */ public void testGet_cancelled() throws Exception { + final CountDownLatch done = new CountDownLatch(1); final ExecutorService e = new ThreadPoolExecutor(1, 1, LONG_DELAY_MS, MILLISECONDS, new LinkedBlockingQueue()); - try (PoolCleaner cleaner = cleaner(e)) { + try (PoolCleaner cleaner = cleaner(e, done)) { final CountDownLatch blockerStarted = new CountDownLatch(1); - final CountDownLatch done = new CountDownLatch(1); final List> futures = new ArrayList<>(); for (int i = 0; i < 2; i++) { Runnable r = new CheckedRunnable() { public void realRun() @@ -2048,7 +2092,7 @@ public class ThreadPoolExecutorTest exte }}; futures.add(e.submit(r)); } - assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS)); + await(blockerStarted); for (Future future : futures) future.cancel(false); for (Future future : futures) { try { @@ -2062,8 +2106,34 @@ public class ThreadPoolExecutorTest exte assertTrue(future.isCancelled()); assertTrue(future.isDone()); } - done.countDown(); } } + /** Directly test simple ThreadPoolExecutor RejectedExecutionHandlers. */ + public void testStandardRejectedExecutionHandlers() { + final ThreadPoolExecutor p = + new ThreadPoolExecutor(1, 1, 1, SECONDS, + new ArrayBlockingQueue(1)); + final AtomicReference thread = new AtomicReference<>(); + final Runnable r = new Runnable() { public void run() { + thread.set(Thread.currentThread()); }}; + + try { + new AbortPolicy().rejectedExecution(r, p); + shouldThrow(); + } catch (RejectedExecutionException success) {} + assertNull(thread.get()); + + new DiscardPolicy().rejectedExecution(r, p); + assertNull(thread.get()); + + new CallerRunsPolicy().rejectedExecution(r, p); + assertSame(Thread.currentThread(), thread.get()); + + // check that pool was not perturbed by handlers + assertTrue(p.getRejectedExecutionHandler() instanceof AbortPolicy); + assertEquals(0, p.getTaskCount()); + assertTrue(p.getQueue().isEmpty()); + } + }