ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.53
Committed: Sun Oct 4 01:23:41 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.52: +16 -7 lines
Log Message:
improve testPrestartAllCoreThreads

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 final CountDownLatch threadStarted = new CountDownLatch(1);
256 final CountDownLatch done = new CountDownLatch(1);
257 try (PoolCleaner cleaner = cleaner(p)) {
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 done.countDown();
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 ThreadPoolExecutor p =
302 new CustomTPE(2, 6,
303 LONG_DELAY_MS, MILLISECONDS,
304 new ArrayBlockingQueue<Runnable>(10));
305 try (PoolCleaner cleaner = cleaner(p)) {
306 assertEquals(0, p.getPoolSize());
307 p.prestartAllCoreThreads();
308 assertEquals(2, p.getPoolSize());
309 p.prestartAllCoreThreads();
310 assertEquals(2, p.getPoolSize());
311 p.setCorePoolSize(4);
312 p.prestartAllCoreThreads();
313 assertEquals(4, p.getPoolSize());
314 p.prestartAllCoreThreads();
315 assertEquals(4, p.getPoolSize());
316 }
317 }
318
319 /**
320 * getCompletedTaskCount increases, but doesn't overestimate,
321 * when tasks complete
322 */
323 public void testGetCompletedTaskCount() throws InterruptedException {
324 final ThreadPoolExecutor p =
325 new CustomTPE(2, 2,
326 LONG_DELAY_MS, MILLISECONDS,
327 new ArrayBlockingQueue<Runnable>(10));
328 final CountDownLatch threadStarted = new CountDownLatch(1);
329 final CountDownLatch threadProceed = new CountDownLatch(1);
330 final CountDownLatch threadDone = new CountDownLatch(1);
331 try {
332 assertEquals(0, p.getCompletedTaskCount());
333 p.execute(new CheckedRunnable() {
334 public void realRun() throws InterruptedException {
335 threadStarted.countDown();
336 assertEquals(0, p.getCompletedTaskCount());
337 threadProceed.await();
338 threadDone.countDown();
339 }});
340 await(threadStarted);
341 assertEquals(0, p.getCompletedTaskCount());
342 threadProceed.countDown();
343 threadDone.await();
344 long startTime = System.nanoTime();
345 while (p.getCompletedTaskCount() != 1) {
346 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
347 fail("timed out");
348 Thread.yield();
349 }
350 } finally {
351 joinPool(p);
352 }
353 }
354
355 /**
356 * getCorePoolSize returns size given in constructor if not otherwise set
357 */
358 public void testGetCorePoolSize() {
359 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
360 assertEquals(1, p.getCorePoolSize());
361 joinPool(p);
362 }
363
364 /**
365 * getKeepAliveTime returns value given in constructor if not otherwise set
366 */
367 public void testGetKeepAliveTime() {
368 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
369 assertEquals(1, p.getKeepAliveTime(SECONDS));
370 joinPool(p);
371 }
372
373 /**
374 * getThreadFactory returns factory in constructor if not set
375 */
376 public void testGetThreadFactory() {
377 ThreadFactory tf = new SimpleThreadFactory();
378 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
379 assertSame(tf, p.getThreadFactory());
380 joinPool(p);
381 }
382
383 /**
384 * setThreadFactory sets the thread factory returned by getThreadFactory
385 */
386 public void testSetThreadFactory() {
387 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
388 ThreadFactory tf = new SimpleThreadFactory();
389 p.setThreadFactory(tf);
390 assertSame(tf, p.getThreadFactory());
391 joinPool(p);
392 }
393
394 /**
395 * setThreadFactory(null) throws NPE
396 */
397 public void testSetThreadFactoryNull() {
398 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
399 try {
400 p.setThreadFactory(null);
401 shouldThrow();
402 } catch (NullPointerException success) {
403 } finally {
404 joinPool(p);
405 }
406 }
407
408 /**
409 * getRejectedExecutionHandler returns handler in constructor if not set
410 */
411 public void testGetRejectedExecutionHandler() {
412 RejectedExecutionHandler h = new NoOpREHandler();
413 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
414 assertSame(h, p.getRejectedExecutionHandler());
415 joinPool(p);
416 }
417
418 /**
419 * setRejectedExecutionHandler sets the handler returned by
420 * getRejectedExecutionHandler
421 */
422 public void testSetRejectedExecutionHandler() {
423 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
424 RejectedExecutionHandler h = new NoOpREHandler();
425 p.setRejectedExecutionHandler(h);
426 assertSame(h, p.getRejectedExecutionHandler());
427 joinPool(p);
428 }
429
430 /**
431 * setRejectedExecutionHandler(null) throws NPE
432 */
433 public void testSetRejectedExecutionHandlerNull() {
434 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
435 try {
436 p.setRejectedExecutionHandler(null);
437 shouldThrow();
438 } catch (NullPointerException success) {
439 } finally {
440 joinPool(p);
441 }
442 }
443
444 /**
445 * getLargestPoolSize increases, but doesn't overestimate, when
446 * multiple threads active
447 */
448 public void testGetLargestPoolSize() throws InterruptedException {
449 final int THREADS = 3;
450 final ThreadPoolExecutor p =
451 new CustomTPE(THREADS, THREADS,
452 LONG_DELAY_MS, MILLISECONDS,
453 new ArrayBlockingQueue<Runnable>(10));
454 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
455 final CountDownLatch done = new CountDownLatch(1);
456 try {
457 assertEquals(0, p.getLargestPoolSize());
458 for (int i = 0; i < THREADS; i++)
459 p.execute(new CheckedRunnable() {
460 public void realRun() throws InterruptedException {
461 threadsStarted.countDown();
462 done.await();
463 assertEquals(THREADS, p.getLargestPoolSize());
464 }});
465 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
466 assertEquals(THREADS, p.getLargestPoolSize());
467 } finally {
468 done.countDown();
469 joinPool(p);
470 assertEquals(THREADS, p.getLargestPoolSize());
471 }
472 }
473
474 /**
475 * getMaximumPoolSize returns value given in constructor if not
476 * otherwise set
477 */
478 public void testGetMaximumPoolSize() {
479 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
480 assertEquals(2, p.getMaximumPoolSize());
481 joinPool(p);
482 }
483
484 /**
485 * getPoolSize increases, but doesn't overestimate, when threads
486 * become active
487 */
488 public void testGetPoolSize() throws InterruptedException {
489 final ThreadPoolExecutor p =
490 new CustomTPE(1, 1,
491 LONG_DELAY_MS, MILLISECONDS,
492 new ArrayBlockingQueue<Runnable>(10));
493 final CountDownLatch threadStarted = new CountDownLatch(1);
494 final CountDownLatch done = new CountDownLatch(1);
495 try {
496 assertEquals(0, p.getPoolSize());
497 p.execute(new CheckedRunnable() {
498 public void realRun() throws InterruptedException {
499 threadStarted.countDown();
500 assertEquals(1, p.getPoolSize());
501 done.await();
502 }});
503 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
504 assertEquals(1, p.getPoolSize());
505 } finally {
506 done.countDown();
507 joinPool(p);
508 }
509 }
510
511 /**
512 * getTaskCount increases, but doesn't overestimate, when tasks submitted
513 */
514 public void testGetTaskCount() throws InterruptedException {
515 final ThreadPoolExecutor p =
516 new CustomTPE(1, 1,
517 LONG_DELAY_MS, MILLISECONDS,
518 new ArrayBlockingQueue<Runnable>(10));
519 final CountDownLatch threadStarted = new CountDownLatch(1);
520 final CountDownLatch done = new CountDownLatch(1);
521 try {
522 assertEquals(0, p.getTaskCount());
523 p.execute(new CheckedRunnable() {
524 public void realRun() throws InterruptedException {
525 threadStarted.countDown();
526 assertEquals(1, p.getTaskCount());
527 done.await();
528 }});
529 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
530 assertEquals(1, p.getTaskCount());
531 } finally {
532 done.countDown();
533 joinPool(p);
534 }
535 }
536
537 /**
538 * isShutdown is false before shutdown, true after
539 */
540 public void testIsShutdown() {
541
542 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
543 assertFalse(p.isShutdown());
544 try { p.shutdown(); } catch (SecurityException ok) { return; }
545 assertTrue(p.isShutdown());
546 joinPool(p);
547 }
548
549 /**
550 * isTerminated is false before termination, true after
551 */
552 public void testIsTerminated() throws InterruptedException {
553 final ThreadPoolExecutor p =
554 new CustomTPE(1, 1,
555 LONG_DELAY_MS, MILLISECONDS,
556 new ArrayBlockingQueue<Runnable>(10));
557 final CountDownLatch threadStarted = new CountDownLatch(1);
558 final CountDownLatch done = new CountDownLatch(1);
559 try {
560 assertFalse(p.isTerminating());
561 p.execute(new CheckedRunnable() {
562 public void realRun() throws InterruptedException {
563 assertFalse(p.isTerminating());
564 threadStarted.countDown();
565 done.await();
566 }});
567 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
568 assertFalse(p.isTerminating());
569 done.countDown();
570 } finally {
571 try { p.shutdown(); } catch (SecurityException ok) { return; }
572 }
573 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
574 assertTrue(p.isTerminated());
575 assertFalse(p.isTerminating());
576 }
577
578 /**
579 * isTerminating is not true when running or when terminated
580 */
581 public void testIsTerminating() throws InterruptedException {
582 final ThreadPoolExecutor p =
583 new CustomTPE(1, 1,
584 LONG_DELAY_MS, MILLISECONDS,
585 new ArrayBlockingQueue<Runnable>(10));
586 final CountDownLatch threadStarted = new CountDownLatch(1);
587 final CountDownLatch done = new CountDownLatch(1);
588 try {
589 assertFalse(p.isTerminating());
590 p.execute(new CheckedRunnable() {
591 public void realRun() throws InterruptedException {
592 assertFalse(p.isTerminating());
593 threadStarted.countDown();
594 done.await();
595 }});
596 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
597 assertFalse(p.isTerminating());
598 done.countDown();
599 } finally {
600 try { p.shutdown(); } catch (SecurityException ok) { return; }
601 }
602 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
603 assertTrue(p.isTerminated());
604 assertFalse(p.isTerminating());
605 }
606
607 /**
608 * getQueue returns the work queue, which contains queued tasks
609 */
610 public void testGetQueue() throws InterruptedException {
611 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
612 final ThreadPoolExecutor p =
613 new CustomTPE(1, 1,
614 LONG_DELAY_MS, MILLISECONDS,
615 q);
616 final CountDownLatch threadStarted = new CountDownLatch(1);
617 final CountDownLatch done = new CountDownLatch(1);
618 try {
619 FutureTask[] tasks = new FutureTask[5];
620 for (int i = 0; i < tasks.length; i++) {
621 Callable task = new CheckedCallable<Boolean>() {
622 public Boolean realCall() throws InterruptedException {
623 threadStarted.countDown();
624 assertSame(q, p.getQueue());
625 done.await();
626 return Boolean.TRUE;
627 }};
628 tasks[i] = new FutureTask(task);
629 p.execute(tasks[i]);
630 }
631 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
632 assertSame(q, p.getQueue());
633 assertFalse(q.contains(tasks[0]));
634 assertTrue(q.contains(tasks[tasks.length - 1]));
635 assertEquals(tasks.length - 1, q.size());
636 } finally {
637 done.countDown();
638 joinPool(p);
639 }
640 }
641
642 /**
643 * remove(task) removes queued task, and fails to remove active task
644 */
645 public void testRemove() throws InterruptedException {
646 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
647 final ThreadPoolExecutor p =
648 new CustomTPE(1, 1,
649 LONG_DELAY_MS, MILLISECONDS,
650 q);
651 Runnable[] tasks = new Runnable[6];
652 final CountDownLatch threadStarted = new CountDownLatch(1);
653 final CountDownLatch done = new CountDownLatch(1);
654 try {
655 for (int i = 0; i < tasks.length; i++) {
656 tasks[i] = new CheckedRunnable() {
657 public void realRun() throws InterruptedException {
658 threadStarted.countDown();
659 done.await();
660 }};
661 p.execute(tasks[i]);
662 }
663 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
664 assertFalse(p.remove(tasks[0]));
665 assertTrue(q.contains(tasks[4]));
666 assertTrue(q.contains(tasks[3]));
667 assertTrue(p.remove(tasks[4]));
668 assertFalse(p.remove(tasks[4]));
669 assertFalse(q.contains(tasks[4]));
670 assertTrue(q.contains(tasks[3]));
671 assertTrue(p.remove(tasks[3]));
672 assertFalse(q.contains(tasks[3]));
673 } finally {
674 done.countDown();
675 joinPool(p);
676 }
677 }
678
679 /**
680 * purge removes cancelled tasks from the queue
681 */
682 public void testPurge() throws InterruptedException {
683 final CountDownLatch threadStarted = new CountDownLatch(1);
684 final CountDownLatch done = new CountDownLatch(1);
685 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
686 final ThreadPoolExecutor p =
687 new CustomTPE(1, 1,
688 LONG_DELAY_MS, MILLISECONDS,
689 q);
690 FutureTask[] tasks = new FutureTask[5];
691 try {
692 for (int i = 0; i < tasks.length; i++) {
693 Callable task = new CheckedCallable<Boolean>() {
694 public Boolean realCall() throws InterruptedException {
695 threadStarted.countDown();
696 done.await();
697 return Boolean.TRUE;
698 }};
699 tasks[i] = new FutureTask(task);
700 p.execute(tasks[i]);
701 }
702 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
703 assertEquals(tasks.length, p.getTaskCount());
704 assertEquals(tasks.length - 1, q.size());
705 assertEquals(1L, p.getActiveCount());
706 assertEquals(0L, p.getCompletedTaskCount());
707 tasks[4].cancel(true);
708 tasks[3].cancel(false);
709 p.purge();
710 assertEquals(tasks.length - 3, q.size());
711 assertEquals(tasks.length - 2, p.getTaskCount());
712 p.purge(); // Nothing to do
713 assertEquals(tasks.length - 3, q.size());
714 assertEquals(tasks.length - 2, p.getTaskCount());
715 } finally {
716 done.countDown();
717 joinPool(p);
718 }
719 }
720
721 /**
722 * shutdownNow returns a list containing tasks that were not run,
723 * and those tasks are drained from the queue
724 */
725 public void testShutdownNow() throws InterruptedException {
726 final int poolSize = 2;
727 final int count = 5;
728 final AtomicInteger ran = new AtomicInteger(0);
729 ThreadPoolExecutor p =
730 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
731 new ArrayBlockingQueue<Runnable>(10));
732 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
733 Runnable waiter = new CheckedRunnable() { public void realRun() {
734 threadsStarted.countDown();
735 try {
736 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
737 } catch (InterruptedException success) {}
738 ran.getAndIncrement();
739 }};
740 for (int i = 0; i < count; i++)
741 p.execute(waiter);
742 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
743 assertEquals(poolSize, p.getActiveCount());
744 assertEquals(0, p.getCompletedTaskCount());
745 final List<Runnable> queuedTasks;
746 try {
747 queuedTasks = p.shutdownNow();
748 } catch (SecurityException ok) {
749 return; // Allowed in case test doesn't have privs
750 }
751 assertTrue(p.isShutdown());
752 assertTrue(p.getQueue().isEmpty());
753 assertEquals(count - poolSize, queuedTasks.size());
754 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
755 assertTrue(p.isTerminated());
756 assertEquals(poolSize, ran.get());
757 assertEquals(poolSize, p.getCompletedTaskCount());
758 }
759
760 // Exception Tests
761
762 /**
763 * Constructor throws if corePoolSize argument is less than zero
764 */
765 public void testConstructor1() {
766 try {
767 new CustomTPE(-1, 1, 1L, SECONDS,
768 new ArrayBlockingQueue<Runnable>(10));
769 shouldThrow();
770 } catch (IllegalArgumentException success) {}
771 }
772
773 /**
774 * Constructor throws if maximumPoolSize is less than zero
775 */
776 public void testConstructor2() {
777 try {
778 new CustomTPE(1, -1, 1L, SECONDS,
779 new ArrayBlockingQueue<Runnable>(10));
780 shouldThrow();
781 } catch (IllegalArgumentException success) {}
782 }
783
784 /**
785 * Constructor throws if maximumPoolSize is equal to zero
786 */
787 public void testConstructor3() {
788 try {
789 new CustomTPE(1, 0, 1L, SECONDS,
790 new ArrayBlockingQueue<Runnable>(10));
791 shouldThrow();
792 } catch (IllegalArgumentException success) {}
793 }
794
795 /**
796 * Constructor throws if keepAliveTime is less than zero
797 */
798 public void testConstructor4() {
799 try {
800 new CustomTPE(1, 2, -1L, SECONDS,
801 new ArrayBlockingQueue<Runnable>(10));
802 shouldThrow();
803 } catch (IllegalArgumentException success) {}
804 }
805
806 /**
807 * Constructor throws if corePoolSize is greater than the maximumPoolSize
808 */
809 public void testConstructor5() {
810 try {
811 new CustomTPE(2, 1, 1L, SECONDS,
812 new ArrayBlockingQueue<Runnable>(10));
813 shouldThrow();
814 } catch (IllegalArgumentException success) {}
815 }
816
817 /**
818 * Constructor throws if workQueue is set to null
819 */
820 public void testConstructorNullPointerException() {
821 try {
822 new CustomTPE(1, 2, 1L, SECONDS, null);
823 shouldThrow();
824 } catch (NullPointerException success) {}
825 }
826
827 /**
828 * Constructor throws if corePoolSize argument is less than zero
829 */
830 public void testConstructor6() {
831 try {
832 new CustomTPE(-1, 1, 1L, SECONDS,
833 new ArrayBlockingQueue<Runnable>(10),
834 new SimpleThreadFactory());
835 shouldThrow();
836 } catch (IllegalArgumentException success) {}
837 }
838
839 /**
840 * Constructor throws if maximumPoolSize is less than zero
841 */
842 public void testConstructor7() {
843 try {
844 new CustomTPE(1,-1, 1L, SECONDS,
845 new ArrayBlockingQueue<Runnable>(10),
846 new SimpleThreadFactory());
847 shouldThrow();
848 } catch (IllegalArgumentException success) {}
849 }
850
851 /**
852 * Constructor throws if maximumPoolSize is equal to zero
853 */
854 public void testConstructor8() {
855 try {
856 new CustomTPE(1, 0, 1L, SECONDS,
857 new ArrayBlockingQueue<Runnable>(10),
858 new SimpleThreadFactory());
859 shouldThrow();
860 } catch (IllegalArgumentException success) {}
861 }
862
863 /**
864 * Constructor throws if keepAliveTime is less than zero
865 */
866 public void testConstructor9() {
867 try {
868 new CustomTPE(1, 2, -1L, SECONDS,
869 new ArrayBlockingQueue<Runnable>(10),
870 new SimpleThreadFactory());
871 shouldThrow();
872 } catch (IllegalArgumentException success) {}
873 }
874
875 /**
876 * Constructor throws if corePoolSize is greater than the maximumPoolSize
877 */
878 public void testConstructor10() {
879 try {
880 new CustomTPE(2, 1, 1L, SECONDS,
881 new ArrayBlockingQueue<Runnable>(10),
882 new SimpleThreadFactory());
883 shouldThrow();
884 } catch (IllegalArgumentException success) {}
885 }
886
887 /**
888 * Constructor throws if workQueue is set to null
889 */
890 public void testConstructorNullPointerException2() {
891 try {
892 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
893 shouldThrow();
894 } catch (NullPointerException success) {}
895 }
896
897 /**
898 * Constructor throws if threadFactory is set to null
899 */
900 public void testConstructorNullPointerException3() {
901 try {
902 new CustomTPE(1, 2, 1L, SECONDS,
903 new ArrayBlockingQueue<Runnable>(10),
904 (ThreadFactory) null);
905 shouldThrow();
906 } catch (NullPointerException success) {}
907 }
908
909 /**
910 * Constructor throws if corePoolSize argument is less than zero
911 */
912 public void testConstructor11() {
913 try {
914 new CustomTPE(-1, 1, 1L, SECONDS,
915 new ArrayBlockingQueue<Runnable>(10),
916 new NoOpREHandler());
917 shouldThrow();
918 } catch (IllegalArgumentException success) {}
919 }
920
921 /**
922 * Constructor throws if maximumPoolSize is less than zero
923 */
924 public void testConstructor12() {
925 try {
926 new CustomTPE(1, -1, 1L, SECONDS,
927 new ArrayBlockingQueue<Runnable>(10),
928 new NoOpREHandler());
929 shouldThrow();
930 } catch (IllegalArgumentException success) {}
931 }
932
933 /**
934 * Constructor throws if maximumPoolSize is equal to zero
935 */
936 public void testConstructor13() {
937 try {
938 new CustomTPE(1, 0, 1L, SECONDS,
939 new ArrayBlockingQueue<Runnable>(10),
940 new NoOpREHandler());
941 shouldThrow();
942 } catch (IllegalArgumentException success) {}
943 }
944
945 /**
946 * Constructor throws if keepAliveTime is less than zero
947 */
948 public void testConstructor14() {
949 try {
950 new CustomTPE(1, 2, -1L, SECONDS,
951 new ArrayBlockingQueue<Runnable>(10),
952 new NoOpREHandler());
953 shouldThrow();
954 } catch (IllegalArgumentException success) {}
955 }
956
957 /**
958 * Constructor throws if corePoolSize is greater than the maximumPoolSize
959 */
960 public void testConstructor15() {
961 try {
962 new CustomTPE(2, 1, 1L, SECONDS,
963 new ArrayBlockingQueue<Runnable>(10),
964 new NoOpREHandler());
965 shouldThrow();
966 } catch (IllegalArgumentException success) {}
967 }
968
969 /**
970 * Constructor throws if workQueue is set to null
971 */
972 public void testConstructorNullPointerException4() {
973 try {
974 new CustomTPE(1, 2, 1L, SECONDS,
975 null,
976 new NoOpREHandler());
977 shouldThrow();
978 } catch (NullPointerException success) {}
979 }
980
981 /**
982 * Constructor throws if handler is set to null
983 */
984 public void testConstructorNullPointerException5() {
985 try {
986 new CustomTPE(1, 2, 1L, SECONDS,
987 new ArrayBlockingQueue<Runnable>(10),
988 (RejectedExecutionHandler) null);
989 shouldThrow();
990 } catch (NullPointerException success) {}
991 }
992
993 /**
994 * Constructor throws if corePoolSize argument is less than zero
995 */
996 public void testConstructor16() {
997 try {
998 new CustomTPE(-1, 1, 1L, SECONDS,
999 new ArrayBlockingQueue<Runnable>(10),
1000 new SimpleThreadFactory(),
1001 new NoOpREHandler());
1002 shouldThrow();
1003 } catch (IllegalArgumentException success) {}
1004 }
1005
1006 /**
1007 * Constructor throws if maximumPoolSize is less than zero
1008 */
1009 public void testConstructor17() {
1010 try {
1011 new CustomTPE(1, -1, 1L, SECONDS,
1012 new ArrayBlockingQueue<Runnable>(10),
1013 new SimpleThreadFactory(),
1014 new NoOpREHandler());
1015 shouldThrow();
1016 } catch (IllegalArgumentException success) {}
1017 }
1018
1019 /**
1020 * Constructor throws if maximumPoolSize is equal to zero
1021 */
1022 public void testConstructor18() {
1023 try {
1024 new CustomTPE(1, 0, 1L, SECONDS,
1025 new ArrayBlockingQueue<Runnable>(10),
1026 new SimpleThreadFactory(),
1027 new NoOpREHandler());
1028 shouldThrow();
1029 } catch (IllegalArgumentException success) {}
1030 }
1031
1032 /**
1033 * Constructor throws if keepAliveTime is less than zero
1034 */
1035 public void testConstructor19() {
1036 try {
1037 new CustomTPE(1, 2, -1L, SECONDS,
1038 new ArrayBlockingQueue<Runnable>(10),
1039 new SimpleThreadFactory(),
1040 new NoOpREHandler());
1041 shouldThrow();
1042 } catch (IllegalArgumentException success) {}
1043 }
1044
1045 /**
1046 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1047 */
1048 public void testConstructor20() {
1049 try {
1050 new CustomTPE(2, 1, 1L, SECONDS,
1051 new ArrayBlockingQueue<Runnable>(10),
1052 new SimpleThreadFactory(),
1053 new NoOpREHandler());
1054 shouldThrow();
1055 } catch (IllegalArgumentException success) {}
1056 }
1057
1058 /**
1059 * Constructor throws if workQueue is null
1060 */
1061 public void testConstructorNullPointerException6() {
1062 try {
1063 new CustomTPE(1, 2, 1L, SECONDS,
1064 null,
1065 new SimpleThreadFactory(),
1066 new NoOpREHandler());
1067 shouldThrow();
1068 } catch (NullPointerException success) {}
1069 }
1070
1071 /**
1072 * Constructor throws if handler is null
1073 */
1074 public void testConstructorNullPointerException7() {
1075 try {
1076 new CustomTPE(1, 2, 1L, SECONDS,
1077 new ArrayBlockingQueue<Runnable>(10),
1078 new SimpleThreadFactory(),
1079 (RejectedExecutionHandler) null);
1080 shouldThrow();
1081 } catch (NullPointerException success) {}
1082 }
1083
1084 /**
1085 * Constructor throws if ThreadFactory is null
1086 */
1087 public void testConstructorNullPointerException8() {
1088 try {
1089 new CustomTPE(1, 2, 1L, SECONDS,
1090 new ArrayBlockingQueue<Runnable>(10),
1091 (ThreadFactory) null,
1092 new NoOpREHandler());
1093 shouldThrow();
1094 } catch (NullPointerException success) {}
1095 }
1096
1097 /**
1098 * execute throws RejectedExecutionException if saturated.
1099 */
1100 public void testSaturatedExecute() {
1101 ThreadPoolExecutor p =
1102 new CustomTPE(1, 1,
1103 LONG_DELAY_MS, MILLISECONDS,
1104 new ArrayBlockingQueue<Runnable>(1));
1105 final CountDownLatch done = new CountDownLatch(1);
1106 try {
1107 Runnable task = new CheckedRunnable() {
1108 public void realRun() throws InterruptedException {
1109 done.await();
1110 }};
1111 for (int i = 0; i < 2; ++i)
1112 p.execute(task);
1113 for (int i = 0; i < 2; ++i) {
1114 try {
1115 p.execute(task);
1116 shouldThrow();
1117 } catch (RejectedExecutionException success) {}
1118 assertTrue(p.getTaskCount() <= 2);
1119 }
1120 } finally {
1121 done.countDown();
1122 joinPool(p);
1123 }
1124 }
1125
1126 /**
1127 * executor using CallerRunsPolicy runs task if saturated.
1128 */
1129 public void testSaturatedExecute2() {
1130 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1131 ThreadPoolExecutor p = new CustomTPE(1, 1,
1132 LONG_DELAY_MS, MILLISECONDS,
1133 new ArrayBlockingQueue<Runnable>(1),
1134 h);
1135 try {
1136 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1137 for (int i = 0; i < tasks.length; ++i)
1138 tasks[i] = new TrackedNoOpRunnable();
1139 TrackedLongRunnable mr = new TrackedLongRunnable();
1140 p.execute(mr);
1141 for (int i = 0; i < tasks.length; ++i)
1142 p.execute(tasks[i]);
1143 for (int i = 1; i < tasks.length; ++i)
1144 assertTrue(tasks[i].done);
1145 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1146 } finally {
1147 joinPool(p);
1148 }
1149 }
1150
1151 /**
1152 * executor using DiscardPolicy drops task if saturated.
1153 */
1154 public void testSaturatedExecute3() {
1155 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1156 ThreadPoolExecutor p =
1157 new CustomTPE(1, 1,
1158 LONG_DELAY_MS, MILLISECONDS,
1159 new ArrayBlockingQueue<Runnable>(1),
1160 h);
1161 try {
1162 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1163 for (int i = 0; i < tasks.length; ++i)
1164 tasks[i] = new TrackedNoOpRunnable();
1165 p.execute(new TrackedLongRunnable());
1166 for (TrackedNoOpRunnable task : tasks)
1167 p.execute(task);
1168 for (TrackedNoOpRunnable task : tasks)
1169 assertFalse(task.done);
1170 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1171 } finally {
1172 joinPool(p);
1173 }
1174 }
1175
1176 /**
1177 * executor using DiscardOldestPolicy drops oldest task if saturated.
1178 */
1179 public void testSaturatedExecute4() {
1180 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1181 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1182 try {
1183 p.execute(new TrackedLongRunnable());
1184 TrackedLongRunnable r2 = new TrackedLongRunnable();
1185 p.execute(r2);
1186 assertTrue(p.getQueue().contains(r2));
1187 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1188 p.execute(r3);
1189 assertFalse(p.getQueue().contains(r2));
1190 assertTrue(p.getQueue().contains(r3));
1191 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1192 } finally {
1193 joinPool(p);
1194 }
1195 }
1196
1197 /**
1198 * execute throws RejectedExecutionException if shutdown
1199 */
1200 public void testRejectedExecutionExceptionOnShutdown() {
1201 ThreadPoolExecutor p =
1202 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1203 try { p.shutdown(); } catch (SecurityException ok) { return; }
1204 try {
1205 p.execute(new NoOpRunnable());
1206 shouldThrow();
1207 } catch (RejectedExecutionException success) {}
1208
1209 joinPool(p);
1210 }
1211
1212 /**
1213 * execute using CallerRunsPolicy drops task on shutdown
1214 */
1215 public void testCallerRunsOnShutdown() {
1216 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1217 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1218
1219 try { p.shutdown(); } catch (SecurityException ok) { return; }
1220 try {
1221 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1222 p.execute(r);
1223 assertFalse(r.done);
1224 } finally {
1225 joinPool(p);
1226 }
1227 }
1228
1229 /**
1230 * execute using DiscardPolicy drops task on shutdown
1231 */
1232 public void testDiscardOnShutdown() {
1233 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1234 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1235
1236 try { p.shutdown(); } catch (SecurityException ok) { return; }
1237 try {
1238 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1239 p.execute(r);
1240 assertFalse(r.done);
1241 } finally {
1242 joinPool(p);
1243 }
1244 }
1245
1246 /**
1247 * execute using DiscardOldestPolicy drops task on shutdown
1248 */
1249 public void testDiscardOldestOnShutdown() {
1250 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1251 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1252
1253 try { p.shutdown(); } catch (SecurityException ok) { return; }
1254 try {
1255 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1256 p.execute(r);
1257 assertFalse(r.done);
1258 } finally {
1259 joinPool(p);
1260 }
1261 }
1262
1263 /**
1264 * execute(null) throws NPE
1265 */
1266 public void testExecuteNull() {
1267 ThreadPoolExecutor p =
1268 new CustomTPE(1, 2, 1L, SECONDS,
1269 new ArrayBlockingQueue<Runnable>(10));
1270 try {
1271 p.execute(null);
1272 shouldThrow();
1273 } catch (NullPointerException success) {}
1274
1275 joinPool(p);
1276 }
1277
1278 /**
1279 * setCorePoolSize of negative value throws IllegalArgumentException
1280 */
1281 public void testCorePoolSizeIllegalArgumentException() {
1282 ThreadPoolExecutor p =
1283 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1284 try {
1285 p.setCorePoolSize(-1);
1286 shouldThrow();
1287 } catch (IllegalArgumentException success) {
1288 } finally {
1289 try { p.shutdown(); } catch (SecurityException ok) { return; }
1290 }
1291 joinPool(p);
1292 }
1293
1294 /**
1295 * setMaximumPoolSize(int) throws IllegalArgumentException
1296 * if given a value less the core pool size
1297 */
1298 public void testMaximumPoolSizeIllegalArgumentException() {
1299 ThreadPoolExecutor p =
1300 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1301 try {
1302 p.setMaximumPoolSize(1);
1303 shouldThrow();
1304 } catch (IllegalArgumentException success) {
1305 } finally {
1306 try { p.shutdown(); } catch (SecurityException ok) { return; }
1307 }
1308 joinPool(p);
1309 }
1310
1311 /**
1312 * setMaximumPoolSize throws IllegalArgumentException
1313 * if given a negative value
1314 */
1315 public void testMaximumPoolSizeIllegalArgumentException2() {
1316 ThreadPoolExecutor p =
1317 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1318 try {
1319 p.setMaximumPoolSize(-1);
1320 shouldThrow();
1321 } catch (IllegalArgumentException success) {
1322 } finally {
1323 try { p.shutdown(); } catch (SecurityException ok) { return; }
1324 }
1325 joinPool(p);
1326 }
1327
1328 /**
1329 * setKeepAliveTime throws IllegalArgumentException
1330 * when given a negative value
1331 */
1332 public void testKeepAliveTimeIllegalArgumentException() {
1333 ThreadPoolExecutor p =
1334 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1335
1336 try {
1337 p.setKeepAliveTime(-1,MILLISECONDS);
1338 shouldThrow();
1339 } catch (IllegalArgumentException success) {
1340 } finally {
1341 try { p.shutdown(); } catch (SecurityException ok) { return; }
1342 }
1343 joinPool(p);
1344 }
1345
1346 /**
1347 * terminated() is called on termination
1348 */
1349 public void testTerminated() {
1350 CustomTPE p = new CustomTPE();
1351 try { p.shutdown(); } catch (SecurityException ok) { return; }
1352 assertTrue(p.terminatedCalled());
1353 joinPool(p);
1354 }
1355
1356 /**
1357 * beforeExecute and afterExecute are called when executing task
1358 */
1359 public void testBeforeAfter() throws InterruptedException {
1360 CustomTPE p = new CustomTPE();
1361 try {
1362 final CountDownLatch done = new CountDownLatch(1);
1363 p.execute(new CheckedRunnable() {
1364 public void realRun() {
1365 done.countDown();
1366 }});
1367 await(p.afterCalled);
1368 assertEquals(0, done.getCount());
1369 assertTrue(p.afterCalled());
1370 assertTrue(p.beforeCalled());
1371 try { p.shutdown(); } catch (SecurityException ok) { return; }
1372 } finally {
1373 joinPool(p);
1374 }
1375 }
1376
1377 /**
1378 * completed submit of callable returns result
1379 */
1380 public void testSubmitCallable() throws Exception {
1381 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1382 try {
1383 Future<String> future = e.submit(new StringTask());
1384 String result = future.get();
1385 assertSame(TEST_STRING, result);
1386 } finally {
1387 joinPool(e);
1388 }
1389 }
1390
1391 /**
1392 * completed submit of runnable returns successfully
1393 */
1394 public void testSubmitRunnable() throws Exception {
1395 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1396 try {
1397 Future<?> future = e.submit(new NoOpRunnable());
1398 future.get();
1399 assertTrue(future.isDone());
1400 } finally {
1401 joinPool(e);
1402 }
1403 }
1404
1405 /**
1406 * completed submit of (runnable, result) returns result
1407 */
1408 public void testSubmitRunnable2() throws Exception {
1409 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1410 try {
1411 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1412 String result = future.get();
1413 assertSame(TEST_STRING, result);
1414 } finally {
1415 joinPool(e);
1416 }
1417 }
1418
1419 /**
1420 * invokeAny(null) throws NPE
1421 */
1422 public void testInvokeAny1() throws Exception {
1423 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1424 try {
1425 e.invokeAny(null);
1426 shouldThrow();
1427 } catch (NullPointerException success) {
1428 } finally {
1429 joinPool(e);
1430 }
1431 }
1432
1433 /**
1434 * invokeAny(empty collection) throws IAE
1435 */
1436 public void testInvokeAny2() throws Exception {
1437 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1438 try {
1439 e.invokeAny(new ArrayList<Callable<String>>());
1440 shouldThrow();
1441 } catch (IllegalArgumentException success) {
1442 } finally {
1443 joinPool(e);
1444 }
1445 }
1446
1447 /**
1448 * invokeAny(c) throws NPE if c has null elements
1449 */
1450 public void testInvokeAny3() throws Exception {
1451 CountDownLatch latch = new CountDownLatch(1);
1452 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1453 List<Callable<String>> l = new ArrayList<Callable<String>>();
1454 l.add(latchAwaitingStringTask(latch));
1455 l.add(null);
1456 try {
1457 e.invokeAny(l);
1458 shouldThrow();
1459 } catch (NullPointerException success) {
1460 } finally {
1461 latch.countDown();
1462 joinPool(e);
1463 }
1464 }
1465
1466 /**
1467 * invokeAny(c) throws ExecutionException if no task completes
1468 */
1469 public void testInvokeAny4() throws Exception {
1470 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1471 List<Callable<String>> l = new ArrayList<Callable<String>>();
1472 l.add(new NPETask());
1473 try {
1474 e.invokeAny(l);
1475 shouldThrow();
1476 } catch (ExecutionException success) {
1477 assertTrue(success.getCause() instanceof NullPointerException);
1478 } finally {
1479 joinPool(e);
1480 }
1481 }
1482
1483 /**
1484 * invokeAny(c) returns result of some task
1485 */
1486 public void testInvokeAny5() throws Exception {
1487 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1488 try {
1489 List<Callable<String>> l = new ArrayList<Callable<String>>();
1490 l.add(new StringTask());
1491 l.add(new StringTask());
1492 String result = e.invokeAny(l);
1493 assertSame(TEST_STRING, result);
1494 } finally {
1495 joinPool(e);
1496 }
1497 }
1498
1499 /**
1500 * invokeAll(null) throws NPE
1501 */
1502 public void testInvokeAll1() throws Exception {
1503 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1504 try {
1505 e.invokeAll(null);
1506 shouldThrow();
1507 } catch (NullPointerException success) {
1508 } finally {
1509 joinPool(e);
1510 }
1511 }
1512
1513 /**
1514 * invokeAll(empty collection) returns empty collection
1515 */
1516 public void testInvokeAll2() throws Exception {
1517 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1518 try {
1519 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1520 assertTrue(r.isEmpty());
1521 } finally {
1522 joinPool(e);
1523 }
1524 }
1525
1526 /**
1527 * invokeAll(c) throws NPE if c has null elements
1528 */
1529 public void testInvokeAll3() throws Exception {
1530 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1531 List<Callable<String>> l = new ArrayList<Callable<String>>();
1532 l.add(new StringTask());
1533 l.add(null);
1534 try {
1535 e.invokeAll(l);
1536 shouldThrow();
1537 } catch (NullPointerException success) {
1538 } finally {
1539 joinPool(e);
1540 }
1541 }
1542
1543 /**
1544 * get of element of invokeAll(c) throws exception on failed task
1545 */
1546 public void testInvokeAll4() throws Exception {
1547 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1548 List<Callable<String>> l = new ArrayList<Callable<String>>();
1549 l.add(new NPETask());
1550 List<Future<String>> futures = e.invokeAll(l);
1551 assertEquals(1, futures.size());
1552 try {
1553 futures.get(0).get();
1554 shouldThrow();
1555 } catch (ExecutionException success) {
1556 assertTrue(success.getCause() instanceof NullPointerException);
1557 } finally {
1558 joinPool(e);
1559 }
1560 }
1561
1562 /**
1563 * invokeAll(c) returns results of all completed tasks
1564 */
1565 public void testInvokeAll5() throws Exception {
1566 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1567 try {
1568 List<Callable<String>> l = new ArrayList<Callable<String>>();
1569 l.add(new StringTask());
1570 l.add(new StringTask());
1571 List<Future<String>> futures = e.invokeAll(l);
1572 assertEquals(2, futures.size());
1573 for (Future<String> future : futures)
1574 assertSame(TEST_STRING, future.get());
1575 } finally {
1576 joinPool(e);
1577 }
1578 }
1579
1580 /**
1581 * timed invokeAny(null) throws NPE
1582 */
1583 public void testTimedInvokeAny1() throws Exception {
1584 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1585 try {
1586 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1587 shouldThrow();
1588 } catch (NullPointerException success) {
1589 } finally {
1590 joinPool(e);
1591 }
1592 }
1593
1594 /**
1595 * timed invokeAny(,,null) throws NPE
1596 */
1597 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1598 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1599 List<Callable<String>> l = new ArrayList<Callable<String>>();
1600 l.add(new StringTask());
1601 try {
1602 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1603 shouldThrow();
1604 } catch (NullPointerException success) {
1605 } finally {
1606 joinPool(e);
1607 }
1608 }
1609
1610 /**
1611 * timed invokeAny(empty collection) throws IAE
1612 */
1613 public void testTimedInvokeAny2() throws Exception {
1614 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1615 try {
1616 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1617 shouldThrow();
1618 } catch (IllegalArgumentException success) {
1619 } finally {
1620 joinPool(e);
1621 }
1622 }
1623
1624 /**
1625 * timed invokeAny(c) throws NPE if c has null elements
1626 */
1627 public void testTimedInvokeAny3() throws Exception {
1628 CountDownLatch latch = new CountDownLatch(1);
1629 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1630 List<Callable<String>> l = new ArrayList<Callable<String>>();
1631 l.add(latchAwaitingStringTask(latch));
1632 l.add(null);
1633 try {
1634 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1635 shouldThrow();
1636 } catch (NullPointerException success) {
1637 } finally {
1638 latch.countDown();
1639 joinPool(e);
1640 }
1641 }
1642
1643 /**
1644 * timed invokeAny(c) throws ExecutionException if no task completes
1645 */
1646 public void testTimedInvokeAny4() throws Exception {
1647 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1648 List<Callable<String>> l = new ArrayList<Callable<String>>();
1649 l.add(new NPETask());
1650 try {
1651 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1652 shouldThrow();
1653 } catch (ExecutionException success) {
1654 assertTrue(success.getCause() instanceof NullPointerException);
1655 } finally {
1656 joinPool(e);
1657 }
1658 }
1659
1660 /**
1661 * timed invokeAny(c) returns result of some task
1662 */
1663 public void testTimedInvokeAny5() throws Exception {
1664 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1665 try {
1666 List<Callable<String>> l = new ArrayList<Callable<String>>();
1667 l.add(new StringTask());
1668 l.add(new StringTask());
1669 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1670 assertSame(TEST_STRING, result);
1671 } finally {
1672 joinPool(e);
1673 }
1674 }
1675
1676 /**
1677 * timed invokeAll(null) throws NPE
1678 */
1679 public void testTimedInvokeAll1() throws Exception {
1680 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1681 try {
1682 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1683 shouldThrow();
1684 } catch (NullPointerException success) {
1685 } finally {
1686 joinPool(e);
1687 }
1688 }
1689
1690 /**
1691 * timed invokeAll(,,null) throws NPE
1692 */
1693 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1694 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1695 List<Callable<String>> l = new ArrayList<Callable<String>>();
1696 l.add(new StringTask());
1697 try {
1698 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1699 shouldThrow();
1700 } catch (NullPointerException success) {
1701 } finally {
1702 joinPool(e);
1703 }
1704 }
1705
1706 /**
1707 * timed invokeAll(empty collection) returns empty collection
1708 */
1709 public void testTimedInvokeAll2() throws Exception {
1710 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1711 try {
1712 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1713 assertTrue(r.isEmpty());
1714 } finally {
1715 joinPool(e);
1716 }
1717 }
1718
1719 /**
1720 * timed invokeAll(c) throws NPE if c has null elements
1721 */
1722 public void testTimedInvokeAll3() throws Exception {
1723 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1724 List<Callable<String>> l = new ArrayList<Callable<String>>();
1725 l.add(new StringTask());
1726 l.add(null);
1727 try {
1728 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1729 shouldThrow();
1730 } catch (NullPointerException success) {
1731 } finally {
1732 joinPool(e);
1733 }
1734 }
1735
1736 /**
1737 * get of element of invokeAll(c) throws exception on failed task
1738 */
1739 public void testTimedInvokeAll4() throws Exception {
1740 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1741 List<Callable<String>> l = new ArrayList<Callable<String>>();
1742 l.add(new NPETask());
1743 List<Future<String>> futures =
1744 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1745 assertEquals(1, futures.size());
1746 try {
1747 futures.get(0).get();
1748 shouldThrow();
1749 } catch (ExecutionException success) {
1750 assertTrue(success.getCause() instanceof NullPointerException);
1751 } finally {
1752 joinPool(e);
1753 }
1754 }
1755
1756 /**
1757 * timed invokeAll(c) returns results of all completed tasks
1758 */
1759 public void testTimedInvokeAll5() throws Exception {
1760 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1761 try {
1762 List<Callable<String>> l = new ArrayList<Callable<String>>();
1763 l.add(new StringTask());
1764 l.add(new StringTask());
1765 List<Future<String>> futures =
1766 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1767 assertEquals(2, futures.size());
1768 for (Future<String> future : futures)
1769 assertSame(TEST_STRING, future.get());
1770 } finally {
1771 joinPool(e);
1772 }
1773 }
1774
1775 /**
1776 * timed invokeAll(c) cancels tasks not completed by timeout
1777 */
1778 public void testTimedInvokeAll6() throws Exception {
1779 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1780 try {
1781 for (long timeout = timeoutMillis();;) {
1782 List<Callable<String>> tasks = new ArrayList<>();
1783 tasks.add(new StringTask("0"));
1784 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1785 tasks.add(new StringTask("2"));
1786 long startTime = System.nanoTime();
1787 List<Future<String>> futures =
1788 e.invokeAll(tasks, timeout, MILLISECONDS);
1789 assertEquals(tasks.size(), futures.size());
1790 assertTrue(millisElapsedSince(startTime) >= timeout);
1791 for (Future future : futures)
1792 assertTrue(future.isDone());
1793 assertTrue(futures.get(1).isCancelled());
1794 try {
1795 assertEquals("0", futures.get(0).get());
1796 assertEquals("2", futures.get(2).get());
1797 break;
1798 } catch (CancellationException retryWithLongerTimeout) {
1799 timeout *= 2;
1800 if (timeout >= LONG_DELAY_MS / 2)
1801 fail("expected exactly one task to be cancelled");
1802 }
1803 }
1804 } finally {
1805 joinPool(e);
1806 }
1807 }
1808
1809 /**
1810 * Execution continues if there is at least one thread even if
1811 * thread factory fails to create more
1812 */
1813 public void testFailingThreadFactory() throws InterruptedException {
1814 final ExecutorService e =
1815 new CustomTPE(100, 100,
1816 LONG_DELAY_MS, MILLISECONDS,
1817 new LinkedBlockingQueue<Runnable>(),
1818 new FailingThreadFactory());
1819 try {
1820 final int TASKS = 100;
1821 final CountDownLatch done = new CountDownLatch(TASKS);
1822 for (int k = 0; k < TASKS; ++k)
1823 e.execute(new CheckedRunnable() {
1824 public void realRun() {
1825 done.countDown();
1826 }});
1827 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1828 } finally {
1829 joinPool(e);
1830 }
1831 }
1832
1833 /**
1834 * allowsCoreThreadTimeOut is by default false.
1835 */
1836 public void testAllowsCoreThreadTimeOut() {
1837 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1838 assertFalse(p.allowsCoreThreadTimeOut());
1839 joinPool(p);
1840 }
1841
1842 /**
1843 * allowCoreThreadTimeOut(true) causes idle threads to time out
1844 */
1845 public void testAllowCoreThreadTimeOut_true() throws Exception {
1846 long keepAliveTime = timeoutMillis();
1847 final ThreadPoolExecutor p =
1848 new CustomTPE(2, 10,
1849 keepAliveTime, MILLISECONDS,
1850 new ArrayBlockingQueue<Runnable>(10));
1851 final CountDownLatch threadStarted = new CountDownLatch(1);
1852 try {
1853 p.allowCoreThreadTimeOut(true);
1854 p.execute(new CheckedRunnable() {
1855 public void realRun() {
1856 threadStarted.countDown();
1857 assertEquals(1, p.getPoolSize());
1858 }});
1859 await(threadStarted);
1860 delay(keepAliveTime);
1861 long startTime = System.nanoTime();
1862 while (p.getPoolSize() > 0
1863 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1864 Thread.yield();
1865 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1866 assertEquals(0, p.getPoolSize());
1867 } finally {
1868 joinPool(p);
1869 }
1870 }
1871
1872 /**
1873 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1874 */
1875 public void testAllowCoreThreadTimeOut_false() throws Exception {
1876 long keepAliveTime = timeoutMillis();
1877 final ThreadPoolExecutor p =
1878 new CustomTPE(2, 10,
1879 keepAliveTime, MILLISECONDS,
1880 new ArrayBlockingQueue<Runnable>(10));
1881 final CountDownLatch threadStarted = new CountDownLatch(1);
1882 try {
1883 p.allowCoreThreadTimeOut(false);
1884 p.execute(new CheckedRunnable() {
1885 public void realRun() throws InterruptedException {
1886 threadStarted.countDown();
1887 assertTrue(p.getPoolSize() >= 1);
1888 }});
1889 delay(2 * keepAliveTime);
1890 assertTrue(p.getPoolSize() >= 1);
1891 } finally {
1892 joinPool(p);
1893 }
1894 }
1895
1896 /**
1897 * get(cancelled task) throws CancellationException
1898 * (in part, a test of CustomTPE itself)
1899 */
1900 public void testGet_cancelled() throws Exception {
1901 final ExecutorService e =
1902 new CustomTPE(1, 1,
1903 LONG_DELAY_MS, MILLISECONDS,
1904 new LinkedBlockingQueue<Runnable>());
1905 try {
1906 final CountDownLatch blockerStarted = new CountDownLatch(1);
1907 final CountDownLatch done = new CountDownLatch(1);
1908 final List<Future<?>> futures = new ArrayList<>();
1909 for (int i = 0; i < 2; i++) {
1910 Runnable r = new CheckedRunnable() { public void realRun()
1911 throws Throwable {
1912 blockerStarted.countDown();
1913 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1914 }};
1915 futures.add(e.submit(r));
1916 }
1917 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1918 for (Future<?> future : futures) future.cancel(false);
1919 for (Future<?> future : futures) {
1920 try {
1921 future.get();
1922 shouldThrow();
1923 } catch (CancellationException success) {}
1924 try {
1925 future.get(LONG_DELAY_MS, MILLISECONDS);
1926 shouldThrow();
1927 } catch (CancellationException success) {}
1928 assertTrue(future.isCancelled());
1929 assertTrue(future.isDone());
1930 }
1931 done.countDown();
1932 } finally {
1933 joinPool(e);
1934 }
1935 }
1936
1937 }