ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.50
Committed: Sun Oct 4 00:40:33 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.49: +6 -7 lines
Log Message:
rejigger pool closing infrastructure

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner<ThreadPoolExecutor> cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 final CountDownLatch threadStarted = new CountDownLatch(1);
256 final CountDownLatch done = new CountDownLatch(1);
257 try {
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 } finally {
268 done.countDown();
269 joinPool(p);
270 }
271 }
272
273 /**
274 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
275 */
276 public void testPrestartCoreThread() {
277 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
278 assertEquals(0, p.getPoolSize());
279 assertTrue(p.prestartCoreThread());
280 assertEquals(1, p.getPoolSize());
281 assertTrue(p.prestartCoreThread());
282 assertEquals(2, p.getPoolSize());
283 assertFalse(p.prestartCoreThread());
284 assertEquals(2, p.getPoolSize());
285 joinPool(p);
286 }
287
288 /**
289 * prestartAllCoreThreads starts all corePoolSize threads
290 */
291 public void testPrestartAllCoreThreads() {
292 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
293 assertEquals(0, p.getPoolSize());
294 p.prestartAllCoreThreads();
295 assertEquals(2, p.getPoolSize());
296 p.prestartAllCoreThreads();
297 assertEquals(2, p.getPoolSize());
298 joinPool(p);
299 }
300
301 /**
302 * getCompletedTaskCount increases, but doesn't overestimate,
303 * when tasks complete
304 */
305 public void testGetCompletedTaskCount() throws InterruptedException {
306 final ThreadPoolExecutor p =
307 new CustomTPE(2, 2,
308 LONG_DELAY_MS, MILLISECONDS,
309 new ArrayBlockingQueue<Runnable>(10));
310 final CountDownLatch threadStarted = new CountDownLatch(1);
311 final CountDownLatch threadProceed = new CountDownLatch(1);
312 final CountDownLatch threadDone = new CountDownLatch(1);
313 try {
314 assertEquals(0, p.getCompletedTaskCount());
315 p.execute(new CheckedRunnable() {
316 public void realRun() throws InterruptedException {
317 threadStarted.countDown();
318 assertEquals(0, p.getCompletedTaskCount());
319 threadProceed.await();
320 threadDone.countDown();
321 }});
322 await(threadStarted);
323 assertEquals(0, p.getCompletedTaskCount());
324 threadProceed.countDown();
325 threadDone.await();
326 long startTime = System.nanoTime();
327 while (p.getCompletedTaskCount() != 1) {
328 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
329 fail("timed out");
330 Thread.yield();
331 }
332 } finally {
333 joinPool(p);
334 }
335 }
336
337 /**
338 * getCorePoolSize returns size given in constructor if not otherwise set
339 */
340 public void testGetCorePoolSize() {
341 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
342 assertEquals(1, p.getCorePoolSize());
343 joinPool(p);
344 }
345
346 /**
347 * getKeepAliveTime returns value given in constructor if not otherwise set
348 */
349 public void testGetKeepAliveTime() {
350 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
351 assertEquals(1, p.getKeepAliveTime(SECONDS));
352 joinPool(p);
353 }
354
355 /**
356 * getThreadFactory returns factory in constructor if not set
357 */
358 public void testGetThreadFactory() {
359 ThreadFactory tf = new SimpleThreadFactory();
360 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
361 assertSame(tf, p.getThreadFactory());
362 joinPool(p);
363 }
364
365 /**
366 * setThreadFactory sets the thread factory returned by getThreadFactory
367 */
368 public void testSetThreadFactory() {
369 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
370 ThreadFactory tf = new SimpleThreadFactory();
371 p.setThreadFactory(tf);
372 assertSame(tf, p.getThreadFactory());
373 joinPool(p);
374 }
375
376 /**
377 * setThreadFactory(null) throws NPE
378 */
379 public void testSetThreadFactoryNull() {
380 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
381 try {
382 p.setThreadFactory(null);
383 shouldThrow();
384 } catch (NullPointerException success) {
385 } finally {
386 joinPool(p);
387 }
388 }
389
390 /**
391 * getRejectedExecutionHandler returns handler in constructor if not set
392 */
393 public void testGetRejectedExecutionHandler() {
394 RejectedExecutionHandler h = new NoOpREHandler();
395 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
396 assertSame(h, p.getRejectedExecutionHandler());
397 joinPool(p);
398 }
399
400 /**
401 * setRejectedExecutionHandler sets the handler returned by
402 * getRejectedExecutionHandler
403 */
404 public void testSetRejectedExecutionHandler() {
405 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
406 RejectedExecutionHandler h = new NoOpREHandler();
407 p.setRejectedExecutionHandler(h);
408 assertSame(h, p.getRejectedExecutionHandler());
409 joinPool(p);
410 }
411
412 /**
413 * setRejectedExecutionHandler(null) throws NPE
414 */
415 public void testSetRejectedExecutionHandlerNull() {
416 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
417 try {
418 p.setRejectedExecutionHandler(null);
419 shouldThrow();
420 } catch (NullPointerException success) {
421 } finally {
422 joinPool(p);
423 }
424 }
425
426 /**
427 * getLargestPoolSize increases, but doesn't overestimate, when
428 * multiple threads active
429 */
430 public void testGetLargestPoolSize() throws InterruptedException {
431 final int THREADS = 3;
432 final ThreadPoolExecutor p =
433 new CustomTPE(THREADS, THREADS,
434 LONG_DELAY_MS, MILLISECONDS,
435 new ArrayBlockingQueue<Runnable>(10));
436 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
437 final CountDownLatch done = new CountDownLatch(1);
438 try {
439 assertEquals(0, p.getLargestPoolSize());
440 for (int i = 0; i < THREADS; i++)
441 p.execute(new CheckedRunnable() {
442 public void realRun() throws InterruptedException {
443 threadsStarted.countDown();
444 done.await();
445 assertEquals(THREADS, p.getLargestPoolSize());
446 }});
447 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
448 assertEquals(THREADS, p.getLargestPoolSize());
449 } finally {
450 done.countDown();
451 joinPool(p);
452 assertEquals(THREADS, p.getLargestPoolSize());
453 }
454 }
455
456 /**
457 * getMaximumPoolSize returns value given in constructor if not
458 * otherwise set
459 */
460 public void testGetMaximumPoolSize() {
461 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
462 assertEquals(2, p.getMaximumPoolSize());
463 joinPool(p);
464 }
465
466 /**
467 * getPoolSize increases, but doesn't overestimate, when threads
468 * become active
469 */
470 public void testGetPoolSize() throws InterruptedException {
471 final ThreadPoolExecutor p =
472 new CustomTPE(1, 1,
473 LONG_DELAY_MS, MILLISECONDS,
474 new ArrayBlockingQueue<Runnable>(10));
475 final CountDownLatch threadStarted = new CountDownLatch(1);
476 final CountDownLatch done = new CountDownLatch(1);
477 try {
478 assertEquals(0, p.getPoolSize());
479 p.execute(new CheckedRunnable() {
480 public void realRun() throws InterruptedException {
481 threadStarted.countDown();
482 assertEquals(1, p.getPoolSize());
483 done.await();
484 }});
485 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
486 assertEquals(1, p.getPoolSize());
487 } finally {
488 done.countDown();
489 joinPool(p);
490 }
491 }
492
493 /**
494 * getTaskCount increases, but doesn't overestimate, when tasks submitted
495 */
496 public void testGetTaskCount() throws InterruptedException {
497 final ThreadPoolExecutor p =
498 new CustomTPE(1, 1,
499 LONG_DELAY_MS, MILLISECONDS,
500 new ArrayBlockingQueue<Runnable>(10));
501 final CountDownLatch threadStarted = new CountDownLatch(1);
502 final CountDownLatch done = new CountDownLatch(1);
503 try {
504 assertEquals(0, p.getTaskCount());
505 p.execute(new CheckedRunnable() {
506 public void realRun() throws InterruptedException {
507 threadStarted.countDown();
508 assertEquals(1, p.getTaskCount());
509 done.await();
510 }});
511 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
512 assertEquals(1, p.getTaskCount());
513 } finally {
514 done.countDown();
515 joinPool(p);
516 }
517 }
518
519 /**
520 * isShutdown is false before shutdown, true after
521 */
522 public void testIsShutdown() {
523
524 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
525 assertFalse(p.isShutdown());
526 try { p.shutdown(); } catch (SecurityException ok) { return; }
527 assertTrue(p.isShutdown());
528 joinPool(p);
529 }
530
531 /**
532 * isTerminated is false before termination, true after
533 */
534 public void testIsTerminated() throws InterruptedException {
535 final ThreadPoolExecutor p =
536 new CustomTPE(1, 1,
537 LONG_DELAY_MS, MILLISECONDS,
538 new ArrayBlockingQueue<Runnable>(10));
539 final CountDownLatch threadStarted = new CountDownLatch(1);
540 final CountDownLatch done = new CountDownLatch(1);
541 try {
542 assertFalse(p.isTerminating());
543 p.execute(new CheckedRunnable() {
544 public void realRun() throws InterruptedException {
545 assertFalse(p.isTerminating());
546 threadStarted.countDown();
547 done.await();
548 }});
549 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
550 assertFalse(p.isTerminating());
551 done.countDown();
552 } finally {
553 try { p.shutdown(); } catch (SecurityException ok) { return; }
554 }
555 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
556 assertTrue(p.isTerminated());
557 assertFalse(p.isTerminating());
558 }
559
560 /**
561 * isTerminating is not true when running or when terminated
562 */
563 public void testIsTerminating() throws InterruptedException {
564 final ThreadPoolExecutor p =
565 new CustomTPE(1, 1,
566 LONG_DELAY_MS, MILLISECONDS,
567 new ArrayBlockingQueue<Runnable>(10));
568 final CountDownLatch threadStarted = new CountDownLatch(1);
569 final CountDownLatch done = new CountDownLatch(1);
570 try {
571 assertFalse(p.isTerminating());
572 p.execute(new CheckedRunnable() {
573 public void realRun() throws InterruptedException {
574 assertFalse(p.isTerminating());
575 threadStarted.countDown();
576 done.await();
577 }});
578 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
579 assertFalse(p.isTerminating());
580 done.countDown();
581 } finally {
582 try { p.shutdown(); } catch (SecurityException ok) { return; }
583 }
584 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
585 assertTrue(p.isTerminated());
586 assertFalse(p.isTerminating());
587 }
588
589 /**
590 * getQueue returns the work queue, which contains queued tasks
591 */
592 public void testGetQueue() throws InterruptedException {
593 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
594 final ThreadPoolExecutor p =
595 new CustomTPE(1, 1,
596 LONG_DELAY_MS, MILLISECONDS,
597 q);
598 final CountDownLatch threadStarted = new CountDownLatch(1);
599 final CountDownLatch done = new CountDownLatch(1);
600 try {
601 FutureTask[] tasks = new FutureTask[5];
602 for (int i = 0; i < tasks.length; i++) {
603 Callable task = new CheckedCallable<Boolean>() {
604 public Boolean realCall() throws InterruptedException {
605 threadStarted.countDown();
606 assertSame(q, p.getQueue());
607 done.await();
608 return Boolean.TRUE;
609 }};
610 tasks[i] = new FutureTask(task);
611 p.execute(tasks[i]);
612 }
613 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
614 assertSame(q, p.getQueue());
615 assertFalse(q.contains(tasks[0]));
616 assertTrue(q.contains(tasks[tasks.length - 1]));
617 assertEquals(tasks.length - 1, q.size());
618 } finally {
619 done.countDown();
620 joinPool(p);
621 }
622 }
623
624 /**
625 * remove(task) removes queued task, and fails to remove active task
626 */
627 public void testRemove() throws InterruptedException {
628 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
629 final ThreadPoolExecutor p =
630 new CustomTPE(1, 1,
631 LONG_DELAY_MS, MILLISECONDS,
632 q);
633 Runnable[] tasks = new Runnable[6];
634 final CountDownLatch threadStarted = new CountDownLatch(1);
635 final CountDownLatch done = new CountDownLatch(1);
636 try {
637 for (int i = 0; i < tasks.length; i++) {
638 tasks[i] = new CheckedRunnable() {
639 public void realRun() throws InterruptedException {
640 threadStarted.countDown();
641 done.await();
642 }};
643 p.execute(tasks[i]);
644 }
645 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
646 assertFalse(p.remove(tasks[0]));
647 assertTrue(q.contains(tasks[4]));
648 assertTrue(q.contains(tasks[3]));
649 assertTrue(p.remove(tasks[4]));
650 assertFalse(p.remove(tasks[4]));
651 assertFalse(q.contains(tasks[4]));
652 assertTrue(q.contains(tasks[3]));
653 assertTrue(p.remove(tasks[3]));
654 assertFalse(q.contains(tasks[3]));
655 } finally {
656 done.countDown();
657 joinPool(p);
658 }
659 }
660
661 /**
662 * purge removes cancelled tasks from the queue
663 */
664 public void testPurge() throws InterruptedException {
665 final CountDownLatch threadStarted = new CountDownLatch(1);
666 final CountDownLatch done = new CountDownLatch(1);
667 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
668 final ThreadPoolExecutor p =
669 new CustomTPE(1, 1,
670 LONG_DELAY_MS, MILLISECONDS,
671 q);
672 FutureTask[] tasks = new FutureTask[5];
673 try {
674 for (int i = 0; i < tasks.length; i++) {
675 Callable task = new CheckedCallable<Boolean>() {
676 public Boolean realCall() throws InterruptedException {
677 threadStarted.countDown();
678 done.await();
679 return Boolean.TRUE;
680 }};
681 tasks[i] = new FutureTask(task);
682 p.execute(tasks[i]);
683 }
684 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
685 assertEquals(tasks.length, p.getTaskCount());
686 assertEquals(tasks.length - 1, q.size());
687 assertEquals(1L, p.getActiveCount());
688 assertEquals(0L, p.getCompletedTaskCount());
689 tasks[4].cancel(true);
690 tasks[3].cancel(false);
691 p.purge();
692 assertEquals(tasks.length - 3, q.size());
693 assertEquals(tasks.length - 2, p.getTaskCount());
694 p.purge(); // Nothing to do
695 assertEquals(tasks.length - 3, q.size());
696 assertEquals(tasks.length - 2, p.getTaskCount());
697 } finally {
698 done.countDown();
699 joinPool(p);
700 }
701 }
702
703 /**
704 * shutdownNow returns a list containing tasks that were not run,
705 * and those tasks are drained from the queue
706 */
707 public void testShutdownNow() throws InterruptedException {
708 final int poolSize = 2;
709 final int count = 5;
710 final AtomicInteger ran = new AtomicInteger(0);
711 ThreadPoolExecutor p =
712 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
713 new ArrayBlockingQueue<Runnable>(10));
714 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
715 Runnable waiter = new CheckedRunnable() { public void realRun() {
716 threadsStarted.countDown();
717 try {
718 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
719 } catch (InterruptedException success) {}
720 ran.getAndIncrement();
721 }};
722 for (int i = 0; i < count; i++)
723 p.execute(waiter);
724 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
725 assertEquals(poolSize, p.getActiveCount());
726 assertEquals(0, p.getCompletedTaskCount());
727 final List<Runnable> queuedTasks;
728 try {
729 queuedTasks = p.shutdownNow();
730 } catch (SecurityException ok) {
731 return; // Allowed in case test doesn't have privs
732 }
733 assertTrue(p.isShutdown());
734 assertTrue(p.getQueue().isEmpty());
735 assertEquals(count - poolSize, queuedTasks.size());
736 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
737 assertTrue(p.isTerminated());
738 assertEquals(poolSize, ran.get());
739 assertEquals(poolSize, p.getCompletedTaskCount());
740 }
741
742 // Exception Tests
743
744 /**
745 * Constructor throws if corePoolSize argument is less than zero
746 */
747 public void testConstructor1() {
748 try {
749 new CustomTPE(-1, 1, 1L, SECONDS,
750 new ArrayBlockingQueue<Runnable>(10));
751 shouldThrow();
752 } catch (IllegalArgumentException success) {}
753 }
754
755 /**
756 * Constructor throws if maximumPoolSize is less than zero
757 */
758 public void testConstructor2() {
759 try {
760 new CustomTPE(1, -1, 1L, SECONDS,
761 new ArrayBlockingQueue<Runnable>(10));
762 shouldThrow();
763 } catch (IllegalArgumentException success) {}
764 }
765
766 /**
767 * Constructor throws if maximumPoolSize is equal to zero
768 */
769 public void testConstructor3() {
770 try {
771 new CustomTPE(1, 0, 1L, SECONDS,
772 new ArrayBlockingQueue<Runnable>(10));
773 shouldThrow();
774 } catch (IllegalArgumentException success) {}
775 }
776
777 /**
778 * Constructor throws if keepAliveTime is less than zero
779 */
780 public void testConstructor4() {
781 try {
782 new CustomTPE(1, 2, -1L, SECONDS,
783 new ArrayBlockingQueue<Runnable>(10));
784 shouldThrow();
785 } catch (IllegalArgumentException success) {}
786 }
787
788 /**
789 * Constructor throws if corePoolSize is greater than the maximumPoolSize
790 */
791 public void testConstructor5() {
792 try {
793 new CustomTPE(2, 1, 1L, SECONDS,
794 new ArrayBlockingQueue<Runnable>(10));
795 shouldThrow();
796 } catch (IllegalArgumentException success) {}
797 }
798
799 /**
800 * Constructor throws if workQueue is set to null
801 */
802 public void testConstructorNullPointerException() {
803 try {
804 new CustomTPE(1, 2, 1L, SECONDS, null);
805 shouldThrow();
806 } catch (NullPointerException success) {}
807 }
808
809 /**
810 * Constructor throws if corePoolSize argument is less than zero
811 */
812 public void testConstructor6() {
813 try {
814 new CustomTPE(-1, 1, 1L, SECONDS,
815 new ArrayBlockingQueue<Runnable>(10),
816 new SimpleThreadFactory());
817 shouldThrow();
818 } catch (IllegalArgumentException success) {}
819 }
820
821 /**
822 * Constructor throws if maximumPoolSize is less than zero
823 */
824 public void testConstructor7() {
825 try {
826 new CustomTPE(1,-1, 1L, SECONDS,
827 new ArrayBlockingQueue<Runnable>(10),
828 new SimpleThreadFactory());
829 shouldThrow();
830 } catch (IllegalArgumentException success) {}
831 }
832
833 /**
834 * Constructor throws if maximumPoolSize is equal to zero
835 */
836 public void testConstructor8() {
837 try {
838 new CustomTPE(1, 0, 1L, SECONDS,
839 new ArrayBlockingQueue<Runnable>(10),
840 new SimpleThreadFactory());
841 shouldThrow();
842 } catch (IllegalArgumentException success) {}
843 }
844
845 /**
846 * Constructor throws if keepAliveTime is less than zero
847 */
848 public void testConstructor9() {
849 try {
850 new CustomTPE(1, 2, -1L, SECONDS,
851 new ArrayBlockingQueue<Runnable>(10),
852 new SimpleThreadFactory());
853 shouldThrow();
854 } catch (IllegalArgumentException success) {}
855 }
856
857 /**
858 * Constructor throws if corePoolSize is greater than the maximumPoolSize
859 */
860 public void testConstructor10() {
861 try {
862 new CustomTPE(2, 1, 1L, SECONDS,
863 new ArrayBlockingQueue<Runnable>(10),
864 new SimpleThreadFactory());
865 shouldThrow();
866 } catch (IllegalArgumentException success) {}
867 }
868
869 /**
870 * Constructor throws if workQueue is set to null
871 */
872 public void testConstructorNullPointerException2() {
873 try {
874 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
875 shouldThrow();
876 } catch (NullPointerException success) {}
877 }
878
879 /**
880 * Constructor throws if threadFactory is set to null
881 */
882 public void testConstructorNullPointerException3() {
883 try {
884 new CustomTPE(1, 2, 1L, SECONDS,
885 new ArrayBlockingQueue<Runnable>(10),
886 (ThreadFactory) null);
887 shouldThrow();
888 } catch (NullPointerException success) {}
889 }
890
891 /**
892 * Constructor throws if corePoolSize argument is less than zero
893 */
894 public void testConstructor11() {
895 try {
896 new CustomTPE(-1, 1, 1L, SECONDS,
897 new ArrayBlockingQueue<Runnable>(10),
898 new NoOpREHandler());
899 shouldThrow();
900 } catch (IllegalArgumentException success) {}
901 }
902
903 /**
904 * Constructor throws if maximumPoolSize is less than zero
905 */
906 public void testConstructor12() {
907 try {
908 new CustomTPE(1, -1, 1L, SECONDS,
909 new ArrayBlockingQueue<Runnable>(10),
910 new NoOpREHandler());
911 shouldThrow();
912 } catch (IllegalArgumentException success) {}
913 }
914
915 /**
916 * Constructor throws if maximumPoolSize is equal to zero
917 */
918 public void testConstructor13() {
919 try {
920 new CustomTPE(1, 0, 1L, SECONDS,
921 new ArrayBlockingQueue<Runnable>(10),
922 new NoOpREHandler());
923 shouldThrow();
924 } catch (IllegalArgumentException success) {}
925 }
926
927 /**
928 * Constructor throws if keepAliveTime is less than zero
929 */
930 public void testConstructor14() {
931 try {
932 new CustomTPE(1, 2, -1L, SECONDS,
933 new ArrayBlockingQueue<Runnable>(10),
934 new NoOpREHandler());
935 shouldThrow();
936 } catch (IllegalArgumentException success) {}
937 }
938
939 /**
940 * Constructor throws if corePoolSize is greater than the maximumPoolSize
941 */
942 public void testConstructor15() {
943 try {
944 new CustomTPE(2, 1, 1L, SECONDS,
945 new ArrayBlockingQueue<Runnable>(10),
946 new NoOpREHandler());
947 shouldThrow();
948 } catch (IllegalArgumentException success) {}
949 }
950
951 /**
952 * Constructor throws if workQueue is set to null
953 */
954 public void testConstructorNullPointerException4() {
955 try {
956 new CustomTPE(1, 2, 1L, SECONDS,
957 null,
958 new NoOpREHandler());
959 shouldThrow();
960 } catch (NullPointerException success) {}
961 }
962
963 /**
964 * Constructor throws if handler is set to null
965 */
966 public void testConstructorNullPointerException5() {
967 try {
968 new CustomTPE(1, 2, 1L, SECONDS,
969 new ArrayBlockingQueue<Runnable>(10),
970 (RejectedExecutionHandler) null);
971 shouldThrow();
972 } catch (NullPointerException success) {}
973 }
974
975 /**
976 * Constructor throws if corePoolSize argument is less than zero
977 */
978 public void testConstructor16() {
979 try {
980 new CustomTPE(-1, 1, 1L, SECONDS,
981 new ArrayBlockingQueue<Runnable>(10),
982 new SimpleThreadFactory(),
983 new NoOpREHandler());
984 shouldThrow();
985 } catch (IllegalArgumentException success) {}
986 }
987
988 /**
989 * Constructor throws if maximumPoolSize is less than zero
990 */
991 public void testConstructor17() {
992 try {
993 new CustomTPE(1, -1, 1L, SECONDS,
994 new ArrayBlockingQueue<Runnable>(10),
995 new SimpleThreadFactory(),
996 new NoOpREHandler());
997 shouldThrow();
998 } catch (IllegalArgumentException success) {}
999 }
1000
1001 /**
1002 * Constructor throws if maximumPoolSize is equal to zero
1003 */
1004 public void testConstructor18() {
1005 try {
1006 new CustomTPE(1, 0, 1L, SECONDS,
1007 new ArrayBlockingQueue<Runnable>(10),
1008 new SimpleThreadFactory(),
1009 new NoOpREHandler());
1010 shouldThrow();
1011 } catch (IllegalArgumentException success) {}
1012 }
1013
1014 /**
1015 * Constructor throws if keepAliveTime is less than zero
1016 */
1017 public void testConstructor19() {
1018 try {
1019 new CustomTPE(1, 2, -1L, SECONDS,
1020 new ArrayBlockingQueue<Runnable>(10),
1021 new SimpleThreadFactory(),
1022 new NoOpREHandler());
1023 shouldThrow();
1024 } catch (IllegalArgumentException success) {}
1025 }
1026
1027 /**
1028 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1029 */
1030 public void testConstructor20() {
1031 try {
1032 new CustomTPE(2, 1, 1L, SECONDS,
1033 new ArrayBlockingQueue<Runnable>(10),
1034 new SimpleThreadFactory(),
1035 new NoOpREHandler());
1036 shouldThrow();
1037 } catch (IllegalArgumentException success) {}
1038 }
1039
1040 /**
1041 * Constructor throws if workQueue is null
1042 */
1043 public void testConstructorNullPointerException6() {
1044 try {
1045 new CustomTPE(1, 2, 1L, SECONDS,
1046 null,
1047 new SimpleThreadFactory(),
1048 new NoOpREHandler());
1049 shouldThrow();
1050 } catch (NullPointerException success) {}
1051 }
1052
1053 /**
1054 * Constructor throws if handler is null
1055 */
1056 public void testConstructorNullPointerException7() {
1057 try {
1058 new CustomTPE(1, 2, 1L, SECONDS,
1059 new ArrayBlockingQueue<Runnable>(10),
1060 new SimpleThreadFactory(),
1061 (RejectedExecutionHandler) null);
1062 shouldThrow();
1063 } catch (NullPointerException success) {}
1064 }
1065
1066 /**
1067 * Constructor throws if ThreadFactory is null
1068 */
1069 public void testConstructorNullPointerException8() {
1070 try {
1071 new CustomTPE(1, 2, 1L, SECONDS,
1072 new ArrayBlockingQueue<Runnable>(10),
1073 (ThreadFactory) null,
1074 new NoOpREHandler());
1075 shouldThrow();
1076 } catch (NullPointerException success) {}
1077 }
1078
1079 /**
1080 * execute throws RejectedExecutionException if saturated.
1081 */
1082 public void testSaturatedExecute() {
1083 ThreadPoolExecutor p =
1084 new CustomTPE(1, 1,
1085 LONG_DELAY_MS, MILLISECONDS,
1086 new ArrayBlockingQueue<Runnable>(1));
1087 final CountDownLatch done = new CountDownLatch(1);
1088 try {
1089 Runnable task = new CheckedRunnable() {
1090 public void realRun() throws InterruptedException {
1091 done.await();
1092 }};
1093 for (int i = 0; i < 2; ++i)
1094 p.execute(task);
1095 for (int i = 0; i < 2; ++i) {
1096 try {
1097 p.execute(task);
1098 shouldThrow();
1099 } catch (RejectedExecutionException success) {}
1100 assertTrue(p.getTaskCount() <= 2);
1101 }
1102 } finally {
1103 done.countDown();
1104 joinPool(p);
1105 }
1106 }
1107
1108 /**
1109 * executor using CallerRunsPolicy runs task if saturated.
1110 */
1111 public void testSaturatedExecute2() {
1112 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1113 ThreadPoolExecutor p = new CustomTPE(1, 1,
1114 LONG_DELAY_MS, MILLISECONDS,
1115 new ArrayBlockingQueue<Runnable>(1),
1116 h);
1117 try {
1118 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1119 for (int i = 0; i < tasks.length; ++i)
1120 tasks[i] = new TrackedNoOpRunnable();
1121 TrackedLongRunnable mr = new TrackedLongRunnable();
1122 p.execute(mr);
1123 for (int i = 0; i < tasks.length; ++i)
1124 p.execute(tasks[i]);
1125 for (int i = 1; i < tasks.length; ++i)
1126 assertTrue(tasks[i].done);
1127 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1128 } finally {
1129 joinPool(p);
1130 }
1131 }
1132
1133 /**
1134 * executor using DiscardPolicy drops task if saturated.
1135 */
1136 public void testSaturatedExecute3() {
1137 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1138 ThreadPoolExecutor p =
1139 new CustomTPE(1, 1,
1140 LONG_DELAY_MS, MILLISECONDS,
1141 new ArrayBlockingQueue<Runnable>(1),
1142 h);
1143 try {
1144 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1145 for (int i = 0; i < tasks.length; ++i)
1146 tasks[i] = new TrackedNoOpRunnable();
1147 p.execute(new TrackedLongRunnable());
1148 for (TrackedNoOpRunnable task : tasks)
1149 p.execute(task);
1150 for (TrackedNoOpRunnable task : tasks)
1151 assertFalse(task.done);
1152 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1153 } finally {
1154 joinPool(p);
1155 }
1156 }
1157
1158 /**
1159 * executor using DiscardOldestPolicy drops oldest task if saturated.
1160 */
1161 public void testSaturatedExecute4() {
1162 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1163 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1164 try {
1165 p.execute(new TrackedLongRunnable());
1166 TrackedLongRunnable r2 = new TrackedLongRunnable();
1167 p.execute(r2);
1168 assertTrue(p.getQueue().contains(r2));
1169 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1170 p.execute(r3);
1171 assertFalse(p.getQueue().contains(r2));
1172 assertTrue(p.getQueue().contains(r3));
1173 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1174 } finally {
1175 joinPool(p);
1176 }
1177 }
1178
1179 /**
1180 * execute throws RejectedExecutionException if shutdown
1181 */
1182 public void testRejectedExecutionExceptionOnShutdown() {
1183 ThreadPoolExecutor p =
1184 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1185 try { p.shutdown(); } catch (SecurityException ok) { return; }
1186 try {
1187 p.execute(new NoOpRunnable());
1188 shouldThrow();
1189 } catch (RejectedExecutionException success) {}
1190
1191 joinPool(p);
1192 }
1193
1194 /**
1195 * execute using CallerRunsPolicy drops task on shutdown
1196 */
1197 public void testCallerRunsOnShutdown() {
1198 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1199 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1200
1201 try { p.shutdown(); } catch (SecurityException ok) { return; }
1202 try {
1203 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1204 p.execute(r);
1205 assertFalse(r.done);
1206 } finally {
1207 joinPool(p);
1208 }
1209 }
1210
1211 /**
1212 * execute using DiscardPolicy drops task on shutdown
1213 */
1214 public void testDiscardOnShutdown() {
1215 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1216 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1217
1218 try { p.shutdown(); } catch (SecurityException ok) { return; }
1219 try {
1220 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1221 p.execute(r);
1222 assertFalse(r.done);
1223 } finally {
1224 joinPool(p);
1225 }
1226 }
1227
1228 /**
1229 * execute using DiscardOldestPolicy drops task on shutdown
1230 */
1231 public void testDiscardOldestOnShutdown() {
1232 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1233 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1234
1235 try { p.shutdown(); } catch (SecurityException ok) { return; }
1236 try {
1237 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1238 p.execute(r);
1239 assertFalse(r.done);
1240 } finally {
1241 joinPool(p);
1242 }
1243 }
1244
1245 /**
1246 * execute(null) throws NPE
1247 */
1248 public void testExecuteNull() {
1249 ThreadPoolExecutor p =
1250 new CustomTPE(1, 2, 1L, SECONDS,
1251 new ArrayBlockingQueue<Runnable>(10));
1252 try {
1253 p.execute(null);
1254 shouldThrow();
1255 } catch (NullPointerException success) {}
1256
1257 joinPool(p);
1258 }
1259
1260 /**
1261 * setCorePoolSize of negative value throws IllegalArgumentException
1262 */
1263 public void testCorePoolSizeIllegalArgumentException() {
1264 ThreadPoolExecutor p =
1265 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1266 try {
1267 p.setCorePoolSize(-1);
1268 shouldThrow();
1269 } catch (IllegalArgumentException success) {
1270 } finally {
1271 try { p.shutdown(); } catch (SecurityException ok) { return; }
1272 }
1273 joinPool(p);
1274 }
1275
1276 /**
1277 * setMaximumPoolSize(int) throws IllegalArgumentException
1278 * if given a value less the core pool size
1279 */
1280 public void testMaximumPoolSizeIllegalArgumentException() {
1281 ThreadPoolExecutor p =
1282 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1283 try {
1284 p.setMaximumPoolSize(1);
1285 shouldThrow();
1286 } catch (IllegalArgumentException success) {
1287 } finally {
1288 try { p.shutdown(); } catch (SecurityException ok) { return; }
1289 }
1290 joinPool(p);
1291 }
1292
1293 /**
1294 * setMaximumPoolSize throws IllegalArgumentException
1295 * if given a negative value
1296 */
1297 public void testMaximumPoolSizeIllegalArgumentException2() {
1298 ThreadPoolExecutor p =
1299 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1300 try {
1301 p.setMaximumPoolSize(-1);
1302 shouldThrow();
1303 } catch (IllegalArgumentException success) {
1304 } finally {
1305 try { p.shutdown(); } catch (SecurityException ok) { return; }
1306 }
1307 joinPool(p);
1308 }
1309
1310 /**
1311 * setKeepAliveTime throws IllegalArgumentException
1312 * when given a negative value
1313 */
1314 public void testKeepAliveTimeIllegalArgumentException() {
1315 ThreadPoolExecutor p =
1316 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1317
1318 try {
1319 p.setKeepAliveTime(-1,MILLISECONDS);
1320 shouldThrow();
1321 } catch (IllegalArgumentException success) {
1322 } finally {
1323 try { p.shutdown(); } catch (SecurityException ok) { return; }
1324 }
1325 joinPool(p);
1326 }
1327
1328 /**
1329 * terminated() is called on termination
1330 */
1331 public void testTerminated() {
1332 CustomTPE p = new CustomTPE();
1333 try { p.shutdown(); } catch (SecurityException ok) { return; }
1334 assertTrue(p.terminatedCalled());
1335 joinPool(p);
1336 }
1337
1338 /**
1339 * beforeExecute and afterExecute are called when executing task
1340 */
1341 public void testBeforeAfter() throws InterruptedException {
1342 CustomTPE p = new CustomTPE();
1343 try {
1344 final CountDownLatch done = new CountDownLatch(1);
1345 p.execute(new CheckedRunnable() {
1346 public void realRun() {
1347 done.countDown();
1348 }});
1349 await(p.afterCalled);
1350 assertEquals(0, done.getCount());
1351 assertTrue(p.afterCalled());
1352 assertTrue(p.beforeCalled());
1353 try { p.shutdown(); } catch (SecurityException ok) { return; }
1354 } finally {
1355 joinPool(p);
1356 }
1357 }
1358
1359 /**
1360 * completed submit of callable returns result
1361 */
1362 public void testSubmitCallable() throws Exception {
1363 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1364 try {
1365 Future<String> future = e.submit(new StringTask());
1366 String result = future.get();
1367 assertSame(TEST_STRING, result);
1368 } finally {
1369 joinPool(e);
1370 }
1371 }
1372
1373 /**
1374 * completed submit of runnable returns successfully
1375 */
1376 public void testSubmitRunnable() throws Exception {
1377 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1378 try {
1379 Future<?> future = e.submit(new NoOpRunnable());
1380 future.get();
1381 assertTrue(future.isDone());
1382 } finally {
1383 joinPool(e);
1384 }
1385 }
1386
1387 /**
1388 * completed submit of (runnable, result) returns result
1389 */
1390 public void testSubmitRunnable2() throws Exception {
1391 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1392 try {
1393 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1394 String result = future.get();
1395 assertSame(TEST_STRING, result);
1396 } finally {
1397 joinPool(e);
1398 }
1399 }
1400
1401 /**
1402 * invokeAny(null) throws NPE
1403 */
1404 public void testInvokeAny1() throws Exception {
1405 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1406 try {
1407 e.invokeAny(null);
1408 shouldThrow();
1409 } catch (NullPointerException success) {
1410 } finally {
1411 joinPool(e);
1412 }
1413 }
1414
1415 /**
1416 * invokeAny(empty collection) throws IAE
1417 */
1418 public void testInvokeAny2() throws Exception {
1419 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1420 try {
1421 e.invokeAny(new ArrayList<Callable<String>>());
1422 shouldThrow();
1423 } catch (IllegalArgumentException success) {
1424 } finally {
1425 joinPool(e);
1426 }
1427 }
1428
1429 /**
1430 * invokeAny(c) throws NPE if c has null elements
1431 */
1432 public void testInvokeAny3() throws Exception {
1433 CountDownLatch latch = new CountDownLatch(1);
1434 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1435 List<Callable<String>> l = new ArrayList<Callable<String>>();
1436 l.add(latchAwaitingStringTask(latch));
1437 l.add(null);
1438 try {
1439 e.invokeAny(l);
1440 shouldThrow();
1441 } catch (NullPointerException success) {
1442 } finally {
1443 latch.countDown();
1444 joinPool(e);
1445 }
1446 }
1447
1448 /**
1449 * invokeAny(c) throws ExecutionException if no task completes
1450 */
1451 public void testInvokeAny4() throws Exception {
1452 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1453 List<Callable<String>> l = new ArrayList<Callable<String>>();
1454 l.add(new NPETask());
1455 try {
1456 e.invokeAny(l);
1457 shouldThrow();
1458 } catch (ExecutionException success) {
1459 assertTrue(success.getCause() instanceof NullPointerException);
1460 } finally {
1461 joinPool(e);
1462 }
1463 }
1464
1465 /**
1466 * invokeAny(c) returns result of some task
1467 */
1468 public void testInvokeAny5() throws Exception {
1469 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1470 try {
1471 List<Callable<String>> l = new ArrayList<Callable<String>>();
1472 l.add(new StringTask());
1473 l.add(new StringTask());
1474 String result = e.invokeAny(l);
1475 assertSame(TEST_STRING, result);
1476 } finally {
1477 joinPool(e);
1478 }
1479 }
1480
1481 /**
1482 * invokeAll(null) throws NPE
1483 */
1484 public void testInvokeAll1() throws Exception {
1485 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1486 try {
1487 e.invokeAll(null);
1488 shouldThrow();
1489 } catch (NullPointerException success) {
1490 } finally {
1491 joinPool(e);
1492 }
1493 }
1494
1495 /**
1496 * invokeAll(empty collection) returns empty collection
1497 */
1498 public void testInvokeAll2() throws Exception {
1499 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1500 try {
1501 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1502 assertTrue(r.isEmpty());
1503 } finally {
1504 joinPool(e);
1505 }
1506 }
1507
1508 /**
1509 * invokeAll(c) throws NPE if c has null elements
1510 */
1511 public void testInvokeAll3() throws Exception {
1512 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1513 List<Callable<String>> l = new ArrayList<Callable<String>>();
1514 l.add(new StringTask());
1515 l.add(null);
1516 try {
1517 e.invokeAll(l);
1518 shouldThrow();
1519 } catch (NullPointerException success) {
1520 } finally {
1521 joinPool(e);
1522 }
1523 }
1524
1525 /**
1526 * get of element of invokeAll(c) throws exception on failed task
1527 */
1528 public void testInvokeAll4() throws Exception {
1529 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1530 List<Callable<String>> l = new ArrayList<Callable<String>>();
1531 l.add(new NPETask());
1532 List<Future<String>> futures = e.invokeAll(l);
1533 assertEquals(1, futures.size());
1534 try {
1535 futures.get(0).get();
1536 shouldThrow();
1537 } catch (ExecutionException success) {
1538 assertTrue(success.getCause() instanceof NullPointerException);
1539 } finally {
1540 joinPool(e);
1541 }
1542 }
1543
1544 /**
1545 * invokeAll(c) returns results of all completed tasks
1546 */
1547 public void testInvokeAll5() throws Exception {
1548 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1549 try {
1550 List<Callable<String>> l = new ArrayList<Callable<String>>();
1551 l.add(new StringTask());
1552 l.add(new StringTask());
1553 List<Future<String>> futures = e.invokeAll(l);
1554 assertEquals(2, futures.size());
1555 for (Future<String> future : futures)
1556 assertSame(TEST_STRING, future.get());
1557 } finally {
1558 joinPool(e);
1559 }
1560 }
1561
1562 /**
1563 * timed invokeAny(null) throws NPE
1564 */
1565 public void testTimedInvokeAny1() throws Exception {
1566 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1567 try {
1568 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1569 shouldThrow();
1570 } catch (NullPointerException success) {
1571 } finally {
1572 joinPool(e);
1573 }
1574 }
1575
1576 /**
1577 * timed invokeAny(,,null) throws NPE
1578 */
1579 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1580 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1581 List<Callable<String>> l = new ArrayList<Callable<String>>();
1582 l.add(new StringTask());
1583 try {
1584 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1585 shouldThrow();
1586 } catch (NullPointerException success) {
1587 } finally {
1588 joinPool(e);
1589 }
1590 }
1591
1592 /**
1593 * timed invokeAny(empty collection) throws IAE
1594 */
1595 public void testTimedInvokeAny2() throws Exception {
1596 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1597 try {
1598 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1599 shouldThrow();
1600 } catch (IllegalArgumentException success) {
1601 } finally {
1602 joinPool(e);
1603 }
1604 }
1605
1606 /**
1607 * timed invokeAny(c) throws NPE if c has null elements
1608 */
1609 public void testTimedInvokeAny3() throws Exception {
1610 CountDownLatch latch = new CountDownLatch(1);
1611 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1612 List<Callable<String>> l = new ArrayList<Callable<String>>();
1613 l.add(latchAwaitingStringTask(latch));
1614 l.add(null);
1615 try {
1616 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1617 shouldThrow();
1618 } catch (NullPointerException success) {
1619 } finally {
1620 latch.countDown();
1621 joinPool(e);
1622 }
1623 }
1624
1625 /**
1626 * timed invokeAny(c) throws ExecutionException if no task completes
1627 */
1628 public void testTimedInvokeAny4() throws Exception {
1629 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1630 List<Callable<String>> l = new ArrayList<Callable<String>>();
1631 l.add(new NPETask());
1632 try {
1633 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1634 shouldThrow();
1635 } catch (ExecutionException success) {
1636 assertTrue(success.getCause() instanceof NullPointerException);
1637 } finally {
1638 joinPool(e);
1639 }
1640 }
1641
1642 /**
1643 * timed invokeAny(c) returns result of some task
1644 */
1645 public void testTimedInvokeAny5() throws Exception {
1646 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1647 try {
1648 List<Callable<String>> l = new ArrayList<Callable<String>>();
1649 l.add(new StringTask());
1650 l.add(new StringTask());
1651 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1652 assertSame(TEST_STRING, result);
1653 } finally {
1654 joinPool(e);
1655 }
1656 }
1657
1658 /**
1659 * timed invokeAll(null) throws NPE
1660 */
1661 public void testTimedInvokeAll1() throws Exception {
1662 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1663 try {
1664 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1665 shouldThrow();
1666 } catch (NullPointerException success) {
1667 } finally {
1668 joinPool(e);
1669 }
1670 }
1671
1672 /**
1673 * timed invokeAll(,,null) throws NPE
1674 */
1675 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1676 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1677 List<Callable<String>> l = new ArrayList<Callable<String>>();
1678 l.add(new StringTask());
1679 try {
1680 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1681 shouldThrow();
1682 } catch (NullPointerException success) {
1683 } finally {
1684 joinPool(e);
1685 }
1686 }
1687
1688 /**
1689 * timed invokeAll(empty collection) returns empty collection
1690 */
1691 public void testTimedInvokeAll2() throws Exception {
1692 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1693 try {
1694 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1695 assertTrue(r.isEmpty());
1696 } finally {
1697 joinPool(e);
1698 }
1699 }
1700
1701 /**
1702 * timed invokeAll(c) throws NPE if c has null elements
1703 */
1704 public void testTimedInvokeAll3() throws Exception {
1705 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1706 List<Callable<String>> l = new ArrayList<Callable<String>>();
1707 l.add(new StringTask());
1708 l.add(null);
1709 try {
1710 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1711 shouldThrow();
1712 } catch (NullPointerException success) {
1713 } finally {
1714 joinPool(e);
1715 }
1716 }
1717
1718 /**
1719 * get of element of invokeAll(c) throws exception on failed task
1720 */
1721 public void testTimedInvokeAll4() throws Exception {
1722 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1723 List<Callable<String>> l = new ArrayList<Callable<String>>();
1724 l.add(new NPETask());
1725 List<Future<String>> futures =
1726 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1727 assertEquals(1, futures.size());
1728 try {
1729 futures.get(0).get();
1730 shouldThrow();
1731 } catch (ExecutionException success) {
1732 assertTrue(success.getCause() instanceof NullPointerException);
1733 } finally {
1734 joinPool(e);
1735 }
1736 }
1737
1738 /**
1739 * timed invokeAll(c) returns results of all completed tasks
1740 */
1741 public void testTimedInvokeAll5() throws Exception {
1742 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1743 try {
1744 List<Callable<String>> l = new ArrayList<Callable<String>>();
1745 l.add(new StringTask());
1746 l.add(new StringTask());
1747 List<Future<String>> futures =
1748 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1749 assertEquals(2, futures.size());
1750 for (Future<String> future : futures)
1751 assertSame(TEST_STRING, future.get());
1752 } finally {
1753 joinPool(e);
1754 }
1755 }
1756
1757 /**
1758 * timed invokeAll(c) cancels tasks not completed by timeout
1759 */
1760 public void testTimedInvokeAll6() throws Exception {
1761 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1762 try {
1763 for (long timeout = timeoutMillis();;) {
1764 List<Callable<String>> tasks = new ArrayList<>();
1765 tasks.add(new StringTask("0"));
1766 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1767 tasks.add(new StringTask("2"));
1768 long startTime = System.nanoTime();
1769 List<Future<String>> futures =
1770 e.invokeAll(tasks, timeout, MILLISECONDS);
1771 assertEquals(tasks.size(), futures.size());
1772 assertTrue(millisElapsedSince(startTime) >= timeout);
1773 for (Future future : futures)
1774 assertTrue(future.isDone());
1775 assertTrue(futures.get(1).isCancelled());
1776 try {
1777 assertEquals("0", futures.get(0).get());
1778 assertEquals("2", futures.get(2).get());
1779 break;
1780 } catch (CancellationException retryWithLongerTimeout) {
1781 timeout *= 2;
1782 if (timeout >= LONG_DELAY_MS / 2)
1783 fail("expected exactly one task to be cancelled");
1784 }
1785 }
1786 } finally {
1787 joinPool(e);
1788 }
1789 }
1790
1791 /**
1792 * Execution continues if there is at least one thread even if
1793 * thread factory fails to create more
1794 */
1795 public void testFailingThreadFactory() throws InterruptedException {
1796 final ExecutorService e =
1797 new CustomTPE(100, 100,
1798 LONG_DELAY_MS, MILLISECONDS,
1799 new LinkedBlockingQueue<Runnable>(),
1800 new FailingThreadFactory());
1801 try {
1802 final int TASKS = 100;
1803 final CountDownLatch done = new CountDownLatch(TASKS);
1804 for (int k = 0; k < TASKS; ++k)
1805 e.execute(new CheckedRunnable() {
1806 public void realRun() {
1807 done.countDown();
1808 }});
1809 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1810 } finally {
1811 joinPool(e);
1812 }
1813 }
1814
1815 /**
1816 * allowsCoreThreadTimeOut is by default false.
1817 */
1818 public void testAllowsCoreThreadTimeOut() {
1819 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1820 assertFalse(p.allowsCoreThreadTimeOut());
1821 joinPool(p);
1822 }
1823
1824 /**
1825 * allowCoreThreadTimeOut(true) causes idle threads to time out
1826 */
1827 public void testAllowCoreThreadTimeOut_true() throws Exception {
1828 long keepAliveTime = timeoutMillis();
1829 final ThreadPoolExecutor p =
1830 new CustomTPE(2, 10,
1831 keepAliveTime, MILLISECONDS,
1832 new ArrayBlockingQueue<Runnable>(10));
1833 final CountDownLatch threadStarted = new CountDownLatch(1);
1834 try {
1835 p.allowCoreThreadTimeOut(true);
1836 p.execute(new CheckedRunnable() {
1837 public void realRun() {
1838 threadStarted.countDown();
1839 assertEquals(1, p.getPoolSize());
1840 }});
1841 await(threadStarted);
1842 delay(keepAliveTime);
1843 long startTime = System.nanoTime();
1844 while (p.getPoolSize() > 0
1845 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1846 Thread.yield();
1847 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1848 assertEquals(0, p.getPoolSize());
1849 } finally {
1850 joinPool(p);
1851 }
1852 }
1853
1854 /**
1855 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1856 */
1857 public void testAllowCoreThreadTimeOut_false() throws Exception {
1858 long keepAliveTime = timeoutMillis();
1859 final ThreadPoolExecutor p =
1860 new CustomTPE(2, 10,
1861 keepAliveTime, MILLISECONDS,
1862 new ArrayBlockingQueue<Runnable>(10));
1863 final CountDownLatch threadStarted = new CountDownLatch(1);
1864 try {
1865 p.allowCoreThreadTimeOut(false);
1866 p.execute(new CheckedRunnable() {
1867 public void realRun() throws InterruptedException {
1868 threadStarted.countDown();
1869 assertTrue(p.getPoolSize() >= 1);
1870 }});
1871 delay(2 * keepAliveTime);
1872 assertTrue(p.getPoolSize() >= 1);
1873 } finally {
1874 joinPool(p);
1875 }
1876 }
1877
1878 /**
1879 * get(cancelled task) throws CancellationException
1880 * (in part, a test of CustomTPE itself)
1881 */
1882 public void testGet_cancelled() throws Exception {
1883 final ExecutorService e =
1884 new CustomTPE(1, 1,
1885 LONG_DELAY_MS, MILLISECONDS,
1886 new LinkedBlockingQueue<Runnable>());
1887 try {
1888 final CountDownLatch blockerStarted = new CountDownLatch(1);
1889 final CountDownLatch done = new CountDownLatch(1);
1890 final List<Future<?>> futures = new ArrayList<>();
1891 for (int i = 0; i < 2; i++) {
1892 Runnable r = new CheckedRunnable() { public void realRun()
1893 throws Throwable {
1894 blockerStarted.countDown();
1895 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1896 }};
1897 futures.add(e.submit(r));
1898 }
1899 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1900 for (Future<?> future : futures) future.cancel(false);
1901 for (Future<?> future : futures) {
1902 try {
1903 future.get();
1904 shouldThrow();
1905 } catch (CancellationException success) {}
1906 try {
1907 future.get(LONG_DELAY_MS, MILLISECONDS);
1908 shouldThrow();
1909 } catch (CancellationException success) {}
1910 assertTrue(future.isCancelled());
1911 assertTrue(future.isDone());
1912 }
1913 done.countDown();
1914 } finally {
1915 joinPool(e);
1916 }
1917 }
1918
1919 }