ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.40
Committed: Sun Sep 27 18:50:50 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.39: +3 -1 lines
Log Message:
testShutdownNow: add queue-draining assertions

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.locks.Condition;
34 import java.util.concurrent.locks.ReentrantLock;
35
36 import junit.framework.Test;
37 import junit.framework.TestSuite;
38
39 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
40 public static void main(String[] args) {
41 main(suite(), args);
42 }
43 public static Test suite() {
44 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
45 }
46
47 static class CustomTask<V> implements RunnableFuture<V> {
48 final Callable<V> callable;
49 final ReentrantLock lock = new ReentrantLock();
50 final Condition cond = lock.newCondition();
51 boolean done;
52 boolean cancelled;
53 V result;
54 Thread thread;
55 Exception exception;
56 CustomTask(Callable<V> c) {
57 if (c == null) throw new NullPointerException();
58 callable = c;
59 }
60 CustomTask(final Runnable r, final V res) {
61 if (r == null) throw new NullPointerException();
62 callable = new Callable<V>() {
63 public V call() throws Exception { r.run(); return res; }};
64 }
65 public boolean isDone() {
66 lock.lock(); try { return done; } finally { lock.unlock() ; }
67 }
68 public boolean isCancelled() {
69 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
70 }
71 public boolean cancel(boolean mayInterrupt) {
72 lock.lock();
73 try {
74 if (!done) {
75 cancelled = true;
76 done = true;
77 if (mayInterrupt && thread != null)
78 thread.interrupt();
79 return true;
80 }
81 return false;
82 }
83 finally { lock.unlock() ; }
84 }
85 public void run() {
86 lock.lock();
87 try {
88 if (done)
89 return;
90 thread = Thread.currentThread();
91 }
92 finally { lock.unlock() ; }
93 V v = null;
94 Exception e = null;
95 try {
96 v = callable.call();
97 }
98 catch (Exception ex) {
99 e = ex;
100 }
101 lock.lock();
102 try {
103 result = v;
104 exception = e;
105 done = true;
106 thread = null;
107 cond.signalAll();
108 }
109 finally { lock.unlock(); }
110 }
111 public V get() throws InterruptedException, ExecutionException {
112 lock.lock();
113 try {
114 while (!done)
115 cond.await();
116 if (exception != null)
117 throw new ExecutionException(exception);
118 return result;
119 }
120 finally { lock.unlock(); }
121 }
122 public V get(long timeout, TimeUnit unit)
123 throws InterruptedException, ExecutionException, TimeoutException {
124 long nanos = unit.toNanos(timeout);
125 lock.lock();
126 try {
127 for (;;) {
128 if (done) break;
129 if (nanos < 0)
130 throw new TimeoutException();
131 nanos = cond.awaitNanos(nanos);
132 }
133 if (exception != null)
134 throw new ExecutionException(exception);
135 return result;
136 }
137 finally { lock.unlock(); }
138 }
139 }
140
141 static class CustomTPE extends ThreadPoolExecutor {
142 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
143 return new CustomTask<V>(c);
144 }
145 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
146 return new CustomTask<V>(r, v);
147 }
148
149 CustomTPE(int corePoolSize,
150 int maximumPoolSize,
151 long keepAliveTime,
152 TimeUnit unit,
153 BlockingQueue<Runnable> workQueue) {
154 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
155 workQueue);
156 }
157 CustomTPE(int corePoolSize,
158 int maximumPoolSize,
159 long keepAliveTime,
160 TimeUnit unit,
161 BlockingQueue<Runnable> workQueue,
162 ThreadFactory threadFactory) {
163 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
164 threadFactory);
165 }
166
167 CustomTPE(int corePoolSize,
168 int maximumPoolSize,
169 long keepAliveTime,
170 TimeUnit unit,
171 BlockingQueue<Runnable> workQueue,
172 RejectedExecutionHandler handler) {
173 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
174 handler);
175 }
176 CustomTPE(int corePoolSize,
177 int maximumPoolSize,
178 long keepAliveTime,
179 TimeUnit unit,
180 BlockingQueue<Runnable> workQueue,
181 ThreadFactory threadFactory,
182 RejectedExecutionHandler handler) {
183 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
184 workQueue, threadFactory, handler);
185 }
186
187 final CountDownLatch beforeCalled = new CountDownLatch(1);
188 final CountDownLatch afterCalled = new CountDownLatch(1);
189 final CountDownLatch terminatedCalled = new CountDownLatch(1);
190
191 public CustomTPE() {
192 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
193 }
194 protected void beforeExecute(Thread t, Runnable r) {
195 beforeCalled.countDown();
196 }
197 protected void afterExecute(Runnable r, Throwable t) {
198 afterCalled.countDown();
199 }
200 protected void terminated() {
201 terminatedCalled.countDown();
202 }
203
204 public boolean beforeCalled() {
205 return beforeCalled.getCount() == 0;
206 }
207 public boolean afterCalled() {
208 return afterCalled.getCount() == 0;
209 }
210 public boolean terminatedCalled() {
211 return terminatedCalled.getCount() == 0;
212 }
213 }
214
215 static class FailingThreadFactory implements ThreadFactory {
216 int calls = 0;
217 public Thread newThread(Runnable r) {
218 if (++calls > 1) return null;
219 return new Thread(r);
220 }
221 }
222
223 /**
224 * execute successfully executes a runnable
225 */
226 public void testExecute() throws InterruptedException {
227 final ThreadPoolExecutor p =
228 new CustomTPE(1, 1,
229 LONG_DELAY_MS, MILLISECONDS,
230 new ArrayBlockingQueue<Runnable>(10));
231 final CountDownLatch done = new CountDownLatch(1);
232 final Runnable task = new CheckedRunnable() {
233 public void realRun() {
234 done.countDown();
235 }};
236 try {
237 p.execute(task);
238 assertTrue(done.await(SMALL_DELAY_MS, MILLISECONDS));
239 } finally {
240 joinPool(p);
241 }
242 }
243
244 /**
245 * getActiveCount increases but doesn't overestimate, when a
246 * thread becomes active
247 */
248 public void testGetActiveCount() throws InterruptedException {
249 final ThreadPoolExecutor p =
250 new CustomTPE(2, 2,
251 LONG_DELAY_MS, MILLISECONDS,
252 new ArrayBlockingQueue<Runnable>(10));
253 final CountDownLatch threadStarted = new CountDownLatch(1);
254 final CountDownLatch done = new CountDownLatch(1);
255 try {
256 assertEquals(0, p.getActiveCount());
257 p.execute(new CheckedRunnable() {
258 public void realRun() throws InterruptedException {
259 threadStarted.countDown();
260 assertEquals(1, p.getActiveCount());
261 done.await();
262 }});
263 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
264 assertEquals(1, p.getActiveCount());
265 } finally {
266 done.countDown();
267 joinPool(p);
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
276 assertEquals(0, p.getPoolSize());
277 assertTrue(p.prestartCoreThread());
278 assertEquals(1, p.getPoolSize());
279 assertTrue(p.prestartCoreThread());
280 assertEquals(2, p.getPoolSize());
281 assertFalse(p.prestartCoreThread());
282 assertEquals(2, p.getPoolSize());
283 joinPool(p);
284 }
285
286 /**
287 * prestartAllCoreThreads starts all corePoolSize threads
288 */
289 public void testPrestartAllCoreThreads() {
290 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
291 assertEquals(0, p.getPoolSize());
292 p.prestartAllCoreThreads();
293 assertEquals(2, p.getPoolSize());
294 p.prestartAllCoreThreads();
295 assertEquals(2, p.getPoolSize());
296 joinPool(p);
297 }
298
299 /**
300 * getCompletedTaskCount increases, but doesn't overestimate,
301 * when tasks complete
302 */
303 public void testGetCompletedTaskCount() throws InterruptedException {
304 final ThreadPoolExecutor p =
305 new CustomTPE(2, 2,
306 LONG_DELAY_MS, MILLISECONDS,
307 new ArrayBlockingQueue<Runnable>(10));
308 final CountDownLatch threadStarted = new CountDownLatch(1);
309 final CountDownLatch threadProceed = new CountDownLatch(1);
310 final CountDownLatch threadDone = new CountDownLatch(1);
311 try {
312 assertEquals(0, p.getCompletedTaskCount());
313 p.execute(new CheckedRunnable() {
314 public void realRun() throws InterruptedException {
315 threadStarted.countDown();
316 assertEquals(0, p.getCompletedTaskCount());
317 threadProceed.await();
318 threadDone.countDown();
319 }});
320 await(threadStarted);
321 assertEquals(0, p.getCompletedTaskCount());
322 threadProceed.countDown();
323 threadDone.await();
324 long startTime = System.nanoTime();
325 while (p.getCompletedTaskCount() != 1) {
326 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
327 fail("timed out");
328 Thread.yield();
329 }
330 } finally {
331 joinPool(p);
332 }
333 }
334
335 /**
336 * getCorePoolSize returns size given in constructor if not otherwise set
337 */
338 public void testGetCorePoolSize() {
339 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
340 assertEquals(1, p.getCorePoolSize());
341 joinPool(p);
342 }
343
344 /**
345 * getKeepAliveTime returns value given in constructor if not otherwise set
346 */
347 public void testGetKeepAliveTime() {
348 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
349 assertEquals(1, p.getKeepAliveTime(SECONDS));
350 joinPool(p);
351 }
352
353 /**
354 * getThreadFactory returns factory in constructor if not set
355 */
356 public void testGetThreadFactory() {
357 ThreadFactory tf = new SimpleThreadFactory();
358 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
359 assertSame(tf, p.getThreadFactory());
360 joinPool(p);
361 }
362
363 /**
364 * setThreadFactory sets the thread factory returned by getThreadFactory
365 */
366 public void testSetThreadFactory() {
367 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
368 ThreadFactory tf = new SimpleThreadFactory();
369 p.setThreadFactory(tf);
370 assertSame(tf, p.getThreadFactory());
371 joinPool(p);
372 }
373
374 /**
375 * setThreadFactory(null) throws NPE
376 */
377 public void testSetThreadFactoryNull() {
378 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
379 try {
380 p.setThreadFactory(null);
381 shouldThrow();
382 } catch (NullPointerException success) {
383 } finally {
384 joinPool(p);
385 }
386 }
387
388 /**
389 * getRejectedExecutionHandler returns handler in constructor if not set
390 */
391 public void testGetRejectedExecutionHandler() {
392 RejectedExecutionHandler h = new NoOpREHandler();
393 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
394 assertSame(h, p.getRejectedExecutionHandler());
395 joinPool(p);
396 }
397
398 /**
399 * setRejectedExecutionHandler sets the handler returned by
400 * getRejectedExecutionHandler
401 */
402 public void testSetRejectedExecutionHandler() {
403 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
404 RejectedExecutionHandler h = new NoOpREHandler();
405 p.setRejectedExecutionHandler(h);
406 assertSame(h, p.getRejectedExecutionHandler());
407 joinPool(p);
408 }
409
410 /**
411 * setRejectedExecutionHandler(null) throws NPE
412 */
413 public void testSetRejectedExecutionHandlerNull() {
414 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
415 try {
416 p.setRejectedExecutionHandler(null);
417 shouldThrow();
418 } catch (NullPointerException success) {
419 } finally {
420 joinPool(p);
421 }
422 }
423
424 /**
425 * getLargestPoolSize increases, but doesn't overestimate, when
426 * multiple threads active
427 */
428 public void testGetLargestPoolSize() throws InterruptedException {
429 final int THREADS = 3;
430 final ThreadPoolExecutor p =
431 new CustomTPE(THREADS, THREADS,
432 LONG_DELAY_MS, MILLISECONDS,
433 new ArrayBlockingQueue<Runnable>(10));
434 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
435 final CountDownLatch done = new CountDownLatch(1);
436 try {
437 assertEquals(0, p.getLargestPoolSize());
438 for (int i = 0; i < THREADS; i++)
439 p.execute(new CheckedRunnable() {
440 public void realRun() throws InterruptedException {
441 threadsStarted.countDown();
442 done.await();
443 assertEquals(THREADS, p.getLargestPoolSize());
444 }});
445 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
446 assertEquals(THREADS, p.getLargestPoolSize());
447 } finally {
448 done.countDown();
449 joinPool(p);
450 assertEquals(THREADS, p.getLargestPoolSize());
451 }
452 }
453
454 /**
455 * getMaximumPoolSize returns value given in constructor if not
456 * otherwise set
457 */
458 public void testGetMaximumPoolSize() {
459 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
460 assertEquals(2, p.getMaximumPoolSize());
461 joinPool(p);
462 }
463
464 /**
465 * getPoolSize increases, but doesn't overestimate, when threads
466 * become active
467 */
468 public void testGetPoolSize() throws InterruptedException {
469 final ThreadPoolExecutor p =
470 new CustomTPE(1, 1,
471 LONG_DELAY_MS, MILLISECONDS,
472 new ArrayBlockingQueue<Runnable>(10));
473 final CountDownLatch threadStarted = new CountDownLatch(1);
474 final CountDownLatch done = new CountDownLatch(1);
475 try {
476 assertEquals(0, p.getPoolSize());
477 p.execute(new CheckedRunnable() {
478 public void realRun() throws InterruptedException {
479 threadStarted.countDown();
480 assertEquals(1, p.getPoolSize());
481 done.await();
482 }});
483 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
484 assertEquals(1, p.getPoolSize());
485 } finally {
486 done.countDown();
487 joinPool(p);
488 }
489 }
490
491 /**
492 * getTaskCount increases, but doesn't overestimate, when tasks submitted
493 */
494 public void testGetTaskCount() throws InterruptedException {
495 final ThreadPoolExecutor p =
496 new CustomTPE(1, 1,
497 LONG_DELAY_MS, MILLISECONDS,
498 new ArrayBlockingQueue<Runnable>(10));
499 final CountDownLatch threadStarted = new CountDownLatch(1);
500 final CountDownLatch done = new CountDownLatch(1);
501 try {
502 assertEquals(0, p.getTaskCount());
503 p.execute(new CheckedRunnable() {
504 public void realRun() throws InterruptedException {
505 threadStarted.countDown();
506 assertEquals(1, p.getTaskCount());
507 done.await();
508 }});
509 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
510 assertEquals(1, p.getTaskCount());
511 } finally {
512 done.countDown();
513 joinPool(p);
514 }
515 }
516
517 /**
518 * isShutdown is false before shutdown, true after
519 */
520 public void testIsShutdown() {
521
522 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
523 assertFalse(p.isShutdown());
524 try { p.shutdown(); } catch (SecurityException ok) { return; }
525 assertTrue(p.isShutdown());
526 joinPool(p);
527 }
528
529 /**
530 * isTerminated is false before termination, true after
531 */
532 public void testIsTerminated() throws InterruptedException {
533 final ThreadPoolExecutor p =
534 new CustomTPE(1, 1,
535 LONG_DELAY_MS, MILLISECONDS,
536 new ArrayBlockingQueue<Runnable>(10));
537 final CountDownLatch threadStarted = new CountDownLatch(1);
538 final CountDownLatch done = new CountDownLatch(1);
539 try {
540 assertFalse(p.isTerminating());
541 p.execute(new CheckedRunnable() {
542 public void realRun() throws InterruptedException {
543 assertFalse(p.isTerminating());
544 threadStarted.countDown();
545 done.await();
546 }});
547 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
548 assertFalse(p.isTerminating());
549 done.countDown();
550 } finally {
551 try { p.shutdown(); } catch (SecurityException ok) { return; }
552 }
553 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
554 assertTrue(p.isTerminated());
555 assertFalse(p.isTerminating());
556 }
557
558 /**
559 * isTerminating is not true when running or when terminated
560 */
561 public void testIsTerminating() throws InterruptedException {
562 final ThreadPoolExecutor p =
563 new CustomTPE(1, 1,
564 LONG_DELAY_MS, MILLISECONDS,
565 new ArrayBlockingQueue<Runnable>(10));
566 final CountDownLatch threadStarted = new CountDownLatch(1);
567 final CountDownLatch done = new CountDownLatch(1);
568 try {
569 assertFalse(p.isTerminating());
570 p.execute(new CheckedRunnable() {
571 public void realRun() throws InterruptedException {
572 assertFalse(p.isTerminating());
573 threadStarted.countDown();
574 done.await();
575 }});
576 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
577 assertFalse(p.isTerminating());
578 done.countDown();
579 } finally {
580 try { p.shutdown(); } catch (SecurityException ok) { return; }
581 }
582 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
583 assertTrue(p.isTerminated());
584 assertFalse(p.isTerminating());
585 }
586
587 /**
588 * getQueue returns the work queue, which contains queued tasks
589 */
590 public void testGetQueue() throws InterruptedException {
591 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
592 final ThreadPoolExecutor p =
593 new CustomTPE(1, 1,
594 LONG_DELAY_MS, MILLISECONDS,
595 q);
596 final CountDownLatch threadStarted = new CountDownLatch(1);
597 final CountDownLatch done = new CountDownLatch(1);
598 try {
599 FutureTask[] tasks = new FutureTask[5];
600 for (int i = 0; i < tasks.length; i++) {
601 Callable task = new CheckedCallable<Boolean>() {
602 public Boolean realCall() throws InterruptedException {
603 threadStarted.countDown();
604 assertSame(q, p.getQueue());
605 done.await();
606 return Boolean.TRUE;
607 }};
608 tasks[i] = new FutureTask(task);
609 p.execute(tasks[i]);
610 }
611 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
612 assertSame(q, p.getQueue());
613 assertFalse(q.contains(tasks[0]));
614 assertTrue(q.contains(tasks[tasks.length - 1]));
615 assertEquals(tasks.length - 1, q.size());
616 } finally {
617 done.countDown();
618 joinPool(p);
619 }
620 }
621
622 /**
623 * remove(task) removes queued task, and fails to remove active task
624 */
625 public void testRemove() throws InterruptedException {
626 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
627 final ThreadPoolExecutor p =
628 new CustomTPE(1, 1,
629 LONG_DELAY_MS, MILLISECONDS,
630 q);
631 Runnable[] tasks = new Runnable[6];
632 final CountDownLatch threadStarted = new CountDownLatch(1);
633 final CountDownLatch done = new CountDownLatch(1);
634 try {
635 for (int i = 0; i < tasks.length; i++) {
636 tasks[i] = new CheckedRunnable() {
637 public void realRun() throws InterruptedException {
638 threadStarted.countDown();
639 done.await();
640 }};
641 p.execute(tasks[i]);
642 }
643 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
644 assertFalse(p.remove(tasks[0]));
645 assertTrue(q.contains(tasks[4]));
646 assertTrue(q.contains(tasks[3]));
647 assertTrue(p.remove(tasks[4]));
648 assertFalse(p.remove(tasks[4]));
649 assertFalse(q.contains(tasks[4]));
650 assertTrue(q.contains(tasks[3]));
651 assertTrue(p.remove(tasks[3]));
652 assertFalse(q.contains(tasks[3]));
653 } finally {
654 done.countDown();
655 joinPool(p);
656 }
657 }
658
659 /**
660 * purge removes cancelled tasks from the queue
661 */
662 public void testPurge() throws InterruptedException {
663 final CountDownLatch threadStarted = new CountDownLatch(1);
664 final CountDownLatch done = new CountDownLatch(1);
665 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
666 final ThreadPoolExecutor p =
667 new CustomTPE(1, 1,
668 LONG_DELAY_MS, MILLISECONDS,
669 q);
670 FutureTask[] tasks = new FutureTask[5];
671 try {
672 for (int i = 0; i < tasks.length; i++) {
673 Callable task = new CheckedCallable<Boolean>() {
674 public Boolean realCall() throws InterruptedException {
675 threadStarted.countDown();
676 done.await();
677 return Boolean.TRUE;
678 }};
679 tasks[i] = new FutureTask(task);
680 p.execute(tasks[i]);
681 }
682 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
683 assertEquals(tasks.length, p.getTaskCount());
684 assertEquals(tasks.length - 1, q.size());
685 assertEquals(1L, p.getActiveCount());
686 assertEquals(0L, p.getCompletedTaskCount());
687 tasks[4].cancel(true);
688 tasks[3].cancel(false);
689 p.purge();
690 assertEquals(tasks.length - 3, q.size());
691 assertEquals(tasks.length - 2, p.getTaskCount());
692 p.purge(); // Nothing to do
693 assertEquals(tasks.length - 3, q.size());
694 assertEquals(tasks.length - 2, p.getTaskCount());
695 } finally {
696 done.countDown();
697 joinPool(p);
698 }
699 }
700
701 /**
702 * shutdownNow returns a list containing tasks that were not run,
703 * and those tasks are drained from the queue
704 */
705 public void testShutdownNow() {
706 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
707 List l;
708 try {
709 for (int i = 0; i < 5; i++)
710 p.execute(new MediumPossiblyInterruptedRunnable());
711 }
712 finally {
713 try {
714 l = p.shutdownNow();
715 } catch (SecurityException ok) { return; }
716 }
717 assertTrue(p.isShutdown());
718 assertTrue(p.getQueue().isEmpty());
719 assertTrue(l.size() <= 4);
720 }
721
722 // Exception Tests
723
724 /**
725 * Constructor throws if corePoolSize argument is less than zero
726 */
727 public void testConstructor1() {
728 try {
729 new CustomTPE(-1, 1, 1L, SECONDS,
730 new ArrayBlockingQueue<Runnable>(10));
731 shouldThrow();
732 } catch (IllegalArgumentException success) {}
733 }
734
735 /**
736 * Constructor throws if maximumPoolSize is less than zero
737 */
738 public void testConstructor2() {
739 try {
740 new CustomTPE(1, -1, 1L, SECONDS,
741 new ArrayBlockingQueue<Runnable>(10));
742 shouldThrow();
743 } catch (IllegalArgumentException success) {}
744 }
745
746 /**
747 * Constructor throws if maximumPoolSize is equal to zero
748 */
749 public void testConstructor3() {
750 try {
751 new CustomTPE(1, 0, 1L, SECONDS,
752 new ArrayBlockingQueue<Runnable>(10));
753 shouldThrow();
754 } catch (IllegalArgumentException success) {}
755 }
756
757 /**
758 * Constructor throws if keepAliveTime is less than zero
759 */
760 public void testConstructor4() {
761 try {
762 new CustomTPE(1, 2, -1L, SECONDS,
763 new ArrayBlockingQueue<Runnable>(10));
764 shouldThrow();
765 } catch (IllegalArgumentException success) {}
766 }
767
768 /**
769 * Constructor throws if corePoolSize is greater than the maximumPoolSize
770 */
771 public void testConstructor5() {
772 try {
773 new CustomTPE(2, 1, 1L, SECONDS,
774 new ArrayBlockingQueue<Runnable>(10));
775 shouldThrow();
776 } catch (IllegalArgumentException success) {}
777 }
778
779 /**
780 * Constructor throws if workQueue is set to null
781 */
782 public void testConstructorNullPointerException() {
783 try {
784 new CustomTPE(1, 2, 1L, SECONDS, null);
785 shouldThrow();
786 } catch (NullPointerException success) {}
787 }
788
789 /**
790 * Constructor throws if corePoolSize argument is less than zero
791 */
792 public void testConstructor6() {
793 try {
794 new CustomTPE(-1, 1, 1L, SECONDS,
795 new ArrayBlockingQueue<Runnable>(10),
796 new SimpleThreadFactory());
797 shouldThrow();
798 } catch (IllegalArgumentException success) {}
799 }
800
801 /**
802 * Constructor throws if maximumPoolSize is less than zero
803 */
804 public void testConstructor7() {
805 try {
806 new CustomTPE(1,-1, 1L, SECONDS,
807 new ArrayBlockingQueue<Runnable>(10),
808 new SimpleThreadFactory());
809 shouldThrow();
810 } catch (IllegalArgumentException success) {}
811 }
812
813 /**
814 * Constructor throws if maximumPoolSize is equal to zero
815 */
816 public void testConstructor8() {
817 try {
818 new CustomTPE(1, 0, 1L, SECONDS,
819 new ArrayBlockingQueue<Runnable>(10),
820 new SimpleThreadFactory());
821 shouldThrow();
822 } catch (IllegalArgumentException success) {}
823 }
824
825 /**
826 * Constructor throws if keepAliveTime is less than zero
827 */
828 public void testConstructor9() {
829 try {
830 new CustomTPE(1, 2, -1L, SECONDS,
831 new ArrayBlockingQueue<Runnable>(10),
832 new SimpleThreadFactory());
833 shouldThrow();
834 } catch (IllegalArgumentException success) {}
835 }
836
837 /**
838 * Constructor throws if corePoolSize is greater than the maximumPoolSize
839 */
840 public void testConstructor10() {
841 try {
842 new CustomTPE(2, 1, 1L, SECONDS,
843 new ArrayBlockingQueue<Runnable>(10),
844 new SimpleThreadFactory());
845 shouldThrow();
846 } catch (IllegalArgumentException success) {}
847 }
848
849 /**
850 * Constructor throws if workQueue is set to null
851 */
852 public void testConstructorNullPointerException2() {
853 try {
854 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
855 shouldThrow();
856 } catch (NullPointerException success) {}
857 }
858
859 /**
860 * Constructor throws if threadFactory is set to null
861 */
862 public void testConstructorNullPointerException3() {
863 try {
864 new CustomTPE(1, 2, 1L, SECONDS,
865 new ArrayBlockingQueue<Runnable>(10),
866 (ThreadFactory) null);
867 shouldThrow();
868 } catch (NullPointerException success) {}
869 }
870
871 /**
872 * Constructor throws if corePoolSize argument is less than zero
873 */
874 public void testConstructor11() {
875 try {
876 new CustomTPE(-1, 1, 1L, SECONDS,
877 new ArrayBlockingQueue<Runnable>(10),
878 new NoOpREHandler());
879 shouldThrow();
880 } catch (IllegalArgumentException success) {}
881 }
882
883 /**
884 * Constructor throws if maximumPoolSize is less than zero
885 */
886 public void testConstructor12() {
887 try {
888 new CustomTPE(1, -1, 1L, SECONDS,
889 new ArrayBlockingQueue<Runnable>(10),
890 new NoOpREHandler());
891 shouldThrow();
892 } catch (IllegalArgumentException success) {}
893 }
894
895 /**
896 * Constructor throws if maximumPoolSize is equal to zero
897 */
898 public void testConstructor13() {
899 try {
900 new CustomTPE(1, 0, 1L, SECONDS,
901 new ArrayBlockingQueue<Runnable>(10),
902 new NoOpREHandler());
903 shouldThrow();
904 } catch (IllegalArgumentException success) {}
905 }
906
907 /**
908 * Constructor throws if keepAliveTime is less than zero
909 */
910 public void testConstructor14() {
911 try {
912 new CustomTPE(1, 2, -1L, SECONDS,
913 new ArrayBlockingQueue<Runnable>(10),
914 new NoOpREHandler());
915 shouldThrow();
916 } catch (IllegalArgumentException success) {}
917 }
918
919 /**
920 * Constructor throws if corePoolSize is greater than the maximumPoolSize
921 */
922 public void testConstructor15() {
923 try {
924 new CustomTPE(2, 1, 1L, SECONDS,
925 new ArrayBlockingQueue<Runnable>(10),
926 new NoOpREHandler());
927 shouldThrow();
928 } catch (IllegalArgumentException success) {}
929 }
930
931 /**
932 * Constructor throws if workQueue is set to null
933 */
934 public void testConstructorNullPointerException4() {
935 try {
936 new CustomTPE(1, 2, 1L, SECONDS,
937 null,
938 new NoOpREHandler());
939 shouldThrow();
940 } catch (NullPointerException success) {}
941 }
942
943 /**
944 * Constructor throws if handler is set to null
945 */
946 public void testConstructorNullPointerException5() {
947 try {
948 new CustomTPE(1, 2, 1L, SECONDS,
949 new ArrayBlockingQueue<Runnable>(10),
950 (RejectedExecutionHandler) null);
951 shouldThrow();
952 } catch (NullPointerException success) {}
953 }
954
955 /**
956 * Constructor throws if corePoolSize argument is less than zero
957 */
958 public void testConstructor16() {
959 try {
960 new CustomTPE(-1, 1, 1L, SECONDS,
961 new ArrayBlockingQueue<Runnable>(10),
962 new SimpleThreadFactory(),
963 new NoOpREHandler());
964 shouldThrow();
965 } catch (IllegalArgumentException success) {}
966 }
967
968 /**
969 * Constructor throws if maximumPoolSize is less than zero
970 */
971 public void testConstructor17() {
972 try {
973 new CustomTPE(1, -1, 1L, SECONDS,
974 new ArrayBlockingQueue<Runnable>(10),
975 new SimpleThreadFactory(),
976 new NoOpREHandler());
977 shouldThrow();
978 } catch (IllegalArgumentException success) {}
979 }
980
981 /**
982 * Constructor throws if maximumPoolSize is equal to zero
983 */
984 public void testConstructor18() {
985 try {
986 new CustomTPE(1, 0, 1L, SECONDS,
987 new ArrayBlockingQueue<Runnable>(10),
988 new SimpleThreadFactory(),
989 new NoOpREHandler());
990 shouldThrow();
991 } catch (IllegalArgumentException success) {}
992 }
993
994 /**
995 * Constructor throws if keepAliveTime is less than zero
996 */
997 public void testConstructor19() {
998 try {
999 new CustomTPE(1, 2, -1L, SECONDS,
1000 new ArrayBlockingQueue<Runnable>(10),
1001 new SimpleThreadFactory(),
1002 new NoOpREHandler());
1003 shouldThrow();
1004 } catch (IllegalArgumentException success) {}
1005 }
1006
1007 /**
1008 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1009 */
1010 public void testConstructor20() {
1011 try {
1012 new CustomTPE(2, 1, 1L, SECONDS,
1013 new ArrayBlockingQueue<Runnable>(10),
1014 new SimpleThreadFactory(),
1015 new NoOpREHandler());
1016 shouldThrow();
1017 } catch (IllegalArgumentException success) {}
1018 }
1019
1020 /**
1021 * Constructor throws if workQueue is null
1022 */
1023 public void testConstructorNullPointerException6() {
1024 try {
1025 new CustomTPE(1, 2, 1L, SECONDS,
1026 null,
1027 new SimpleThreadFactory(),
1028 new NoOpREHandler());
1029 shouldThrow();
1030 } catch (NullPointerException success) {}
1031 }
1032
1033 /**
1034 * Constructor throws if handler is null
1035 */
1036 public void testConstructorNullPointerException7() {
1037 try {
1038 new CustomTPE(1, 2, 1L, SECONDS,
1039 new ArrayBlockingQueue<Runnable>(10),
1040 new SimpleThreadFactory(),
1041 (RejectedExecutionHandler) null);
1042 shouldThrow();
1043 } catch (NullPointerException success) {}
1044 }
1045
1046 /**
1047 * Constructor throws if ThreadFactory is null
1048 */
1049 public void testConstructorNullPointerException8() {
1050 try {
1051 new CustomTPE(1, 2, 1L, SECONDS,
1052 new ArrayBlockingQueue<Runnable>(10),
1053 (ThreadFactory) null,
1054 new NoOpREHandler());
1055 shouldThrow();
1056 } catch (NullPointerException success) {}
1057 }
1058
1059 /**
1060 * execute throws RejectedExecutionException if saturated.
1061 */
1062 public void testSaturatedExecute() {
1063 ThreadPoolExecutor p =
1064 new CustomTPE(1, 1,
1065 LONG_DELAY_MS, MILLISECONDS,
1066 new ArrayBlockingQueue<Runnable>(1));
1067 final CountDownLatch done = new CountDownLatch(1);
1068 try {
1069 Runnable task = new CheckedRunnable() {
1070 public void realRun() throws InterruptedException {
1071 done.await();
1072 }};
1073 for (int i = 0; i < 2; ++i)
1074 p.execute(task);
1075 for (int i = 0; i < 2; ++i) {
1076 try {
1077 p.execute(task);
1078 shouldThrow();
1079 } catch (RejectedExecutionException success) {}
1080 assertTrue(p.getTaskCount() <= 2);
1081 }
1082 } finally {
1083 done.countDown();
1084 joinPool(p);
1085 }
1086 }
1087
1088 /**
1089 * executor using CallerRunsPolicy runs task if saturated.
1090 */
1091 public void testSaturatedExecute2() {
1092 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1093 ThreadPoolExecutor p = new CustomTPE(1, 1,
1094 LONG_DELAY_MS, MILLISECONDS,
1095 new ArrayBlockingQueue<Runnable>(1),
1096 h);
1097 try {
1098 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1099 for (int i = 0; i < tasks.length; ++i)
1100 tasks[i] = new TrackedNoOpRunnable();
1101 TrackedLongRunnable mr = new TrackedLongRunnable();
1102 p.execute(mr);
1103 for (int i = 0; i < tasks.length; ++i)
1104 p.execute(tasks[i]);
1105 for (int i = 1; i < tasks.length; ++i)
1106 assertTrue(tasks[i].done);
1107 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1108 } finally {
1109 joinPool(p);
1110 }
1111 }
1112
1113 /**
1114 * executor using DiscardPolicy drops task if saturated.
1115 */
1116 public void testSaturatedExecute3() {
1117 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1118 ThreadPoolExecutor p =
1119 new CustomTPE(1, 1,
1120 LONG_DELAY_MS, MILLISECONDS,
1121 new ArrayBlockingQueue<Runnable>(1),
1122 h);
1123 try {
1124 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1125 for (int i = 0; i < tasks.length; ++i)
1126 tasks[i] = new TrackedNoOpRunnable();
1127 p.execute(new TrackedLongRunnable());
1128 for (TrackedNoOpRunnable task : tasks)
1129 p.execute(task);
1130 for (TrackedNoOpRunnable task : tasks)
1131 assertFalse(task.done);
1132 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1133 } finally {
1134 joinPool(p);
1135 }
1136 }
1137
1138 /**
1139 * executor using DiscardOldestPolicy drops oldest task if saturated.
1140 */
1141 public void testSaturatedExecute4() {
1142 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1143 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1144 try {
1145 p.execute(new TrackedLongRunnable());
1146 TrackedLongRunnable r2 = new TrackedLongRunnable();
1147 p.execute(r2);
1148 assertTrue(p.getQueue().contains(r2));
1149 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1150 p.execute(r3);
1151 assertFalse(p.getQueue().contains(r2));
1152 assertTrue(p.getQueue().contains(r3));
1153 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1154 } finally {
1155 joinPool(p);
1156 }
1157 }
1158
1159 /**
1160 * execute throws RejectedExecutionException if shutdown
1161 */
1162 public void testRejectedExecutionExceptionOnShutdown() {
1163 ThreadPoolExecutor p =
1164 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1165 try { p.shutdown(); } catch (SecurityException ok) { return; }
1166 try {
1167 p.execute(new NoOpRunnable());
1168 shouldThrow();
1169 } catch (RejectedExecutionException success) {}
1170
1171 joinPool(p);
1172 }
1173
1174 /**
1175 * execute using CallerRunsPolicy drops task on shutdown
1176 */
1177 public void testCallerRunsOnShutdown() {
1178 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1179 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1180
1181 try { p.shutdown(); } catch (SecurityException ok) { return; }
1182 try {
1183 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1184 p.execute(r);
1185 assertFalse(r.done);
1186 } finally {
1187 joinPool(p);
1188 }
1189 }
1190
1191 /**
1192 * execute using DiscardPolicy drops task on shutdown
1193 */
1194 public void testDiscardOnShutdown() {
1195 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1196 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1197
1198 try { p.shutdown(); } catch (SecurityException ok) { return; }
1199 try {
1200 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1201 p.execute(r);
1202 assertFalse(r.done);
1203 } finally {
1204 joinPool(p);
1205 }
1206 }
1207
1208 /**
1209 * execute using DiscardOldestPolicy drops task on shutdown
1210 */
1211 public void testDiscardOldestOnShutdown() {
1212 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1213 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1214
1215 try { p.shutdown(); } catch (SecurityException ok) { return; }
1216 try {
1217 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1218 p.execute(r);
1219 assertFalse(r.done);
1220 } finally {
1221 joinPool(p);
1222 }
1223 }
1224
1225 /**
1226 * execute(null) throws NPE
1227 */
1228 public void testExecuteNull() {
1229 ThreadPoolExecutor p =
1230 new CustomTPE(1, 2, 1L, SECONDS,
1231 new ArrayBlockingQueue<Runnable>(10));
1232 try {
1233 p.execute(null);
1234 shouldThrow();
1235 } catch (NullPointerException success) {}
1236
1237 joinPool(p);
1238 }
1239
1240 /**
1241 * setCorePoolSize of negative value throws IllegalArgumentException
1242 */
1243 public void testCorePoolSizeIllegalArgumentException() {
1244 ThreadPoolExecutor p =
1245 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1246 try {
1247 p.setCorePoolSize(-1);
1248 shouldThrow();
1249 } catch (IllegalArgumentException success) {
1250 } finally {
1251 try { p.shutdown(); } catch (SecurityException ok) { return; }
1252 }
1253 joinPool(p);
1254 }
1255
1256 /**
1257 * setMaximumPoolSize(int) throws IllegalArgumentException
1258 * if given a value less the core pool size
1259 */
1260 public void testMaximumPoolSizeIllegalArgumentException() {
1261 ThreadPoolExecutor p =
1262 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1263 try {
1264 p.setMaximumPoolSize(1);
1265 shouldThrow();
1266 } catch (IllegalArgumentException success) {
1267 } finally {
1268 try { p.shutdown(); } catch (SecurityException ok) { return; }
1269 }
1270 joinPool(p);
1271 }
1272
1273 /**
1274 * setMaximumPoolSize throws IllegalArgumentException
1275 * if given a negative value
1276 */
1277 public void testMaximumPoolSizeIllegalArgumentException2() {
1278 ThreadPoolExecutor p =
1279 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1280 try {
1281 p.setMaximumPoolSize(-1);
1282 shouldThrow();
1283 } catch (IllegalArgumentException success) {
1284 } finally {
1285 try { p.shutdown(); } catch (SecurityException ok) { return; }
1286 }
1287 joinPool(p);
1288 }
1289
1290 /**
1291 * setKeepAliveTime throws IllegalArgumentException
1292 * when given a negative value
1293 */
1294 public void testKeepAliveTimeIllegalArgumentException() {
1295 ThreadPoolExecutor p =
1296 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1297
1298 try {
1299 p.setKeepAliveTime(-1,MILLISECONDS);
1300 shouldThrow();
1301 } catch (IllegalArgumentException success) {
1302 } finally {
1303 try { p.shutdown(); } catch (SecurityException ok) { return; }
1304 }
1305 joinPool(p);
1306 }
1307
1308 /**
1309 * terminated() is called on termination
1310 */
1311 public void testTerminated() {
1312 CustomTPE p = new CustomTPE();
1313 try { p.shutdown(); } catch (SecurityException ok) { return; }
1314 assertTrue(p.terminatedCalled());
1315 joinPool(p);
1316 }
1317
1318 /**
1319 * beforeExecute and afterExecute are called when executing task
1320 */
1321 public void testBeforeAfter() throws InterruptedException {
1322 CustomTPE p = new CustomTPE();
1323 try {
1324 final CountDownLatch done = new CountDownLatch(1);
1325 p.execute(new CheckedRunnable() {
1326 public void realRun() {
1327 done.countDown();
1328 }});
1329 await(p.afterCalled);
1330 assertEquals(0, done.getCount());
1331 assertTrue(p.afterCalled());
1332 assertTrue(p.beforeCalled());
1333 try { p.shutdown(); } catch (SecurityException ok) { return; }
1334 } finally {
1335 joinPool(p);
1336 }
1337 }
1338
1339 /**
1340 * completed submit of callable returns result
1341 */
1342 public void testSubmitCallable() throws Exception {
1343 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1344 try {
1345 Future<String> future = e.submit(new StringTask());
1346 String result = future.get();
1347 assertSame(TEST_STRING, result);
1348 } finally {
1349 joinPool(e);
1350 }
1351 }
1352
1353 /**
1354 * completed submit of runnable returns successfully
1355 */
1356 public void testSubmitRunnable() throws Exception {
1357 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1358 try {
1359 Future<?> future = e.submit(new NoOpRunnable());
1360 future.get();
1361 assertTrue(future.isDone());
1362 } finally {
1363 joinPool(e);
1364 }
1365 }
1366
1367 /**
1368 * completed submit of (runnable, result) returns result
1369 */
1370 public void testSubmitRunnable2() throws Exception {
1371 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1372 try {
1373 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1374 String result = future.get();
1375 assertSame(TEST_STRING, result);
1376 } finally {
1377 joinPool(e);
1378 }
1379 }
1380
1381 /**
1382 * invokeAny(null) throws NPE
1383 */
1384 public void testInvokeAny1() throws Exception {
1385 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1386 try {
1387 e.invokeAny(null);
1388 shouldThrow();
1389 } catch (NullPointerException success) {
1390 } finally {
1391 joinPool(e);
1392 }
1393 }
1394
1395 /**
1396 * invokeAny(empty collection) throws IAE
1397 */
1398 public void testInvokeAny2() throws Exception {
1399 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1400 try {
1401 e.invokeAny(new ArrayList<Callable<String>>());
1402 shouldThrow();
1403 } catch (IllegalArgumentException success) {
1404 } finally {
1405 joinPool(e);
1406 }
1407 }
1408
1409 /**
1410 * invokeAny(c) throws NPE if c has null elements
1411 */
1412 public void testInvokeAny3() throws Exception {
1413 CountDownLatch latch = new CountDownLatch(1);
1414 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1415 List<Callable<String>> l = new ArrayList<Callable<String>>();
1416 l.add(latchAwaitingStringTask(latch));
1417 l.add(null);
1418 try {
1419 e.invokeAny(l);
1420 shouldThrow();
1421 } catch (NullPointerException success) {
1422 } finally {
1423 latch.countDown();
1424 joinPool(e);
1425 }
1426 }
1427
1428 /**
1429 * invokeAny(c) throws ExecutionException if no task completes
1430 */
1431 public void testInvokeAny4() throws Exception {
1432 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1433 List<Callable<String>> l = new ArrayList<Callable<String>>();
1434 l.add(new NPETask());
1435 try {
1436 e.invokeAny(l);
1437 shouldThrow();
1438 } catch (ExecutionException success) {
1439 assertTrue(success.getCause() instanceof NullPointerException);
1440 } finally {
1441 joinPool(e);
1442 }
1443 }
1444
1445 /**
1446 * invokeAny(c) returns result of some task
1447 */
1448 public void testInvokeAny5() throws Exception {
1449 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1450 try {
1451 List<Callable<String>> l = new ArrayList<Callable<String>>();
1452 l.add(new StringTask());
1453 l.add(new StringTask());
1454 String result = e.invokeAny(l);
1455 assertSame(TEST_STRING, result);
1456 } finally {
1457 joinPool(e);
1458 }
1459 }
1460
1461 /**
1462 * invokeAll(null) throws NPE
1463 */
1464 public void testInvokeAll1() throws Exception {
1465 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1466 try {
1467 e.invokeAll(null);
1468 shouldThrow();
1469 } catch (NullPointerException success) {
1470 } finally {
1471 joinPool(e);
1472 }
1473 }
1474
1475 /**
1476 * invokeAll(empty collection) returns empty collection
1477 */
1478 public void testInvokeAll2() throws Exception {
1479 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1480 try {
1481 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1482 assertTrue(r.isEmpty());
1483 } finally {
1484 joinPool(e);
1485 }
1486 }
1487
1488 /**
1489 * invokeAll(c) throws NPE if c has null elements
1490 */
1491 public void testInvokeAll3() throws Exception {
1492 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1493 List<Callable<String>> l = new ArrayList<Callable<String>>();
1494 l.add(new StringTask());
1495 l.add(null);
1496 try {
1497 e.invokeAll(l);
1498 shouldThrow();
1499 } catch (NullPointerException success) {
1500 } finally {
1501 joinPool(e);
1502 }
1503 }
1504
1505 /**
1506 * get of element of invokeAll(c) throws exception on failed task
1507 */
1508 public void testInvokeAll4() throws Exception {
1509 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1510 List<Callable<String>> l = new ArrayList<Callable<String>>();
1511 l.add(new NPETask());
1512 List<Future<String>> futures = e.invokeAll(l);
1513 assertEquals(1, futures.size());
1514 try {
1515 futures.get(0).get();
1516 shouldThrow();
1517 } catch (ExecutionException success) {
1518 assertTrue(success.getCause() instanceof NullPointerException);
1519 } finally {
1520 joinPool(e);
1521 }
1522 }
1523
1524 /**
1525 * invokeAll(c) returns results of all completed tasks
1526 */
1527 public void testInvokeAll5() throws Exception {
1528 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1529 try {
1530 List<Callable<String>> l = new ArrayList<Callable<String>>();
1531 l.add(new StringTask());
1532 l.add(new StringTask());
1533 List<Future<String>> futures = e.invokeAll(l);
1534 assertEquals(2, futures.size());
1535 for (Future<String> future : futures)
1536 assertSame(TEST_STRING, future.get());
1537 } finally {
1538 joinPool(e);
1539 }
1540 }
1541
1542 /**
1543 * timed invokeAny(null) throws NPE
1544 */
1545 public void testTimedInvokeAny1() throws Exception {
1546 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1547 try {
1548 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1549 shouldThrow();
1550 } catch (NullPointerException success) {
1551 } finally {
1552 joinPool(e);
1553 }
1554 }
1555
1556 /**
1557 * timed invokeAny(,,null) throws NPE
1558 */
1559 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1560 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1561 List<Callable<String>> l = new ArrayList<Callable<String>>();
1562 l.add(new StringTask());
1563 try {
1564 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1565 shouldThrow();
1566 } catch (NullPointerException success) {
1567 } finally {
1568 joinPool(e);
1569 }
1570 }
1571
1572 /**
1573 * timed invokeAny(empty collection) throws IAE
1574 */
1575 public void testTimedInvokeAny2() throws Exception {
1576 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1577 try {
1578 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1579 shouldThrow();
1580 } catch (IllegalArgumentException success) {
1581 } finally {
1582 joinPool(e);
1583 }
1584 }
1585
1586 /**
1587 * timed invokeAny(c) throws NPE if c has null elements
1588 */
1589 public void testTimedInvokeAny3() throws Exception {
1590 CountDownLatch latch = new CountDownLatch(1);
1591 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1592 List<Callable<String>> l = new ArrayList<Callable<String>>();
1593 l.add(latchAwaitingStringTask(latch));
1594 l.add(null);
1595 try {
1596 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1597 shouldThrow();
1598 } catch (NullPointerException success) {
1599 } finally {
1600 latch.countDown();
1601 joinPool(e);
1602 }
1603 }
1604
1605 /**
1606 * timed invokeAny(c) throws ExecutionException if no task completes
1607 */
1608 public void testTimedInvokeAny4() throws Exception {
1609 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1610 List<Callable<String>> l = new ArrayList<Callable<String>>();
1611 l.add(new NPETask());
1612 try {
1613 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1614 shouldThrow();
1615 } catch (ExecutionException success) {
1616 assertTrue(success.getCause() instanceof NullPointerException);
1617 } finally {
1618 joinPool(e);
1619 }
1620 }
1621
1622 /**
1623 * timed invokeAny(c) returns result of some task
1624 */
1625 public void testTimedInvokeAny5() throws Exception {
1626 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1627 try {
1628 List<Callable<String>> l = new ArrayList<Callable<String>>();
1629 l.add(new StringTask());
1630 l.add(new StringTask());
1631 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1632 assertSame(TEST_STRING, result);
1633 } finally {
1634 joinPool(e);
1635 }
1636 }
1637
1638 /**
1639 * timed invokeAll(null) throws NPE
1640 */
1641 public void testTimedInvokeAll1() throws Exception {
1642 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1643 try {
1644 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1645 shouldThrow();
1646 } catch (NullPointerException success) {
1647 } finally {
1648 joinPool(e);
1649 }
1650 }
1651
1652 /**
1653 * timed invokeAll(,,null) throws NPE
1654 */
1655 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1656 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1657 List<Callable<String>> l = new ArrayList<Callable<String>>();
1658 l.add(new StringTask());
1659 try {
1660 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1661 shouldThrow();
1662 } catch (NullPointerException success) {
1663 } finally {
1664 joinPool(e);
1665 }
1666 }
1667
1668 /**
1669 * timed invokeAll(empty collection) returns empty collection
1670 */
1671 public void testTimedInvokeAll2() throws Exception {
1672 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1673 try {
1674 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1675 assertTrue(r.isEmpty());
1676 } finally {
1677 joinPool(e);
1678 }
1679 }
1680
1681 /**
1682 * timed invokeAll(c) throws NPE if c has null elements
1683 */
1684 public void testTimedInvokeAll3() throws Exception {
1685 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1686 List<Callable<String>> l = new ArrayList<Callable<String>>();
1687 l.add(new StringTask());
1688 l.add(null);
1689 try {
1690 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1691 shouldThrow();
1692 } catch (NullPointerException success) {
1693 } finally {
1694 joinPool(e);
1695 }
1696 }
1697
1698 /**
1699 * get of element of invokeAll(c) throws exception on failed task
1700 */
1701 public void testTimedInvokeAll4() throws Exception {
1702 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1703 List<Callable<String>> l = new ArrayList<Callable<String>>();
1704 l.add(new NPETask());
1705 List<Future<String>> futures =
1706 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1707 assertEquals(1, futures.size());
1708 try {
1709 futures.get(0).get();
1710 shouldThrow();
1711 } catch (ExecutionException success) {
1712 assertTrue(success.getCause() instanceof NullPointerException);
1713 } finally {
1714 joinPool(e);
1715 }
1716 }
1717
1718 /**
1719 * timed invokeAll(c) returns results of all completed tasks
1720 */
1721 public void testTimedInvokeAll5() throws Exception {
1722 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1723 try {
1724 List<Callable<String>> l = new ArrayList<Callable<String>>();
1725 l.add(new StringTask());
1726 l.add(new StringTask());
1727 List<Future<String>> futures =
1728 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1729 assertEquals(2, futures.size());
1730 for (Future<String> future : futures)
1731 assertSame(TEST_STRING, future.get());
1732 } finally {
1733 joinPool(e);
1734 }
1735 }
1736
1737 /**
1738 * timed invokeAll(c) cancels tasks not completed by timeout
1739 */
1740 public void testTimedInvokeAll6() throws Exception {
1741 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1742 try {
1743 for (long timeout = timeoutMillis();;) {
1744 List<Callable<String>> tasks = new ArrayList<>();
1745 tasks.add(new StringTask("0"));
1746 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1747 tasks.add(new StringTask("2"));
1748 long startTime = System.nanoTime();
1749 List<Future<String>> futures =
1750 e.invokeAll(tasks, timeout, MILLISECONDS);
1751 assertEquals(tasks.size(), futures.size());
1752 assertTrue(millisElapsedSince(startTime) >= timeout);
1753 for (Future future : futures)
1754 assertTrue(future.isDone());
1755 assertTrue(futures.get(1).isCancelled());
1756 try {
1757 assertEquals("0", futures.get(0).get());
1758 assertEquals("2", futures.get(2).get());
1759 break;
1760 } catch (CancellationException retryWithLongerTimeout) {
1761 timeout *= 2;
1762 if (timeout >= LONG_DELAY_MS / 2)
1763 fail("expected exactly one task to be cancelled");
1764 }
1765 }
1766 } finally {
1767 joinPool(e);
1768 }
1769 }
1770
1771 /**
1772 * Execution continues if there is at least one thread even if
1773 * thread factory fails to create more
1774 */
1775 public void testFailingThreadFactory() throws InterruptedException {
1776 final ExecutorService e =
1777 new CustomTPE(100, 100,
1778 LONG_DELAY_MS, MILLISECONDS,
1779 new LinkedBlockingQueue<Runnable>(),
1780 new FailingThreadFactory());
1781 try {
1782 final int TASKS = 100;
1783 final CountDownLatch done = new CountDownLatch(TASKS);
1784 for (int k = 0; k < TASKS; ++k)
1785 e.execute(new CheckedRunnable() {
1786 public void realRun() {
1787 done.countDown();
1788 }});
1789 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1790 } finally {
1791 joinPool(e);
1792 }
1793 }
1794
1795 /**
1796 * allowsCoreThreadTimeOut is by default false.
1797 */
1798 public void testAllowsCoreThreadTimeOut() {
1799 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1800 assertFalse(p.allowsCoreThreadTimeOut());
1801 joinPool(p);
1802 }
1803
1804 /**
1805 * allowCoreThreadTimeOut(true) causes idle threads to time out
1806 */
1807 public void testAllowCoreThreadTimeOut_true() throws Exception {
1808 long keepAliveTime = timeoutMillis();
1809 final ThreadPoolExecutor p =
1810 new CustomTPE(2, 10,
1811 keepAliveTime, MILLISECONDS,
1812 new ArrayBlockingQueue<Runnable>(10));
1813 final CountDownLatch threadStarted = new CountDownLatch(1);
1814 try {
1815 p.allowCoreThreadTimeOut(true);
1816 p.execute(new CheckedRunnable() {
1817 public void realRun() {
1818 threadStarted.countDown();
1819 assertEquals(1, p.getPoolSize());
1820 }});
1821 await(threadStarted);
1822 delay(keepAliveTime);
1823 long startTime = System.nanoTime();
1824 while (p.getPoolSize() > 0
1825 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1826 Thread.yield();
1827 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1828 assertEquals(0, p.getPoolSize());
1829 } finally {
1830 joinPool(p);
1831 }
1832 }
1833
1834 /**
1835 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1836 */
1837 public void testAllowCoreThreadTimeOut_false() throws Exception {
1838 long keepAliveTime = timeoutMillis();
1839 final ThreadPoolExecutor p =
1840 new CustomTPE(2, 10,
1841 keepAliveTime, MILLISECONDS,
1842 new ArrayBlockingQueue<Runnable>(10));
1843 final CountDownLatch threadStarted = new CountDownLatch(1);
1844 try {
1845 p.allowCoreThreadTimeOut(false);
1846 p.execute(new CheckedRunnable() {
1847 public void realRun() throws InterruptedException {
1848 threadStarted.countDown();
1849 assertTrue(p.getPoolSize() >= 1);
1850 }});
1851 delay(2 * keepAliveTime);
1852 assertTrue(p.getPoolSize() >= 1);
1853 } finally {
1854 joinPool(p);
1855 }
1856 }
1857
1858 }