ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.55
Committed: Sun Oct 4 01:29:09 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.54: +7 -3 lines
Log Message:
improve testGetCorePoolSize

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 final CountDownLatch threadStarted = new CountDownLatch(1);
256 final CountDownLatch done = new CountDownLatch(1);
257 try (PoolCleaner cleaner = cleaner(p)) {
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 done.countDown();
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 ThreadPoolExecutor p =
302 new CustomTPE(2, 6,
303 LONG_DELAY_MS, MILLISECONDS,
304 new ArrayBlockingQueue<Runnable>(10));
305 try (PoolCleaner cleaner = cleaner(p)) {
306 assertEquals(0, p.getPoolSize());
307 p.prestartAllCoreThreads();
308 assertEquals(2, p.getPoolSize());
309 p.prestartAllCoreThreads();
310 assertEquals(2, p.getPoolSize());
311 p.setCorePoolSize(4);
312 p.prestartAllCoreThreads();
313 assertEquals(4, p.getPoolSize());
314 p.prestartAllCoreThreads();
315 assertEquals(4, p.getPoolSize());
316 }
317 }
318
319 /**
320 * getCompletedTaskCount increases, but doesn't overestimate,
321 * when tasks complete
322 */
323 public void testGetCompletedTaskCount() throws InterruptedException {
324 final ThreadPoolExecutor p =
325 new CustomTPE(2, 2,
326 LONG_DELAY_MS, MILLISECONDS,
327 new ArrayBlockingQueue<Runnable>(10));
328 try (PoolCleaner cleaner = cleaner(p)) {
329 final CountDownLatch threadStarted = new CountDownLatch(1);
330 final CountDownLatch threadProceed = new CountDownLatch(1);
331 final CountDownLatch threadDone = new CountDownLatch(1);
332 assertEquals(0, p.getCompletedTaskCount());
333 p.execute(new CheckedRunnable() {
334 public void realRun() throws InterruptedException {
335 threadStarted.countDown();
336 assertEquals(0, p.getCompletedTaskCount());
337 threadProceed.await();
338 threadDone.countDown();
339 }});
340 await(threadStarted);
341 assertEquals(0, p.getCompletedTaskCount());
342 threadProceed.countDown();
343 threadDone.await();
344 long startTime = System.nanoTime();
345 while (p.getCompletedTaskCount() != 1) {
346 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
347 fail("timed out");
348 Thread.yield();
349 }
350 }
351 }
352
353 /**
354 * getCorePoolSize returns size given in constructor if not otherwise set
355 */
356 public void testGetCorePoolSize() {
357 ThreadPoolExecutor p =
358 new CustomTPE(1, 1,
359 LONG_DELAY_MS, MILLISECONDS,
360 new ArrayBlockingQueue<Runnable>(10));
361 try (PoolCleaner cleaner = cleaner(p)) {
362 assertEquals(1, p.getCorePoolSize());
363 }
364 }
365
366 /**
367 * getKeepAliveTime returns value given in constructor if not otherwise set
368 */
369 public void testGetKeepAliveTime() {
370 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
371 assertEquals(1, p.getKeepAliveTime(SECONDS));
372 joinPool(p);
373 }
374
375 /**
376 * getThreadFactory returns factory in constructor if not set
377 */
378 public void testGetThreadFactory() {
379 ThreadFactory tf = new SimpleThreadFactory();
380 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
381 assertSame(tf, p.getThreadFactory());
382 joinPool(p);
383 }
384
385 /**
386 * setThreadFactory sets the thread factory returned by getThreadFactory
387 */
388 public void testSetThreadFactory() {
389 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
390 ThreadFactory tf = new SimpleThreadFactory();
391 p.setThreadFactory(tf);
392 assertSame(tf, p.getThreadFactory());
393 joinPool(p);
394 }
395
396 /**
397 * setThreadFactory(null) throws NPE
398 */
399 public void testSetThreadFactoryNull() {
400 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
401 try {
402 p.setThreadFactory(null);
403 shouldThrow();
404 } catch (NullPointerException success) {
405 } finally {
406 joinPool(p);
407 }
408 }
409
410 /**
411 * getRejectedExecutionHandler returns handler in constructor if not set
412 */
413 public void testGetRejectedExecutionHandler() {
414 RejectedExecutionHandler h = new NoOpREHandler();
415 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
416 assertSame(h, p.getRejectedExecutionHandler());
417 joinPool(p);
418 }
419
420 /**
421 * setRejectedExecutionHandler sets the handler returned by
422 * getRejectedExecutionHandler
423 */
424 public void testSetRejectedExecutionHandler() {
425 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
426 RejectedExecutionHandler h = new NoOpREHandler();
427 p.setRejectedExecutionHandler(h);
428 assertSame(h, p.getRejectedExecutionHandler());
429 joinPool(p);
430 }
431
432 /**
433 * setRejectedExecutionHandler(null) throws NPE
434 */
435 public void testSetRejectedExecutionHandlerNull() {
436 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
437 try {
438 p.setRejectedExecutionHandler(null);
439 shouldThrow();
440 } catch (NullPointerException success) {
441 } finally {
442 joinPool(p);
443 }
444 }
445
446 /**
447 * getLargestPoolSize increases, but doesn't overestimate, when
448 * multiple threads active
449 */
450 public void testGetLargestPoolSize() throws InterruptedException {
451 final int THREADS = 3;
452 final ThreadPoolExecutor p =
453 new CustomTPE(THREADS, THREADS,
454 LONG_DELAY_MS, MILLISECONDS,
455 new ArrayBlockingQueue<Runnable>(10));
456 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
457 final CountDownLatch done = new CountDownLatch(1);
458 try {
459 assertEquals(0, p.getLargestPoolSize());
460 for (int i = 0; i < THREADS; i++)
461 p.execute(new CheckedRunnable() {
462 public void realRun() throws InterruptedException {
463 threadsStarted.countDown();
464 done.await();
465 assertEquals(THREADS, p.getLargestPoolSize());
466 }});
467 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
468 assertEquals(THREADS, p.getLargestPoolSize());
469 } finally {
470 done.countDown();
471 joinPool(p);
472 assertEquals(THREADS, p.getLargestPoolSize());
473 }
474 }
475
476 /**
477 * getMaximumPoolSize returns value given in constructor if not
478 * otherwise set
479 */
480 public void testGetMaximumPoolSize() {
481 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
482 assertEquals(2, p.getMaximumPoolSize());
483 joinPool(p);
484 }
485
486 /**
487 * getPoolSize increases, but doesn't overestimate, when threads
488 * become active
489 */
490 public void testGetPoolSize() throws InterruptedException {
491 final ThreadPoolExecutor p =
492 new CustomTPE(1, 1,
493 LONG_DELAY_MS, MILLISECONDS,
494 new ArrayBlockingQueue<Runnable>(10));
495 final CountDownLatch threadStarted = new CountDownLatch(1);
496 final CountDownLatch done = new CountDownLatch(1);
497 try {
498 assertEquals(0, p.getPoolSize());
499 p.execute(new CheckedRunnable() {
500 public void realRun() throws InterruptedException {
501 threadStarted.countDown();
502 assertEquals(1, p.getPoolSize());
503 done.await();
504 }});
505 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
506 assertEquals(1, p.getPoolSize());
507 } finally {
508 done.countDown();
509 joinPool(p);
510 }
511 }
512
513 /**
514 * getTaskCount increases, but doesn't overestimate, when tasks submitted
515 */
516 public void testGetTaskCount() throws InterruptedException {
517 final ThreadPoolExecutor p =
518 new CustomTPE(1, 1,
519 LONG_DELAY_MS, MILLISECONDS,
520 new ArrayBlockingQueue<Runnable>(10));
521 final CountDownLatch threadStarted = new CountDownLatch(1);
522 final CountDownLatch done = new CountDownLatch(1);
523 try {
524 assertEquals(0, p.getTaskCount());
525 p.execute(new CheckedRunnable() {
526 public void realRun() throws InterruptedException {
527 threadStarted.countDown();
528 assertEquals(1, p.getTaskCount());
529 done.await();
530 }});
531 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
532 assertEquals(1, p.getTaskCount());
533 } finally {
534 done.countDown();
535 joinPool(p);
536 }
537 }
538
539 /**
540 * isShutdown is false before shutdown, true after
541 */
542 public void testIsShutdown() {
543
544 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
545 assertFalse(p.isShutdown());
546 try { p.shutdown(); } catch (SecurityException ok) { return; }
547 assertTrue(p.isShutdown());
548 joinPool(p);
549 }
550
551 /**
552 * isTerminated is false before termination, true after
553 */
554 public void testIsTerminated() throws InterruptedException {
555 final ThreadPoolExecutor p =
556 new CustomTPE(1, 1,
557 LONG_DELAY_MS, MILLISECONDS,
558 new ArrayBlockingQueue<Runnable>(10));
559 final CountDownLatch threadStarted = new CountDownLatch(1);
560 final CountDownLatch done = new CountDownLatch(1);
561 try {
562 assertFalse(p.isTerminating());
563 p.execute(new CheckedRunnable() {
564 public void realRun() throws InterruptedException {
565 assertFalse(p.isTerminating());
566 threadStarted.countDown();
567 done.await();
568 }});
569 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
570 assertFalse(p.isTerminating());
571 done.countDown();
572 } finally {
573 try { p.shutdown(); } catch (SecurityException ok) { return; }
574 }
575 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
576 assertTrue(p.isTerminated());
577 assertFalse(p.isTerminating());
578 }
579
580 /**
581 * isTerminating is not true when running or when terminated
582 */
583 public void testIsTerminating() throws InterruptedException {
584 final ThreadPoolExecutor p =
585 new CustomTPE(1, 1,
586 LONG_DELAY_MS, MILLISECONDS,
587 new ArrayBlockingQueue<Runnable>(10));
588 final CountDownLatch threadStarted = new CountDownLatch(1);
589 final CountDownLatch done = new CountDownLatch(1);
590 try {
591 assertFalse(p.isTerminating());
592 p.execute(new CheckedRunnable() {
593 public void realRun() throws InterruptedException {
594 assertFalse(p.isTerminating());
595 threadStarted.countDown();
596 done.await();
597 }});
598 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
599 assertFalse(p.isTerminating());
600 done.countDown();
601 } finally {
602 try { p.shutdown(); } catch (SecurityException ok) { return; }
603 }
604 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
605 assertTrue(p.isTerminated());
606 assertFalse(p.isTerminating());
607 }
608
609 /**
610 * getQueue returns the work queue, which contains queued tasks
611 */
612 public void testGetQueue() throws InterruptedException {
613 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
614 final ThreadPoolExecutor p =
615 new CustomTPE(1, 1,
616 LONG_DELAY_MS, MILLISECONDS,
617 q);
618 final CountDownLatch threadStarted = new CountDownLatch(1);
619 final CountDownLatch done = new CountDownLatch(1);
620 try {
621 FutureTask[] tasks = new FutureTask[5];
622 for (int i = 0; i < tasks.length; i++) {
623 Callable task = new CheckedCallable<Boolean>() {
624 public Boolean realCall() throws InterruptedException {
625 threadStarted.countDown();
626 assertSame(q, p.getQueue());
627 done.await();
628 return Boolean.TRUE;
629 }};
630 tasks[i] = new FutureTask(task);
631 p.execute(tasks[i]);
632 }
633 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
634 assertSame(q, p.getQueue());
635 assertFalse(q.contains(tasks[0]));
636 assertTrue(q.contains(tasks[tasks.length - 1]));
637 assertEquals(tasks.length - 1, q.size());
638 } finally {
639 done.countDown();
640 joinPool(p);
641 }
642 }
643
644 /**
645 * remove(task) removes queued task, and fails to remove active task
646 */
647 public void testRemove() throws InterruptedException {
648 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
649 final ThreadPoolExecutor p =
650 new CustomTPE(1, 1,
651 LONG_DELAY_MS, MILLISECONDS,
652 q);
653 Runnable[] tasks = new Runnable[6];
654 final CountDownLatch threadStarted = new CountDownLatch(1);
655 final CountDownLatch done = new CountDownLatch(1);
656 try {
657 for (int i = 0; i < tasks.length; i++) {
658 tasks[i] = new CheckedRunnable() {
659 public void realRun() throws InterruptedException {
660 threadStarted.countDown();
661 done.await();
662 }};
663 p.execute(tasks[i]);
664 }
665 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
666 assertFalse(p.remove(tasks[0]));
667 assertTrue(q.contains(tasks[4]));
668 assertTrue(q.contains(tasks[3]));
669 assertTrue(p.remove(tasks[4]));
670 assertFalse(p.remove(tasks[4]));
671 assertFalse(q.contains(tasks[4]));
672 assertTrue(q.contains(tasks[3]));
673 assertTrue(p.remove(tasks[3]));
674 assertFalse(q.contains(tasks[3]));
675 } finally {
676 done.countDown();
677 joinPool(p);
678 }
679 }
680
681 /**
682 * purge removes cancelled tasks from the queue
683 */
684 public void testPurge() throws InterruptedException {
685 final CountDownLatch threadStarted = new CountDownLatch(1);
686 final CountDownLatch done = new CountDownLatch(1);
687 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
688 final ThreadPoolExecutor p =
689 new CustomTPE(1, 1,
690 LONG_DELAY_MS, MILLISECONDS,
691 q);
692 FutureTask[] tasks = new FutureTask[5];
693 try {
694 for (int i = 0; i < tasks.length; i++) {
695 Callable task = new CheckedCallable<Boolean>() {
696 public Boolean realCall() throws InterruptedException {
697 threadStarted.countDown();
698 done.await();
699 return Boolean.TRUE;
700 }};
701 tasks[i] = new FutureTask(task);
702 p.execute(tasks[i]);
703 }
704 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
705 assertEquals(tasks.length, p.getTaskCount());
706 assertEquals(tasks.length - 1, q.size());
707 assertEquals(1L, p.getActiveCount());
708 assertEquals(0L, p.getCompletedTaskCount());
709 tasks[4].cancel(true);
710 tasks[3].cancel(false);
711 p.purge();
712 assertEquals(tasks.length - 3, q.size());
713 assertEquals(tasks.length - 2, p.getTaskCount());
714 p.purge(); // Nothing to do
715 assertEquals(tasks.length - 3, q.size());
716 assertEquals(tasks.length - 2, p.getTaskCount());
717 } finally {
718 done.countDown();
719 joinPool(p);
720 }
721 }
722
723 /**
724 * shutdownNow returns a list containing tasks that were not run,
725 * and those tasks are drained from the queue
726 */
727 public void testShutdownNow() throws InterruptedException {
728 final int poolSize = 2;
729 final int count = 5;
730 final AtomicInteger ran = new AtomicInteger(0);
731 ThreadPoolExecutor p =
732 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
733 new ArrayBlockingQueue<Runnable>(10));
734 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
735 Runnable waiter = new CheckedRunnable() { public void realRun() {
736 threadsStarted.countDown();
737 try {
738 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
739 } catch (InterruptedException success) {}
740 ran.getAndIncrement();
741 }};
742 for (int i = 0; i < count; i++)
743 p.execute(waiter);
744 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
745 assertEquals(poolSize, p.getActiveCount());
746 assertEquals(0, p.getCompletedTaskCount());
747 final List<Runnable> queuedTasks;
748 try {
749 queuedTasks = p.shutdownNow();
750 } catch (SecurityException ok) {
751 return; // Allowed in case test doesn't have privs
752 }
753 assertTrue(p.isShutdown());
754 assertTrue(p.getQueue().isEmpty());
755 assertEquals(count - poolSize, queuedTasks.size());
756 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
757 assertTrue(p.isTerminated());
758 assertEquals(poolSize, ran.get());
759 assertEquals(poolSize, p.getCompletedTaskCount());
760 }
761
762 // Exception Tests
763
764 /**
765 * Constructor throws if corePoolSize argument is less than zero
766 */
767 public void testConstructor1() {
768 try {
769 new CustomTPE(-1, 1, 1L, SECONDS,
770 new ArrayBlockingQueue<Runnable>(10));
771 shouldThrow();
772 } catch (IllegalArgumentException success) {}
773 }
774
775 /**
776 * Constructor throws if maximumPoolSize is less than zero
777 */
778 public void testConstructor2() {
779 try {
780 new CustomTPE(1, -1, 1L, SECONDS,
781 new ArrayBlockingQueue<Runnable>(10));
782 shouldThrow();
783 } catch (IllegalArgumentException success) {}
784 }
785
786 /**
787 * Constructor throws if maximumPoolSize is equal to zero
788 */
789 public void testConstructor3() {
790 try {
791 new CustomTPE(1, 0, 1L, SECONDS,
792 new ArrayBlockingQueue<Runnable>(10));
793 shouldThrow();
794 } catch (IllegalArgumentException success) {}
795 }
796
797 /**
798 * Constructor throws if keepAliveTime is less than zero
799 */
800 public void testConstructor4() {
801 try {
802 new CustomTPE(1, 2, -1L, SECONDS,
803 new ArrayBlockingQueue<Runnable>(10));
804 shouldThrow();
805 } catch (IllegalArgumentException success) {}
806 }
807
808 /**
809 * Constructor throws if corePoolSize is greater than the maximumPoolSize
810 */
811 public void testConstructor5() {
812 try {
813 new CustomTPE(2, 1, 1L, SECONDS,
814 new ArrayBlockingQueue<Runnable>(10));
815 shouldThrow();
816 } catch (IllegalArgumentException success) {}
817 }
818
819 /**
820 * Constructor throws if workQueue is set to null
821 */
822 public void testConstructorNullPointerException() {
823 try {
824 new CustomTPE(1, 2, 1L, SECONDS, null);
825 shouldThrow();
826 } catch (NullPointerException success) {}
827 }
828
829 /**
830 * Constructor throws if corePoolSize argument is less than zero
831 */
832 public void testConstructor6() {
833 try {
834 new CustomTPE(-1, 1, 1L, SECONDS,
835 new ArrayBlockingQueue<Runnable>(10),
836 new SimpleThreadFactory());
837 shouldThrow();
838 } catch (IllegalArgumentException success) {}
839 }
840
841 /**
842 * Constructor throws if maximumPoolSize is less than zero
843 */
844 public void testConstructor7() {
845 try {
846 new CustomTPE(1,-1, 1L, SECONDS,
847 new ArrayBlockingQueue<Runnable>(10),
848 new SimpleThreadFactory());
849 shouldThrow();
850 } catch (IllegalArgumentException success) {}
851 }
852
853 /**
854 * Constructor throws if maximumPoolSize is equal to zero
855 */
856 public void testConstructor8() {
857 try {
858 new CustomTPE(1, 0, 1L, SECONDS,
859 new ArrayBlockingQueue<Runnable>(10),
860 new SimpleThreadFactory());
861 shouldThrow();
862 } catch (IllegalArgumentException success) {}
863 }
864
865 /**
866 * Constructor throws if keepAliveTime is less than zero
867 */
868 public void testConstructor9() {
869 try {
870 new CustomTPE(1, 2, -1L, SECONDS,
871 new ArrayBlockingQueue<Runnable>(10),
872 new SimpleThreadFactory());
873 shouldThrow();
874 } catch (IllegalArgumentException success) {}
875 }
876
877 /**
878 * Constructor throws if corePoolSize is greater than the maximumPoolSize
879 */
880 public void testConstructor10() {
881 try {
882 new CustomTPE(2, 1, 1L, SECONDS,
883 new ArrayBlockingQueue<Runnable>(10),
884 new SimpleThreadFactory());
885 shouldThrow();
886 } catch (IllegalArgumentException success) {}
887 }
888
889 /**
890 * Constructor throws if workQueue is set to null
891 */
892 public void testConstructorNullPointerException2() {
893 try {
894 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
895 shouldThrow();
896 } catch (NullPointerException success) {}
897 }
898
899 /**
900 * Constructor throws if threadFactory is set to null
901 */
902 public void testConstructorNullPointerException3() {
903 try {
904 new CustomTPE(1, 2, 1L, SECONDS,
905 new ArrayBlockingQueue<Runnable>(10),
906 (ThreadFactory) null);
907 shouldThrow();
908 } catch (NullPointerException success) {}
909 }
910
911 /**
912 * Constructor throws if corePoolSize argument is less than zero
913 */
914 public void testConstructor11() {
915 try {
916 new CustomTPE(-1, 1, 1L, SECONDS,
917 new ArrayBlockingQueue<Runnable>(10),
918 new NoOpREHandler());
919 shouldThrow();
920 } catch (IllegalArgumentException success) {}
921 }
922
923 /**
924 * Constructor throws if maximumPoolSize is less than zero
925 */
926 public void testConstructor12() {
927 try {
928 new CustomTPE(1, -1, 1L, SECONDS,
929 new ArrayBlockingQueue<Runnable>(10),
930 new NoOpREHandler());
931 shouldThrow();
932 } catch (IllegalArgumentException success) {}
933 }
934
935 /**
936 * Constructor throws if maximumPoolSize is equal to zero
937 */
938 public void testConstructor13() {
939 try {
940 new CustomTPE(1, 0, 1L, SECONDS,
941 new ArrayBlockingQueue<Runnable>(10),
942 new NoOpREHandler());
943 shouldThrow();
944 } catch (IllegalArgumentException success) {}
945 }
946
947 /**
948 * Constructor throws if keepAliveTime is less than zero
949 */
950 public void testConstructor14() {
951 try {
952 new CustomTPE(1, 2, -1L, SECONDS,
953 new ArrayBlockingQueue<Runnable>(10),
954 new NoOpREHandler());
955 shouldThrow();
956 } catch (IllegalArgumentException success) {}
957 }
958
959 /**
960 * Constructor throws if corePoolSize is greater than the maximumPoolSize
961 */
962 public void testConstructor15() {
963 try {
964 new CustomTPE(2, 1, 1L, SECONDS,
965 new ArrayBlockingQueue<Runnable>(10),
966 new NoOpREHandler());
967 shouldThrow();
968 } catch (IllegalArgumentException success) {}
969 }
970
971 /**
972 * Constructor throws if workQueue is set to null
973 */
974 public void testConstructorNullPointerException4() {
975 try {
976 new CustomTPE(1, 2, 1L, SECONDS,
977 null,
978 new NoOpREHandler());
979 shouldThrow();
980 } catch (NullPointerException success) {}
981 }
982
983 /**
984 * Constructor throws if handler is set to null
985 */
986 public void testConstructorNullPointerException5() {
987 try {
988 new CustomTPE(1, 2, 1L, SECONDS,
989 new ArrayBlockingQueue<Runnable>(10),
990 (RejectedExecutionHandler) null);
991 shouldThrow();
992 } catch (NullPointerException success) {}
993 }
994
995 /**
996 * Constructor throws if corePoolSize argument is less than zero
997 */
998 public void testConstructor16() {
999 try {
1000 new CustomTPE(-1, 1, 1L, SECONDS,
1001 new ArrayBlockingQueue<Runnable>(10),
1002 new SimpleThreadFactory(),
1003 new NoOpREHandler());
1004 shouldThrow();
1005 } catch (IllegalArgumentException success) {}
1006 }
1007
1008 /**
1009 * Constructor throws if maximumPoolSize is less than zero
1010 */
1011 public void testConstructor17() {
1012 try {
1013 new CustomTPE(1, -1, 1L, SECONDS,
1014 new ArrayBlockingQueue<Runnable>(10),
1015 new SimpleThreadFactory(),
1016 new NoOpREHandler());
1017 shouldThrow();
1018 } catch (IllegalArgumentException success) {}
1019 }
1020
1021 /**
1022 * Constructor throws if maximumPoolSize is equal to zero
1023 */
1024 public void testConstructor18() {
1025 try {
1026 new CustomTPE(1, 0, 1L, SECONDS,
1027 new ArrayBlockingQueue<Runnable>(10),
1028 new SimpleThreadFactory(),
1029 new NoOpREHandler());
1030 shouldThrow();
1031 } catch (IllegalArgumentException success) {}
1032 }
1033
1034 /**
1035 * Constructor throws if keepAliveTime is less than zero
1036 */
1037 public void testConstructor19() {
1038 try {
1039 new CustomTPE(1, 2, -1L, SECONDS,
1040 new ArrayBlockingQueue<Runnable>(10),
1041 new SimpleThreadFactory(),
1042 new NoOpREHandler());
1043 shouldThrow();
1044 } catch (IllegalArgumentException success) {}
1045 }
1046
1047 /**
1048 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1049 */
1050 public void testConstructor20() {
1051 try {
1052 new CustomTPE(2, 1, 1L, SECONDS,
1053 new ArrayBlockingQueue<Runnable>(10),
1054 new SimpleThreadFactory(),
1055 new NoOpREHandler());
1056 shouldThrow();
1057 } catch (IllegalArgumentException success) {}
1058 }
1059
1060 /**
1061 * Constructor throws if workQueue is null
1062 */
1063 public void testConstructorNullPointerException6() {
1064 try {
1065 new CustomTPE(1, 2, 1L, SECONDS,
1066 null,
1067 new SimpleThreadFactory(),
1068 new NoOpREHandler());
1069 shouldThrow();
1070 } catch (NullPointerException success) {}
1071 }
1072
1073 /**
1074 * Constructor throws if handler is null
1075 */
1076 public void testConstructorNullPointerException7() {
1077 try {
1078 new CustomTPE(1, 2, 1L, SECONDS,
1079 new ArrayBlockingQueue<Runnable>(10),
1080 new SimpleThreadFactory(),
1081 (RejectedExecutionHandler) null);
1082 shouldThrow();
1083 } catch (NullPointerException success) {}
1084 }
1085
1086 /**
1087 * Constructor throws if ThreadFactory is null
1088 */
1089 public void testConstructorNullPointerException8() {
1090 try {
1091 new CustomTPE(1, 2, 1L, SECONDS,
1092 new ArrayBlockingQueue<Runnable>(10),
1093 (ThreadFactory) null,
1094 new NoOpREHandler());
1095 shouldThrow();
1096 } catch (NullPointerException success) {}
1097 }
1098
1099 /**
1100 * execute throws RejectedExecutionException if saturated.
1101 */
1102 public void testSaturatedExecute() {
1103 ThreadPoolExecutor p =
1104 new CustomTPE(1, 1,
1105 LONG_DELAY_MS, MILLISECONDS,
1106 new ArrayBlockingQueue<Runnable>(1));
1107 final CountDownLatch done = new CountDownLatch(1);
1108 try {
1109 Runnable task = new CheckedRunnable() {
1110 public void realRun() throws InterruptedException {
1111 done.await();
1112 }};
1113 for (int i = 0; i < 2; ++i)
1114 p.execute(task);
1115 for (int i = 0; i < 2; ++i) {
1116 try {
1117 p.execute(task);
1118 shouldThrow();
1119 } catch (RejectedExecutionException success) {}
1120 assertTrue(p.getTaskCount() <= 2);
1121 }
1122 } finally {
1123 done.countDown();
1124 joinPool(p);
1125 }
1126 }
1127
1128 /**
1129 * executor using CallerRunsPolicy runs task if saturated.
1130 */
1131 public void testSaturatedExecute2() {
1132 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1133 ThreadPoolExecutor p = new CustomTPE(1, 1,
1134 LONG_DELAY_MS, MILLISECONDS,
1135 new ArrayBlockingQueue<Runnable>(1),
1136 h);
1137 try {
1138 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1139 for (int i = 0; i < tasks.length; ++i)
1140 tasks[i] = new TrackedNoOpRunnable();
1141 TrackedLongRunnable mr = new TrackedLongRunnable();
1142 p.execute(mr);
1143 for (int i = 0; i < tasks.length; ++i)
1144 p.execute(tasks[i]);
1145 for (int i = 1; i < tasks.length; ++i)
1146 assertTrue(tasks[i].done);
1147 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1148 } finally {
1149 joinPool(p);
1150 }
1151 }
1152
1153 /**
1154 * executor using DiscardPolicy drops task if saturated.
1155 */
1156 public void testSaturatedExecute3() {
1157 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1158 ThreadPoolExecutor p =
1159 new CustomTPE(1, 1,
1160 LONG_DELAY_MS, MILLISECONDS,
1161 new ArrayBlockingQueue<Runnable>(1),
1162 h);
1163 try {
1164 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1165 for (int i = 0; i < tasks.length; ++i)
1166 tasks[i] = new TrackedNoOpRunnable();
1167 p.execute(new TrackedLongRunnable());
1168 for (TrackedNoOpRunnable task : tasks)
1169 p.execute(task);
1170 for (TrackedNoOpRunnable task : tasks)
1171 assertFalse(task.done);
1172 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1173 } finally {
1174 joinPool(p);
1175 }
1176 }
1177
1178 /**
1179 * executor using DiscardOldestPolicy drops oldest task if saturated.
1180 */
1181 public void testSaturatedExecute4() {
1182 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1183 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1184 try {
1185 p.execute(new TrackedLongRunnable());
1186 TrackedLongRunnable r2 = new TrackedLongRunnable();
1187 p.execute(r2);
1188 assertTrue(p.getQueue().contains(r2));
1189 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1190 p.execute(r3);
1191 assertFalse(p.getQueue().contains(r2));
1192 assertTrue(p.getQueue().contains(r3));
1193 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1194 } finally {
1195 joinPool(p);
1196 }
1197 }
1198
1199 /**
1200 * execute throws RejectedExecutionException if shutdown
1201 */
1202 public void testRejectedExecutionExceptionOnShutdown() {
1203 ThreadPoolExecutor p =
1204 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1205 try { p.shutdown(); } catch (SecurityException ok) { return; }
1206 try {
1207 p.execute(new NoOpRunnable());
1208 shouldThrow();
1209 } catch (RejectedExecutionException success) {}
1210
1211 joinPool(p);
1212 }
1213
1214 /**
1215 * execute using CallerRunsPolicy drops task on shutdown
1216 */
1217 public void testCallerRunsOnShutdown() {
1218 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1219 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1220
1221 try { p.shutdown(); } catch (SecurityException ok) { return; }
1222 try {
1223 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1224 p.execute(r);
1225 assertFalse(r.done);
1226 } finally {
1227 joinPool(p);
1228 }
1229 }
1230
1231 /**
1232 * execute using DiscardPolicy drops task on shutdown
1233 */
1234 public void testDiscardOnShutdown() {
1235 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1236 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1237
1238 try { p.shutdown(); } catch (SecurityException ok) { return; }
1239 try {
1240 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1241 p.execute(r);
1242 assertFalse(r.done);
1243 } finally {
1244 joinPool(p);
1245 }
1246 }
1247
1248 /**
1249 * execute using DiscardOldestPolicy drops task on shutdown
1250 */
1251 public void testDiscardOldestOnShutdown() {
1252 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1253 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1254
1255 try { p.shutdown(); } catch (SecurityException ok) { return; }
1256 try {
1257 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1258 p.execute(r);
1259 assertFalse(r.done);
1260 } finally {
1261 joinPool(p);
1262 }
1263 }
1264
1265 /**
1266 * execute(null) throws NPE
1267 */
1268 public void testExecuteNull() {
1269 ThreadPoolExecutor p =
1270 new CustomTPE(1, 2, 1L, SECONDS,
1271 new ArrayBlockingQueue<Runnable>(10));
1272 try {
1273 p.execute(null);
1274 shouldThrow();
1275 } catch (NullPointerException success) {}
1276
1277 joinPool(p);
1278 }
1279
1280 /**
1281 * setCorePoolSize of negative value throws IllegalArgumentException
1282 */
1283 public void testCorePoolSizeIllegalArgumentException() {
1284 ThreadPoolExecutor p =
1285 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1286 try {
1287 p.setCorePoolSize(-1);
1288 shouldThrow();
1289 } catch (IllegalArgumentException success) {
1290 } finally {
1291 try { p.shutdown(); } catch (SecurityException ok) { return; }
1292 }
1293 joinPool(p);
1294 }
1295
1296 /**
1297 * setMaximumPoolSize(int) throws IllegalArgumentException
1298 * if given a value less the core pool size
1299 */
1300 public void testMaximumPoolSizeIllegalArgumentException() {
1301 ThreadPoolExecutor p =
1302 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1303 try {
1304 p.setMaximumPoolSize(1);
1305 shouldThrow();
1306 } catch (IllegalArgumentException success) {
1307 } finally {
1308 try { p.shutdown(); } catch (SecurityException ok) { return; }
1309 }
1310 joinPool(p);
1311 }
1312
1313 /**
1314 * setMaximumPoolSize throws IllegalArgumentException
1315 * if given a negative value
1316 */
1317 public void testMaximumPoolSizeIllegalArgumentException2() {
1318 ThreadPoolExecutor p =
1319 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1320 try {
1321 p.setMaximumPoolSize(-1);
1322 shouldThrow();
1323 } catch (IllegalArgumentException success) {
1324 } finally {
1325 try { p.shutdown(); } catch (SecurityException ok) { return; }
1326 }
1327 joinPool(p);
1328 }
1329
1330 /**
1331 * setKeepAliveTime throws IllegalArgumentException
1332 * when given a negative value
1333 */
1334 public void testKeepAliveTimeIllegalArgumentException() {
1335 ThreadPoolExecutor p =
1336 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1337
1338 try {
1339 p.setKeepAliveTime(-1,MILLISECONDS);
1340 shouldThrow();
1341 } catch (IllegalArgumentException success) {
1342 } finally {
1343 try { p.shutdown(); } catch (SecurityException ok) { return; }
1344 }
1345 joinPool(p);
1346 }
1347
1348 /**
1349 * terminated() is called on termination
1350 */
1351 public void testTerminated() {
1352 CustomTPE p = new CustomTPE();
1353 try { p.shutdown(); } catch (SecurityException ok) { return; }
1354 assertTrue(p.terminatedCalled());
1355 joinPool(p);
1356 }
1357
1358 /**
1359 * beforeExecute and afterExecute are called when executing task
1360 */
1361 public void testBeforeAfter() throws InterruptedException {
1362 CustomTPE p = new CustomTPE();
1363 try {
1364 final CountDownLatch done = new CountDownLatch(1);
1365 p.execute(new CheckedRunnable() {
1366 public void realRun() {
1367 done.countDown();
1368 }});
1369 await(p.afterCalled);
1370 assertEquals(0, done.getCount());
1371 assertTrue(p.afterCalled());
1372 assertTrue(p.beforeCalled());
1373 try { p.shutdown(); } catch (SecurityException ok) { return; }
1374 } finally {
1375 joinPool(p);
1376 }
1377 }
1378
1379 /**
1380 * completed submit of callable returns result
1381 */
1382 public void testSubmitCallable() throws Exception {
1383 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1384 try {
1385 Future<String> future = e.submit(new StringTask());
1386 String result = future.get();
1387 assertSame(TEST_STRING, result);
1388 } finally {
1389 joinPool(e);
1390 }
1391 }
1392
1393 /**
1394 * completed submit of runnable returns successfully
1395 */
1396 public void testSubmitRunnable() throws Exception {
1397 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1398 try {
1399 Future<?> future = e.submit(new NoOpRunnable());
1400 future.get();
1401 assertTrue(future.isDone());
1402 } finally {
1403 joinPool(e);
1404 }
1405 }
1406
1407 /**
1408 * completed submit of (runnable, result) returns result
1409 */
1410 public void testSubmitRunnable2() throws Exception {
1411 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1412 try {
1413 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1414 String result = future.get();
1415 assertSame(TEST_STRING, result);
1416 } finally {
1417 joinPool(e);
1418 }
1419 }
1420
1421 /**
1422 * invokeAny(null) throws NPE
1423 */
1424 public void testInvokeAny1() throws Exception {
1425 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1426 try {
1427 e.invokeAny(null);
1428 shouldThrow();
1429 } catch (NullPointerException success) {
1430 } finally {
1431 joinPool(e);
1432 }
1433 }
1434
1435 /**
1436 * invokeAny(empty collection) throws IAE
1437 */
1438 public void testInvokeAny2() throws Exception {
1439 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1440 try {
1441 e.invokeAny(new ArrayList<Callable<String>>());
1442 shouldThrow();
1443 } catch (IllegalArgumentException success) {
1444 } finally {
1445 joinPool(e);
1446 }
1447 }
1448
1449 /**
1450 * invokeAny(c) throws NPE if c has null elements
1451 */
1452 public void testInvokeAny3() throws Exception {
1453 CountDownLatch latch = new CountDownLatch(1);
1454 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1455 List<Callable<String>> l = new ArrayList<Callable<String>>();
1456 l.add(latchAwaitingStringTask(latch));
1457 l.add(null);
1458 try {
1459 e.invokeAny(l);
1460 shouldThrow();
1461 } catch (NullPointerException success) {
1462 } finally {
1463 latch.countDown();
1464 joinPool(e);
1465 }
1466 }
1467
1468 /**
1469 * invokeAny(c) throws ExecutionException if no task completes
1470 */
1471 public void testInvokeAny4() throws Exception {
1472 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1473 List<Callable<String>> l = new ArrayList<Callable<String>>();
1474 l.add(new NPETask());
1475 try {
1476 e.invokeAny(l);
1477 shouldThrow();
1478 } catch (ExecutionException success) {
1479 assertTrue(success.getCause() instanceof NullPointerException);
1480 } finally {
1481 joinPool(e);
1482 }
1483 }
1484
1485 /**
1486 * invokeAny(c) returns result of some task
1487 */
1488 public void testInvokeAny5() throws Exception {
1489 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1490 try {
1491 List<Callable<String>> l = new ArrayList<Callable<String>>();
1492 l.add(new StringTask());
1493 l.add(new StringTask());
1494 String result = e.invokeAny(l);
1495 assertSame(TEST_STRING, result);
1496 } finally {
1497 joinPool(e);
1498 }
1499 }
1500
1501 /**
1502 * invokeAll(null) throws NPE
1503 */
1504 public void testInvokeAll1() throws Exception {
1505 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1506 try {
1507 e.invokeAll(null);
1508 shouldThrow();
1509 } catch (NullPointerException success) {
1510 } finally {
1511 joinPool(e);
1512 }
1513 }
1514
1515 /**
1516 * invokeAll(empty collection) returns empty collection
1517 */
1518 public void testInvokeAll2() throws Exception {
1519 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1520 try {
1521 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1522 assertTrue(r.isEmpty());
1523 } finally {
1524 joinPool(e);
1525 }
1526 }
1527
1528 /**
1529 * invokeAll(c) throws NPE if c has null elements
1530 */
1531 public void testInvokeAll3() throws Exception {
1532 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1533 List<Callable<String>> l = new ArrayList<Callable<String>>();
1534 l.add(new StringTask());
1535 l.add(null);
1536 try {
1537 e.invokeAll(l);
1538 shouldThrow();
1539 } catch (NullPointerException success) {
1540 } finally {
1541 joinPool(e);
1542 }
1543 }
1544
1545 /**
1546 * get of element of invokeAll(c) throws exception on failed task
1547 */
1548 public void testInvokeAll4() throws Exception {
1549 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1550 List<Callable<String>> l = new ArrayList<Callable<String>>();
1551 l.add(new NPETask());
1552 List<Future<String>> futures = e.invokeAll(l);
1553 assertEquals(1, futures.size());
1554 try {
1555 futures.get(0).get();
1556 shouldThrow();
1557 } catch (ExecutionException success) {
1558 assertTrue(success.getCause() instanceof NullPointerException);
1559 } finally {
1560 joinPool(e);
1561 }
1562 }
1563
1564 /**
1565 * invokeAll(c) returns results of all completed tasks
1566 */
1567 public void testInvokeAll5() throws Exception {
1568 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1569 try {
1570 List<Callable<String>> l = new ArrayList<Callable<String>>();
1571 l.add(new StringTask());
1572 l.add(new StringTask());
1573 List<Future<String>> futures = e.invokeAll(l);
1574 assertEquals(2, futures.size());
1575 for (Future<String> future : futures)
1576 assertSame(TEST_STRING, future.get());
1577 } finally {
1578 joinPool(e);
1579 }
1580 }
1581
1582 /**
1583 * timed invokeAny(null) throws NPE
1584 */
1585 public void testTimedInvokeAny1() throws Exception {
1586 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1587 try {
1588 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1589 shouldThrow();
1590 } catch (NullPointerException success) {
1591 } finally {
1592 joinPool(e);
1593 }
1594 }
1595
1596 /**
1597 * timed invokeAny(,,null) throws NPE
1598 */
1599 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1600 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1601 List<Callable<String>> l = new ArrayList<Callable<String>>();
1602 l.add(new StringTask());
1603 try {
1604 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1605 shouldThrow();
1606 } catch (NullPointerException success) {
1607 } finally {
1608 joinPool(e);
1609 }
1610 }
1611
1612 /**
1613 * timed invokeAny(empty collection) throws IAE
1614 */
1615 public void testTimedInvokeAny2() throws Exception {
1616 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1617 try {
1618 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1619 shouldThrow();
1620 } catch (IllegalArgumentException success) {
1621 } finally {
1622 joinPool(e);
1623 }
1624 }
1625
1626 /**
1627 * timed invokeAny(c) throws NPE if c has null elements
1628 */
1629 public void testTimedInvokeAny3() throws Exception {
1630 CountDownLatch latch = new CountDownLatch(1);
1631 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1632 List<Callable<String>> l = new ArrayList<Callable<String>>();
1633 l.add(latchAwaitingStringTask(latch));
1634 l.add(null);
1635 try {
1636 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1637 shouldThrow();
1638 } catch (NullPointerException success) {
1639 } finally {
1640 latch.countDown();
1641 joinPool(e);
1642 }
1643 }
1644
1645 /**
1646 * timed invokeAny(c) throws ExecutionException if no task completes
1647 */
1648 public void testTimedInvokeAny4() throws Exception {
1649 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1650 List<Callable<String>> l = new ArrayList<Callable<String>>();
1651 l.add(new NPETask());
1652 try {
1653 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1654 shouldThrow();
1655 } catch (ExecutionException success) {
1656 assertTrue(success.getCause() instanceof NullPointerException);
1657 } finally {
1658 joinPool(e);
1659 }
1660 }
1661
1662 /**
1663 * timed invokeAny(c) returns result of some task
1664 */
1665 public void testTimedInvokeAny5() throws Exception {
1666 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1667 try {
1668 List<Callable<String>> l = new ArrayList<Callable<String>>();
1669 l.add(new StringTask());
1670 l.add(new StringTask());
1671 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1672 assertSame(TEST_STRING, result);
1673 } finally {
1674 joinPool(e);
1675 }
1676 }
1677
1678 /**
1679 * timed invokeAll(null) throws NPE
1680 */
1681 public void testTimedInvokeAll1() throws Exception {
1682 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1683 try {
1684 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1685 shouldThrow();
1686 } catch (NullPointerException success) {
1687 } finally {
1688 joinPool(e);
1689 }
1690 }
1691
1692 /**
1693 * timed invokeAll(,,null) throws NPE
1694 */
1695 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1696 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1697 List<Callable<String>> l = new ArrayList<Callable<String>>();
1698 l.add(new StringTask());
1699 try {
1700 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1701 shouldThrow();
1702 } catch (NullPointerException success) {
1703 } finally {
1704 joinPool(e);
1705 }
1706 }
1707
1708 /**
1709 * timed invokeAll(empty collection) returns empty collection
1710 */
1711 public void testTimedInvokeAll2() throws Exception {
1712 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1713 try {
1714 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1715 assertTrue(r.isEmpty());
1716 } finally {
1717 joinPool(e);
1718 }
1719 }
1720
1721 /**
1722 * timed invokeAll(c) throws NPE if c has null elements
1723 */
1724 public void testTimedInvokeAll3() throws Exception {
1725 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1726 List<Callable<String>> l = new ArrayList<Callable<String>>();
1727 l.add(new StringTask());
1728 l.add(null);
1729 try {
1730 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1731 shouldThrow();
1732 } catch (NullPointerException success) {
1733 } finally {
1734 joinPool(e);
1735 }
1736 }
1737
1738 /**
1739 * get of element of invokeAll(c) throws exception on failed task
1740 */
1741 public void testTimedInvokeAll4() throws Exception {
1742 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1743 List<Callable<String>> l = new ArrayList<Callable<String>>();
1744 l.add(new NPETask());
1745 List<Future<String>> futures =
1746 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1747 assertEquals(1, futures.size());
1748 try {
1749 futures.get(0).get();
1750 shouldThrow();
1751 } catch (ExecutionException success) {
1752 assertTrue(success.getCause() instanceof NullPointerException);
1753 } finally {
1754 joinPool(e);
1755 }
1756 }
1757
1758 /**
1759 * timed invokeAll(c) returns results of all completed tasks
1760 */
1761 public void testTimedInvokeAll5() throws Exception {
1762 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1763 try {
1764 List<Callable<String>> l = new ArrayList<Callable<String>>();
1765 l.add(new StringTask());
1766 l.add(new StringTask());
1767 List<Future<String>> futures =
1768 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1769 assertEquals(2, futures.size());
1770 for (Future<String> future : futures)
1771 assertSame(TEST_STRING, future.get());
1772 } finally {
1773 joinPool(e);
1774 }
1775 }
1776
1777 /**
1778 * timed invokeAll(c) cancels tasks not completed by timeout
1779 */
1780 public void testTimedInvokeAll6() throws Exception {
1781 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1782 try {
1783 for (long timeout = timeoutMillis();;) {
1784 List<Callable<String>> tasks = new ArrayList<>();
1785 tasks.add(new StringTask("0"));
1786 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1787 tasks.add(new StringTask("2"));
1788 long startTime = System.nanoTime();
1789 List<Future<String>> futures =
1790 e.invokeAll(tasks, timeout, MILLISECONDS);
1791 assertEquals(tasks.size(), futures.size());
1792 assertTrue(millisElapsedSince(startTime) >= timeout);
1793 for (Future future : futures)
1794 assertTrue(future.isDone());
1795 assertTrue(futures.get(1).isCancelled());
1796 try {
1797 assertEquals("0", futures.get(0).get());
1798 assertEquals("2", futures.get(2).get());
1799 break;
1800 } catch (CancellationException retryWithLongerTimeout) {
1801 timeout *= 2;
1802 if (timeout >= LONG_DELAY_MS / 2)
1803 fail("expected exactly one task to be cancelled");
1804 }
1805 }
1806 } finally {
1807 joinPool(e);
1808 }
1809 }
1810
1811 /**
1812 * Execution continues if there is at least one thread even if
1813 * thread factory fails to create more
1814 */
1815 public void testFailingThreadFactory() throws InterruptedException {
1816 final ExecutorService e =
1817 new CustomTPE(100, 100,
1818 LONG_DELAY_MS, MILLISECONDS,
1819 new LinkedBlockingQueue<Runnable>(),
1820 new FailingThreadFactory());
1821 try {
1822 final int TASKS = 100;
1823 final CountDownLatch done = new CountDownLatch(TASKS);
1824 for (int k = 0; k < TASKS; ++k)
1825 e.execute(new CheckedRunnable() {
1826 public void realRun() {
1827 done.countDown();
1828 }});
1829 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1830 } finally {
1831 joinPool(e);
1832 }
1833 }
1834
1835 /**
1836 * allowsCoreThreadTimeOut is by default false.
1837 */
1838 public void testAllowsCoreThreadTimeOut() {
1839 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1840 assertFalse(p.allowsCoreThreadTimeOut());
1841 joinPool(p);
1842 }
1843
1844 /**
1845 * allowCoreThreadTimeOut(true) causes idle threads to time out
1846 */
1847 public void testAllowCoreThreadTimeOut_true() throws Exception {
1848 long keepAliveTime = timeoutMillis();
1849 final ThreadPoolExecutor p =
1850 new CustomTPE(2, 10,
1851 keepAliveTime, MILLISECONDS,
1852 new ArrayBlockingQueue<Runnable>(10));
1853 final CountDownLatch threadStarted = new CountDownLatch(1);
1854 try {
1855 p.allowCoreThreadTimeOut(true);
1856 p.execute(new CheckedRunnable() {
1857 public void realRun() {
1858 threadStarted.countDown();
1859 assertEquals(1, p.getPoolSize());
1860 }});
1861 await(threadStarted);
1862 delay(keepAliveTime);
1863 long startTime = System.nanoTime();
1864 while (p.getPoolSize() > 0
1865 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1866 Thread.yield();
1867 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1868 assertEquals(0, p.getPoolSize());
1869 } finally {
1870 joinPool(p);
1871 }
1872 }
1873
1874 /**
1875 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1876 */
1877 public void testAllowCoreThreadTimeOut_false() throws Exception {
1878 long keepAliveTime = timeoutMillis();
1879 final ThreadPoolExecutor p =
1880 new CustomTPE(2, 10,
1881 keepAliveTime, MILLISECONDS,
1882 new ArrayBlockingQueue<Runnable>(10));
1883 final CountDownLatch threadStarted = new CountDownLatch(1);
1884 try {
1885 p.allowCoreThreadTimeOut(false);
1886 p.execute(new CheckedRunnable() {
1887 public void realRun() throws InterruptedException {
1888 threadStarted.countDown();
1889 assertTrue(p.getPoolSize() >= 1);
1890 }});
1891 delay(2 * keepAliveTime);
1892 assertTrue(p.getPoolSize() >= 1);
1893 } finally {
1894 joinPool(p);
1895 }
1896 }
1897
1898 /**
1899 * get(cancelled task) throws CancellationException
1900 * (in part, a test of CustomTPE itself)
1901 */
1902 public void testGet_cancelled() throws Exception {
1903 final ExecutorService e =
1904 new CustomTPE(1, 1,
1905 LONG_DELAY_MS, MILLISECONDS,
1906 new LinkedBlockingQueue<Runnable>());
1907 try {
1908 final CountDownLatch blockerStarted = new CountDownLatch(1);
1909 final CountDownLatch done = new CountDownLatch(1);
1910 final List<Future<?>> futures = new ArrayList<>();
1911 for (int i = 0; i < 2; i++) {
1912 Runnable r = new CheckedRunnable() { public void realRun()
1913 throws Throwable {
1914 blockerStarted.countDown();
1915 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1916 }};
1917 futures.add(e.submit(r));
1918 }
1919 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1920 for (Future<?> future : futures) future.cancel(false);
1921 for (Future<?> future : futures) {
1922 try {
1923 future.get();
1924 shouldThrow();
1925 } catch (CancellationException success) {}
1926 try {
1927 future.get(LONG_DELAY_MS, MILLISECONDS);
1928 shouldThrow();
1929 } catch (CancellationException success) {}
1930 assertTrue(future.isCancelled());
1931 assertTrue(future.isDone());
1932 }
1933 done.countDown();
1934 } finally {
1935 joinPool(e);
1936 }
1937 }
1938
1939 }