ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.41
Committed: Mon Sep 28 02:41:29 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.40: +27 -11 lines
Log Message:
improve tests for shutdownNow

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 result = v;
105 exception = e;
106 done = true;
107 thread = null;
108 cond.signalAll();
109 }
110 finally { lock.unlock(); }
111 }
112 public V get() throws InterruptedException, ExecutionException {
113 lock.lock();
114 try {
115 while (!done)
116 cond.await();
117 if (exception != null)
118 throw new ExecutionException(exception);
119 return result;
120 }
121 finally { lock.unlock(); }
122 }
123 public V get(long timeout, TimeUnit unit)
124 throws InterruptedException, ExecutionException, TimeoutException {
125 long nanos = unit.toNanos(timeout);
126 lock.lock();
127 try {
128 for (;;) {
129 if (done) break;
130 if (nanos < 0)
131 throw new TimeoutException();
132 nanos = cond.awaitNanos(nanos);
133 }
134 if (exception != null)
135 throw new ExecutionException(exception);
136 return result;
137 }
138 finally { lock.unlock(); }
139 }
140 }
141
142 static class CustomTPE extends ThreadPoolExecutor {
143 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
144 return new CustomTask<V>(c);
145 }
146 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
147 return new CustomTask<V>(r, v);
148 }
149
150 CustomTPE(int corePoolSize,
151 int maximumPoolSize,
152 long keepAliveTime,
153 TimeUnit unit,
154 BlockingQueue<Runnable> workQueue) {
155 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
156 workQueue);
157 }
158 CustomTPE(int corePoolSize,
159 int maximumPoolSize,
160 long keepAliveTime,
161 TimeUnit unit,
162 BlockingQueue<Runnable> workQueue,
163 ThreadFactory threadFactory) {
164 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
165 threadFactory);
166 }
167
168 CustomTPE(int corePoolSize,
169 int maximumPoolSize,
170 long keepAliveTime,
171 TimeUnit unit,
172 BlockingQueue<Runnable> workQueue,
173 RejectedExecutionHandler handler) {
174 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
175 handler);
176 }
177 CustomTPE(int corePoolSize,
178 int maximumPoolSize,
179 long keepAliveTime,
180 TimeUnit unit,
181 BlockingQueue<Runnable> workQueue,
182 ThreadFactory threadFactory,
183 RejectedExecutionHandler handler) {
184 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
185 workQueue, threadFactory, handler);
186 }
187
188 final CountDownLatch beforeCalled = new CountDownLatch(1);
189 final CountDownLatch afterCalled = new CountDownLatch(1);
190 final CountDownLatch terminatedCalled = new CountDownLatch(1);
191
192 public CustomTPE() {
193 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
194 }
195 protected void beforeExecute(Thread t, Runnable r) {
196 beforeCalled.countDown();
197 }
198 protected void afterExecute(Runnable r, Throwable t) {
199 afterCalled.countDown();
200 }
201 protected void terminated() {
202 terminatedCalled.countDown();
203 }
204
205 public boolean beforeCalled() {
206 return beforeCalled.getCount() == 0;
207 }
208 public boolean afterCalled() {
209 return afterCalled.getCount() == 0;
210 }
211 public boolean terminatedCalled() {
212 return terminatedCalled.getCount() == 0;
213 }
214 }
215
216 static class FailingThreadFactory implements ThreadFactory {
217 int calls = 0;
218 public Thread newThread(Runnable r) {
219 if (++calls > 1) return null;
220 return new Thread(r);
221 }
222 }
223
224 /**
225 * execute successfully executes a runnable
226 */
227 public void testExecute() throws InterruptedException {
228 final ThreadPoolExecutor p =
229 new CustomTPE(1, 1,
230 LONG_DELAY_MS, MILLISECONDS,
231 new ArrayBlockingQueue<Runnable>(10));
232 final CountDownLatch done = new CountDownLatch(1);
233 final Runnable task = new CheckedRunnable() {
234 public void realRun() {
235 done.countDown();
236 }};
237 try {
238 p.execute(task);
239 assertTrue(done.await(SMALL_DELAY_MS, MILLISECONDS));
240 } finally {
241 joinPool(p);
242 }
243 }
244
245 /**
246 * getActiveCount increases but doesn't overestimate, when a
247 * thread becomes active
248 */
249 public void testGetActiveCount() throws InterruptedException {
250 final ThreadPoolExecutor p =
251 new CustomTPE(2, 2,
252 LONG_DELAY_MS, MILLISECONDS,
253 new ArrayBlockingQueue<Runnable>(10));
254 final CountDownLatch threadStarted = new CountDownLatch(1);
255 final CountDownLatch done = new CountDownLatch(1);
256 try {
257 assertEquals(0, p.getActiveCount());
258 p.execute(new CheckedRunnable() {
259 public void realRun() throws InterruptedException {
260 threadStarted.countDown();
261 assertEquals(1, p.getActiveCount());
262 done.await();
263 }});
264 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
265 assertEquals(1, p.getActiveCount());
266 } finally {
267 done.countDown();
268 joinPool(p);
269 }
270 }
271
272 /**
273 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
274 */
275 public void testPrestartCoreThread() {
276 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
277 assertEquals(0, p.getPoolSize());
278 assertTrue(p.prestartCoreThread());
279 assertEquals(1, p.getPoolSize());
280 assertTrue(p.prestartCoreThread());
281 assertEquals(2, p.getPoolSize());
282 assertFalse(p.prestartCoreThread());
283 assertEquals(2, p.getPoolSize());
284 joinPool(p);
285 }
286
287 /**
288 * prestartAllCoreThreads starts all corePoolSize threads
289 */
290 public void testPrestartAllCoreThreads() {
291 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
292 assertEquals(0, p.getPoolSize());
293 p.prestartAllCoreThreads();
294 assertEquals(2, p.getPoolSize());
295 p.prestartAllCoreThreads();
296 assertEquals(2, p.getPoolSize());
297 joinPool(p);
298 }
299
300 /**
301 * getCompletedTaskCount increases, but doesn't overestimate,
302 * when tasks complete
303 */
304 public void testGetCompletedTaskCount() throws InterruptedException {
305 final ThreadPoolExecutor p =
306 new CustomTPE(2, 2,
307 LONG_DELAY_MS, MILLISECONDS,
308 new ArrayBlockingQueue<Runnable>(10));
309 final CountDownLatch threadStarted = new CountDownLatch(1);
310 final CountDownLatch threadProceed = new CountDownLatch(1);
311 final CountDownLatch threadDone = new CountDownLatch(1);
312 try {
313 assertEquals(0, p.getCompletedTaskCount());
314 p.execute(new CheckedRunnable() {
315 public void realRun() throws InterruptedException {
316 threadStarted.countDown();
317 assertEquals(0, p.getCompletedTaskCount());
318 threadProceed.await();
319 threadDone.countDown();
320 }});
321 await(threadStarted);
322 assertEquals(0, p.getCompletedTaskCount());
323 threadProceed.countDown();
324 threadDone.await();
325 long startTime = System.nanoTime();
326 while (p.getCompletedTaskCount() != 1) {
327 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
328 fail("timed out");
329 Thread.yield();
330 }
331 } finally {
332 joinPool(p);
333 }
334 }
335
336 /**
337 * getCorePoolSize returns size given in constructor if not otherwise set
338 */
339 public void testGetCorePoolSize() {
340 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
341 assertEquals(1, p.getCorePoolSize());
342 joinPool(p);
343 }
344
345 /**
346 * getKeepAliveTime returns value given in constructor if not otherwise set
347 */
348 public void testGetKeepAliveTime() {
349 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
350 assertEquals(1, p.getKeepAliveTime(SECONDS));
351 joinPool(p);
352 }
353
354 /**
355 * getThreadFactory returns factory in constructor if not set
356 */
357 public void testGetThreadFactory() {
358 ThreadFactory tf = new SimpleThreadFactory();
359 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
360 assertSame(tf, p.getThreadFactory());
361 joinPool(p);
362 }
363
364 /**
365 * setThreadFactory sets the thread factory returned by getThreadFactory
366 */
367 public void testSetThreadFactory() {
368 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
369 ThreadFactory tf = new SimpleThreadFactory();
370 p.setThreadFactory(tf);
371 assertSame(tf, p.getThreadFactory());
372 joinPool(p);
373 }
374
375 /**
376 * setThreadFactory(null) throws NPE
377 */
378 public void testSetThreadFactoryNull() {
379 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
380 try {
381 p.setThreadFactory(null);
382 shouldThrow();
383 } catch (NullPointerException success) {
384 } finally {
385 joinPool(p);
386 }
387 }
388
389 /**
390 * getRejectedExecutionHandler returns handler in constructor if not set
391 */
392 public void testGetRejectedExecutionHandler() {
393 RejectedExecutionHandler h = new NoOpREHandler();
394 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
395 assertSame(h, p.getRejectedExecutionHandler());
396 joinPool(p);
397 }
398
399 /**
400 * setRejectedExecutionHandler sets the handler returned by
401 * getRejectedExecutionHandler
402 */
403 public void testSetRejectedExecutionHandler() {
404 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
405 RejectedExecutionHandler h = new NoOpREHandler();
406 p.setRejectedExecutionHandler(h);
407 assertSame(h, p.getRejectedExecutionHandler());
408 joinPool(p);
409 }
410
411 /**
412 * setRejectedExecutionHandler(null) throws NPE
413 */
414 public void testSetRejectedExecutionHandlerNull() {
415 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
416 try {
417 p.setRejectedExecutionHandler(null);
418 shouldThrow();
419 } catch (NullPointerException success) {
420 } finally {
421 joinPool(p);
422 }
423 }
424
425 /**
426 * getLargestPoolSize increases, but doesn't overestimate, when
427 * multiple threads active
428 */
429 public void testGetLargestPoolSize() throws InterruptedException {
430 final int THREADS = 3;
431 final ThreadPoolExecutor p =
432 new CustomTPE(THREADS, THREADS,
433 LONG_DELAY_MS, MILLISECONDS,
434 new ArrayBlockingQueue<Runnable>(10));
435 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
436 final CountDownLatch done = new CountDownLatch(1);
437 try {
438 assertEquals(0, p.getLargestPoolSize());
439 for (int i = 0; i < THREADS; i++)
440 p.execute(new CheckedRunnable() {
441 public void realRun() throws InterruptedException {
442 threadsStarted.countDown();
443 done.await();
444 assertEquals(THREADS, p.getLargestPoolSize());
445 }});
446 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
447 assertEquals(THREADS, p.getLargestPoolSize());
448 } finally {
449 done.countDown();
450 joinPool(p);
451 assertEquals(THREADS, p.getLargestPoolSize());
452 }
453 }
454
455 /**
456 * getMaximumPoolSize returns value given in constructor if not
457 * otherwise set
458 */
459 public void testGetMaximumPoolSize() {
460 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
461 assertEquals(2, p.getMaximumPoolSize());
462 joinPool(p);
463 }
464
465 /**
466 * getPoolSize increases, but doesn't overestimate, when threads
467 * become active
468 */
469 public void testGetPoolSize() throws InterruptedException {
470 final ThreadPoolExecutor p =
471 new CustomTPE(1, 1,
472 LONG_DELAY_MS, MILLISECONDS,
473 new ArrayBlockingQueue<Runnable>(10));
474 final CountDownLatch threadStarted = new CountDownLatch(1);
475 final CountDownLatch done = new CountDownLatch(1);
476 try {
477 assertEquals(0, p.getPoolSize());
478 p.execute(new CheckedRunnable() {
479 public void realRun() throws InterruptedException {
480 threadStarted.countDown();
481 assertEquals(1, p.getPoolSize());
482 done.await();
483 }});
484 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
485 assertEquals(1, p.getPoolSize());
486 } finally {
487 done.countDown();
488 joinPool(p);
489 }
490 }
491
492 /**
493 * getTaskCount increases, but doesn't overestimate, when tasks submitted
494 */
495 public void testGetTaskCount() throws InterruptedException {
496 final ThreadPoolExecutor p =
497 new CustomTPE(1, 1,
498 LONG_DELAY_MS, MILLISECONDS,
499 new ArrayBlockingQueue<Runnable>(10));
500 final CountDownLatch threadStarted = new CountDownLatch(1);
501 final CountDownLatch done = new CountDownLatch(1);
502 try {
503 assertEquals(0, p.getTaskCount());
504 p.execute(new CheckedRunnable() {
505 public void realRun() throws InterruptedException {
506 threadStarted.countDown();
507 assertEquals(1, p.getTaskCount());
508 done.await();
509 }});
510 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
511 assertEquals(1, p.getTaskCount());
512 } finally {
513 done.countDown();
514 joinPool(p);
515 }
516 }
517
518 /**
519 * isShutdown is false before shutdown, true after
520 */
521 public void testIsShutdown() {
522
523 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
524 assertFalse(p.isShutdown());
525 try { p.shutdown(); } catch (SecurityException ok) { return; }
526 assertTrue(p.isShutdown());
527 joinPool(p);
528 }
529
530 /**
531 * isTerminated is false before termination, true after
532 */
533 public void testIsTerminated() throws InterruptedException {
534 final ThreadPoolExecutor p =
535 new CustomTPE(1, 1,
536 LONG_DELAY_MS, MILLISECONDS,
537 new ArrayBlockingQueue<Runnable>(10));
538 final CountDownLatch threadStarted = new CountDownLatch(1);
539 final CountDownLatch done = new CountDownLatch(1);
540 try {
541 assertFalse(p.isTerminating());
542 p.execute(new CheckedRunnable() {
543 public void realRun() throws InterruptedException {
544 assertFalse(p.isTerminating());
545 threadStarted.countDown();
546 done.await();
547 }});
548 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
549 assertFalse(p.isTerminating());
550 done.countDown();
551 } finally {
552 try { p.shutdown(); } catch (SecurityException ok) { return; }
553 }
554 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
555 assertTrue(p.isTerminated());
556 assertFalse(p.isTerminating());
557 }
558
559 /**
560 * isTerminating is not true when running or when terminated
561 */
562 public void testIsTerminating() throws InterruptedException {
563 final ThreadPoolExecutor p =
564 new CustomTPE(1, 1,
565 LONG_DELAY_MS, MILLISECONDS,
566 new ArrayBlockingQueue<Runnable>(10));
567 final CountDownLatch threadStarted = new CountDownLatch(1);
568 final CountDownLatch done = new CountDownLatch(1);
569 try {
570 assertFalse(p.isTerminating());
571 p.execute(new CheckedRunnable() {
572 public void realRun() throws InterruptedException {
573 assertFalse(p.isTerminating());
574 threadStarted.countDown();
575 done.await();
576 }});
577 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
578 assertFalse(p.isTerminating());
579 done.countDown();
580 } finally {
581 try { p.shutdown(); } catch (SecurityException ok) { return; }
582 }
583 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
584 assertTrue(p.isTerminated());
585 assertFalse(p.isTerminating());
586 }
587
588 /**
589 * getQueue returns the work queue, which contains queued tasks
590 */
591 public void testGetQueue() throws InterruptedException {
592 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
593 final ThreadPoolExecutor p =
594 new CustomTPE(1, 1,
595 LONG_DELAY_MS, MILLISECONDS,
596 q);
597 final CountDownLatch threadStarted = new CountDownLatch(1);
598 final CountDownLatch done = new CountDownLatch(1);
599 try {
600 FutureTask[] tasks = new FutureTask[5];
601 for (int i = 0; i < tasks.length; i++) {
602 Callable task = new CheckedCallable<Boolean>() {
603 public Boolean realCall() throws InterruptedException {
604 threadStarted.countDown();
605 assertSame(q, p.getQueue());
606 done.await();
607 return Boolean.TRUE;
608 }};
609 tasks[i] = new FutureTask(task);
610 p.execute(tasks[i]);
611 }
612 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
613 assertSame(q, p.getQueue());
614 assertFalse(q.contains(tasks[0]));
615 assertTrue(q.contains(tasks[tasks.length - 1]));
616 assertEquals(tasks.length - 1, q.size());
617 } finally {
618 done.countDown();
619 joinPool(p);
620 }
621 }
622
623 /**
624 * remove(task) removes queued task, and fails to remove active task
625 */
626 public void testRemove() throws InterruptedException {
627 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
628 final ThreadPoolExecutor p =
629 new CustomTPE(1, 1,
630 LONG_DELAY_MS, MILLISECONDS,
631 q);
632 Runnable[] tasks = new Runnable[6];
633 final CountDownLatch threadStarted = new CountDownLatch(1);
634 final CountDownLatch done = new CountDownLatch(1);
635 try {
636 for (int i = 0; i < tasks.length; i++) {
637 tasks[i] = new CheckedRunnable() {
638 public void realRun() throws InterruptedException {
639 threadStarted.countDown();
640 done.await();
641 }};
642 p.execute(tasks[i]);
643 }
644 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
645 assertFalse(p.remove(tasks[0]));
646 assertTrue(q.contains(tasks[4]));
647 assertTrue(q.contains(tasks[3]));
648 assertTrue(p.remove(tasks[4]));
649 assertFalse(p.remove(tasks[4]));
650 assertFalse(q.contains(tasks[4]));
651 assertTrue(q.contains(tasks[3]));
652 assertTrue(p.remove(tasks[3]));
653 assertFalse(q.contains(tasks[3]));
654 } finally {
655 done.countDown();
656 joinPool(p);
657 }
658 }
659
660 /**
661 * purge removes cancelled tasks from the queue
662 */
663 public void testPurge() throws InterruptedException {
664 final CountDownLatch threadStarted = new CountDownLatch(1);
665 final CountDownLatch done = new CountDownLatch(1);
666 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
667 final ThreadPoolExecutor p =
668 new CustomTPE(1, 1,
669 LONG_DELAY_MS, MILLISECONDS,
670 q);
671 FutureTask[] tasks = new FutureTask[5];
672 try {
673 for (int i = 0; i < tasks.length; i++) {
674 Callable task = new CheckedCallable<Boolean>() {
675 public Boolean realCall() throws InterruptedException {
676 threadStarted.countDown();
677 done.await();
678 return Boolean.TRUE;
679 }};
680 tasks[i] = new FutureTask(task);
681 p.execute(tasks[i]);
682 }
683 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
684 assertEquals(tasks.length, p.getTaskCount());
685 assertEquals(tasks.length - 1, q.size());
686 assertEquals(1L, p.getActiveCount());
687 assertEquals(0L, p.getCompletedTaskCount());
688 tasks[4].cancel(true);
689 tasks[3].cancel(false);
690 p.purge();
691 assertEquals(tasks.length - 3, q.size());
692 assertEquals(tasks.length - 2, p.getTaskCount());
693 p.purge(); // Nothing to do
694 assertEquals(tasks.length - 3, q.size());
695 assertEquals(tasks.length - 2, p.getTaskCount());
696 } finally {
697 done.countDown();
698 joinPool(p);
699 }
700 }
701
702 /**
703 * shutdownNow returns a list containing tasks that were not run,
704 * and those tasks are drained from the queue
705 */
706 public void testShutdownNow() throws InterruptedException {
707 final int poolSize = 2;
708 final int count = 5;
709 final AtomicInteger ran = new AtomicInteger(0);
710 ThreadPoolExecutor p =
711 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
712 new ArrayBlockingQueue<Runnable>(10));
713 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
714 CheckedRunnable waiter = new CheckedRunnable() { public void realRun() {
715 threadsStarted.countDown();
716 try {
717 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
718 } catch (InterruptedException success) {}
719 ran.getAndIncrement();
720 }};
721 for (int i = 0; i < count; i++)
722 p.execute(waiter);
723 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
724 final List<Runnable> queuedTasks;
725 try {
726 queuedTasks = p.shutdownNow();
727 } catch (SecurityException ok) {
728 return; // Allowed in case test doesn't have privs
729 }
730 assertTrue(p.isShutdown());
731 assertTrue(p.getQueue().isEmpty());
732 assertEquals(count - poolSize, queuedTasks.size());
733 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
734 assertTrue(p.isTerminated());
735 assertEquals(poolSize, ran.get());
736 }
737
738 // Exception Tests
739
740 /**
741 * Constructor throws if corePoolSize argument is less than zero
742 */
743 public void testConstructor1() {
744 try {
745 new CustomTPE(-1, 1, 1L, SECONDS,
746 new ArrayBlockingQueue<Runnable>(10));
747 shouldThrow();
748 } catch (IllegalArgumentException success) {}
749 }
750
751 /**
752 * Constructor throws if maximumPoolSize is less than zero
753 */
754 public void testConstructor2() {
755 try {
756 new CustomTPE(1, -1, 1L, SECONDS,
757 new ArrayBlockingQueue<Runnable>(10));
758 shouldThrow();
759 } catch (IllegalArgumentException success) {}
760 }
761
762 /**
763 * Constructor throws if maximumPoolSize is equal to zero
764 */
765 public void testConstructor3() {
766 try {
767 new CustomTPE(1, 0, 1L, SECONDS,
768 new ArrayBlockingQueue<Runnable>(10));
769 shouldThrow();
770 } catch (IllegalArgumentException success) {}
771 }
772
773 /**
774 * Constructor throws if keepAliveTime is less than zero
775 */
776 public void testConstructor4() {
777 try {
778 new CustomTPE(1, 2, -1L, SECONDS,
779 new ArrayBlockingQueue<Runnable>(10));
780 shouldThrow();
781 } catch (IllegalArgumentException success) {}
782 }
783
784 /**
785 * Constructor throws if corePoolSize is greater than the maximumPoolSize
786 */
787 public void testConstructor5() {
788 try {
789 new CustomTPE(2, 1, 1L, SECONDS,
790 new ArrayBlockingQueue<Runnable>(10));
791 shouldThrow();
792 } catch (IllegalArgumentException success) {}
793 }
794
795 /**
796 * Constructor throws if workQueue is set to null
797 */
798 public void testConstructorNullPointerException() {
799 try {
800 new CustomTPE(1, 2, 1L, SECONDS, null);
801 shouldThrow();
802 } catch (NullPointerException success) {}
803 }
804
805 /**
806 * Constructor throws if corePoolSize argument is less than zero
807 */
808 public void testConstructor6() {
809 try {
810 new CustomTPE(-1, 1, 1L, SECONDS,
811 new ArrayBlockingQueue<Runnable>(10),
812 new SimpleThreadFactory());
813 shouldThrow();
814 } catch (IllegalArgumentException success) {}
815 }
816
817 /**
818 * Constructor throws if maximumPoolSize is less than zero
819 */
820 public void testConstructor7() {
821 try {
822 new CustomTPE(1,-1, 1L, SECONDS,
823 new ArrayBlockingQueue<Runnable>(10),
824 new SimpleThreadFactory());
825 shouldThrow();
826 } catch (IllegalArgumentException success) {}
827 }
828
829 /**
830 * Constructor throws if maximumPoolSize is equal to zero
831 */
832 public void testConstructor8() {
833 try {
834 new CustomTPE(1, 0, 1L, SECONDS,
835 new ArrayBlockingQueue<Runnable>(10),
836 new SimpleThreadFactory());
837 shouldThrow();
838 } catch (IllegalArgumentException success) {}
839 }
840
841 /**
842 * Constructor throws if keepAliveTime is less than zero
843 */
844 public void testConstructor9() {
845 try {
846 new CustomTPE(1, 2, -1L, SECONDS,
847 new ArrayBlockingQueue<Runnable>(10),
848 new SimpleThreadFactory());
849 shouldThrow();
850 } catch (IllegalArgumentException success) {}
851 }
852
853 /**
854 * Constructor throws if corePoolSize is greater than the maximumPoolSize
855 */
856 public void testConstructor10() {
857 try {
858 new CustomTPE(2, 1, 1L, SECONDS,
859 new ArrayBlockingQueue<Runnable>(10),
860 new SimpleThreadFactory());
861 shouldThrow();
862 } catch (IllegalArgumentException success) {}
863 }
864
865 /**
866 * Constructor throws if workQueue is set to null
867 */
868 public void testConstructorNullPointerException2() {
869 try {
870 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
871 shouldThrow();
872 } catch (NullPointerException success) {}
873 }
874
875 /**
876 * Constructor throws if threadFactory is set to null
877 */
878 public void testConstructorNullPointerException3() {
879 try {
880 new CustomTPE(1, 2, 1L, SECONDS,
881 new ArrayBlockingQueue<Runnable>(10),
882 (ThreadFactory) null);
883 shouldThrow();
884 } catch (NullPointerException success) {}
885 }
886
887 /**
888 * Constructor throws if corePoolSize argument is less than zero
889 */
890 public void testConstructor11() {
891 try {
892 new CustomTPE(-1, 1, 1L, SECONDS,
893 new ArrayBlockingQueue<Runnable>(10),
894 new NoOpREHandler());
895 shouldThrow();
896 } catch (IllegalArgumentException success) {}
897 }
898
899 /**
900 * Constructor throws if maximumPoolSize is less than zero
901 */
902 public void testConstructor12() {
903 try {
904 new CustomTPE(1, -1, 1L, SECONDS,
905 new ArrayBlockingQueue<Runnable>(10),
906 new NoOpREHandler());
907 shouldThrow();
908 } catch (IllegalArgumentException success) {}
909 }
910
911 /**
912 * Constructor throws if maximumPoolSize is equal to zero
913 */
914 public void testConstructor13() {
915 try {
916 new CustomTPE(1, 0, 1L, SECONDS,
917 new ArrayBlockingQueue<Runnable>(10),
918 new NoOpREHandler());
919 shouldThrow();
920 } catch (IllegalArgumentException success) {}
921 }
922
923 /**
924 * Constructor throws if keepAliveTime is less than zero
925 */
926 public void testConstructor14() {
927 try {
928 new CustomTPE(1, 2, -1L, SECONDS,
929 new ArrayBlockingQueue<Runnable>(10),
930 new NoOpREHandler());
931 shouldThrow();
932 } catch (IllegalArgumentException success) {}
933 }
934
935 /**
936 * Constructor throws if corePoolSize is greater than the maximumPoolSize
937 */
938 public void testConstructor15() {
939 try {
940 new CustomTPE(2, 1, 1L, SECONDS,
941 new ArrayBlockingQueue<Runnable>(10),
942 new NoOpREHandler());
943 shouldThrow();
944 } catch (IllegalArgumentException success) {}
945 }
946
947 /**
948 * Constructor throws if workQueue is set to null
949 */
950 public void testConstructorNullPointerException4() {
951 try {
952 new CustomTPE(1, 2, 1L, SECONDS,
953 null,
954 new NoOpREHandler());
955 shouldThrow();
956 } catch (NullPointerException success) {}
957 }
958
959 /**
960 * Constructor throws if handler is set to null
961 */
962 public void testConstructorNullPointerException5() {
963 try {
964 new CustomTPE(1, 2, 1L, SECONDS,
965 new ArrayBlockingQueue<Runnable>(10),
966 (RejectedExecutionHandler) null);
967 shouldThrow();
968 } catch (NullPointerException success) {}
969 }
970
971 /**
972 * Constructor throws if corePoolSize argument is less than zero
973 */
974 public void testConstructor16() {
975 try {
976 new CustomTPE(-1, 1, 1L, SECONDS,
977 new ArrayBlockingQueue<Runnable>(10),
978 new SimpleThreadFactory(),
979 new NoOpREHandler());
980 shouldThrow();
981 } catch (IllegalArgumentException success) {}
982 }
983
984 /**
985 * Constructor throws if maximumPoolSize is less than zero
986 */
987 public void testConstructor17() {
988 try {
989 new CustomTPE(1, -1, 1L, SECONDS,
990 new ArrayBlockingQueue<Runnable>(10),
991 new SimpleThreadFactory(),
992 new NoOpREHandler());
993 shouldThrow();
994 } catch (IllegalArgumentException success) {}
995 }
996
997 /**
998 * Constructor throws if maximumPoolSize is equal to zero
999 */
1000 public void testConstructor18() {
1001 try {
1002 new CustomTPE(1, 0, 1L, SECONDS,
1003 new ArrayBlockingQueue<Runnable>(10),
1004 new SimpleThreadFactory(),
1005 new NoOpREHandler());
1006 shouldThrow();
1007 } catch (IllegalArgumentException success) {}
1008 }
1009
1010 /**
1011 * Constructor throws if keepAliveTime is less than zero
1012 */
1013 public void testConstructor19() {
1014 try {
1015 new CustomTPE(1, 2, -1L, SECONDS,
1016 new ArrayBlockingQueue<Runnable>(10),
1017 new SimpleThreadFactory(),
1018 new NoOpREHandler());
1019 shouldThrow();
1020 } catch (IllegalArgumentException success) {}
1021 }
1022
1023 /**
1024 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1025 */
1026 public void testConstructor20() {
1027 try {
1028 new CustomTPE(2, 1, 1L, SECONDS,
1029 new ArrayBlockingQueue<Runnable>(10),
1030 new SimpleThreadFactory(),
1031 new NoOpREHandler());
1032 shouldThrow();
1033 } catch (IllegalArgumentException success) {}
1034 }
1035
1036 /**
1037 * Constructor throws if workQueue is null
1038 */
1039 public void testConstructorNullPointerException6() {
1040 try {
1041 new CustomTPE(1, 2, 1L, SECONDS,
1042 null,
1043 new SimpleThreadFactory(),
1044 new NoOpREHandler());
1045 shouldThrow();
1046 } catch (NullPointerException success) {}
1047 }
1048
1049 /**
1050 * Constructor throws if handler is null
1051 */
1052 public void testConstructorNullPointerException7() {
1053 try {
1054 new CustomTPE(1, 2, 1L, SECONDS,
1055 new ArrayBlockingQueue<Runnable>(10),
1056 new SimpleThreadFactory(),
1057 (RejectedExecutionHandler) null);
1058 shouldThrow();
1059 } catch (NullPointerException success) {}
1060 }
1061
1062 /**
1063 * Constructor throws if ThreadFactory is null
1064 */
1065 public void testConstructorNullPointerException8() {
1066 try {
1067 new CustomTPE(1, 2, 1L, SECONDS,
1068 new ArrayBlockingQueue<Runnable>(10),
1069 (ThreadFactory) null,
1070 new NoOpREHandler());
1071 shouldThrow();
1072 } catch (NullPointerException success) {}
1073 }
1074
1075 /**
1076 * execute throws RejectedExecutionException if saturated.
1077 */
1078 public void testSaturatedExecute() {
1079 ThreadPoolExecutor p =
1080 new CustomTPE(1, 1,
1081 LONG_DELAY_MS, MILLISECONDS,
1082 new ArrayBlockingQueue<Runnable>(1));
1083 final CountDownLatch done = new CountDownLatch(1);
1084 try {
1085 Runnable task = new CheckedRunnable() {
1086 public void realRun() throws InterruptedException {
1087 done.await();
1088 }};
1089 for (int i = 0; i < 2; ++i)
1090 p.execute(task);
1091 for (int i = 0; i < 2; ++i) {
1092 try {
1093 p.execute(task);
1094 shouldThrow();
1095 } catch (RejectedExecutionException success) {}
1096 assertTrue(p.getTaskCount() <= 2);
1097 }
1098 } finally {
1099 done.countDown();
1100 joinPool(p);
1101 }
1102 }
1103
1104 /**
1105 * executor using CallerRunsPolicy runs task if saturated.
1106 */
1107 public void testSaturatedExecute2() {
1108 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1109 ThreadPoolExecutor p = new CustomTPE(1, 1,
1110 LONG_DELAY_MS, MILLISECONDS,
1111 new ArrayBlockingQueue<Runnable>(1),
1112 h);
1113 try {
1114 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1115 for (int i = 0; i < tasks.length; ++i)
1116 tasks[i] = new TrackedNoOpRunnable();
1117 TrackedLongRunnable mr = new TrackedLongRunnable();
1118 p.execute(mr);
1119 for (int i = 0; i < tasks.length; ++i)
1120 p.execute(tasks[i]);
1121 for (int i = 1; i < tasks.length; ++i)
1122 assertTrue(tasks[i].done);
1123 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1124 } finally {
1125 joinPool(p);
1126 }
1127 }
1128
1129 /**
1130 * executor using DiscardPolicy drops task if saturated.
1131 */
1132 public void testSaturatedExecute3() {
1133 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1134 ThreadPoolExecutor p =
1135 new CustomTPE(1, 1,
1136 LONG_DELAY_MS, MILLISECONDS,
1137 new ArrayBlockingQueue<Runnable>(1),
1138 h);
1139 try {
1140 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1141 for (int i = 0; i < tasks.length; ++i)
1142 tasks[i] = new TrackedNoOpRunnable();
1143 p.execute(new TrackedLongRunnable());
1144 for (TrackedNoOpRunnable task : tasks)
1145 p.execute(task);
1146 for (TrackedNoOpRunnable task : tasks)
1147 assertFalse(task.done);
1148 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1149 } finally {
1150 joinPool(p);
1151 }
1152 }
1153
1154 /**
1155 * executor using DiscardOldestPolicy drops oldest task if saturated.
1156 */
1157 public void testSaturatedExecute4() {
1158 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1159 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1160 try {
1161 p.execute(new TrackedLongRunnable());
1162 TrackedLongRunnable r2 = new TrackedLongRunnable();
1163 p.execute(r2);
1164 assertTrue(p.getQueue().contains(r2));
1165 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1166 p.execute(r3);
1167 assertFalse(p.getQueue().contains(r2));
1168 assertTrue(p.getQueue().contains(r3));
1169 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1170 } finally {
1171 joinPool(p);
1172 }
1173 }
1174
1175 /**
1176 * execute throws RejectedExecutionException if shutdown
1177 */
1178 public void testRejectedExecutionExceptionOnShutdown() {
1179 ThreadPoolExecutor p =
1180 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1181 try { p.shutdown(); } catch (SecurityException ok) { return; }
1182 try {
1183 p.execute(new NoOpRunnable());
1184 shouldThrow();
1185 } catch (RejectedExecutionException success) {}
1186
1187 joinPool(p);
1188 }
1189
1190 /**
1191 * execute using CallerRunsPolicy drops task on shutdown
1192 */
1193 public void testCallerRunsOnShutdown() {
1194 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1195 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1196
1197 try { p.shutdown(); } catch (SecurityException ok) { return; }
1198 try {
1199 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1200 p.execute(r);
1201 assertFalse(r.done);
1202 } finally {
1203 joinPool(p);
1204 }
1205 }
1206
1207 /**
1208 * execute using DiscardPolicy drops task on shutdown
1209 */
1210 public void testDiscardOnShutdown() {
1211 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1212 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1213
1214 try { p.shutdown(); } catch (SecurityException ok) { return; }
1215 try {
1216 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1217 p.execute(r);
1218 assertFalse(r.done);
1219 } finally {
1220 joinPool(p);
1221 }
1222 }
1223
1224 /**
1225 * execute using DiscardOldestPolicy drops task on shutdown
1226 */
1227 public void testDiscardOldestOnShutdown() {
1228 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1229 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1230
1231 try { p.shutdown(); } catch (SecurityException ok) { return; }
1232 try {
1233 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1234 p.execute(r);
1235 assertFalse(r.done);
1236 } finally {
1237 joinPool(p);
1238 }
1239 }
1240
1241 /**
1242 * execute(null) throws NPE
1243 */
1244 public void testExecuteNull() {
1245 ThreadPoolExecutor p =
1246 new CustomTPE(1, 2, 1L, SECONDS,
1247 new ArrayBlockingQueue<Runnable>(10));
1248 try {
1249 p.execute(null);
1250 shouldThrow();
1251 } catch (NullPointerException success) {}
1252
1253 joinPool(p);
1254 }
1255
1256 /**
1257 * setCorePoolSize of negative value throws IllegalArgumentException
1258 */
1259 public void testCorePoolSizeIllegalArgumentException() {
1260 ThreadPoolExecutor p =
1261 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1262 try {
1263 p.setCorePoolSize(-1);
1264 shouldThrow();
1265 } catch (IllegalArgumentException success) {
1266 } finally {
1267 try { p.shutdown(); } catch (SecurityException ok) { return; }
1268 }
1269 joinPool(p);
1270 }
1271
1272 /**
1273 * setMaximumPoolSize(int) throws IllegalArgumentException
1274 * if given a value less the core pool size
1275 */
1276 public void testMaximumPoolSizeIllegalArgumentException() {
1277 ThreadPoolExecutor p =
1278 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1279 try {
1280 p.setMaximumPoolSize(1);
1281 shouldThrow();
1282 } catch (IllegalArgumentException success) {
1283 } finally {
1284 try { p.shutdown(); } catch (SecurityException ok) { return; }
1285 }
1286 joinPool(p);
1287 }
1288
1289 /**
1290 * setMaximumPoolSize throws IllegalArgumentException
1291 * if given a negative value
1292 */
1293 public void testMaximumPoolSizeIllegalArgumentException2() {
1294 ThreadPoolExecutor p =
1295 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1296 try {
1297 p.setMaximumPoolSize(-1);
1298 shouldThrow();
1299 } catch (IllegalArgumentException success) {
1300 } finally {
1301 try { p.shutdown(); } catch (SecurityException ok) { return; }
1302 }
1303 joinPool(p);
1304 }
1305
1306 /**
1307 * setKeepAliveTime throws IllegalArgumentException
1308 * when given a negative value
1309 */
1310 public void testKeepAliveTimeIllegalArgumentException() {
1311 ThreadPoolExecutor p =
1312 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1313
1314 try {
1315 p.setKeepAliveTime(-1,MILLISECONDS);
1316 shouldThrow();
1317 } catch (IllegalArgumentException success) {
1318 } finally {
1319 try { p.shutdown(); } catch (SecurityException ok) { return; }
1320 }
1321 joinPool(p);
1322 }
1323
1324 /**
1325 * terminated() is called on termination
1326 */
1327 public void testTerminated() {
1328 CustomTPE p = new CustomTPE();
1329 try { p.shutdown(); } catch (SecurityException ok) { return; }
1330 assertTrue(p.terminatedCalled());
1331 joinPool(p);
1332 }
1333
1334 /**
1335 * beforeExecute and afterExecute are called when executing task
1336 */
1337 public void testBeforeAfter() throws InterruptedException {
1338 CustomTPE p = new CustomTPE();
1339 try {
1340 final CountDownLatch done = new CountDownLatch(1);
1341 p.execute(new CheckedRunnable() {
1342 public void realRun() {
1343 done.countDown();
1344 }});
1345 await(p.afterCalled);
1346 assertEquals(0, done.getCount());
1347 assertTrue(p.afterCalled());
1348 assertTrue(p.beforeCalled());
1349 try { p.shutdown(); } catch (SecurityException ok) { return; }
1350 } finally {
1351 joinPool(p);
1352 }
1353 }
1354
1355 /**
1356 * completed submit of callable returns result
1357 */
1358 public void testSubmitCallable() throws Exception {
1359 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1360 try {
1361 Future<String> future = e.submit(new StringTask());
1362 String result = future.get();
1363 assertSame(TEST_STRING, result);
1364 } finally {
1365 joinPool(e);
1366 }
1367 }
1368
1369 /**
1370 * completed submit of runnable returns successfully
1371 */
1372 public void testSubmitRunnable() throws Exception {
1373 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1374 try {
1375 Future<?> future = e.submit(new NoOpRunnable());
1376 future.get();
1377 assertTrue(future.isDone());
1378 } finally {
1379 joinPool(e);
1380 }
1381 }
1382
1383 /**
1384 * completed submit of (runnable, result) returns result
1385 */
1386 public void testSubmitRunnable2() throws Exception {
1387 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1388 try {
1389 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1390 String result = future.get();
1391 assertSame(TEST_STRING, result);
1392 } finally {
1393 joinPool(e);
1394 }
1395 }
1396
1397 /**
1398 * invokeAny(null) throws NPE
1399 */
1400 public void testInvokeAny1() throws Exception {
1401 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1402 try {
1403 e.invokeAny(null);
1404 shouldThrow();
1405 } catch (NullPointerException success) {
1406 } finally {
1407 joinPool(e);
1408 }
1409 }
1410
1411 /**
1412 * invokeAny(empty collection) throws IAE
1413 */
1414 public void testInvokeAny2() throws Exception {
1415 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1416 try {
1417 e.invokeAny(new ArrayList<Callable<String>>());
1418 shouldThrow();
1419 } catch (IllegalArgumentException success) {
1420 } finally {
1421 joinPool(e);
1422 }
1423 }
1424
1425 /**
1426 * invokeAny(c) throws NPE if c has null elements
1427 */
1428 public void testInvokeAny3() throws Exception {
1429 CountDownLatch latch = new CountDownLatch(1);
1430 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1431 List<Callable<String>> l = new ArrayList<Callable<String>>();
1432 l.add(latchAwaitingStringTask(latch));
1433 l.add(null);
1434 try {
1435 e.invokeAny(l);
1436 shouldThrow();
1437 } catch (NullPointerException success) {
1438 } finally {
1439 latch.countDown();
1440 joinPool(e);
1441 }
1442 }
1443
1444 /**
1445 * invokeAny(c) throws ExecutionException if no task completes
1446 */
1447 public void testInvokeAny4() throws Exception {
1448 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1449 List<Callable<String>> l = new ArrayList<Callable<String>>();
1450 l.add(new NPETask());
1451 try {
1452 e.invokeAny(l);
1453 shouldThrow();
1454 } catch (ExecutionException success) {
1455 assertTrue(success.getCause() instanceof NullPointerException);
1456 } finally {
1457 joinPool(e);
1458 }
1459 }
1460
1461 /**
1462 * invokeAny(c) returns result of some task
1463 */
1464 public void testInvokeAny5() throws Exception {
1465 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1466 try {
1467 List<Callable<String>> l = new ArrayList<Callable<String>>();
1468 l.add(new StringTask());
1469 l.add(new StringTask());
1470 String result = e.invokeAny(l);
1471 assertSame(TEST_STRING, result);
1472 } finally {
1473 joinPool(e);
1474 }
1475 }
1476
1477 /**
1478 * invokeAll(null) throws NPE
1479 */
1480 public void testInvokeAll1() throws Exception {
1481 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1482 try {
1483 e.invokeAll(null);
1484 shouldThrow();
1485 } catch (NullPointerException success) {
1486 } finally {
1487 joinPool(e);
1488 }
1489 }
1490
1491 /**
1492 * invokeAll(empty collection) returns empty collection
1493 */
1494 public void testInvokeAll2() throws Exception {
1495 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1496 try {
1497 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1498 assertTrue(r.isEmpty());
1499 } finally {
1500 joinPool(e);
1501 }
1502 }
1503
1504 /**
1505 * invokeAll(c) throws NPE if c has null elements
1506 */
1507 public void testInvokeAll3() throws Exception {
1508 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1509 List<Callable<String>> l = new ArrayList<Callable<String>>();
1510 l.add(new StringTask());
1511 l.add(null);
1512 try {
1513 e.invokeAll(l);
1514 shouldThrow();
1515 } catch (NullPointerException success) {
1516 } finally {
1517 joinPool(e);
1518 }
1519 }
1520
1521 /**
1522 * get of element of invokeAll(c) throws exception on failed task
1523 */
1524 public void testInvokeAll4() throws Exception {
1525 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1526 List<Callable<String>> l = new ArrayList<Callable<String>>();
1527 l.add(new NPETask());
1528 List<Future<String>> futures = e.invokeAll(l);
1529 assertEquals(1, futures.size());
1530 try {
1531 futures.get(0).get();
1532 shouldThrow();
1533 } catch (ExecutionException success) {
1534 assertTrue(success.getCause() instanceof NullPointerException);
1535 } finally {
1536 joinPool(e);
1537 }
1538 }
1539
1540 /**
1541 * invokeAll(c) returns results of all completed tasks
1542 */
1543 public void testInvokeAll5() throws Exception {
1544 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1545 try {
1546 List<Callable<String>> l = new ArrayList<Callable<String>>();
1547 l.add(new StringTask());
1548 l.add(new StringTask());
1549 List<Future<String>> futures = e.invokeAll(l);
1550 assertEquals(2, futures.size());
1551 for (Future<String> future : futures)
1552 assertSame(TEST_STRING, future.get());
1553 } finally {
1554 joinPool(e);
1555 }
1556 }
1557
1558 /**
1559 * timed invokeAny(null) throws NPE
1560 */
1561 public void testTimedInvokeAny1() throws Exception {
1562 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1563 try {
1564 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1565 shouldThrow();
1566 } catch (NullPointerException success) {
1567 } finally {
1568 joinPool(e);
1569 }
1570 }
1571
1572 /**
1573 * timed invokeAny(,,null) throws NPE
1574 */
1575 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1576 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1577 List<Callable<String>> l = new ArrayList<Callable<String>>();
1578 l.add(new StringTask());
1579 try {
1580 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1581 shouldThrow();
1582 } catch (NullPointerException success) {
1583 } finally {
1584 joinPool(e);
1585 }
1586 }
1587
1588 /**
1589 * timed invokeAny(empty collection) throws IAE
1590 */
1591 public void testTimedInvokeAny2() throws Exception {
1592 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1593 try {
1594 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1595 shouldThrow();
1596 } catch (IllegalArgumentException success) {
1597 } finally {
1598 joinPool(e);
1599 }
1600 }
1601
1602 /**
1603 * timed invokeAny(c) throws NPE if c has null elements
1604 */
1605 public void testTimedInvokeAny3() throws Exception {
1606 CountDownLatch latch = new CountDownLatch(1);
1607 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1608 List<Callable<String>> l = new ArrayList<Callable<String>>();
1609 l.add(latchAwaitingStringTask(latch));
1610 l.add(null);
1611 try {
1612 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1613 shouldThrow();
1614 } catch (NullPointerException success) {
1615 } finally {
1616 latch.countDown();
1617 joinPool(e);
1618 }
1619 }
1620
1621 /**
1622 * timed invokeAny(c) throws ExecutionException if no task completes
1623 */
1624 public void testTimedInvokeAny4() throws Exception {
1625 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1626 List<Callable<String>> l = new ArrayList<Callable<String>>();
1627 l.add(new NPETask());
1628 try {
1629 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1630 shouldThrow();
1631 } catch (ExecutionException success) {
1632 assertTrue(success.getCause() instanceof NullPointerException);
1633 } finally {
1634 joinPool(e);
1635 }
1636 }
1637
1638 /**
1639 * timed invokeAny(c) returns result of some task
1640 */
1641 public void testTimedInvokeAny5() throws Exception {
1642 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1643 try {
1644 List<Callable<String>> l = new ArrayList<Callable<String>>();
1645 l.add(new StringTask());
1646 l.add(new StringTask());
1647 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1648 assertSame(TEST_STRING, result);
1649 } finally {
1650 joinPool(e);
1651 }
1652 }
1653
1654 /**
1655 * timed invokeAll(null) throws NPE
1656 */
1657 public void testTimedInvokeAll1() throws Exception {
1658 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1659 try {
1660 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1661 shouldThrow();
1662 } catch (NullPointerException success) {
1663 } finally {
1664 joinPool(e);
1665 }
1666 }
1667
1668 /**
1669 * timed invokeAll(,,null) throws NPE
1670 */
1671 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1672 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1673 List<Callable<String>> l = new ArrayList<Callable<String>>();
1674 l.add(new StringTask());
1675 try {
1676 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1677 shouldThrow();
1678 } catch (NullPointerException success) {
1679 } finally {
1680 joinPool(e);
1681 }
1682 }
1683
1684 /**
1685 * timed invokeAll(empty collection) returns empty collection
1686 */
1687 public void testTimedInvokeAll2() throws Exception {
1688 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1689 try {
1690 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1691 assertTrue(r.isEmpty());
1692 } finally {
1693 joinPool(e);
1694 }
1695 }
1696
1697 /**
1698 * timed invokeAll(c) throws NPE if c has null elements
1699 */
1700 public void testTimedInvokeAll3() throws Exception {
1701 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1702 List<Callable<String>> l = new ArrayList<Callable<String>>();
1703 l.add(new StringTask());
1704 l.add(null);
1705 try {
1706 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1707 shouldThrow();
1708 } catch (NullPointerException success) {
1709 } finally {
1710 joinPool(e);
1711 }
1712 }
1713
1714 /**
1715 * get of element of invokeAll(c) throws exception on failed task
1716 */
1717 public void testTimedInvokeAll4() throws Exception {
1718 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1719 List<Callable<String>> l = new ArrayList<Callable<String>>();
1720 l.add(new NPETask());
1721 List<Future<String>> futures =
1722 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1723 assertEquals(1, futures.size());
1724 try {
1725 futures.get(0).get();
1726 shouldThrow();
1727 } catch (ExecutionException success) {
1728 assertTrue(success.getCause() instanceof NullPointerException);
1729 } finally {
1730 joinPool(e);
1731 }
1732 }
1733
1734 /**
1735 * timed invokeAll(c) returns results of all completed tasks
1736 */
1737 public void testTimedInvokeAll5() throws Exception {
1738 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1739 try {
1740 List<Callable<String>> l = new ArrayList<Callable<String>>();
1741 l.add(new StringTask());
1742 l.add(new StringTask());
1743 List<Future<String>> futures =
1744 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1745 assertEquals(2, futures.size());
1746 for (Future<String> future : futures)
1747 assertSame(TEST_STRING, future.get());
1748 } finally {
1749 joinPool(e);
1750 }
1751 }
1752
1753 /**
1754 * timed invokeAll(c) cancels tasks not completed by timeout
1755 */
1756 public void testTimedInvokeAll6() throws Exception {
1757 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1758 try {
1759 for (long timeout = timeoutMillis();;) {
1760 List<Callable<String>> tasks = new ArrayList<>();
1761 tasks.add(new StringTask("0"));
1762 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1763 tasks.add(new StringTask("2"));
1764 long startTime = System.nanoTime();
1765 List<Future<String>> futures =
1766 e.invokeAll(tasks, timeout, MILLISECONDS);
1767 assertEquals(tasks.size(), futures.size());
1768 assertTrue(millisElapsedSince(startTime) >= timeout);
1769 for (Future future : futures)
1770 assertTrue(future.isDone());
1771 assertTrue(futures.get(1).isCancelled());
1772 try {
1773 assertEquals("0", futures.get(0).get());
1774 assertEquals("2", futures.get(2).get());
1775 break;
1776 } catch (CancellationException retryWithLongerTimeout) {
1777 timeout *= 2;
1778 if (timeout >= LONG_DELAY_MS / 2)
1779 fail("expected exactly one task to be cancelled");
1780 }
1781 }
1782 } finally {
1783 joinPool(e);
1784 }
1785 }
1786
1787 /**
1788 * Execution continues if there is at least one thread even if
1789 * thread factory fails to create more
1790 */
1791 public void testFailingThreadFactory() throws InterruptedException {
1792 final ExecutorService e =
1793 new CustomTPE(100, 100,
1794 LONG_DELAY_MS, MILLISECONDS,
1795 new LinkedBlockingQueue<Runnable>(),
1796 new FailingThreadFactory());
1797 try {
1798 final int TASKS = 100;
1799 final CountDownLatch done = new CountDownLatch(TASKS);
1800 for (int k = 0; k < TASKS; ++k)
1801 e.execute(new CheckedRunnable() {
1802 public void realRun() {
1803 done.countDown();
1804 }});
1805 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1806 } finally {
1807 joinPool(e);
1808 }
1809 }
1810
1811 /**
1812 * allowsCoreThreadTimeOut is by default false.
1813 */
1814 public void testAllowsCoreThreadTimeOut() {
1815 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1816 assertFalse(p.allowsCoreThreadTimeOut());
1817 joinPool(p);
1818 }
1819
1820 /**
1821 * allowCoreThreadTimeOut(true) causes idle threads to time out
1822 */
1823 public void testAllowCoreThreadTimeOut_true() throws Exception {
1824 long keepAliveTime = timeoutMillis();
1825 final ThreadPoolExecutor p =
1826 new CustomTPE(2, 10,
1827 keepAliveTime, MILLISECONDS,
1828 new ArrayBlockingQueue<Runnable>(10));
1829 final CountDownLatch threadStarted = new CountDownLatch(1);
1830 try {
1831 p.allowCoreThreadTimeOut(true);
1832 p.execute(new CheckedRunnable() {
1833 public void realRun() {
1834 threadStarted.countDown();
1835 assertEquals(1, p.getPoolSize());
1836 }});
1837 await(threadStarted);
1838 delay(keepAliveTime);
1839 long startTime = System.nanoTime();
1840 while (p.getPoolSize() > 0
1841 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1842 Thread.yield();
1843 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1844 assertEquals(0, p.getPoolSize());
1845 } finally {
1846 joinPool(p);
1847 }
1848 }
1849
1850 /**
1851 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1852 */
1853 public void testAllowCoreThreadTimeOut_false() throws Exception {
1854 long keepAliveTime = timeoutMillis();
1855 final ThreadPoolExecutor p =
1856 new CustomTPE(2, 10,
1857 keepAliveTime, MILLISECONDS,
1858 new ArrayBlockingQueue<Runnable>(10));
1859 final CountDownLatch threadStarted = new CountDownLatch(1);
1860 try {
1861 p.allowCoreThreadTimeOut(false);
1862 p.execute(new CheckedRunnable() {
1863 public void realRun() throws InterruptedException {
1864 threadStarted.countDown();
1865 assertTrue(p.getPoolSize() >= 1);
1866 }});
1867 delay(2 * keepAliveTime);
1868 assertTrue(p.getPoolSize() >= 1);
1869 } finally {
1870 joinPool(p);
1871 }
1872 }
1873
1874 }