ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.71
Committed: Sun Oct 4 02:34:48 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.70: +3 -5 lines
Log Message:
improve testGetQueue

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 try (PoolCleaner cleaner = cleaner(p)) {
256 final CountDownLatch threadStarted = new CountDownLatch(1);
257 final CountDownLatch done = new CountDownLatch(1);
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 done.countDown();
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p =
276 new CustomTPE(2, 6,
277 LONG_DELAY_MS, MILLISECONDS,
278 new ArrayBlockingQueue<Runnable>(10));
279 try (PoolCleaner cleaner = cleaner(p)) {
280 assertEquals(0, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(1, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 assertFalse(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 p.setCorePoolSize(4);
288 assertTrue(p.prestartCoreThread());
289 assertEquals(3, p.getPoolSize());
290 assertTrue(p.prestartCoreThread());
291 assertEquals(4, p.getPoolSize());
292 assertFalse(p.prestartCoreThread());
293 assertEquals(4, p.getPoolSize());
294 }
295 }
296
297 /**
298 * prestartAllCoreThreads starts all corePoolSize threads
299 */
300 public void testPrestartAllCoreThreads() {
301 ThreadPoolExecutor p =
302 new CustomTPE(2, 6,
303 LONG_DELAY_MS, MILLISECONDS,
304 new ArrayBlockingQueue<Runnable>(10));
305 try (PoolCleaner cleaner = cleaner(p)) {
306 assertEquals(0, p.getPoolSize());
307 p.prestartAllCoreThreads();
308 assertEquals(2, p.getPoolSize());
309 p.prestartAllCoreThreads();
310 assertEquals(2, p.getPoolSize());
311 p.setCorePoolSize(4);
312 p.prestartAllCoreThreads();
313 assertEquals(4, p.getPoolSize());
314 p.prestartAllCoreThreads();
315 assertEquals(4, p.getPoolSize());
316 }
317 }
318
319 /**
320 * getCompletedTaskCount increases, but doesn't overestimate,
321 * when tasks complete
322 */
323 public void testGetCompletedTaskCount() throws InterruptedException {
324 final ThreadPoolExecutor p =
325 new CustomTPE(2, 2,
326 LONG_DELAY_MS, MILLISECONDS,
327 new ArrayBlockingQueue<Runnable>(10));
328 try (PoolCleaner cleaner = cleaner(p)) {
329 final CountDownLatch threadStarted = new CountDownLatch(1);
330 final CountDownLatch threadProceed = new CountDownLatch(1);
331 final CountDownLatch threadDone = new CountDownLatch(1);
332 assertEquals(0, p.getCompletedTaskCount());
333 p.execute(new CheckedRunnable() {
334 public void realRun() throws InterruptedException {
335 threadStarted.countDown();
336 assertEquals(0, p.getCompletedTaskCount());
337 threadProceed.await();
338 threadDone.countDown();
339 }});
340 await(threadStarted);
341 assertEquals(0, p.getCompletedTaskCount());
342 threadProceed.countDown();
343 threadDone.await();
344 long startTime = System.nanoTime();
345 while (p.getCompletedTaskCount() != 1) {
346 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
347 fail("timed out");
348 Thread.yield();
349 }
350 }
351 }
352
353 /**
354 * getCorePoolSize returns size given in constructor if not otherwise set
355 */
356 public void testGetCorePoolSize() {
357 final ThreadPoolExecutor p =
358 new CustomTPE(1, 1,
359 LONG_DELAY_MS, MILLISECONDS,
360 new ArrayBlockingQueue<Runnable>(10));
361 try (PoolCleaner cleaner = cleaner(p)) {
362 assertEquals(1, p.getCorePoolSize());
363 }
364 }
365
366 /**
367 * getKeepAliveTime returns value given in constructor if not otherwise set
368 */
369 public void testGetKeepAliveTime() {
370 final ThreadPoolExecutor p =
371 new CustomTPE(2, 2,
372 1000, MILLISECONDS,
373 new ArrayBlockingQueue<Runnable>(10));
374 try (PoolCleaner cleaner = cleaner(p)) {
375 assertEquals(1, p.getKeepAliveTime(SECONDS));
376 }
377 }
378
379 /**
380 * getThreadFactory returns factory in constructor if not set
381 */
382 public void testGetThreadFactory() {
383 final ThreadFactory threadFactory = new SimpleThreadFactory();
384 final ThreadPoolExecutor p =
385 new CustomTPE(1, 2,
386 LONG_DELAY_MS, MILLISECONDS,
387 new ArrayBlockingQueue<Runnable>(10),
388 threadFactory,
389 new NoOpREHandler());
390 try (PoolCleaner cleaner = cleaner(p)) {
391 assertSame(threadFactory, p.getThreadFactory());
392 }
393 }
394
395 /**
396 * setThreadFactory sets the thread factory returned by getThreadFactory
397 */
398 public void testSetThreadFactory() {
399 final ThreadPoolExecutor p =
400 new CustomTPE(1, 2,
401 LONG_DELAY_MS, MILLISECONDS,
402 new ArrayBlockingQueue<Runnable>(10));
403 try (PoolCleaner cleaner = cleaner(p)) {
404 ThreadFactory threadFactory = new SimpleThreadFactory();
405 p.setThreadFactory(threadFactory);
406 assertSame(threadFactory, p.getThreadFactory());
407 }
408 }
409
410 /**
411 * setThreadFactory(null) throws NPE
412 */
413 public void testSetThreadFactoryNull() {
414 final ThreadPoolExecutor p =
415 new CustomTPE(1, 2,
416 LONG_DELAY_MS, MILLISECONDS,
417 new ArrayBlockingQueue<Runnable>(10));
418 try (PoolCleaner cleaner = cleaner(p)) {
419 try {
420 p.setThreadFactory(null);
421 shouldThrow();
422 } catch (NullPointerException success) {}
423 }
424 }
425
426 /**
427 * getRejectedExecutionHandler returns handler in constructor if not set
428 */
429 public void testGetRejectedExecutionHandler() {
430 final RejectedExecutionHandler handler = new NoOpREHandler();
431 final ThreadPoolExecutor p =
432 new CustomTPE(1, 2,
433 LONG_DELAY_MS, MILLISECONDS,
434 new ArrayBlockingQueue<Runnable>(10),
435 handler);
436 try (PoolCleaner cleaner = cleaner(p)) {
437 assertSame(handler, p.getRejectedExecutionHandler());
438 }
439 }
440
441 /**
442 * setRejectedExecutionHandler sets the handler returned by
443 * getRejectedExecutionHandler
444 */
445 public void testSetRejectedExecutionHandler() {
446 final ThreadPoolExecutor p =
447 new CustomTPE(1, 2,
448 LONG_DELAY_MS, MILLISECONDS,
449 new ArrayBlockingQueue<Runnable>(10));
450 try (PoolCleaner cleaner = cleaner(p)) {
451 RejectedExecutionHandler handler = new NoOpREHandler();
452 p.setRejectedExecutionHandler(handler);
453 assertSame(handler, p.getRejectedExecutionHandler());
454 }
455 }
456
457 /**
458 * setRejectedExecutionHandler(null) throws NPE
459 */
460 public void testSetRejectedExecutionHandlerNull() {
461 final ThreadPoolExecutor p =
462 new CustomTPE(1, 2,
463 LONG_DELAY_MS, MILLISECONDS,
464 new ArrayBlockingQueue<Runnable>(10));
465 try (PoolCleaner cleaner = cleaner(p)) {
466 try {
467 p.setRejectedExecutionHandler(null);
468 shouldThrow();
469 } catch (NullPointerException success) {}
470 }
471 }
472
473 /**
474 * getLargestPoolSize increases, but doesn't overestimate, when
475 * multiple threads active
476 */
477 public void testGetLargestPoolSize() throws InterruptedException {
478 final int THREADS = 3;
479 final ThreadPoolExecutor p =
480 new CustomTPE(THREADS, THREADS,
481 LONG_DELAY_MS, MILLISECONDS,
482 new ArrayBlockingQueue<Runnable>(10));
483 try (PoolCleaner cleaner = cleaner(p)) {
484 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
485 final CountDownLatch done = new CountDownLatch(1);
486 assertEquals(0, p.getLargestPoolSize());
487 for (int i = 0; i < THREADS; i++)
488 p.execute(new CheckedRunnable() {
489 public void realRun() throws InterruptedException {
490 threadsStarted.countDown();
491 done.await();
492 assertEquals(THREADS, p.getLargestPoolSize());
493 }});
494 assertTrue(threadsStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
495 assertEquals(THREADS, p.getLargestPoolSize());
496 done.countDown(); // release pool
497 }
498 assertEquals(THREADS, p.getLargestPoolSize());
499 }
500
501 /**
502 * getMaximumPoolSize returns value given in constructor if not
503 * otherwise set
504 */
505 public void testGetMaximumPoolSize() {
506 final ThreadPoolExecutor p =
507 new CustomTPE(2, 3,
508 LONG_DELAY_MS, MILLISECONDS,
509 new ArrayBlockingQueue<Runnable>(10));
510 try (PoolCleaner cleaner = cleaner(p)) {
511 assertEquals(3, p.getMaximumPoolSize());
512 p.setMaximumPoolSize(5);
513 assertEquals(5, p.getMaximumPoolSize());
514 p.setMaximumPoolSize(4);
515 assertEquals(4, p.getMaximumPoolSize());
516 }
517 }
518
519 /**
520 * getPoolSize increases, but doesn't overestimate, when threads
521 * become active
522 */
523 public void testGetPoolSize() throws InterruptedException {
524 final ThreadPoolExecutor p =
525 new CustomTPE(1, 1,
526 LONG_DELAY_MS, MILLISECONDS,
527 new ArrayBlockingQueue<Runnable>(10));
528 try (PoolCleaner cleaner = cleaner(p)) {
529 final CountDownLatch threadStarted = new CountDownLatch(1);
530 final CountDownLatch done = new CountDownLatch(1);
531 assertEquals(0, p.getPoolSize());
532 p.execute(new CheckedRunnable() {
533 public void realRun() throws InterruptedException {
534 threadStarted.countDown();
535 assertEquals(1, p.getPoolSize());
536 done.await();
537 }});
538 assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
539 assertEquals(1, p.getPoolSize());
540 done.countDown(); // release pool
541 }
542 }
543
544 /**
545 * getTaskCount increases, but doesn't overestimate, when tasks submitted
546 */
547 public void testGetTaskCount() throws InterruptedException {
548 final ThreadPoolExecutor p =
549 new CustomTPE(1, 1,
550 LONG_DELAY_MS, MILLISECONDS,
551 new ArrayBlockingQueue<Runnable>(10));
552 try (PoolCleaner cleaner = cleaner(p)) {
553 final CountDownLatch threadStarted = new CountDownLatch(1);
554 final CountDownLatch done = new CountDownLatch(1);
555 assertEquals(0, p.getTaskCount());
556 p.execute(new CheckedRunnable() {
557 public void realRun() throws InterruptedException {
558 threadStarted.countDown();
559 assertEquals(1, p.getTaskCount());
560 done.await();
561 }});
562 assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
563 assertEquals(1, p.getTaskCount());
564 done.countDown();
565 }
566 }
567
568 /**
569 * isShutdown is false before shutdown, true after
570 */
571 public void testIsShutdown() {
572 final ThreadPoolExecutor p =
573 new CustomTPE(1, 1,
574 LONG_DELAY_MS, MILLISECONDS,
575 new ArrayBlockingQueue<Runnable>(10));
576 try (PoolCleaner cleaner = cleaner(p)) {
577 assertFalse(p.isShutdown());
578 try { p.shutdown(); } catch (SecurityException ok) { return; }
579 assertTrue(p.isShutdown());
580 }
581 }
582
583 /**
584 * isTerminated is false before termination, true after
585 */
586 public void testIsTerminated() throws InterruptedException {
587 final ThreadPoolExecutor p =
588 new CustomTPE(1, 1,
589 LONG_DELAY_MS, MILLISECONDS,
590 new ArrayBlockingQueue<Runnable>(10));
591 try (PoolCleaner cleaner = cleaner(p)) {
592 final CountDownLatch threadStarted = new CountDownLatch(1);
593 final CountDownLatch done = new CountDownLatch(1);
594 assertFalse(p.isTerminating());
595 p.execute(new CheckedRunnable() {
596 public void realRun() throws InterruptedException {
597 assertFalse(p.isTerminating());
598 threadStarted.countDown();
599 done.await();
600 }});
601 assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
602 assertFalse(p.isTerminating());
603 done.countDown();
604 try { p.shutdown(); } catch (SecurityException ok) { return; }
605 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
606 assertTrue(p.isTerminated());
607 assertFalse(p.isTerminating());
608 }
609 }
610
611 /**
612 * isTerminating is not true when running or when terminated
613 */
614 public void testIsTerminating() throws InterruptedException {
615 final ThreadPoolExecutor p =
616 new CustomTPE(1, 1,
617 LONG_DELAY_MS, MILLISECONDS,
618 new ArrayBlockingQueue<Runnable>(10));
619 try (PoolCleaner cleaner = cleaner(p)) {
620 final CountDownLatch threadStarted = new CountDownLatch(1);
621 final CountDownLatch done = new CountDownLatch(1);
622 assertFalse(p.isTerminating());
623 p.execute(new CheckedRunnable() {
624 public void realRun() throws InterruptedException {
625 assertFalse(p.isTerminating());
626 threadStarted.countDown();
627 done.await();
628 }});
629 assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
630 assertFalse(p.isTerminating());
631 done.countDown();
632 try { p.shutdown(); } catch (SecurityException ok) { return; }
633 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
634 assertTrue(p.isTerminated());
635 assertFalse(p.isTerminating());
636 }
637 }
638
639 /**
640 * getQueue returns the work queue, which contains queued tasks
641 */
642 public void testGetQueue() throws InterruptedException {
643 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
644 final ThreadPoolExecutor p =
645 new CustomTPE(1, 1,
646 LONG_DELAY_MS, MILLISECONDS,
647 q);
648 try (PoolCleaner cleaner = cleaner(p)) {
649 final CountDownLatch threadStarted = new CountDownLatch(1);
650 final CountDownLatch done = new CountDownLatch(1);
651 FutureTask[] tasks = new FutureTask[5];
652 for (int i = 0; i < tasks.length; i++) {
653 Callable task = new CheckedCallable<Boolean>() {
654 public Boolean realCall() throws InterruptedException {
655 threadStarted.countDown();
656 assertSame(q, p.getQueue());
657 done.await();
658 return Boolean.TRUE;
659 }};
660 tasks[i] = new FutureTask(task);
661 p.execute(tasks[i]);
662 }
663 assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
664 assertSame(q, p.getQueue());
665 assertFalse(q.contains(tasks[0]));
666 assertTrue(q.contains(tasks[tasks.length - 1]));
667 assertEquals(tasks.length - 1, q.size());
668 done.countDown();
669 }
670 }
671
672 /**
673 * remove(task) removes queued task, and fails to remove active task
674 */
675 public void testRemove() throws InterruptedException {
676 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
677 final ThreadPoolExecutor p =
678 new CustomTPE(1, 1,
679 LONG_DELAY_MS, MILLISECONDS,
680 q);
681 Runnable[] tasks = new Runnable[6];
682 final CountDownLatch threadStarted = new CountDownLatch(1);
683 final CountDownLatch done = new CountDownLatch(1);
684 try {
685 for (int i = 0; i < tasks.length; i++) {
686 tasks[i] = new CheckedRunnable() {
687 public void realRun() throws InterruptedException {
688 threadStarted.countDown();
689 done.await();
690 }};
691 p.execute(tasks[i]);
692 }
693 assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
694 assertFalse(p.remove(tasks[0]));
695 assertTrue(q.contains(tasks[4]));
696 assertTrue(q.contains(tasks[3]));
697 assertTrue(p.remove(tasks[4]));
698 assertFalse(p.remove(tasks[4]));
699 assertFalse(q.contains(tasks[4]));
700 assertTrue(q.contains(tasks[3]));
701 assertTrue(p.remove(tasks[3]));
702 assertFalse(q.contains(tasks[3]));
703 } finally {
704 done.countDown();
705 joinPool(p);
706 }
707 }
708
709 /**
710 * purge removes cancelled tasks from the queue
711 */
712 public void testPurge() throws InterruptedException {
713 final CountDownLatch threadStarted = new CountDownLatch(1);
714 final CountDownLatch done = new CountDownLatch(1);
715 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
716 final ThreadPoolExecutor p =
717 new CustomTPE(1, 1,
718 LONG_DELAY_MS, MILLISECONDS,
719 q);
720 FutureTask[] tasks = new FutureTask[5];
721 try {
722 for (int i = 0; i < tasks.length; i++) {
723 Callable task = new CheckedCallable<Boolean>() {
724 public Boolean realCall() throws InterruptedException {
725 threadStarted.countDown();
726 done.await();
727 return Boolean.TRUE;
728 }};
729 tasks[i] = new FutureTask(task);
730 p.execute(tasks[i]);
731 }
732 assertTrue(threadStarted.await(MEDIUM_DELAY_MS, MILLISECONDS));
733 assertEquals(tasks.length, p.getTaskCount());
734 assertEquals(tasks.length - 1, q.size());
735 assertEquals(1L, p.getActiveCount());
736 assertEquals(0L, p.getCompletedTaskCount());
737 tasks[4].cancel(true);
738 tasks[3].cancel(false);
739 p.purge();
740 assertEquals(tasks.length - 3, q.size());
741 assertEquals(tasks.length - 2, p.getTaskCount());
742 p.purge(); // Nothing to do
743 assertEquals(tasks.length - 3, q.size());
744 assertEquals(tasks.length - 2, p.getTaskCount());
745 } finally {
746 done.countDown();
747 joinPool(p);
748 }
749 }
750
751 /**
752 * shutdownNow returns a list containing tasks that were not run,
753 * and those tasks are drained from the queue
754 */
755 public void testShutdownNow() throws InterruptedException {
756 final int poolSize = 2;
757 final int count = 5;
758 final AtomicInteger ran = new AtomicInteger(0);
759 ThreadPoolExecutor p =
760 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
761 new ArrayBlockingQueue<Runnable>(10));
762 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
763 Runnable waiter = new CheckedRunnable() { public void realRun() {
764 threadsStarted.countDown();
765 try {
766 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
767 } catch (InterruptedException success) {}
768 ran.getAndIncrement();
769 }};
770 for (int i = 0; i < count; i++)
771 p.execute(waiter);
772 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
773 assertEquals(poolSize, p.getActiveCount());
774 assertEquals(0, p.getCompletedTaskCount());
775 final List<Runnable> queuedTasks;
776 try {
777 queuedTasks = p.shutdownNow();
778 } catch (SecurityException ok) {
779 return; // Allowed in case test doesn't have privs
780 }
781 assertTrue(p.isShutdown());
782 assertTrue(p.getQueue().isEmpty());
783 assertEquals(count - poolSize, queuedTasks.size());
784 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
785 assertTrue(p.isTerminated());
786 assertEquals(poolSize, ran.get());
787 assertEquals(poolSize, p.getCompletedTaskCount());
788 }
789
790 // Exception Tests
791
792 /**
793 * Constructor throws if corePoolSize argument is less than zero
794 */
795 public void testConstructor1() {
796 try {
797 new CustomTPE(-1, 1, 1L, SECONDS,
798 new ArrayBlockingQueue<Runnable>(10));
799 shouldThrow();
800 } catch (IllegalArgumentException success) {}
801 }
802
803 /**
804 * Constructor throws if maximumPoolSize is less than zero
805 */
806 public void testConstructor2() {
807 try {
808 new CustomTPE(1, -1, 1L, SECONDS,
809 new ArrayBlockingQueue<Runnable>(10));
810 shouldThrow();
811 } catch (IllegalArgumentException success) {}
812 }
813
814 /**
815 * Constructor throws if maximumPoolSize is equal to zero
816 */
817 public void testConstructor3() {
818 try {
819 new CustomTPE(1, 0, 1L, SECONDS,
820 new ArrayBlockingQueue<Runnable>(10));
821 shouldThrow();
822 } catch (IllegalArgumentException success) {}
823 }
824
825 /**
826 * Constructor throws if keepAliveTime is less than zero
827 */
828 public void testConstructor4() {
829 try {
830 new CustomTPE(1, 2, -1L, SECONDS,
831 new ArrayBlockingQueue<Runnable>(10));
832 shouldThrow();
833 } catch (IllegalArgumentException success) {}
834 }
835
836 /**
837 * Constructor throws if corePoolSize is greater than the maximumPoolSize
838 */
839 public void testConstructor5() {
840 try {
841 new CustomTPE(2, 1, 1L, SECONDS,
842 new ArrayBlockingQueue<Runnable>(10));
843 shouldThrow();
844 } catch (IllegalArgumentException success) {}
845 }
846
847 /**
848 * Constructor throws if workQueue is set to null
849 */
850 public void testConstructorNullPointerException() {
851 try {
852 new CustomTPE(1, 2, 1L, SECONDS, null);
853 shouldThrow();
854 } catch (NullPointerException success) {}
855 }
856
857 /**
858 * Constructor throws if corePoolSize argument is less than zero
859 */
860 public void testConstructor6() {
861 try {
862 new CustomTPE(-1, 1, 1L, SECONDS,
863 new ArrayBlockingQueue<Runnable>(10),
864 new SimpleThreadFactory());
865 shouldThrow();
866 } catch (IllegalArgumentException success) {}
867 }
868
869 /**
870 * Constructor throws if maximumPoolSize is less than zero
871 */
872 public void testConstructor7() {
873 try {
874 new CustomTPE(1,-1, 1L, SECONDS,
875 new ArrayBlockingQueue<Runnable>(10),
876 new SimpleThreadFactory());
877 shouldThrow();
878 } catch (IllegalArgumentException success) {}
879 }
880
881 /**
882 * Constructor throws if maximumPoolSize is equal to zero
883 */
884 public void testConstructor8() {
885 try {
886 new CustomTPE(1, 0, 1L, SECONDS,
887 new ArrayBlockingQueue<Runnable>(10),
888 new SimpleThreadFactory());
889 shouldThrow();
890 } catch (IllegalArgumentException success) {}
891 }
892
893 /**
894 * Constructor throws if keepAliveTime is less than zero
895 */
896 public void testConstructor9() {
897 try {
898 new CustomTPE(1, 2, -1L, SECONDS,
899 new ArrayBlockingQueue<Runnable>(10),
900 new SimpleThreadFactory());
901 shouldThrow();
902 } catch (IllegalArgumentException success) {}
903 }
904
905 /**
906 * Constructor throws if corePoolSize is greater than the maximumPoolSize
907 */
908 public void testConstructor10() {
909 try {
910 new CustomTPE(2, 1, 1L, SECONDS,
911 new ArrayBlockingQueue<Runnable>(10),
912 new SimpleThreadFactory());
913 shouldThrow();
914 } catch (IllegalArgumentException success) {}
915 }
916
917 /**
918 * Constructor throws if workQueue is set to null
919 */
920 public void testConstructorNullPointerException2() {
921 try {
922 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
923 shouldThrow();
924 } catch (NullPointerException success) {}
925 }
926
927 /**
928 * Constructor throws if threadFactory is set to null
929 */
930 public void testConstructorNullPointerException3() {
931 try {
932 new CustomTPE(1, 2, 1L, SECONDS,
933 new ArrayBlockingQueue<Runnable>(10),
934 (ThreadFactory) null);
935 shouldThrow();
936 } catch (NullPointerException success) {}
937 }
938
939 /**
940 * Constructor throws if corePoolSize argument is less than zero
941 */
942 public void testConstructor11() {
943 try {
944 new CustomTPE(-1, 1, 1L, SECONDS,
945 new ArrayBlockingQueue<Runnable>(10),
946 new NoOpREHandler());
947 shouldThrow();
948 } catch (IllegalArgumentException success) {}
949 }
950
951 /**
952 * Constructor throws if maximumPoolSize is less than zero
953 */
954 public void testConstructor12() {
955 try {
956 new CustomTPE(1, -1, 1L, SECONDS,
957 new ArrayBlockingQueue<Runnable>(10),
958 new NoOpREHandler());
959 shouldThrow();
960 } catch (IllegalArgumentException success) {}
961 }
962
963 /**
964 * Constructor throws if maximumPoolSize is equal to zero
965 */
966 public void testConstructor13() {
967 try {
968 new CustomTPE(1, 0, 1L, SECONDS,
969 new ArrayBlockingQueue<Runnable>(10),
970 new NoOpREHandler());
971 shouldThrow();
972 } catch (IllegalArgumentException success) {}
973 }
974
975 /**
976 * Constructor throws if keepAliveTime is less than zero
977 */
978 public void testConstructor14() {
979 try {
980 new CustomTPE(1, 2, -1L, SECONDS,
981 new ArrayBlockingQueue<Runnable>(10),
982 new NoOpREHandler());
983 shouldThrow();
984 } catch (IllegalArgumentException success) {}
985 }
986
987 /**
988 * Constructor throws if corePoolSize is greater than the maximumPoolSize
989 */
990 public void testConstructor15() {
991 try {
992 new CustomTPE(2, 1, 1L, SECONDS,
993 new ArrayBlockingQueue<Runnable>(10),
994 new NoOpREHandler());
995 shouldThrow();
996 } catch (IllegalArgumentException success) {}
997 }
998
999 /**
1000 * Constructor throws if workQueue is set to null
1001 */
1002 public void testConstructorNullPointerException4() {
1003 try {
1004 new CustomTPE(1, 2, 1L, SECONDS,
1005 null,
1006 new NoOpREHandler());
1007 shouldThrow();
1008 } catch (NullPointerException success) {}
1009 }
1010
1011 /**
1012 * Constructor throws if handler is set to null
1013 */
1014 public void testConstructorNullPointerException5() {
1015 try {
1016 new CustomTPE(1, 2, 1L, SECONDS,
1017 new ArrayBlockingQueue<Runnable>(10),
1018 (RejectedExecutionHandler) null);
1019 shouldThrow();
1020 } catch (NullPointerException success) {}
1021 }
1022
1023 /**
1024 * Constructor throws if corePoolSize argument is less than zero
1025 */
1026 public void testConstructor16() {
1027 try {
1028 new CustomTPE(-1, 1, 1L, SECONDS,
1029 new ArrayBlockingQueue<Runnable>(10),
1030 new SimpleThreadFactory(),
1031 new NoOpREHandler());
1032 shouldThrow();
1033 } catch (IllegalArgumentException success) {}
1034 }
1035
1036 /**
1037 * Constructor throws if maximumPoolSize is less than zero
1038 */
1039 public void testConstructor17() {
1040 try {
1041 new CustomTPE(1, -1, 1L, SECONDS,
1042 new ArrayBlockingQueue<Runnable>(10),
1043 new SimpleThreadFactory(),
1044 new NoOpREHandler());
1045 shouldThrow();
1046 } catch (IllegalArgumentException success) {}
1047 }
1048
1049 /**
1050 * Constructor throws if maximumPoolSize is equal to zero
1051 */
1052 public void testConstructor18() {
1053 try {
1054 new CustomTPE(1, 0, 1L, SECONDS,
1055 new ArrayBlockingQueue<Runnable>(10),
1056 new SimpleThreadFactory(),
1057 new NoOpREHandler());
1058 shouldThrow();
1059 } catch (IllegalArgumentException success) {}
1060 }
1061
1062 /**
1063 * Constructor throws if keepAliveTime is less than zero
1064 */
1065 public void testConstructor19() {
1066 try {
1067 new CustomTPE(1, 2, -1L, SECONDS,
1068 new ArrayBlockingQueue<Runnable>(10),
1069 new SimpleThreadFactory(),
1070 new NoOpREHandler());
1071 shouldThrow();
1072 } catch (IllegalArgumentException success) {}
1073 }
1074
1075 /**
1076 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1077 */
1078 public void testConstructor20() {
1079 try {
1080 new CustomTPE(2, 1, 1L, SECONDS,
1081 new ArrayBlockingQueue<Runnable>(10),
1082 new SimpleThreadFactory(),
1083 new NoOpREHandler());
1084 shouldThrow();
1085 } catch (IllegalArgumentException success) {}
1086 }
1087
1088 /**
1089 * Constructor throws if workQueue is null
1090 */
1091 public void testConstructorNullPointerException6() {
1092 try {
1093 new CustomTPE(1, 2, 1L, SECONDS,
1094 null,
1095 new SimpleThreadFactory(),
1096 new NoOpREHandler());
1097 shouldThrow();
1098 } catch (NullPointerException success) {}
1099 }
1100
1101 /**
1102 * Constructor throws if handler is null
1103 */
1104 public void testConstructorNullPointerException7() {
1105 try {
1106 new CustomTPE(1, 2, 1L, SECONDS,
1107 new ArrayBlockingQueue<Runnable>(10),
1108 new SimpleThreadFactory(),
1109 (RejectedExecutionHandler) null);
1110 shouldThrow();
1111 } catch (NullPointerException success) {}
1112 }
1113
1114 /**
1115 * Constructor throws if ThreadFactory is null
1116 */
1117 public void testConstructorNullPointerException8() {
1118 try {
1119 new CustomTPE(1, 2, 1L, SECONDS,
1120 new ArrayBlockingQueue<Runnable>(10),
1121 (ThreadFactory) null,
1122 new NoOpREHandler());
1123 shouldThrow();
1124 } catch (NullPointerException success) {}
1125 }
1126
1127 /**
1128 * execute throws RejectedExecutionException if saturated.
1129 */
1130 public void testSaturatedExecute() {
1131 ThreadPoolExecutor p =
1132 new CustomTPE(1, 1,
1133 LONG_DELAY_MS, MILLISECONDS,
1134 new ArrayBlockingQueue<Runnable>(1));
1135 final CountDownLatch done = new CountDownLatch(1);
1136 try {
1137 Runnable task = new CheckedRunnable() {
1138 public void realRun() throws InterruptedException {
1139 done.await();
1140 }};
1141 for (int i = 0; i < 2; ++i)
1142 p.execute(task);
1143 for (int i = 0; i < 2; ++i) {
1144 try {
1145 p.execute(task);
1146 shouldThrow();
1147 } catch (RejectedExecutionException success) {}
1148 assertTrue(p.getTaskCount() <= 2);
1149 }
1150 } finally {
1151 done.countDown();
1152 joinPool(p);
1153 }
1154 }
1155
1156 /**
1157 * executor using CallerRunsPolicy runs task if saturated.
1158 */
1159 public void testSaturatedExecute2() {
1160 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1161 ThreadPoolExecutor p = new CustomTPE(1, 1,
1162 LONG_DELAY_MS, MILLISECONDS,
1163 new ArrayBlockingQueue<Runnable>(1),
1164 h);
1165 try {
1166 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1167 for (int i = 0; i < tasks.length; ++i)
1168 tasks[i] = new TrackedNoOpRunnable();
1169 TrackedLongRunnable mr = new TrackedLongRunnable();
1170 p.execute(mr);
1171 for (int i = 0; i < tasks.length; ++i)
1172 p.execute(tasks[i]);
1173 for (int i = 1; i < tasks.length; ++i)
1174 assertTrue(tasks[i].done);
1175 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1176 } finally {
1177 joinPool(p);
1178 }
1179 }
1180
1181 /**
1182 * executor using DiscardPolicy drops task if saturated.
1183 */
1184 public void testSaturatedExecute3() {
1185 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1186 ThreadPoolExecutor p =
1187 new CustomTPE(1, 1,
1188 LONG_DELAY_MS, MILLISECONDS,
1189 new ArrayBlockingQueue<Runnable>(1),
1190 h);
1191 try {
1192 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1193 for (int i = 0; i < tasks.length; ++i)
1194 tasks[i] = new TrackedNoOpRunnable();
1195 p.execute(new TrackedLongRunnable());
1196 for (TrackedNoOpRunnable task : tasks)
1197 p.execute(task);
1198 for (TrackedNoOpRunnable task : tasks)
1199 assertFalse(task.done);
1200 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1201 } finally {
1202 joinPool(p);
1203 }
1204 }
1205
1206 /**
1207 * executor using DiscardOldestPolicy drops oldest task if saturated.
1208 */
1209 public void testSaturatedExecute4() {
1210 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1211 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1212 try {
1213 p.execute(new TrackedLongRunnable());
1214 TrackedLongRunnable r2 = new TrackedLongRunnable();
1215 p.execute(r2);
1216 assertTrue(p.getQueue().contains(r2));
1217 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1218 p.execute(r3);
1219 assertFalse(p.getQueue().contains(r2));
1220 assertTrue(p.getQueue().contains(r3));
1221 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1222 } finally {
1223 joinPool(p);
1224 }
1225 }
1226
1227 /**
1228 * execute throws RejectedExecutionException if shutdown
1229 */
1230 public void testRejectedExecutionExceptionOnShutdown() {
1231 ThreadPoolExecutor p =
1232 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1233 try { p.shutdown(); } catch (SecurityException ok) { return; }
1234 try {
1235 p.execute(new NoOpRunnable());
1236 shouldThrow();
1237 } catch (RejectedExecutionException success) {}
1238
1239 joinPool(p);
1240 }
1241
1242 /**
1243 * execute using CallerRunsPolicy drops task on shutdown
1244 */
1245 public void testCallerRunsOnShutdown() {
1246 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1247 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1248
1249 try { p.shutdown(); } catch (SecurityException ok) { return; }
1250 try {
1251 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1252 p.execute(r);
1253 assertFalse(r.done);
1254 } finally {
1255 joinPool(p);
1256 }
1257 }
1258
1259 /**
1260 * execute using DiscardPolicy drops task on shutdown
1261 */
1262 public void testDiscardOnShutdown() {
1263 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1264 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1265
1266 try { p.shutdown(); } catch (SecurityException ok) { return; }
1267 try {
1268 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1269 p.execute(r);
1270 assertFalse(r.done);
1271 } finally {
1272 joinPool(p);
1273 }
1274 }
1275
1276 /**
1277 * execute using DiscardOldestPolicy drops task on shutdown
1278 */
1279 public void testDiscardOldestOnShutdown() {
1280 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1281 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1282
1283 try { p.shutdown(); } catch (SecurityException ok) { return; }
1284 try {
1285 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1286 p.execute(r);
1287 assertFalse(r.done);
1288 } finally {
1289 joinPool(p);
1290 }
1291 }
1292
1293 /**
1294 * execute(null) throws NPE
1295 */
1296 public void testExecuteNull() {
1297 ThreadPoolExecutor p =
1298 new CustomTPE(1, 2, 1L, SECONDS,
1299 new ArrayBlockingQueue<Runnable>(10));
1300 try {
1301 p.execute(null);
1302 shouldThrow();
1303 } catch (NullPointerException success) {}
1304
1305 joinPool(p);
1306 }
1307
1308 /**
1309 * setCorePoolSize of negative value throws IllegalArgumentException
1310 */
1311 public void testCorePoolSizeIllegalArgumentException() {
1312 ThreadPoolExecutor p =
1313 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1314 try {
1315 p.setCorePoolSize(-1);
1316 shouldThrow();
1317 } catch (IllegalArgumentException success) {
1318 } finally {
1319 try { p.shutdown(); } catch (SecurityException ok) { return; }
1320 }
1321 joinPool(p);
1322 }
1323
1324 /**
1325 * setMaximumPoolSize(int) throws IllegalArgumentException
1326 * if given a value less the core pool size
1327 */
1328 public void testMaximumPoolSizeIllegalArgumentException() {
1329 ThreadPoolExecutor p =
1330 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1331 try {
1332 p.setMaximumPoolSize(1);
1333 shouldThrow();
1334 } catch (IllegalArgumentException success) {
1335 } finally {
1336 try { p.shutdown(); } catch (SecurityException ok) { return; }
1337 }
1338 joinPool(p);
1339 }
1340
1341 /**
1342 * setMaximumPoolSize throws IllegalArgumentException
1343 * if given a negative value
1344 */
1345 public void testMaximumPoolSizeIllegalArgumentException2() {
1346 ThreadPoolExecutor p =
1347 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1348 try {
1349 p.setMaximumPoolSize(-1);
1350 shouldThrow();
1351 } catch (IllegalArgumentException success) {
1352 } finally {
1353 try { p.shutdown(); } catch (SecurityException ok) { return; }
1354 }
1355 joinPool(p);
1356 }
1357
1358 /**
1359 * setKeepAliveTime throws IllegalArgumentException
1360 * when given a negative value
1361 */
1362 public void testKeepAliveTimeIllegalArgumentException() {
1363 ThreadPoolExecutor p =
1364 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1365
1366 try {
1367 p.setKeepAliveTime(-1,MILLISECONDS);
1368 shouldThrow();
1369 } catch (IllegalArgumentException success) {
1370 } finally {
1371 try { p.shutdown(); } catch (SecurityException ok) { return; }
1372 }
1373 joinPool(p);
1374 }
1375
1376 /**
1377 * terminated() is called on termination
1378 */
1379 public void testTerminated() {
1380 CustomTPE p = new CustomTPE();
1381 try { p.shutdown(); } catch (SecurityException ok) { return; }
1382 assertTrue(p.terminatedCalled());
1383 joinPool(p);
1384 }
1385
1386 /**
1387 * beforeExecute and afterExecute are called when executing task
1388 */
1389 public void testBeforeAfter() throws InterruptedException {
1390 CustomTPE p = new CustomTPE();
1391 try {
1392 final CountDownLatch done = new CountDownLatch(1);
1393 p.execute(new CheckedRunnable() {
1394 public void realRun() {
1395 done.countDown();
1396 }});
1397 await(p.afterCalled);
1398 assertEquals(0, done.getCount());
1399 assertTrue(p.afterCalled());
1400 assertTrue(p.beforeCalled());
1401 try { p.shutdown(); } catch (SecurityException ok) { return; }
1402 } finally {
1403 joinPool(p);
1404 }
1405 }
1406
1407 /**
1408 * completed submit of callable returns result
1409 */
1410 public void testSubmitCallable() throws Exception {
1411 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1412 try {
1413 Future<String> future = e.submit(new StringTask());
1414 String result = future.get();
1415 assertSame(TEST_STRING, result);
1416 } finally {
1417 joinPool(e);
1418 }
1419 }
1420
1421 /**
1422 * completed submit of runnable returns successfully
1423 */
1424 public void testSubmitRunnable() throws Exception {
1425 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1426 try {
1427 Future<?> future = e.submit(new NoOpRunnable());
1428 future.get();
1429 assertTrue(future.isDone());
1430 } finally {
1431 joinPool(e);
1432 }
1433 }
1434
1435 /**
1436 * completed submit of (runnable, result) returns result
1437 */
1438 public void testSubmitRunnable2() throws Exception {
1439 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1440 try {
1441 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1442 String result = future.get();
1443 assertSame(TEST_STRING, result);
1444 } finally {
1445 joinPool(e);
1446 }
1447 }
1448
1449 /**
1450 * invokeAny(null) throws NPE
1451 */
1452 public void testInvokeAny1() throws Exception {
1453 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1454 try {
1455 e.invokeAny(null);
1456 shouldThrow();
1457 } catch (NullPointerException success) {
1458 } finally {
1459 joinPool(e);
1460 }
1461 }
1462
1463 /**
1464 * invokeAny(empty collection) throws IAE
1465 */
1466 public void testInvokeAny2() throws Exception {
1467 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1468 try {
1469 e.invokeAny(new ArrayList<Callable<String>>());
1470 shouldThrow();
1471 } catch (IllegalArgumentException success) {
1472 } finally {
1473 joinPool(e);
1474 }
1475 }
1476
1477 /**
1478 * invokeAny(c) throws NPE if c has null elements
1479 */
1480 public void testInvokeAny3() throws Exception {
1481 CountDownLatch latch = new CountDownLatch(1);
1482 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1483 List<Callable<String>> l = new ArrayList<Callable<String>>();
1484 l.add(latchAwaitingStringTask(latch));
1485 l.add(null);
1486 try {
1487 e.invokeAny(l);
1488 shouldThrow();
1489 } catch (NullPointerException success) {
1490 } finally {
1491 latch.countDown();
1492 joinPool(e);
1493 }
1494 }
1495
1496 /**
1497 * invokeAny(c) throws ExecutionException if no task completes
1498 */
1499 public void testInvokeAny4() throws Exception {
1500 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1501 List<Callable<String>> l = new ArrayList<Callable<String>>();
1502 l.add(new NPETask());
1503 try {
1504 e.invokeAny(l);
1505 shouldThrow();
1506 } catch (ExecutionException success) {
1507 assertTrue(success.getCause() instanceof NullPointerException);
1508 } finally {
1509 joinPool(e);
1510 }
1511 }
1512
1513 /**
1514 * invokeAny(c) returns result of some task
1515 */
1516 public void testInvokeAny5() throws Exception {
1517 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1518 try {
1519 List<Callable<String>> l = new ArrayList<Callable<String>>();
1520 l.add(new StringTask());
1521 l.add(new StringTask());
1522 String result = e.invokeAny(l);
1523 assertSame(TEST_STRING, result);
1524 } finally {
1525 joinPool(e);
1526 }
1527 }
1528
1529 /**
1530 * invokeAll(null) throws NPE
1531 */
1532 public void testInvokeAll1() throws Exception {
1533 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1534 try {
1535 e.invokeAll(null);
1536 shouldThrow();
1537 } catch (NullPointerException success) {
1538 } finally {
1539 joinPool(e);
1540 }
1541 }
1542
1543 /**
1544 * invokeAll(empty collection) returns empty collection
1545 */
1546 public void testInvokeAll2() throws Exception {
1547 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1548 try {
1549 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1550 assertTrue(r.isEmpty());
1551 } finally {
1552 joinPool(e);
1553 }
1554 }
1555
1556 /**
1557 * invokeAll(c) throws NPE if c has null elements
1558 */
1559 public void testInvokeAll3() throws Exception {
1560 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1561 List<Callable<String>> l = new ArrayList<Callable<String>>();
1562 l.add(new StringTask());
1563 l.add(null);
1564 try {
1565 e.invokeAll(l);
1566 shouldThrow();
1567 } catch (NullPointerException success) {
1568 } finally {
1569 joinPool(e);
1570 }
1571 }
1572
1573 /**
1574 * get of element of invokeAll(c) throws exception on failed task
1575 */
1576 public void testInvokeAll4() throws Exception {
1577 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1578 List<Callable<String>> l = new ArrayList<Callable<String>>();
1579 l.add(new NPETask());
1580 List<Future<String>> futures = e.invokeAll(l);
1581 assertEquals(1, futures.size());
1582 try {
1583 futures.get(0).get();
1584 shouldThrow();
1585 } catch (ExecutionException success) {
1586 assertTrue(success.getCause() instanceof NullPointerException);
1587 } finally {
1588 joinPool(e);
1589 }
1590 }
1591
1592 /**
1593 * invokeAll(c) returns results of all completed tasks
1594 */
1595 public void testInvokeAll5() throws Exception {
1596 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1597 try {
1598 List<Callable<String>> l = new ArrayList<Callable<String>>();
1599 l.add(new StringTask());
1600 l.add(new StringTask());
1601 List<Future<String>> futures = e.invokeAll(l);
1602 assertEquals(2, futures.size());
1603 for (Future<String> future : futures)
1604 assertSame(TEST_STRING, future.get());
1605 } finally {
1606 joinPool(e);
1607 }
1608 }
1609
1610 /**
1611 * timed invokeAny(null) throws NPE
1612 */
1613 public void testTimedInvokeAny1() throws Exception {
1614 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1615 try {
1616 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1617 shouldThrow();
1618 } catch (NullPointerException success) {
1619 } finally {
1620 joinPool(e);
1621 }
1622 }
1623
1624 /**
1625 * timed invokeAny(,,null) throws NPE
1626 */
1627 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1628 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1629 List<Callable<String>> l = new ArrayList<Callable<String>>();
1630 l.add(new StringTask());
1631 try {
1632 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1633 shouldThrow();
1634 } catch (NullPointerException success) {
1635 } finally {
1636 joinPool(e);
1637 }
1638 }
1639
1640 /**
1641 * timed invokeAny(empty collection) throws IAE
1642 */
1643 public void testTimedInvokeAny2() throws Exception {
1644 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1645 try {
1646 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1647 shouldThrow();
1648 } catch (IllegalArgumentException success) {
1649 } finally {
1650 joinPool(e);
1651 }
1652 }
1653
1654 /**
1655 * timed invokeAny(c) throws NPE if c has null elements
1656 */
1657 public void testTimedInvokeAny3() throws Exception {
1658 CountDownLatch latch = new CountDownLatch(1);
1659 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1660 List<Callable<String>> l = new ArrayList<Callable<String>>();
1661 l.add(latchAwaitingStringTask(latch));
1662 l.add(null);
1663 try {
1664 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1665 shouldThrow();
1666 } catch (NullPointerException success) {
1667 } finally {
1668 latch.countDown();
1669 joinPool(e);
1670 }
1671 }
1672
1673 /**
1674 * timed invokeAny(c) throws ExecutionException if no task completes
1675 */
1676 public void testTimedInvokeAny4() throws Exception {
1677 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1678 List<Callable<String>> l = new ArrayList<Callable<String>>();
1679 l.add(new NPETask());
1680 try {
1681 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1682 shouldThrow();
1683 } catch (ExecutionException success) {
1684 assertTrue(success.getCause() instanceof NullPointerException);
1685 } finally {
1686 joinPool(e);
1687 }
1688 }
1689
1690 /**
1691 * timed invokeAny(c) returns result of some task
1692 */
1693 public void testTimedInvokeAny5() throws Exception {
1694 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1695 try {
1696 List<Callable<String>> l = new ArrayList<Callable<String>>();
1697 l.add(new StringTask());
1698 l.add(new StringTask());
1699 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1700 assertSame(TEST_STRING, result);
1701 } finally {
1702 joinPool(e);
1703 }
1704 }
1705
1706 /**
1707 * timed invokeAll(null) throws NPE
1708 */
1709 public void testTimedInvokeAll1() throws Exception {
1710 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1711 try {
1712 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1713 shouldThrow();
1714 } catch (NullPointerException success) {
1715 } finally {
1716 joinPool(e);
1717 }
1718 }
1719
1720 /**
1721 * timed invokeAll(,,null) throws NPE
1722 */
1723 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1724 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1725 List<Callable<String>> l = new ArrayList<Callable<String>>();
1726 l.add(new StringTask());
1727 try {
1728 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1729 shouldThrow();
1730 } catch (NullPointerException success) {
1731 } finally {
1732 joinPool(e);
1733 }
1734 }
1735
1736 /**
1737 * timed invokeAll(empty collection) returns empty collection
1738 */
1739 public void testTimedInvokeAll2() throws Exception {
1740 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1741 try {
1742 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1743 assertTrue(r.isEmpty());
1744 } finally {
1745 joinPool(e);
1746 }
1747 }
1748
1749 /**
1750 * timed invokeAll(c) throws NPE if c has null elements
1751 */
1752 public void testTimedInvokeAll3() throws Exception {
1753 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1754 List<Callable<String>> l = new ArrayList<Callable<String>>();
1755 l.add(new StringTask());
1756 l.add(null);
1757 try {
1758 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1759 shouldThrow();
1760 } catch (NullPointerException success) {
1761 } finally {
1762 joinPool(e);
1763 }
1764 }
1765
1766 /**
1767 * get of element of invokeAll(c) throws exception on failed task
1768 */
1769 public void testTimedInvokeAll4() throws Exception {
1770 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1771 List<Callable<String>> l = new ArrayList<Callable<String>>();
1772 l.add(new NPETask());
1773 List<Future<String>> futures =
1774 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1775 assertEquals(1, futures.size());
1776 try {
1777 futures.get(0).get();
1778 shouldThrow();
1779 } catch (ExecutionException success) {
1780 assertTrue(success.getCause() instanceof NullPointerException);
1781 } finally {
1782 joinPool(e);
1783 }
1784 }
1785
1786 /**
1787 * timed invokeAll(c) returns results of all completed tasks
1788 */
1789 public void testTimedInvokeAll5() throws Exception {
1790 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1791 try {
1792 List<Callable<String>> l = new ArrayList<Callable<String>>();
1793 l.add(new StringTask());
1794 l.add(new StringTask());
1795 List<Future<String>> futures =
1796 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1797 assertEquals(2, futures.size());
1798 for (Future<String> future : futures)
1799 assertSame(TEST_STRING, future.get());
1800 } finally {
1801 joinPool(e);
1802 }
1803 }
1804
1805 /**
1806 * timed invokeAll(c) cancels tasks not completed by timeout
1807 */
1808 public void testTimedInvokeAll6() throws Exception {
1809 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1810 try {
1811 for (long timeout = timeoutMillis();;) {
1812 List<Callable<String>> tasks = new ArrayList<>();
1813 tasks.add(new StringTask("0"));
1814 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1815 tasks.add(new StringTask("2"));
1816 long startTime = System.nanoTime();
1817 List<Future<String>> futures =
1818 e.invokeAll(tasks, timeout, MILLISECONDS);
1819 assertEquals(tasks.size(), futures.size());
1820 assertTrue(millisElapsedSince(startTime) >= timeout);
1821 for (Future future : futures)
1822 assertTrue(future.isDone());
1823 assertTrue(futures.get(1).isCancelled());
1824 try {
1825 assertEquals("0", futures.get(0).get());
1826 assertEquals("2", futures.get(2).get());
1827 break;
1828 } catch (CancellationException retryWithLongerTimeout) {
1829 timeout *= 2;
1830 if (timeout >= LONG_DELAY_MS / 2)
1831 fail("expected exactly one task to be cancelled");
1832 }
1833 }
1834 } finally {
1835 joinPool(e);
1836 }
1837 }
1838
1839 /**
1840 * Execution continues if there is at least one thread even if
1841 * thread factory fails to create more
1842 */
1843 public void testFailingThreadFactory() throws InterruptedException {
1844 final ExecutorService e =
1845 new CustomTPE(100, 100,
1846 LONG_DELAY_MS, MILLISECONDS,
1847 new LinkedBlockingQueue<Runnable>(),
1848 new FailingThreadFactory());
1849 try {
1850 final int TASKS = 100;
1851 final CountDownLatch done = new CountDownLatch(TASKS);
1852 for (int k = 0; k < TASKS; ++k)
1853 e.execute(new CheckedRunnable() {
1854 public void realRun() {
1855 done.countDown();
1856 }});
1857 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1858 } finally {
1859 joinPool(e);
1860 }
1861 }
1862
1863 /**
1864 * allowsCoreThreadTimeOut is by default false.
1865 */
1866 public void testAllowsCoreThreadTimeOut() {
1867 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1868 assertFalse(p.allowsCoreThreadTimeOut());
1869 joinPool(p);
1870 }
1871
1872 /**
1873 * allowCoreThreadTimeOut(true) causes idle threads to time out
1874 */
1875 public void testAllowCoreThreadTimeOut_true() throws Exception {
1876 long keepAliveTime = timeoutMillis();
1877 final ThreadPoolExecutor p =
1878 new CustomTPE(2, 10,
1879 keepAliveTime, MILLISECONDS,
1880 new ArrayBlockingQueue<Runnable>(10));
1881 final CountDownLatch threadStarted = new CountDownLatch(1);
1882 try {
1883 p.allowCoreThreadTimeOut(true);
1884 p.execute(new CheckedRunnable() {
1885 public void realRun() {
1886 threadStarted.countDown();
1887 assertEquals(1, p.getPoolSize());
1888 }});
1889 await(threadStarted);
1890 delay(keepAliveTime);
1891 long startTime = System.nanoTime();
1892 while (p.getPoolSize() > 0
1893 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1894 Thread.yield();
1895 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1896 assertEquals(0, p.getPoolSize());
1897 } finally {
1898 joinPool(p);
1899 }
1900 }
1901
1902 /**
1903 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1904 */
1905 public void testAllowCoreThreadTimeOut_false() throws Exception {
1906 long keepAliveTime = timeoutMillis();
1907 final ThreadPoolExecutor p =
1908 new CustomTPE(2, 10,
1909 keepAliveTime, MILLISECONDS,
1910 new ArrayBlockingQueue<Runnable>(10));
1911 final CountDownLatch threadStarted = new CountDownLatch(1);
1912 try {
1913 p.allowCoreThreadTimeOut(false);
1914 p.execute(new CheckedRunnable() {
1915 public void realRun() throws InterruptedException {
1916 threadStarted.countDown();
1917 assertTrue(p.getPoolSize() >= 1);
1918 }});
1919 delay(2 * keepAliveTime);
1920 assertTrue(p.getPoolSize() >= 1);
1921 } finally {
1922 joinPool(p);
1923 }
1924 }
1925
1926 /**
1927 * get(cancelled task) throws CancellationException
1928 * (in part, a test of CustomTPE itself)
1929 */
1930 public void testGet_cancelled() throws Exception {
1931 final ExecutorService e =
1932 new CustomTPE(1, 1,
1933 LONG_DELAY_MS, MILLISECONDS,
1934 new LinkedBlockingQueue<Runnable>());
1935 try {
1936 final CountDownLatch blockerStarted = new CountDownLatch(1);
1937 final CountDownLatch done = new CountDownLatch(1);
1938 final List<Future<?>> futures = new ArrayList<>();
1939 for (int i = 0; i < 2; i++) {
1940 Runnable r = new CheckedRunnable() { public void realRun()
1941 throws Throwable {
1942 blockerStarted.countDown();
1943 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1944 }};
1945 futures.add(e.submit(r));
1946 }
1947 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1948 for (Future<?> future : futures) future.cancel(false);
1949 for (Future<?> future : futures) {
1950 try {
1951 future.get();
1952 shouldThrow();
1953 } catch (CancellationException success) {}
1954 try {
1955 future.get(LONG_DELAY_MS, MILLISECONDS);
1956 shouldThrow();
1957 } catch (CancellationException success) {}
1958 assertTrue(future.isCancelled());
1959 assertTrue(future.isDone());
1960 }
1961 done.countDown();
1962 } finally {
1963 joinPool(e);
1964 }
1965 }
1966
1967 }