ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.58
Committed: Sun Oct 4 01:52:43 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.57: +9 -7 lines
Log Message:
improve testSetThreadFactoryNull

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 final CountDownLatch threadStarted = new CountDownLatch(1);
256 final CountDownLatch done = new CountDownLatch(1);
257 try (PoolCleaner cleaner = cleaner(p)) {
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 done.countDown();
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 ThreadPoolExecutor p =
302 new CustomTPE(2, 6,
303 LONG_DELAY_MS, MILLISECONDS,
304 new ArrayBlockingQueue<Runnable>(10));
305 try (PoolCleaner cleaner = cleaner(p)) {
306 assertEquals(0, p.getPoolSize());
307 p.prestartAllCoreThreads();
308 assertEquals(2, p.getPoolSize());
309 p.prestartAllCoreThreads();
310 assertEquals(2, p.getPoolSize());
311 p.setCorePoolSize(4);
312 p.prestartAllCoreThreads();
313 assertEquals(4, p.getPoolSize());
314 p.prestartAllCoreThreads();
315 assertEquals(4, p.getPoolSize());
316 }
317 }
318
319 /**
320 * getCompletedTaskCount increases, but doesn't overestimate,
321 * when tasks complete
322 */
323 public void testGetCompletedTaskCount() throws InterruptedException {
324 final ThreadPoolExecutor p =
325 new CustomTPE(2, 2,
326 LONG_DELAY_MS, MILLISECONDS,
327 new ArrayBlockingQueue<Runnable>(10));
328 try (PoolCleaner cleaner = cleaner(p)) {
329 final CountDownLatch threadStarted = new CountDownLatch(1);
330 final CountDownLatch threadProceed = new CountDownLatch(1);
331 final CountDownLatch threadDone = new CountDownLatch(1);
332 assertEquals(0, p.getCompletedTaskCount());
333 p.execute(new CheckedRunnable() {
334 public void realRun() throws InterruptedException {
335 threadStarted.countDown();
336 assertEquals(0, p.getCompletedTaskCount());
337 threadProceed.await();
338 threadDone.countDown();
339 }});
340 await(threadStarted);
341 assertEquals(0, p.getCompletedTaskCount());
342 threadProceed.countDown();
343 threadDone.await();
344 long startTime = System.nanoTime();
345 while (p.getCompletedTaskCount() != 1) {
346 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
347 fail("timed out");
348 Thread.yield();
349 }
350 }
351 }
352
353 /**
354 * getCorePoolSize returns size given in constructor if not otherwise set
355 */
356 public void testGetCorePoolSize() {
357 ThreadPoolExecutor p =
358 new CustomTPE(1, 1,
359 LONG_DELAY_MS, MILLISECONDS,
360 new ArrayBlockingQueue<Runnable>(10));
361 try (PoolCleaner cleaner = cleaner(p)) {
362 assertEquals(1, p.getCorePoolSize());
363 }
364 }
365
366 /**
367 * getKeepAliveTime returns value given in constructor if not otherwise set
368 */
369 public void testGetKeepAliveTime() {
370 ThreadPoolExecutor p =
371 new CustomTPE(2, 2,
372 1000, MILLISECONDS,
373 new ArrayBlockingQueue<Runnable>(10));
374 try (PoolCleaner cleaner = cleaner(p)) {
375 assertEquals(1, p.getKeepAliveTime(SECONDS));
376 }
377 }
378
379 /**
380 * getThreadFactory returns factory in constructor if not set
381 */
382 public void testGetThreadFactory() {
383 ThreadFactory threadFactory = new SimpleThreadFactory();
384 ThreadPoolExecutor p =
385 new CustomTPE(1, 2,
386 LONG_DELAY_MS, MILLISECONDS,
387 new ArrayBlockingQueue<Runnable>(10),
388 threadFactory,
389 new NoOpREHandler());
390 try (PoolCleaner cleaner = cleaner(p)) {
391 assertSame(threadFactory, p.getThreadFactory());
392 }
393 }
394
395 /**
396 * setThreadFactory sets the thread factory returned by getThreadFactory
397 */
398 public void testSetThreadFactory() {
399 ThreadPoolExecutor p =
400 new CustomTPE(1, 2,
401 LONG_DELAY_MS, MILLISECONDS,
402 new ArrayBlockingQueue<Runnable>(10));
403 try (PoolCleaner cleaner = cleaner(p)) {
404 ThreadFactory threadFactory = new SimpleThreadFactory();
405 p.setThreadFactory(threadFactory);
406 assertSame(threadFactory, p.getThreadFactory());
407 }
408 }
409
410 /**
411 * setThreadFactory(null) throws NPE
412 */
413 public void testSetThreadFactoryNull() {
414 ThreadPoolExecutor p =
415 new CustomTPE(1, 2,
416 LONG_DELAY_MS, MILLISECONDS,
417 new ArrayBlockingQueue<Runnable>(10));
418 try (PoolCleaner cleaner = cleaner(p)) {
419 try {
420 p.setThreadFactory(null);
421 shouldThrow();
422 } catch (NullPointerException success) {}
423 }
424 }
425
426 /**
427 * getRejectedExecutionHandler returns handler in constructor if not set
428 */
429 public void testGetRejectedExecutionHandler() {
430 RejectedExecutionHandler h = new NoOpREHandler();
431 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
432 assertSame(h, p.getRejectedExecutionHandler());
433 joinPool(p);
434 }
435
436 /**
437 * setRejectedExecutionHandler sets the handler returned by
438 * getRejectedExecutionHandler
439 */
440 public void testSetRejectedExecutionHandler() {
441 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
442 RejectedExecutionHandler h = new NoOpREHandler();
443 p.setRejectedExecutionHandler(h);
444 assertSame(h, p.getRejectedExecutionHandler());
445 joinPool(p);
446 }
447
448 /**
449 * setRejectedExecutionHandler(null) throws NPE
450 */
451 public void testSetRejectedExecutionHandlerNull() {
452 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
453 try {
454 p.setRejectedExecutionHandler(null);
455 shouldThrow();
456 } catch (NullPointerException success) {
457 } finally {
458 joinPool(p);
459 }
460 }
461
462 /**
463 * getLargestPoolSize increases, but doesn't overestimate, when
464 * multiple threads active
465 */
466 public void testGetLargestPoolSize() throws InterruptedException {
467 final int THREADS = 3;
468 final ThreadPoolExecutor p =
469 new CustomTPE(THREADS, THREADS,
470 LONG_DELAY_MS, MILLISECONDS,
471 new ArrayBlockingQueue<Runnable>(10));
472 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
473 final CountDownLatch done = new CountDownLatch(1);
474 try {
475 assertEquals(0, p.getLargestPoolSize());
476 for (int i = 0; i < THREADS; i++)
477 p.execute(new CheckedRunnable() {
478 public void realRun() throws InterruptedException {
479 threadsStarted.countDown();
480 done.await();
481 assertEquals(THREADS, p.getLargestPoolSize());
482 }});
483 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
484 assertEquals(THREADS, p.getLargestPoolSize());
485 } finally {
486 done.countDown();
487 joinPool(p);
488 assertEquals(THREADS, p.getLargestPoolSize());
489 }
490 }
491
492 /**
493 * getMaximumPoolSize returns value given in constructor if not
494 * otherwise set
495 */
496 public void testGetMaximumPoolSize() {
497 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
498 assertEquals(2, p.getMaximumPoolSize());
499 joinPool(p);
500 }
501
502 /**
503 * getPoolSize increases, but doesn't overestimate, when threads
504 * become active
505 */
506 public void testGetPoolSize() throws InterruptedException {
507 final ThreadPoolExecutor p =
508 new CustomTPE(1, 1,
509 LONG_DELAY_MS, MILLISECONDS,
510 new ArrayBlockingQueue<Runnable>(10));
511 final CountDownLatch threadStarted = new CountDownLatch(1);
512 final CountDownLatch done = new CountDownLatch(1);
513 try {
514 assertEquals(0, p.getPoolSize());
515 p.execute(new CheckedRunnable() {
516 public void realRun() throws InterruptedException {
517 threadStarted.countDown();
518 assertEquals(1, p.getPoolSize());
519 done.await();
520 }});
521 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
522 assertEquals(1, p.getPoolSize());
523 } finally {
524 done.countDown();
525 joinPool(p);
526 }
527 }
528
529 /**
530 * getTaskCount increases, but doesn't overestimate, when tasks submitted
531 */
532 public void testGetTaskCount() throws InterruptedException {
533 final ThreadPoolExecutor p =
534 new CustomTPE(1, 1,
535 LONG_DELAY_MS, MILLISECONDS,
536 new ArrayBlockingQueue<Runnable>(10));
537 final CountDownLatch threadStarted = new CountDownLatch(1);
538 final CountDownLatch done = new CountDownLatch(1);
539 try {
540 assertEquals(0, p.getTaskCount());
541 p.execute(new CheckedRunnable() {
542 public void realRun() throws InterruptedException {
543 threadStarted.countDown();
544 assertEquals(1, p.getTaskCount());
545 done.await();
546 }});
547 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
548 assertEquals(1, p.getTaskCount());
549 } finally {
550 done.countDown();
551 joinPool(p);
552 }
553 }
554
555 /**
556 * isShutdown is false before shutdown, true after
557 */
558 public void testIsShutdown() {
559
560 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
561 assertFalse(p.isShutdown());
562 try { p.shutdown(); } catch (SecurityException ok) { return; }
563 assertTrue(p.isShutdown());
564 joinPool(p);
565 }
566
567 /**
568 * isTerminated is false before termination, true after
569 */
570 public void testIsTerminated() throws InterruptedException {
571 final ThreadPoolExecutor p =
572 new CustomTPE(1, 1,
573 LONG_DELAY_MS, MILLISECONDS,
574 new ArrayBlockingQueue<Runnable>(10));
575 final CountDownLatch threadStarted = new CountDownLatch(1);
576 final CountDownLatch done = new CountDownLatch(1);
577 try {
578 assertFalse(p.isTerminating());
579 p.execute(new CheckedRunnable() {
580 public void realRun() throws InterruptedException {
581 assertFalse(p.isTerminating());
582 threadStarted.countDown();
583 done.await();
584 }});
585 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
586 assertFalse(p.isTerminating());
587 done.countDown();
588 } finally {
589 try { p.shutdown(); } catch (SecurityException ok) { return; }
590 }
591 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
592 assertTrue(p.isTerminated());
593 assertFalse(p.isTerminating());
594 }
595
596 /**
597 * isTerminating is not true when running or when terminated
598 */
599 public void testIsTerminating() throws InterruptedException {
600 final ThreadPoolExecutor p =
601 new CustomTPE(1, 1,
602 LONG_DELAY_MS, MILLISECONDS,
603 new ArrayBlockingQueue<Runnable>(10));
604 final CountDownLatch threadStarted = new CountDownLatch(1);
605 final CountDownLatch done = new CountDownLatch(1);
606 try {
607 assertFalse(p.isTerminating());
608 p.execute(new CheckedRunnable() {
609 public void realRun() throws InterruptedException {
610 assertFalse(p.isTerminating());
611 threadStarted.countDown();
612 done.await();
613 }});
614 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
615 assertFalse(p.isTerminating());
616 done.countDown();
617 } finally {
618 try { p.shutdown(); } catch (SecurityException ok) { return; }
619 }
620 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
621 assertTrue(p.isTerminated());
622 assertFalse(p.isTerminating());
623 }
624
625 /**
626 * getQueue returns the work queue, which contains queued tasks
627 */
628 public void testGetQueue() throws InterruptedException {
629 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
630 final ThreadPoolExecutor p =
631 new CustomTPE(1, 1,
632 LONG_DELAY_MS, MILLISECONDS,
633 q);
634 final CountDownLatch threadStarted = new CountDownLatch(1);
635 final CountDownLatch done = new CountDownLatch(1);
636 try {
637 FutureTask[] tasks = new FutureTask[5];
638 for (int i = 0; i < tasks.length; i++) {
639 Callable task = new CheckedCallable<Boolean>() {
640 public Boolean realCall() throws InterruptedException {
641 threadStarted.countDown();
642 assertSame(q, p.getQueue());
643 done.await();
644 return Boolean.TRUE;
645 }};
646 tasks[i] = new FutureTask(task);
647 p.execute(tasks[i]);
648 }
649 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
650 assertSame(q, p.getQueue());
651 assertFalse(q.contains(tasks[0]));
652 assertTrue(q.contains(tasks[tasks.length - 1]));
653 assertEquals(tasks.length - 1, q.size());
654 } finally {
655 done.countDown();
656 joinPool(p);
657 }
658 }
659
660 /**
661 * remove(task) removes queued task, and fails to remove active task
662 */
663 public void testRemove() throws InterruptedException {
664 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
665 final ThreadPoolExecutor p =
666 new CustomTPE(1, 1,
667 LONG_DELAY_MS, MILLISECONDS,
668 q);
669 Runnable[] tasks = new Runnable[6];
670 final CountDownLatch threadStarted = new CountDownLatch(1);
671 final CountDownLatch done = new CountDownLatch(1);
672 try {
673 for (int i = 0; i < tasks.length; i++) {
674 tasks[i] = new CheckedRunnable() {
675 public void realRun() throws InterruptedException {
676 threadStarted.countDown();
677 done.await();
678 }};
679 p.execute(tasks[i]);
680 }
681 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
682 assertFalse(p.remove(tasks[0]));
683 assertTrue(q.contains(tasks[4]));
684 assertTrue(q.contains(tasks[3]));
685 assertTrue(p.remove(tasks[4]));
686 assertFalse(p.remove(tasks[4]));
687 assertFalse(q.contains(tasks[4]));
688 assertTrue(q.contains(tasks[3]));
689 assertTrue(p.remove(tasks[3]));
690 assertFalse(q.contains(tasks[3]));
691 } finally {
692 done.countDown();
693 joinPool(p);
694 }
695 }
696
697 /**
698 * purge removes cancelled tasks from the queue
699 */
700 public void testPurge() throws InterruptedException {
701 final CountDownLatch threadStarted = new CountDownLatch(1);
702 final CountDownLatch done = new CountDownLatch(1);
703 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
704 final ThreadPoolExecutor p =
705 new CustomTPE(1, 1,
706 LONG_DELAY_MS, MILLISECONDS,
707 q);
708 FutureTask[] tasks = new FutureTask[5];
709 try {
710 for (int i = 0; i < tasks.length; i++) {
711 Callable task = new CheckedCallable<Boolean>() {
712 public Boolean realCall() throws InterruptedException {
713 threadStarted.countDown();
714 done.await();
715 return Boolean.TRUE;
716 }};
717 tasks[i] = new FutureTask(task);
718 p.execute(tasks[i]);
719 }
720 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
721 assertEquals(tasks.length, p.getTaskCount());
722 assertEquals(tasks.length - 1, q.size());
723 assertEquals(1L, p.getActiveCount());
724 assertEquals(0L, p.getCompletedTaskCount());
725 tasks[4].cancel(true);
726 tasks[3].cancel(false);
727 p.purge();
728 assertEquals(tasks.length - 3, q.size());
729 assertEquals(tasks.length - 2, p.getTaskCount());
730 p.purge(); // Nothing to do
731 assertEquals(tasks.length - 3, q.size());
732 assertEquals(tasks.length - 2, p.getTaskCount());
733 } finally {
734 done.countDown();
735 joinPool(p);
736 }
737 }
738
739 /**
740 * shutdownNow returns a list containing tasks that were not run,
741 * and those tasks are drained from the queue
742 */
743 public void testShutdownNow() throws InterruptedException {
744 final int poolSize = 2;
745 final int count = 5;
746 final AtomicInteger ran = new AtomicInteger(0);
747 ThreadPoolExecutor p =
748 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
749 new ArrayBlockingQueue<Runnable>(10));
750 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
751 Runnable waiter = new CheckedRunnable() { public void realRun() {
752 threadsStarted.countDown();
753 try {
754 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
755 } catch (InterruptedException success) {}
756 ran.getAndIncrement();
757 }};
758 for (int i = 0; i < count; i++)
759 p.execute(waiter);
760 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
761 assertEquals(poolSize, p.getActiveCount());
762 assertEquals(0, p.getCompletedTaskCount());
763 final List<Runnable> queuedTasks;
764 try {
765 queuedTasks = p.shutdownNow();
766 } catch (SecurityException ok) {
767 return; // Allowed in case test doesn't have privs
768 }
769 assertTrue(p.isShutdown());
770 assertTrue(p.getQueue().isEmpty());
771 assertEquals(count - poolSize, queuedTasks.size());
772 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
773 assertTrue(p.isTerminated());
774 assertEquals(poolSize, ran.get());
775 assertEquals(poolSize, p.getCompletedTaskCount());
776 }
777
778 // Exception Tests
779
780 /**
781 * Constructor throws if corePoolSize argument is less than zero
782 */
783 public void testConstructor1() {
784 try {
785 new CustomTPE(-1, 1, 1L, SECONDS,
786 new ArrayBlockingQueue<Runnable>(10));
787 shouldThrow();
788 } catch (IllegalArgumentException success) {}
789 }
790
791 /**
792 * Constructor throws if maximumPoolSize is less than zero
793 */
794 public void testConstructor2() {
795 try {
796 new CustomTPE(1, -1, 1L, SECONDS,
797 new ArrayBlockingQueue<Runnable>(10));
798 shouldThrow();
799 } catch (IllegalArgumentException success) {}
800 }
801
802 /**
803 * Constructor throws if maximumPoolSize is equal to zero
804 */
805 public void testConstructor3() {
806 try {
807 new CustomTPE(1, 0, 1L, SECONDS,
808 new ArrayBlockingQueue<Runnable>(10));
809 shouldThrow();
810 } catch (IllegalArgumentException success) {}
811 }
812
813 /**
814 * Constructor throws if keepAliveTime is less than zero
815 */
816 public void testConstructor4() {
817 try {
818 new CustomTPE(1, 2, -1L, SECONDS,
819 new ArrayBlockingQueue<Runnable>(10));
820 shouldThrow();
821 } catch (IllegalArgumentException success) {}
822 }
823
824 /**
825 * Constructor throws if corePoolSize is greater than the maximumPoolSize
826 */
827 public void testConstructor5() {
828 try {
829 new CustomTPE(2, 1, 1L, SECONDS,
830 new ArrayBlockingQueue<Runnable>(10));
831 shouldThrow();
832 } catch (IllegalArgumentException success) {}
833 }
834
835 /**
836 * Constructor throws if workQueue is set to null
837 */
838 public void testConstructorNullPointerException() {
839 try {
840 new CustomTPE(1, 2, 1L, SECONDS, null);
841 shouldThrow();
842 } catch (NullPointerException success) {}
843 }
844
845 /**
846 * Constructor throws if corePoolSize argument is less than zero
847 */
848 public void testConstructor6() {
849 try {
850 new CustomTPE(-1, 1, 1L, SECONDS,
851 new ArrayBlockingQueue<Runnable>(10),
852 new SimpleThreadFactory());
853 shouldThrow();
854 } catch (IllegalArgumentException success) {}
855 }
856
857 /**
858 * Constructor throws if maximumPoolSize is less than zero
859 */
860 public void testConstructor7() {
861 try {
862 new CustomTPE(1,-1, 1L, SECONDS,
863 new ArrayBlockingQueue<Runnable>(10),
864 new SimpleThreadFactory());
865 shouldThrow();
866 } catch (IllegalArgumentException success) {}
867 }
868
869 /**
870 * Constructor throws if maximumPoolSize is equal to zero
871 */
872 public void testConstructor8() {
873 try {
874 new CustomTPE(1, 0, 1L, SECONDS,
875 new ArrayBlockingQueue<Runnable>(10),
876 new SimpleThreadFactory());
877 shouldThrow();
878 } catch (IllegalArgumentException success) {}
879 }
880
881 /**
882 * Constructor throws if keepAliveTime is less than zero
883 */
884 public void testConstructor9() {
885 try {
886 new CustomTPE(1, 2, -1L, SECONDS,
887 new ArrayBlockingQueue<Runnable>(10),
888 new SimpleThreadFactory());
889 shouldThrow();
890 } catch (IllegalArgumentException success) {}
891 }
892
893 /**
894 * Constructor throws if corePoolSize is greater than the maximumPoolSize
895 */
896 public void testConstructor10() {
897 try {
898 new CustomTPE(2, 1, 1L, SECONDS,
899 new ArrayBlockingQueue<Runnable>(10),
900 new SimpleThreadFactory());
901 shouldThrow();
902 } catch (IllegalArgumentException success) {}
903 }
904
905 /**
906 * Constructor throws if workQueue is set to null
907 */
908 public void testConstructorNullPointerException2() {
909 try {
910 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
911 shouldThrow();
912 } catch (NullPointerException success) {}
913 }
914
915 /**
916 * Constructor throws if threadFactory is set to null
917 */
918 public void testConstructorNullPointerException3() {
919 try {
920 new CustomTPE(1, 2, 1L, SECONDS,
921 new ArrayBlockingQueue<Runnable>(10),
922 (ThreadFactory) null);
923 shouldThrow();
924 } catch (NullPointerException success) {}
925 }
926
927 /**
928 * Constructor throws if corePoolSize argument is less than zero
929 */
930 public void testConstructor11() {
931 try {
932 new CustomTPE(-1, 1, 1L, SECONDS,
933 new ArrayBlockingQueue<Runnable>(10),
934 new NoOpREHandler());
935 shouldThrow();
936 } catch (IllegalArgumentException success) {}
937 }
938
939 /**
940 * Constructor throws if maximumPoolSize is less than zero
941 */
942 public void testConstructor12() {
943 try {
944 new CustomTPE(1, -1, 1L, SECONDS,
945 new ArrayBlockingQueue<Runnable>(10),
946 new NoOpREHandler());
947 shouldThrow();
948 } catch (IllegalArgumentException success) {}
949 }
950
951 /**
952 * Constructor throws if maximumPoolSize is equal to zero
953 */
954 public void testConstructor13() {
955 try {
956 new CustomTPE(1, 0, 1L, SECONDS,
957 new ArrayBlockingQueue<Runnable>(10),
958 new NoOpREHandler());
959 shouldThrow();
960 } catch (IllegalArgumentException success) {}
961 }
962
963 /**
964 * Constructor throws if keepAliveTime is less than zero
965 */
966 public void testConstructor14() {
967 try {
968 new CustomTPE(1, 2, -1L, SECONDS,
969 new ArrayBlockingQueue<Runnable>(10),
970 new NoOpREHandler());
971 shouldThrow();
972 } catch (IllegalArgumentException success) {}
973 }
974
975 /**
976 * Constructor throws if corePoolSize is greater than the maximumPoolSize
977 */
978 public void testConstructor15() {
979 try {
980 new CustomTPE(2, 1, 1L, SECONDS,
981 new ArrayBlockingQueue<Runnable>(10),
982 new NoOpREHandler());
983 shouldThrow();
984 } catch (IllegalArgumentException success) {}
985 }
986
987 /**
988 * Constructor throws if workQueue is set to null
989 */
990 public void testConstructorNullPointerException4() {
991 try {
992 new CustomTPE(1, 2, 1L, SECONDS,
993 null,
994 new NoOpREHandler());
995 shouldThrow();
996 } catch (NullPointerException success) {}
997 }
998
999 /**
1000 * Constructor throws if handler is set to null
1001 */
1002 public void testConstructorNullPointerException5() {
1003 try {
1004 new CustomTPE(1, 2, 1L, SECONDS,
1005 new ArrayBlockingQueue<Runnable>(10),
1006 (RejectedExecutionHandler) null);
1007 shouldThrow();
1008 } catch (NullPointerException success) {}
1009 }
1010
1011 /**
1012 * Constructor throws if corePoolSize argument is less than zero
1013 */
1014 public void testConstructor16() {
1015 try {
1016 new CustomTPE(-1, 1, 1L, SECONDS,
1017 new ArrayBlockingQueue<Runnable>(10),
1018 new SimpleThreadFactory(),
1019 new NoOpREHandler());
1020 shouldThrow();
1021 } catch (IllegalArgumentException success) {}
1022 }
1023
1024 /**
1025 * Constructor throws if maximumPoolSize is less than zero
1026 */
1027 public void testConstructor17() {
1028 try {
1029 new CustomTPE(1, -1, 1L, SECONDS,
1030 new ArrayBlockingQueue<Runnable>(10),
1031 new SimpleThreadFactory(),
1032 new NoOpREHandler());
1033 shouldThrow();
1034 } catch (IllegalArgumentException success) {}
1035 }
1036
1037 /**
1038 * Constructor throws if maximumPoolSize is equal to zero
1039 */
1040 public void testConstructor18() {
1041 try {
1042 new CustomTPE(1, 0, 1L, SECONDS,
1043 new ArrayBlockingQueue<Runnable>(10),
1044 new SimpleThreadFactory(),
1045 new NoOpREHandler());
1046 shouldThrow();
1047 } catch (IllegalArgumentException success) {}
1048 }
1049
1050 /**
1051 * Constructor throws if keepAliveTime is less than zero
1052 */
1053 public void testConstructor19() {
1054 try {
1055 new CustomTPE(1, 2, -1L, SECONDS,
1056 new ArrayBlockingQueue<Runnable>(10),
1057 new SimpleThreadFactory(),
1058 new NoOpREHandler());
1059 shouldThrow();
1060 } catch (IllegalArgumentException success) {}
1061 }
1062
1063 /**
1064 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1065 */
1066 public void testConstructor20() {
1067 try {
1068 new CustomTPE(2, 1, 1L, SECONDS,
1069 new ArrayBlockingQueue<Runnable>(10),
1070 new SimpleThreadFactory(),
1071 new NoOpREHandler());
1072 shouldThrow();
1073 } catch (IllegalArgumentException success) {}
1074 }
1075
1076 /**
1077 * Constructor throws if workQueue is null
1078 */
1079 public void testConstructorNullPointerException6() {
1080 try {
1081 new CustomTPE(1, 2, 1L, SECONDS,
1082 null,
1083 new SimpleThreadFactory(),
1084 new NoOpREHandler());
1085 shouldThrow();
1086 } catch (NullPointerException success) {}
1087 }
1088
1089 /**
1090 * Constructor throws if handler is null
1091 */
1092 public void testConstructorNullPointerException7() {
1093 try {
1094 new CustomTPE(1, 2, 1L, SECONDS,
1095 new ArrayBlockingQueue<Runnable>(10),
1096 new SimpleThreadFactory(),
1097 (RejectedExecutionHandler) null);
1098 shouldThrow();
1099 } catch (NullPointerException success) {}
1100 }
1101
1102 /**
1103 * Constructor throws if ThreadFactory is null
1104 */
1105 public void testConstructorNullPointerException8() {
1106 try {
1107 new CustomTPE(1, 2, 1L, SECONDS,
1108 new ArrayBlockingQueue<Runnable>(10),
1109 (ThreadFactory) null,
1110 new NoOpREHandler());
1111 shouldThrow();
1112 } catch (NullPointerException success) {}
1113 }
1114
1115 /**
1116 * execute throws RejectedExecutionException if saturated.
1117 */
1118 public void testSaturatedExecute() {
1119 ThreadPoolExecutor p =
1120 new CustomTPE(1, 1,
1121 LONG_DELAY_MS, MILLISECONDS,
1122 new ArrayBlockingQueue<Runnable>(1));
1123 final CountDownLatch done = new CountDownLatch(1);
1124 try {
1125 Runnable task = new CheckedRunnable() {
1126 public void realRun() throws InterruptedException {
1127 done.await();
1128 }};
1129 for (int i = 0; i < 2; ++i)
1130 p.execute(task);
1131 for (int i = 0; i < 2; ++i) {
1132 try {
1133 p.execute(task);
1134 shouldThrow();
1135 } catch (RejectedExecutionException success) {}
1136 assertTrue(p.getTaskCount() <= 2);
1137 }
1138 } finally {
1139 done.countDown();
1140 joinPool(p);
1141 }
1142 }
1143
1144 /**
1145 * executor using CallerRunsPolicy runs task if saturated.
1146 */
1147 public void testSaturatedExecute2() {
1148 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1149 ThreadPoolExecutor p = new CustomTPE(1, 1,
1150 LONG_DELAY_MS, MILLISECONDS,
1151 new ArrayBlockingQueue<Runnable>(1),
1152 h);
1153 try {
1154 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1155 for (int i = 0; i < tasks.length; ++i)
1156 tasks[i] = new TrackedNoOpRunnable();
1157 TrackedLongRunnable mr = new TrackedLongRunnable();
1158 p.execute(mr);
1159 for (int i = 0; i < tasks.length; ++i)
1160 p.execute(tasks[i]);
1161 for (int i = 1; i < tasks.length; ++i)
1162 assertTrue(tasks[i].done);
1163 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1164 } finally {
1165 joinPool(p);
1166 }
1167 }
1168
1169 /**
1170 * executor using DiscardPolicy drops task if saturated.
1171 */
1172 public void testSaturatedExecute3() {
1173 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1174 ThreadPoolExecutor p =
1175 new CustomTPE(1, 1,
1176 LONG_DELAY_MS, MILLISECONDS,
1177 new ArrayBlockingQueue<Runnable>(1),
1178 h);
1179 try {
1180 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1181 for (int i = 0; i < tasks.length; ++i)
1182 tasks[i] = new TrackedNoOpRunnable();
1183 p.execute(new TrackedLongRunnable());
1184 for (TrackedNoOpRunnable task : tasks)
1185 p.execute(task);
1186 for (TrackedNoOpRunnable task : tasks)
1187 assertFalse(task.done);
1188 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1189 } finally {
1190 joinPool(p);
1191 }
1192 }
1193
1194 /**
1195 * executor using DiscardOldestPolicy drops oldest task if saturated.
1196 */
1197 public void testSaturatedExecute4() {
1198 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1199 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1200 try {
1201 p.execute(new TrackedLongRunnable());
1202 TrackedLongRunnable r2 = new TrackedLongRunnable();
1203 p.execute(r2);
1204 assertTrue(p.getQueue().contains(r2));
1205 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1206 p.execute(r3);
1207 assertFalse(p.getQueue().contains(r2));
1208 assertTrue(p.getQueue().contains(r3));
1209 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1210 } finally {
1211 joinPool(p);
1212 }
1213 }
1214
1215 /**
1216 * execute throws RejectedExecutionException if shutdown
1217 */
1218 public void testRejectedExecutionExceptionOnShutdown() {
1219 ThreadPoolExecutor p =
1220 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1221 try { p.shutdown(); } catch (SecurityException ok) { return; }
1222 try {
1223 p.execute(new NoOpRunnable());
1224 shouldThrow();
1225 } catch (RejectedExecutionException success) {}
1226
1227 joinPool(p);
1228 }
1229
1230 /**
1231 * execute using CallerRunsPolicy drops task on shutdown
1232 */
1233 public void testCallerRunsOnShutdown() {
1234 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1235 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1236
1237 try { p.shutdown(); } catch (SecurityException ok) { return; }
1238 try {
1239 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1240 p.execute(r);
1241 assertFalse(r.done);
1242 } finally {
1243 joinPool(p);
1244 }
1245 }
1246
1247 /**
1248 * execute using DiscardPolicy drops task on shutdown
1249 */
1250 public void testDiscardOnShutdown() {
1251 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1252 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1253
1254 try { p.shutdown(); } catch (SecurityException ok) { return; }
1255 try {
1256 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1257 p.execute(r);
1258 assertFalse(r.done);
1259 } finally {
1260 joinPool(p);
1261 }
1262 }
1263
1264 /**
1265 * execute using DiscardOldestPolicy drops task on shutdown
1266 */
1267 public void testDiscardOldestOnShutdown() {
1268 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1269 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1270
1271 try { p.shutdown(); } catch (SecurityException ok) { return; }
1272 try {
1273 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1274 p.execute(r);
1275 assertFalse(r.done);
1276 } finally {
1277 joinPool(p);
1278 }
1279 }
1280
1281 /**
1282 * execute(null) throws NPE
1283 */
1284 public void testExecuteNull() {
1285 ThreadPoolExecutor p =
1286 new CustomTPE(1, 2, 1L, SECONDS,
1287 new ArrayBlockingQueue<Runnable>(10));
1288 try {
1289 p.execute(null);
1290 shouldThrow();
1291 } catch (NullPointerException success) {}
1292
1293 joinPool(p);
1294 }
1295
1296 /**
1297 * setCorePoolSize of negative value throws IllegalArgumentException
1298 */
1299 public void testCorePoolSizeIllegalArgumentException() {
1300 ThreadPoolExecutor p =
1301 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1302 try {
1303 p.setCorePoolSize(-1);
1304 shouldThrow();
1305 } catch (IllegalArgumentException success) {
1306 } finally {
1307 try { p.shutdown(); } catch (SecurityException ok) { return; }
1308 }
1309 joinPool(p);
1310 }
1311
1312 /**
1313 * setMaximumPoolSize(int) throws IllegalArgumentException
1314 * if given a value less the core pool size
1315 */
1316 public void testMaximumPoolSizeIllegalArgumentException() {
1317 ThreadPoolExecutor p =
1318 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1319 try {
1320 p.setMaximumPoolSize(1);
1321 shouldThrow();
1322 } catch (IllegalArgumentException success) {
1323 } finally {
1324 try { p.shutdown(); } catch (SecurityException ok) { return; }
1325 }
1326 joinPool(p);
1327 }
1328
1329 /**
1330 * setMaximumPoolSize throws IllegalArgumentException
1331 * if given a negative value
1332 */
1333 public void testMaximumPoolSizeIllegalArgumentException2() {
1334 ThreadPoolExecutor p =
1335 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1336 try {
1337 p.setMaximumPoolSize(-1);
1338 shouldThrow();
1339 } catch (IllegalArgumentException success) {
1340 } finally {
1341 try { p.shutdown(); } catch (SecurityException ok) { return; }
1342 }
1343 joinPool(p);
1344 }
1345
1346 /**
1347 * setKeepAliveTime throws IllegalArgumentException
1348 * when given a negative value
1349 */
1350 public void testKeepAliveTimeIllegalArgumentException() {
1351 ThreadPoolExecutor p =
1352 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1353
1354 try {
1355 p.setKeepAliveTime(-1,MILLISECONDS);
1356 shouldThrow();
1357 } catch (IllegalArgumentException success) {
1358 } finally {
1359 try { p.shutdown(); } catch (SecurityException ok) { return; }
1360 }
1361 joinPool(p);
1362 }
1363
1364 /**
1365 * terminated() is called on termination
1366 */
1367 public void testTerminated() {
1368 CustomTPE p = new CustomTPE();
1369 try { p.shutdown(); } catch (SecurityException ok) { return; }
1370 assertTrue(p.terminatedCalled());
1371 joinPool(p);
1372 }
1373
1374 /**
1375 * beforeExecute and afterExecute are called when executing task
1376 */
1377 public void testBeforeAfter() throws InterruptedException {
1378 CustomTPE p = new CustomTPE();
1379 try {
1380 final CountDownLatch done = new CountDownLatch(1);
1381 p.execute(new CheckedRunnable() {
1382 public void realRun() {
1383 done.countDown();
1384 }});
1385 await(p.afterCalled);
1386 assertEquals(0, done.getCount());
1387 assertTrue(p.afterCalled());
1388 assertTrue(p.beforeCalled());
1389 try { p.shutdown(); } catch (SecurityException ok) { return; }
1390 } finally {
1391 joinPool(p);
1392 }
1393 }
1394
1395 /**
1396 * completed submit of callable returns result
1397 */
1398 public void testSubmitCallable() throws Exception {
1399 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1400 try {
1401 Future<String> future = e.submit(new StringTask());
1402 String result = future.get();
1403 assertSame(TEST_STRING, result);
1404 } finally {
1405 joinPool(e);
1406 }
1407 }
1408
1409 /**
1410 * completed submit of runnable returns successfully
1411 */
1412 public void testSubmitRunnable() throws Exception {
1413 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1414 try {
1415 Future<?> future = e.submit(new NoOpRunnable());
1416 future.get();
1417 assertTrue(future.isDone());
1418 } finally {
1419 joinPool(e);
1420 }
1421 }
1422
1423 /**
1424 * completed submit of (runnable, result) returns result
1425 */
1426 public void testSubmitRunnable2() throws Exception {
1427 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1428 try {
1429 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1430 String result = future.get();
1431 assertSame(TEST_STRING, result);
1432 } finally {
1433 joinPool(e);
1434 }
1435 }
1436
1437 /**
1438 * invokeAny(null) throws NPE
1439 */
1440 public void testInvokeAny1() throws Exception {
1441 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1442 try {
1443 e.invokeAny(null);
1444 shouldThrow();
1445 } catch (NullPointerException success) {
1446 } finally {
1447 joinPool(e);
1448 }
1449 }
1450
1451 /**
1452 * invokeAny(empty collection) throws IAE
1453 */
1454 public void testInvokeAny2() throws Exception {
1455 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1456 try {
1457 e.invokeAny(new ArrayList<Callable<String>>());
1458 shouldThrow();
1459 } catch (IllegalArgumentException success) {
1460 } finally {
1461 joinPool(e);
1462 }
1463 }
1464
1465 /**
1466 * invokeAny(c) throws NPE if c has null elements
1467 */
1468 public void testInvokeAny3() throws Exception {
1469 CountDownLatch latch = new CountDownLatch(1);
1470 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1471 List<Callable<String>> l = new ArrayList<Callable<String>>();
1472 l.add(latchAwaitingStringTask(latch));
1473 l.add(null);
1474 try {
1475 e.invokeAny(l);
1476 shouldThrow();
1477 } catch (NullPointerException success) {
1478 } finally {
1479 latch.countDown();
1480 joinPool(e);
1481 }
1482 }
1483
1484 /**
1485 * invokeAny(c) throws ExecutionException if no task completes
1486 */
1487 public void testInvokeAny4() throws Exception {
1488 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1489 List<Callable<String>> l = new ArrayList<Callable<String>>();
1490 l.add(new NPETask());
1491 try {
1492 e.invokeAny(l);
1493 shouldThrow();
1494 } catch (ExecutionException success) {
1495 assertTrue(success.getCause() instanceof NullPointerException);
1496 } finally {
1497 joinPool(e);
1498 }
1499 }
1500
1501 /**
1502 * invokeAny(c) returns result of some task
1503 */
1504 public void testInvokeAny5() throws Exception {
1505 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1506 try {
1507 List<Callable<String>> l = new ArrayList<Callable<String>>();
1508 l.add(new StringTask());
1509 l.add(new StringTask());
1510 String result = e.invokeAny(l);
1511 assertSame(TEST_STRING, result);
1512 } finally {
1513 joinPool(e);
1514 }
1515 }
1516
1517 /**
1518 * invokeAll(null) throws NPE
1519 */
1520 public void testInvokeAll1() throws Exception {
1521 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1522 try {
1523 e.invokeAll(null);
1524 shouldThrow();
1525 } catch (NullPointerException success) {
1526 } finally {
1527 joinPool(e);
1528 }
1529 }
1530
1531 /**
1532 * invokeAll(empty collection) returns empty collection
1533 */
1534 public void testInvokeAll2() throws Exception {
1535 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1536 try {
1537 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1538 assertTrue(r.isEmpty());
1539 } finally {
1540 joinPool(e);
1541 }
1542 }
1543
1544 /**
1545 * invokeAll(c) throws NPE if c has null elements
1546 */
1547 public void testInvokeAll3() throws Exception {
1548 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1549 List<Callable<String>> l = new ArrayList<Callable<String>>();
1550 l.add(new StringTask());
1551 l.add(null);
1552 try {
1553 e.invokeAll(l);
1554 shouldThrow();
1555 } catch (NullPointerException success) {
1556 } finally {
1557 joinPool(e);
1558 }
1559 }
1560
1561 /**
1562 * get of element of invokeAll(c) throws exception on failed task
1563 */
1564 public void testInvokeAll4() throws Exception {
1565 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1566 List<Callable<String>> l = new ArrayList<Callable<String>>();
1567 l.add(new NPETask());
1568 List<Future<String>> futures = e.invokeAll(l);
1569 assertEquals(1, futures.size());
1570 try {
1571 futures.get(0).get();
1572 shouldThrow();
1573 } catch (ExecutionException success) {
1574 assertTrue(success.getCause() instanceof NullPointerException);
1575 } finally {
1576 joinPool(e);
1577 }
1578 }
1579
1580 /**
1581 * invokeAll(c) returns results of all completed tasks
1582 */
1583 public void testInvokeAll5() throws Exception {
1584 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1585 try {
1586 List<Callable<String>> l = new ArrayList<Callable<String>>();
1587 l.add(new StringTask());
1588 l.add(new StringTask());
1589 List<Future<String>> futures = e.invokeAll(l);
1590 assertEquals(2, futures.size());
1591 for (Future<String> future : futures)
1592 assertSame(TEST_STRING, future.get());
1593 } finally {
1594 joinPool(e);
1595 }
1596 }
1597
1598 /**
1599 * timed invokeAny(null) throws NPE
1600 */
1601 public void testTimedInvokeAny1() throws Exception {
1602 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1603 try {
1604 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1605 shouldThrow();
1606 } catch (NullPointerException success) {
1607 } finally {
1608 joinPool(e);
1609 }
1610 }
1611
1612 /**
1613 * timed invokeAny(,,null) throws NPE
1614 */
1615 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1616 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1617 List<Callable<String>> l = new ArrayList<Callable<String>>();
1618 l.add(new StringTask());
1619 try {
1620 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1621 shouldThrow();
1622 } catch (NullPointerException success) {
1623 } finally {
1624 joinPool(e);
1625 }
1626 }
1627
1628 /**
1629 * timed invokeAny(empty collection) throws IAE
1630 */
1631 public void testTimedInvokeAny2() throws Exception {
1632 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1633 try {
1634 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1635 shouldThrow();
1636 } catch (IllegalArgumentException success) {
1637 } finally {
1638 joinPool(e);
1639 }
1640 }
1641
1642 /**
1643 * timed invokeAny(c) throws NPE if c has null elements
1644 */
1645 public void testTimedInvokeAny3() throws Exception {
1646 CountDownLatch latch = new CountDownLatch(1);
1647 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1648 List<Callable<String>> l = new ArrayList<Callable<String>>();
1649 l.add(latchAwaitingStringTask(latch));
1650 l.add(null);
1651 try {
1652 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1653 shouldThrow();
1654 } catch (NullPointerException success) {
1655 } finally {
1656 latch.countDown();
1657 joinPool(e);
1658 }
1659 }
1660
1661 /**
1662 * timed invokeAny(c) throws ExecutionException if no task completes
1663 */
1664 public void testTimedInvokeAny4() throws Exception {
1665 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1666 List<Callable<String>> l = new ArrayList<Callable<String>>();
1667 l.add(new NPETask());
1668 try {
1669 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1670 shouldThrow();
1671 } catch (ExecutionException success) {
1672 assertTrue(success.getCause() instanceof NullPointerException);
1673 } finally {
1674 joinPool(e);
1675 }
1676 }
1677
1678 /**
1679 * timed invokeAny(c) returns result of some task
1680 */
1681 public void testTimedInvokeAny5() throws Exception {
1682 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1683 try {
1684 List<Callable<String>> l = new ArrayList<Callable<String>>();
1685 l.add(new StringTask());
1686 l.add(new StringTask());
1687 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1688 assertSame(TEST_STRING, result);
1689 } finally {
1690 joinPool(e);
1691 }
1692 }
1693
1694 /**
1695 * timed invokeAll(null) throws NPE
1696 */
1697 public void testTimedInvokeAll1() throws Exception {
1698 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1699 try {
1700 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1701 shouldThrow();
1702 } catch (NullPointerException success) {
1703 } finally {
1704 joinPool(e);
1705 }
1706 }
1707
1708 /**
1709 * timed invokeAll(,,null) throws NPE
1710 */
1711 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1712 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1713 List<Callable<String>> l = new ArrayList<Callable<String>>();
1714 l.add(new StringTask());
1715 try {
1716 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1717 shouldThrow();
1718 } catch (NullPointerException success) {
1719 } finally {
1720 joinPool(e);
1721 }
1722 }
1723
1724 /**
1725 * timed invokeAll(empty collection) returns empty collection
1726 */
1727 public void testTimedInvokeAll2() throws Exception {
1728 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1729 try {
1730 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1731 assertTrue(r.isEmpty());
1732 } finally {
1733 joinPool(e);
1734 }
1735 }
1736
1737 /**
1738 * timed invokeAll(c) throws NPE if c has null elements
1739 */
1740 public void testTimedInvokeAll3() throws Exception {
1741 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1742 List<Callable<String>> l = new ArrayList<Callable<String>>();
1743 l.add(new StringTask());
1744 l.add(null);
1745 try {
1746 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1747 shouldThrow();
1748 } catch (NullPointerException success) {
1749 } finally {
1750 joinPool(e);
1751 }
1752 }
1753
1754 /**
1755 * get of element of invokeAll(c) throws exception on failed task
1756 */
1757 public void testTimedInvokeAll4() throws Exception {
1758 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1759 List<Callable<String>> l = new ArrayList<Callable<String>>();
1760 l.add(new NPETask());
1761 List<Future<String>> futures =
1762 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1763 assertEquals(1, futures.size());
1764 try {
1765 futures.get(0).get();
1766 shouldThrow();
1767 } catch (ExecutionException success) {
1768 assertTrue(success.getCause() instanceof NullPointerException);
1769 } finally {
1770 joinPool(e);
1771 }
1772 }
1773
1774 /**
1775 * timed invokeAll(c) returns results of all completed tasks
1776 */
1777 public void testTimedInvokeAll5() throws Exception {
1778 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1779 try {
1780 List<Callable<String>> l = new ArrayList<Callable<String>>();
1781 l.add(new StringTask());
1782 l.add(new StringTask());
1783 List<Future<String>> futures =
1784 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1785 assertEquals(2, futures.size());
1786 for (Future<String> future : futures)
1787 assertSame(TEST_STRING, future.get());
1788 } finally {
1789 joinPool(e);
1790 }
1791 }
1792
1793 /**
1794 * timed invokeAll(c) cancels tasks not completed by timeout
1795 */
1796 public void testTimedInvokeAll6() throws Exception {
1797 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1798 try {
1799 for (long timeout = timeoutMillis();;) {
1800 List<Callable<String>> tasks = new ArrayList<>();
1801 tasks.add(new StringTask("0"));
1802 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1803 tasks.add(new StringTask("2"));
1804 long startTime = System.nanoTime();
1805 List<Future<String>> futures =
1806 e.invokeAll(tasks, timeout, MILLISECONDS);
1807 assertEquals(tasks.size(), futures.size());
1808 assertTrue(millisElapsedSince(startTime) >= timeout);
1809 for (Future future : futures)
1810 assertTrue(future.isDone());
1811 assertTrue(futures.get(1).isCancelled());
1812 try {
1813 assertEquals("0", futures.get(0).get());
1814 assertEquals("2", futures.get(2).get());
1815 break;
1816 } catch (CancellationException retryWithLongerTimeout) {
1817 timeout *= 2;
1818 if (timeout >= LONG_DELAY_MS / 2)
1819 fail("expected exactly one task to be cancelled");
1820 }
1821 }
1822 } finally {
1823 joinPool(e);
1824 }
1825 }
1826
1827 /**
1828 * Execution continues if there is at least one thread even if
1829 * thread factory fails to create more
1830 */
1831 public void testFailingThreadFactory() throws InterruptedException {
1832 final ExecutorService e =
1833 new CustomTPE(100, 100,
1834 LONG_DELAY_MS, MILLISECONDS,
1835 new LinkedBlockingQueue<Runnable>(),
1836 new FailingThreadFactory());
1837 try {
1838 final int TASKS = 100;
1839 final CountDownLatch done = new CountDownLatch(TASKS);
1840 for (int k = 0; k < TASKS; ++k)
1841 e.execute(new CheckedRunnable() {
1842 public void realRun() {
1843 done.countDown();
1844 }});
1845 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1846 } finally {
1847 joinPool(e);
1848 }
1849 }
1850
1851 /**
1852 * allowsCoreThreadTimeOut is by default false.
1853 */
1854 public void testAllowsCoreThreadTimeOut() {
1855 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1856 assertFalse(p.allowsCoreThreadTimeOut());
1857 joinPool(p);
1858 }
1859
1860 /**
1861 * allowCoreThreadTimeOut(true) causes idle threads to time out
1862 */
1863 public void testAllowCoreThreadTimeOut_true() throws Exception {
1864 long keepAliveTime = timeoutMillis();
1865 final ThreadPoolExecutor p =
1866 new CustomTPE(2, 10,
1867 keepAliveTime, MILLISECONDS,
1868 new ArrayBlockingQueue<Runnable>(10));
1869 final CountDownLatch threadStarted = new CountDownLatch(1);
1870 try {
1871 p.allowCoreThreadTimeOut(true);
1872 p.execute(new CheckedRunnable() {
1873 public void realRun() {
1874 threadStarted.countDown();
1875 assertEquals(1, p.getPoolSize());
1876 }});
1877 await(threadStarted);
1878 delay(keepAliveTime);
1879 long startTime = System.nanoTime();
1880 while (p.getPoolSize() > 0
1881 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1882 Thread.yield();
1883 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1884 assertEquals(0, p.getPoolSize());
1885 } finally {
1886 joinPool(p);
1887 }
1888 }
1889
1890 /**
1891 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1892 */
1893 public void testAllowCoreThreadTimeOut_false() throws Exception {
1894 long keepAliveTime = timeoutMillis();
1895 final ThreadPoolExecutor p =
1896 new CustomTPE(2, 10,
1897 keepAliveTime, MILLISECONDS,
1898 new ArrayBlockingQueue<Runnable>(10));
1899 final CountDownLatch threadStarted = new CountDownLatch(1);
1900 try {
1901 p.allowCoreThreadTimeOut(false);
1902 p.execute(new CheckedRunnable() {
1903 public void realRun() throws InterruptedException {
1904 threadStarted.countDown();
1905 assertTrue(p.getPoolSize() >= 1);
1906 }});
1907 delay(2 * keepAliveTime);
1908 assertTrue(p.getPoolSize() >= 1);
1909 } finally {
1910 joinPool(p);
1911 }
1912 }
1913
1914 /**
1915 * get(cancelled task) throws CancellationException
1916 * (in part, a test of CustomTPE itself)
1917 */
1918 public void testGet_cancelled() throws Exception {
1919 final ExecutorService e =
1920 new CustomTPE(1, 1,
1921 LONG_DELAY_MS, MILLISECONDS,
1922 new LinkedBlockingQueue<Runnable>());
1923 try {
1924 final CountDownLatch blockerStarted = new CountDownLatch(1);
1925 final CountDownLatch done = new CountDownLatch(1);
1926 final List<Future<?>> futures = new ArrayList<>();
1927 for (int i = 0; i < 2; i++) {
1928 Runnable r = new CheckedRunnable() { public void realRun()
1929 throws Throwable {
1930 blockerStarted.countDown();
1931 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1932 }};
1933 futures.add(e.submit(r));
1934 }
1935 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1936 for (Future<?> future : futures) future.cancel(false);
1937 for (Future<?> future : futures) {
1938 try {
1939 future.get();
1940 shouldThrow();
1941 } catch (CancellationException success) {}
1942 try {
1943 future.get(LONG_DELAY_MS, MILLISECONDS);
1944 shouldThrow();
1945 } catch (CancellationException success) {}
1946 assertTrue(future.isCancelled());
1947 assertTrue(future.isDone());
1948 }
1949 done.countDown();
1950 } finally {
1951 joinPool(e);
1952 }
1953 }
1954
1955 }