ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.59
Committed: Sun Oct 4 01:56:51 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.58: +15 -10 lines
Log Message:
improve testGetRejectedExecutionHandler

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 final CountDownLatch threadStarted = new CountDownLatch(1);
256 final CountDownLatch done = new CountDownLatch(1);
257 try (PoolCleaner cleaner = cleaner(p)) {
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 done.countDown();
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 ThreadPoolExecutor p =
302 new CustomTPE(2, 6,
303 LONG_DELAY_MS, MILLISECONDS,
304 new ArrayBlockingQueue<Runnable>(10));
305 try (PoolCleaner cleaner = cleaner(p)) {
306 assertEquals(0, p.getPoolSize());
307 p.prestartAllCoreThreads();
308 assertEquals(2, p.getPoolSize());
309 p.prestartAllCoreThreads();
310 assertEquals(2, p.getPoolSize());
311 p.setCorePoolSize(4);
312 p.prestartAllCoreThreads();
313 assertEquals(4, p.getPoolSize());
314 p.prestartAllCoreThreads();
315 assertEquals(4, p.getPoolSize());
316 }
317 }
318
319 /**
320 * getCompletedTaskCount increases, but doesn't overestimate,
321 * when tasks complete
322 */
323 public void testGetCompletedTaskCount() throws InterruptedException {
324 final ThreadPoolExecutor p =
325 new CustomTPE(2, 2,
326 LONG_DELAY_MS, MILLISECONDS,
327 new ArrayBlockingQueue<Runnable>(10));
328 try (PoolCleaner cleaner = cleaner(p)) {
329 final CountDownLatch threadStarted = new CountDownLatch(1);
330 final CountDownLatch threadProceed = new CountDownLatch(1);
331 final CountDownLatch threadDone = new CountDownLatch(1);
332 assertEquals(0, p.getCompletedTaskCount());
333 p.execute(new CheckedRunnable() {
334 public void realRun() throws InterruptedException {
335 threadStarted.countDown();
336 assertEquals(0, p.getCompletedTaskCount());
337 threadProceed.await();
338 threadDone.countDown();
339 }});
340 await(threadStarted);
341 assertEquals(0, p.getCompletedTaskCount());
342 threadProceed.countDown();
343 threadDone.await();
344 long startTime = System.nanoTime();
345 while (p.getCompletedTaskCount() != 1) {
346 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
347 fail("timed out");
348 Thread.yield();
349 }
350 }
351 }
352
353 /**
354 * getCorePoolSize returns size given in constructor if not otherwise set
355 */
356 public void testGetCorePoolSize() {
357 final ThreadPoolExecutor p =
358 new CustomTPE(1, 1,
359 LONG_DELAY_MS, MILLISECONDS,
360 new ArrayBlockingQueue<Runnable>(10));
361 try (PoolCleaner cleaner = cleaner(p)) {
362 assertEquals(1, p.getCorePoolSize());
363 }
364 }
365
366 /**
367 * getKeepAliveTime returns value given in constructor if not otherwise set
368 */
369 public void testGetKeepAliveTime() {
370 final ThreadPoolExecutor p =
371 new CustomTPE(2, 2,
372 1000, MILLISECONDS,
373 new ArrayBlockingQueue<Runnable>(10));
374 try (PoolCleaner cleaner = cleaner(p)) {
375 assertEquals(1, p.getKeepAliveTime(SECONDS));
376 }
377 }
378
379 /**
380 * getThreadFactory returns factory in constructor if not set
381 */
382 public void testGetThreadFactory() {
383 final ThreadFactory threadFactory = new SimpleThreadFactory();
384 final ThreadPoolExecutor p =
385 new CustomTPE(1, 2,
386 LONG_DELAY_MS, MILLISECONDS,
387 new ArrayBlockingQueue<Runnable>(10),
388 threadFactory,
389 new NoOpREHandler());
390 try (PoolCleaner cleaner = cleaner(p)) {
391 assertSame(threadFactory, p.getThreadFactory());
392 }
393 }
394
395 /**
396 * setThreadFactory sets the thread factory returned by getThreadFactory
397 */
398 public void testSetThreadFactory() {
399 final ThreadPoolExecutor p =
400 new CustomTPE(1, 2,
401 LONG_DELAY_MS, MILLISECONDS,
402 new ArrayBlockingQueue<Runnable>(10));
403 try (PoolCleaner cleaner = cleaner(p)) {
404 ThreadFactory threadFactory = new SimpleThreadFactory();
405 p.setThreadFactory(threadFactory);
406 assertSame(threadFactory, p.getThreadFactory());
407 }
408 }
409
410 /**
411 * setThreadFactory(null) throws NPE
412 */
413 public void testSetThreadFactoryNull() {
414 final ThreadPoolExecutor p =
415 new CustomTPE(1, 2,
416 LONG_DELAY_MS, MILLISECONDS,
417 new ArrayBlockingQueue<Runnable>(10));
418 try (PoolCleaner cleaner = cleaner(p)) {
419 try {
420 p.setThreadFactory(null);
421 shouldThrow();
422 } catch (NullPointerException success) {}
423 }
424 }
425
426 /**
427 * getRejectedExecutionHandler returns handler in constructor if not set
428 */
429 public void testGetRejectedExecutionHandler() {
430 final RejectedExecutionHandler h = new NoOpREHandler();
431 final ThreadPoolExecutor p =
432 new CustomTPE(1, 2,
433 LONG_DELAY_MS, MILLISECONDS,
434 new ArrayBlockingQueue<Runnable>(10),
435 h);
436 try (PoolCleaner cleaner = cleaner(p)) {
437 assertSame(h, p.getRejectedExecutionHandler());
438 }
439 }
440
441 /**
442 * setRejectedExecutionHandler sets the handler returned by
443 * getRejectedExecutionHandler
444 */
445 public void testSetRejectedExecutionHandler() {
446 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
447 RejectedExecutionHandler h = new NoOpREHandler();
448 p.setRejectedExecutionHandler(h);
449 assertSame(h, p.getRejectedExecutionHandler());
450 joinPool(p);
451 }
452
453 /**
454 * setRejectedExecutionHandler(null) throws NPE
455 */
456 public void testSetRejectedExecutionHandlerNull() {
457 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
458 try {
459 p.setRejectedExecutionHandler(null);
460 shouldThrow();
461 } catch (NullPointerException success) {
462 } finally {
463 joinPool(p);
464 }
465 }
466
467 /**
468 * getLargestPoolSize increases, but doesn't overestimate, when
469 * multiple threads active
470 */
471 public void testGetLargestPoolSize() throws InterruptedException {
472 final int THREADS = 3;
473 final ThreadPoolExecutor p =
474 new CustomTPE(THREADS, THREADS,
475 LONG_DELAY_MS, MILLISECONDS,
476 new ArrayBlockingQueue<Runnable>(10));
477 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
478 final CountDownLatch done = new CountDownLatch(1);
479 try {
480 assertEquals(0, p.getLargestPoolSize());
481 for (int i = 0; i < THREADS; i++)
482 p.execute(new CheckedRunnable() {
483 public void realRun() throws InterruptedException {
484 threadsStarted.countDown();
485 done.await();
486 assertEquals(THREADS, p.getLargestPoolSize());
487 }});
488 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
489 assertEquals(THREADS, p.getLargestPoolSize());
490 } finally {
491 done.countDown();
492 joinPool(p);
493 assertEquals(THREADS, p.getLargestPoolSize());
494 }
495 }
496
497 /**
498 * getMaximumPoolSize returns value given in constructor if not
499 * otherwise set
500 */
501 public void testGetMaximumPoolSize() {
502 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
503 assertEquals(2, p.getMaximumPoolSize());
504 joinPool(p);
505 }
506
507 /**
508 * getPoolSize increases, but doesn't overestimate, when threads
509 * become active
510 */
511 public void testGetPoolSize() throws InterruptedException {
512 final ThreadPoolExecutor p =
513 new CustomTPE(1, 1,
514 LONG_DELAY_MS, MILLISECONDS,
515 new ArrayBlockingQueue<Runnable>(10));
516 final CountDownLatch threadStarted = new CountDownLatch(1);
517 final CountDownLatch done = new CountDownLatch(1);
518 try {
519 assertEquals(0, p.getPoolSize());
520 p.execute(new CheckedRunnable() {
521 public void realRun() throws InterruptedException {
522 threadStarted.countDown();
523 assertEquals(1, p.getPoolSize());
524 done.await();
525 }});
526 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
527 assertEquals(1, p.getPoolSize());
528 } finally {
529 done.countDown();
530 joinPool(p);
531 }
532 }
533
534 /**
535 * getTaskCount increases, but doesn't overestimate, when tasks submitted
536 */
537 public void testGetTaskCount() throws InterruptedException {
538 final ThreadPoolExecutor p =
539 new CustomTPE(1, 1,
540 LONG_DELAY_MS, MILLISECONDS,
541 new ArrayBlockingQueue<Runnable>(10));
542 final CountDownLatch threadStarted = new CountDownLatch(1);
543 final CountDownLatch done = new CountDownLatch(1);
544 try {
545 assertEquals(0, p.getTaskCount());
546 p.execute(new CheckedRunnable() {
547 public void realRun() throws InterruptedException {
548 threadStarted.countDown();
549 assertEquals(1, p.getTaskCount());
550 done.await();
551 }});
552 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
553 assertEquals(1, p.getTaskCount());
554 } finally {
555 done.countDown();
556 joinPool(p);
557 }
558 }
559
560 /**
561 * isShutdown is false before shutdown, true after
562 */
563 public void testIsShutdown() {
564
565 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
566 assertFalse(p.isShutdown());
567 try { p.shutdown(); } catch (SecurityException ok) { return; }
568 assertTrue(p.isShutdown());
569 joinPool(p);
570 }
571
572 /**
573 * isTerminated is false before termination, true after
574 */
575 public void testIsTerminated() throws InterruptedException {
576 final ThreadPoolExecutor p =
577 new CustomTPE(1, 1,
578 LONG_DELAY_MS, MILLISECONDS,
579 new ArrayBlockingQueue<Runnable>(10));
580 final CountDownLatch threadStarted = new CountDownLatch(1);
581 final CountDownLatch done = new CountDownLatch(1);
582 try {
583 assertFalse(p.isTerminating());
584 p.execute(new CheckedRunnable() {
585 public void realRun() throws InterruptedException {
586 assertFalse(p.isTerminating());
587 threadStarted.countDown();
588 done.await();
589 }});
590 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
591 assertFalse(p.isTerminating());
592 done.countDown();
593 } finally {
594 try { p.shutdown(); } catch (SecurityException ok) { return; }
595 }
596 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
597 assertTrue(p.isTerminated());
598 assertFalse(p.isTerminating());
599 }
600
601 /**
602 * isTerminating is not true when running or when terminated
603 */
604 public void testIsTerminating() throws InterruptedException {
605 final ThreadPoolExecutor p =
606 new CustomTPE(1, 1,
607 LONG_DELAY_MS, MILLISECONDS,
608 new ArrayBlockingQueue<Runnable>(10));
609 final CountDownLatch threadStarted = new CountDownLatch(1);
610 final CountDownLatch done = new CountDownLatch(1);
611 try {
612 assertFalse(p.isTerminating());
613 p.execute(new CheckedRunnable() {
614 public void realRun() throws InterruptedException {
615 assertFalse(p.isTerminating());
616 threadStarted.countDown();
617 done.await();
618 }});
619 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
620 assertFalse(p.isTerminating());
621 done.countDown();
622 } finally {
623 try { p.shutdown(); } catch (SecurityException ok) { return; }
624 }
625 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
626 assertTrue(p.isTerminated());
627 assertFalse(p.isTerminating());
628 }
629
630 /**
631 * getQueue returns the work queue, which contains queued tasks
632 */
633 public void testGetQueue() throws InterruptedException {
634 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
635 final ThreadPoolExecutor p =
636 new CustomTPE(1, 1,
637 LONG_DELAY_MS, MILLISECONDS,
638 q);
639 final CountDownLatch threadStarted = new CountDownLatch(1);
640 final CountDownLatch done = new CountDownLatch(1);
641 try {
642 FutureTask[] tasks = new FutureTask[5];
643 for (int i = 0; i < tasks.length; i++) {
644 Callable task = new CheckedCallable<Boolean>() {
645 public Boolean realCall() throws InterruptedException {
646 threadStarted.countDown();
647 assertSame(q, p.getQueue());
648 done.await();
649 return Boolean.TRUE;
650 }};
651 tasks[i] = new FutureTask(task);
652 p.execute(tasks[i]);
653 }
654 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
655 assertSame(q, p.getQueue());
656 assertFalse(q.contains(tasks[0]));
657 assertTrue(q.contains(tasks[tasks.length - 1]));
658 assertEquals(tasks.length - 1, q.size());
659 } finally {
660 done.countDown();
661 joinPool(p);
662 }
663 }
664
665 /**
666 * remove(task) removes queued task, and fails to remove active task
667 */
668 public void testRemove() throws InterruptedException {
669 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
670 final ThreadPoolExecutor p =
671 new CustomTPE(1, 1,
672 LONG_DELAY_MS, MILLISECONDS,
673 q);
674 Runnable[] tasks = new Runnable[6];
675 final CountDownLatch threadStarted = new CountDownLatch(1);
676 final CountDownLatch done = new CountDownLatch(1);
677 try {
678 for (int i = 0; i < tasks.length; i++) {
679 tasks[i] = new CheckedRunnable() {
680 public void realRun() throws InterruptedException {
681 threadStarted.countDown();
682 done.await();
683 }};
684 p.execute(tasks[i]);
685 }
686 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
687 assertFalse(p.remove(tasks[0]));
688 assertTrue(q.contains(tasks[4]));
689 assertTrue(q.contains(tasks[3]));
690 assertTrue(p.remove(tasks[4]));
691 assertFalse(p.remove(tasks[4]));
692 assertFalse(q.contains(tasks[4]));
693 assertTrue(q.contains(tasks[3]));
694 assertTrue(p.remove(tasks[3]));
695 assertFalse(q.contains(tasks[3]));
696 } finally {
697 done.countDown();
698 joinPool(p);
699 }
700 }
701
702 /**
703 * purge removes cancelled tasks from the queue
704 */
705 public void testPurge() throws InterruptedException {
706 final CountDownLatch threadStarted = new CountDownLatch(1);
707 final CountDownLatch done = new CountDownLatch(1);
708 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
709 final ThreadPoolExecutor p =
710 new CustomTPE(1, 1,
711 LONG_DELAY_MS, MILLISECONDS,
712 q);
713 FutureTask[] tasks = new FutureTask[5];
714 try {
715 for (int i = 0; i < tasks.length; i++) {
716 Callable task = new CheckedCallable<Boolean>() {
717 public Boolean realCall() throws InterruptedException {
718 threadStarted.countDown();
719 done.await();
720 return Boolean.TRUE;
721 }};
722 tasks[i] = new FutureTask(task);
723 p.execute(tasks[i]);
724 }
725 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
726 assertEquals(tasks.length, p.getTaskCount());
727 assertEquals(tasks.length - 1, q.size());
728 assertEquals(1L, p.getActiveCount());
729 assertEquals(0L, p.getCompletedTaskCount());
730 tasks[4].cancel(true);
731 tasks[3].cancel(false);
732 p.purge();
733 assertEquals(tasks.length - 3, q.size());
734 assertEquals(tasks.length - 2, p.getTaskCount());
735 p.purge(); // Nothing to do
736 assertEquals(tasks.length - 3, q.size());
737 assertEquals(tasks.length - 2, p.getTaskCount());
738 } finally {
739 done.countDown();
740 joinPool(p);
741 }
742 }
743
744 /**
745 * shutdownNow returns a list containing tasks that were not run,
746 * and those tasks are drained from the queue
747 */
748 public void testShutdownNow() throws InterruptedException {
749 final int poolSize = 2;
750 final int count = 5;
751 final AtomicInteger ran = new AtomicInteger(0);
752 ThreadPoolExecutor p =
753 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
754 new ArrayBlockingQueue<Runnable>(10));
755 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
756 Runnable waiter = new CheckedRunnable() { public void realRun() {
757 threadsStarted.countDown();
758 try {
759 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
760 } catch (InterruptedException success) {}
761 ran.getAndIncrement();
762 }};
763 for (int i = 0; i < count; i++)
764 p.execute(waiter);
765 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
766 assertEquals(poolSize, p.getActiveCount());
767 assertEquals(0, p.getCompletedTaskCount());
768 final List<Runnable> queuedTasks;
769 try {
770 queuedTasks = p.shutdownNow();
771 } catch (SecurityException ok) {
772 return; // Allowed in case test doesn't have privs
773 }
774 assertTrue(p.isShutdown());
775 assertTrue(p.getQueue().isEmpty());
776 assertEquals(count - poolSize, queuedTasks.size());
777 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
778 assertTrue(p.isTerminated());
779 assertEquals(poolSize, ran.get());
780 assertEquals(poolSize, p.getCompletedTaskCount());
781 }
782
783 // Exception Tests
784
785 /**
786 * Constructor throws if corePoolSize argument is less than zero
787 */
788 public void testConstructor1() {
789 try {
790 new CustomTPE(-1, 1, 1L, SECONDS,
791 new ArrayBlockingQueue<Runnable>(10));
792 shouldThrow();
793 } catch (IllegalArgumentException success) {}
794 }
795
796 /**
797 * Constructor throws if maximumPoolSize is less than zero
798 */
799 public void testConstructor2() {
800 try {
801 new CustomTPE(1, -1, 1L, SECONDS,
802 new ArrayBlockingQueue<Runnable>(10));
803 shouldThrow();
804 } catch (IllegalArgumentException success) {}
805 }
806
807 /**
808 * Constructor throws if maximumPoolSize is equal to zero
809 */
810 public void testConstructor3() {
811 try {
812 new CustomTPE(1, 0, 1L, SECONDS,
813 new ArrayBlockingQueue<Runnable>(10));
814 shouldThrow();
815 } catch (IllegalArgumentException success) {}
816 }
817
818 /**
819 * Constructor throws if keepAliveTime is less than zero
820 */
821 public void testConstructor4() {
822 try {
823 new CustomTPE(1, 2, -1L, SECONDS,
824 new ArrayBlockingQueue<Runnable>(10));
825 shouldThrow();
826 } catch (IllegalArgumentException success) {}
827 }
828
829 /**
830 * Constructor throws if corePoolSize is greater than the maximumPoolSize
831 */
832 public void testConstructor5() {
833 try {
834 new CustomTPE(2, 1, 1L, SECONDS,
835 new ArrayBlockingQueue<Runnable>(10));
836 shouldThrow();
837 } catch (IllegalArgumentException success) {}
838 }
839
840 /**
841 * Constructor throws if workQueue is set to null
842 */
843 public void testConstructorNullPointerException() {
844 try {
845 new CustomTPE(1, 2, 1L, SECONDS, null);
846 shouldThrow();
847 } catch (NullPointerException success) {}
848 }
849
850 /**
851 * Constructor throws if corePoolSize argument is less than zero
852 */
853 public void testConstructor6() {
854 try {
855 new CustomTPE(-1, 1, 1L, SECONDS,
856 new ArrayBlockingQueue<Runnable>(10),
857 new SimpleThreadFactory());
858 shouldThrow();
859 } catch (IllegalArgumentException success) {}
860 }
861
862 /**
863 * Constructor throws if maximumPoolSize is less than zero
864 */
865 public void testConstructor7() {
866 try {
867 new CustomTPE(1,-1, 1L, SECONDS,
868 new ArrayBlockingQueue<Runnable>(10),
869 new SimpleThreadFactory());
870 shouldThrow();
871 } catch (IllegalArgumentException success) {}
872 }
873
874 /**
875 * Constructor throws if maximumPoolSize is equal to zero
876 */
877 public void testConstructor8() {
878 try {
879 new CustomTPE(1, 0, 1L, SECONDS,
880 new ArrayBlockingQueue<Runnable>(10),
881 new SimpleThreadFactory());
882 shouldThrow();
883 } catch (IllegalArgumentException success) {}
884 }
885
886 /**
887 * Constructor throws if keepAliveTime is less than zero
888 */
889 public void testConstructor9() {
890 try {
891 new CustomTPE(1, 2, -1L, SECONDS,
892 new ArrayBlockingQueue<Runnable>(10),
893 new SimpleThreadFactory());
894 shouldThrow();
895 } catch (IllegalArgumentException success) {}
896 }
897
898 /**
899 * Constructor throws if corePoolSize is greater than the maximumPoolSize
900 */
901 public void testConstructor10() {
902 try {
903 new CustomTPE(2, 1, 1L, SECONDS,
904 new ArrayBlockingQueue<Runnable>(10),
905 new SimpleThreadFactory());
906 shouldThrow();
907 } catch (IllegalArgumentException success) {}
908 }
909
910 /**
911 * Constructor throws if workQueue is set to null
912 */
913 public void testConstructorNullPointerException2() {
914 try {
915 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
916 shouldThrow();
917 } catch (NullPointerException success) {}
918 }
919
920 /**
921 * Constructor throws if threadFactory is set to null
922 */
923 public void testConstructorNullPointerException3() {
924 try {
925 new CustomTPE(1, 2, 1L, SECONDS,
926 new ArrayBlockingQueue<Runnable>(10),
927 (ThreadFactory) null);
928 shouldThrow();
929 } catch (NullPointerException success) {}
930 }
931
932 /**
933 * Constructor throws if corePoolSize argument is less than zero
934 */
935 public void testConstructor11() {
936 try {
937 new CustomTPE(-1, 1, 1L, SECONDS,
938 new ArrayBlockingQueue<Runnable>(10),
939 new NoOpREHandler());
940 shouldThrow();
941 } catch (IllegalArgumentException success) {}
942 }
943
944 /**
945 * Constructor throws if maximumPoolSize is less than zero
946 */
947 public void testConstructor12() {
948 try {
949 new CustomTPE(1, -1, 1L, SECONDS,
950 new ArrayBlockingQueue<Runnable>(10),
951 new NoOpREHandler());
952 shouldThrow();
953 } catch (IllegalArgumentException success) {}
954 }
955
956 /**
957 * Constructor throws if maximumPoolSize is equal to zero
958 */
959 public void testConstructor13() {
960 try {
961 new CustomTPE(1, 0, 1L, SECONDS,
962 new ArrayBlockingQueue<Runnable>(10),
963 new NoOpREHandler());
964 shouldThrow();
965 } catch (IllegalArgumentException success) {}
966 }
967
968 /**
969 * Constructor throws if keepAliveTime is less than zero
970 */
971 public void testConstructor14() {
972 try {
973 new CustomTPE(1, 2, -1L, SECONDS,
974 new ArrayBlockingQueue<Runnable>(10),
975 new NoOpREHandler());
976 shouldThrow();
977 } catch (IllegalArgumentException success) {}
978 }
979
980 /**
981 * Constructor throws if corePoolSize is greater than the maximumPoolSize
982 */
983 public void testConstructor15() {
984 try {
985 new CustomTPE(2, 1, 1L, SECONDS,
986 new ArrayBlockingQueue<Runnable>(10),
987 new NoOpREHandler());
988 shouldThrow();
989 } catch (IllegalArgumentException success) {}
990 }
991
992 /**
993 * Constructor throws if workQueue is set to null
994 */
995 public void testConstructorNullPointerException4() {
996 try {
997 new CustomTPE(1, 2, 1L, SECONDS,
998 null,
999 new NoOpREHandler());
1000 shouldThrow();
1001 } catch (NullPointerException success) {}
1002 }
1003
1004 /**
1005 * Constructor throws if handler is set to null
1006 */
1007 public void testConstructorNullPointerException5() {
1008 try {
1009 new CustomTPE(1, 2, 1L, SECONDS,
1010 new ArrayBlockingQueue<Runnable>(10),
1011 (RejectedExecutionHandler) null);
1012 shouldThrow();
1013 } catch (NullPointerException success) {}
1014 }
1015
1016 /**
1017 * Constructor throws if corePoolSize argument is less than zero
1018 */
1019 public void testConstructor16() {
1020 try {
1021 new CustomTPE(-1, 1, 1L, SECONDS,
1022 new ArrayBlockingQueue<Runnable>(10),
1023 new SimpleThreadFactory(),
1024 new NoOpREHandler());
1025 shouldThrow();
1026 } catch (IllegalArgumentException success) {}
1027 }
1028
1029 /**
1030 * Constructor throws if maximumPoolSize is less than zero
1031 */
1032 public void testConstructor17() {
1033 try {
1034 new CustomTPE(1, -1, 1L, SECONDS,
1035 new ArrayBlockingQueue<Runnable>(10),
1036 new SimpleThreadFactory(),
1037 new NoOpREHandler());
1038 shouldThrow();
1039 } catch (IllegalArgumentException success) {}
1040 }
1041
1042 /**
1043 * Constructor throws if maximumPoolSize is equal to zero
1044 */
1045 public void testConstructor18() {
1046 try {
1047 new CustomTPE(1, 0, 1L, SECONDS,
1048 new ArrayBlockingQueue<Runnable>(10),
1049 new SimpleThreadFactory(),
1050 new NoOpREHandler());
1051 shouldThrow();
1052 } catch (IllegalArgumentException success) {}
1053 }
1054
1055 /**
1056 * Constructor throws if keepAliveTime is less than zero
1057 */
1058 public void testConstructor19() {
1059 try {
1060 new CustomTPE(1, 2, -1L, SECONDS,
1061 new ArrayBlockingQueue<Runnable>(10),
1062 new SimpleThreadFactory(),
1063 new NoOpREHandler());
1064 shouldThrow();
1065 } catch (IllegalArgumentException success) {}
1066 }
1067
1068 /**
1069 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1070 */
1071 public void testConstructor20() {
1072 try {
1073 new CustomTPE(2, 1, 1L, SECONDS,
1074 new ArrayBlockingQueue<Runnable>(10),
1075 new SimpleThreadFactory(),
1076 new NoOpREHandler());
1077 shouldThrow();
1078 } catch (IllegalArgumentException success) {}
1079 }
1080
1081 /**
1082 * Constructor throws if workQueue is null
1083 */
1084 public void testConstructorNullPointerException6() {
1085 try {
1086 new CustomTPE(1, 2, 1L, SECONDS,
1087 null,
1088 new SimpleThreadFactory(),
1089 new NoOpREHandler());
1090 shouldThrow();
1091 } catch (NullPointerException success) {}
1092 }
1093
1094 /**
1095 * Constructor throws if handler is null
1096 */
1097 public void testConstructorNullPointerException7() {
1098 try {
1099 new CustomTPE(1, 2, 1L, SECONDS,
1100 new ArrayBlockingQueue<Runnable>(10),
1101 new SimpleThreadFactory(),
1102 (RejectedExecutionHandler) null);
1103 shouldThrow();
1104 } catch (NullPointerException success) {}
1105 }
1106
1107 /**
1108 * Constructor throws if ThreadFactory is null
1109 */
1110 public void testConstructorNullPointerException8() {
1111 try {
1112 new CustomTPE(1, 2, 1L, SECONDS,
1113 new ArrayBlockingQueue<Runnable>(10),
1114 (ThreadFactory) null,
1115 new NoOpREHandler());
1116 shouldThrow();
1117 } catch (NullPointerException success) {}
1118 }
1119
1120 /**
1121 * execute throws RejectedExecutionException if saturated.
1122 */
1123 public void testSaturatedExecute() {
1124 ThreadPoolExecutor p =
1125 new CustomTPE(1, 1,
1126 LONG_DELAY_MS, MILLISECONDS,
1127 new ArrayBlockingQueue<Runnable>(1));
1128 final CountDownLatch done = new CountDownLatch(1);
1129 try {
1130 Runnable task = new CheckedRunnable() {
1131 public void realRun() throws InterruptedException {
1132 done.await();
1133 }};
1134 for (int i = 0; i < 2; ++i)
1135 p.execute(task);
1136 for (int i = 0; i < 2; ++i) {
1137 try {
1138 p.execute(task);
1139 shouldThrow();
1140 } catch (RejectedExecutionException success) {}
1141 assertTrue(p.getTaskCount() <= 2);
1142 }
1143 } finally {
1144 done.countDown();
1145 joinPool(p);
1146 }
1147 }
1148
1149 /**
1150 * executor using CallerRunsPolicy runs task if saturated.
1151 */
1152 public void testSaturatedExecute2() {
1153 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1154 ThreadPoolExecutor p = new CustomTPE(1, 1,
1155 LONG_DELAY_MS, MILLISECONDS,
1156 new ArrayBlockingQueue<Runnable>(1),
1157 h);
1158 try {
1159 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1160 for (int i = 0; i < tasks.length; ++i)
1161 tasks[i] = new TrackedNoOpRunnable();
1162 TrackedLongRunnable mr = new TrackedLongRunnable();
1163 p.execute(mr);
1164 for (int i = 0; i < tasks.length; ++i)
1165 p.execute(tasks[i]);
1166 for (int i = 1; i < tasks.length; ++i)
1167 assertTrue(tasks[i].done);
1168 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1169 } finally {
1170 joinPool(p);
1171 }
1172 }
1173
1174 /**
1175 * executor using DiscardPolicy drops task if saturated.
1176 */
1177 public void testSaturatedExecute3() {
1178 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1179 ThreadPoolExecutor p =
1180 new CustomTPE(1, 1,
1181 LONG_DELAY_MS, MILLISECONDS,
1182 new ArrayBlockingQueue<Runnable>(1),
1183 h);
1184 try {
1185 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1186 for (int i = 0; i < tasks.length; ++i)
1187 tasks[i] = new TrackedNoOpRunnable();
1188 p.execute(new TrackedLongRunnable());
1189 for (TrackedNoOpRunnable task : tasks)
1190 p.execute(task);
1191 for (TrackedNoOpRunnable task : tasks)
1192 assertFalse(task.done);
1193 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1194 } finally {
1195 joinPool(p);
1196 }
1197 }
1198
1199 /**
1200 * executor using DiscardOldestPolicy drops oldest task if saturated.
1201 */
1202 public void testSaturatedExecute4() {
1203 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1204 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1205 try {
1206 p.execute(new TrackedLongRunnable());
1207 TrackedLongRunnable r2 = new TrackedLongRunnable();
1208 p.execute(r2);
1209 assertTrue(p.getQueue().contains(r2));
1210 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1211 p.execute(r3);
1212 assertFalse(p.getQueue().contains(r2));
1213 assertTrue(p.getQueue().contains(r3));
1214 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1215 } finally {
1216 joinPool(p);
1217 }
1218 }
1219
1220 /**
1221 * execute throws RejectedExecutionException if shutdown
1222 */
1223 public void testRejectedExecutionExceptionOnShutdown() {
1224 ThreadPoolExecutor p =
1225 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1226 try { p.shutdown(); } catch (SecurityException ok) { return; }
1227 try {
1228 p.execute(new NoOpRunnable());
1229 shouldThrow();
1230 } catch (RejectedExecutionException success) {}
1231
1232 joinPool(p);
1233 }
1234
1235 /**
1236 * execute using CallerRunsPolicy drops task on shutdown
1237 */
1238 public void testCallerRunsOnShutdown() {
1239 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1240 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1241
1242 try { p.shutdown(); } catch (SecurityException ok) { return; }
1243 try {
1244 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1245 p.execute(r);
1246 assertFalse(r.done);
1247 } finally {
1248 joinPool(p);
1249 }
1250 }
1251
1252 /**
1253 * execute using DiscardPolicy drops task on shutdown
1254 */
1255 public void testDiscardOnShutdown() {
1256 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1257 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1258
1259 try { p.shutdown(); } catch (SecurityException ok) { return; }
1260 try {
1261 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1262 p.execute(r);
1263 assertFalse(r.done);
1264 } finally {
1265 joinPool(p);
1266 }
1267 }
1268
1269 /**
1270 * execute using DiscardOldestPolicy drops task on shutdown
1271 */
1272 public void testDiscardOldestOnShutdown() {
1273 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1274 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1275
1276 try { p.shutdown(); } catch (SecurityException ok) { return; }
1277 try {
1278 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1279 p.execute(r);
1280 assertFalse(r.done);
1281 } finally {
1282 joinPool(p);
1283 }
1284 }
1285
1286 /**
1287 * execute(null) throws NPE
1288 */
1289 public void testExecuteNull() {
1290 ThreadPoolExecutor p =
1291 new CustomTPE(1, 2, 1L, SECONDS,
1292 new ArrayBlockingQueue<Runnable>(10));
1293 try {
1294 p.execute(null);
1295 shouldThrow();
1296 } catch (NullPointerException success) {}
1297
1298 joinPool(p);
1299 }
1300
1301 /**
1302 * setCorePoolSize of negative value throws IllegalArgumentException
1303 */
1304 public void testCorePoolSizeIllegalArgumentException() {
1305 ThreadPoolExecutor p =
1306 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1307 try {
1308 p.setCorePoolSize(-1);
1309 shouldThrow();
1310 } catch (IllegalArgumentException success) {
1311 } finally {
1312 try { p.shutdown(); } catch (SecurityException ok) { return; }
1313 }
1314 joinPool(p);
1315 }
1316
1317 /**
1318 * setMaximumPoolSize(int) throws IllegalArgumentException
1319 * if given a value less the core pool size
1320 */
1321 public void testMaximumPoolSizeIllegalArgumentException() {
1322 ThreadPoolExecutor p =
1323 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1324 try {
1325 p.setMaximumPoolSize(1);
1326 shouldThrow();
1327 } catch (IllegalArgumentException success) {
1328 } finally {
1329 try { p.shutdown(); } catch (SecurityException ok) { return; }
1330 }
1331 joinPool(p);
1332 }
1333
1334 /**
1335 * setMaximumPoolSize throws IllegalArgumentException
1336 * if given a negative value
1337 */
1338 public void testMaximumPoolSizeIllegalArgumentException2() {
1339 ThreadPoolExecutor p =
1340 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1341 try {
1342 p.setMaximumPoolSize(-1);
1343 shouldThrow();
1344 } catch (IllegalArgumentException success) {
1345 } finally {
1346 try { p.shutdown(); } catch (SecurityException ok) { return; }
1347 }
1348 joinPool(p);
1349 }
1350
1351 /**
1352 * setKeepAliveTime throws IllegalArgumentException
1353 * when given a negative value
1354 */
1355 public void testKeepAliveTimeIllegalArgumentException() {
1356 ThreadPoolExecutor p =
1357 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1358
1359 try {
1360 p.setKeepAliveTime(-1,MILLISECONDS);
1361 shouldThrow();
1362 } catch (IllegalArgumentException success) {
1363 } finally {
1364 try { p.shutdown(); } catch (SecurityException ok) { return; }
1365 }
1366 joinPool(p);
1367 }
1368
1369 /**
1370 * terminated() is called on termination
1371 */
1372 public void testTerminated() {
1373 CustomTPE p = new CustomTPE();
1374 try { p.shutdown(); } catch (SecurityException ok) { return; }
1375 assertTrue(p.terminatedCalled());
1376 joinPool(p);
1377 }
1378
1379 /**
1380 * beforeExecute and afterExecute are called when executing task
1381 */
1382 public void testBeforeAfter() throws InterruptedException {
1383 CustomTPE p = new CustomTPE();
1384 try {
1385 final CountDownLatch done = new CountDownLatch(1);
1386 p.execute(new CheckedRunnable() {
1387 public void realRun() {
1388 done.countDown();
1389 }});
1390 await(p.afterCalled);
1391 assertEquals(0, done.getCount());
1392 assertTrue(p.afterCalled());
1393 assertTrue(p.beforeCalled());
1394 try { p.shutdown(); } catch (SecurityException ok) { return; }
1395 } finally {
1396 joinPool(p);
1397 }
1398 }
1399
1400 /**
1401 * completed submit of callable returns result
1402 */
1403 public void testSubmitCallable() throws Exception {
1404 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1405 try {
1406 Future<String> future = e.submit(new StringTask());
1407 String result = future.get();
1408 assertSame(TEST_STRING, result);
1409 } finally {
1410 joinPool(e);
1411 }
1412 }
1413
1414 /**
1415 * completed submit of runnable returns successfully
1416 */
1417 public void testSubmitRunnable() throws Exception {
1418 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1419 try {
1420 Future<?> future = e.submit(new NoOpRunnable());
1421 future.get();
1422 assertTrue(future.isDone());
1423 } finally {
1424 joinPool(e);
1425 }
1426 }
1427
1428 /**
1429 * completed submit of (runnable, result) returns result
1430 */
1431 public void testSubmitRunnable2() throws Exception {
1432 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1433 try {
1434 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1435 String result = future.get();
1436 assertSame(TEST_STRING, result);
1437 } finally {
1438 joinPool(e);
1439 }
1440 }
1441
1442 /**
1443 * invokeAny(null) throws NPE
1444 */
1445 public void testInvokeAny1() throws Exception {
1446 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1447 try {
1448 e.invokeAny(null);
1449 shouldThrow();
1450 } catch (NullPointerException success) {
1451 } finally {
1452 joinPool(e);
1453 }
1454 }
1455
1456 /**
1457 * invokeAny(empty collection) throws IAE
1458 */
1459 public void testInvokeAny2() throws Exception {
1460 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1461 try {
1462 e.invokeAny(new ArrayList<Callable<String>>());
1463 shouldThrow();
1464 } catch (IllegalArgumentException success) {
1465 } finally {
1466 joinPool(e);
1467 }
1468 }
1469
1470 /**
1471 * invokeAny(c) throws NPE if c has null elements
1472 */
1473 public void testInvokeAny3() throws Exception {
1474 CountDownLatch latch = new CountDownLatch(1);
1475 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1476 List<Callable<String>> l = new ArrayList<Callable<String>>();
1477 l.add(latchAwaitingStringTask(latch));
1478 l.add(null);
1479 try {
1480 e.invokeAny(l);
1481 shouldThrow();
1482 } catch (NullPointerException success) {
1483 } finally {
1484 latch.countDown();
1485 joinPool(e);
1486 }
1487 }
1488
1489 /**
1490 * invokeAny(c) throws ExecutionException if no task completes
1491 */
1492 public void testInvokeAny4() throws Exception {
1493 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1494 List<Callable<String>> l = new ArrayList<Callable<String>>();
1495 l.add(new NPETask());
1496 try {
1497 e.invokeAny(l);
1498 shouldThrow();
1499 } catch (ExecutionException success) {
1500 assertTrue(success.getCause() instanceof NullPointerException);
1501 } finally {
1502 joinPool(e);
1503 }
1504 }
1505
1506 /**
1507 * invokeAny(c) returns result of some task
1508 */
1509 public void testInvokeAny5() throws Exception {
1510 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1511 try {
1512 List<Callable<String>> l = new ArrayList<Callable<String>>();
1513 l.add(new StringTask());
1514 l.add(new StringTask());
1515 String result = e.invokeAny(l);
1516 assertSame(TEST_STRING, result);
1517 } finally {
1518 joinPool(e);
1519 }
1520 }
1521
1522 /**
1523 * invokeAll(null) throws NPE
1524 */
1525 public void testInvokeAll1() throws Exception {
1526 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1527 try {
1528 e.invokeAll(null);
1529 shouldThrow();
1530 } catch (NullPointerException success) {
1531 } finally {
1532 joinPool(e);
1533 }
1534 }
1535
1536 /**
1537 * invokeAll(empty collection) returns empty collection
1538 */
1539 public void testInvokeAll2() throws Exception {
1540 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1541 try {
1542 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1543 assertTrue(r.isEmpty());
1544 } finally {
1545 joinPool(e);
1546 }
1547 }
1548
1549 /**
1550 * invokeAll(c) throws NPE if c has null elements
1551 */
1552 public void testInvokeAll3() throws Exception {
1553 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1554 List<Callable<String>> l = new ArrayList<Callable<String>>();
1555 l.add(new StringTask());
1556 l.add(null);
1557 try {
1558 e.invokeAll(l);
1559 shouldThrow();
1560 } catch (NullPointerException success) {
1561 } finally {
1562 joinPool(e);
1563 }
1564 }
1565
1566 /**
1567 * get of element of invokeAll(c) throws exception on failed task
1568 */
1569 public void testInvokeAll4() throws Exception {
1570 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1571 List<Callable<String>> l = new ArrayList<Callable<String>>();
1572 l.add(new NPETask());
1573 List<Future<String>> futures = e.invokeAll(l);
1574 assertEquals(1, futures.size());
1575 try {
1576 futures.get(0).get();
1577 shouldThrow();
1578 } catch (ExecutionException success) {
1579 assertTrue(success.getCause() instanceof NullPointerException);
1580 } finally {
1581 joinPool(e);
1582 }
1583 }
1584
1585 /**
1586 * invokeAll(c) returns results of all completed tasks
1587 */
1588 public void testInvokeAll5() throws Exception {
1589 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1590 try {
1591 List<Callable<String>> l = new ArrayList<Callable<String>>();
1592 l.add(new StringTask());
1593 l.add(new StringTask());
1594 List<Future<String>> futures = e.invokeAll(l);
1595 assertEquals(2, futures.size());
1596 for (Future<String> future : futures)
1597 assertSame(TEST_STRING, future.get());
1598 } finally {
1599 joinPool(e);
1600 }
1601 }
1602
1603 /**
1604 * timed invokeAny(null) throws NPE
1605 */
1606 public void testTimedInvokeAny1() throws Exception {
1607 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1608 try {
1609 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1610 shouldThrow();
1611 } catch (NullPointerException success) {
1612 } finally {
1613 joinPool(e);
1614 }
1615 }
1616
1617 /**
1618 * timed invokeAny(,,null) throws NPE
1619 */
1620 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1621 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1622 List<Callable<String>> l = new ArrayList<Callable<String>>();
1623 l.add(new StringTask());
1624 try {
1625 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1626 shouldThrow();
1627 } catch (NullPointerException success) {
1628 } finally {
1629 joinPool(e);
1630 }
1631 }
1632
1633 /**
1634 * timed invokeAny(empty collection) throws IAE
1635 */
1636 public void testTimedInvokeAny2() throws Exception {
1637 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1638 try {
1639 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1640 shouldThrow();
1641 } catch (IllegalArgumentException success) {
1642 } finally {
1643 joinPool(e);
1644 }
1645 }
1646
1647 /**
1648 * timed invokeAny(c) throws NPE if c has null elements
1649 */
1650 public void testTimedInvokeAny3() throws Exception {
1651 CountDownLatch latch = new CountDownLatch(1);
1652 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1653 List<Callable<String>> l = new ArrayList<Callable<String>>();
1654 l.add(latchAwaitingStringTask(latch));
1655 l.add(null);
1656 try {
1657 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1658 shouldThrow();
1659 } catch (NullPointerException success) {
1660 } finally {
1661 latch.countDown();
1662 joinPool(e);
1663 }
1664 }
1665
1666 /**
1667 * timed invokeAny(c) throws ExecutionException if no task completes
1668 */
1669 public void testTimedInvokeAny4() throws Exception {
1670 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1671 List<Callable<String>> l = new ArrayList<Callable<String>>();
1672 l.add(new NPETask());
1673 try {
1674 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1675 shouldThrow();
1676 } catch (ExecutionException success) {
1677 assertTrue(success.getCause() instanceof NullPointerException);
1678 } finally {
1679 joinPool(e);
1680 }
1681 }
1682
1683 /**
1684 * timed invokeAny(c) returns result of some task
1685 */
1686 public void testTimedInvokeAny5() throws Exception {
1687 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1688 try {
1689 List<Callable<String>> l = new ArrayList<Callable<String>>();
1690 l.add(new StringTask());
1691 l.add(new StringTask());
1692 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1693 assertSame(TEST_STRING, result);
1694 } finally {
1695 joinPool(e);
1696 }
1697 }
1698
1699 /**
1700 * timed invokeAll(null) throws NPE
1701 */
1702 public void testTimedInvokeAll1() throws Exception {
1703 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1704 try {
1705 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1706 shouldThrow();
1707 } catch (NullPointerException success) {
1708 } finally {
1709 joinPool(e);
1710 }
1711 }
1712
1713 /**
1714 * timed invokeAll(,,null) throws NPE
1715 */
1716 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1717 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1718 List<Callable<String>> l = new ArrayList<Callable<String>>();
1719 l.add(new StringTask());
1720 try {
1721 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1722 shouldThrow();
1723 } catch (NullPointerException success) {
1724 } finally {
1725 joinPool(e);
1726 }
1727 }
1728
1729 /**
1730 * timed invokeAll(empty collection) returns empty collection
1731 */
1732 public void testTimedInvokeAll2() throws Exception {
1733 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1734 try {
1735 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1736 assertTrue(r.isEmpty());
1737 } finally {
1738 joinPool(e);
1739 }
1740 }
1741
1742 /**
1743 * timed invokeAll(c) throws NPE if c has null elements
1744 */
1745 public void testTimedInvokeAll3() throws Exception {
1746 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1747 List<Callable<String>> l = new ArrayList<Callable<String>>();
1748 l.add(new StringTask());
1749 l.add(null);
1750 try {
1751 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1752 shouldThrow();
1753 } catch (NullPointerException success) {
1754 } finally {
1755 joinPool(e);
1756 }
1757 }
1758
1759 /**
1760 * get of element of invokeAll(c) throws exception on failed task
1761 */
1762 public void testTimedInvokeAll4() throws Exception {
1763 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1764 List<Callable<String>> l = new ArrayList<Callable<String>>();
1765 l.add(new NPETask());
1766 List<Future<String>> futures =
1767 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1768 assertEquals(1, futures.size());
1769 try {
1770 futures.get(0).get();
1771 shouldThrow();
1772 } catch (ExecutionException success) {
1773 assertTrue(success.getCause() instanceof NullPointerException);
1774 } finally {
1775 joinPool(e);
1776 }
1777 }
1778
1779 /**
1780 * timed invokeAll(c) returns results of all completed tasks
1781 */
1782 public void testTimedInvokeAll5() throws Exception {
1783 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1784 try {
1785 List<Callable<String>> l = new ArrayList<Callable<String>>();
1786 l.add(new StringTask());
1787 l.add(new StringTask());
1788 List<Future<String>> futures =
1789 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1790 assertEquals(2, futures.size());
1791 for (Future<String> future : futures)
1792 assertSame(TEST_STRING, future.get());
1793 } finally {
1794 joinPool(e);
1795 }
1796 }
1797
1798 /**
1799 * timed invokeAll(c) cancels tasks not completed by timeout
1800 */
1801 public void testTimedInvokeAll6() throws Exception {
1802 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1803 try {
1804 for (long timeout = timeoutMillis();;) {
1805 List<Callable<String>> tasks = new ArrayList<>();
1806 tasks.add(new StringTask("0"));
1807 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1808 tasks.add(new StringTask("2"));
1809 long startTime = System.nanoTime();
1810 List<Future<String>> futures =
1811 e.invokeAll(tasks, timeout, MILLISECONDS);
1812 assertEquals(tasks.size(), futures.size());
1813 assertTrue(millisElapsedSince(startTime) >= timeout);
1814 for (Future future : futures)
1815 assertTrue(future.isDone());
1816 assertTrue(futures.get(1).isCancelled());
1817 try {
1818 assertEquals("0", futures.get(0).get());
1819 assertEquals("2", futures.get(2).get());
1820 break;
1821 } catch (CancellationException retryWithLongerTimeout) {
1822 timeout *= 2;
1823 if (timeout >= LONG_DELAY_MS / 2)
1824 fail("expected exactly one task to be cancelled");
1825 }
1826 }
1827 } finally {
1828 joinPool(e);
1829 }
1830 }
1831
1832 /**
1833 * Execution continues if there is at least one thread even if
1834 * thread factory fails to create more
1835 */
1836 public void testFailingThreadFactory() throws InterruptedException {
1837 final ExecutorService e =
1838 new CustomTPE(100, 100,
1839 LONG_DELAY_MS, MILLISECONDS,
1840 new LinkedBlockingQueue<Runnable>(),
1841 new FailingThreadFactory());
1842 try {
1843 final int TASKS = 100;
1844 final CountDownLatch done = new CountDownLatch(TASKS);
1845 for (int k = 0; k < TASKS; ++k)
1846 e.execute(new CheckedRunnable() {
1847 public void realRun() {
1848 done.countDown();
1849 }});
1850 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1851 } finally {
1852 joinPool(e);
1853 }
1854 }
1855
1856 /**
1857 * allowsCoreThreadTimeOut is by default false.
1858 */
1859 public void testAllowsCoreThreadTimeOut() {
1860 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1861 assertFalse(p.allowsCoreThreadTimeOut());
1862 joinPool(p);
1863 }
1864
1865 /**
1866 * allowCoreThreadTimeOut(true) causes idle threads to time out
1867 */
1868 public void testAllowCoreThreadTimeOut_true() throws Exception {
1869 long keepAliveTime = timeoutMillis();
1870 final ThreadPoolExecutor p =
1871 new CustomTPE(2, 10,
1872 keepAliveTime, MILLISECONDS,
1873 new ArrayBlockingQueue<Runnable>(10));
1874 final CountDownLatch threadStarted = new CountDownLatch(1);
1875 try {
1876 p.allowCoreThreadTimeOut(true);
1877 p.execute(new CheckedRunnable() {
1878 public void realRun() {
1879 threadStarted.countDown();
1880 assertEquals(1, p.getPoolSize());
1881 }});
1882 await(threadStarted);
1883 delay(keepAliveTime);
1884 long startTime = System.nanoTime();
1885 while (p.getPoolSize() > 0
1886 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1887 Thread.yield();
1888 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1889 assertEquals(0, p.getPoolSize());
1890 } finally {
1891 joinPool(p);
1892 }
1893 }
1894
1895 /**
1896 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1897 */
1898 public void testAllowCoreThreadTimeOut_false() throws Exception {
1899 long keepAliveTime = timeoutMillis();
1900 final ThreadPoolExecutor p =
1901 new CustomTPE(2, 10,
1902 keepAliveTime, MILLISECONDS,
1903 new ArrayBlockingQueue<Runnable>(10));
1904 final CountDownLatch threadStarted = new CountDownLatch(1);
1905 try {
1906 p.allowCoreThreadTimeOut(false);
1907 p.execute(new CheckedRunnable() {
1908 public void realRun() throws InterruptedException {
1909 threadStarted.countDown();
1910 assertTrue(p.getPoolSize() >= 1);
1911 }});
1912 delay(2 * keepAliveTime);
1913 assertTrue(p.getPoolSize() >= 1);
1914 } finally {
1915 joinPool(p);
1916 }
1917 }
1918
1919 /**
1920 * get(cancelled task) throws CancellationException
1921 * (in part, a test of CustomTPE itself)
1922 */
1923 public void testGet_cancelled() throws Exception {
1924 final ExecutorService e =
1925 new CustomTPE(1, 1,
1926 LONG_DELAY_MS, MILLISECONDS,
1927 new LinkedBlockingQueue<Runnable>());
1928 try {
1929 final CountDownLatch blockerStarted = new CountDownLatch(1);
1930 final CountDownLatch done = new CountDownLatch(1);
1931 final List<Future<?>> futures = new ArrayList<>();
1932 for (int i = 0; i < 2; i++) {
1933 Runnable r = new CheckedRunnable() { public void realRun()
1934 throws Throwable {
1935 blockerStarted.countDown();
1936 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1937 }};
1938 futures.add(e.submit(r));
1939 }
1940 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1941 for (Future<?> future : futures) future.cancel(false);
1942 for (Future<?> future : futures) {
1943 try {
1944 future.get();
1945 shouldThrow();
1946 } catch (CancellationException success) {}
1947 try {
1948 future.get(LONG_DELAY_MS, MILLISECONDS);
1949 shouldThrow();
1950 } catch (CancellationException success) {}
1951 assertTrue(future.isCancelled());
1952 assertTrue(future.isDone());
1953 }
1954 done.countDown();
1955 } finally {
1956 joinPool(e);
1957 }
1958 }
1959
1960 }