ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.54
Committed: Sun Oct 4 01:27:32 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.53: +4 -6 lines
Log Message:
improve testGetCompletedTaskCount

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 final CountDownLatch threadStarted = new CountDownLatch(1);
256 final CountDownLatch done = new CountDownLatch(1);
257 try (PoolCleaner cleaner = cleaner(p)) {
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 done.countDown();
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 ThreadPoolExecutor p =
302 new CustomTPE(2, 6,
303 LONG_DELAY_MS, MILLISECONDS,
304 new ArrayBlockingQueue<Runnable>(10));
305 try (PoolCleaner cleaner = cleaner(p)) {
306 assertEquals(0, p.getPoolSize());
307 p.prestartAllCoreThreads();
308 assertEquals(2, p.getPoolSize());
309 p.prestartAllCoreThreads();
310 assertEquals(2, p.getPoolSize());
311 p.setCorePoolSize(4);
312 p.prestartAllCoreThreads();
313 assertEquals(4, p.getPoolSize());
314 p.prestartAllCoreThreads();
315 assertEquals(4, p.getPoolSize());
316 }
317 }
318
319 /**
320 * getCompletedTaskCount increases, but doesn't overestimate,
321 * when tasks complete
322 */
323 public void testGetCompletedTaskCount() throws InterruptedException {
324 final ThreadPoolExecutor p =
325 new CustomTPE(2, 2,
326 LONG_DELAY_MS, MILLISECONDS,
327 new ArrayBlockingQueue<Runnable>(10));
328 try (PoolCleaner cleaner = cleaner(p)) {
329 final CountDownLatch threadStarted = new CountDownLatch(1);
330 final CountDownLatch threadProceed = new CountDownLatch(1);
331 final CountDownLatch threadDone = new CountDownLatch(1);
332 assertEquals(0, p.getCompletedTaskCount());
333 p.execute(new CheckedRunnable() {
334 public void realRun() throws InterruptedException {
335 threadStarted.countDown();
336 assertEquals(0, p.getCompletedTaskCount());
337 threadProceed.await();
338 threadDone.countDown();
339 }});
340 await(threadStarted);
341 assertEquals(0, p.getCompletedTaskCount());
342 threadProceed.countDown();
343 threadDone.await();
344 long startTime = System.nanoTime();
345 while (p.getCompletedTaskCount() != 1) {
346 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
347 fail("timed out");
348 Thread.yield();
349 }
350 }
351 }
352
353 /**
354 * getCorePoolSize returns size given in constructor if not otherwise set
355 */
356 public void testGetCorePoolSize() {
357 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
358 assertEquals(1, p.getCorePoolSize());
359 joinPool(p);
360 }
361
362 /**
363 * getKeepAliveTime returns value given in constructor if not otherwise set
364 */
365 public void testGetKeepAliveTime() {
366 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
367 assertEquals(1, p.getKeepAliveTime(SECONDS));
368 joinPool(p);
369 }
370
371 /**
372 * getThreadFactory returns factory in constructor if not set
373 */
374 public void testGetThreadFactory() {
375 ThreadFactory tf = new SimpleThreadFactory();
376 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
377 assertSame(tf, p.getThreadFactory());
378 joinPool(p);
379 }
380
381 /**
382 * setThreadFactory sets the thread factory returned by getThreadFactory
383 */
384 public void testSetThreadFactory() {
385 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
386 ThreadFactory tf = new SimpleThreadFactory();
387 p.setThreadFactory(tf);
388 assertSame(tf, p.getThreadFactory());
389 joinPool(p);
390 }
391
392 /**
393 * setThreadFactory(null) throws NPE
394 */
395 public void testSetThreadFactoryNull() {
396 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
397 try {
398 p.setThreadFactory(null);
399 shouldThrow();
400 } catch (NullPointerException success) {
401 } finally {
402 joinPool(p);
403 }
404 }
405
406 /**
407 * getRejectedExecutionHandler returns handler in constructor if not set
408 */
409 public void testGetRejectedExecutionHandler() {
410 RejectedExecutionHandler h = new NoOpREHandler();
411 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
412 assertSame(h, p.getRejectedExecutionHandler());
413 joinPool(p);
414 }
415
416 /**
417 * setRejectedExecutionHandler sets the handler returned by
418 * getRejectedExecutionHandler
419 */
420 public void testSetRejectedExecutionHandler() {
421 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
422 RejectedExecutionHandler h = new NoOpREHandler();
423 p.setRejectedExecutionHandler(h);
424 assertSame(h, p.getRejectedExecutionHandler());
425 joinPool(p);
426 }
427
428 /**
429 * setRejectedExecutionHandler(null) throws NPE
430 */
431 public void testSetRejectedExecutionHandlerNull() {
432 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
433 try {
434 p.setRejectedExecutionHandler(null);
435 shouldThrow();
436 } catch (NullPointerException success) {
437 } finally {
438 joinPool(p);
439 }
440 }
441
442 /**
443 * getLargestPoolSize increases, but doesn't overestimate, when
444 * multiple threads active
445 */
446 public void testGetLargestPoolSize() throws InterruptedException {
447 final int THREADS = 3;
448 final ThreadPoolExecutor p =
449 new CustomTPE(THREADS, THREADS,
450 LONG_DELAY_MS, MILLISECONDS,
451 new ArrayBlockingQueue<Runnable>(10));
452 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
453 final CountDownLatch done = new CountDownLatch(1);
454 try {
455 assertEquals(0, p.getLargestPoolSize());
456 for (int i = 0; i < THREADS; i++)
457 p.execute(new CheckedRunnable() {
458 public void realRun() throws InterruptedException {
459 threadsStarted.countDown();
460 done.await();
461 assertEquals(THREADS, p.getLargestPoolSize());
462 }});
463 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
464 assertEquals(THREADS, p.getLargestPoolSize());
465 } finally {
466 done.countDown();
467 joinPool(p);
468 assertEquals(THREADS, p.getLargestPoolSize());
469 }
470 }
471
472 /**
473 * getMaximumPoolSize returns value given in constructor if not
474 * otherwise set
475 */
476 public void testGetMaximumPoolSize() {
477 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
478 assertEquals(2, p.getMaximumPoolSize());
479 joinPool(p);
480 }
481
482 /**
483 * getPoolSize increases, but doesn't overestimate, when threads
484 * become active
485 */
486 public void testGetPoolSize() throws InterruptedException {
487 final ThreadPoolExecutor p =
488 new CustomTPE(1, 1,
489 LONG_DELAY_MS, MILLISECONDS,
490 new ArrayBlockingQueue<Runnable>(10));
491 final CountDownLatch threadStarted = new CountDownLatch(1);
492 final CountDownLatch done = new CountDownLatch(1);
493 try {
494 assertEquals(0, p.getPoolSize());
495 p.execute(new CheckedRunnable() {
496 public void realRun() throws InterruptedException {
497 threadStarted.countDown();
498 assertEquals(1, p.getPoolSize());
499 done.await();
500 }});
501 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
502 assertEquals(1, p.getPoolSize());
503 } finally {
504 done.countDown();
505 joinPool(p);
506 }
507 }
508
509 /**
510 * getTaskCount increases, but doesn't overestimate, when tasks submitted
511 */
512 public void testGetTaskCount() throws InterruptedException {
513 final ThreadPoolExecutor p =
514 new CustomTPE(1, 1,
515 LONG_DELAY_MS, MILLISECONDS,
516 new ArrayBlockingQueue<Runnable>(10));
517 final CountDownLatch threadStarted = new CountDownLatch(1);
518 final CountDownLatch done = new CountDownLatch(1);
519 try {
520 assertEquals(0, p.getTaskCount());
521 p.execute(new CheckedRunnable() {
522 public void realRun() throws InterruptedException {
523 threadStarted.countDown();
524 assertEquals(1, p.getTaskCount());
525 done.await();
526 }});
527 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
528 assertEquals(1, p.getTaskCount());
529 } finally {
530 done.countDown();
531 joinPool(p);
532 }
533 }
534
535 /**
536 * isShutdown is false before shutdown, true after
537 */
538 public void testIsShutdown() {
539
540 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
541 assertFalse(p.isShutdown());
542 try { p.shutdown(); } catch (SecurityException ok) { return; }
543 assertTrue(p.isShutdown());
544 joinPool(p);
545 }
546
547 /**
548 * isTerminated is false before termination, true after
549 */
550 public void testIsTerminated() throws InterruptedException {
551 final ThreadPoolExecutor p =
552 new CustomTPE(1, 1,
553 LONG_DELAY_MS, MILLISECONDS,
554 new ArrayBlockingQueue<Runnable>(10));
555 final CountDownLatch threadStarted = new CountDownLatch(1);
556 final CountDownLatch done = new CountDownLatch(1);
557 try {
558 assertFalse(p.isTerminating());
559 p.execute(new CheckedRunnable() {
560 public void realRun() throws InterruptedException {
561 assertFalse(p.isTerminating());
562 threadStarted.countDown();
563 done.await();
564 }});
565 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
566 assertFalse(p.isTerminating());
567 done.countDown();
568 } finally {
569 try { p.shutdown(); } catch (SecurityException ok) { return; }
570 }
571 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
572 assertTrue(p.isTerminated());
573 assertFalse(p.isTerminating());
574 }
575
576 /**
577 * isTerminating is not true when running or when terminated
578 */
579 public void testIsTerminating() throws InterruptedException {
580 final ThreadPoolExecutor p =
581 new CustomTPE(1, 1,
582 LONG_DELAY_MS, MILLISECONDS,
583 new ArrayBlockingQueue<Runnable>(10));
584 final CountDownLatch threadStarted = new CountDownLatch(1);
585 final CountDownLatch done = new CountDownLatch(1);
586 try {
587 assertFalse(p.isTerminating());
588 p.execute(new CheckedRunnable() {
589 public void realRun() throws InterruptedException {
590 assertFalse(p.isTerminating());
591 threadStarted.countDown();
592 done.await();
593 }});
594 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
595 assertFalse(p.isTerminating());
596 done.countDown();
597 } finally {
598 try { p.shutdown(); } catch (SecurityException ok) { return; }
599 }
600 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
601 assertTrue(p.isTerminated());
602 assertFalse(p.isTerminating());
603 }
604
605 /**
606 * getQueue returns the work queue, which contains queued tasks
607 */
608 public void testGetQueue() throws InterruptedException {
609 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
610 final ThreadPoolExecutor p =
611 new CustomTPE(1, 1,
612 LONG_DELAY_MS, MILLISECONDS,
613 q);
614 final CountDownLatch threadStarted = new CountDownLatch(1);
615 final CountDownLatch done = new CountDownLatch(1);
616 try {
617 FutureTask[] tasks = new FutureTask[5];
618 for (int i = 0; i < tasks.length; i++) {
619 Callable task = new CheckedCallable<Boolean>() {
620 public Boolean realCall() throws InterruptedException {
621 threadStarted.countDown();
622 assertSame(q, p.getQueue());
623 done.await();
624 return Boolean.TRUE;
625 }};
626 tasks[i] = new FutureTask(task);
627 p.execute(tasks[i]);
628 }
629 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
630 assertSame(q, p.getQueue());
631 assertFalse(q.contains(tasks[0]));
632 assertTrue(q.contains(tasks[tasks.length - 1]));
633 assertEquals(tasks.length - 1, q.size());
634 } finally {
635 done.countDown();
636 joinPool(p);
637 }
638 }
639
640 /**
641 * remove(task) removes queued task, and fails to remove active task
642 */
643 public void testRemove() throws InterruptedException {
644 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
645 final ThreadPoolExecutor p =
646 new CustomTPE(1, 1,
647 LONG_DELAY_MS, MILLISECONDS,
648 q);
649 Runnable[] tasks = new Runnable[6];
650 final CountDownLatch threadStarted = new CountDownLatch(1);
651 final CountDownLatch done = new CountDownLatch(1);
652 try {
653 for (int i = 0; i < tasks.length; i++) {
654 tasks[i] = new CheckedRunnable() {
655 public void realRun() throws InterruptedException {
656 threadStarted.countDown();
657 done.await();
658 }};
659 p.execute(tasks[i]);
660 }
661 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
662 assertFalse(p.remove(tasks[0]));
663 assertTrue(q.contains(tasks[4]));
664 assertTrue(q.contains(tasks[3]));
665 assertTrue(p.remove(tasks[4]));
666 assertFalse(p.remove(tasks[4]));
667 assertFalse(q.contains(tasks[4]));
668 assertTrue(q.contains(tasks[3]));
669 assertTrue(p.remove(tasks[3]));
670 assertFalse(q.contains(tasks[3]));
671 } finally {
672 done.countDown();
673 joinPool(p);
674 }
675 }
676
677 /**
678 * purge removes cancelled tasks from the queue
679 */
680 public void testPurge() throws InterruptedException {
681 final CountDownLatch threadStarted = new CountDownLatch(1);
682 final CountDownLatch done = new CountDownLatch(1);
683 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
684 final ThreadPoolExecutor p =
685 new CustomTPE(1, 1,
686 LONG_DELAY_MS, MILLISECONDS,
687 q);
688 FutureTask[] tasks = new FutureTask[5];
689 try {
690 for (int i = 0; i < tasks.length; i++) {
691 Callable task = new CheckedCallable<Boolean>() {
692 public Boolean realCall() throws InterruptedException {
693 threadStarted.countDown();
694 done.await();
695 return Boolean.TRUE;
696 }};
697 tasks[i] = new FutureTask(task);
698 p.execute(tasks[i]);
699 }
700 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
701 assertEquals(tasks.length, p.getTaskCount());
702 assertEquals(tasks.length - 1, q.size());
703 assertEquals(1L, p.getActiveCount());
704 assertEquals(0L, p.getCompletedTaskCount());
705 tasks[4].cancel(true);
706 tasks[3].cancel(false);
707 p.purge();
708 assertEquals(tasks.length - 3, q.size());
709 assertEquals(tasks.length - 2, p.getTaskCount());
710 p.purge(); // Nothing to do
711 assertEquals(tasks.length - 3, q.size());
712 assertEquals(tasks.length - 2, p.getTaskCount());
713 } finally {
714 done.countDown();
715 joinPool(p);
716 }
717 }
718
719 /**
720 * shutdownNow returns a list containing tasks that were not run,
721 * and those tasks are drained from the queue
722 */
723 public void testShutdownNow() throws InterruptedException {
724 final int poolSize = 2;
725 final int count = 5;
726 final AtomicInteger ran = new AtomicInteger(0);
727 ThreadPoolExecutor p =
728 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
729 new ArrayBlockingQueue<Runnable>(10));
730 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
731 Runnable waiter = new CheckedRunnable() { public void realRun() {
732 threadsStarted.countDown();
733 try {
734 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
735 } catch (InterruptedException success) {}
736 ran.getAndIncrement();
737 }};
738 for (int i = 0; i < count; i++)
739 p.execute(waiter);
740 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
741 assertEquals(poolSize, p.getActiveCount());
742 assertEquals(0, p.getCompletedTaskCount());
743 final List<Runnable> queuedTasks;
744 try {
745 queuedTasks = p.shutdownNow();
746 } catch (SecurityException ok) {
747 return; // Allowed in case test doesn't have privs
748 }
749 assertTrue(p.isShutdown());
750 assertTrue(p.getQueue().isEmpty());
751 assertEquals(count - poolSize, queuedTasks.size());
752 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
753 assertTrue(p.isTerminated());
754 assertEquals(poolSize, ran.get());
755 assertEquals(poolSize, p.getCompletedTaskCount());
756 }
757
758 // Exception Tests
759
760 /**
761 * Constructor throws if corePoolSize argument is less than zero
762 */
763 public void testConstructor1() {
764 try {
765 new CustomTPE(-1, 1, 1L, SECONDS,
766 new ArrayBlockingQueue<Runnable>(10));
767 shouldThrow();
768 } catch (IllegalArgumentException success) {}
769 }
770
771 /**
772 * Constructor throws if maximumPoolSize is less than zero
773 */
774 public void testConstructor2() {
775 try {
776 new CustomTPE(1, -1, 1L, SECONDS,
777 new ArrayBlockingQueue<Runnable>(10));
778 shouldThrow();
779 } catch (IllegalArgumentException success) {}
780 }
781
782 /**
783 * Constructor throws if maximumPoolSize is equal to zero
784 */
785 public void testConstructor3() {
786 try {
787 new CustomTPE(1, 0, 1L, SECONDS,
788 new ArrayBlockingQueue<Runnable>(10));
789 shouldThrow();
790 } catch (IllegalArgumentException success) {}
791 }
792
793 /**
794 * Constructor throws if keepAliveTime is less than zero
795 */
796 public void testConstructor4() {
797 try {
798 new CustomTPE(1, 2, -1L, SECONDS,
799 new ArrayBlockingQueue<Runnable>(10));
800 shouldThrow();
801 } catch (IllegalArgumentException success) {}
802 }
803
804 /**
805 * Constructor throws if corePoolSize is greater than the maximumPoolSize
806 */
807 public void testConstructor5() {
808 try {
809 new CustomTPE(2, 1, 1L, SECONDS,
810 new ArrayBlockingQueue<Runnable>(10));
811 shouldThrow();
812 } catch (IllegalArgumentException success) {}
813 }
814
815 /**
816 * Constructor throws if workQueue is set to null
817 */
818 public void testConstructorNullPointerException() {
819 try {
820 new CustomTPE(1, 2, 1L, SECONDS, null);
821 shouldThrow();
822 } catch (NullPointerException success) {}
823 }
824
825 /**
826 * Constructor throws if corePoolSize argument is less than zero
827 */
828 public void testConstructor6() {
829 try {
830 new CustomTPE(-1, 1, 1L, SECONDS,
831 new ArrayBlockingQueue<Runnable>(10),
832 new SimpleThreadFactory());
833 shouldThrow();
834 } catch (IllegalArgumentException success) {}
835 }
836
837 /**
838 * Constructor throws if maximumPoolSize is less than zero
839 */
840 public void testConstructor7() {
841 try {
842 new CustomTPE(1,-1, 1L, SECONDS,
843 new ArrayBlockingQueue<Runnable>(10),
844 new SimpleThreadFactory());
845 shouldThrow();
846 } catch (IllegalArgumentException success) {}
847 }
848
849 /**
850 * Constructor throws if maximumPoolSize is equal to zero
851 */
852 public void testConstructor8() {
853 try {
854 new CustomTPE(1, 0, 1L, SECONDS,
855 new ArrayBlockingQueue<Runnable>(10),
856 new SimpleThreadFactory());
857 shouldThrow();
858 } catch (IllegalArgumentException success) {}
859 }
860
861 /**
862 * Constructor throws if keepAliveTime is less than zero
863 */
864 public void testConstructor9() {
865 try {
866 new CustomTPE(1, 2, -1L, SECONDS,
867 new ArrayBlockingQueue<Runnable>(10),
868 new SimpleThreadFactory());
869 shouldThrow();
870 } catch (IllegalArgumentException success) {}
871 }
872
873 /**
874 * Constructor throws if corePoolSize is greater than the maximumPoolSize
875 */
876 public void testConstructor10() {
877 try {
878 new CustomTPE(2, 1, 1L, SECONDS,
879 new ArrayBlockingQueue<Runnable>(10),
880 new SimpleThreadFactory());
881 shouldThrow();
882 } catch (IllegalArgumentException success) {}
883 }
884
885 /**
886 * Constructor throws if workQueue is set to null
887 */
888 public void testConstructorNullPointerException2() {
889 try {
890 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
891 shouldThrow();
892 } catch (NullPointerException success) {}
893 }
894
895 /**
896 * Constructor throws if threadFactory is set to null
897 */
898 public void testConstructorNullPointerException3() {
899 try {
900 new CustomTPE(1, 2, 1L, SECONDS,
901 new ArrayBlockingQueue<Runnable>(10),
902 (ThreadFactory) null);
903 shouldThrow();
904 } catch (NullPointerException success) {}
905 }
906
907 /**
908 * Constructor throws if corePoolSize argument is less than zero
909 */
910 public void testConstructor11() {
911 try {
912 new CustomTPE(-1, 1, 1L, SECONDS,
913 new ArrayBlockingQueue<Runnable>(10),
914 new NoOpREHandler());
915 shouldThrow();
916 } catch (IllegalArgumentException success) {}
917 }
918
919 /**
920 * Constructor throws if maximumPoolSize is less than zero
921 */
922 public void testConstructor12() {
923 try {
924 new CustomTPE(1, -1, 1L, SECONDS,
925 new ArrayBlockingQueue<Runnable>(10),
926 new NoOpREHandler());
927 shouldThrow();
928 } catch (IllegalArgumentException success) {}
929 }
930
931 /**
932 * Constructor throws if maximumPoolSize is equal to zero
933 */
934 public void testConstructor13() {
935 try {
936 new CustomTPE(1, 0, 1L, SECONDS,
937 new ArrayBlockingQueue<Runnable>(10),
938 new NoOpREHandler());
939 shouldThrow();
940 } catch (IllegalArgumentException success) {}
941 }
942
943 /**
944 * Constructor throws if keepAliveTime is less than zero
945 */
946 public void testConstructor14() {
947 try {
948 new CustomTPE(1, 2, -1L, SECONDS,
949 new ArrayBlockingQueue<Runnable>(10),
950 new NoOpREHandler());
951 shouldThrow();
952 } catch (IllegalArgumentException success) {}
953 }
954
955 /**
956 * Constructor throws if corePoolSize is greater than the maximumPoolSize
957 */
958 public void testConstructor15() {
959 try {
960 new CustomTPE(2, 1, 1L, SECONDS,
961 new ArrayBlockingQueue<Runnable>(10),
962 new NoOpREHandler());
963 shouldThrow();
964 } catch (IllegalArgumentException success) {}
965 }
966
967 /**
968 * Constructor throws if workQueue is set to null
969 */
970 public void testConstructorNullPointerException4() {
971 try {
972 new CustomTPE(1, 2, 1L, SECONDS,
973 null,
974 new NoOpREHandler());
975 shouldThrow();
976 } catch (NullPointerException success) {}
977 }
978
979 /**
980 * Constructor throws if handler is set to null
981 */
982 public void testConstructorNullPointerException5() {
983 try {
984 new CustomTPE(1, 2, 1L, SECONDS,
985 new ArrayBlockingQueue<Runnable>(10),
986 (RejectedExecutionHandler) null);
987 shouldThrow();
988 } catch (NullPointerException success) {}
989 }
990
991 /**
992 * Constructor throws if corePoolSize argument is less than zero
993 */
994 public void testConstructor16() {
995 try {
996 new CustomTPE(-1, 1, 1L, SECONDS,
997 new ArrayBlockingQueue<Runnable>(10),
998 new SimpleThreadFactory(),
999 new NoOpREHandler());
1000 shouldThrow();
1001 } catch (IllegalArgumentException success) {}
1002 }
1003
1004 /**
1005 * Constructor throws if maximumPoolSize is less than zero
1006 */
1007 public void testConstructor17() {
1008 try {
1009 new CustomTPE(1, -1, 1L, SECONDS,
1010 new ArrayBlockingQueue<Runnable>(10),
1011 new SimpleThreadFactory(),
1012 new NoOpREHandler());
1013 shouldThrow();
1014 } catch (IllegalArgumentException success) {}
1015 }
1016
1017 /**
1018 * Constructor throws if maximumPoolSize is equal to zero
1019 */
1020 public void testConstructor18() {
1021 try {
1022 new CustomTPE(1, 0, 1L, SECONDS,
1023 new ArrayBlockingQueue<Runnable>(10),
1024 new SimpleThreadFactory(),
1025 new NoOpREHandler());
1026 shouldThrow();
1027 } catch (IllegalArgumentException success) {}
1028 }
1029
1030 /**
1031 * Constructor throws if keepAliveTime is less than zero
1032 */
1033 public void testConstructor19() {
1034 try {
1035 new CustomTPE(1, 2, -1L, SECONDS,
1036 new ArrayBlockingQueue<Runnable>(10),
1037 new SimpleThreadFactory(),
1038 new NoOpREHandler());
1039 shouldThrow();
1040 } catch (IllegalArgumentException success) {}
1041 }
1042
1043 /**
1044 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1045 */
1046 public void testConstructor20() {
1047 try {
1048 new CustomTPE(2, 1, 1L, SECONDS,
1049 new ArrayBlockingQueue<Runnable>(10),
1050 new SimpleThreadFactory(),
1051 new NoOpREHandler());
1052 shouldThrow();
1053 } catch (IllegalArgumentException success) {}
1054 }
1055
1056 /**
1057 * Constructor throws if workQueue is null
1058 */
1059 public void testConstructorNullPointerException6() {
1060 try {
1061 new CustomTPE(1, 2, 1L, SECONDS,
1062 null,
1063 new SimpleThreadFactory(),
1064 new NoOpREHandler());
1065 shouldThrow();
1066 } catch (NullPointerException success) {}
1067 }
1068
1069 /**
1070 * Constructor throws if handler is null
1071 */
1072 public void testConstructorNullPointerException7() {
1073 try {
1074 new CustomTPE(1, 2, 1L, SECONDS,
1075 new ArrayBlockingQueue<Runnable>(10),
1076 new SimpleThreadFactory(),
1077 (RejectedExecutionHandler) null);
1078 shouldThrow();
1079 } catch (NullPointerException success) {}
1080 }
1081
1082 /**
1083 * Constructor throws if ThreadFactory is null
1084 */
1085 public void testConstructorNullPointerException8() {
1086 try {
1087 new CustomTPE(1, 2, 1L, SECONDS,
1088 new ArrayBlockingQueue<Runnable>(10),
1089 (ThreadFactory) null,
1090 new NoOpREHandler());
1091 shouldThrow();
1092 } catch (NullPointerException success) {}
1093 }
1094
1095 /**
1096 * execute throws RejectedExecutionException if saturated.
1097 */
1098 public void testSaturatedExecute() {
1099 ThreadPoolExecutor p =
1100 new CustomTPE(1, 1,
1101 LONG_DELAY_MS, MILLISECONDS,
1102 new ArrayBlockingQueue<Runnable>(1));
1103 final CountDownLatch done = new CountDownLatch(1);
1104 try {
1105 Runnable task = new CheckedRunnable() {
1106 public void realRun() throws InterruptedException {
1107 done.await();
1108 }};
1109 for (int i = 0; i < 2; ++i)
1110 p.execute(task);
1111 for (int i = 0; i < 2; ++i) {
1112 try {
1113 p.execute(task);
1114 shouldThrow();
1115 } catch (RejectedExecutionException success) {}
1116 assertTrue(p.getTaskCount() <= 2);
1117 }
1118 } finally {
1119 done.countDown();
1120 joinPool(p);
1121 }
1122 }
1123
1124 /**
1125 * executor using CallerRunsPolicy runs task if saturated.
1126 */
1127 public void testSaturatedExecute2() {
1128 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1129 ThreadPoolExecutor p = new CustomTPE(1, 1,
1130 LONG_DELAY_MS, MILLISECONDS,
1131 new ArrayBlockingQueue<Runnable>(1),
1132 h);
1133 try {
1134 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1135 for (int i = 0; i < tasks.length; ++i)
1136 tasks[i] = new TrackedNoOpRunnable();
1137 TrackedLongRunnable mr = new TrackedLongRunnable();
1138 p.execute(mr);
1139 for (int i = 0; i < tasks.length; ++i)
1140 p.execute(tasks[i]);
1141 for (int i = 1; i < tasks.length; ++i)
1142 assertTrue(tasks[i].done);
1143 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1144 } finally {
1145 joinPool(p);
1146 }
1147 }
1148
1149 /**
1150 * executor using DiscardPolicy drops task if saturated.
1151 */
1152 public void testSaturatedExecute3() {
1153 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1154 ThreadPoolExecutor p =
1155 new CustomTPE(1, 1,
1156 LONG_DELAY_MS, MILLISECONDS,
1157 new ArrayBlockingQueue<Runnable>(1),
1158 h);
1159 try {
1160 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1161 for (int i = 0; i < tasks.length; ++i)
1162 tasks[i] = new TrackedNoOpRunnable();
1163 p.execute(new TrackedLongRunnable());
1164 for (TrackedNoOpRunnable task : tasks)
1165 p.execute(task);
1166 for (TrackedNoOpRunnable task : tasks)
1167 assertFalse(task.done);
1168 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1169 } finally {
1170 joinPool(p);
1171 }
1172 }
1173
1174 /**
1175 * executor using DiscardOldestPolicy drops oldest task if saturated.
1176 */
1177 public void testSaturatedExecute4() {
1178 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1179 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1180 try {
1181 p.execute(new TrackedLongRunnable());
1182 TrackedLongRunnable r2 = new TrackedLongRunnable();
1183 p.execute(r2);
1184 assertTrue(p.getQueue().contains(r2));
1185 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1186 p.execute(r3);
1187 assertFalse(p.getQueue().contains(r2));
1188 assertTrue(p.getQueue().contains(r3));
1189 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1190 } finally {
1191 joinPool(p);
1192 }
1193 }
1194
1195 /**
1196 * execute throws RejectedExecutionException if shutdown
1197 */
1198 public void testRejectedExecutionExceptionOnShutdown() {
1199 ThreadPoolExecutor p =
1200 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1201 try { p.shutdown(); } catch (SecurityException ok) { return; }
1202 try {
1203 p.execute(new NoOpRunnable());
1204 shouldThrow();
1205 } catch (RejectedExecutionException success) {}
1206
1207 joinPool(p);
1208 }
1209
1210 /**
1211 * execute using CallerRunsPolicy drops task on shutdown
1212 */
1213 public void testCallerRunsOnShutdown() {
1214 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1215 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1216
1217 try { p.shutdown(); } catch (SecurityException ok) { return; }
1218 try {
1219 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1220 p.execute(r);
1221 assertFalse(r.done);
1222 } finally {
1223 joinPool(p);
1224 }
1225 }
1226
1227 /**
1228 * execute using DiscardPolicy drops task on shutdown
1229 */
1230 public void testDiscardOnShutdown() {
1231 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1232 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1233
1234 try { p.shutdown(); } catch (SecurityException ok) { return; }
1235 try {
1236 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1237 p.execute(r);
1238 assertFalse(r.done);
1239 } finally {
1240 joinPool(p);
1241 }
1242 }
1243
1244 /**
1245 * execute using DiscardOldestPolicy drops task on shutdown
1246 */
1247 public void testDiscardOldestOnShutdown() {
1248 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1249 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1250
1251 try { p.shutdown(); } catch (SecurityException ok) { return; }
1252 try {
1253 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1254 p.execute(r);
1255 assertFalse(r.done);
1256 } finally {
1257 joinPool(p);
1258 }
1259 }
1260
1261 /**
1262 * execute(null) throws NPE
1263 */
1264 public void testExecuteNull() {
1265 ThreadPoolExecutor p =
1266 new CustomTPE(1, 2, 1L, SECONDS,
1267 new ArrayBlockingQueue<Runnable>(10));
1268 try {
1269 p.execute(null);
1270 shouldThrow();
1271 } catch (NullPointerException success) {}
1272
1273 joinPool(p);
1274 }
1275
1276 /**
1277 * setCorePoolSize of negative value throws IllegalArgumentException
1278 */
1279 public void testCorePoolSizeIllegalArgumentException() {
1280 ThreadPoolExecutor p =
1281 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1282 try {
1283 p.setCorePoolSize(-1);
1284 shouldThrow();
1285 } catch (IllegalArgumentException success) {
1286 } finally {
1287 try { p.shutdown(); } catch (SecurityException ok) { return; }
1288 }
1289 joinPool(p);
1290 }
1291
1292 /**
1293 * setMaximumPoolSize(int) throws IllegalArgumentException
1294 * if given a value less the core pool size
1295 */
1296 public void testMaximumPoolSizeIllegalArgumentException() {
1297 ThreadPoolExecutor p =
1298 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1299 try {
1300 p.setMaximumPoolSize(1);
1301 shouldThrow();
1302 } catch (IllegalArgumentException success) {
1303 } finally {
1304 try { p.shutdown(); } catch (SecurityException ok) { return; }
1305 }
1306 joinPool(p);
1307 }
1308
1309 /**
1310 * setMaximumPoolSize throws IllegalArgumentException
1311 * if given a negative value
1312 */
1313 public void testMaximumPoolSizeIllegalArgumentException2() {
1314 ThreadPoolExecutor p =
1315 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1316 try {
1317 p.setMaximumPoolSize(-1);
1318 shouldThrow();
1319 } catch (IllegalArgumentException success) {
1320 } finally {
1321 try { p.shutdown(); } catch (SecurityException ok) { return; }
1322 }
1323 joinPool(p);
1324 }
1325
1326 /**
1327 * setKeepAliveTime throws IllegalArgumentException
1328 * when given a negative value
1329 */
1330 public void testKeepAliveTimeIllegalArgumentException() {
1331 ThreadPoolExecutor p =
1332 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1333
1334 try {
1335 p.setKeepAliveTime(-1,MILLISECONDS);
1336 shouldThrow();
1337 } catch (IllegalArgumentException success) {
1338 } finally {
1339 try { p.shutdown(); } catch (SecurityException ok) { return; }
1340 }
1341 joinPool(p);
1342 }
1343
1344 /**
1345 * terminated() is called on termination
1346 */
1347 public void testTerminated() {
1348 CustomTPE p = new CustomTPE();
1349 try { p.shutdown(); } catch (SecurityException ok) { return; }
1350 assertTrue(p.terminatedCalled());
1351 joinPool(p);
1352 }
1353
1354 /**
1355 * beforeExecute and afterExecute are called when executing task
1356 */
1357 public void testBeforeAfter() throws InterruptedException {
1358 CustomTPE p = new CustomTPE();
1359 try {
1360 final CountDownLatch done = new CountDownLatch(1);
1361 p.execute(new CheckedRunnable() {
1362 public void realRun() {
1363 done.countDown();
1364 }});
1365 await(p.afterCalled);
1366 assertEquals(0, done.getCount());
1367 assertTrue(p.afterCalled());
1368 assertTrue(p.beforeCalled());
1369 try { p.shutdown(); } catch (SecurityException ok) { return; }
1370 } finally {
1371 joinPool(p);
1372 }
1373 }
1374
1375 /**
1376 * completed submit of callable returns result
1377 */
1378 public void testSubmitCallable() throws Exception {
1379 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1380 try {
1381 Future<String> future = e.submit(new StringTask());
1382 String result = future.get();
1383 assertSame(TEST_STRING, result);
1384 } finally {
1385 joinPool(e);
1386 }
1387 }
1388
1389 /**
1390 * completed submit of runnable returns successfully
1391 */
1392 public void testSubmitRunnable() throws Exception {
1393 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1394 try {
1395 Future<?> future = e.submit(new NoOpRunnable());
1396 future.get();
1397 assertTrue(future.isDone());
1398 } finally {
1399 joinPool(e);
1400 }
1401 }
1402
1403 /**
1404 * completed submit of (runnable, result) returns result
1405 */
1406 public void testSubmitRunnable2() throws Exception {
1407 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1408 try {
1409 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1410 String result = future.get();
1411 assertSame(TEST_STRING, result);
1412 } finally {
1413 joinPool(e);
1414 }
1415 }
1416
1417 /**
1418 * invokeAny(null) throws NPE
1419 */
1420 public void testInvokeAny1() throws Exception {
1421 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1422 try {
1423 e.invokeAny(null);
1424 shouldThrow();
1425 } catch (NullPointerException success) {
1426 } finally {
1427 joinPool(e);
1428 }
1429 }
1430
1431 /**
1432 * invokeAny(empty collection) throws IAE
1433 */
1434 public void testInvokeAny2() throws Exception {
1435 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1436 try {
1437 e.invokeAny(new ArrayList<Callable<String>>());
1438 shouldThrow();
1439 } catch (IllegalArgumentException success) {
1440 } finally {
1441 joinPool(e);
1442 }
1443 }
1444
1445 /**
1446 * invokeAny(c) throws NPE if c has null elements
1447 */
1448 public void testInvokeAny3() throws Exception {
1449 CountDownLatch latch = new CountDownLatch(1);
1450 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1451 List<Callable<String>> l = new ArrayList<Callable<String>>();
1452 l.add(latchAwaitingStringTask(latch));
1453 l.add(null);
1454 try {
1455 e.invokeAny(l);
1456 shouldThrow();
1457 } catch (NullPointerException success) {
1458 } finally {
1459 latch.countDown();
1460 joinPool(e);
1461 }
1462 }
1463
1464 /**
1465 * invokeAny(c) throws ExecutionException if no task completes
1466 */
1467 public void testInvokeAny4() throws Exception {
1468 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1469 List<Callable<String>> l = new ArrayList<Callable<String>>();
1470 l.add(new NPETask());
1471 try {
1472 e.invokeAny(l);
1473 shouldThrow();
1474 } catch (ExecutionException success) {
1475 assertTrue(success.getCause() instanceof NullPointerException);
1476 } finally {
1477 joinPool(e);
1478 }
1479 }
1480
1481 /**
1482 * invokeAny(c) returns result of some task
1483 */
1484 public void testInvokeAny5() throws Exception {
1485 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1486 try {
1487 List<Callable<String>> l = new ArrayList<Callable<String>>();
1488 l.add(new StringTask());
1489 l.add(new StringTask());
1490 String result = e.invokeAny(l);
1491 assertSame(TEST_STRING, result);
1492 } finally {
1493 joinPool(e);
1494 }
1495 }
1496
1497 /**
1498 * invokeAll(null) throws NPE
1499 */
1500 public void testInvokeAll1() throws Exception {
1501 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1502 try {
1503 e.invokeAll(null);
1504 shouldThrow();
1505 } catch (NullPointerException success) {
1506 } finally {
1507 joinPool(e);
1508 }
1509 }
1510
1511 /**
1512 * invokeAll(empty collection) returns empty collection
1513 */
1514 public void testInvokeAll2() throws Exception {
1515 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1516 try {
1517 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1518 assertTrue(r.isEmpty());
1519 } finally {
1520 joinPool(e);
1521 }
1522 }
1523
1524 /**
1525 * invokeAll(c) throws NPE if c has null elements
1526 */
1527 public void testInvokeAll3() throws Exception {
1528 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1529 List<Callable<String>> l = new ArrayList<Callable<String>>();
1530 l.add(new StringTask());
1531 l.add(null);
1532 try {
1533 e.invokeAll(l);
1534 shouldThrow();
1535 } catch (NullPointerException success) {
1536 } finally {
1537 joinPool(e);
1538 }
1539 }
1540
1541 /**
1542 * get of element of invokeAll(c) throws exception on failed task
1543 */
1544 public void testInvokeAll4() throws Exception {
1545 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1546 List<Callable<String>> l = new ArrayList<Callable<String>>();
1547 l.add(new NPETask());
1548 List<Future<String>> futures = e.invokeAll(l);
1549 assertEquals(1, futures.size());
1550 try {
1551 futures.get(0).get();
1552 shouldThrow();
1553 } catch (ExecutionException success) {
1554 assertTrue(success.getCause() instanceof NullPointerException);
1555 } finally {
1556 joinPool(e);
1557 }
1558 }
1559
1560 /**
1561 * invokeAll(c) returns results of all completed tasks
1562 */
1563 public void testInvokeAll5() throws Exception {
1564 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1565 try {
1566 List<Callable<String>> l = new ArrayList<Callable<String>>();
1567 l.add(new StringTask());
1568 l.add(new StringTask());
1569 List<Future<String>> futures = e.invokeAll(l);
1570 assertEquals(2, futures.size());
1571 for (Future<String> future : futures)
1572 assertSame(TEST_STRING, future.get());
1573 } finally {
1574 joinPool(e);
1575 }
1576 }
1577
1578 /**
1579 * timed invokeAny(null) throws NPE
1580 */
1581 public void testTimedInvokeAny1() throws Exception {
1582 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1583 try {
1584 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1585 shouldThrow();
1586 } catch (NullPointerException success) {
1587 } finally {
1588 joinPool(e);
1589 }
1590 }
1591
1592 /**
1593 * timed invokeAny(,,null) throws NPE
1594 */
1595 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1596 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1597 List<Callable<String>> l = new ArrayList<Callable<String>>();
1598 l.add(new StringTask());
1599 try {
1600 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1601 shouldThrow();
1602 } catch (NullPointerException success) {
1603 } finally {
1604 joinPool(e);
1605 }
1606 }
1607
1608 /**
1609 * timed invokeAny(empty collection) throws IAE
1610 */
1611 public void testTimedInvokeAny2() throws Exception {
1612 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1613 try {
1614 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1615 shouldThrow();
1616 } catch (IllegalArgumentException success) {
1617 } finally {
1618 joinPool(e);
1619 }
1620 }
1621
1622 /**
1623 * timed invokeAny(c) throws NPE if c has null elements
1624 */
1625 public void testTimedInvokeAny3() throws Exception {
1626 CountDownLatch latch = new CountDownLatch(1);
1627 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1628 List<Callable<String>> l = new ArrayList<Callable<String>>();
1629 l.add(latchAwaitingStringTask(latch));
1630 l.add(null);
1631 try {
1632 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1633 shouldThrow();
1634 } catch (NullPointerException success) {
1635 } finally {
1636 latch.countDown();
1637 joinPool(e);
1638 }
1639 }
1640
1641 /**
1642 * timed invokeAny(c) throws ExecutionException if no task completes
1643 */
1644 public void testTimedInvokeAny4() throws Exception {
1645 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1646 List<Callable<String>> l = new ArrayList<Callable<String>>();
1647 l.add(new NPETask());
1648 try {
1649 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1650 shouldThrow();
1651 } catch (ExecutionException success) {
1652 assertTrue(success.getCause() instanceof NullPointerException);
1653 } finally {
1654 joinPool(e);
1655 }
1656 }
1657
1658 /**
1659 * timed invokeAny(c) returns result of some task
1660 */
1661 public void testTimedInvokeAny5() throws Exception {
1662 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1663 try {
1664 List<Callable<String>> l = new ArrayList<Callable<String>>();
1665 l.add(new StringTask());
1666 l.add(new StringTask());
1667 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1668 assertSame(TEST_STRING, result);
1669 } finally {
1670 joinPool(e);
1671 }
1672 }
1673
1674 /**
1675 * timed invokeAll(null) throws NPE
1676 */
1677 public void testTimedInvokeAll1() throws Exception {
1678 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1679 try {
1680 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1681 shouldThrow();
1682 } catch (NullPointerException success) {
1683 } finally {
1684 joinPool(e);
1685 }
1686 }
1687
1688 /**
1689 * timed invokeAll(,,null) throws NPE
1690 */
1691 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1692 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1693 List<Callable<String>> l = new ArrayList<Callable<String>>();
1694 l.add(new StringTask());
1695 try {
1696 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1697 shouldThrow();
1698 } catch (NullPointerException success) {
1699 } finally {
1700 joinPool(e);
1701 }
1702 }
1703
1704 /**
1705 * timed invokeAll(empty collection) returns empty collection
1706 */
1707 public void testTimedInvokeAll2() throws Exception {
1708 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1709 try {
1710 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1711 assertTrue(r.isEmpty());
1712 } finally {
1713 joinPool(e);
1714 }
1715 }
1716
1717 /**
1718 * timed invokeAll(c) throws NPE if c has null elements
1719 */
1720 public void testTimedInvokeAll3() throws Exception {
1721 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1722 List<Callable<String>> l = new ArrayList<Callable<String>>();
1723 l.add(new StringTask());
1724 l.add(null);
1725 try {
1726 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1727 shouldThrow();
1728 } catch (NullPointerException success) {
1729 } finally {
1730 joinPool(e);
1731 }
1732 }
1733
1734 /**
1735 * get of element of invokeAll(c) throws exception on failed task
1736 */
1737 public void testTimedInvokeAll4() throws Exception {
1738 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1739 List<Callable<String>> l = new ArrayList<Callable<String>>();
1740 l.add(new NPETask());
1741 List<Future<String>> futures =
1742 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1743 assertEquals(1, futures.size());
1744 try {
1745 futures.get(0).get();
1746 shouldThrow();
1747 } catch (ExecutionException success) {
1748 assertTrue(success.getCause() instanceof NullPointerException);
1749 } finally {
1750 joinPool(e);
1751 }
1752 }
1753
1754 /**
1755 * timed invokeAll(c) returns results of all completed tasks
1756 */
1757 public void testTimedInvokeAll5() throws Exception {
1758 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1759 try {
1760 List<Callable<String>> l = new ArrayList<Callable<String>>();
1761 l.add(new StringTask());
1762 l.add(new StringTask());
1763 List<Future<String>> futures =
1764 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1765 assertEquals(2, futures.size());
1766 for (Future<String> future : futures)
1767 assertSame(TEST_STRING, future.get());
1768 } finally {
1769 joinPool(e);
1770 }
1771 }
1772
1773 /**
1774 * timed invokeAll(c) cancels tasks not completed by timeout
1775 */
1776 public void testTimedInvokeAll6() throws Exception {
1777 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1778 try {
1779 for (long timeout = timeoutMillis();;) {
1780 List<Callable<String>> tasks = new ArrayList<>();
1781 tasks.add(new StringTask("0"));
1782 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1783 tasks.add(new StringTask("2"));
1784 long startTime = System.nanoTime();
1785 List<Future<String>> futures =
1786 e.invokeAll(tasks, timeout, MILLISECONDS);
1787 assertEquals(tasks.size(), futures.size());
1788 assertTrue(millisElapsedSince(startTime) >= timeout);
1789 for (Future future : futures)
1790 assertTrue(future.isDone());
1791 assertTrue(futures.get(1).isCancelled());
1792 try {
1793 assertEquals("0", futures.get(0).get());
1794 assertEquals("2", futures.get(2).get());
1795 break;
1796 } catch (CancellationException retryWithLongerTimeout) {
1797 timeout *= 2;
1798 if (timeout >= LONG_DELAY_MS / 2)
1799 fail("expected exactly one task to be cancelled");
1800 }
1801 }
1802 } finally {
1803 joinPool(e);
1804 }
1805 }
1806
1807 /**
1808 * Execution continues if there is at least one thread even if
1809 * thread factory fails to create more
1810 */
1811 public void testFailingThreadFactory() throws InterruptedException {
1812 final ExecutorService e =
1813 new CustomTPE(100, 100,
1814 LONG_DELAY_MS, MILLISECONDS,
1815 new LinkedBlockingQueue<Runnable>(),
1816 new FailingThreadFactory());
1817 try {
1818 final int TASKS = 100;
1819 final CountDownLatch done = new CountDownLatch(TASKS);
1820 for (int k = 0; k < TASKS; ++k)
1821 e.execute(new CheckedRunnable() {
1822 public void realRun() {
1823 done.countDown();
1824 }});
1825 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1826 } finally {
1827 joinPool(e);
1828 }
1829 }
1830
1831 /**
1832 * allowsCoreThreadTimeOut is by default false.
1833 */
1834 public void testAllowsCoreThreadTimeOut() {
1835 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1836 assertFalse(p.allowsCoreThreadTimeOut());
1837 joinPool(p);
1838 }
1839
1840 /**
1841 * allowCoreThreadTimeOut(true) causes idle threads to time out
1842 */
1843 public void testAllowCoreThreadTimeOut_true() throws Exception {
1844 long keepAliveTime = timeoutMillis();
1845 final ThreadPoolExecutor p =
1846 new CustomTPE(2, 10,
1847 keepAliveTime, MILLISECONDS,
1848 new ArrayBlockingQueue<Runnable>(10));
1849 final CountDownLatch threadStarted = new CountDownLatch(1);
1850 try {
1851 p.allowCoreThreadTimeOut(true);
1852 p.execute(new CheckedRunnable() {
1853 public void realRun() {
1854 threadStarted.countDown();
1855 assertEquals(1, p.getPoolSize());
1856 }});
1857 await(threadStarted);
1858 delay(keepAliveTime);
1859 long startTime = System.nanoTime();
1860 while (p.getPoolSize() > 0
1861 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1862 Thread.yield();
1863 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1864 assertEquals(0, p.getPoolSize());
1865 } finally {
1866 joinPool(p);
1867 }
1868 }
1869
1870 /**
1871 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1872 */
1873 public void testAllowCoreThreadTimeOut_false() throws Exception {
1874 long keepAliveTime = timeoutMillis();
1875 final ThreadPoolExecutor p =
1876 new CustomTPE(2, 10,
1877 keepAliveTime, MILLISECONDS,
1878 new ArrayBlockingQueue<Runnable>(10));
1879 final CountDownLatch threadStarted = new CountDownLatch(1);
1880 try {
1881 p.allowCoreThreadTimeOut(false);
1882 p.execute(new CheckedRunnable() {
1883 public void realRun() throws InterruptedException {
1884 threadStarted.countDown();
1885 assertTrue(p.getPoolSize() >= 1);
1886 }});
1887 delay(2 * keepAliveTime);
1888 assertTrue(p.getPoolSize() >= 1);
1889 } finally {
1890 joinPool(p);
1891 }
1892 }
1893
1894 /**
1895 * get(cancelled task) throws CancellationException
1896 * (in part, a test of CustomTPE itself)
1897 */
1898 public void testGet_cancelled() throws Exception {
1899 final ExecutorService e =
1900 new CustomTPE(1, 1,
1901 LONG_DELAY_MS, MILLISECONDS,
1902 new LinkedBlockingQueue<Runnable>());
1903 try {
1904 final CountDownLatch blockerStarted = new CountDownLatch(1);
1905 final CountDownLatch done = new CountDownLatch(1);
1906 final List<Future<?>> futures = new ArrayList<>();
1907 for (int i = 0; i < 2; i++) {
1908 Runnable r = new CheckedRunnable() { public void realRun()
1909 throws Throwable {
1910 blockerStarted.countDown();
1911 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1912 }};
1913 futures.add(e.submit(r));
1914 }
1915 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1916 for (Future<?> future : futures) future.cancel(false);
1917 for (Future<?> future : futures) {
1918 try {
1919 future.get();
1920 shouldThrow();
1921 } catch (CancellationException success) {}
1922 try {
1923 future.get(LONG_DELAY_MS, MILLISECONDS);
1924 shouldThrow();
1925 } catch (CancellationException success) {}
1926 assertTrue(future.isCancelled());
1927 assertTrue(future.isDone());
1928 }
1929 done.countDown();
1930 } finally {
1931 joinPool(e);
1932 }
1933 }
1934
1935 }