ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.107
Committed: Tue Mar 22 21:29:24 2022 UTC (2 years ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.106: +1 -1 lines
Log Message:
Updates for jdk17+

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.ArrayBlockingQueue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.FutureTask;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeoutException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.locks.Condition;
36 import java.util.concurrent.locks.ReentrantLock;
37
38 import junit.framework.Test;
39 import junit.framework.TestSuite;
40
41 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
42 public static void main(String[] args) {
43 main(suite(), args);
44 }
45 public static Test suite() {
46 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
47 }
48
49 static class CustomTask<V> implements RunnableFuture<V> {
50 final Callable<V> callable;
51 final ReentrantLock lock = new ReentrantLock();
52 final Condition cond = lock.newCondition();
53 boolean done;
54 boolean cancelled;
55 V result;
56 Thread thread;
57 Exception exception;
58 CustomTask(Callable<V> c) {
59 if (c == null) throw new NullPointerException();
60 callable = c;
61 }
62 CustomTask(final Runnable r, final V res) {
63 if (r == null) throw new NullPointerException();
64 callable = new Callable<V>() {
65 public V call() throws Exception { r.run(); return res; }};
66 }
67 public boolean isDone() {
68 lock.lock(); try { return done; } finally { lock.unlock() ; }
69 }
70 public boolean isCancelled() {
71 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
72 }
73 public boolean cancel(boolean mayInterrupt) {
74 lock.lock();
75 try {
76 if (!done) {
77 cancelled = true;
78 done = true;
79 if (mayInterrupt && thread != null)
80 thread.interrupt();
81 return true;
82 }
83 return false;
84 }
85 finally { lock.unlock() ; }
86 }
87 public void run() {
88 lock.lock();
89 try {
90 if (done)
91 return;
92 thread = Thread.currentThread();
93 }
94 finally { lock.unlock() ; }
95 V v = null;
96 Exception e = null;
97 try {
98 v = callable.call();
99 }
100 catch (Exception ex) {
101 e = ex;
102 }
103 lock.lock();
104 try {
105 if (!done) {
106 result = v;
107 exception = e;
108 done = true;
109 thread = null;
110 cond.signalAll();
111 }
112 }
113 finally { lock.unlock(); }
114 }
115 public V get() throws InterruptedException, ExecutionException {
116 lock.lock();
117 try {
118 while (!done)
119 cond.await();
120 if (cancelled)
121 throw new CancellationException();
122 if (exception != null)
123 throw new ExecutionException(exception);
124 return result;
125 }
126 finally { lock.unlock(); }
127 }
128 public V get(long timeout, TimeUnit unit)
129 throws InterruptedException, ExecutionException, TimeoutException {
130 long nanos = unit.toNanos(timeout);
131 lock.lock();
132 try {
133 while (!done) {
134 if (nanos <= 0L)
135 throw new TimeoutException();
136 nanos = cond.awaitNanos(nanos);
137 }
138 if (cancelled)
139 throw new CancellationException();
140 if (exception != null)
141 throw new ExecutionException(exception);
142 return result;
143 }
144 finally { lock.unlock(); }
145 }
146 }
147
148 static class CustomTPE extends ThreadPoolExecutor {
149 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
150 return new CustomTask<V>(c);
151 }
152 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
153 return new CustomTask<V>(r, v);
154 }
155
156 CustomTPE(int corePoolSize,
157 int maximumPoolSize,
158 long keepAliveTime,
159 TimeUnit unit,
160 BlockingQueue<Runnable> workQueue) {
161 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
162 workQueue);
163 }
164 CustomTPE(int corePoolSize,
165 int maximumPoolSize,
166 long keepAliveTime,
167 TimeUnit unit,
168 BlockingQueue<Runnable> workQueue,
169 ThreadFactory threadFactory) {
170 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
171 threadFactory);
172 }
173
174 CustomTPE(int corePoolSize,
175 int maximumPoolSize,
176 long keepAliveTime,
177 TimeUnit unit,
178 BlockingQueue<Runnable> workQueue,
179 RejectedExecutionHandler handler) {
180 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
181 handler);
182 }
183 CustomTPE(int corePoolSize,
184 int maximumPoolSize,
185 long keepAliveTime,
186 TimeUnit unit,
187 BlockingQueue<Runnable> workQueue,
188 ThreadFactory threadFactory,
189 RejectedExecutionHandler handler) {
190 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
191 workQueue, threadFactory, handler);
192 }
193
194 final CountDownLatch beforeCalled = new CountDownLatch(1);
195 final CountDownLatch afterCalled = new CountDownLatch(1);
196 final CountDownLatch terminatedCalled = new CountDownLatch(1);
197
198 public CustomTPE() {
199 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
200 }
201 protected void beforeExecute(Thread t, Runnable r) {
202 beforeCalled.countDown();
203 }
204 protected void afterExecute(Runnable r, Throwable t) {
205 afterCalled.countDown();
206 }
207 protected void terminated() {
208 terminatedCalled.countDown();
209 }
210
211 public boolean beforeCalled() {
212 return beforeCalled.getCount() == 0;
213 }
214 public boolean afterCalled() {
215 return afterCalled.getCount() == 0;
216 }
217 public boolean terminatedCalled() {
218 return terminatedCalled.getCount() == 0;
219 }
220 }
221
222 static class FailingThreadFactory implements ThreadFactory {
223 int calls = 0;
224 public Thread newThread(Runnable r) {
225 if (++calls > 1) return null;
226 return new Thread(r);
227 }
228 }
229
230 /**
231 * execute successfully executes a runnable
232 */
233 public void testExecute() throws InterruptedException {
234 final ThreadPoolExecutor p =
235 new CustomTPE(1, 1,
236 2 * LONG_DELAY_MS, MILLISECONDS,
237 new ArrayBlockingQueue<Runnable>(10));
238 try (PoolCleaner cleaner = cleaner(p)) {
239 final CountDownLatch done = new CountDownLatch(1);
240 final Runnable task = new CheckedRunnable() {
241 public void realRun() { done.countDown(); }};
242 p.execute(task);
243 await(done);
244 }
245 }
246
247 /**
248 * getActiveCount increases but doesn't overestimate, when a
249 * thread becomes active
250 */
251 public void testGetActiveCount() throws InterruptedException {
252 final CountDownLatch done = new CountDownLatch(1);
253 final ThreadPoolExecutor p =
254 new CustomTPE(2, 2,
255 LONG_DELAY_MS, MILLISECONDS,
256 new ArrayBlockingQueue<Runnable>(10));
257 try (PoolCleaner cleaner = cleaner(p, done)) {
258 final CountDownLatch threadStarted = new CountDownLatch(1);
259 assertEquals(0, p.getActiveCount());
260 p.execute(new CheckedRunnable() {
261 public void realRun() throws InterruptedException {
262 threadStarted.countDown();
263 assertEquals(1, p.getActiveCount());
264 await(done);
265 }});
266 await(threadStarted);
267 assertEquals(1, p.getActiveCount());
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 final ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 final ThreadPoolExecutor p =
302 new CustomTPE(2, 6,
303 LONG_DELAY_MS, MILLISECONDS,
304 new ArrayBlockingQueue<Runnable>(10));
305 try (PoolCleaner cleaner = cleaner(p)) {
306 assertEquals(0, p.getPoolSize());
307 p.prestartAllCoreThreads();
308 assertEquals(2, p.getPoolSize());
309 p.prestartAllCoreThreads();
310 assertEquals(2, p.getPoolSize());
311 p.setCorePoolSize(4);
312 p.prestartAllCoreThreads();
313 assertEquals(4, p.getPoolSize());
314 p.prestartAllCoreThreads();
315 assertEquals(4, p.getPoolSize());
316 }
317 }
318
319 /**
320 * getCompletedTaskCount increases, but doesn't overestimate,
321 * when tasks complete
322 */
323 public void testGetCompletedTaskCount() throws InterruptedException {
324 final ThreadPoolExecutor p =
325 new CustomTPE(2, 2,
326 LONG_DELAY_MS, MILLISECONDS,
327 new ArrayBlockingQueue<Runnable>(10));
328 try (PoolCleaner cleaner = cleaner(p)) {
329 final CountDownLatch threadStarted = new CountDownLatch(1);
330 final CountDownLatch threadProceed = new CountDownLatch(1);
331 final CountDownLatch threadDone = new CountDownLatch(1);
332 assertEquals(0, p.getCompletedTaskCount());
333 p.execute(new CheckedRunnable() {
334 public void realRun() throws InterruptedException {
335 threadStarted.countDown();
336 assertEquals(0, p.getCompletedTaskCount());
337 await(threadProceed);
338 threadDone.countDown();
339 }});
340 await(threadStarted);
341 assertEquals(0, p.getCompletedTaskCount());
342 threadProceed.countDown();
343 await(threadDone);
344 long startTime = System.nanoTime();
345 while (p.getCompletedTaskCount() != 1) {
346 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
347 fail("timed out");
348 Thread.yield();
349 }
350 }
351 }
352
353 /**
354 * getCorePoolSize returns size given in constructor if not otherwise set
355 */
356 public void testGetCorePoolSize() {
357 final ThreadPoolExecutor p =
358 new CustomTPE(1, 1,
359 LONG_DELAY_MS, MILLISECONDS,
360 new ArrayBlockingQueue<Runnable>(10));
361 try (PoolCleaner cleaner = cleaner(p)) {
362 assertEquals(1, p.getCorePoolSize());
363 }
364 }
365
366 /**
367 * getKeepAliveTime returns value given in constructor if not otherwise set
368 */
369 public void testGetKeepAliveTime() {
370 final ThreadPoolExecutor p =
371 new CustomTPE(2, 2,
372 1000, MILLISECONDS,
373 new ArrayBlockingQueue<Runnable>(10));
374 try (PoolCleaner cleaner = cleaner(p)) {
375 assertEquals(1, p.getKeepAliveTime(SECONDS));
376 }
377 }
378
379 /**
380 * getThreadFactory returns factory in constructor if not set
381 */
382 public void testGetThreadFactory() {
383 final ThreadFactory threadFactory = new SimpleThreadFactory();
384 final ThreadPoolExecutor p =
385 new CustomTPE(1, 2,
386 LONG_DELAY_MS, MILLISECONDS,
387 new ArrayBlockingQueue<Runnable>(10),
388 threadFactory,
389 new NoOpREHandler());
390 try (PoolCleaner cleaner = cleaner(p)) {
391 assertSame(threadFactory, p.getThreadFactory());
392 }
393 }
394
395 /**
396 * setThreadFactory sets the thread factory returned by getThreadFactory
397 */
398 public void testSetThreadFactory() {
399 final ThreadPoolExecutor p =
400 new CustomTPE(1, 2,
401 LONG_DELAY_MS, MILLISECONDS,
402 new ArrayBlockingQueue<Runnable>(10));
403 try (PoolCleaner cleaner = cleaner(p)) {
404 ThreadFactory threadFactory = new SimpleThreadFactory();
405 p.setThreadFactory(threadFactory);
406 assertSame(threadFactory, p.getThreadFactory());
407 }
408 }
409
410 /**
411 * setThreadFactory(null) throws NPE
412 */
413 public void testSetThreadFactoryNull() {
414 final ThreadPoolExecutor p =
415 new CustomTPE(1, 2,
416 LONG_DELAY_MS, MILLISECONDS,
417 new ArrayBlockingQueue<Runnable>(10));
418 try (PoolCleaner cleaner = cleaner(p)) {
419 try {
420 p.setThreadFactory(null);
421 shouldThrow();
422 } catch (NullPointerException success) {}
423 }
424 }
425
426 /**
427 * getRejectedExecutionHandler returns handler in constructor if not set
428 */
429 public void testGetRejectedExecutionHandler() {
430 final RejectedExecutionHandler handler = new NoOpREHandler();
431 final ThreadPoolExecutor p =
432 new CustomTPE(1, 2,
433 LONG_DELAY_MS, MILLISECONDS,
434 new ArrayBlockingQueue<Runnable>(10),
435 handler);
436 try (PoolCleaner cleaner = cleaner(p)) {
437 assertSame(handler, p.getRejectedExecutionHandler());
438 }
439 }
440
441 /**
442 * setRejectedExecutionHandler sets the handler returned by
443 * getRejectedExecutionHandler
444 */
445 public void testSetRejectedExecutionHandler() {
446 final ThreadPoolExecutor p =
447 new CustomTPE(1, 2,
448 LONG_DELAY_MS, MILLISECONDS,
449 new ArrayBlockingQueue<Runnable>(10));
450 try (PoolCleaner cleaner = cleaner(p)) {
451 RejectedExecutionHandler handler = new NoOpREHandler();
452 p.setRejectedExecutionHandler(handler);
453 assertSame(handler, p.getRejectedExecutionHandler());
454 }
455 }
456
457 /**
458 * setRejectedExecutionHandler(null) throws NPE
459 */
460 public void testSetRejectedExecutionHandlerNull() {
461 final ThreadPoolExecutor p =
462 new CustomTPE(1, 2,
463 LONG_DELAY_MS, MILLISECONDS,
464 new ArrayBlockingQueue<Runnable>(10));
465 try (PoolCleaner cleaner = cleaner(p)) {
466 try {
467 p.setRejectedExecutionHandler(null);
468 shouldThrow();
469 } catch (NullPointerException success) {}
470 }
471 }
472
473 /**
474 * getLargestPoolSize increases, but doesn't overestimate, when
475 * multiple threads active
476 */
477 public void testGetLargestPoolSize() throws InterruptedException {
478 final int THREADS = 3;
479 final CountDownLatch done = new CountDownLatch(1);
480 final ThreadPoolExecutor p =
481 new CustomTPE(THREADS, THREADS,
482 LONG_DELAY_MS, MILLISECONDS,
483 new ArrayBlockingQueue<Runnable>(10));
484 try (PoolCleaner cleaner = cleaner(p, done)) {
485 assertEquals(0, p.getLargestPoolSize());
486 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
487 for (int i = 0; i < THREADS; i++)
488 p.execute(new CheckedRunnable() {
489 public void realRun() throws InterruptedException {
490 threadsStarted.countDown();
491 await(done);
492 assertEquals(THREADS, p.getLargestPoolSize());
493 }});
494 await(threadsStarted);
495 assertEquals(THREADS, p.getLargestPoolSize());
496 }
497 assertEquals(THREADS, p.getLargestPoolSize());
498 }
499
500 /**
501 * getMaximumPoolSize returns value given in constructor if not
502 * otherwise set
503 */
504 public void testGetMaximumPoolSize() {
505 final ThreadPoolExecutor p =
506 new CustomTPE(2, 3,
507 LONG_DELAY_MS, MILLISECONDS,
508 new ArrayBlockingQueue<Runnable>(10));
509 try (PoolCleaner cleaner = cleaner(p)) {
510 assertEquals(3, p.getMaximumPoolSize());
511 p.setMaximumPoolSize(5);
512 assertEquals(5, p.getMaximumPoolSize());
513 p.setMaximumPoolSize(4);
514 assertEquals(4, p.getMaximumPoolSize());
515 }
516 }
517
518 /**
519 * getPoolSize increases, but doesn't overestimate, when threads
520 * become active
521 */
522 public void testGetPoolSize() throws InterruptedException {
523 final CountDownLatch done = new CountDownLatch(1);
524 final ThreadPoolExecutor p =
525 new CustomTPE(1, 1,
526 LONG_DELAY_MS, MILLISECONDS,
527 new ArrayBlockingQueue<Runnable>(10));
528 try (PoolCleaner cleaner = cleaner(p, done)) {
529 assertEquals(0, p.getPoolSize());
530 final CountDownLatch threadStarted = new CountDownLatch(1);
531 p.execute(new CheckedRunnable() {
532 public void realRun() throws InterruptedException {
533 threadStarted.countDown();
534 assertEquals(1, p.getPoolSize());
535 await(done);
536 }});
537 await(threadStarted);
538 assertEquals(1, p.getPoolSize());
539 }
540 }
541
542 /**
543 * getTaskCount increases, but doesn't overestimate, when tasks submitted
544 */
545 public void testGetTaskCount() throws InterruptedException {
546 final int TASKS = 3;
547 final CountDownLatch done = new CountDownLatch(1);
548 final ThreadPoolExecutor p =
549 new CustomTPE(1, 1,
550 LONG_DELAY_MS, MILLISECONDS,
551 new ArrayBlockingQueue<Runnable>(10));
552 try (PoolCleaner cleaner = cleaner(p, done)) {
553 final CountDownLatch threadStarted = new CountDownLatch(1);
554 assertEquals(0, p.getTaskCount());
555 assertEquals(0, p.getCompletedTaskCount());
556 p.execute(new CheckedRunnable() {
557 public void realRun() throws InterruptedException {
558 threadStarted.countDown();
559 await(done);
560 }});
561 await(threadStarted);
562 assertEquals(1, p.getTaskCount());
563 assertEquals(0, p.getCompletedTaskCount());
564 for (int i = 0; i < TASKS; i++) {
565 assertEquals(1 + i, p.getTaskCount());
566 p.execute(new CheckedRunnable() {
567 public void realRun() throws InterruptedException {
568 threadStarted.countDown();
569 assertEquals(1 + TASKS, p.getTaskCount());
570 await(done);
571 }});
572 }
573 assertEquals(1 + TASKS, p.getTaskCount());
574 assertEquals(0, p.getCompletedTaskCount());
575 }
576 assertEquals(1 + TASKS, p.getTaskCount());
577 assertEquals(1 + TASKS, p.getCompletedTaskCount());
578 }
579
580 /**
581 * isShutdown is false before shutdown, true after
582 */
583 public void testIsShutdown() {
584 final ThreadPoolExecutor p =
585 new CustomTPE(1, 1,
586 LONG_DELAY_MS, MILLISECONDS,
587 new ArrayBlockingQueue<Runnable>(10));
588 try (PoolCleaner cleaner = cleaner(p)) {
589 assertFalse(p.isShutdown());
590 try { p.shutdown(); } catch (SecurityException ok) { return; }
591 assertTrue(p.isShutdown());
592 }
593 }
594
595 /**
596 * isTerminated is false before termination, true after
597 */
598 public void testIsTerminated() throws InterruptedException {
599 final ThreadPoolExecutor p =
600 new CustomTPE(1, 1,
601 LONG_DELAY_MS, MILLISECONDS,
602 new ArrayBlockingQueue<Runnable>(10));
603 try (PoolCleaner cleaner = cleaner(p)) {
604 final CountDownLatch threadStarted = new CountDownLatch(1);
605 final CountDownLatch done = new CountDownLatch(1);
606 assertFalse(p.isTerminating());
607 p.execute(new CheckedRunnable() {
608 public void realRun() throws InterruptedException {
609 assertFalse(p.isTerminating());
610 threadStarted.countDown();
611 await(done);
612 }});
613 await(threadStarted);
614 assertFalse(p.isTerminating());
615 done.countDown();
616 try { p.shutdown(); } catch (SecurityException ok) { return; }
617 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
618 assertTrue(p.isTerminated());
619 assertFalse(p.isTerminating());
620 }
621 }
622
623 /**
624 * isTerminating is not true when running or when terminated
625 */
626 public void testIsTerminating() throws InterruptedException {
627 final ThreadPoolExecutor p =
628 new CustomTPE(1, 1,
629 LONG_DELAY_MS, MILLISECONDS,
630 new ArrayBlockingQueue<Runnable>(10));
631 try (PoolCleaner cleaner = cleaner(p)) {
632 final CountDownLatch threadStarted = new CountDownLatch(1);
633 final CountDownLatch done = new CountDownLatch(1);
634 assertFalse(p.isTerminating());
635 p.execute(new CheckedRunnable() {
636 public void realRun() throws InterruptedException {
637 assertFalse(p.isTerminating());
638 threadStarted.countDown();
639 await(done);
640 }});
641 await(threadStarted);
642 assertFalse(p.isTerminating());
643 done.countDown();
644 try { p.shutdown(); } catch (SecurityException ok) { return; }
645 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
646 assertTrue(p.isTerminated());
647 assertFalse(p.isTerminating());
648 }
649 }
650
651 /**
652 * getQueue returns the work queue, which contains queued tasks
653 */
654 public void testGetQueue() throws InterruptedException {
655 final CountDownLatch done = new CountDownLatch(1);
656 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(10);
657 final ThreadPoolExecutor p =
658 new CustomTPE(1, 1,
659 LONG_DELAY_MS, MILLISECONDS,
660 q);
661 try (PoolCleaner cleaner = cleaner(p, done)) {
662 final CountDownLatch threadStarted = new CountDownLatch(1);
663 FutureTask[] rtasks = new FutureTask[5];
664 @SuppressWarnings("unchecked")
665 FutureTask<Boolean>[] tasks = (FutureTask<Boolean>[])rtasks;
666 for (int i = 0; i < tasks.length; i++) {
667 Callable<Boolean> task = new CheckedCallable<>() {
668 public Boolean realCall() throws InterruptedException {
669 threadStarted.countDown();
670 assertSame(q, p.getQueue());
671 await(done);
672 return Boolean.TRUE;
673 }};
674 tasks[i] = new FutureTask<>(task);
675 p.execute(tasks[i]);
676 }
677 await(threadStarted);
678 assertSame(q, p.getQueue());
679 assertFalse(q.contains(tasks[0]));
680 assertTrue(q.contains(tasks[tasks.length - 1]));
681 assertEquals(tasks.length - 1, q.size());
682 }
683 }
684
685 /**
686 * remove(task) removes queued task, and fails to remove active task
687 */
688 public void testRemove() throws InterruptedException {
689 final CountDownLatch done = new CountDownLatch(1);
690 BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(10);
691 final ThreadPoolExecutor p =
692 new CustomTPE(1, 1,
693 LONG_DELAY_MS, MILLISECONDS,
694 q);
695 try (PoolCleaner cleaner = cleaner(p, done)) {
696 Runnable[] tasks = new Runnable[6];
697 final CountDownLatch threadStarted = new CountDownLatch(1);
698 for (int i = 0; i < tasks.length; i++) {
699 tasks[i] = new CheckedRunnable() {
700 public void realRun() throws InterruptedException {
701 threadStarted.countDown();
702 await(done);
703 }};
704 p.execute(tasks[i]);
705 }
706 await(threadStarted);
707 assertFalse(p.remove(tasks[0]));
708 assertTrue(q.contains(tasks[4]));
709 assertTrue(q.contains(tasks[3]));
710 assertTrue(p.remove(tasks[4]));
711 assertFalse(p.remove(tasks[4]));
712 assertFalse(q.contains(tasks[4]));
713 assertTrue(q.contains(tasks[3]));
714 assertTrue(p.remove(tasks[3]));
715 assertFalse(q.contains(tasks[3]));
716 }
717 }
718
719 /**
720 * purge removes cancelled tasks from the queue
721 */
722 public void testPurge() throws InterruptedException {
723 final CountDownLatch threadStarted = new CountDownLatch(1);
724 final CountDownLatch done = new CountDownLatch(1);
725 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(10);
726 final ThreadPoolExecutor p =
727 new CustomTPE(1, 1,
728 LONG_DELAY_MS, MILLISECONDS,
729 q);
730 try (PoolCleaner cleaner = cleaner(p, done)) {
731 FutureTask[] rtasks = new FutureTask[5];
732 @SuppressWarnings("unchecked")
733 FutureTask<Boolean>[] tasks = (FutureTask<Boolean>[])rtasks;
734 for (int i = 0; i < tasks.length; i++) {
735 Callable<Boolean> task = new CheckedCallable<>() {
736 public Boolean realCall() throws InterruptedException {
737 threadStarted.countDown();
738 await(done);
739 return Boolean.TRUE;
740 }};
741 tasks[i] = new FutureTask<>(task);
742 p.execute(tasks[i]);
743 }
744 await(threadStarted);
745 assertEquals(tasks.length, p.getTaskCount());
746 assertEquals(tasks.length - 1, q.size());
747 assertEquals(1L, p.getActiveCount());
748 assertEquals(0L, p.getCompletedTaskCount());
749 tasks[4].cancel(true);
750 tasks[3].cancel(false);
751 p.purge();
752 assertEquals(tasks.length - 3, q.size());
753 assertEquals(tasks.length - 2, p.getTaskCount());
754 p.purge(); // Nothing to do
755 assertEquals(tasks.length - 3, q.size());
756 assertEquals(tasks.length - 2, p.getTaskCount());
757 }
758 }
759
760 /**
761 * shutdownNow returns a list containing tasks that were not run,
762 * and those tasks are drained from the queue
763 */
764 public void testShutdownNow() throws InterruptedException {
765 final int poolSize = 2;
766 final int count = 5;
767 final AtomicInteger ran = new AtomicInteger(0);
768 final ThreadPoolExecutor p =
769 new CustomTPE(poolSize, poolSize,
770 LONG_DELAY_MS, MILLISECONDS,
771 new ArrayBlockingQueue<Runnable>(10));
772 final CountDownLatch threadsStarted = new CountDownLatch(poolSize);
773 Runnable waiter = new CheckedRunnable() { public void realRun() {
774 threadsStarted.countDown();
775 try {
776 MILLISECONDS.sleep(LONGER_DELAY_MS);
777 } catch (InterruptedException success) {}
778 ran.getAndIncrement();
779 }};
780 for (int i = 0; i < count; i++)
781 p.execute(waiter);
782 await(threadsStarted);
783 assertEquals(poolSize, p.getActiveCount());
784 assertEquals(0, p.getCompletedTaskCount());
785 final List<Runnable> queuedTasks;
786 try {
787 queuedTasks = p.shutdownNow();
788 } catch (SecurityException ok) {
789 return; // Allowed in case test doesn't have privs
790 }
791 assertTrue(p.isShutdown());
792 assertTrue(p.getQueue().isEmpty());
793 assertEquals(count - poolSize, queuedTasks.size());
794 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
795 assertTrue(p.isTerminated());
796 assertEquals(poolSize, ran.get());
797 assertEquals(poolSize, p.getCompletedTaskCount());
798 }
799
800 // Exception Tests
801
802 /**
803 * Constructor throws if corePoolSize argument is less than zero
804 */
805 public void testConstructor1() {
806 try {
807 new CustomTPE(-1, 1, 1L, SECONDS,
808 new ArrayBlockingQueue<Runnable>(10));
809 shouldThrow();
810 } catch (IllegalArgumentException success) {}
811 }
812
813 /**
814 * Constructor throws if maximumPoolSize is less than zero
815 */
816 public void testConstructor2() {
817 try {
818 new CustomTPE(1, -1, 1L, SECONDS,
819 new ArrayBlockingQueue<Runnable>(10));
820 shouldThrow();
821 } catch (IllegalArgumentException success) {}
822 }
823
824 /**
825 * Constructor throws if maximumPoolSize is equal to zero
826 */
827 public void testConstructor3() {
828 try {
829 new CustomTPE(1, 0, 1L, SECONDS,
830 new ArrayBlockingQueue<Runnable>(10));
831 shouldThrow();
832 } catch (IllegalArgumentException success) {}
833 }
834
835 /**
836 * Constructor throws if keepAliveTime is less than zero
837 */
838 public void testConstructor4() {
839 try {
840 new CustomTPE(1, 2, -1L, SECONDS,
841 new ArrayBlockingQueue<Runnable>(10));
842 shouldThrow();
843 } catch (IllegalArgumentException success) {}
844 }
845
846 /**
847 * Constructor throws if corePoolSize is greater than the maximumPoolSize
848 */
849 public void testConstructor5() {
850 try {
851 new CustomTPE(2, 1, 1L, SECONDS,
852 new ArrayBlockingQueue<Runnable>(10));
853 shouldThrow();
854 } catch (IllegalArgumentException success) {}
855 }
856
857 /**
858 * Constructor throws if workQueue is set to null
859 */
860 public void testConstructorNullPointerException() {
861 try {
862 new CustomTPE(1, 2, 1L, SECONDS, null);
863 shouldThrow();
864 } catch (NullPointerException success) {}
865 }
866
867 /**
868 * Constructor throws if corePoolSize argument is less than zero
869 */
870 public void testConstructor6() {
871 try {
872 new CustomTPE(-1, 1, 1L, SECONDS,
873 new ArrayBlockingQueue<Runnable>(10),
874 new SimpleThreadFactory());
875 shouldThrow();
876 } catch (IllegalArgumentException success) {}
877 }
878
879 /**
880 * Constructor throws if maximumPoolSize is less than zero
881 */
882 public void testConstructor7() {
883 try {
884 new CustomTPE(1,-1, 1L, SECONDS,
885 new ArrayBlockingQueue<Runnable>(10),
886 new SimpleThreadFactory());
887 shouldThrow();
888 } catch (IllegalArgumentException success) {}
889 }
890
891 /**
892 * Constructor throws if maximumPoolSize is equal to zero
893 */
894 public void testConstructor8() {
895 try {
896 new CustomTPE(1, 0, 1L, SECONDS,
897 new ArrayBlockingQueue<Runnable>(10),
898 new SimpleThreadFactory());
899 shouldThrow();
900 } catch (IllegalArgumentException success) {}
901 }
902
903 /**
904 * Constructor throws if keepAliveTime is less than zero
905 */
906 public void testConstructor9() {
907 try {
908 new CustomTPE(1, 2, -1L, SECONDS,
909 new ArrayBlockingQueue<Runnable>(10),
910 new SimpleThreadFactory());
911 shouldThrow();
912 } catch (IllegalArgumentException success) {}
913 }
914
915 /**
916 * Constructor throws if corePoolSize is greater than the maximumPoolSize
917 */
918 public void testConstructor10() {
919 try {
920 new CustomTPE(2, 1, 1L, SECONDS,
921 new ArrayBlockingQueue<Runnable>(10),
922 new SimpleThreadFactory());
923 shouldThrow();
924 } catch (IllegalArgumentException success) {}
925 }
926
927 /**
928 * Constructor throws if workQueue is set to null
929 */
930 public void testConstructorNullPointerException2() {
931 try {
932 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
933 shouldThrow();
934 } catch (NullPointerException success) {}
935 }
936
937 /**
938 * Constructor throws if threadFactory is set to null
939 */
940 public void testConstructorNullPointerException3() {
941 try {
942 new CustomTPE(1, 2, 1L, SECONDS,
943 new ArrayBlockingQueue<Runnable>(10),
944 (ThreadFactory) null);
945 shouldThrow();
946 } catch (NullPointerException success) {}
947 }
948
949 /**
950 * Constructor throws if corePoolSize argument is less than zero
951 */
952 public void testConstructor11() {
953 try {
954 new CustomTPE(-1, 1, 1L, SECONDS,
955 new ArrayBlockingQueue<Runnable>(10),
956 new NoOpREHandler());
957 shouldThrow();
958 } catch (IllegalArgumentException success) {}
959 }
960
961 /**
962 * Constructor throws if maximumPoolSize is less than zero
963 */
964 public void testConstructor12() {
965 try {
966 new CustomTPE(1, -1, 1L, SECONDS,
967 new ArrayBlockingQueue<Runnable>(10),
968 new NoOpREHandler());
969 shouldThrow();
970 } catch (IllegalArgumentException success) {}
971 }
972
973 /**
974 * Constructor throws if maximumPoolSize is equal to zero
975 */
976 public void testConstructor13() {
977 try {
978 new CustomTPE(1, 0, 1L, SECONDS,
979 new ArrayBlockingQueue<Runnable>(10),
980 new NoOpREHandler());
981 shouldThrow();
982 } catch (IllegalArgumentException success) {}
983 }
984
985 /**
986 * Constructor throws if keepAliveTime is less than zero
987 */
988 public void testConstructor14() {
989 try {
990 new CustomTPE(1, 2, -1L, SECONDS,
991 new ArrayBlockingQueue<Runnable>(10),
992 new NoOpREHandler());
993 shouldThrow();
994 } catch (IllegalArgumentException success) {}
995 }
996
997 /**
998 * Constructor throws if corePoolSize is greater than the maximumPoolSize
999 */
1000 public void testConstructor15() {
1001 try {
1002 new CustomTPE(2, 1, 1L, SECONDS,
1003 new ArrayBlockingQueue<Runnable>(10),
1004 new NoOpREHandler());
1005 shouldThrow();
1006 } catch (IllegalArgumentException success) {}
1007 }
1008
1009 /**
1010 * Constructor throws if workQueue is set to null
1011 */
1012 public void testConstructorNullPointerException4() {
1013 try {
1014 new CustomTPE(1, 2, 1L, SECONDS,
1015 null,
1016 new NoOpREHandler());
1017 shouldThrow();
1018 } catch (NullPointerException success) {}
1019 }
1020
1021 /**
1022 * Constructor throws if handler is set to null
1023 */
1024 public void testConstructorNullPointerException5() {
1025 try {
1026 new CustomTPE(1, 2, 1L, SECONDS,
1027 new ArrayBlockingQueue<Runnable>(10),
1028 (RejectedExecutionHandler) null);
1029 shouldThrow();
1030 } catch (NullPointerException success) {}
1031 }
1032
1033 /**
1034 * Constructor throws if corePoolSize argument is less than zero
1035 */
1036 public void testConstructor16() {
1037 try {
1038 new CustomTPE(-1, 1, 1L, SECONDS,
1039 new ArrayBlockingQueue<Runnable>(10),
1040 new SimpleThreadFactory(),
1041 new NoOpREHandler());
1042 shouldThrow();
1043 } catch (IllegalArgumentException success) {}
1044 }
1045
1046 /**
1047 * Constructor throws if maximumPoolSize is less than zero
1048 */
1049 public void testConstructor17() {
1050 try {
1051 new CustomTPE(1, -1, 1L, SECONDS,
1052 new ArrayBlockingQueue<Runnable>(10),
1053 new SimpleThreadFactory(),
1054 new NoOpREHandler());
1055 shouldThrow();
1056 } catch (IllegalArgumentException success) {}
1057 }
1058
1059 /**
1060 * Constructor throws if maximumPoolSize is equal to zero
1061 */
1062 public void testConstructor18() {
1063 try {
1064 new CustomTPE(1, 0, 1L, SECONDS,
1065 new ArrayBlockingQueue<Runnable>(10),
1066 new SimpleThreadFactory(),
1067 new NoOpREHandler());
1068 shouldThrow();
1069 } catch (IllegalArgumentException success) {}
1070 }
1071
1072 /**
1073 * Constructor throws if keepAliveTime is less than zero
1074 */
1075 public void testConstructor19() {
1076 try {
1077 new CustomTPE(1, 2, -1L, SECONDS,
1078 new ArrayBlockingQueue<Runnable>(10),
1079 new SimpleThreadFactory(),
1080 new NoOpREHandler());
1081 shouldThrow();
1082 } catch (IllegalArgumentException success) {}
1083 }
1084
1085 /**
1086 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1087 */
1088 public void testConstructor20() {
1089 try {
1090 new CustomTPE(2, 1, 1L, SECONDS,
1091 new ArrayBlockingQueue<Runnable>(10),
1092 new SimpleThreadFactory(),
1093 new NoOpREHandler());
1094 shouldThrow();
1095 } catch (IllegalArgumentException success) {}
1096 }
1097
1098 /**
1099 * Constructor throws if workQueue is null
1100 */
1101 public void testConstructorNullPointerException6() {
1102 try {
1103 new CustomTPE(1, 2, 1L, SECONDS,
1104 null,
1105 new SimpleThreadFactory(),
1106 new NoOpREHandler());
1107 shouldThrow();
1108 } catch (NullPointerException success) {}
1109 }
1110
1111 /**
1112 * Constructor throws if handler is null
1113 */
1114 public void testConstructorNullPointerException7() {
1115 try {
1116 new CustomTPE(1, 2, 1L, SECONDS,
1117 new ArrayBlockingQueue<Runnable>(10),
1118 new SimpleThreadFactory(),
1119 (RejectedExecutionHandler) null);
1120 shouldThrow();
1121 } catch (NullPointerException success) {}
1122 }
1123
1124 /**
1125 * Constructor throws if ThreadFactory is null
1126 */
1127 public void testConstructorNullPointerException8() {
1128 try {
1129 new CustomTPE(1, 2, 1L, SECONDS,
1130 new ArrayBlockingQueue<Runnable>(10),
1131 (ThreadFactory) null,
1132 new NoOpREHandler());
1133 shouldThrow();
1134 } catch (NullPointerException success) {}
1135 }
1136
1137 /**
1138 * Submitted tasks are rejected when saturated or shutdown
1139 */
1140 public void testSubmittedTasksRejectedWhenSaturatedOrShutdown() throws InterruptedException {
1141 final ThreadPoolExecutor p =
1142 new CustomTPE(1, 1,
1143 LONG_DELAY_MS, MILLISECONDS,
1144 new ArrayBlockingQueue<Runnable>(1));
1145 final int saturatedSize = saturatedSize(p);
1146 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
1147 final CountDownLatch threadsStarted = new CountDownLatch(p.getMaximumPoolSize());
1148 final CountDownLatch done = new CountDownLatch(1);
1149 final Runnable r = () -> {
1150 threadsStarted.countDown();
1151 for (;;) {
1152 try {
1153 done.await();
1154 return;
1155 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
1156 }};
1157 final Callable<Boolean> c = () -> {
1158 threadsStarted.countDown();
1159 for (;;) {
1160 try {
1161 done.await();
1162 return Boolean.TRUE;
1163 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
1164 }};
1165 final boolean shutdownNow = rnd.nextBoolean();
1166
1167 try (PoolCleaner cleaner = cleaner(p, done)) {
1168 // saturate
1169 for (int i = saturatedSize; i--> 0; ) {
1170 switch (rnd.nextInt(4)) {
1171 case 0: p.execute(r); break;
1172 case 1: assertFalse(p.submit(r).isDone()); break;
1173 case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break;
1174 case 3: assertFalse(p.submit(c).isDone()); break;
1175 }
1176 }
1177
1178 await(threadsStarted);
1179 assertTaskSubmissionsAreRejected(p);
1180
1181 if (shutdownNow)
1182 p.shutdownNow();
1183 else
1184 p.shutdown();
1185 // Pool is shutdown, but not yet terminated
1186 assertTaskSubmissionsAreRejected(p);
1187 assertFalse(p.isTerminated());
1188
1189 done.countDown(); // release blocking tasks
1190 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
1191
1192 assertTaskSubmissionsAreRejected(p);
1193 }
1194 assertEquals(saturatedSize(p)
1195 - (shutdownNow ? p.getQueue().remainingCapacity() : 0),
1196 p.getCompletedTaskCount());
1197 }
1198
1199 /**
1200 * executor using DiscardOldestPolicy drops oldest task if saturated.
1201 */
1202 public void testSaturatedExecute_DiscardOldestPolicy() {
1203 final CountDownLatch done = new CountDownLatch(1);
1204 LatchAwaiter r1 = awaiter(done);
1205 LatchAwaiter r2 = awaiter(done);
1206 LatchAwaiter r3 = awaiter(done);
1207 final ThreadPoolExecutor p =
1208 new CustomTPE(1, 1,
1209 LONG_DELAY_MS, MILLISECONDS,
1210 new ArrayBlockingQueue<Runnable>(1),
1211 new ThreadPoolExecutor.DiscardOldestPolicy());
1212 try (PoolCleaner cleaner = cleaner(p, done)) {
1213 assertEquals(LatchAwaiter.NEW, r1.state);
1214 assertEquals(LatchAwaiter.NEW, r2.state);
1215 assertEquals(LatchAwaiter.NEW, r3.state);
1216 p.execute(r1);
1217 p.execute(r2);
1218 assertTrue(p.getQueue().contains(r2));
1219 p.execute(r3);
1220 assertFalse(p.getQueue().contains(r2));
1221 assertTrue(p.getQueue().contains(r3));
1222 }
1223 assertEquals(LatchAwaiter.DONE, r1.state);
1224 assertEquals(LatchAwaiter.NEW, r2.state);
1225 assertEquals(LatchAwaiter.DONE, r3.state);
1226 }
1227
1228 /**
1229 * execute using DiscardOldestPolicy drops task on shutdown
1230 */
1231 public void testDiscardOldestOnShutdown() {
1232 final ThreadPoolExecutor p =
1233 new CustomTPE(1, 1,
1234 LONG_DELAY_MS, MILLISECONDS,
1235 new ArrayBlockingQueue<Runnable>(1),
1236 new ThreadPoolExecutor.DiscardOldestPolicy());
1237
1238 try { p.shutdown(); } catch (SecurityException ok) { return; }
1239 try (PoolCleaner cleaner = cleaner(p)) {
1240 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1241 p.execute(r);
1242 assertFalse(r.done);
1243 }
1244 }
1245
1246 /**
1247 * Submitting null tasks throws NullPointerException
1248 */
1249 public void testNullTaskSubmission() {
1250 final ThreadPoolExecutor p =
1251 new CustomTPE(1, 2,
1252 1L, SECONDS,
1253 new ArrayBlockingQueue<Runnable>(10));
1254 try (PoolCleaner cleaner = cleaner(p)) {
1255 assertNullTaskSubmissionThrowsNullPointerException(p);
1256 }
1257 }
1258
1259 /**
1260 * setCorePoolSize of negative value throws IllegalArgumentException
1261 */
1262 public void testCorePoolSizeIllegalArgumentException() {
1263 final ThreadPoolExecutor p =
1264 new CustomTPE(1, 2,
1265 LONG_DELAY_MS, MILLISECONDS,
1266 new ArrayBlockingQueue<Runnable>(10));
1267 try (PoolCleaner cleaner = cleaner(p)) {
1268 try {
1269 p.setCorePoolSize(-1);
1270 shouldThrow();
1271 } catch (IllegalArgumentException success) {}
1272 }
1273 }
1274
1275 /**
1276 * setMaximumPoolSize(int) throws IllegalArgumentException
1277 * if given a value less the core pool size
1278 */
1279 public void testMaximumPoolSizeIllegalArgumentException() {
1280 final ThreadPoolExecutor p =
1281 new CustomTPE(2, 3,
1282 LONG_DELAY_MS, MILLISECONDS,
1283 new ArrayBlockingQueue<Runnable>(10));
1284 try (PoolCleaner cleaner = cleaner(p)) {
1285 try {
1286 p.setMaximumPoolSize(1);
1287 shouldThrow();
1288 } catch (IllegalArgumentException success) {}
1289 }
1290 }
1291
1292 /**
1293 * setMaximumPoolSize throws IllegalArgumentException
1294 * if given a negative value
1295 */
1296 public void testMaximumPoolSizeIllegalArgumentException2() {
1297 final ThreadPoolExecutor p =
1298 new CustomTPE(2, 3,
1299 LONG_DELAY_MS, MILLISECONDS,
1300 new ArrayBlockingQueue<Runnable>(10));
1301 try (PoolCleaner cleaner = cleaner(p)) {
1302 try {
1303 p.setMaximumPoolSize(-1);
1304 shouldThrow();
1305 } catch (IllegalArgumentException success) {}
1306 }
1307 }
1308
1309 /**
1310 * setKeepAliveTime throws IllegalArgumentException
1311 * when given a negative value
1312 */
1313 public void testKeepAliveTimeIllegalArgumentException() {
1314 final ThreadPoolExecutor p =
1315 new CustomTPE(2, 3,
1316 LONG_DELAY_MS, MILLISECONDS,
1317 new ArrayBlockingQueue<Runnable>(10));
1318 try (PoolCleaner cleaner = cleaner(p)) {
1319 try {
1320 p.setKeepAliveTime(-1, MILLISECONDS);
1321 shouldThrow();
1322 } catch (IllegalArgumentException success) {}
1323 }
1324 }
1325
1326 /**
1327 * terminated() is called on termination
1328 */
1329 public void testTerminated() {
1330 CustomTPE p = new CustomTPE();
1331 try (PoolCleaner cleaner = cleaner(p)) {
1332 try { p.shutdown(); } catch (SecurityException ok) { return; }
1333 assertTrue(p.terminatedCalled());
1334 assertTrue(p.isShutdown());
1335 }
1336 }
1337
1338 /**
1339 * beforeExecute and afterExecute are called when executing task
1340 */
1341 public void testBeforeAfter() throws InterruptedException {
1342 CustomTPE p = new CustomTPE();
1343 try (PoolCleaner cleaner = cleaner(p)) {
1344 final CountDownLatch done = new CountDownLatch(1);
1345 p.execute(new CheckedRunnable() {
1346 public void realRun() {
1347 done.countDown();
1348 }});
1349 await(p.afterCalled);
1350 assertEquals(0, done.getCount());
1351 assertTrue(p.afterCalled());
1352 assertTrue(p.beforeCalled());
1353 }
1354 }
1355
1356 /**
1357 * completed submit of callable returns result
1358 */
1359 public void testSubmitCallable() throws Exception {
1360 final ExecutorService e =
1361 new CustomTPE(2, 2,
1362 LONG_DELAY_MS, MILLISECONDS,
1363 new ArrayBlockingQueue<Runnable>(10));
1364 try (PoolCleaner cleaner = cleaner(e)) {
1365 Future<String> future = e.submit(new StringTask());
1366 String result = future.get();
1367 assertSame(TEST_STRING, result);
1368 }
1369 }
1370
1371 /**
1372 * completed submit of runnable returns successfully
1373 */
1374 public void testSubmitRunnable() throws Exception {
1375 final ExecutorService e =
1376 new CustomTPE(2, 2,
1377 LONG_DELAY_MS, MILLISECONDS,
1378 new ArrayBlockingQueue<Runnable>(10));
1379 try (PoolCleaner cleaner = cleaner(e)) {
1380 Future<?> future = e.submit(new NoOpRunnable());
1381 future.get();
1382 assertTrue(future.isDone());
1383 }
1384 }
1385
1386 /**
1387 * completed submit of (runnable, result) returns result
1388 */
1389 public void testSubmitRunnable2() throws Exception {
1390 final ExecutorService e =
1391 new CustomTPE(2, 2,
1392 LONG_DELAY_MS, MILLISECONDS,
1393 new ArrayBlockingQueue<Runnable>(10));
1394 try (PoolCleaner cleaner = cleaner(e)) {
1395 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1396 String result = future.get();
1397 assertSame(TEST_STRING, result);
1398 }
1399 }
1400
1401 /**
1402 * invokeAny(null) throws NullPointerException
1403 */
1404 public void testInvokeAny1() throws Exception {
1405 final ExecutorService e =
1406 new CustomTPE(2, 2,
1407 LONG_DELAY_MS, MILLISECONDS,
1408 new ArrayBlockingQueue<Runnable>(10));
1409 try (PoolCleaner cleaner = cleaner(e)) {
1410 try {
1411 e.invokeAny(null);
1412 shouldThrow();
1413 } catch (NullPointerException success) {}
1414 }
1415 }
1416
1417 /**
1418 * invokeAny(empty collection) throws IllegalArgumentException
1419 */
1420 public void testInvokeAny2() throws Exception {
1421 final ExecutorService e =
1422 new CustomTPE(2, 2,
1423 LONG_DELAY_MS, MILLISECONDS,
1424 new ArrayBlockingQueue<Runnable>(10));
1425 try (PoolCleaner cleaner = cleaner(e)) {
1426 try {
1427 e.invokeAny(new ArrayList<Callable<String>>());
1428 shouldThrow();
1429 } catch (IllegalArgumentException success) {}
1430 }
1431 }
1432
1433 /**
1434 * invokeAny(c) throws NPE if c has null elements
1435 */
1436 public void testInvokeAny3() throws Exception {
1437 CountDownLatch latch = new CountDownLatch(1);
1438 final ExecutorService e =
1439 new CustomTPE(2, 2,
1440 LONG_DELAY_MS, MILLISECONDS,
1441 new ArrayBlockingQueue<Runnable>(10));
1442 try (PoolCleaner cleaner = cleaner(e)) {
1443 List<Callable<String>> l = new ArrayList<>();
1444 l.add(latchAwaitingStringTask(latch));
1445 l.add(null);
1446 try {
1447 e.invokeAny(l);
1448 shouldThrow();
1449 } catch (NullPointerException success) {}
1450 latch.countDown();
1451 }
1452 }
1453
1454 /**
1455 * invokeAny(c) throws ExecutionException if no task completes
1456 */
1457 public void testInvokeAny4() throws Exception {
1458 final ExecutorService e =
1459 new CustomTPE(2, 2,
1460 LONG_DELAY_MS, MILLISECONDS,
1461 new ArrayBlockingQueue<Runnable>(10));
1462 try (PoolCleaner cleaner = cleaner(e)) {
1463 List<Callable<String>> l = new ArrayList<>();
1464 l.add(new NPETask());
1465 try {
1466 e.invokeAny(l);
1467 shouldThrow();
1468 } catch (ExecutionException success) {
1469 assertTrue(success.getCause() instanceof NullPointerException);
1470 }
1471 }
1472 }
1473
1474 /**
1475 * invokeAny(c) returns result of some task
1476 */
1477 public void testInvokeAny5() throws Exception {
1478 final ExecutorService e =
1479 new CustomTPE(2, 2,
1480 LONG_DELAY_MS, MILLISECONDS,
1481 new ArrayBlockingQueue<Runnable>(10));
1482 try (PoolCleaner cleaner = cleaner(e)) {
1483 List<Callable<String>> l = new ArrayList<>();
1484 l.add(new StringTask());
1485 l.add(new StringTask());
1486 String result = e.invokeAny(l);
1487 assertSame(TEST_STRING, result);
1488 }
1489 }
1490
1491 /**
1492 * invokeAll(null) throws NPE
1493 */
1494 public void testInvokeAll1() throws Exception {
1495 final ExecutorService e =
1496 new CustomTPE(2, 2,
1497 LONG_DELAY_MS, MILLISECONDS,
1498 new ArrayBlockingQueue<Runnable>(10));
1499 try (PoolCleaner cleaner = cleaner(e)) {
1500 try {
1501 e.invokeAll(null);
1502 shouldThrow();
1503 } catch (NullPointerException success) {}
1504 }
1505 }
1506
1507 /**
1508 * invokeAll(empty collection) returns empty list
1509 */
1510 public void testInvokeAll2() throws Exception {
1511 final ExecutorService e =
1512 new CustomTPE(2, 2,
1513 LONG_DELAY_MS, MILLISECONDS,
1514 new ArrayBlockingQueue<Runnable>(10));
1515 final Collection<Callable<String>> emptyCollection
1516 = Collections.emptyList();
1517 try (PoolCleaner cleaner = cleaner(e)) {
1518 List<Future<String>> r = e.invokeAll(emptyCollection);
1519 assertTrue(r.isEmpty());
1520 }
1521 }
1522
1523 /**
1524 * invokeAll(c) throws NPE if c has null elements
1525 */
1526 public void testInvokeAll3() throws Exception {
1527 final ExecutorService e =
1528 new CustomTPE(2, 2,
1529 LONG_DELAY_MS, MILLISECONDS,
1530 new ArrayBlockingQueue<Runnable>(10));
1531 try (PoolCleaner cleaner = cleaner(e)) {
1532 List<Callable<String>> l = new ArrayList<>();
1533 l.add(new StringTask());
1534 l.add(null);
1535 try {
1536 e.invokeAll(l);
1537 shouldThrow();
1538 } catch (NullPointerException success) {}
1539 }
1540 }
1541
1542 /**
1543 * get of element of invokeAll(c) throws exception on failed task
1544 */
1545 public void testInvokeAll4() throws Exception {
1546 final ExecutorService e =
1547 new CustomTPE(2, 2,
1548 LONG_DELAY_MS, MILLISECONDS,
1549 new ArrayBlockingQueue<Runnable>(10));
1550 try (PoolCleaner cleaner = cleaner(e)) {
1551 List<Callable<String>> l = new ArrayList<>();
1552 l.add(new NPETask());
1553 List<Future<String>> futures = e.invokeAll(l);
1554 assertEquals(1, futures.size());
1555 try {
1556 futures.get(0).get();
1557 shouldThrow();
1558 } catch (ExecutionException success) {
1559 assertTrue(success.getCause() instanceof NullPointerException);
1560 }
1561 }
1562 }
1563
1564 /**
1565 * invokeAll(c) returns results of all completed tasks
1566 */
1567 public void testInvokeAll5() throws Exception {
1568 final ExecutorService e =
1569 new CustomTPE(2, 2,
1570 LONG_DELAY_MS, MILLISECONDS,
1571 new ArrayBlockingQueue<Runnable>(10));
1572 try (PoolCleaner cleaner = cleaner(e)) {
1573 List<Callable<String>> l = new ArrayList<>();
1574 l.add(new StringTask());
1575 l.add(new StringTask());
1576 List<Future<String>> futures = e.invokeAll(l);
1577 assertEquals(2, futures.size());
1578 for (Future<String> future : futures)
1579 assertSame(TEST_STRING, future.get());
1580 }
1581 }
1582
1583 /**
1584 * timed invokeAny(null) throws NPE
1585 */
1586 public void testTimedInvokeAny1() throws Exception {
1587 final ExecutorService e =
1588 new CustomTPE(2, 2,
1589 LONG_DELAY_MS, MILLISECONDS,
1590 new ArrayBlockingQueue<Runnable>(10));
1591 try (PoolCleaner cleaner = cleaner(e)) {
1592 try {
1593 e.invokeAny(null, randomTimeout(), randomTimeUnit());
1594 shouldThrow();
1595 } catch (NullPointerException success) {}
1596 }
1597 }
1598
1599 /**
1600 * timed invokeAny(,,null) throws NPE
1601 */
1602 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1603 final ExecutorService e =
1604 new CustomTPE(2, 2,
1605 LONG_DELAY_MS, MILLISECONDS,
1606 new ArrayBlockingQueue<Runnable>(10));
1607 try (PoolCleaner cleaner = cleaner(e)) {
1608 List<Callable<String>> l = new ArrayList<>();
1609 l.add(new StringTask());
1610 try {
1611 e.invokeAny(l, randomTimeout(), null);
1612 shouldThrow();
1613 } catch (NullPointerException success) {}
1614 }
1615 }
1616
1617 /**
1618 * timed invokeAny(empty collection) throws IllegalArgumentException
1619 */
1620 public void testTimedInvokeAny2() throws Exception {
1621 final ExecutorService e =
1622 new CustomTPE(2, 2,
1623 LONG_DELAY_MS, MILLISECONDS,
1624 new ArrayBlockingQueue<Runnable>(10));
1625 final Collection<Callable<String>> emptyCollection
1626 = Collections.emptyList();
1627 try (PoolCleaner cleaner = cleaner(e)) {
1628 try {
1629 e.invokeAny(emptyCollection, randomTimeout(), randomTimeUnit());
1630 shouldThrow();
1631 } catch (IllegalArgumentException success) {}
1632 }
1633 }
1634
1635 /**
1636 * timed invokeAny(c) throws NPE if c has null elements
1637 */
1638 public void testTimedInvokeAny3() throws Exception {
1639 CountDownLatch latch = new CountDownLatch(1);
1640 final ExecutorService e =
1641 new CustomTPE(2, 2,
1642 LONG_DELAY_MS, MILLISECONDS,
1643 new ArrayBlockingQueue<Runnable>(10));
1644 try (PoolCleaner cleaner = cleaner(e)) {
1645 List<Callable<String>> l = new ArrayList<>();
1646 l.add(latchAwaitingStringTask(latch));
1647 l.add(null);
1648 try {
1649 e.invokeAny(l, randomTimeout(), randomTimeUnit());
1650 shouldThrow();
1651 } catch (NullPointerException success) {}
1652 latch.countDown();
1653 }
1654 }
1655
1656 /**
1657 * timed invokeAny(c) throws ExecutionException if no task completes
1658 */
1659 public void testTimedInvokeAny4() throws Exception {
1660 final ExecutorService e =
1661 new CustomTPE(2, 2,
1662 LONG_DELAY_MS, MILLISECONDS,
1663 new ArrayBlockingQueue<Runnable>(10));
1664 try (PoolCleaner cleaner = cleaner(e)) {
1665 long startTime = System.nanoTime();
1666 List<Callable<String>> l = new ArrayList<>();
1667 l.add(new NPETask());
1668 try {
1669 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1670 shouldThrow();
1671 } catch (ExecutionException success) {
1672 assertTrue(success.getCause() instanceof NullPointerException);
1673 }
1674 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1675 }
1676 }
1677
1678 /**
1679 * timed invokeAny(c) returns result of some task
1680 */
1681 public void testTimedInvokeAny5() throws Exception {
1682 final ExecutorService e =
1683 new CustomTPE(2, 2,
1684 LONG_DELAY_MS, MILLISECONDS,
1685 new ArrayBlockingQueue<Runnable>(10));
1686 try (PoolCleaner cleaner = cleaner(e)) {
1687 long startTime = System.nanoTime();
1688 List<Callable<String>> l = new ArrayList<>();
1689 l.add(new StringTask());
1690 l.add(new StringTask());
1691 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1692 assertSame(TEST_STRING, result);
1693 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1694 }
1695 }
1696
1697 /**
1698 * timed invokeAll(null) throws NPE
1699 */
1700 public void testTimedInvokeAll1() throws Exception {
1701 final ExecutorService e =
1702 new CustomTPE(2, 2,
1703 LONG_DELAY_MS, MILLISECONDS,
1704 new ArrayBlockingQueue<Runnable>(10));
1705 try (PoolCleaner cleaner = cleaner(e)) {
1706 try {
1707 e.invokeAll(null, randomTimeout(), randomTimeUnit());
1708 shouldThrow();
1709 } catch (NullPointerException success) {}
1710 }
1711 }
1712
1713 /**
1714 * timed invokeAll(,,null) throws NPE
1715 */
1716 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1717 final ExecutorService e =
1718 new CustomTPE(2, 2,
1719 LONG_DELAY_MS, MILLISECONDS,
1720 new ArrayBlockingQueue<Runnable>(10));
1721 try (PoolCleaner cleaner = cleaner(e)) {
1722 List<Callable<String>> l = new ArrayList<>();
1723 l.add(new StringTask());
1724 try {
1725 e.invokeAll(l, randomTimeout(), null);
1726 shouldThrow();
1727 } catch (NullPointerException success) {}
1728 }
1729 }
1730
1731 /**
1732 * timed invokeAll(empty collection) returns empty list
1733 */
1734 public void testTimedInvokeAll2() throws Exception {
1735 final ExecutorService e =
1736 new CustomTPE(2, 2,
1737 LONG_DELAY_MS, MILLISECONDS,
1738 new ArrayBlockingQueue<Runnable>(10));
1739 final Collection<Callable<String>> emptyCollection
1740 = Collections.emptyList();
1741 try (PoolCleaner cleaner = cleaner(e)) {
1742 List<Future<String>> r =
1743 e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit());
1744 assertTrue(r.isEmpty());
1745 }
1746 }
1747
1748 /**
1749 * timed invokeAll(c) throws NPE if c has null elements
1750 */
1751 public void testTimedInvokeAll3() throws Exception {
1752 final ExecutorService e =
1753 new CustomTPE(2, 2,
1754 LONG_DELAY_MS, MILLISECONDS,
1755 new ArrayBlockingQueue<Runnable>(10));
1756 try (PoolCleaner cleaner = cleaner(e)) {
1757 List<Callable<String>> l = new ArrayList<>();
1758 l.add(new StringTask());
1759 l.add(null);
1760 try {
1761 e.invokeAll(l, randomTimeout(), randomTimeUnit());
1762 shouldThrow();
1763 } catch (NullPointerException success) {}
1764 }
1765 }
1766
1767 /**
1768 * get of element of invokeAll(c) throws exception on failed task
1769 */
1770 public void testTimedInvokeAll4() throws Exception {
1771 final ExecutorService e =
1772 new CustomTPE(2, 2,
1773 LONG_DELAY_MS, MILLISECONDS,
1774 new ArrayBlockingQueue<Runnable>(10));
1775 try (PoolCleaner cleaner = cleaner(e)) {
1776 List<Callable<String>> l = new ArrayList<>();
1777 l.add(new NPETask());
1778 List<Future<String>> futures =
1779 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1780 assertEquals(1, futures.size());
1781 try {
1782 futures.get(0).get();
1783 shouldThrow();
1784 } catch (ExecutionException success) {
1785 assertTrue(success.getCause() instanceof NullPointerException);
1786 }
1787 }
1788 }
1789
1790 /**
1791 * timed invokeAll(c) returns results of all completed tasks
1792 */
1793 public void testTimedInvokeAll5() throws Exception {
1794 final ExecutorService e =
1795 new CustomTPE(2, 2,
1796 LONG_DELAY_MS, MILLISECONDS,
1797 new ArrayBlockingQueue<Runnable>(10));
1798 try (PoolCleaner cleaner = cleaner(e)) {
1799 List<Callable<String>> l = new ArrayList<>();
1800 l.add(new StringTask());
1801 l.add(new StringTask());
1802 List<Future<String>> futures =
1803 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1804 assertEquals(2, futures.size());
1805 for (Future<String> future : futures)
1806 assertSame(TEST_STRING, future.get());
1807 }
1808 }
1809
1810 /**
1811 * timed invokeAll(c) cancels tasks not completed by timeout
1812 */
1813 public void testTimedInvokeAll6() throws Exception {
1814 for (long timeout = timeoutMillis();;) {
1815 final CountDownLatch done = new CountDownLatch(1);
1816 final Callable<String> waiter = new CheckedCallable<>() {
1817 public String realCall() {
1818 try { done.await(LONG_DELAY_MS, MILLISECONDS); }
1819 catch (InterruptedException ok) {}
1820 return "1"; }};
1821 final ExecutorService p =
1822 new CustomTPE(2, 2,
1823 LONG_DELAY_MS, MILLISECONDS,
1824 new ArrayBlockingQueue<Runnable>(10));
1825 try (PoolCleaner cleaner = cleaner(p, done)) {
1826 List<Callable<String>> tasks = new ArrayList<>();
1827 tasks.add(new StringTask("0"));
1828 tasks.add(waiter);
1829 tasks.add(new StringTask("2"));
1830 long startTime = System.nanoTime();
1831 List<Future<String>> futures =
1832 p.invokeAll(tasks, timeout, MILLISECONDS);
1833 assertEquals(tasks.size(), futures.size());
1834 assertTrue(millisElapsedSince(startTime) >= timeout);
1835 for (Future<?> future : futures)
1836 assertTrue(future.isDone());
1837 assertTrue(futures.get(1).isCancelled());
1838 try {
1839 assertEquals("0", futures.get(0).get());
1840 assertEquals("2", futures.get(2).get());
1841 break;
1842 } catch (CancellationException retryWithLongerTimeout) {
1843 timeout *= 2;
1844 if (timeout >= LONG_DELAY_MS / 2)
1845 fail("expected exactly one task to be cancelled");
1846 }
1847 }
1848 }
1849 }
1850
1851 /**
1852 * Execution continues if there is at least one thread even if
1853 * thread factory fails to create more
1854 */
1855 public void testFailingThreadFactory() throws InterruptedException {
1856 final ExecutorService e =
1857 new CustomTPE(100, 100,
1858 LONG_DELAY_MS, MILLISECONDS,
1859 new LinkedBlockingQueue<Runnable>(),
1860 new FailingThreadFactory());
1861 try (PoolCleaner cleaner = cleaner(e)) {
1862 final int TASKS = 100;
1863 final CountDownLatch done = new CountDownLatch(TASKS);
1864 for (int k = 0; k < TASKS; ++k)
1865 e.execute(new CheckedRunnable() {
1866 public void realRun() {
1867 done.countDown();
1868 }});
1869 await(done);
1870 }
1871 }
1872
1873 /**
1874 * allowsCoreThreadTimeOut is by default false.
1875 */
1876 public void testAllowsCoreThreadTimeOut() {
1877 final ThreadPoolExecutor p =
1878 new CustomTPE(2, 2,
1879 1000, MILLISECONDS,
1880 new ArrayBlockingQueue<Runnable>(10));
1881 try (PoolCleaner cleaner = cleaner(p)) {
1882 assertFalse(p.allowsCoreThreadTimeOut());
1883 }
1884 }
1885
1886 /**
1887 * allowCoreThreadTimeOut(true) causes idle threads to time out
1888 */
1889 public void testAllowCoreThreadTimeOut_true() throws Exception {
1890 long keepAliveTime = timeoutMillis();
1891 final ThreadPoolExecutor p =
1892 new CustomTPE(2, 10,
1893 keepAliveTime, MILLISECONDS,
1894 new ArrayBlockingQueue<Runnable>(10));
1895 try (PoolCleaner cleaner = cleaner(p)) {
1896 final CountDownLatch threadStarted = new CountDownLatch(1);
1897 p.allowCoreThreadTimeOut(true);
1898 p.execute(new CheckedRunnable() {
1899 public void realRun() {
1900 threadStarted.countDown();
1901 assertEquals(1, p.getPoolSize());
1902 }});
1903 await(threadStarted);
1904 delay(keepAliveTime);
1905 long startTime = System.nanoTime();
1906 while (p.getPoolSize() > 0
1907 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1908 Thread.yield();
1909 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1910 assertEquals(0, p.getPoolSize());
1911 }
1912 }
1913
1914 /**
1915 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1916 */
1917 public void testAllowCoreThreadTimeOut_false() throws Exception {
1918 long keepAliveTime = timeoutMillis();
1919 final ThreadPoolExecutor p =
1920 new CustomTPE(2, 10,
1921 keepAliveTime, MILLISECONDS,
1922 new ArrayBlockingQueue<Runnable>(10));
1923 try (PoolCleaner cleaner = cleaner(p)) {
1924 final CountDownLatch threadStarted = new CountDownLatch(1);
1925 p.allowCoreThreadTimeOut(false);
1926 p.execute(new CheckedRunnable() {
1927 public void realRun() throws InterruptedException {
1928 threadStarted.countDown();
1929 assertTrue(p.getPoolSize() >= 1);
1930 }});
1931 delay(2 * keepAliveTime);
1932 assertTrue(p.getPoolSize() >= 1);
1933 }
1934 }
1935
1936 /**
1937 * get(cancelled task) throws CancellationException
1938 * (in part, a test of CustomTPE itself)
1939 */
1940 public void testGet_cancelled() throws Exception {
1941 final CountDownLatch done = new CountDownLatch(1);
1942 final ExecutorService e =
1943 new CustomTPE(1, 1,
1944 LONG_DELAY_MS, MILLISECONDS,
1945 new LinkedBlockingQueue<Runnable>());
1946 try (PoolCleaner cleaner = cleaner(e, done)) {
1947 final CountDownLatch blockerStarted = new CountDownLatch(1);
1948 final List<Future<?>> futures = new ArrayList<>();
1949 for (int i = 0; i < 2; i++) {
1950 Runnable r = new CheckedRunnable() { public void realRun()
1951 throws Throwable {
1952 blockerStarted.countDown();
1953 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1954 }};
1955 futures.add(e.submit(r));
1956 }
1957 await(blockerStarted);
1958 for (Future<?> future : futures) future.cancel(false);
1959 for (Future<?> future : futures) {
1960 try {
1961 future.get();
1962 shouldThrow();
1963 } catch (CancellationException success) {}
1964 try {
1965 future.get(LONG_DELAY_MS, MILLISECONDS);
1966 shouldThrow();
1967 } catch (CancellationException success) {}
1968 assertTrue(future.isCancelled());
1969 assertTrue(future.isDone());
1970 }
1971 }
1972 }
1973 @SuppressWarnings("removal")
1974 public void testFinalizeMethodCallsSuperFinalize() {
1975 new CustomTPE(1, 1,
1976 LONG_DELAY_MS, MILLISECONDS,
1977 new LinkedBlockingQueue<Runnable>()) {
1978
1979 /**
1980 * A finalize method without "throws Throwable", that
1981 * calls super.finalize().
1982 */
1983 protected void finalize() {
1984 super.finalize();
1985 }
1986 }.shutdown();
1987 }
1988
1989 }