ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.43
Committed: Mon Sep 28 08:23:49 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.42: +1 -1 lines
Log Message:
improve tests for shutdown

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 result = v;
105 exception = e;
106 done = true;
107 thread = null;
108 cond.signalAll();
109 }
110 finally { lock.unlock(); }
111 }
112 public V get() throws InterruptedException, ExecutionException {
113 lock.lock();
114 try {
115 while (!done)
116 cond.await();
117 if (exception != null)
118 throw new ExecutionException(exception);
119 return result;
120 }
121 finally { lock.unlock(); }
122 }
123 public V get(long timeout, TimeUnit unit)
124 throws InterruptedException, ExecutionException, TimeoutException {
125 long nanos = unit.toNanos(timeout);
126 lock.lock();
127 try {
128 for (;;) {
129 if (done) break;
130 if (nanos < 0)
131 throw new TimeoutException();
132 nanos = cond.awaitNanos(nanos);
133 }
134 if (exception != null)
135 throw new ExecutionException(exception);
136 return result;
137 }
138 finally { lock.unlock(); }
139 }
140 }
141
142 static class CustomTPE extends ThreadPoolExecutor {
143 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
144 return new CustomTask<V>(c);
145 }
146 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
147 return new CustomTask<V>(r, v);
148 }
149
150 CustomTPE(int corePoolSize,
151 int maximumPoolSize,
152 long keepAliveTime,
153 TimeUnit unit,
154 BlockingQueue<Runnable> workQueue) {
155 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
156 workQueue);
157 }
158 CustomTPE(int corePoolSize,
159 int maximumPoolSize,
160 long keepAliveTime,
161 TimeUnit unit,
162 BlockingQueue<Runnable> workQueue,
163 ThreadFactory threadFactory) {
164 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
165 threadFactory);
166 }
167
168 CustomTPE(int corePoolSize,
169 int maximumPoolSize,
170 long keepAliveTime,
171 TimeUnit unit,
172 BlockingQueue<Runnable> workQueue,
173 RejectedExecutionHandler handler) {
174 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
175 handler);
176 }
177 CustomTPE(int corePoolSize,
178 int maximumPoolSize,
179 long keepAliveTime,
180 TimeUnit unit,
181 BlockingQueue<Runnable> workQueue,
182 ThreadFactory threadFactory,
183 RejectedExecutionHandler handler) {
184 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
185 workQueue, threadFactory, handler);
186 }
187
188 final CountDownLatch beforeCalled = new CountDownLatch(1);
189 final CountDownLatch afterCalled = new CountDownLatch(1);
190 final CountDownLatch terminatedCalled = new CountDownLatch(1);
191
192 public CustomTPE() {
193 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
194 }
195 protected void beforeExecute(Thread t, Runnable r) {
196 beforeCalled.countDown();
197 }
198 protected void afterExecute(Runnable r, Throwable t) {
199 afterCalled.countDown();
200 }
201 protected void terminated() {
202 terminatedCalled.countDown();
203 }
204
205 public boolean beforeCalled() {
206 return beforeCalled.getCount() == 0;
207 }
208 public boolean afterCalled() {
209 return afterCalled.getCount() == 0;
210 }
211 public boolean terminatedCalled() {
212 return terminatedCalled.getCount() == 0;
213 }
214 }
215
216 static class FailingThreadFactory implements ThreadFactory {
217 int calls = 0;
218 public Thread newThread(Runnable r) {
219 if (++calls > 1) return null;
220 return new Thread(r);
221 }
222 }
223
224 /**
225 * execute successfully executes a runnable
226 */
227 public void testExecute() throws InterruptedException {
228 final ThreadPoolExecutor p =
229 new CustomTPE(1, 1,
230 LONG_DELAY_MS, MILLISECONDS,
231 new ArrayBlockingQueue<Runnable>(10));
232 final CountDownLatch done = new CountDownLatch(1);
233 final Runnable task = new CheckedRunnable() {
234 public void realRun() {
235 done.countDown();
236 }};
237 try {
238 p.execute(task);
239 assertTrue(done.await(SMALL_DELAY_MS, MILLISECONDS));
240 } finally {
241 joinPool(p);
242 }
243 }
244
245 /**
246 * getActiveCount increases but doesn't overestimate, when a
247 * thread becomes active
248 */
249 public void testGetActiveCount() throws InterruptedException {
250 final ThreadPoolExecutor p =
251 new CustomTPE(2, 2,
252 LONG_DELAY_MS, MILLISECONDS,
253 new ArrayBlockingQueue<Runnable>(10));
254 final CountDownLatch threadStarted = new CountDownLatch(1);
255 final CountDownLatch done = new CountDownLatch(1);
256 try {
257 assertEquals(0, p.getActiveCount());
258 p.execute(new CheckedRunnable() {
259 public void realRun() throws InterruptedException {
260 threadStarted.countDown();
261 assertEquals(1, p.getActiveCount());
262 done.await();
263 }});
264 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
265 assertEquals(1, p.getActiveCount());
266 } finally {
267 done.countDown();
268 joinPool(p);
269 }
270 }
271
272 /**
273 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
274 */
275 public void testPrestartCoreThread() {
276 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
277 assertEquals(0, p.getPoolSize());
278 assertTrue(p.prestartCoreThread());
279 assertEquals(1, p.getPoolSize());
280 assertTrue(p.prestartCoreThread());
281 assertEquals(2, p.getPoolSize());
282 assertFalse(p.prestartCoreThread());
283 assertEquals(2, p.getPoolSize());
284 joinPool(p);
285 }
286
287 /**
288 * prestartAllCoreThreads starts all corePoolSize threads
289 */
290 public void testPrestartAllCoreThreads() {
291 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
292 assertEquals(0, p.getPoolSize());
293 p.prestartAllCoreThreads();
294 assertEquals(2, p.getPoolSize());
295 p.prestartAllCoreThreads();
296 assertEquals(2, p.getPoolSize());
297 joinPool(p);
298 }
299
300 /**
301 * getCompletedTaskCount increases, but doesn't overestimate,
302 * when tasks complete
303 */
304 public void testGetCompletedTaskCount() throws InterruptedException {
305 final ThreadPoolExecutor p =
306 new CustomTPE(2, 2,
307 LONG_DELAY_MS, MILLISECONDS,
308 new ArrayBlockingQueue<Runnable>(10));
309 final CountDownLatch threadStarted = new CountDownLatch(1);
310 final CountDownLatch threadProceed = new CountDownLatch(1);
311 final CountDownLatch threadDone = new CountDownLatch(1);
312 try {
313 assertEquals(0, p.getCompletedTaskCount());
314 p.execute(new CheckedRunnable() {
315 public void realRun() throws InterruptedException {
316 threadStarted.countDown();
317 assertEquals(0, p.getCompletedTaskCount());
318 threadProceed.await();
319 threadDone.countDown();
320 }});
321 await(threadStarted);
322 assertEquals(0, p.getCompletedTaskCount());
323 threadProceed.countDown();
324 threadDone.await();
325 long startTime = System.nanoTime();
326 while (p.getCompletedTaskCount() != 1) {
327 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
328 fail("timed out");
329 Thread.yield();
330 }
331 } finally {
332 joinPool(p);
333 }
334 }
335
336 /**
337 * getCorePoolSize returns size given in constructor if not otherwise set
338 */
339 public void testGetCorePoolSize() {
340 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
341 assertEquals(1, p.getCorePoolSize());
342 joinPool(p);
343 }
344
345 /**
346 * getKeepAliveTime returns value given in constructor if not otherwise set
347 */
348 public void testGetKeepAliveTime() {
349 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
350 assertEquals(1, p.getKeepAliveTime(SECONDS));
351 joinPool(p);
352 }
353
354 /**
355 * getThreadFactory returns factory in constructor if not set
356 */
357 public void testGetThreadFactory() {
358 ThreadFactory tf = new SimpleThreadFactory();
359 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
360 assertSame(tf, p.getThreadFactory());
361 joinPool(p);
362 }
363
364 /**
365 * setThreadFactory sets the thread factory returned by getThreadFactory
366 */
367 public void testSetThreadFactory() {
368 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
369 ThreadFactory tf = new SimpleThreadFactory();
370 p.setThreadFactory(tf);
371 assertSame(tf, p.getThreadFactory());
372 joinPool(p);
373 }
374
375 /**
376 * setThreadFactory(null) throws NPE
377 */
378 public void testSetThreadFactoryNull() {
379 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
380 try {
381 p.setThreadFactory(null);
382 shouldThrow();
383 } catch (NullPointerException success) {
384 } finally {
385 joinPool(p);
386 }
387 }
388
389 /**
390 * getRejectedExecutionHandler returns handler in constructor if not set
391 */
392 public void testGetRejectedExecutionHandler() {
393 RejectedExecutionHandler h = new NoOpREHandler();
394 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
395 assertSame(h, p.getRejectedExecutionHandler());
396 joinPool(p);
397 }
398
399 /**
400 * setRejectedExecutionHandler sets the handler returned by
401 * getRejectedExecutionHandler
402 */
403 public void testSetRejectedExecutionHandler() {
404 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
405 RejectedExecutionHandler h = new NoOpREHandler();
406 p.setRejectedExecutionHandler(h);
407 assertSame(h, p.getRejectedExecutionHandler());
408 joinPool(p);
409 }
410
411 /**
412 * setRejectedExecutionHandler(null) throws NPE
413 */
414 public void testSetRejectedExecutionHandlerNull() {
415 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
416 try {
417 p.setRejectedExecutionHandler(null);
418 shouldThrow();
419 } catch (NullPointerException success) {
420 } finally {
421 joinPool(p);
422 }
423 }
424
425 /**
426 * getLargestPoolSize increases, but doesn't overestimate, when
427 * multiple threads active
428 */
429 public void testGetLargestPoolSize() throws InterruptedException {
430 final int THREADS = 3;
431 final ThreadPoolExecutor p =
432 new CustomTPE(THREADS, THREADS,
433 LONG_DELAY_MS, MILLISECONDS,
434 new ArrayBlockingQueue<Runnable>(10));
435 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
436 final CountDownLatch done = new CountDownLatch(1);
437 try {
438 assertEquals(0, p.getLargestPoolSize());
439 for (int i = 0; i < THREADS; i++)
440 p.execute(new CheckedRunnable() {
441 public void realRun() throws InterruptedException {
442 threadsStarted.countDown();
443 done.await();
444 assertEquals(THREADS, p.getLargestPoolSize());
445 }});
446 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
447 assertEquals(THREADS, p.getLargestPoolSize());
448 } finally {
449 done.countDown();
450 joinPool(p);
451 assertEquals(THREADS, p.getLargestPoolSize());
452 }
453 }
454
455 /**
456 * getMaximumPoolSize returns value given in constructor if not
457 * otherwise set
458 */
459 public void testGetMaximumPoolSize() {
460 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
461 assertEquals(2, p.getMaximumPoolSize());
462 joinPool(p);
463 }
464
465 /**
466 * getPoolSize increases, but doesn't overestimate, when threads
467 * become active
468 */
469 public void testGetPoolSize() throws InterruptedException {
470 final ThreadPoolExecutor p =
471 new CustomTPE(1, 1,
472 LONG_DELAY_MS, MILLISECONDS,
473 new ArrayBlockingQueue<Runnable>(10));
474 final CountDownLatch threadStarted = new CountDownLatch(1);
475 final CountDownLatch done = new CountDownLatch(1);
476 try {
477 assertEquals(0, p.getPoolSize());
478 p.execute(new CheckedRunnable() {
479 public void realRun() throws InterruptedException {
480 threadStarted.countDown();
481 assertEquals(1, p.getPoolSize());
482 done.await();
483 }});
484 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
485 assertEquals(1, p.getPoolSize());
486 } finally {
487 done.countDown();
488 joinPool(p);
489 }
490 }
491
492 /**
493 * getTaskCount increases, but doesn't overestimate, when tasks submitted
494 */
495 public void testGetTaskCount() throws InterruptedException {
496 final ThreadPoolExecutor p =
497 new CustomTPE(1, 1,
498 LONG_DELAY_MS, MILLISECONDS,
499 new ArrayBlockingQueue<Runnable>(10));
500 final CountDownLatch threadStarted = new CountDownLatch(1);
501 final CountDownLatch done = new CountDownLatch(1);
502 try {
503 assertEquals(0, p.getTaskCount());
504 p.execute(new CheckedRunnable() {
505 public void realRun() throws InterruptedException {
506 threadStarted.countDown();
507 assertEquals(1, p.getTaskCount());
508 done.await();
509 }});
510 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
511 assertEquals(1, p.getTaskCount());
512 } finally {
513 done.countDown();
514 joinPool(p);
515 }
516 }
517
518 /**
519 * isShutdown is false before shutdown, true after
520 */
521 public void testIsShutdown() {
522
523 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
524 assertFalse(p.isShutdown());
525 try { p.shutdown(); } catch (SecurityException ok) { return; }
526 assertTrue(p.isShutdown());
527 joinPool(p);
528 }
529
530 /**
531 * isTerminated is false before termination, true after
532 */
533 public void testIsTerminated() throws InterruptedException {
534 final ThreadPoolExecutor p =
535 new CustomTPE(1, 1,
536 LONG_DELAY_MS, MILLISECONDS,
537 new ArrayBlockingQueue<Runnable>(10));
538 final CountDownLatch threadStarted = new CountDownLatch(1);
539 final CountDownLatch done = new CountDownLatch(1);
540 try {
541 assertFalse(p.isTerminating());
542 p.execute(new CheckedRunnable() {
543 public void realRun() throws InterruptedException {
544 assertFalse(p.isTerminating());
545 threadStarted.countDown();
546 done.await();
547 }});
548 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
549 assertFalse(p.isTerminating());
550 done.countDown();
551 } finally {
552 try { p.shutdown(); } catch (SecurityException ok) { return; }
553 }
554 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
555 assertTrue(p.isTerminated());
556 assertFalse(p.isTerminating());
557 }
558
559 /**
560 * isTerminating is not true when running or when terminated
561 */
562 public void testIsTerminating() throws InterruptedException {
563 final ThreadPoolExecutor p =
564 new CustomTPE(1, 1,
565 LONG_DELAY_MS, MILLISECONDS,
566 new ArrayBlockingQueue<Runnable>(10));
567 final CountDownLatch threadStarted = new CountDownLatch(1);
568 final CountDownLatch done = new CountDownLatch(1);
569 try {
570 assertFalse(p.isTerminating());
571 p.execute(new CheckedRunnable() {
572 public void realRun() throws InterruptedException {
573 assertFalse(p.isTerminating());
574 threadStarted.countDown();
575 done.await();
576 }});
577 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
578 assertFalse(p.isTerminating());
579 done.countDown();
580 } finally {
581 try { p.shutdown(); } catch (SecurityException ok) { return; }
582 }
583 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
584 assertTrue(p.isTerminated());
585 assertFalse(p.isTerminating());
586 }
587
588 /**
589 * getQueue returns the work queue, which contains queued tasks
590 */
591 public void testGetQueue() throws InterruptedException {
592 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
593 final ThreadPoolExecutor p =
594 new CustomTPE(1, 1,
595 LONG_DELAY_MS, MILLISECONDS,
596 q);
597 final CountDownLatch threadStarted = new CountDownLatch(1);
598 final CountDownLatch done = new CountDownLatch(1);
599 try {
600 FutureTask[] tasks = new FutureTask[5];
601 for (int i = 0; i < tasks.length; i++) {
602 Callable task = new CheckedCallable<Boolean>() {
603 public Boolean realCall() throws InterruptedException {
604 threadStarted.countDown();
605 assertSame(q, p.getQueue());
606 done.await();
607 return Boolean.TRUE;
608 }};
609 tasks[i] = new FutureTask(task);
610 p.execute(tasks[i]);
611 }
612 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
613 assertSame(q, p.getQueue());
614 assertFalse(q.contains(tasks[0]));
615 assertTrue(q.contains(tasks[tasks.length - 1]));
616 assertEquals(tasks.length - 1, q.size());
617 } finally {
618 done.countDown();
619 joinPool(p);
620 }
621 }
622
623 /**
624 * remove(task) removes queued task, and fails to remove active task
625 */
626 public void testRemove() throws InterruptedException {
627 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
628 final ThreadPoolExecutor p =
629 new CustomTPE(1, 1,
630 LONG_DELAY_MS, MILLISECONDS,
631 q);
632 Runnable[] tasks = new Runnable[6];
633 final CountDownLatch threadStarted = new CountDownLatch(1);
634 final CountDownLatch done = new CountDownLatch(1);
635 try {
636 for (int i = 0; i < tasks.length; i++) {
637 tasks[i] = new CheckedRunnable() {
638 public void realRun() throws InterruptedException {
639 threadStarted.countDown();
640 done.await();
641 }};
642 p.execute(tasks[i]);
643 }
644 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
645 assertFalse(p.remove(tasks[0]));
646 assertTrue(q.contains(tasks[4]));
647 assertTrue(q.contains(tasks[3]));
648 assertTrue(p.remove(tasks[4]));
649 assertFalse(p.remove(tasks[4]));
650 assertFalse(q.contains(tasks[4]));
651 assertTrue(q.contains(tasks[3]));
652 assertTrue(p.remove(tasks[3]));
653 assertFalse(q.contains(tasks[3]));
654 } finally {
655 done.countDown();
656 joinPool(p);
657 }
658 }
659
660 /**
661 * purge removes cancelled tasks from the queue
662 */
663 public void testPurge() throws InterruptedException {
664 final CountDownLatch threadStarted = new CountDownLatch(1);
665 final CountDownLatch done = new CountDownLatch(1);
666 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
667 final ThreadPoolExecutor p =
668 new CustomTPE(1, 1,
669 LONG_DELAY_MS, MILLISECONDS,
670 q);
671 FutureTask[] tasks = new FutureTask[5];
672 try {
673 for (int i = 0; i < tasks.length; i++) {
674 Callable task = new CheckedCallable<Boolean>() {
675 public Boolean realCall() throws InterruptedException {
676 threadStarted.countDown();
677 done.await();
678 return Boolean.TRUE;
679 }};
680 tasks[i] = new FutureTask(task);
681 p.execute(tasks[i]);
682 }
683 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
684 assertEquals(tasks.length, p.getTaskCount());
685 assertEquals(tasks.length - 1, q.size());
686 assertEquals(1L, p.getActiveCount());
687 assertEquals(0L, p.getCompletedTaskCount());
688 tasks[4].cancel(true);
689 tasks[3].cancel(false);
690 p.purge();
691 assertEquals(tasks.length - 3, q.size());
692 assertEquals(tasks.length - 2, p.getTaskCount());
693 p.purge(); // Nothing to do
694 assertEquals(tasks.length - 3, q.size());
695 assertEquals(tasks.length - 2, p.getTaskCount());
696 } finally {
697 done.countDown();
698 joinPool(p);
699 }
700 }
701
702 /**
703 * shutdownNow returns a list containing tasks that were not run,
704 * and those tasks are drained from the queue
705 */
706 public void testShutdownNow() throws InterruptedException {
707 final int poolSize = 2;
708 final int count = 5;
709 final AtomicInteger ran = new AtomicInteger(0);
710 ThreadPoolExecutor p =
711 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
712 new ArrayBlockingQueue<Runnable>(10));
713 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
714 Runnable waiter = new CheckedRunnable() { public void realRun() {
715 threadsStarted.countDown();
716 try {
717 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
718 } catch (InterruptedException success) {}
719 ran.getAndIncrement();
720 }};
721 for (int i = 0; i < count; i++)
722 p.execute(waiter);
723 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
724 assertEquals(poolSize, p.getActiveCount());
725 assertEquals(0, p.getCompletedTaskCount());
726 final List<Runnable> queuedTasks;
727 try {
728 queuedTasks = p.shutdownNow();
729 } catch (SecurityException ok) {
730 return; // Allowed in case test doesn't have privs
731 }
732 assertTrue(p.isShutdown());
733 assertTrue(p.getQueue().isEmpty());
734 assertEquals(count - poolSize, queuedTasks.size());
735 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
736 assertTrue(p.isTerminated());
737 assertEquals(poolSize, ran.get());
738 assertEquals(poolSize, p.getCompletedTaskCount());
739 }
740
741 // Exception Tests
742
743 /**
744 * Constructor throws if corePoolSize argument is less than zero
745 */
746 public void testConstructor1() {
747 try {
748 new CustomTPE(-1, 1, 1L, SECONDS,
749 new ArrayBlockingQueue<Runnable>(10));
750 shouldThrow();
751 } catch (IllegalArgumentException success) {}
752 }
753
754 /**
755 * Constructor throws if maximumPoolSize is less than zero
756 */
757 public void testConstructor2() {
758 try {
759 new CustomTPE(1, -1, 1L, SECONDS,
760 new ArrayBlockingQueue<Runnable>(10));
761 shouldThrow();
762 } catch (IllegalArgumentException success) {}
763 }
764
765 /**
766 * Constructor throws if maximumPoolSize is equal to zero
767 */
768 public void testConstructor3() {
769 try {
770 new CustomTPE(1, 0, 1L, SECONDS,
771 new ArrayBlockingQueue<Runnable>(10));
772 shouldThrow();
773 } catch (IllegalArgumentException success) {}
774 }
775
776 /**
777 * Constructor throws if keepAliveTime is less than zero
778 */
779 public void testConstructor4() {
780 try {
781 new CustomTPE(1, 2, -1L, SECONDS,
782 new ArrayBlockingQueue<Runnable>(10));
783 shouldThrow();
784 } catch (IllegalArgumentException success) {}
785 }
786
787 /**
788 * Constructor throws if corePoolSize is greater than the maximumPoolSize
789 */
790 public void testConstructor5() {
791 try {
792 new CustomTPE(2, 1, 1L, SECONDS,
793 new ArrayBlockingQueue<Runnable>(10));
794 shouldThrow();
795 } catch (IllegalArgumentException success) {}
796 }
797
798 /**
799 * Constructor throws if workQueue is set to null
800 */
801 public void testConstructorNullPointerException() {
802 try {
803 new CustomTPE(1, 2, 1L, SECONDS, null);
804 shouldThrow();
805 } catch (NullPointerException success) {}
806 }
807
808 /**
809 * Constructor throws if corePoolSize argument is less than zero
810 */
811 public void testConstructor6() {
812 try {
813 new CustomTPE(-1, 1, 1L, SECONDS,
814 new ArrayBlockingQueue<Runnable>(10),
815 new SimpleThreadFactory());
816 shouldThrow();
817 } catch (IllegalArgumentException success) {}
818 }
819
820 /**
821 * Constructor throws if maximumPoolSize is less than zero
822 */
823 public void testConstructor7() {
824 try {
825 new CustomTPE(1,-1, 1L, SECONDS,
826 new ArrayBlockingQueue<Runnable>(10),
827 new SimpleThreadFactory());
828 shouldThrow();
829 } catch (IllegalArgumentException success) {}
830 }
831
832 /**
833 * Constructor throws if maximumPoolSize is equal to zero
834 */
835 public void testConstructor8() {
836 try {
837 new CustomTPE(1, 0, 1L, SECONDS,
838 new ArrayBlockingQueue<Runnable>(10),
839 new SimpleThreadFactory());
840 shouldThrow();
841 } catch (IllegalArgumentException success) {}
842 }
843
844 /**
845 * Constructor throws if keepAliveTime is less than zero
846 */
847 public void testConstructor9() {
848 try {
849 new CustomTPE(1, 2, -1L, SECONDS,
850 new ArrayBlockingQueue<Runnable>(10),
851 new SimpleThreadFactory());
852 shouldThrow();
853 } catch (IllegalArgumentException success) {}
854 }
855
856 /**
857 * Constructor throws if corePoolSize is greater than the maximumPoolSize
858 */
859 public void testConstructor10() {
860 try {
861 new CustomTPE(2, 1, 1L, SECONDS,
862 new ArrayBlockingQueue<Runnable>(10),
863 new SimpleThreadFactory());
864 shouldThrow();
865 } catch (IllegalArgumentException success) {}
866 }
867
868 /**
869 * Constructor throws if workQueue is set to null
870 */
871 public void testConstructorNullPointerException2() {
872 try {
873 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
874 shouldThrow();
875 } catch (NullPointerException success) {}
876 }
877
878 /**
879 * Constructor throws if threadFactory is set to null
880 */
881 public void testConstructorNullPointerException3() {
882 try {
883 new CustomTPE(1, 2, 1L, SECONDS,
884 new ArrayBlockingQueue<Runnable>(10),
885 (ThreadFactory) null);
886 shouldThrow();
887 } catch (NullPointerException success) {}
888 }
889
890 /**
891 * Constructor throws if corePoolSize argument is less than zero
892 */
893 public void testConstructor11() {
894 try {
895 new CustomTPE(-1, 1, 1L, SECONDS,
896 new ArrayBlockingQueue<Runnable>(10),
897 new NoOpREHandler());
898 shouldThrow();
899 } catch (IllegalArgumentException success) {}
900 }
901
902 /**
903 * Constructor throws if maximumPoolSize is less than zero
904 */
905 public void testConstructor12() {
906 try {
907 new CustomTPE(1, -1, 1L, SECONDS,
908 new ArrayBlockingQueue<Runnable>(10),
909 new NoOpREHandler());
910 shouldThrow();
911 } catch (IllegalArgumentException success) {}
912 }
913
914 /**
915 * Constructor throws if maximumPoolSize is equal to zero
916 */
917 public void testConstructor13() {
918 try {
919 new CustomTPE(1, 0, 1L, SECONDS,
920 new ArrayBlockingQueue<Runnable>(10),
921 new NoOpREHandler());
922 shouldThrow();
923 } catch (IllegalArgumentException success) {}
924 }
925
926 /**
927 * Constructor throws if keepAliveTime is less than zero
928 */
929 public void testConstructor14() {
930 try {
931 new CustomTPE(1, 2, -1L, SECONDS,
932 new ArrayBlockingQueue<Runnable>(10),
933 new NoOpREHandler());
934 shouldThrow();
935 } catch (IllegalArgumentException success) {}
936 }
937
938 /**
939 * Constructor throws if corePoolSize is greater than the maximumPoolSize
940 */
941 public void testConstructor15() {
942 try {
943 new CustomTPE(2, 1, 1L, SECONDS,
944 new ArrayBlockingQueue<Runnable>(10),
945 new NoOpREHandler());
946 shouldThrow();
947 } catch (IllegalArgumentException success) {}
948 }
949
950 /**
951 * Constructor throws if workQueue is set to null
952 */
953 public void testConstructorNullPointerException4() {
954 try {
955 new CustomTPE(1, 2, 1L, SECONDS,
956 null,
957 new NoOpREHandler());
958 shouldThrow();
959 } catch (NullPointerException success) {}
960 }
961
962 /**
963 * Constructor throws if handler is set to null
964 */
965 public void testConstructorNullPointerException5() {
966 try {
967 new CustomTPE(1, 2, 1L, SECONDS,
968 new ArrayBlockingQueue<Runnable>(10),
969 (RejectedExecutionHandler) null);
970 shouldThrow();
971 } catch (NullPointerException success) {}
972 }
973
974 /**
975 * Constructor throws if corePoolSize argument is less than zero
976 */
977 public void testConstructor16() {
978 try {
979 new CustomTPE(-1, 1, 1L, SECONDS,
980 new ArrayBlockingQueue<Runnable>(10),
981 new SimpleThreadFactory(),
982 new NoOpREHandler());
983 shouldThrow();
984 } catch (IllegalArgumentException success) {}
985 }
986
987 /**
988 * Constructor throws if maximumPoolSize is less than zero
989 */
990 public void testConstructor17() {
991 try {
992 new CustomTPE(1, -1, 1L, SECONDS,
993 new ArrayBlockingQueue<Runnable>(10),
994 new SimpleThreadFactory(),
995 new NoOpREHandler());
996 shouldThrow();
997 } catch (IllegalArgumentException success) {}
998 }
999
1000 /**
1001 * Constructor throws if maximumPoolSize is equal to zero
1002 */
1003 public void testConstructor18() {
1004 try {
1005 new CustomTPE(1, 0, 1L, SECONDS,
1006 new ArrayBlockingQueue<Runnable>(10),
1007 new SimpleThreadFactory(),
1008 new NoOpREHandler());
1009 shouldThrow();
1010 } catch (IllegalArgumentException success) {}
1011 }
1012
1013 /**
1014 * Constructor throws if keepAliveTime is less than zero
1015 */
1016 public void testConstructor19() {
1017 try {
1018 new CustomTPE(1, 2, -1L, SECONDS,
1019 new ArrayBlockingQueue<Runnable>(10),
1020 new SimpleThreadFactory(),
1021 new NoOpREHandler());
1022 shouldThrow();
1023 } catch (IllegalArgumentException success) {}
1024 }
1025
1026 /**
1027 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1028 */
1029 public void testConstructor20() {
1030 try {
1031 new CustomTPE(2, 1, 1L, SECONDS,
1032 new ArrayBlockingQueue<Runnable>(10),
1033 new SimpleThreadFactory(),
1034 new NoOpREHandler());
1035 shouldThrow();
1036 } catch (IllegalArgumentException success) {}
1037 }
1038
1039 /**
1040 * Constructor throws if workQueue is null
1041 */
1042 public void testConstructorNullPointerException6() {
1043 try {
1044 new CustomTPE(1, 2, 1L, SECONDS,
1045 null,
1046 new SimpleThreadFactory(),
1047 new NoOpREHandler());
1048 shouldThrow();
1049 } catch (NullPointerException success) {}
1050 }
1051
1052 /**
1053 * Constructor throws if handler is null
1054 */
1055 public void testConstructorNullPointerException7() {
1056 try {
1057 new CustomTPE(1, 2, 1L, SECONDS,
1058 new ArrayBlockingQueue<Runnable>(10),
1059 new SimpleThreadFactory(),
1060 (RejectedExecutionHandler) null);
1061 shouldThrow();
1062 } catch (NullPointerException success) {}
1063 }
1064
1065 /**
1066 * Constructor throws if ThreadFactory is null
1067 */
1068 public void testConstructorNullPointerException8() {
1069 try {
1070 new CustomTPE(1, 2, 1L, SECONDS,
1071 new ArrayBlockingQueue<Runnable>(10),
1072 (ThreadFactory) null,
1073 new NoOpREHandler());
1074 shouldThrow();
1075 } catch (NullPointerException success) {}
1076 }
1077
1078 /**
1079 * execute throws RejectedExecutionException if saturated.
1080 */
1081 public void testSaturatedExecute() {
1082 ThreadPoolExecutor p =
1083 new CustomTPE(1, 1,
1084 LONG_DELAY_MS, MILLISECONDS,
1085 new ArrayBlockingQueue<Runnable>(1));
1086 final CountDownLatch done = new CountDownLatch(1);
1087 try {
1088 Runnable task = new CheckedRunnable() {
1089 public void realRun() throws InterruptedException {
1090 done.await();
1091 }};
1092 for (int i = 0; i < 2; ++i)
1093 p.execute(task);
1094 for (int i = 0; i < 2; ++i) {
1095 try {
1096 p.execute(task);
1097 shouldThrow();
1098 } catch (RejectedExecutionException success) {}
1099 assertTrue(p.getTaskCount() <= 2);
1100 }
1101 } finally {
1102 done.countDown();
1103 joinPool(p);
1104 }
1105 }
1106
1107 /**
1108 * executor using CallerRunsPolicy runs task if saturated.
1109 */
1110 public void testSaturatedExecute2() {
1111 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1112 ThreadPoolExecutor p = new CustomTPE(1, 1,
1113 LONG_DELAY_MS, MILLISECONDS,
1114 new ArrayBlockingQueue<Runnable>(1),
1115 h);
1116 try {
1117 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1118 for (int i = 0; i < tasks.length; ++i)
1119 tasks[i] = new TrackedNoOpRunnable();
1120 TrackedLongRunnable mr = new TrackedLongRunnable();
1121 p.execute(mr);
1122 for (int i = 0; i < tasks.length; ++i)
1123 p.execute(tasks[i]);
1124 for (int i = 1; i < tasks.length; ++i)
1125 assertTrue(tasks[i].done);
1126 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1127 } finally {
1128 joinPool(p);
1129 }
1130 }
1131
1132 /**
1133 * executor using DiscardPolicy drops task if saturated.
1134 */
1135 public void testSaturatedExecute3() {
1136 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1137 ThreadPoolExecutor p =
1138 new CustomTPE(1, 1,
1139 LONG_DELAY_MS, MILLISECONDS,
1140 new ArrayBlockingQueue<Runnable>(1),
1141 h);
1142 try {
1143 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1144 for (int i = 0; i < tasks.length; ++i)
1145 tasks[i] = new TrackedNoOpRunnable();
1146 p.execute(new TrackedLongRunnable());
1147 for (TrackedNoOpRunnable task : tasks)
1148 p.execute(task);
1149 for (TrackedNoOpRunnable task : tasks)
1150 assertFalse(task.done);
1151 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1152 } finally {
1153 joinPool(p);
1154 }
1155 }
1156
1157 /**
1158 * executor using DiscardOldestPolicy drops oldest task if saturated.
1159 */
1160 public void testSaturatedExecute4() {
1161 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1162 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1163 try {
1164 p.execute(new TrackedLongRunnable());
1165 TrackedLongRunnable r2 = new TrackedLongRunnable();
1166 p.execute(r2);
1167 assertTrue(p.getQueue().contains(r2));
1168 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1169 p.execute(r3);
1170 assertFalse(p.getQueue().contains(r2));
1171 assertTrue(p.getQueue().contains(r3));
1172 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1173 } finally {
1174 joinPool(p);
1175 }
1176 }
1177
1178 /**
1179 * execute throws RejectedExecutionException if shutdown
1180 */
1181 public void testRejectedExecutionExceptionOnShutdown() {
1182 ThreadPoolExecutor p =
1183 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1184 try { p.shutdown(); } catch (SecurityException ok) { return; }
1185 try {
1186 p.execute(new NoOpRunnable());
1187 shouldThrow();
1188 } catch (RejectedExecutionException success) {}
1189
1190 joinPool(p);
1191 }
1192
1193 /**
1194 * execute using CallerRunsPolicy drops task on shutdown
1195 */
1196 public void testCallerRunsOnShutdown() {
1197 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1198 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1199
1200 try { p.shutdown(); } catch (SecurityException ok) { return; }
1201 try {
1202 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1203 p.execute(r);
1204 assertFalse(r.done);
1205 } finally {
1206 joinPool(p);
1207 }
1208 }
1209
1210 /**
1211 * execute using DiscardPolicy drops task on shutdown
1212 */
1213 public void testDiscardOnShutdown() {
1214 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1215 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1216
1217 try { p.shutdown(); } catch (SecurityException ok) { return; }
1218 try {
1219 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1220 p.execute(r);
1221 assertFalse(r.done);
1222 } finally {
1223 joinPool(p);
1224 }
1225 }
1226
1227 /**
1228 * execute using DiscardOldestPolicy drops task on shutdown
1229 */
1230 public void testDiscardOldestOnShutdown() {
1231 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1232 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1233
1234 try { p.shutdown(); } catch (SecurityException ok) { return; }
1235 try {
1236 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1237 p.execute(r);
1238 assertFalse(r.done);
1239 } finally {
1240 joinPool(p);
1241 }
1242 }
1243
1244 /**
1245 * execute(null) throws NPE
1246 */
1247 public void testExecuteNull() {
1248 ThreadPoolExecutor p =
1249 new CustomTPE(1, 2, 1L, SECONDS,
1250 new ArrayBlockingQueue<Runnable>(10));
1251 try {
1252 p.execute(null);
1253 shouldThrow();
1254 } catch (NullPointerException success) {}
1255
1256 joinPool(p);
1257 }
1258
1259 /**
1260 * setCorePoolSize of negative value throws IllegalArgumentException
1261 */
1262 public void testCorePoolSizeIllegalArgumentException() {
1263 ThreadPoolExecutor p =
1264 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1265 try {
1266 p.setCorePoolSize(-1);
1267 shouldThrow();
1268 } catch (IllegalArgumentException success) {
1269 } finally {
1270 try { p.shutdown(); } catch (SecurityException ok) { return; }
1271 }
1272 joinPool(p);
1273 }
1274
1275 /**
1276 * setMaximumPoolSize(int) throws IllegalArgumentException
1277 * if given a value less than the core pool size
1278 */
1279 public void testMaximumPoolSizeIllegalArgumentException() {
1280 ThreadPoolExecutor p =
1281 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1282 try {
1283 p.setMaximumPoolSize(1);
1284 shouldThrow();
1285 } catch (IllegalArgumentException success) {
1286 } finally {
1287 try { p.shutdown(); } catch (SecurityException ok) { return; }
1288 }
1289 joinPool(p);
1290 }
1291
1292 /**
1293 * setMaximumPoolSize throws IllegalArgumentException
1294 * if given a negative value
1295 */
1296 public void testMaximumPoolSizeIllegalArgumentException2() {
1297 ThreadPoolExecutor p =
1298 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1299 try {
1300 p.setMaximumPoolSize(-1);
1301 shouldThrow();
1302 } catch (IllegalArgumentException success) {
1303 } finally {
1304 try { p.shutdown(); } catch (SecurityException ok) { return; }
1305 }
1306 joinPool(p);
1307 }
1308
1309 /**
1310 * setKeepAliveTime throws IllegalArgumentException
1311 * when given a negative value
1312 */
1313 public void testKeepAliveTimeIllegalArgumentException() {
1314 ThreadPoolExecutor p =
1315 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1316
1317 try {
1318 p.setKeepAliveTime(-1,MILLISECONDS);
1319 shouldThrow();
1320 } catch (IllegalArgumentException success) {
1321 } finally {
1322 try { p.shutdown(); } catch (SecurityException ok) { return; }
1323 }
1324 joinPool(p);
1325 }
1326
1327 /**
1328 * terminated() is called on termination
1329 */
1330 public void testTerminated() {
1331 CustomTPE p = new CustomTPE();
1332 try { p.shutdown(); } catch (SecurityException ok) { return; }
1333 assertTrue(p.terminatedCalled());
1334 joinPool(p);
1335 }
1336
1337 /**
1338 * beforeExecute and afterExecute are called when executing task
1339 */
1340 public void testBeforeAfter() throws InterruptedException {
1341 CustomTPE p = new CustomTPE();
1342 try {
1343 final CountDownLatch done = new CountDownLatch(1);
1344 p.execute(new CheckedRunnable() {
1345 public void realRun() {
1346 done.countDown();
1347 }});
1348 await(p.afterCalled);
1349 assertEquals(0, done.getCount());
1350 assertTrue(p.afterCalled());
1351 assertTrue(p.beforeCalled());
1352 try { p.shutdown(); } catch (SecurityException ok) { return; }
1353 } finally {
1354 joinPool(p);
1355 }
1356 }
1357
1358 /**
1359 * completed submit of callable returns result
1360 */
1361 public void testSubmitCallable() throws Exception {
1362 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1363 try {
1364 Future<String> future = e.submit(new StringTask());
1365 String result = future.get();
1366 assertSame(TEST_STRING, result);
1367 } finally {
1368 joinPool(e);
1369 }
1370 }
1371
1372 /**
1373 * completed submit of runnable returns successfully
1374 */
1375 public void testSubmitRunnable() throws Exception {
1376 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1377 try {
1378 Future<?> future = e.submit(new NoOpRunnable());
1379 future.get();
1380 assertTrue(future.isDone());
1381 } finally {
1382 joinPool(e);
1383 }
1384 }
1385
1386 /**
1387 * completed submit of (runnable, result) returns result
1388 */
1389 public void testSubmitRunnable2() throws Exception {
1390 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1391 try {
1392 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1393 String result = future.get();
1394 assertSame(TEST_STRING, result);
1395 } finally {
1396 joinPool(e);
1397 }
1398 }
1399
1400 /**
1401 * invokeAny(null) throws NPE
1402 */
1403 public void testInvokeAny1() throws Exception {
1404 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1405 try {
1406 e.invokeAny(null);
1407 shouldThrow();
1408 } catch (NullPointerException success) {
1409 } finally {
1410 joinPool(e);
1411 }
1412 }
1413
1414 /**
1415 * invokeAny(empty collection) throws IAE
1416 */
1417 public void testInvokeAny2() throws Exception {
1418 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1419 try {
1420 e.invokeAny(new ArrayList<Callable<String>>());
1421 shouldThrow();
1422 } catch (IllegalArgumentException success) {
1423 } finally {
1424 joinPool(e);
1425 }
1426 }
1427
1428 /**
1429 * invokeAny(c) throws NPE if c has null elements
1430 */
1431 public void testInvokeAny3() throws Exception {
1432 CountDownLatch latch = new CountDownLatch(1);
1433 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1434 List<Callable<String>> l = new ArrayList<Callable<String>>();
1435 l.add(latchAwaitingStringTask(latch));
1436 l.add(null);
1437 try {
1438 e.invokeAny(l);
1439 shouldThrow();
1440 } catch (NullPointerException success) {
1441 } finally {
1442 latch.countDown();
1443 joinPool(e);
1444 }
1445 }
1446
1447 /**
1448 * invokeAny(c) throws ExecutionException if no task completes
1449 */
1450 public void testInvokeAny4() throws Exception {
1451 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1452 List<Callable<String>> l = new ArrayList<Callable<String>>();
1453 l.add(new NPETask());
1454 try {
1455 e.invokeAny(l);
1456 shouldThrow();
1457 } catch (ExecutionException success) {
1458 assertTrue(success.getCause() instanceof NullPointerException);
1459 } finally {
1460 joinPool(e);
1461 }
1462 }
1463
1464 /**
1465 * invokeAny(c) returns result of some task
1466 */
1467 public void testInvokeAny5() throws Exception {
1468 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1469 try {
1470 List<Callable<String>> l = new ArrayList<Callable<String>>();
1471 l.add(new StringTask());
1472 l.add(new StringTask());
1473 String result = e.invokeAny(l);
1474 assertSame(TEST_STRING, result);
1475 } finally {
1476 joinPool(e);
1477 }
1478 }
1479
1480 /**
1481 * invokeAll(null) throws NPE
1482 */
1483 public void testInvokeAll1() throws Exception {
1484 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1485 try {
1486 e.invokeAll(null);
1487 shouldThrow();
1488 } catch (NullPointerException success) {
1489 } finally {
1490 joinPool(e);
1491 }
1492 }
1493
1494 /**
1495 * invokeAll(empty collection) returns empty collection
1496 */
1497 public void testInvokeAll2() throws Exception {
1498 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1499 try {
1500 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1501 assertTrue(r.isEmpty());
1502 } finally {
1503 joinPool(e);
1504 }
1505 }
1506
1507 /**
1508 * invokeAll(c) throws NPE if c has null elements
1509 */
1510 public void testInvokeAll3() throws Exception {
1511 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1512 List<Callable<String>> l = new ArrayList<Callable<String>>();
1513 l.add(new StringTask());
1514 l.add(null);
1515 try {
1516 e.invokeAll(l);
1517 shouldThrow();
1518 } catch (NullPointerException success) {
1519 } finally {
1520 joinPool(e);
1521 }
1522 }
1523
1524 /**
1525 * get of element of invokeAll(c) throws exception on failed task
1526 */
1527 public void testInvokeAll4() throws Exception {
1528 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1529 List<Callable<String>> l = new ArrayList<Callable<String>>();
1530 l.add(new NPETask());
1531 List<Future<String>> futures = e.invokeAll(l);
1532 assertEquals(1, futures.size());
1533 try {
1534 futures.get(0).get();
1535 shouldThrow();
1536 } catch (ExecutionException success) {
1537 assertTrue(success.getCause() instanceof NullPointerException);
1538 } finally {
1539 joinPool(e);
1540 }
1541 }
1542
1543 /**
1544 * invokeAll(c) returns results of all completed tasks
1545 */
1546 public void testInvokeAll5() throws Exception {
1547 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1548 try {
1549 List<Callable<String>> l = new ArrayList<Callable<String>>();
1550 l.add(new StringTask());
1551 l.add(new StringTask());
1552 List<Future<String>> futures = e.invokeAll(l);
1553 assertEquals(2, futures.size());
1554 for (Future<String> future : futures)
1555 assertSame(TEST_STRING, future.get());
1556 } finally {
1557 joinPool(e);
1558 }
1559 }
1560
1561 /**
1562 * timed invokeAny(null) throws NPE
1563 */
1564 public void testTimedInvokeAny1() throws Exception {
1565 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1566 try {
1567 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1568 shouldThrow();
1569 } catch (NullPointerException success) {
1570 } finally {
1571 joinPool(e);
1572 }
1573 }
1574
1575 /**
1576 * timed invokeAny(,,null) throws NPE
1577 */
1578 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1579 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1580 List<Callable<String>> l = new ArrayList<Callable<String>>();
1581 l.add(new StringTask());
1582 try {
1583 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1584 shouldThrow();
1585 } catch (NullPointerException success) {
1586 } finally {
1587 joinPool(e);
1588 }
1589 }
1590
1591 /**
1592 * timed invokeAny(empty collection) throws IAE
1593 */
1594 public void testTimedInvokeAny2() throws Exception {
1595 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1596 try {
1597 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1598 shouldThrow();
1599 } catch (IllegalArgumentException success) {
1600 } finally {
1601 joinPool(e);
1602 }
1603 }
1604
1605 /**
1606 * timed invokeAny(c) throws NPE if c has null elements
1607 */
1608 public void testTimedInvokeAny3() throws Exception {
1609 CountDownLatch latch = new CountDownLatch(1);
1610 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1611 List<Callable<String>> l = new ArrayList<Callable<String>>();
1612 l.add(latchAwaitingStringTask(latch));
1613 l.add(null);
1614 try {
1615 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1616 shouldThrow();
1617 } catch (NullPointerException success) {
1618 } finally {
1619 latch.countDown();
1620 joinPool(e);
1621 }
1622 }
1623
1624 /**
1625 * timed invokeAny(c) throws ExecutionException if no task completes
1626 */
1627 public void testTimedInvokeAny4() throws Exception {
1628 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1629 List<Callable<String>> l = new ArrayList<Callable<String>>();
1630 l.add(new NPETask());
1631 try {
1632 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1633 shouldThrow();
1634 } catch (ExecutionException success) {
1635 assertTrue(success.getCause() instanceof NullPointerException);
1636 } finally {
1637 joinPool(e);
1638 }
1639 }
1640
1641 /**
1642 * timed invokeAny(c) returns result of some task
1643 */
1644 public void testTimedInvokeAny5() throws Exception {
1645 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1646 try {
1647 List<Callable<String>> l = new ArrayList<Callable<String>>();
1648 l.add(new StringTask());
1649 l.add(new StringTask());
1650 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1651 assertSame(TEST_STRING, result);
1652 } finally {
1653 joinPool(e);
1654 }
1655 }
1656
1657 /**
1658 * timed invokeAll(null) throws NPE
1659 */
1660 public void testTimedInvokeAll1() throws Exception {
1661 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1662 try {
1663 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1664 shouldThrow();
1665 } catch (NullPointerException success) {
1666 } finally {
1667 joinPool(e);
1668 }
1669 }
1670
1671 /**
1672 * timed invokeAll(,,null) throws NPE
1673 */
1674 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1675 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1676 List<Callable<String>> l = new ArrayList<Callable<String>>();
1677 l.add(new StringTask());
1678 try {
1679 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1680 shouldThrow();
1681 } catch (NullPointerException success) {
1682 } finally {
1683 joinPool(e);
1684 }
1685 }
1686
1687 /**
1688 * timed invokeAll(empty collection) returns empty collection
1689 */
1690 public void testTimedInvokeAll2() throws Exception {
1691 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1692 try {
1693 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1694 assertTrue(r.isEmpty());
1695 } finally {
1696 joinPool(e);
1697 }
1698 }
1699
1700 /**
1701 * timed invokeAll(c) throws NPE if c has null elements
1702 */
1703 public void testTimedInvokeAll3() throws Exception {
1704 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1705 List<Callable<String>> l = new ArrayList<Callable<String>>();
1706 l.add(new StringTask());
1707 l.add(null);
1708 try {
1709 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1710 shouldThrow();
1711 } catch (NullPointerException success) {
1712 } finally {
1713 joinPool(e);
1714 }
1715 }
1716
1717 /**
1718 * get of element of invokeAll(c) throws exception on failed task
1719 */
1720 public void testTimedInvokeAll4() throws Exception {
1721 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1722 List<Callable<String>> l = new ArrayList<Callable<String>>();
1723 l.add(new NPETask());
1724 List<Future<String>> futures =
1725 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1726 assertEquals(1, futures.size());
1727 try {
1728 futures.get(0).get();
1729 shouldThrow();
1730 } catch (ExecutionException success) {
1731 assertTrue(success.getCause() instanceof NullPointerException);
1732 } finally {
1733 joinPool(e);
1734 }
1735 }
1736
1737 /**
1738 * timed invokeAll(c) returns results of all completed tasks
1739 */
1740 public void testTimedInvokeAll5() throws Exception {
1741 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1742 try {
1743 List<Callable<String>> l = new ArrayList<Callable<String>>();
1744 l.add(new StringTask());
1745 l.add(new StringTask());
1746 List<Future<String>> futures =
1747 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1748 assertEquals(2, futures.size());
1749 for (Future<String> future : futures)
1750 assertSame(TEST_STRING, future.get());
1751 } finally {
1752 joinPool(e);
1753 }
1754 }
1755
1756 /**
1757 * timed invokeAll(c) cancels tasks not completed by timeout
1758 */
1759 public void testTimedInvokeAll6() throws Exception {
1760 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1761 try {
1762 for (long timeout = timeoutMillis();;) {
1763 List<Callable<String>> tasks = new ArrayList<>();
1764 tasks.add(new StringTask("0"));
1765 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1766 tasks.add(new StringTask("2"));
1767 long startTime = System.nanoTime();
1768 List<Future<String>> futures =
1769 e.invokeAll(tasks, timeout, MILLISECONDS);
1770 assertEquals(tasks.size(), futures.size());
1771 assertTrue(millisElapsedSince(startTime) >= timeout);
1772 for (Future future : futures)
1773 assertTrue(future.isDone());
1774 assertTrue(futures.get(1).isCancelled());
1775 try {
1776 assertEquals("0", futures.get(0).get());
1777 assertEquals("2", futures.get(2).get());
1778 break;
1779 } catch (CancellationException retryWithLongerTimeout) {
1780 timeout *= 2;
1781 if (timeout >= LONG_DELAY_MS / 2)
1782 fail("expected exactly one task to be cancelled");
1783 }
1784 }
1785 } finally {
1786 joinPool(e);
1787 }
1788 }
1789
1790 /**
1791 * Execution continues if there is at least one thread even if
1792 * thread factory fails to create more
1793 */
1794 public void testFailingThreadFactory() throws InterruptedException {
1795 final ExecutorService e =
1796 new CustomTPE(100, 100,
1797 LONG_DELAY_MS, MILLISECONDS,
1798 new LinkedBlockingQueue<Runnable>(),
1799 new FailingThreadFactory());
1800 try {
1801 final int TASKS = 100;
1802 final CountDownLatch done = new CountDownLatch(TASKS);
1803 for (int k = 0; k < TASKS; ++k)
1804 e.execute(new CheckedRunnable() {
1805 public void realRun() {
1806 done.countDown();
1807 }});
1808 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1809 } finally {
1810 joinPool(e);
1811 }
1812 }
1813
1814 /**
1815 * allowsCoreThreadTimeOut is by default false.
1816 */
1817 public void testAllowsCoreThreadTimeOut() {
1818 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1819 assertFalse(p.allowsCoreThreadTimeOut());
1820 joinPool(p);
1821 }
1822
1823 /**
1824 * allowCoreThreadTimeOut(true) causes idle threads to time out
1825 */
1826 public void testAllowCoreThreadTimeOut_true() throws Exception {
1827 long keepAliveTime = timeoutMillis();
1828 final ThreadPoolExecutor p =
1829 new CustomTPE(2, 10,
1830 keepAliveTime, MILLISECONDS,
1831 new ArrayBlockingQueue<Runnable>(10));
1832 final CountDownLatch threadStarted = new CountDownLatch(1);
1833 try {
1834 p.allowCoreThreadTimeOut(true);
1835 p.execute(new CheckedRunnable() {
1836 public void realRun() {
1837 threadStarted.countDown();
1838 assertEquals(1, p.getPoolSize());
1839 }});
1840 await(threadStarted);
1841 delay(keepAliveTime);
1842 long startTime = System.nanoTime();
1843 while (p.getPoolSize() > 0
1844 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1845 Thread.yield();
1846 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1847 assertEquals(0, p.getPoolSize());
1848 } finally {
1849 joinPool(p);
1850 }
1851 }
1852
1853 /**
1854 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1855 */
1856 public void testAllowCoreThreadTimeOut_false() throws Exception {
1857 long keepAliveTime = timeoutMillis();
1858 final ThreadPoolExecutor p =
1859 new CustomTPE(2, 10,
1860 keepAliveTime, MILLISECONDS,
1861 new ArrayBlockingQueue<Runnable>(10));
1862 final CountDownLatch threadStarted = new CountDownLatch(1);
1863 try {
1864 p.allowCoreThreadTimeOut(false);
1865 p.execute(new CheckedRunnable() {
1866 public void realRun() throws InterruptedException {
1867 threadStarted.countDown();
1868 assertTrue(p.getPoolSize() >= 1);
1869 }});
1870 delay(2 * keepAliveTime);
1871 assertTrue(p.getPoolSize() >= 1);
1872 } finally {
1873 joinPool(p);
1874 }
1875 }
1876
1877 }