ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.52
Committed: Sun Oct 4 01:18:25 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.51: +20 -9 lines
Log Message:
improve testPrestartCoreThread

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 final CountDownLatch threadStarted = new CountDownLatch(1);
256 final CountDownLatch done = new CountDownLatch(1);
257 try (PoolCleaner cleaner = cleaner(p)) {
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 done.countDown();
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
302 assertEquals(0, p.getPoolSize());
303 p.prestartAllCoreThreads();
304 assertEquals(2, p.getPoolSize());
305 p.prestartAllCoreThreads();
306 assertEquals(2, p.getPoolSize());
307 joinPool(p);
308 }
309
310 /**
311 * getCompletedTaskCount increases, but doesn't overestimate,
312 * when tasks complete
313 */
314 public void testGetCompletedTaskCount() throws InterruptedException {
315 final ThreadPoolExecutor p =
316 new CustomTPE(2, 2,
317 LONG_DELAY_MS, MILLISECONDS,
318 new ArrayBlockingQueue<Runnable>(10));
319 final CountDownLatch threadStarted = new CountDownLatch(1);
320 final CountDownLatch threadProceed = new CountDownLatch(1);
321 final CountDownLatch threadDone = new CountDownLatch(1);
322 try {
323 assertEquals(0, p.getCompletedTaskCount());
324 p.execute(new CheckedRunnable() {
325 public void realRun() throws InterruptedException {
326 threadStarted.countDown();
327 assertEquals(0, p.getCompletedTaskCount());
328 threadProceed.await();
329 threadDone.countDown();
330 }});
331 await(threadStarted);
332 assertEquals(0, p.getCompletedTaskCount());
333 threadProceed.countDown();
334 threadDone.await();
335 long startTime = System.nanoTime();
336 while (p.getCompletedTaskCount() != 1) {
337 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
338 fail("timed out");
339 Thread.yield();
340 }
341 } finally {
342 joinPool(p);
343 }
344 }
345
346 /**
347 * getCorePoolSize returns size given in constructor if not otherwise set
348 */
349 public void testGetCorePoolSize() {
350 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
351 assertEquals(1, p.getCorePoolSize());
352 joinPool(p);
353 }
354
355 /**
356 * getKeepAliveTime returns value given in constructor if not otherwise set
357 */
358 public void testGetKeepAliveTime() {
359 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
360 assertEquals(1, p.getKeepAliveTime(SECONDS));
361 joinPool(p);
362 }
363
364 /**
365 * getThreadFactory returns factory in constructor if not set
366 */
367 public void testGetThreadFactory() {
368 ThreadFactory tf = new SimpleThreadFactory();
369 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
370 assertSame(tf, p.getThreadFactory());
371 joinPool(p);
372 }
373
374 /**
375 * setThreadFactory sets the thread factory returned by getThreadFactory
376 */
377 public void testSetThreadFactory() {
378 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
379 ThreadFactory tf = new SimpleThreadFactory();
380 p.setThreadFactory(tf);
381 assertSame(tf, p.getThreadFactory());
382 joinPool(p);
383 }
384
385 /**
386 * setThreadFactory(null) throws NPE
387 */
388 public void testSetThreadFactoryNull() {
389 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
390 try {
391 p.setThreadFactory(null);
392 shouldThrow();
393 } catch (NullPointerException success) {
394 } finally {
395 joinPool(p);
396 }
397 }
398
399 /**
400 * getRejectedExecutionHandler returns handler in constructor if not set
401 */
402 public void testGetRejectedExecutionHandler() {
403 RejectedExecutionHandler h = new NoOpREHandler();
404 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
405 assertSame(h, p.getRejectedExecutionHandler());
406 joinPool(p);
407 }
408
409 /**
410 * setRejectedExecutionHandler sets the handler returned by
411 * getRejectedExecutionHandler
412 */
413 public void testSetRejectedExecutionHandler() {
414 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
415 RejectedExecutionHandler h = new NoOpREHandler();
416 p.setRejectedExecutionHandler(h);
417 assertSame(h, p.getRejectedExecutionHandler());
418 joinPool(p);
419 }
420
421 /**
422 * setRejectedExecutionHandler(null) throws NPE
423 */
424 public void testSetRejectedExecutionHandlerNull() {
425 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
426 try {
427 p.setRejectedExecutionHandler(null);
428 shouldThrow();
429 } catch (NullPointerException success) {
430 } finally {
431 joinPool(p);
432 }
433 }
434
435 /**
436 * getLargestPoolSize increases, but doesn't overestimate, when
437 * multiple threads active
438 */
439 public void testGetLargestPoolSize() throws InterruptedException {
440 final int THREADS = 3;
441 final ThreadPoolExecutor p =
442 new CustomTPE(THREADS, THREADS,
443 LONG_DELAY_MS, MILLISECONDS,
444 new ArrayBlockingQueue<Runnable>(10));
445 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
446 final CountDownLatch done = new CountDownLatch(1);
447 try {
448 assertEquals(0, p.getLargestPoolSize());
449 for (int i = 0; i < THREADS; i++)
450 p.execute(new CheckedRunnable() {
451 public void realRun() throws InterruptedException {
452 threadsStarted.countDown();
453 done.await();
454 assertEquals(THREADS, p.getLargestPoolSize());
455 }});
456 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
457 assertEquals(THREADS, p.getLargestPoolSize());
458 } finally {
459 done.countDown();
460 joinPool(p);
461 assertEquals(THREADS, p.getLargestPoolSize());
462 }
463 }
464
465 /**
466 * getMaximumPoolSize returns value given in constructor if not
467 * otherwise set
468 */
469 public void testGetMaximumPoolSize() {
470 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
471 assertEquals(2, p.getMaximumPoolSize());
472 joinPool(p);
473 }
474
475 /**
476 * getPoolSize increases, but doesn't overestimate, when threads
477 * become active
478 */
479 public void testGetPoolSize() throws InterruptedException {
480 final ThreadPoolExecutor p =
481 new CustomTPE(1, 1,
482 LONG_DELAY_MS, MILLISECONDS,
483 new ArrayBlockingQueue<Runnable>(10));
484 final CountDownLatch threadStarted = new CountDownLatch(1);
485 final CountDownLatch done = new CountDownLatch(1);
486 try {
487 assertEquals(0, p.getPoolSize());
488 p.execute(new CheckedRunnable() {
489 public void realRun() throws InterruptedException {
490 threadStarted.countDown();
491 assertEquals(1, p.getPoolSize());
492 done.await();
493 }});
494 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
495 assertEquals(1, p.getPoolSize());
496 } finally {
497 done.countDown();
498 joinPool(p);
499 }
500 }
501
502 /**
503 * getTaskCount increases, but doesn't overestimate, when tasks submitted
504 */
505 public void testGetTaskCount() throws InterruptedException {
506 final ThreadPoolExecutor p =
507 new CustomTPE(1, 1,
508 LONG_DELAY_MS, MILLISECONDS,
509 new ArrayBlockingQueue<Runnable>(10));
510 final CountDownLatch threadStarted = new CountDownLatch(1);
511 final CountDownLatch done = new CountDownLatch(1);
512 try {
513 assertEquals(0, p.getTaskCount());
514 p.execute(new CheckedRunnable() {
515 public void realRun() throws InterruptedException {
516 threadStarted.countDown();
517 assertEquals(1, p.getTaskCount());
518 done.await();
519 }});
520 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
521 assertEquals(1, p.getTaskCount());
522 } finally {
523 done.countDown();
524 joinPool(p);
525 }
526 }
527
528 /**
529 * isShutdown is false before shutdown, true after
530 */
531 public void testIsShutdown() {
532
533 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
534 assertFalse(p.isShutdown());
535 try { p.shutdown(); } catch (SecurityException ok) { return; }
536 assertTrue(p.isShutdown());
537 joinPool(p);
538 }
539
540 /**
541 * isTerminated is false before termination, true after
542 */
543 public void testIsTerminated() throws InterruptedException {
544 final ThreadPoolExecutor p =
545 new CustomTPE(1, 1,
546 LONG_DELAY_MS, MILLISECONDS,
547 new ArrayBlockingQueue<Runnable>(10));
548 final CountDownLatch threadStarted = new CountDownLatch(1);
549 final CountDownLatch done = new CountDownLatch(1);
550 try {
551 assertFalse(p.isTerminating());
552 p.execute(new CheckedRunnable() {
553 public void realRun() throws InterruptedException {
554 assertFalse(p.isTerminating());
555 threadStarted.countDown();
556 done.await();
557 }});
558 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
559 assertFalse(p.isTerminating());
560 done.countDown();
561 } finally {
562 try { p.shutdown(); } catch (SecurityException ok) { return; }
563 }
564 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
565 assertTrue(p.isTerminated());
566 assertFalse(p.isTerminating());
567 }
568
569 /**
570 * isTerminating is not true when running or when terminated
571 */
572 public void testIsTerminating() throws InterruptedException {
573 final ThreadPoolExecutor p =
574 new CustomTPE(1, 1,
575 LONG_DELAY_MS, MILLISECONDS,
576 new ArrayBlockingQueue<Runnable>(10));
577 final CountDownLatch threadStarted = new CountDownLatch(1);
578 final CountDownLatch done = new CountDownLatch(1);
579 try {
580 assertFalse(p.isTerminating());
581 p.execute(new CheckedRunnable() {
582 public void realRun() throws InterruptedException {
583 assertFalse(p.isTerminating());
584 threadStarted.countDown();
585 done.await();
586 }});
587 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
588 assertFalse(p.isTerminating());
589 done.countDown();
590 } finally {
591 try { p.shutdown(); } catch (SecurityException ok) { return; }
592 }
593 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
594 assertTrue(p.isTerminated());
595 assertFalse(p.isTerminating());
596 }
597
598 /**
599 * getQueue returns the work queue, which contains queued tasks
600 */
601 public void testGetQueue() throws InterruptedException {
602 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
603 final ThreadPoolExecutor p =
604 new CustomTPE(1, 1,
605 LONG_DELAY_MS, MILLISECONDS,
606 q);
607 final CountDownLatch threadStarted = new CountDownLatch(1);
608 final CountDownLatch done = new CountDownLatch(1);
609 try {
610 FutureTask[] tasks = new FutureTask[5];
611 for (int i = 0; i < tasks.length; i++) {
612 Callable task = new CheckedCallable<Boolean>() {
613 public Boolean realCall() throws InterruptedException {
614 threadStarted.countDown();
615 assertSame(q, p.getQueue());
616 done.await();
617 return Boolean.TRUE;
618 }};
619 tasks[i] = new FutureTask(task);
620 p.execute(tasks[i]);
621 }
622 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
623 assertSame(q, p.getQueue());
624 assertFalse(q.contains(tasks[0]));
625 assertTrue(q.contains(tasks[tasks.length - 1]));
626 assertEquals(tasks.length - 1, q.size());
627 } finally {
628 done.countDown();
629 joinPool(p);
630 }
631 }
632
633 /**
634 * remove(task) removes queued task, and fails to remove active task
635 */
636 public void testRemove() throws InterruptedException {
637 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
638 final ThreadPoolExecutor p =
639 new CustomTPE(1, 1,
640 LONG_DELAY_MS, MILLISECONDS,
641 q);
642 Runnable[] tasks = new Runnable[6];
643 final CountDownLatch threadStarted = new CountDownLatch(1);
644 final CountDownLatch done = new CountDownLatch(1);
645 try {
646 for (int i = 0; i < tasks.length; i++) {
647 tasks[i] = new CheckedRunnable() {
648 public void realRun() throws InterruptedException {
649 threadStarted.countDown();
650 done.await();
651 }};
652 p.execute(tasks[i]);
653 }
654 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
655 assertFalse(p.remove(tasks[0]));
656 assertTrue(q.contains(tasks[4]));
657 assertTrue(q.contains(tasks[3]));
658 assertTrue(p.remove(tasks[4]));
659 assertFalse(p.remove(tasks[4]));
660 assertFalse(q.contains(tasks[4]));
661 assertTrue(q.contains(tasks[3]));
662 assertTrue(p.remove(tasks[3]));
663 assertFalse(q.contains(tasks[3]));
664 } finally {
665 done.countDown();
666 joinPool(p);
667 }
668 }
669
670 /**
671 * purge removes cancelled tasks from the queue
672 */
673 public void testPurge() throws InterruptedException {
674 final CountDownLatch threadStarted = new CountDownLatch(1);
675 final CountDownLatch done = new CountDownLatch(1);
676 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
677 final ThreadPoolExecutor p =
678 new CustomTPE(1, 1,
679 LONG_DELAY_MS, MILLISECONDS,
680 q);
681 FutureTask[] tasks = new FutureTask[5];
682 try {
683 for (int i = 0; i < tasks.length; i++) {
684 Callable task = new CheckedCallable<Boolean>() {
685 public Boolean realCall() throws InterruptedException {
686 threadStarted.countDown();
687 done.await();
688 return Boolean.TRUE;
689 }};
690 tasks[i] = new FutureTask(task);
691 p.execute(tasks[i]);
692 }
693 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
694 assertEquals(tasks.length, p.getTaskCount());
695 assertEquals(tasks.length - 1, q.size());
696 assertEquals(1L, p.getActiveCount());
697 assertEquals(0L, p.getCompletedTaskCount());
698 tasks[4].cancel(true);
699 tasks[3].cancel(false);
700 p.purge();
701 assertEquals(tasks.length - 3, q.size());
702 assertEquals(tasks.length - 2, p.getTaskCount());
703 p.purge(); // Nothing to do
704 assertEquals(tasks.length - 3, q.size());
705 assertEquals(tasks.length - 2, p.getTaskCount());
706 } finally {
707 done.countDown();
708 joinPool(p);
709 }
710 }
711
712 /**
713 * shutdownNow returns a list containing tasks that were not run,
714 * and those tasks are drained from the queue
715 */
716 public void testShutdownNow() throws InterruptedException {
717 final int poolSize = 2;
718 final int count = 5;
719 final AtomicInteger ran = new AtomicInteger(0);
720 ThreadPoolExecutor p =
721 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
722 new ArrayBlockingQueue<Runnable>(10));
723 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
724 Runnable waiter = new CheckedRunnable() { public void realRun() {
725 threadsStarted.countDown();
726 try {
727 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
728 } catch (InterruptedException success) {}
729 ran.getAndIncrement();
730 }};
731 for (int i = 0; i < count; i++)
732 p.execute(waiter);
733 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
734 assertEquals(poolSize, p.getActiveCount());
735 assertEquals(0, p.getCompletedTaskCount());
736 final List<Runnable> queuedTasks;
737 try {
738 queuedTasks = p.shutdownNow();
739 } catch (SecurityException ok) {
740 return; // Allowed in case test doesn't have privs
741 }
742 assertTrue(p.isShutdown());
743 assertTrue(p.getQueue().isEmpty());
744 assertEquals(count - poolSize, queuedTasks.size());
745 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
746 assertTrue(p.isTerminated());
747 assertEquals(poolSize, ran.get());
748 assertEquals(poolSize, p.getCompletedTaskCount());
749 }
750
751 // Exception Tests
752
753 /**
754 * Constructor throws if corePoolSize argument is less than zero
755 */
756 public void testConstructor1() {
757 try {
758 new CustomTPE(-1, 1, 1L, SECONDS,
759 new ArrayBlockingQueue<Runnable>(10));
760 shouldThrow();
761 } catch (IllegalArgumentException success) {}
762 }
763
764 /**
765 * Constructor throws if maximumPoolSize is less than zero
766 */
767 public void testConstructor2() {
768 try {
769 new CustomTPE(1, -1, 1L, SECONDS,
770 new ArrayBlockingQueue<Runnable>(10));
771 shouldThrow();
772 } catch (IllegalArgumentException success) {}
773 }
774
775 /**
776 * Constructor throws if maximumPoolSize is equal to zero
777 */
778 public void testConstructor3() {
779 try {
780 new CustomTPE(1, 0, 1L, SECONDS,
781 new ArrayBlockingQueue<Runnable>(10));
782 shouldThrow();
783 } catch (IllegalArgumentException success) {}
784 }
785
786 /**
787 * Constructor throws if keepAliveTime is less than zero
788 */
789 public void testConstructor4() {
790 try {
791 new CustomTPE(1, 2, -1L, SECONDS,
792 new ArrayBlockingQueue<Runnable>(10));
793 shouldThrow();
794 } catch (IllegalArgumentException success) {}
795 }
796
797 /**
798 * Constructor throws if corePoolSize is greater than the maximumPoolSize
799 */
800 public void testConstructor5() {
801 try {
802 new CustomTPE(2, 1, 1L, SECONDS,
803 new ArrayBlockingQueue<Runnable>(10));
804 shouldThrow();
805 } catch (IllegalArgumentException success) {}
806 }
807
808 /**
809 * Constructor throws if workQueue is set to null
810 */
811 public void testConstructorNullPointerException() {
812 try {
813 new CustomTPE(1, 2, 1L, SECONDS, null);
814 shouldThrow();
815 } catch (NullPointerException success) {}
816 }
817
818 /**
819 * Constructor throws if corePoolSize argument is less than zero
820 */
821 public void testConstructor6() {
822 try {
823 new CustomTPE(-1, 1, 1L, SECONDS,
824 new ArrayBlockingQueue<Runnable>(10),
825 new SimpleThreadFactory());
826 shouldThrow();
827 } catch (IllegalArgumentException success) {}
828 }
829
830 /**
831 * Constructor throws if maximumPoolSize is less than zero
832 */
833 public void testConstructor7() {
834 try {
835 new CustomTPE(1,-1, 1L, SECONDS,
836 new ArrayBlockingQueue<Runnable>(10),
837 new SimpleThreadFactory());
838 shouldThrow();
839 } catch (IllegalArgumentException success) {}
840 }
841
842 /**
843 * Constructor throws if maximumPoolSize is equal to zero
844 */
845 public void testConstructor8() {
846 try {
847 new CustomTPE(1, 0, 1L, SECONDS,
848 new ArrayBlockingQueue<Runnable>(10),
849 new SimpleThreadFactory());
850 shouldThrow();
851 } catch (IllegalArgumentException success) {}
852 }
853
854 /**
855 * Constructor throws if keepAliveTime is less than zero
856 */
857 public void testConstructor9() {
858 try {
859 new CustomTPE(1, 2, -1L, SECONDS,
860 new ArrayBlockingQueue<Runnable>(10),
861 new SimpleThreadFactory());
862 shouldThrow();
863 } catch (IllegalArgumentException success) {}
864 }
865
866 /**
867 * Constructor throws if corePoolSize is greater than the maximumPoolSize
868 */
869 public void testConstructor10() {
870 try {
871 new CustomTPE(2, 1, 1L, SECONDS,
872 new ArrayBlockingQueue<Runnable>(10),
873 new SimpleThreadFactory());
874 shouldThrow();
875 } catch (IllegalArgumentException success) {}
876 }
877
878 /**
879 * Constructor throws if workQueue is set to null
880 */
881 public void testConstructorNullPointerException2() {
882 try {
883 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
884 shouldThrow();
885 } catch (NullPointerException success) {}
886 }
887
888 /**
889 * Constructor throws if threadFactory is set to null
890 */
891 public void testConstructorNullPointerException3() {
892 try {
893 new CustomTPE(1, 2, 1L, SECONDS,
894 new ArrayBlockingQueue<Runnable>(10),
895 (ThreadFactory) null);
896 shouldThrow();
897 } catch (NullPointerException success) {}
898 }
899
900 /**
901 * Constructor throws if corePoolSize argument is less than zero
902 */
903 public void testConstructor11() {
904 try {
905 new CustomTPE(-1, 1, 1L, SECONDS,
906 new ArrayBlockingQueue<Runnable>(10),
907 new NoOpREHandler());
908 shouldThrow();
909 } catch (IllegalArgumentException success) {}
910 }
911
912 /**
913 * Constructor throws if maximumPoolSize is less than zero
914 */
915 public void testConstructor12() {
916 try {
917 new CustomTPE(1, -1, 1L, SECONDS,
918 new ArrayBlockingQueue<Runnable>(10),
919 new NoOpREHandler());
920 shouldThrow();
921 } catch (IllegalArgumentException success) {}
922 }
923
924 /**
925 * Constructor throws if maximumPoolSize is equal to zero
926 */
927 public void testConstructor13() {
928 try {
929 new CustomTPE(1, 0, 1L, SECONDS,
930 new ArrayBlockingQueue<Runnable>(10),
931 new NoOpREHandler());
932 shouldThrow();
933 } catch (IllegalArgumentException success) {}
934 }
935
936 /**
937 * Constructor throws if keepAliveTime is less than zero
938 */
939 public void testConstructor14() {
940 try {
941 new CustomTPE(1, 2, -1L, SECONDS,
942 new ArrayBlockingQueue<Runnable>(10),
943 new NoOpREHandler());
944 shouldThrow();
945 } catch (IllegalArgumentException success) {}
946 }
947
948 /**
949 * Constructor throws if corePoolSize is greater than the maximumPoolSize
950 */
951 public void testConstructor15() {
952 try {
953 new CustomTPE(2, 1, 1L, SECONDS,
954 new ArrayBlockingQueue<Runnable>(10),
955 new NoOpREHandler());
956 shouldThrow();
957 } catch (IllegalArgumentException success) {}
958 }
959
960 /**
961 * Constructor throws if workQueue is set to null
962 */
963 public void testConstructorNullPointerException4() {
964 try {
965 new CustomTPE(1, 2, 1L, SECONDS,
966 null,
967 new NoOpREHandler());
968 shouldThrow();
969 } catch (NullPointerException success) {}
970 }
971
972 /**
973 * Constructor throws if handler is set to null
974 */
975 public void testConstructorNullPointerException5() {
976 try {
977 new CustomTPE(1, 2, 1L, SECONDS,
978 new ArrayBlockingQueue<Runnable>(10),
979 (RejectedExecutionHandler) null);
980 shouldThrow();
981 } catch (NullPointerException success) {}
982 }
983
984 /**
985 * Constructor throws if corePoolSize argument is less than zero
986 */
987 public void testConstructor16() {
988 try {
989 new CustomTPE(-1, 1, 1L, SECONDS,
990 new ArrayBlockingQueue<Runnable>(10),
991 new SimpleThreadFactory(),
992 new NoOpREHandler());
993 shouldThrow();
994 } catch (IllegalArgumentException success) {}
995 }
996
997 /**
998 * Constructor throws if maximumPoolSize is less than zero
999 */
1000 public void testConstructor17() {
1001 try {
1002 new CustomTPE(1, -1, 1L, SECONDS,
1003 new ArrayBlockingQueue<Runnable>(10),
1004 new SimpleThreadFactory(),
1005 new NoOpREHandler());
1006 shouldThrow();
1007 } catch (IllegalArgumentException success) {}
1008 }
1009
1010 /**
1011 * Constructor throws if maximumPoolSize is equal to zero
1012 */
1013 public void testConstructor18() {
1014 try {
1015 new CustomTPE(1, 0, 1L, SECONDS,
1016 new ArrayBlockingQueue<Runnable>(10),
1017 new SimpleThreadFactory(),
1018 new NoOpREHandler());
1019 shouldThrow();
1020 } catch (IllegalArgumentException success) {}
1021 }
1022
1023 /**
1024 * Constructor throws if keepAliveTime is less than zero
1025 */
1026 public void testConstructor19() {
1027 try {
1028 new CustomTPE(1, 2, -1L, SECONDS,
1029 new ArrayBlockingQueue<Runnable>(10),
1030 new SimpleThreadFactory(),
1031 new NoOpREHandler());
1032 shouldThrow();
1033 } catch (IllegalArgumentException success) {}
1034 }
1035
1036 /**
1037 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1038 */
1039 public void testConstructor20() {
1040 try {
1041 new CustomTPE(2, 1, 1L, SECONDS,
1042 new ArrayBlockingQueue<Runnable>(10),
1043 new SimpleThreadFactory(),
1044 new NoOpREHandler());
1045 shouldThrow();
1046 } catch (IllegalArgumentException success) {}
1047 }
1048
1049 /**
1050 * Constructor throws if workQueue is null
1051 */
1052 public void testConstructorNullPointerException6() {
1053 try {
1054 new CustomTPE(1, 2, 1L, SECONDS,
1055 null,
1056 new SimpleThreadFactory(),
1057 new NoOpREHandler());
1058 shouldThrow();
1059 } catch (NullPointerException success) {}
1060 }
1061
1062 /**
1063 * Constructor throws if handler is null
1064 */
1065 public void testConstructorNullPointerException7() {
1066 try {
1067 new CustomTPE(1, 2, 1L, SECONDS,
1068 new ArrayBlockingQueue<Runnable>(10),
1069 new SimpleThreadFactory(),
1070 (RejectedExecutionHandler) null);
1071 shouldThrow();
1072 } catch (NullPointerException success) {}
1073 }
1074
1075 /**
1076 * Constructor throws if ThreadFactory is null
1077 */
1078 public void testConstructorNullPointerException8() {
1079 try {
1080 new CustomTPE(1, 2, 1L, SECONDS,
1081 new ArrayBlockingQueue<Runnable>(10),
1082 (ThreadFactory) null,
1083 new NoOpREHandler());
1084 shouldThrow();
1085 } catch (NullPointerException success) {}
1086 }
1087
1088 /**
1089 * execute throws RejectedExecutionException if saturated.
1090 */
1091 public void testSaturatedExecute() {
1092 ThreadPoolExecutor p =
1093 new CustomTPE(1, 1,
1094 LONG_DELAY_MS, MILLISECONDS,
1095 new ArrayBlockingQueue<Runnable>(1));
1096 final CountDownLatch done = new CountDownLatch(1);
1097 try {
1098 Runnable task = new CheckedRunnable() {
1099 public void realRun() throws InterruptedException {
1100 done.await();
1101 }};
1102 for (int i = 0; i < 2; ++i)
1103 p.execute(task);
1104 for (int i = 0; i < 2; ++i) {
1105 try {
1106 p.execute(task);
1107 shouldThrow();
1108 } catch (RejectedExecutionException success) {}
1109 assertTrue(p.getTaskCount() <= 2);
1110 }
1111 } finally {
1112 done.countDown();
1113 joinPool(p);
1114 }
1115 }
1116
1117 /**
1118 * executor using CallerRunsPolicy runs task if saturated.
1119 */
1120 public void testSaturatedExecute2() {
1121 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1122 ThreadPoolExecutor p = new CustomTPE(1, 1,
1123 LONG_DELAY_MS, MILLISECONDS,
1124 new ArrayBlockingQueue<Runnable>(1),
1125 h);
1126 try {
1127 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1128 for (int i = 0; i < tasks.length; ++i)
1129 tasks[i] = new TrackedNoOpRunnable();
1130 TrackedLongRunnable mr = new TrackedLongRunnable();
1131 p.execute(mr);
1132 for (int i = 0; i < tasks.length; ++i)
1133 p.execute(tasks[i]);
1134 for (int i = 1; i < tasks.length; ++i)
1135 assertTrue(tasks[i].done);
1136 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1137 } finally {
1138 joinPool(p);
1139 }
1140 }
1141
1142 /**
1143 * executor using DiscardPolicy drops task if saturated.
1144 */
1145 public void testSaturatedExecute3() {
1146 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1147 ThreadPoolExecutor p =
1148 new CustomTPE(1, 1,
1149 LONG_DELAY_MS, MILLISECONDS,
1150 new ArrayBlockingQueue<Runnable>(1),
1151 h);
1152 try {
1153 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1154 for (int i = 0; i < tasks.length; ++i)
1155 tasks[i] = new TrackedNoOpRunnable();
1156 p.execute(new TrackedLongRunnable());
1157 for (TrackedNoOpRunnable task : tasks)
1158 p.execute(task);
1159 for (TrackedNoOpRunnable task : tasks)
1160 assertFalse(task.done);
1161 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1162 } finally {
1163 joinPool(p);
1164 }
1165 }
1166
1167 /**
1168 * executor using DiscardOldestPolicy drops oldest task if saturated.
1169 */
1170 public void testSaturatedExecute4() {
1171 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1172 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1173 try {
1174 p.execute(new TrackedLongRunnable());
1175 TrackedLongRunnable r2 = new TrackedLongRunnable();
1176 p.execute(r2);
1177 assertTrue(p.getQueue().contains(r2));
1178 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1179 p.execute(r3);
1180 assertFalse(p.getQueue().contains(r2));
1181 assertTrue(p.getQueue().contains(r3));
1182 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1183 } finally {
1184 joinPool(p);
1185 }
1186 }
1187
1188 /**
1189 * execute throws RejectedExecutionException if shutdown
1190 */
1191 public void testRejectedExecutionExceptionOnShutdown() {
1192 ThreadPoolExecutor p =
1193 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1194 try { p.shutdown(); } catch (SecurityException ok) { return; }
1195 try {
1196 p.execute(new NoOpRunnable());
1197 shouldThrow();
1198 } catch (RejectedExecutionException success) {}
1199
1200 joinPool(p);
1201 }
1202
1203 /**
1204 * execute using CallerRunsPolicy drops task on shutdown
1205 */
1206 public void testCallerRunsOnShutdown() {
1207 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1208 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1209
1210 try { p.shutdown(); } catch (SecurityException ok) { return; }
1211 try {
1212 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1213 p.execute(r);
1214 assertFalse(r.done);
1215 } finally {
1216 joinPool(p);
1217 }
1218 }
1219
1220 /**
1221 * execute using DiscardPolicy drops task on shutdown
1222 */
1223 public void testDiscardOnShutdown() {
1224 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1225 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1226
1227 try { p.shutdown(); } catch (SecurityException ok) { return; }
1228 try {
1229 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1230 p.execute(r);
1231 assertFalse(r.done);
1232 } finally {
1233 joinPool(p);
1234 }
1235 }
1236
1237 /**
1238 * execute using DiscardOldestPolicy drops task on shutdown
1239 */
1240 public void testDiscardOldestOnShutdown() {
1241 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1242 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1243
1244 try { p.shutdown(); } catch (SecurityException ok) { return; }
1245 try {
1246 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1247 p.execute(r);
1248 assertFalse(r.done);
1249 } finally {
1250 joinPool(p);
1251 }
1252 }
1253
1254 /**
1255 * execute(null) throws NPE
1256 */
1257 public void testExecuteNull() {
1258 ThreadPoolExecutor p =
1259 new CustomTPE(1, 2, 1L, SECONDS,
1260 new ArrayBlockingQueue<Runnable>(10));
1261 try {
1262 p.execute(null);
1263 shouldThrow();
1264 } catch (NullPointerException success) {}
1265
1266 joinPool(p);
1267 }
1268
1269 /**
1270 * setCorePoolSize of negative value throws IllegalArgumentException
1271 */
1272 public void testCorePoolSizeIllegalArgumentException() {
1273 ThreadPoolExecutor p =
1274 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1275 try {
1276 p.setCorePoolSize(-1);
1277 shouldThrow();
1278 } catch (IllegalArgumentException success) {
1279 } finally {
1280 try { p.shutdown(); } catch (SecurityException ok) { return; }
1281 }
1282 joinPool(p);
1283 }
1284
1285 /**
1286 * setMaximumPoolSize(int) throws IllegalArgumentException
1287 * if given a value less the core pool size
1288 */
1289 public void testMaximumPoolSizeIllegalArgumentException() {
1290 ThreadPoolExecutor p =
1291 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1292 try {
1293 p.setMaximumPoolSize(1);
1294 shouldThrow();
1295 } catch (IllegalArgumentException success) {
1296 } finally {
1297 try { p.shutdown(); } catch (SecurityException ok) { return; }
1298 }
1299 joinPool(p);
1300 }
1301
1302 /**
1303 * setMaximumPoolSize throws IllegalArgumentException
1304 * if given a negative value
1305 */
1306 public void testMaximumPoolSizeIllegalArgumentException2() {
1307 ThreadPoolExecutor p =
1308 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1309 try {
1310 p.setMaximumPoolSize(-1);
1311 shouldThrow();
1312 } catch (IllegalArgumentException success) {
1313 } finally {
1314 try { p.shutdown(); } catch (SecurityException ok) { return; }
1315 }
1316 joinPool(p);
1317 }
1318
1319 /**
1320 * setKeepAliveTime throws IllegalArgumentException
1321 * when given a negative value
1322 */
1323 public void testKeepAliveTimeIllegalArgumentException() {
1324 ThreadPoolExecutor p =
1325 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1326
1327 try {
1328 p.setKeepAliveTime(-1,MILLISECONDS);
1329 shouldThrow();
1330 } catch (IllegalArgumentException success) {
1331 } finally {
1332 try { p.shutdown(); } catch (SecurityException ok) { return; }
1333 }
1334 joinPool(p);
1335 }
1336
1337 /**
1338 * terminated() is called on termination
1339 */
1340 public void testTerminated() {
1341 CustomTPE p = new CustomTPE();
1342 try { p.shutdown(); } catch (SecurityException ok) { return; }
1343 assertTrue(p.terminatedCalled());
1344 joinPool(p);
1345 }
1346
1347 /**
1348 * beforeExecute and afterExecute are called when executing task
1349 */
1350 public void testBeforeAfter() throws InterruptedException {
1351 CustomTPE p = new CustomTPE();
1352 try {
1353 final CountDownLatch done = new CountDownLatch(1);
1354 p.execute(new CheckedRunnable() {
1355 public void realRun() {
1356 done.countDown();
1357 }});
1358 await(p.afterCalled);
1359 assertEquals(0, done.getCount());
1360 assertTrue(p.afterCalled());
1361 assertTrue(p.beforeCalled());
1362 try { p.shutdown(); } catch (SecurityException ok) { return; }
1363 } finally {
1364 joinPool(p);
1365 }
1366 }
1367
1368 /**
1369 * completed submit of callable returns result
1370 */
1371 public void testSubmitCallable() throws Exception {
1372 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1373 try {
1374 Future<String> future = e.submit(new StringTask());
1375 String result = future.get();
1376 assertSame(TEST_STRING, result);
1377 } finally {
1378 joinPool(e);
1379 }
1380 }
1381
1382 /**
1383 * completed submit of runnable returns successfully
1384 */
1385 public void testSubmitRunnable() throws Exception {
1386 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1387 try {
1388 Future<?> future = e.submit(new NoOpRunnable());
1389 future.get();
1390 assertTrue(future.isDone());
1391 } finally {
1392 joinPool(e);
1393 }
1394 }
1395
1396 /**
1397 * completed submit of (runnable, result) returns result
1398 */
1399 public void testSubmitRunnable2() throws Exception {
1400 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1401 try {
1402 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1403 String result = future.get();
1404 assertSame(TEST_STRING, result);
1405 } finally {
1406 joinPool(e);
1407 }
1408 }
1409
1410 /**
1411 * invokeAny(null) throws NPE
1412 */
1413 public void testInvokeAny1() throws Exception {
1414 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1415 try {
1416 e.invokeAny(null);
1417 shouldThrow();
1418 } catch (NullPointerException success) {
1419 } finally {
1420 joinPool(e);
1421 }
1422 }
1423
1424 /**
1425 * invokeAny(empty collection) throws IAE
1426 */
1427 public void testInvokeAny2() throws Exception {
1428 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1429 try {
1430 e.invokeAny(new ArrayList<Callable<String>>());
1431 shouldThrow();
1432 } catch (IllegalArgumentException success) {
1433 } finally {
1434 joinPool(e);
1435 }
1436 }
1437
1438 /**
1439 * invokeAny(c) throws NPE if c has null elements
1440 */
1441 public void testInvokeAny3() throws Exception {
1442 CountDownLatch latch = new CountDownLatch(1);
1443 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1444 List<Callable<String>> l = new ArrayList<Callable<String>>();
1445 l.add(latchAwaitingStringTask(latch));
1446 l.add(null);
1447 try {
1448 e.invokeAny(l);
1449 shouldThrow();
1450 } catch (NullPointerException success) {
1451 } finally {
1452 latch.countDown();
1453 joinPool(e);
1454 }
1455 }
1456
1457 /**
1458 * invokeAny(c) throws ExecutionException if no task completes
1459 */
1460 public void testInvokeAny4() throws Exception {
1461 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1462 List<Callable<String>> l = new ArrayList<Callable<String>>();
1463 l.add(new NPETask());
1464 try {
1465 e.invokeAny(l);
1466 shouldThrow();
1467 } catch (ExecutionException success) {
1468 assertTrue(success.getCause() instanceof NullPointerException);
1469 } finally {
1470 joinPool(e);
1471 }
1472 }
1473
1474 /**
1475 * invokeAny(c) returns result of some task
1476 */
1477 public void testInvokeAny5() throws Exception {
1478 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1479 try {
1480 List<Callable<String>> l = new ArrayList<Callable<String>>();
1481 l.add(new StringTask());
1482 l.add(new StringTask());
1483 String result = e.invokeAny(l);
1484 assertSame(TEST_STRING, result);
1485 } finally {
1486 joinPool(e);
1487 }
1488 }
1489
1490 /**
1491 * invokeAll(null) throws NPE
1492 */
1493 public void testInvokeAll1() throws Exception {
1494 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1495 try {
1496 e.invokeAll(null);
1497 shouldThrow();
1498 } catch (NullPointerException success) {
1499 } finally {
1500 joinPool(e);
1501 }
1502 }
1503
1504 /**
1505 * invokeAll(empty collection) returns empty collection
1506 */
1507 public void testInvokeAll2() throws Exception {
1508 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1509 try {
1510 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1511 assertTrue(r.isEmpty());
1512 } finally {
1513 joinPool(e);
1514 }
1515 }
1516
1517 /**
1518 * invokeAll(c) throws NPE if c has null elements
1519 */
1520 public void testInvokeAll3() throws Exception {
1521 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1522 List<Callable<String>> l = new ArrayList<Callable<String>>();
1523 l.add(new StringTask());
1524 l.add(null);
1525 try {
1526 e.invokeAll(l);
1527 shouldThrow();
1528 } catch (NullPointerException success) {
1529 } finally {
1530 joinPool(e);
1531 }
1532 }
1533
1534 /**
1535 * get of element of invokeAll(c) throws exception on failed task
1536 */
1537 public void testInvokeAll4() throws Exception {
1538 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1539 List<Callable<String>> l = new ArrayList<Callable<String>>();
1540 l.add(new NPETask());
1541 List<Future<String>> futures = e.invokeAll(l);
1542 assertEquals(1, futures.size());
1543 try {
1544 futures.get(0).get();
1545 shouldThrow();
1546 } catch (ExecutionException success) {
1547 assertTrue(success.getCause() instanceof NullPointerException);
1548 } finally {
1549 joinPool(e);
1550 }
1551 }
1552
1553 /**
1554 * invokeAll(c) returns results of all completed tasks
1555 */
1556 public void testInvokeAll5() throws Exception {
1557 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1558 try {
1559 List<Callable<String>> l = new ArrayList<Callable<String>>();
1560 l.add(new StringTask());
1561 l.add(new StringTask());
1562 List<Future<String>> futures = e.invokeAll(l);
1563 assertEquals(2, futures.size());
1564 for (Future<String> future : futures)
1565 assertSame(TEST_STRING, future.get());
1566 } finally {
1567 joinPool(e);
1568 }
1569 }
1570
1571 /**
1572 * timed invokeAny(null) throws NPE
1573 */
1574 public void testTimedInvokeAny1() throws Exception {
1575 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1576 try {
1577 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1578 shouldThrow();
1579 } catch (NullPointerException success) {
1580 } finally {
1581 joinPool(e);
1582 }
1583 }
1584
1585 /**
1586 * timed invokeAny(,,null) throws NPE
1587 */
1588 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1589 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1590 List<Callable<String>> l = new ArrayList<Callable<String>>();
1591 l.add(new StringTask());
1592 try {
1593 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1594 shouldThrow();
1595 } catch (NullPointerException success) {
1596 } finally {
1597 joinPool(e);
1598 }
1599 }
1600
1601 /**
1602 * timed invokeAny(empty collection) throws IAE
1603 */
1604 public void testTimedInvokeAny2() throws Exception {
1605 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1606 try {
1607 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1608 shouldThrow();
1609 } catch (IllegalArgumentException success) {
1610 } finally {
1611 joinPool(e);
1612 }
1613 }
1614
1615 /**
1616 * timed invokeAny(c) throws NPE if c has null elements
1617 */
1618 public void testTimedInvokeAny3() throws Exception {
1619 CountDownLatch latch = new CountDownLatch(1);
1620 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1621 List<Callable<String>> l = new ArrayList<Callable<String>>();
1622 l.add(latchAwaitingStringTask(latch));
1623 l.add(null);
1624 try {
1625 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1626 shouldThrow();
1627 } catch (NullPointerException success) {
1628 } finally {
1629 latch.countDown();
1630 joinPool(e);
1631 }
1632 }
1633
1634 /**
1635 * timed invokeAny(c) throws ExecutionException if no task completes
1636 */
1637 public void testTimedInvokeAny4() throws Exception {
1638 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1639 List<Callable<String>> l = new ArrayList<Callable<String>>();
1640 l.add(new NPETask());
1641 try {
1642 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1643 shouldThrow();
1644 } catch (ExecutionException success) {
1645 assertTrue(success.getCause() instanceof NullPointerException);
1646 } finally {
1647 joinPool(e);
1648 }
1649 }
1650
1651 /**
1652 * timed invokeAny(c) returns result of some task
1653 */
1654 public void testTimedInvokeAny5() throws Exception {
1655 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1656 try {
1657 List<Callable<String>> l = new ArrayList<Callable<String>>();
1658 l.add(new StringTask());
1659 l.add(new StringTask());
1660 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1661 assertSame(TEST_STRING, result);
1662 } finally {
1663 joinPool(e);
1664 }
1665 }
1666
1667 /**
1668 * timed invokeAll(null) throws NPE
1669 */
1670 public void testTimedInvokeAll1() throws Exception {
1671 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1672 try {
1673 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1674 shouldThrow();
1675 } catch (NullPointerException success) {
1676 } finally {
1677 joinPool(e);
1678 }
1679 }
1680
1681 /**
1682 * timed invokeAll(,,null) throws NPE
1683 */
1684 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1685 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1686 List<Callable<String>> l = new ArrayList<Callable<String>>();
1687 l.add(new StringTask());
1688 try {
1689 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1690 shouldThrow();
1691 } catch (NullPointerException success) {
1692 } finally {
1693 joinPool(e);
1694 }
1695 }
1696
1697 /**
1698 * timed invokeAll(empty collection) returns empty collection
1699 */
1700 public void testTimedInvokeAll2() throws Exception {
1701 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1702 try {
1703 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1704 assertTrue(r.isEmpty());
1705 } finally {
1706 joinPool(e);
1707 }
1708 }
1709
1710 /**
1711 * timed invokeAll(c) throws NPE if c has null elements
1712 */
1713 public void testTimedInvokeAll3() throws Exception {
1714 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1715 List<Callable<String>> l = new ArrayList<Callable<String>>();
1716 l.add(new StringTask());
1717 l.add(null);
1718 try {
1719 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1720 shouldThrow();
1721 } catch (NullPointerException success) {
1722 } finally {
1723 joinPool(e);
1724 }
1725 }
1726
1727 /**
1728 * get of element of invokeAll(c) throws exception on failed task
1729 */
1730 public void testTimedInvokeAll4() throws Exception {
1731 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1732 List<Callable<String>> l = new ArrayList<Callable<String>>();
1733 l.add(new NPETask());
1734 List<Future<String>> futures =
1735 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1736 assertEquals(1, futures.size());
1737 try {
1738 futures.get(0).get();
1739 shouldThrow();
1740 } catch (ExecutionException success) {
1741 assertTrue(success.getCause() instanceof NullPointerException);
1742 } finally {
1743 joinPool(e);
1744 }
1745 }
1746
1747 /**
1748 * timed invokeAll(c) returns results of all completed tasks
1749 */
1750 public void testTimedInvokeAll5() throws Exception {
1751 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1752 try {
1753 List<Callable<String>> l = new ArrayList<Callable<String>>();
1754 l.add(new StringTask());
1755 l.add(new StringTask());
1756 List<Future<String>> futures =
1757 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1758 assertEquals(2, futures.size());
1759 for (Future<String> future : futures)
1760 assertSame(TEST_STRING, future.get());
1761 } finally {
1762 joinPool(e);
1763 }
1764 }
1765
1766 /**
1767 * timed invokeAll(c) cancels tasks not completed by timeout
1768 */
1769 public void testTimedInvokeAll6() throws Exception {
1770 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1771 try {
1772 for (long timeout = timeoutMillis();;) {
1773 List<Callable<String>> tasks = new ArrayList<>();
1774 tasks.add(new StringTask("0"));
1775 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1776 tasks.add(new StringTask("2"));
1777 long startTime = System.nanoTime();
1778 List<Future<String>> futures =
1779 e.invokeAll(tasks, timeout, MILLISECONDS);
1780 assertEquals(tasks.size(), futures.size());
1781 assertTrue(millisElapsedSince(startTime) >= timeout);
1782 for (Future future : futures)
1783 assertTrue(future.isDone());
1784 assertTrue(futures.get(1).isCancelled());
1785 try {
1786 assertEquals("0", futures.get(0).get());
1787 assertEquals("2", futures.get(2).get());
1788 break;
1789 } catch (CancellationException retryWithLongerTimeout) {
1790 timeout *= 2;
1791 if (timeout >= LONG_DELAY_MS / 2)
1792 fail("expected exactly one task to be cancelled");
1793 }
1794 }
1795 } finally {
1796 joinPool(e);
1797 }
1798 }
1799
1800 /**
1801 * Execution continues if there is at least one thread even if
1802 * thread factory fails to create more
1803 */
1804 public void testFailingThreadFactory() throws InterruptedException {
1805 final ExecutorService e =
1806 new CustomTPE(100, 100,
1807 LONG_DELAY_MS, MILLISECONDS,
1808 new LinkedBlockingQueue<Runnable>(),
1809 new FailingThreadFactory());
1810 try {
1811 final int TASKS = 100;
1812 final CountDownLatch done = new CountDownLatch(TASKS);
1813 for (int k = 0; k < TASKS; ++k)
1814 e.execute(new CheckedRunnable() {
1815 public void realRun() {
1816 done.countDown();
1817 }});
1818 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1819 } finally {
1820 joinPool(e);
1821 }
1822 }
1823
1824 /**
1825 * allowsCoreThreadTimeOut is by default false.
1826 */
1827 public void testAllowsCoreThreadTimeOut() {
1828 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1829 assertFalse(p.allowsCoreThreadTimeOut());
1830 joinPool(p);
1831 }
1832
1833 /**
1834 * allowCoreThreadTimeOut(true) causes idle threads to time out
1835 */
1836 public void testAllowCoreThreadTimeOut_true() throws Exception {
1837 long keepAliveTime = timeoutMillis();
1838 final ThreadPoolExecutor p =
1839 new CustomTPE(2, 10,
1840 keepAliveTime, MILLISECONDS,
1841 new ArrayBlockingQueue<Runnable>(10));
1842 final CountDownLatch threadStarted = new CountDownLatch(1);
1843 try {
1844 p.allowCoreThreadTimeOut(true);
1845 p.execute(new CheckedRunnable() {
1846 public void realRun() {
1847 threadStarted.countDown();
1848 assertEquals(1, p.getPoolSize());
1849 }});
1850 await(threadStarted);
1851 delay(keepAliveTime);
1852 long startTime = System.nanoTime();
1853 while (p.getPoolSize() > 0
1854 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1855 Thread.yield();
1856 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1857 assertEquals(0, p.getPoolSize());
1858 } finally {
1859 joinPool(p);
1860 }
1861 }
1862
1863 /**
1864 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1865 */
1866 public void testAllowCoreThreadTimeOut_false() throws Exception {
1867 long keepAliveTime = timeoutMillis();
1868 final ThreadPoolExecutor p =
1869 new CustomTPE(2, 10,
1870 keepAliveTime, MILLISECONDS,
1871 new ArrayBlockingQueue<Runnable>(10));
1872 final CountDownLatch threadStarted = new CountDownLatch(1);
1873 try {
1874 p.allowCoreThreadTimeOut(false);
1875 p.execute(new CheckedRunnable() {
1876 public void realRun() throws InterruptedException {
1877 threadStarted.countDown();
1878 assertTrue(p.getPoolSize() >= 1);
1879 }});
1880 delay(2 * keepAliveTime);
1881 assertTrue(p.getPoolSize() >= 1);
1882 } finally {
1883 joinPool(p);
1884 }
1885 }
1886
1887 /**
1888 * get(cancelled task) throws CancellationException
1889 * (in part, a test of CustomTPE itself)
1890 */
1891 public void testGet_cancelled() throws Exception {
1892 final ExecutorService e =
1893 new CustomTPE(1, 1,
1894 LONG_DELAY_MS, MILLISECONDS,
1895 new LinkedBlockingQueue<Runnable>());
1896 try {
1897 final CountDownLatch blockerStarted = new CountDownLatch(1);
1898 final CountDownLatch done = new CountDownLatch(1);
1899 final List<Future<?>> futures = new ArrayList<>();
1900 for (int i = 0; i < 2; i++) {
1901 Runnable r = new CheckedRunnable() { public void realRun()
1902 throws Throwable {
1903 blockerStarted.countDown();
1904 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1905 }};
1906 futures.add(e.submit(r));
1907 }
1908 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1909 for (Future<?> future : futures) future.cancel(false);
1910 for (Future<?> future : futures) {
1911 try {
1912 future.get();
1913 shouldThrow();
1914 } catch (CancellationException success) {}
1915 try {
1916 future.get(LONG_DELAY_MS, MILLISECONDS);
1917 shouldThrow();
1918 } catch (CancellationException success) {}
1919 assertTrue(future.isCancelled());
1920 assertTrue(future.isDone());
1921 }
1922 done.countDown();
1923 } finally {
1924 joinPool(e);
1925 }
1926 }
1927
1928 }