ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.101
Committed: Sun Jul 23 15:59:49 2017 UTC (6 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.100: +2 -2 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.ArrayBlockingQueue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.FutureTask;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeoutException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.locks.Condition;
36 import java.util.concurrent.locks.ReentrantLock;
37
38 import junit.framework.Test;
39 import junit.framework.TestSuite;
40
41 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
42 public static void main(String[] args) {
43 main(suite(), args);
44 }
45 public static Test suite() {
46 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
47 }
48
49 static class CustomTask<V> implements RunnableFuture<V> {
50 final Callable<V> callable;
51 final ReentrantLock lock = new ReentrantLock();
52 final Condition cond = lock.newCondition();
53 boolean done;
54 boolean cancelled;
55 V result;
56 Thread thread;
57 Exception exception;
58 CustomTask(Callable<V> c) {
59 if (c == null) throw new NullPointerException();
60 callable = c;
61 }
62 CustomTask(final Runnable r, final V res) {
63 if (r == null) throw new NullPointerException();
64 callable = new Callable<V>() {
65 public V call() throws Exception { r.run(); return res; }};
66 }
67 public boolean isDone() {
68 lock.lock(); try { return done; } finally { lock.unlock() ; }
69 }
70 public boolean isCancelled() {
71 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
72 }
73 public boolean cancel(boolean mayInterrupt) {
74 lock.lock();
75 try {
76 if (!done) {
77 cancelled = true;
78 done = true;
79 if (mayInterrupt && thread != null)
80 thread.interrupt();
81 return true;
82 }
83 return false;
84 }
85 finally { lock.unlock() ; }
86 }
87 public void run() {
88 lock.lock();
89 try {
90 if (done)
91 return;
92 thread = Thread.currentThread();
93 }
94 finally { lock.unlock() ; }
95 V v = null;
96 Exception e = null;
97 try {
98 v = callable.call();
99 }
100 catch (Exception ex) {
101 e = ex;
102 }
103 lock.lock();
104 try {
105 if (!done) {
106 result = v;
107 exception = e;
108 done = true;
109 thread = null;
110 cond.signalAll();
111 }
112 }
113 finally { lock.unlock(); }
114 }
115 public V get() throws InterruptedException, ExecutionException {
116 lock.lock();
117 try {
118 while (!done)
119 cond.await();
120 if (cancelled)
121 throw new CancellationException();
122 if (exception != null)
123 throw new ExecutionException(exception);
124 return result;
125 }
126 finally { lock.unlock(); }
127 }
128 public V get(long timeout, TimeUnit unit)
129 throws InterruptedException, ExecutionException, TimeoutException {
130 long nanos = unit.toNanos(timeout);
131 lock.lock();
132 try {
133 while (!done) {
134 if (nanos <= 0L)
135 throw new TimeoutException();
136 nanos = cond.awaitNanos(nanos);
137 }
138 if (cancelled)
139 throw new CancellationException();
140 if (exception != null)
141 throw new ExecutionException(exception);
142 return result;
143 }
144 finally { lock.unlock(); }
145 }
146 }
147
148 static class CustomTPE extends ThreadPoolExecutor {
149 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
150 return new CustomTask<V>(c);
151 }
152 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
153 return new CustomTask<V>(r, v);
154 }
155
156 CustomTPE(int corePoolSize,
157 int maximumPoolSize,
158 long keepAliveTime,
159 TimeUnit unit,
160 BlockingQueue<Runnable> workQueue) {
161 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
162 workQueue);
163 }
164 CustomTPE(int corePoolSize,
165 int maximumPoolSize,
166 long keepAliveTime,
167 TimeUnit unit,
168 BlockingQueue<Runnable> workQueue,
169 ThreadFactory threadFactory) {
170 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
171 threadFactory);
172 }
173
174 CustomTPE(int corePoolSize,
175 int maximumPoolSize,
176 long keepAliveTime,
177 TimeUnit unit,
178 BlockingQueue<Runnable> workQueue,
179 RejectedExecutionHandler handler) {
180 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
181 handler);
182 }
183 CustomTPE(int corePoolSize,
184 int maximumPoolSize,
185 long keepAliveTime,
186 TimeUnit unit,
187 BlockingQueue<Runnable> workQueue,
188 ThreadFactory threadFactory,
189 RejectedExecutionHandler handler) {
190 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
191 workQueue, threadFactory, handler);
192 }
193
194 final CountDownLatch beforeCalled = new CountDownLatch(1);
195 final CountDownLatch afterCalled = new CountDownLatch(1);
196 final CountDownLatch terminatedCalled = new CountDownLatch(1);
197
198 public CustomTPE() {
199 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
200 }
201 protected void beforeExecute(Thread t, Runnable r) {
202 beforeCalled.countDown();
203 }
204 protected void afterExecute(Runnable r, Throwable t) {
205 afterCalled.countDown();
206 }
207 protected void terminated() {
208 terminatedCalled.countDown();
209 }
210
211 public boolean beforeCalled() {
212 return beforeCalled.getCount() == 0;
213 }
214 public boolean afterCalled() {
215 return afterCalled.getCount() == 0;
216 }
217 public boolean terminatedCalled() {
218 return terminatedCalled.getCount() == 0;
219 }
220 }
221
222 static class FailingThreadFactory implements ThreadFactory {
223 int calls = 0;
224 public Thread newThread(Runnable r) {
225 if (++calls > 1) return null;
226 return new Thread(r);
227 }
228 }
229
230 /**
231 * execute successfully executes a runnable
232 */
233 public void testExecute() throws InterruptedException {
234 final ThreadPoolExecutor p =
235 new CustomTPE(1, 1,
236 2 * LONG_DELAY_MS, MILLISECONDS,
237 new ArrayBlockingQueue<Runnable>(10));
238 try (PoolCleaner cleaner = cleaner(p)) {
239 final CountDownLatch done = new CountDownLatch(1);
240 final Runnable task = new CheckedRunnable() {
241 public void realRun() { done.countDown(); }};
242 p.execute(task);
243 await(done);
244 }
245 }
246
247 /**
248 * getActiveCount increases but doesn't overestimate, when a
249 * thread becomes active
250 */
251 public void testGetActiveCount() throws InterruptedException {
252 final CountDownLatch done = new CountDownLatch(1);
253 final ThreadPoolExecutor p =
254 new CustomTPE(2, 2,
255 LONG_DELAY_MS, MILLISECONDS,
256 new ArrayBlockingQueue<Runnable>(10));
257 try (PoolCleaner cleaner = cleaner(p, done)) {
258 final CountDownLatch threadStarted = new CountDownLatch(1);
259 assertEquals(0, p.getActiveCount());
260 p.execute(new CheckedRunnable() {
261 public void realRun() throws InterruptedException {
262 threadStarted.countDown();
263 assertEquals(1, p.getActiveCount());
264 await(done);
265 }});
266 await(threadStarted);
267 assertEquals(1, p.getActiveCount());
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 final ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 final ThreadPoolExecutor p =
302 new CustomTPE(2, 6,
303 LONG_DELAY_MS, MILLISECONDS,
304 new ArrayBlockingQueue<Runnable>(10));
305 try (PoolCleaner cleaner = cleaner(p)) {
306 assertEquals(0, p.getPoolSize());
307 p.prestartAllCoreThreads();
308 assertEquals(2, p.getPoolSize());
309 p.prestartAllCoreThreads();
310 assertEquals(2, p.getPoolSize());
311 p.setCorePoolSize(4);
312 p.prestartAllCoreThreads();
313 assertEquals(4, p.getPoolSize());
314 p.prestartAllCoreThreads();
315 assertEquals(4, p.getPoolSize());
316 }
317 }
318
319 /**
320 * getCompletedTaskCount increases, but doesn't overestimate,
321 * when tasks complete
322 */
323 public void testGetCompletedTaskCount() throws InterruptedException {
324 final ThreadPoolExecutor p =
325 new CustomTPE(2, 2,
326 LONG_DELAY_MS, MILLISECONDS,
327 new ArrayBlockingQueue<Runnable>(10));
328 try (PoolCleaner cleaner = cleaner(p)) {
329 final CountDownLatch threadStarted = new CountDownLatch(1);
330 final CountDownLatch threadProceed = new CountDownLatch(1);
331 final CountDownLatch threadDone = new CountDownLatch(1);
332 assertEquals(0, p.getCompletedTaskCount());
333 p.execute(new CheckedRunnable() {
334 public void realRun() throws InterruptedException {
335 threadStarted.countDown();
336 assertEquals(0, p.getCompletedTaskCount());
337 await(threadProceed);
338 threadDone.countDown();
339 }});
340 await(threadStarted);
341 assertEquals(0, p.getCompletedTaskCount());
342 threadProceed.countDown();
343 await(threadDone);
344 long startTime = System.nanoTime();
345 while (p.getCompletedTaskCount() != 1) {
346 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
347 fail("timed out");
348 Thread.yield();
349 }
350 }
351 }
352
353 /**
354 * getCorePoolSize returns size given in constructor if not otherwise set
355 */
356 public void testGetCorePoolSize() {
357 final ThreadPoolExecutor p =
358 new CustomTPE(1, 1,
359 LONG_DELAY_MS, MILLISECONDS,
360 new ArrayBlockingQueue<Runnable>(10));
361 try (PoolCleaner cleaner = cleaner(p)) {
362 assertEquals(1, p.getCorePoolSize());
363 }
364 }
365
366 /**
367 * getKeepAliveTime returns value given in constructor if not otherwise set
368 */
369 public void testGetKeepAliveTime() {
370 final ThreadPoolExecutor p =
371 new CustomTPE(2, 2,
372 1000, MILLISECONDS,
373 new ArrayBlockingQueue<Runnable>(10));
374 try (PoolCleaner cleaner = cleaner(p)) {
375 assertEquals(1, p.getKeepAliveTime(SECONDS));
376 }
377 }
378
379 /**
380 * getThreadFactory returns factory in constructor if not set
381 */
382 public void testGetThreadFactory() {
383 final ThreadFactory threadFactory = new SimpleThreadFactory();
384 final ThreadPoolExecutor p =
385 new CustomTPE(1, 2,
386 LONG_DELAY_MS, MILLISECONDS,
387 new ArrayBlockingQueue<Runnable>(10),
388 threadFactory,
389 new NoOpREHandler());
390 try (PoolCleaner cleaner = cleaner(p)) {
391 assertSame(threadFactory, p.getThreadFactory());
392 }
393 }
394
395 /**
396 * setThreadFactory sets the thread factory returned by getThreadFactory
397 */
398 public void testSetThreadFactory() {
399 final ThreadPoolExecutor p =
400 new CustomTPE(1, 2,
401 LONG_DELAY_MS, MILLISECONDS,
402 new ArrayBlockingQueue<Runnable>(10));
403 try (PoolCleaner cleaner = cleaner(p)) {
404 ThreadFactory threadFactory = new SimpleThreadFactory();
405 p.setThreadFactory(threadFactory);
406 assertSame(threadFactory, p.getThreadFactory());
407 }
408 }
409
410 /**
411 * setThreadFactory(null) throws NPE
412 */
413 public void testSetThreadFactoryNull() {
414 final ThreadPoolExecutor p =
415 new CustomTPE(1, 2,
416 LONG_DELAY_MS, MILLISECONDS,
417 new ArrayBlockingQueue<Runnable>(10));
418 try (PoolCleaner cleaner = cleaner(p)) {
419 try {
420 p.setThreadFactory(null);
421 shouldThrow();
422 } catch (NullPointerException success) {}
423 }
424 }
425
426 /**
427 * getRejectedExecutionHandler returns handler in constructor if not set
428 */
429 public void testGetRejectedExecutionHandler() {
430 final RejectedExecutionHandler handler = new NoOpREHandler();
431 final ThreadPoolExecutor p =
432 new CustomTPE(1, 2,
433 LONG_DELAY_MS, MILLISECONDS,
434 new ArrayBlockingQueue<Runnable>(10),
435 handler);
436 try (PoolCleaner cleaner = cleaner(p)) {
437 assertSame(handler, p.getRejectedExecutionHandler());
438 }
439 }
440
441 /**
442 * setRejectedExecutionHandler sets the handler returned by
443 * getRejectedExecutionHandler
444 */
445 public void testSetRejectedExecutionHandler() {
446 final ThreadPoolExecutor p =
447 new CustomTPE(1, 2,
448 LONG_DELAY_MS, MILLISECONDS,
449 new ArrayBlockingQueue<Runnable>(10));
450 try (PoolCleaner cleaner = cleaner(p)) {
451 RejectedExecutionHandler handler = new NoOpREHandler();
452 p.setRejectedExecutionHandler(handler);
453 assertSame(handler, p.getRejectedExecutionHandler());
454 }
455 }
456
457 /**
458 * setRejectedExecutionHandler(null) throws NPE
459 */
460 public void testSetRejectedExecutionHandlerNull() {
461 final ThreadPoolExecutor p =
462 new CustomTPE(1, 2,
463 LONG_DELAY_MS, MILLISECONDS,
464 new ArrayBlockingQueue<Runnable>(10));
465 try (PoolCleaner cleaner = cleaner(p)) {
466 try {
467 p.setRejectedExecutionHandler(null);
468 shouldThrow();
469 } catch (NullPointerException success) {}
470 }
471 }
472
473 /**
474 * getLargestPoolSize increases, but doesn't overestimate, when
475 * multiple threads active
476 */
477 public void testGetLargestPoolSize() throws InterruptedException {
478 final int THREADS = 3;
479 final CountDownLatch done = new CountDownLatch(1);
480 final ThreadPoolExecutor p =
481 new CustomTPE(THREADS, THREADS,
482 LONG_DELAY_MS, MILLISECONDS,
483 new ArrayBlockingQueue<Runnable>(10));
484 try (PoolCleaner cleaner = cleaner(p, done)) {
485 assertEquals(0, p.getLargestPoolSize());
486 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
487 for (int i = 0; i < THREADS; i++)
488 p.execute(new CheckedRunnable() {
489 public void realRun() throws InterruptedException {
490 threadsStarted.countDown();
491 await(done);
492 assertEquals(THREADS, p.getLargestPoolSize());
493 }});
494 await(threadsStarted);
495 assertEquals(THREADS, p.getLargestPoolSize());
496 }
497 assertEquals(THREADS, p.getLargestPoolSize());
498 }
499
500 /**
501 * getMaximumPoolSize returns value given in constructor if not
502 * otherwise set
503 */
504 public void testGetMaximumPoolSize() {
505 final ThreadPoolExecutor p =
506 new CustomTPE(2, 3,
507 LONG_DELAY_MS, MILLISECONDS,
508 new ArrayBlockingQueue<Runnable>(10));
509 try (PoolCleaner cleaner = cleaner(p)) {
510 assertEquals(3, p.getMaximumPoolSize());
511 p.setMaximumPoolSize(5);
512 assertEquals(5, p.getMaximumPoolSize());
513 p.setMaximumPoolSize(4);
514 assertEquals(4, p.getMaximumPoolSize());
515 }
516 }
517
518 /**
519 * getPoolSize increases, but doesn't overestimate, when threads
520 * become active
521 */
522 public void testGetPoolSize() throws InterruptedException {
523 final CountDownLatch done = new CountDownLatch(1);
524 final ThreadPoolExecutor p =
525 new CustomTPE(1, 1,
526 LONG_DELAY_MS, MILLISECONDS,
527 new ArrayBlockingQueue<Runnable>(10));
528 try (PoolCleaner cleaner = cleaner(p, done)) {
529 assertEquals(0, p.getPoolSize());
530 final CountDownLatch threadStarted = new CountDownLatch(1);
531 p.execute(new CheckedRunnable() {
532 public void realRun() throws InterruptedException {
533 threadStarted.countDown();
534 assertEquals(1, p.getPoolSize());
535 await(done);
536 }});
537 await(threadStarted);
538 assertEquals(1, p.getPoolSize());
539 }
540 }
541
542 /**
543 * getTaskCount increases, but doesn't overestimate, when tasks submitted
544 */
545 public void testGetTaskCount() throws InterruptedException {
546 final int TASKS = 3;
547 final CountDownLatch done = new CountDownLatch(1);
548 final ThreadPoolExecutor p =
549 new CustomTPE(1, 1,
550 LONG_DELAY_MS, MILLISECONDS,
551 new ArrayBlockingQueue<Runnable>(10));
552 try (PoolCleaner cleaner = cleaner(p, done)) {
553 final CountDownLatch threadStarted = new CountDownLatch(1);
554 assertEquals(0, p.getTaskCount());
555 assertEquals(0, p.getCompletedTaskCount());
556 p.execute(new CheckedRunnable() {
557 public void realRun() throws InterruptedException {
558 threadStarted.countDown();
559 await(done);
560 }});
561 await(threadStarted);
562 assertEquals(1, p.getTaskCount());
563 assertEquals(0, p.getCompletedTaskCount());
564 for (int i = 0; i < TASKS; i++) {
565 assertEquals(1 + i, p.getTaskCount());
566 p.execute(new CheckedRunnable() {
567 public void realRun() throws InterruptedException {
568 threadStarted.countDown();
569 assertEquals(1 + TASKS, p.getTaskCount());
570 await(done);
571 }});
572 }
573 assertEquals(1 + TASKS, p.getTaskCount());
574 assertEquals(0, p.getCompletedTaskCount());
575 }
576 assertEquals(1 + TASKS, p.getTaskCount());
577 assertEquals(1 + TASKS, p.getCompletedTaskCount());
578 }
579
580 /**
581 * isShutdown is false before shutdown, true after
582 */
583 public void testIsShutdown() {
584 final ThreadPoolExecutor p =
585 new CustomTPE(1, 1,
586 LONG_DELAY_MS, MILLISECONDS,
587 new ArrayBlockingQueue<Runnable>(10));
588 try (PoolCleaner cleaner = cleaner(p)) {
589 assertFalse(p.isShutdown());
590 try { p.shutdown(); } catch (SecurityException ok) { return; }
591 assertTrue(p.isShutdown());
592 }
593 }
594
595 /**
596 * isTerminated is false before termination, true after
597 */
598 public void testIsTerminated() throws InterruptedException {
599 final ThreadPoolExecutor p =
600 new CustomTPE(1, 1,
601 LONG_DELAY_MS, MILLISECONDS,
602 new ArrayBlockingQueue<Runnable>(10));
603 try (PoolCleaner cleaner = cleaner(p)) {
604 final CountDownLatch threadStarted = new CountDownLatch(1);
605 final CountDownLatch done = new CountDownLatch(1);
606 assertFalse(p.isTerminating());
607 p.execute(new CheckedRunnable() {
608 public void realRun() throws InterruptedException {
609 assertFalse(p.isTerminating());
610 threadStarted.countDown();
611 await(done);
612 }});
613 await(threadStarted);
614 assertFalse(p.isTerminating());
615 done.countDown();
616 try { p.shutdown(); } catch (SecurityException ok) { return; }
617 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
618 assertTrue(p.isTerminated());
619 assertFalse(p.isTerminating());
620 }
621 }
622
623 /**
624 * isTerminating is not true when running or when terminated
625 */
626 public void testIsTerminating() throws InterruptedException {
627 final ThreadPoolExecutor p =
628 new CustomTPE(1, 1,
629 LONG_DELAY_MS, MILLISECONDS,
630 new ArrayBlockingQueue<Runnable>(10));
631 try (PoolCleaner cleaner = cleaner(p)) {
632 final CountDownLatch threadStarted = new CountDownLatch(1);
633 final CountDownLatch done = new CountDownLatch(1);
634 assertFalse(p.isTerminating());
635 p.execute(new CheckedRunnable() {
636 public void realRun() throws InterruptedException {
637 assertFalse(p.isTerminating());
638 threadStarted.countDown();
639 await(done);
640 }});
641 await(threadStarted);
642 assertFalse(p.isTerminating());
643 done.countDown();
644 try { p.shutdown(); } catch (SecurityException ok) { return; }
645 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
646 assertTrue(p.isTerminated());
647 assertFalse(p.isTerminating());
648 }
649 }
650
651 /**
652 * getQueue returns the work queue, which contains queued tasks
653 */
654 public void testGetQueue() throws InterruptedException {
655 final CountDownLatch done = new CountDownLatch(1);
656 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(10);
657 final ThreadPoolExecutor p =
658 new CustomTPE(1, 1,
659 LONG_DELAY_MS, MILLISECONDS,
660 q);
661 try (PoolCleaner cleaner = cleaner(p, done)) {
662 final CountDownLatch threadStarted = new CountDownLatch(1);
663 FutureTask[] tasks = new FutureTask[5];
664 for (int i = 0; i < tasks.length; i++) {
665 Callable<Boolean> task = new CheckedCallable<Boolean>() {
666 public Boolean realCall() throws InterruptedException {
667 threadStarted.countDown();
668 assertSame(q, p.getQueue());
669 await(done);
670 return Boolean.TRUE;
671 }};
672 tasks[i] = new FutureTask(task);
673 p.execute(tasks[i]);
674 }
675 await(threadStarted);
676 assertSame(q, p.getQueue());
677 assertFalse(q.contains(tasks[0]));
678 assertTrue(q.contains(tasks[tasks.length - 1]));
679 assertEquals(tasks.length - 1, q.size());
680 }
681 }
682
683 /**
684 * remove(task) removes queued task, and fails to remove active task
685 */
686 public void testRemove() throws InterruptedException {
687 final CountDownLatch done = new CountDownLatch(1);
688 BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(10);
689 final ThreadPoolExecutor p =
690 new CustomTPE(1, 1,
691 LONG_DELAY_MS, MILLISECONDS,
692 q);
693 try (PoolCleaner cleaner = cleaner(p, done)) {
694 Runnable[] tasks = new Runnable[6];
695 final CountDownLatch threadStarted = new CountDownLatch(1);
696 for (int i = 0; i < tasks.length; i++) {
697 tasks[i] = new CheckedRunnable() {
698 public void realRun() throws InterruptedException {
699 threadStarted.countDown();
700 await(done);
701 }};
702 p.execute(tasks[i]);
703 }
704 await(threadStarted);
705 assertFalse(p.remove(tasks[0]));
706 assertTrue(q.contains(tasks[4]));
707 assertTrue(q.contains(tasks[3]));
708 assertTrue(p.remove(tasks[4]));
709 assertFalse(p.remove(tasks[4]));
710 assertFalse(q.contains(tasks[4]));
711 assertTrue(q.contains(tasks[3]));
712 assertTrue(p.remove(tasks[3]));
713 assertFalse(q.contains(tasks[3]));
714 }
715 }
716
717 /**
718 * purge removes cancelled tasks from the queue
719 */
720 public void testPurge() throws InterruptedException {
721 final CountDownLatch threadStarted = new CountDownLatch(1);
722 final CountDownLatch done = new CountDownLatch(1);
723 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(10);
724 final ThreadPoolExecutor p =
725 new CustomTPE(1, 1,
726 LONG_DELAY_MS, MILLISECONDS,
727 q);
728 try (PoolCleaner cleaner = cleaner(p, done)) {
729 FutureTask[] tasks = new FutureTask[5];
730 for (int i = 0; i < tasks.length; i++) {
731 Callable<Boolean> task = new CheckedCallable<Boolean>() {
732 public Boolean realCall() throws InterruptedException {
733 threadStarted.countDown();
734 await(done);
735 return Boolean.TRUE;
736 }};
737 tasks[i] = new FutureTask(task);
738 p.execute(tasks[i]);
739 }
740 await(threadStarted);
741 assertEquals(tasks.length, p.getTaskCount());
742 assertEquals(tasks.length - 1, q.size());
743 assertEquals(1L, p.getActiveCount());
744 assertEquals(0L, p.getCompletedTaskCount());
745 tasks[4].cancel(true);
746 tasks[3].cancel(false);
747 p.purge();
748 assertEquals(tasks.length - 3, q.size());
749 assertEquals(tasks.length - 2, p.getTaskCount());
750 p.purge(); // Nothing to do
751 assertEquals(tasks.length - 3, q.size());
752 assertEquals(tasks.length - 2, p.getTaskCount());
753 }
754 }
755
756 /**
757 * shutdownNow returns a list containing tasks that were not run,
758 * and those tasks are drained from the queue
759 */
760 public void testShutdownNow() throws InterruptedException {
761 final int poolSize = 2;
762 final int count = 5;
763 final AtomicInteger ran = new AtomicInteger(0);
764 final ThreadPoolExecutor p =
765 new CustomTPE(poolSize, poolSize,
766 LONG_DELAY_MS, MILLISECONDS,
767 new ArrayBlockingQueue<Runnable>(10));
768 final CountDownLatch threadsStarted = new CountDownLatch(poolSize);
769 Runnable waiter = new CheckedRunnable() { public void realRun() {
770 threadsStarted.countDown();
771 try {
772 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
773 } catch (InterruptedException success) {}
774 ran.getAndIncrement();
775 }};
776 for (int i = 0; i < count; i++)
777 p.execute(waiter);
778 await(threadsStarted);
779 assertEquals(poolSize, p.getActiveCount());
780 assertEquals(0, p.getCompletedTaskCount());
781 final List<Runnable> queuedTasks;
782 try {
783 queuedTasks = p.shutdownNow();
784 } catch (SecurityException ok) {
785 return; // Allowed in case test doesn't have privs
786 }
787 assertTrue(p.isShutdown());
788 assertTrue(p.getQueue().isEmpty());
789 assertEquals(count - poolSize, queuedTasks.size());
790 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
791 assertTrue(p.isTerminated());
792 assertEquals(poolSize, ran.get());
793 assertEquals(poolSize, p.getCompletedTaskCount());
794 }
795
796 // Exception Tests
797
798 /**
799 * Constructor throws if corePoolSize argument is less than zero
800 */
801 public void testConstructor1() {
802 try {
803 new CustomTPE(-1, 1, 1L, SECONDS,
804 new ArrayBlockingQueue<Runnable>(10));
805 shouldThrow();
806 } catch (IllegalArgumentException success) {}
807 }
808
809 /**
810 * Constructor throws if maximumPoolSize is less than zero
811 */
812 public void testConstructor2() {
813 try {
814 new CustomTPE(1, -1, 1L, SECONDS,
815 new ArrayBlockingQueue<Runnable>(10));
816 shouldThrow();
817 } catch (IllegalArgumentException success) {}
818 }
819
820 /**
821 * Constructor throws if maximumPoolSize is equal to zero
822 */
823 public void testConstructor3() {
824 try {
825 new CustomTPE(1, 0, 1L, SECONDS,
826 new ArrayBlockingQueue<Runnable>(10));
827 shouldThrow();
828 } catch (IllegalArgumentException success) {}
829 }
830
831 /**
832 * Constructor throws if keepAliveTime is less than zero
833 */
834 public void testConstructor4() {
835 try {
836 new CustomTPE(1, 2, -1L, SECONDS,
837 new ArrayBlockingQueue<Runnable>(10));
838 shouldThrow();
839 } catch (IllegalArgumentException success) {}
840 }
841
842 /**
843 * Constructor throws if corePoolSize is greater than the maximumPoolSize
844 */
845 public void testConstructor5() {
846 try {
847 new CustomTPE(2, 1, 1L, SECONDS,
848 new ArrayBlockingQueue<Runnable>(10));
849 shouldThrow();
850 } catch (IllegalArgumentException success) {}
851 }
852
853 /**
854 * Constructor throws if workQueue is set to null
855 */
856 public void testConstructorNullPointerException() {
857 try {
858 new CustomTPE(1, 2, 1L, SECONDS, null);
859 shouldThrow();
860 } catch (NullPointerException success) {}
861 }
862
863 /**
864 * Constructor throws if corePoolSize argument is less than zero
865 */
866 public void testConstructor6() {
867 try {
868 new CustomTPE(-1, 1, 1L, SECONDS,
869 new ArrayBlockingQueue<Runnable>(10),
870 new SimpleThreadFactory());
871 shouldThrow();
872 } catch (IllegalArgumentException success) {}
873 }
874
875 /**
876 * Constructor throws if maximumPoolSize is less than zero
877 */
878 public void testConstructor7() {
879 try {
880 new CustomTPE(1,-1, 1L, SECONDS,
881 new ArrayBlockingQueue<Runnable>(10),
882 new SimpleThreadFactory());
883 shouldThrow();
884 } catch (IllegalArgumentException success) {}
885 }
886
887 /**
888 * Constructor throws if maximumPoolSize is equal to zero
889 */
890 public void testConstructor8() {
891 try {
892 new CustomTPE(1, 0, 1L, SECONDS,
893 new ArrayBlockingQueue<Runnable>(10),
894 new SimpleThreadFactory());
895 shouldThrow();
896 } catch (IllegalArgumentException success) {}
897 }
898
899 /**
900 * Constructor throws if keepAliveTime is less than zero
901 */
902 public void testConstructor9() {
903 try {
904 new CustomTPE(1, 2, -1L, SECONDS,
905 new ArrayBlockingQueue<Runnable>(10),
906 new SimpleThreadFactory());
907 shouldThrow();
908 } catch (IllegalArgumentException success) {}
909 }
910
911 /**
912 * Constructor throws if corePoolSize is greater than the maximumPoolSize
913 */
914 public void testConstructor10() {
915 try {
916 new CustomTPE(2, 1, 1L, SECONDS,
917 new ArrayBlockingQueue<Runnable>(10),
918 new SimpleThreadFactory());
919 shouldThrow();
920 } catch (IllegalArgumentException success) {}
921 }
922
923 /**
924 * Constructor throws if workQueue is set to null
925 */
926 public void testConstructorNullPointerException2() {
927 try {
928 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
929 shouldThrow();
930 } catch (NullPointerException success) {}
931 }
932
933 /**
934 * Constructor throws if threadFactory is set to null
935 */
936 public void testConstructorNullPointerException3() {
937 try {
938 new CustomTPE(1, 2, 1L, SECONDS,
939 new ArrayBlockingQueue<Runnable>(10),
940 (ThreadFactory) null);
941 shouldThrow();
942 } catch (NullPointerException success) {}
943 }
944
945 /**
946 * Constructor throws if corePoolSize argument is less than zero
947 */
948 public void testConstructor11() {
949 try {
950 new CustomTPE(-1, 1, 1L, SECONDS,
951 new ArrayBlockingQueue<Runnable>(10),
952 new NoOpREHandler());
953 shouldThrow();
954 } catch (IllegalArgumentException success) {}
955 }
956
957 /**
958 * Constructor throws if maximumPoolSize is less than zero
959 */
960 public void testConstructor12() {
961 try {
962 new CustomTPE(1, -1, 1L, SECONDS,
963 new ArrayBlockingQueue<Runnable>(10),
964 new NoOpREHandler());
965 shouldThrow();
966 } catch (IllegalArgumentException success) {}
967 }
968
969 /**
970 * Constructor throws if maximumPoolSize is equal to zero
971 */
972 public void testConstructor13() {
973 try {
974 new CustomTPE(1, 0, 1L, SECONDS,
975 new ArrayBlockingQueue<Runnable>(10),
976 new NoOpREHandler());
977 shouldThrow();
978 } catch (IllegalArgumentException success) {}
979 }
980
981 /**
982 * Constructor throws if keepAliveTime is less than zero
983 */
984 public void testConstructor14() {
985 try {
986 new CustomTPE(1, 2, -1L, SECONDS,
987 new ArrayBlockingQueue<Runnable>(10),
988 new NoOpREHandler());
989 shouldThrow();
990 } catch (IllegalArgumentException success) {}
991 }
992
993 /**
994 * Constructor throws if corePoolSize is greater than the maximumPoolSize
995 */
996 public void testConstructor15() {
997 try {
998 new CustomTPE(2, 1, 1L, SECONDS,
999 new ArrayBlockingQueue<Runnable>(10),
1000 new NoOpREHandler());
1001 shouldThrow();
1002 } catch (IllegalArgumentException success) {}
1003 }
1004
1005 /**
1006 * Constructor throws if workQueue is set to null
1007 */
1008 public void testConstructorNullPointerException4() {
1009 try {
1010 new CustomTPE(1, 2, 1L, SECONDS,
1011 null,
1012 new NoOpREHandler());
1013 shouldThrow();
1014 } catch (NullPointerException success) {}
1015 }
1016
1017 /**
1018 * Constructor throws if handler is set to null
1019 */
1020 public void testConstructorNullPointerException5() {
1021 try {
1022 new CustomTPE(1, 2, 1L, SECONDS,
1023 new ArrayBlockingQueue<Runnable>(10),
1024 (RejectedExecutionHandler) null);
1025 shouldThrow();
1026 } catch (NullPointerException success) {}
1027 }
1028
1029 /**
1030 * Constructor throws if corePoolSize argument is less than zero
1031 */
1032 public void testConstructor16() {
1033 try {
1034 new CustomTPE(-1, 1, 1L, SECONDS,
1035 new ArrayBlockingQueue<Runnable>(10),
1036 new SimpleThreadFactory(),
1037 new NoOpREHandler());
1038 shouldThrow();
1039 } catch (IllegalArgumentException success) {}
1040 }
1041
1042 /**
1043 * Constructor throws if maximumPoolSize is less than zero
1044 */
1045 public void testConstructor17() {
1046 try {
1047 new CustomTPE(1, -1, 1L, SECONDS,
1048 new ArrayBlockingQueue<Runnable>(10),
1049 new SimpleThreadFactory(),
1050 new NoOpREHandler());
1051 shouldThrow();
1052 } catch (IllegalArgumentException success) {}
1053 }
1054
1055 /**
1056 * Constructor throws if maximumPoolSize is equal to zero
1057 */
1058 public void testConstructor18() {
1059 try {
1060 new CustomTPE(1, 0, 1L, SECONDS,
1061 new ArrayBlockingQueue<Runnable>(10),
1062 new SimpleThreadFactory(),
1063 new NoOpREHandler());
1064 shouldThrow();
1065 } catch (IllegalArgumentException success) {}
1066 }
1067
1068 /**
1069 * Constructor throws if keepAliveTime is less than zero
1070 */
1071 public void testConstructor19() {
1072 try {
1073 new CustomTPE(1, 2, -1L, SECONDS,
1074 new ArrayBlockingQueue<Runnable>(10),
1075 new SimpleThreadFactory(),
1076 new NoOpREHandler());
1077 shouldThrow();
1078 } catch (IllegalArgumentException success) {}
1079 }
1080
1081 /**
1082 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1083 */
1084 public void testConstructor20() {
1085 try {
1086 new CustomTPE(2, 1, 1L, SECONDS,
1087 new ArrayBlockingQueue<Runnable>(10),
1088 new SimpleThreadFactory(),
1089 new NoOpREHandler());
1090 shouldThrow();
1091 } catch (IllegalArgumentException success) {}
1092 }
1093
1094 /**
1095 * Constructor throws if workQueue is null
1096 */
1097 public void testConstructorNullPointerException6() {
1098 try {
1099 new CustomTPE(1, 2, 1L, SECONDS,
1100 null,
1101 new SimpleThreadFactory(),
1102 new NoOpREHandler());
1103 shouldThrow();
1104 } catch (NullPointerException success) {}
1105 }
1106
1107 /**
1108 * Constructor throws if handler is null
1109 */
1110 public void testConstructorNullPointerException7() {
1111 try {
1112 new CustomTPE(1, 2, 1L, SECONDS,
1113 new ArrayBlockingQueue<Runnable>(10),
1114 new SimpleThreadFactory(),
1115 (RejectedExecutionHandler) null);
1116 shouldThrow();
1117 } catch (NullPointerException success) {}
1118 }
1119
1120 /**
1121 * Constructor throws if ThreadFactory is null
1122 */
1123 public void testConstructorNullPointerException8() {
1124 try {
1125 new CustomTPE(1, 2, 1L, SECONDS,
1126 new ArrayBlockingQueue<Runnable>(10),
1127 (ThreadFactory) null,
1128 new NoOpREHandler());
1129 shouldThrow();
1130 } catch (NullPointerException success) {}
1131 }
1132
1133 /**
1134 * Submitted tasks are rejected when saturated or shutdown
1135 */
1136 public void testSubmittedTasksRejectedWhenSaturatedOrShutdown() throws InterruptedException {
1137 final ThreadPoolExecutor p =
1138 new CustomTPE(1, 1,
1139 LONG_DELAY_MS, MILLISECONDS,
1140 new ArrayBlockingQueue<Runnable>(1));
1141 final int saturatedSize = saturatedSize(p);
1142 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
1143 final CountDownLatch threadsStarted = new CountDownLatch(p.getMaximumPoolSize());
1144 final CountDownLatch done = new CountDownLatch(1);
1145 final Runnable r = () -> {
1146 threadsStarted.countDown();
1147 for (;;) {
1148 try {
1149 done.await();
1150 return;
1151 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
1152 }};
1153 final Callable<Boolean> c = () -> {
1154 threadsStarted.countDown();
1155 for (;;) {
1156 try {
1157 done.await();
1158 return Boolean.TRUE;
1159 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
1160 }};
1161 final boolean shutdownNow = rnd.nextBoolean();
1162
1163 try (PoolCleaner cleaner = cleaner(p, done)) {
1164 // saturate
1165 for (int i = saturatedSize; i--> 0; ) {
1166 switch (rnd.nextInt(4)) {
1167 case 0: p.execute(r); break;
1168 case 1: assertFalse(p.submit(r).isDone()); break;
1169 case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break;
1170 case 3: assertFalse(p.submit(c).isDone()); break;
1171 }
1172 }
1173
1174 await(threadsStarted);
1175 assertTaskSubmissionsAreRejected(p);
1176
1177 if (shutdownNow)
1178 p.shutdownNow();
1179 else
1180 p.shutdown();
1181 // Pool is shutdown, but not yet terminated
1182 assertTaskSubmissionsAreRejected(p);
1183 assertFalse(p.isTerminated());
1184
1185 done.countDown(); // release blocking tasks
1186 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
1187
1188 assertTaskSubmissionsAreRejected(p);
1189 }
1190 assertEquals(saturatedSize(p)
1191 - (shutdownNow ? p.getQueue().remainingCapacity() : 0),
1192 p.getCompletedTaskCount());
1193 }
1194
1195 /**
1196 * executor using DiscardOldestPolicy drops oldest task if saturated.
1197 */
1198 public void testSaturatedExecute_DiscardOldestPolicy() {
1199 final CountDownLatch done = new CountDownLatch(1);
1200 LatchAwaiter r1 = awaiter(done);
1201 LatchAwaiter r2 = awaiter(done);
1202 LatchAwaiter r3 = awaiter(done);
1203 final ThreadPoolExecutor p =
1204 new CustomTPE(1, 1,
1205 LONG_DELAY_MS, MILLISECONDS,
1206 new ArrayBlockingQueue<Runnable>(1),
1207 new ThreadPoolExecutor.DiscardOldestPolicy());
1208 try (PoolCleaner cleaner = cleaner(p, done)) {
1209 assertEquals(LatchAwaiter.NEW, r1.state);
1210 assertEquals(LatchAwaiter.NEW, r2.state);
1211 assertEquals(LatchAwaiter.NEW, r3.state);
1212 p.execute(r1);
1213 p.execute(r2);
1214 assertTrue(p.getQueue().contains(r2));
1215 p.execute(r3);
1216 assertFalse(p.getQueue().contains(r2));
1217 assertTrue(p.getQueue().contains(r3));
1218 }
1219 assertEquals(LatchAwaiter.DONE, r1.state);
1220 assertEquals(LatchAwaiter.NEW, r2.state);
1221 assertEquals(LatchAwaiter.DONE, r3.state);
1222 }
1223
1224 /**
1225 * execute using DiscardOldestPolicy drops task on shutdown
1226 */
1227 public void testDiscardOldestOnShutdown() {
1228 final ThreadPoolExecutor p =
1229 new CustomTPE(1, 1,
1230 LONG_DELAY_MS, MILLISECONDS,
1231 new ArrayBlockingQueue<Runnable>(1),
1232 new ThreadPoolExecutor.DiscardOldestPolicy());
1233
1234 try { p.shutdown(); } catch (SecurityException ok) { return; }
1235 try (PoolCleaner cleaner = cleaner(p)) {
1236 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1237 p.execute(r);
1238 assertFalse(r.done);
1239 }
1240 }
1241
1242 /**
1243 * Submitting null tasks throws NullPointerException
1244 */
1245 public void testNullTaskSubmission() {
1246 final ThreadPoolExecutor p =
1247 new CustomTPE(1, 2,
1248 1L, SECONDS,
1249 new ArrayBlockingQueue<Runnable>(10));
1250 try (PoolCleaner cleaner = cleaner(p)) {
1251 assertNullTaskSubmissionThrowsNullPointerException(p);
1252 }
1253 }
1254
1255 /**
1256 * setCorePoolSize of negative value throws IllegalArgumentException
1257 */
1258 public void testCorePoolSizeIllegalArgumentException() {
1259 final ThreadPoolExecutor p =
1260 new CustomTPE(1, 2,
1261 LONG_DELAY_MS, MILLISECONDS,
1262 new ArrayBlockingQueue<Runnable>(10));
1263 try (PoolCleaner cleaner = cleaner(p)) {
1264 try {
1265 p.setCorePoolSize(-1);
1266 shouldThrow();
1267 } catch (IllegalArgumentException success) {}
1268 }
1269 }
1270
1271 /**
1272 * setMaximumPoolSize(int) throws IllegalArgumentException
1273 * if given a value less the core pool size
1274 */
1275 public void testMaximumPoolSizeIllegalArgumentException() {
1276 final ThreadPoolExecutor p =
1277 new CustomTPE(2, 3,
1278 LONG_DELAY_MS, MILLISECONDS,
1279 new ArrayBlockingQueue<Runnable>(10));
1280 try (PoolCleaner cleaner = cleaner(p)) {
1281 try {
1282 p.setMaximumPoolSize(1);
1283 shouldThrow();
1284 } catch (IllegalArgumentException success) {}
1285 }
1286 }
1287
1288 /**
1289 * setMaximumPoolSize throws IllegalArgumentException
1290 * if given a negative value
1291 */
1292 public void testMaximumPoolSizeIllegalArgumentException2() {
1293 final ThreadPoolExecutor p =
1294 new CustomTPE(2, 3,
1295 LONG_DELAY_MS, MILLISECONDS,
1296 new ArrayBlockingQueue<Runnable>(10));
1297 try (PoolCleaner cleaner = cleaner(p)) {
1298 try {
1299 p.setMaximumPoolSize(-1);
1300 shouldThrow();
1301 } catch (IllegalArgumentException success) {}
1302 }
1303 }
1304
1305 /**
1306 * setKeepAliveTime throws IllegalArgumentException
1307 * when given a negative value
1308 */
1309 public void testKeepAliveTimeIllegalArgumentException() {
1310 final ThreadPoolExecutor p =
1311 new CustomTPE(2, 3,
1312 LONG_DELAY_MS, MILLISECONDS,
1313 new ArrayBlockingQueue<Runnable>(10));
1314 try (PoolCleaner cleaner = cleaner(p)) {
1315 try {
1316 p.setKeepAliveTime(-1, MILLISECONDS);
1317 shouldThrow();
1318 } catch (IllegalArgumentException success) {}
1319 }
1320 }
1321
1322 /**
1323 * terminated() is called on termination
1324 */
1325 public void testTerminated() {
1326 CustomTPE p = new CustomTPE();
1327 try (PoolCleaner cleaner = cleaner(p)) {
1328 try { p.shutdown(); } catch (SecurityException ok) { return; }
1329 assertTrue(p.terminatedCalled());
1330 assertTrue(p.isShutdown());
1331 }
1332 }
1333
1334 /**
1335 * beforeExecute and afterExecute are called when executing task
1336 */
1337 public void testBeforeAfter() throws InterruptedException {
1338 CustomTPE p = new CustomTPE();
1339 try (PoolCleaner cleaner = cleaner(p)) {
1340 final CountDownLatch done = new CountDownLatch(1);
1341 p.execute(new CheckedRunnable() {
1342 public void realRun() {
1343 done.countDown();
1344 }});
1345 await(p.afterCalled);
1346 assertEquals(0, done.getCount());
1347 assertTrue(p.afterCalled());
1348 assertTrue(p.beforeCalled());
1349 }
1350 }
1351
1352 /**
1353 * completed submit of callable returns result
1354 */
1355 public void testSubmitCallable() throws Exception {
1356 final ExecutorService e =
1357 new CustomTPE(2, 2,
1358 LONG_DELAY_MS, MILLISECONDS,
1359 new ArrayBlockingQueue<Runnable>(10));
1360 try (PoolCleaner cleaner = cleaner(e)) {
1361 Future<String> future = e.submit(new StringTask());
1362 String result = future.get();
1363 assertSame(TEST_STRING, result);
1364 }
1365 }
1366
1367 /**
1368 * completed submit of runnable returns successfully
1369 */
1370 public void testSubmitRunnable() throws Exception {
1371 final ExecutorService e =
1372 new CustomTPE(2, 2,
1373 LONG_DELAY_MS, MILLISECONDS,
1374 new ArrayBlockingQueue<Runnable>(10));
1375 try (PoolCleaner cleaner = cleaner(e)) {
1376 Future<?> future = e.submit(new NoOpRunnable());
1377 future.get();
1378 assertTrue(future.isDone());
1379 }
1380 }
1381
1382 /**
1383 * completed submit of (runnable, result) returns result
1384 */
1385 public void testSubmitRunnable2() throws Exception {
1386 final ExecutorService e =
1387 new CustomTPE(2, 2,
1388 LONG_DELAY_MS, MILLISECONDS,
1389 new ArrayBlockingQueue<Runnable>(10));
1390 try (PoolCleaner cleaner = cleaner(e)) {
1391 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1392 String result = future.get();
1393 assertSame(TEST_STRING, result);
1394 }
1395 }
1396
1397 /**
1398 * invokeAny(null) throws NullPointerException
1399 */
1400 public void testInvokeAny1() throws Exception {
1401 final ExecutorService e =
1402 new CustomTPE(2, 2,
1403 LONG_DELAY_MS, MILLISECONDS,
1404 new ArrayBlockingQueue<Runnable>(10));
1405 try (PoolCleaner cleaner = cleaner(e)) {
1406 try {
1407 e.invokeAny(null);
1408 shouldThrow();
1409 } catch (NullPointerException success) {}
1410 }
1411 }
1412
1413 /**
1414 * invokeAny(empty collection) throws IllegalArgumentException
1415 */
1416 public void testInvokeAny2() throws Exception {
1417 final ExecutorService e =
1418 new CustomTPE(2, 2,
1419 LONG_DELAY_MS, MILLISECONDS,
1420 new ArrayBlockingQueue<Runnable>(10));
1421 try (PoolCleaner cleaner = cleaner(e)) {
1422 try {
1423 e.invokeAny(new ArrayList<Callable<String>>());
1424 shouldThrow();
1425 } catch (IllegalArgumentException success) {}
1426 }
1427 }
1428
1429 /**
1430 * invokeAny(c) throws NPE if c has null elements
1431 */
1432 public void testInvokeAny3() throws Exception {
1433 CountDownLatch latch = new CountDownLatch(1);
1434 final ExecutorService e =
1435 new CustomTPE(2, 2,
1436 LONG_DELAY_MS, MILLISECONDS,
1437 new ArrayBlockingQueue<Runnable>(10));
1438 try (PoolCleaner cleaner = cleaner(e)) {
1439 List<Callable<String>> l = new ArrayList<>();
1440 l.add(latchAwaitingStringTask(latch));
1441 l.add(null);
1442 try {
1443 e.invokeAny(l);
1444 shouldThrow();
1445 } catch (NullPointerException success) {}
1446 latch.countDown();
1447 }
1448 }
1449
1450 /**
1451 * invokeAny(c) throws ExecutionException if no task completes
1452 */
1453 public void testInvokeAny4() throws Exception {
1454 final ExecutorService e =
1455 new CustomTPE(2, 2,
1456 LONG_DELAY_MS, MILLISECONDS,
1457 new ArrayBlockingQueue<Runnable>(10));
1458 try (PoolCleaner cleaner = cleaner(e)) {
1459 List<Callable<String>> l = new ArrayList<>();
1460 l.add(new NPETask());
1461 try {
1462 e.invokeAny(l);
1463 shouldThrow();
1464 } catch (ExecutionException success) {
1465 assertTrue(success.getCause() instanceof NullPointerException);
1466 }
1467 }
1468 }
1469
1470 /**
1471 * invokeAny(c) returns result of some task
1472 */
1473 public void testInvokeAny5() throws Exception {
1474 final ExecutorService e =
1475 new CustomTPE(2, 2,
1476 LONG_DELAY_MS, MILLISECONDS,
1477 new ArrayBlockingQueue<Runnable>(10));
1478 try (PoolCleaner cleaner = cleaner(e)) {
1479 List<Callable<String>> l = new ArrayList<>();
1480 l.add(new StringTask());
1481 l.add(new StringTask());
1482 String result = e.invokeAny(l);
1483 assertSame(TEST_STRING, result);
1484 }
1485 }
1486
1487 /**
1488 * invokeAll(null) throws NPE
1489 */
1490 public void testInvokeAll1() throws Exception {
1491 final ExecutorService e =
1492 new CustomTPE(2, 2,
1493 LONG_DELAY_MS, MILLISECONDS,
1494 new ArrayBlockingQueue<Runnable>(10));
1495 try (PoolCleaner cleaner = cleaner(e)) {
1496 try {
1497 e.invokeAll(null);
1498 shouldThrow();
1499 } catch (NullPointerException success) {}
1500 }
1501 }
1502
1503 /**
1504 * invokeAll(empty collection) returns empty list
1505 */
1506 public void testInvokeAll2() throws Exception {
1507 final ExecutorService e =
1508 new CustomTPE(2, 2,
1509 LONG_DELAY_MS, MILLISECONDS,
1510 new ArrayBlockingQueue<Runnable>(10));
1511 final Collection<Callable<String>> emptyCollection
1512 = Collections.emptyList();
1513 try (PoolCleaner cleaner = cleaner(e)) {
1514 List<Future<String>> r = e.invokeAll(emptyCollection);
1515 assertTrue(r.isEmpty());
1516 }
1517 }
1518
1519 /**
1520 * invokeAll(c) throws NPE if c has null elements
1521 */
1522 public void testInvokeAll3() throws Exception {
1523 final ExecutorService e =
1524 new CustomTPE(2, 2,
1525 LONG_DELAY_MS, MILLISECONDS,
1526 new ArrayBlockingQueue<Runnable>(10));
1527 try (PoolCleaner cleaner = cleaner(e)) {
1528 List<Callable<String>> l = new ArrayList<>();
1529 l.add(new StringTask());
1530 l.add(null);
1531 try {
1532 e.invokeAll(l);
1533 shouldThrow();
1534 } catch (NullPointerException success) {}
1535 }
1536 }
1537
1538 /**
1539 * get of element of invokeAll(c) throws exception on failed task
1540 */
1541 public void testInvokeAll4() throws Exception {
1542 final ExecutorService e =
1543 new CustomTPE(2, 2,
1544 LONG_DELAY_MS, MILLISECONDS,
1545 new ArrayBlockingQueue<Runnable>(10));
1546 try (PoolCleaner cleaner = cleaner(e)) {
1547 List<Callable<String>> l = new ArrayList<>();
1548 l.add(new NPETask());
1549 List<Future<String>> futures = e.invokeAll(l);
1550 assertEquals(1, futures.size());
1551 try {
1552 futures.get(0).get();
1553 shouldThrow();
1554 } catch (ExecutionException success) {
1555 assertTrue(success.getCause() instanceof NullPointerException);
1556 }
1557 }
1558 }
1559
1560 /**
1561 * invokeAll(c) returns results of all completed tasks
1562 */
1563 public void testInvokeAll5() throws Exception {
1564 final ExecutorService e =
1565 new CustomTPE(2, 2,
1566 LONG_DELAY_MS, MILLISECONDS,
1567 new ArrayBlockingQueue<Runnable>(10));
1568 try (PoolCleaner cleaner = cleaner(e)) {
1569 List<Callable<String>> l = new ArrayList<>();
1570 l.add(new StringTask());
1571 l.add(new StringTask());
1572 List<Future<String>> futures = e.invokeAll(l);
1573 assertEquals(2, futures.size());
1574 for (Future<String> future : futures)
1575 assertSame(TEST_STRING, future.get());
1576 }
1577 }
1578
1579 /**
1580 * timed invokeAny(null) throws NPE
1581 */
1582 public void testTimedInvokeAny1() throws Exception {
1583 final ExecutorService e =
1584 new CustomTPE(2, 2,
1585 LONG_DELAY_MS, MILLISECONDS,
1586 new ArrayBlockingQueue<Runnable>(10));
1587 try (PoolCleaner cleaner = cleaner(e)) {
1588 try {
1589 e.invokeAny(null, randomTimeout(), randomTimeUnit());
1590 shouldThrow();
1591 } catch (NullPointerException success) {}
1592 }
1593 }
1594
1595 /**
1596 * timed invokeAny(,,null) throws NPE
1597 */
1598 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1599 final ExecutorService e =
1600 new CustomTPE(2, 2,
1601 LONG_DELAY_MS, MILLISECONDS,
1602 new ArrayBlockingQueue<Runnable>(10));
1603 try (PoolCleaner cleaner = cleaner(e)) {
1604 List<Callable<String>> l = new ArrayList<>();
1605 l.add(new StringTask());
1606 try {
1607 e.invokeAny(l, randomTimeout(), null);
1608 shouldThrow();
1609 } catch (NullPointerException success) {}
1610 }
1611 }
1612
1613 /**
1614 * timed invokeAny(empty collection) throws IllegalArgumentException
1615 */
1616 public void testTimedInvokeAny2() throws Exception {
1617 final ExecutorService e =
1618 new CustomTPE(2, 2,
1619 LONG_DELAY_MS, MILLISECONDS,
1620 new ArrayBlockingQueue<Runnable>(10));
1621 final Collection<Callable<String>> emptyCollection
1622 = Collections.emptyList();
1623 try (PoolCleaner cleaner = cleaner(e)) {
1624 try {
1625 e.invokeAny(emptyCollection, randomTimeout(), randomTimeUnit());
1626 shouldThrow();
1627 } catch (IllegalArgumentException success) {}
1628 }
1629 }
1630
1631 /**
1632 * timed invokeAny(c) throws NPE if c has null elements
1633 */
1634 public void testTimedInvokeAny3() throws Exception {
1635 CountDownLatch latch = new CountDownLatch(1);
1636 final ExecutorService e =
1637 new CustomTPE(2, 2,
1638 LONG_DELAY_MS, MILLISECONDS,
1639 new ArrayBlockingQueue<Runnable>(10));
1640 try (PoolCleaner cleaner = cleaner(e)) {
1641 List<Callable<String>> l = new ArrayList<>();
1642 l.add(latchAwaitingStringTask(latch));
1643 l.add(null);
1644 try {
1645 e.invokeAny(l, randomTimeout(), MILLISECONDS);
1646 shouldThrow();
1647 } catch (NullPointerException success) {}
1648 latch.countDown();
1649 }
1650 }
1651
1652 /**
1653 * timed invokeAny(c) throws ExecutionException if no task completes
1654 */
1655 public void testTimedInvokeAny4() throws Exception {
1656 final ExecutorService e =
1657 new CustomTPE(2, 2,
1658 LONG_DELAY_MS, MILLISECONDS,
1659 new ArrayBlockingQueue<Runnable>(10));
1660 try (PoolCleaner cleaner = cleaner(e)) {
1661 long startTime = System.nanoTime();
1662 List<Callable<String>> l = new ArrayList<>();
1663 l.add(new NPETask());
1664 try {
1665 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1666 shouldThrow();
1667 } catch (ExecutionException success) {
1668 assertTrue(success.getCause() instanceof NullPointerException);
1669 }
1670 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1671 }
1672 }
1673
1674 /**
1675 * timed invokeAny(c) returns result of some task
1676 */
1677 public void testTimedInvokeAny5() throws Exception {
1678 final ExecutorService e =
1679 new CustomTPE(2, 2,
1680 LONG_DELAY_MS, MILLISECONDS,
1681 new ArrayBlockingQueue<Runnable>(10));
1682 try (PoolCleaner cleaner = cleaner(e)) {
1683 long startTime = System.nanoTime();
1684 List<Callable<String>> l = new ArrayList<>();
1685 l.add(new StringTask());
1686 l.add(new StringTask());
1687 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1688 assertSame(TEST_STRING, result);
1689 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1690 }
1691 }
1692
1693 /**
1694 * timed invokeAll(null) throws NPE
1695 */
1696 public void testTimedInvokeAll1() throws Exception {
1697 final ExecutorService e =
1698 new CustomTPE(2, 2,
1699 LONG_DELAY_MS, MILLISECONDS,
1700 new ArrayBlockingQueue<Runnable>(10));
1701 try (PoolCleaner cleaner = cleaner(e)) {
1702 try {
1703 e.invokeAll(null, randomTimeout(), randomTimeUnit());
1704 shouldThrow();
1705 } catch (NullPointerException success) {}
1706 }
1707 }
1708
1709 /**
1710 * timed invokeAll(,,null) throws NPE
1711 */
1712 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1713 final ExecutorService e =
1714 new CustomTPE(2, 2,
1715 LONG_DELAY_MS, MILLISECONDS,
1716 new ArrayBlockingQueue<Runnable>(10));
1717 try (PoolCleaner cleaner = cleaner(e)) {
1718 List<Callable<String>> l = new ArrayList<>();
1719 l.add(new StringTask());
1720 try {
1721 e.invokeAll(l, randomTimeout(), null);
1722 shouldThrow();
1723 } catch (NullPointerException success) {}
1724 }
1725 }
1726
1727 /**
1728 * timed invokeAll(empty collection) returns empty list
1729 */
1730 public void testTimedInvokeAll2() throws Exception {
1731 final ExecutorService e =
1732 new CustomTPE(2, 2,
1733 LONG_DELAY_MS, MILLISECONDS,
1734 new ArrayBlockingQueue<Runnable>(10));
1735 final Collection<Callable<String>> emptyCollection
1736 = Collections.emptyList();
1737 try (PoolCleaner cleaner = cleaner(e)) {
1738 List<Future<String>> r =
1739 e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit());
1740 assertTrue(r.isEmpty());
1741 }
1742 }
1743
1744 /**
1745 * timed invokeAll(c) throws NPE if c has null elements
1746 */
1747 public void testTimedInvokeAll3() throws Exception {
1748 final ExecutorService e =
1749 new CustomTPE(2, 2,
1750 LONG_DELAY_MS, MILLISECONDS,
1751 new ArrayBlockingQueue<Runnable>(10));
1752 try (PoolCleaner cleaner = cleaner(e)) {
1753 List<Callable<String>> l = new ArrayList<>();
1754 l.add(new StringTask());
1755 l.add(null);
1756 try {
1757 e.invokeAll(l, randomTimeout(), randomTimeUnit());
1758 shouldThrow();
1759 } catch (NullPointerException success) {}
1760 }
1761 }
1762
1763 /**
1764 * get of element of invokeAll(c) throws exception on failed task
1765 */
1766 public void testTimedInvokeAll4() throws Exception {
1767 final ExecutorService e =
1768 new CustomTPE(2, 2,
1769 LONG_DELAY_MS, MILLISECONDS,
1770 new ArrayBlockingQueue<Runnable>(10));
1771 try (PoolCleaner cleaner = cleaner(e)) {
1772 List<Callable<String>> l = new ArrayList<>();
1773 l.add(new NPETask());
1774 List<Future<String>> futures =
1775 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1776 assertEquals(1, futures.size());
1777 try {
1778 futures.get(0).get();
1779 shouldThrow();
1780 } catch (ExecutionException success) {
1781 assertTrue(success.getCause() instanceof NullPointerException);
1782 }
1783 }
1784 }
1785
1786 /**
1787 * timed invokeAll(c) returns results of all completed tasks
1788 */
1789 public void testTimedInvokeAll5() throws Exception {
1790 final ExecutorService e =
1791 new CustomTPE(2, 2,
1792 LONG_DELAY_MS, MILLISECONDS,
1793 new ArrayBlockingQueue<Runnable>(10));
1794 try (PoolCleaner cleaner = cleaner(e)) {
1795 List<Callable<String>> l = new ArrayList<>();
1796 l.add(new StringTask());
1797 l.add(new StringTask());
1798 List<Future<String>> futures =
1799 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1800 assertEquals(2, futures.size());
1801 for (Future<String> future : futures)
1802 assertSame(TEST_STRING, future.get());
1803 }
1804 }
1805
1806 /**
1807 * timed invokeAll(c) cancels tasks not completed by timeout
1808 */
1809 public void testTimedInvokeAll6() throws Exception {
1810 for (long timeout = timeoutMillis();;) {
1811 final CountDownLatch done = new CountDownLatch(1);
1812 final Callable<String> waiter = new CheckedCallable<String>() {
1813 public String realCall() {
1814 try { done.await(LONG_DELAY_MS, MILLISECONDS); }
1815 catch (InterruptedException ok) {}
1816 return "1"; }};
1817 final ExecutorService p =
1818 new CustomTPE(2, 2,
1819 LONG_DELAY_MS, MILLISECONDS,
1820 new ArrayBlockingQueue<Runnable>(10));
1821 try (PoolCleaner cleaner = cleaner(p, done)) {
1822 List<Callable<String>> tasks = new ArrayList<>();
1823 tasks.add(new StringTask("0"));
1824 tasks.add(waiter);
1825 tasks.add(new StringTask("2"));
1826 long startTime = System.nanoTime();
1827 List<Future<String>> futures =
1828 p.invokeAll(tasks, timeout, MILLISECONDS);
1829 assertEquals(tasks.size(), futures.size());
1830 assertTrue(millisElapsedSince(startTime) >= timeout);
1831 for (Future future : futures)
1832 assertTrue(future.isDone());
1833 assertTrue(futures.get(1).isCancelled());
1834 try {
1835 assertEquals("0", futures.get(0).get());
1836 assertEquals("2", futures.get(2).get());
1837 break;
1838 } catch (CancellationException retryWithLongerTimeout) {
1839 timeout *= 2;
1840 if (timeout >= LONG_DELAY_MS / 2)
1841 fail("expected exactly one task to be cancelled");
1842 }
1843 }
1844 }
1845 }
1846
1847 /**
1848 * Execution continues if there is at least one thread even if
1849 * thread factory fails to create more
1850 */
1851 public void testFailingThreadFactory() throws InterruptedException {
1852 final ExecutorService e =
1853 new CustomTPE(100, 100,
1854 LONG_DELAY_MS, MILLISECONDS,
1855 new LinkedBlockingQueue<Runnable>(),
1856 new FailingThreadFactory());
1857 try (PoolCleaner cleaner = cleaner(e)) {
1858 final int TASKS = 100;
1859 final CountDownLatch done = new CountDownLatch(TASKS);
1860 for (int k = 0; k < TASKS; ++k)
1861 e.execute(new CheckedRunnable() {
1862 public void realRun() {
1863 done.countDown();
1864 }});
1865 await(done);
1866 }
1867 }
1868
1869 /**
1870 * allowsCoreThreadTimeOut is by default false.
1871 */
1872 public void testAllowsCoreThreadTimeOut() {
1873 final ThreadPoolExecutor p =
1874 new CustomTPE(2, 2,
1875 1000, MILLISECONDS,
1876 new ArrayBlockingQueue<Runnable>(10));
1877 try (PoolCleaner cleaner = cleaner(p)) {
1878 assertFalse(p.allowsCoreThreadTimeOut());
1879 }
1880 }
1881
1882 /**
1883 * allowCoreThreadTimeOut(true) causes idle threads to time out
1884 */
1885 public void testAllowCoreThreadTimeOut_true() throws Exception {
1886 long keepAliveTime = timeoutMillis();
1887 final ThreadPoolExecutor p =
1888 new CustomTPE(2, 10,
1889 keepAliveTime, MILLISECONDS,
1890 new ArrayBlockingQueue<Runnable>(10));
1891 try (PoolCleaner cleaner = cleaner(p)) {
1892 final CountDownLatch threadStarted = new CountDownLatch(1);
1893 p.allowCoreThreadTimeOut(true);
1894 p.execute(new CheckedRunnable() {
1895 public void realRun() {
1896 threadStarted.countDown();
1897 assertEquals(1, p.getPoolSize());
1898 }});
1899 await(threadStarted);
1900 delay(keepAliveTime);
1901 long startTime = System.nanoTime();
1902 while (p.getPoolSize() > 0
1903 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1904 Thread.yield();
1905 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1906 assertEquals(0, p.getPoolSize());
1907 }
1908 }
1909
1910 /**
1911 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1912 */
1913 public void testAllowCoreThreadTimeOut_false() throws Exception {
1914 long keepAliveTime = timeoutMillis();
1915 final ThreadPoolExecutor p =
1916 new CustomTPE(2, 10,
1917 keepAliveTime, MILLISECONDS,
1918 new ArrayBlockingQueue<Runnable>(10));
1919 try (PoolCleaner cleaner = cleaner(p)) {
1920 final CountDownLatch threadStarted = new CountDownLatch(1);
1921 p.allowCoreThreadTimeOut(false);
1922 p.execute(new CheckedRunnable() {
1923 public void realRun() throws InterruptedException {
1924 threadStarted.countDown();
1925 assertTrue(p.getPoolSize() >= 1);
1926 }});
1927 delay(2 * keepAliveTime);
1928 assertTrue(p.getPoolSize() >= 1);
1929 }
1930 }
1931
1932 /**
1933 * get(cancelled task) throws CancellationException
1934 * (in part, a test of CustomTPE itself)
1935 */
1936 public void testGet_cancelled() throws Exception {
1937 final CountDownLatch done = new CountDownLatch(1);
1938 final ExecutorService e =
1939 new CustomTPE(1, 1,
1940 LONG_DELAY_MS, MILLISECONDS,
1941 new LinkedBlockingQueue<Runnable>());
1942 try (PoolCleaner cleaner = cleaner(e, done)) {
1943 final CountDownLatch blockerStarted = new CountDownLatch(1);
1944 final List<Future<?>> futures = new ArrayList<>();
1945 for (int i = 0; i < 2; i++) {
1946 Runnable r = new CheckedRunnable() { public void realRun()
1947 throws Throwable {
1948 blockerStarted.countDown();
1949 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1950 }};
1951 futures.add(e.submit(r));
1952 }
1953 await(blockerStarted);
1954 for (Future<?> future : futures) future.cancel(false);
1955 for (Future<?> future : futures) {
1956 try {
1957 future.get();
1958 shouldThrow();
1959 } catch (CancellationException success) {}
1960 try {
1961 future.get(LONG_DELAY_MS, MILLISECONDS);
1962 shouldThrow();
1963 } catch (CancellationException success) {}
1964 assertTrue(future.isCancelled());
1965 assertTrue(future.isDone());
1966 }
1967 }
1968 }
1969
1970 }