ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.44
Committed: Mon Sep 28 21:15:44 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.43: +53 -7 lines
Log Message:
fix concurrency bugs in CustomTask; add cancellation tests

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos < 0)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 final CountDownLatch done = new CountDownLatch(1);
238 final Runnable task = new CheckedRunnable() {
239 public void realRun() {
240 done.countDown();
241 }};
242 try {
243 p.execute(task);
244 assertTrue(done.await(SMALL_DELAY_MS, MILLISECONDS));
245 } finally {
246 joinPool(p);
247 }
248 }
249
250 /**
251 * getActiveCount increases but doesn't overestimate, when a
252 * thread becomes active
253 */
254 public void testGetActiveCount() throws InterruptedException {
255 final ThreadPoolExecutor p =
256 new CustomTPE(2, 2,
257 LONG_DELAY_MS, MILLISECONDS,
258 new ArrayBlockingQueue<Runnable>(10));
259 final CountDownLatch threadStarted = new CountDownLatch(1);
260 final CountDownLatch done = new CountDownLatch(1);
261 try {
262 assertEquals(0, p.getActiveCount());
263 p.execute(new CheckedRunnable() {
264 public void realRun() throws InterruptedException {
265 threadStarted.countDown();
266 assertEquals(1, p.getActiveCount());
267 done.await();
268 }});
269 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
270 assertEquals(1, p.getActiveCount());
271 } finally {
272 done.countDown();
273 joinPool(p);
274 }
275 }
276
277 /**
278 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
279 */
280 public void testPrestartCoreThread() {
281 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
282 assertEquals(0, p.getPoolSize());
283 assertTrue(p.prestartCoreThread());
284 assertEquals(1, p.getPoolSize());
285 assertTrue(p.prestartCoreThread());
286 assertEquals(2, p.getPoolSize());
287 assertFalse(p.prestartCoreThread());
288 assertEquals(2, p.getPoolSize());
289 joinPool(p);
290 }
291
292 /**
293 * prestartAllCoreThreads starts all corePoolSize threads
294 */
295 public void testPrestartAllCoreThreads() {
296 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
297 assertEquals(0, p.getPoolSize());
298 p.prestartAllCoreThreads();
299 assertEquals(2, p.getPoolSize());
300 p.prestartAllCoreThreads();
301 assertEquals(2, p.getPoolSize());
302 joinPool(p);
303 }
304
305 /**
306 * getCompletedTaskCount increases, but doesn't overestimate,
307 * when tasks complete
308 */
309 public void testGetCompletedTaskCount() throws InterruptedException {
310 final ThreadPoolExecutor p =
311 new CustomTPE(2, 2,
312 LONG_DELAY_MS, MILLISECONDS,
313 new ArrayBlockingQueue<Runnable>(10));
314 final CountDownLatch threadStarted = new CountDownLatch(1);
315 final CountDownLatch threadProceed = new CountDownLatch(1);
316 final CountDownLatch threadDone = new CountDownLatch(1);
317 try {
318 assertEquals(0, p.getCompletedTaskCount());
319 p.execute(new CheckedRunnable() {
320 public void realRun() throws InterruptedException {
321 threadStarted.countDown();
322 assertEquals(0, p.getCompletedTaskCount());
323 threadProceed.await();
324 threadDone.countDown();
325 }});
326 await(threadStarted);
327 assertEquals(0, p.getCompletedTaskCount());
328 threadProceed.countDown();
329 threadDone.await();
330 long startTime = System.nanoTime();
331 while (p.getCompletedTaskCount() != 1) {
332 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
333 fail("timed out");
334 Thread.yield();
335 }
336 } finally {
337 joinPool(p);
338 }
339 }
340
341 /**
342 * getCorePoolSize returns size given in constructor if not otherwise set
343 */
344 public void testGetCorePoolSize() {
345 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
346 assertEquals(1, p.getCorePoolSize());
347 joinPool(p);
348 }
349
350 /**
351 * getKeepAliveTime returns value given in constructor if not otherwise set
352 */
353 public void testGetKeepAliveTime() {
354 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
355 assertEquals(1, p.getKeepAliveTime(SECONDS));
356 joinPool(p);
357 }
358
359 /**
360 * getThreadFactory returns factory in constructor if not set
361 */
362 public void testGetThreadFactory() {
363 ThreadFactory tf = new SimpleThreadFactory();
364 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
365 assertSame(tf, p.getThreadFactory());
366 joinPool(p);
367 }
368
369 /**
370 * setThreadFactory sets the thread factory returned by getThreadFactory
371 */
372 public void testSetThreadFactory() {
373 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
374 ThreadFactory tf = new SimpleThreadFactory();
375 p.setThreadFactory(tf);
376 assertSame(tf, p.getThreadFactory());
377 joinPool(p);
378 }
379
380 /**
381 * setThreadFactory(null) throws NPE
382 */
383 public void testSetThreadFactoryNull() {
384 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
385 try {
386 p.setThreadFactory(null);
387 shouldThrow();
388 } catch (NullPointerException success) {
389 } finally {
390 joinPool(p);
391 }
392 }
393
394 /**
395 * getRejectedExecutionHandler returns handler in constructor if not set
396 */
397 public void testGetRejectedExecutionHandler() {
398 RejectedExecutionHandler h = new NoOpREHandler();
399 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
400 assertSame(h, p.getRejectedExecutionHandler());
401 joinPool(p);
402 }
403
404 /**
405 * setRejectedExecutionHandler sets the handler returned by
406 * getRejectedExecutionHandler
407 */
408 public void testSetRejectedExecutionHandler() {
409 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
410 RejectedExecutionHandler h = new NoOpREHandler();
411 p.setRejectedExecutionHandler(h);
412 assertSame(h, p.getRejectedExecutionHandler());
413 joinPool(p);
414 }
415
416 /**
417 * setRejectedExecutionHandler(null) throws NPE
418 */
419 public void testSetRejectedExecutionHandlerNull() {
420 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
421 try {
422 p.setRejectedExecutionHandler(null);
423 shouldThrow();
424 } catch (NullPointerException success) {
425 } finally {
426 joinPool(p);
427 }
428 }
429
430 /**
431 * getLargestPoolSize increases, but doesn't overestimate, when
432 * multiple threads active
433 */
434 public void testGetLargestPoolSize() throws InterruptedException {
435 final int THREADS = 3;
436 final ThreadPoolExecutor p =
437 new CustomTPE(THREADS, THREADS,
438 LONG_DELAY_MS, MILLISECONDS,
439 new ArrayBlockingQueue<Runnable>(10));
440 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
441 final CountDownLatch done = new CountDownLatch(1);
442 try {
443 assertEquals(0, p.getLargestPoolSize());
444 for (int i = 0; i < THREADS; i++)
445 p.execute(new CheckedRunnable() {
446 public void realRun() throws InterruptedException {
447 threadsStarted.countDown();
448 done.await();
449 assertEquals(THREADS, p.getLargestPoolSize());
450 }});
451 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
452 assertEquals(THREADS, p.getLargestPoolSize());
453 } finally {
454 done.countDown();
455 joinPool(p);
456 assertEquals(THREADS, p.getLargestPoolSize());
457 }
458 }
459
460 /**
461 * getMaximumPoolSize returns value given in constructor if not
462 * otherwise set
463 */
464 public void testGetMaximumPoolSize() {
465 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
466 assertEquals(2, p.getMaximumPoolSize());
467 joinPool(p);
468 }
469
470 /**
471 * getPoolSize increases, but doesn't overestimate, when threads
472 * become active
473 */
474 public void testGetPoolSize() throws InterruptedException {
475 final ThreadPoolExecutor p =
476 new CustomTPE(1, 1,
477 LONG_DELAY_MS, MILLISECONDS,
478 new ArrayBlockingQueue<Runnable>(10));
479 final CountDownLatch threadStarted = new CountDownLatch(1);
480 final CountDownLatch done = new CountDownLatch(1);
481 try {
482 assertEquals(0, p.getPoolSize());
483 p.execute(new CheckedRunnable() {
484 public void realRun() throws InterruptedException {
485 threadStarted.countDown();
486 assertEquals(1, p.getPoolSize());
487 done.await();
488 }});
489 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
490 assertEquals(1, p.getPoolSize());
491 } finally {
492 done.countDown();
493 joinPool(p);
494 }
495 }
496
497 /**
498 * getTaskCount increases, but doesn't overestimate, when tasks submitted
499 */
500 public void testGetTaskCount() throws InterruptedException {
501 final ThreadPoolExecutor p =
502 new CustomTPE(1, 1,
503 LONG_DELAY_MS, MILLISECONDS,
504 new ArrayBlockingQueue<Runnable>(10));
505 final CountDownLatch threadStarted = new CountDownLatch(1);
506 final CountDownLatch done = new CountDownLatch(1);
507 try {
508 assertEquals(0, p.getTaskCount());
509 p.execute(new CheckedRunnable() {
510 public void realRun() throws InterruptedException {
511 threadStarted.countDown();
512 assertEquals(1, p.getTaskCount());
513 done.await();
514 }});
515 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
516 assertEquals(1, p.getTaskCount());
517 } finally {
518 done.countDown();
519 joinPool(p);
520 }
521 }
522
523 /**
524 * isShutdown is false before shutdown, true after
525 */
526 public void testIsShutdown() {
527
528 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
529 assertFalse(p.isShutdown());
530 try { p.shutdown(); } catch (SecurityException ok) { return; }
531 assertTrue(p.isShutdown());
532 joinPool(p);
533 }
534
535 /**
536 * isTerminated is false before termination, true after
537 */
538 public void testIsTerminated() throws InterruptedException {
539 final ThreadPoolExecutor p =
540 new CustomTPE(1, 1,
541 LONG_DELAY_MS, MILLISECONDS,
542 new ArrayBlockingQueue<Runnable>(10));
543 final CountDownLatch threadStarted = new CountDownLatch(1);
544 final CountDownLatch done = new CountDownLatch(1);
545 try {
546 assertFalse(p.isTerminating());
547 p.execute(new CheckedRunnable() {
548 public void realRun() throws InterruptedException {
549 assertFalse(p.isTerminating());
550 threadStarted.countDown();
551 done.await();
552 }});
553 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
554 assertFalse(p.isTerminating());
555 done.countDown();
556 } finally {
557 try { p.shutdown(); } catch (SecurityException ok) { return; }
558 }
559 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
560 assertTrue(p.isTerminated());
561 assertFalse(p.isTerminating());
562 }
563
564 /**
565 * isTerminating is not true when running or when terminated
566 */
567 public void testIsTerminating() throws InterruptedException {
568 final ThreadPoolExecutor p =
569 new CustomTPE(1, 1,
570 LONG_DELAY_MS, MILLISECONDS,
571 new ArrayBlockingQueue<Runnable>(10));
572 final CountDownLatch threadStarted = new CountDownLatch(1);
573 final CountDownLatch done = new CountDownLatch(1);
574 try {
575 assertFalse(p.isTerminating());
576 p.execute(new CheckedRunnable() {
577 public void realRun() throws InterruptedException {
578 assertFalse(p.isTerminating());
579 threadStarted.countDown();
580 done.await();
581 }});
582 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
583 assertFalse(p.isTerminating());
584 done.countDown();
585 } finally {
586 try { p.shutdown(); } catch (SecurityException ok) { return; }
587 }
588 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
589 assertTrue(p.isTerminated());
590 assertFalse(p.isTerminating());
591 }
592
593 /**
594 * getQueue returns the work queue, which contains queued tasks
595 */
596 public void testGetQueue() throws InterruptedException {
597 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
598 final ThreadPoolExecutor p =
599 new CustomTPE(1, 1,
600 LONG_DELAY_MS, MILLISECONDS,
601 q);
602 final CountDownLatch threadStarted = new CountDownLatch(1);
603 final CountDownLatch done = new CountDownLatch(1);
604 try {
605 FutureTask[] tasks = new FutureTask[5];
606 for (int i = 0; i < tasks.length; i++) {
607 Callable task = new CheckedCallable<Boolean>() {
608 public Boolean realCall() throws InterruptedException {
609 threadStarted.countDown();
610 assertSame(q, p.getQueue());
611 done.await();
612 return Boolean.TRUE;
613 }};
614 tasks[i] = new FutureTask(task);
615 p.execute(tasks[i]);
616 }
617 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
618 assertSame(q, p.getQueue());
619 assertFalse(q.contains(tasks[0]));
620 assertTrue(q.contains(tasks[tasks.length - 1]));
621 assertEquals(tasks.length - 1, q.size());
622 } finally {
623 done.countDown();
624 joinPool(p);
625 }
626 }
627
628 /**
629 * remove(task) removes queued task, and fails to remove active task
630 */
631 public void testRemove() throws InterruptedException {
632 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
633 final ThreadPoolExecutor p =
634 new CustomTPE(1, 1,
635 LONG_DELAY_MS, MILLISECONDS,
636 q);
637 Runnable[] tasks = new Runnable[6];
638 final CountDownLatch threadStarted = new CountDownLatch(1);
639 final CountDownLatch done = new CountDownLatch(1);
640 try {
641 for (int i = 0; i < tasks.length; i++) {
642 tasks[i] = new CheckedRunnable() {
643 public void realRun() throws InterruptedException {
644 threadStarted.countDown();
645 done.await();
646 }};
647 p.execute(tasks[i]);
648 }
649 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
650 assertFalse(p.remove(tasks[0]));
651 assertTrue(q.contains(tasks[4]));
652 assertTrue(q.contains(tasks[3]));
653 assertTrue(p.remove(tasks[4]));
654 assertFalse(p.remove(tasks[4]));
655 assertFalse(q.contains(tasks[4]));
656 assertTrue(q.contains(tasks[3]));
657 assertTrue(p.remove(tasks[3]));
658 assertFalse(q.contains(tasks[3]));
659 } finally {
660 done.countDown();
661 joinPool(p);
662 }
663 }
664
665 /**
666 * purge removes cancelled tasks from the queue
667 */
668 public void testPurge() throws InterruptedException {
669 final CountDownLatch threadStarted = new CountDownLatch(1);
670 final CountDownLatch done = new CountDownLatch(1);
671 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
672 final ThreadPoolExecutor p =
673 new CustomTPE(1, 1,
674 LONG_DELAY_MS, MILLISECONDS,
675 q);
676 FutureTask[] tasks = new FutureTask[5];
677 try {
678 for (int i = 0; i < tasks.length; i++) {
679 Callable task = new CheckedCallable<Boolean>() {
680 public Boolean realCall() throws InterruptedException {
681 threadStarted.countDown();
682 done.await();
683 return Boolean.TRUE;
684 }};
685 tasks[i] = new FutureTask(task);
686 p.execute(tasks[i]);
687 }
688 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
689 assertEquals(tasks.length, p.getTaskCount());
690 assertEquals(tasks.length - 1, q.size());
691 assertEquals(1L, p.getActiveCount());
692 assertEquals(0L, p.getCompletedTaskCount());
693 tasks[4].cancel(true);
694 tasks[3].cancel(false);
695 p.purge();
696 assertEquals(tasks.length - 3, q.size());
697 assertEquals(tasks.length - 2, p.getTaskCount());
698 p.purge(); // Nothing to do
699 assertEquals(tasks.length - 3, q.size());
700 assertEquals(tasks.length - 2, p.getTaskCount());
701 } finally {
702 done.countDown();
703 joinPool(p);
704 }
705 }
706
707 /**
708 * shutdownNow returns a list containing tasks that were not run,
709 * and those tasks are drained from the queue
710 */
711 public void testShutdownNow() throws InterruptedException {
712 final int poolSize = 2;
713 final int count = 5;
714 final AtomicInteger ran = new AtomicInteger(0);
715 ThreadPoolExecutor p =
716 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
717 new ArrayBlockingQueue<Runnable>(10));
718 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
719 Runnable waiter = new CheckedRunnable() { public void realRun() {
720 threadsStarted.countDown();
721 try {
722 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
723 } catch (InterruptedException success) {}
724 ran.getAndIncrement();
725 }};
726 for (int i = 0; i < count; i++)
727 p.execute(waiter);
728 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
729 assertEquals(poolSize, p.getActiveCount());
730 assertEquals(0, p.getCompletedTaskCount());
731 final List<Runnable> queuedTasks;
732 try {
733 queuedTasks = p.shutdownNow();
734 } catch (SecurityException ok) {
735 return; // Allowed in case test doesn't have privs
736 }
737 assertTrue(p.isShutdown());
738 assertTrue(p.getQueue().isEmpty());
739 assertEquals(count - poolSize, queuedTasks.size());
740 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
741 assertTrue(p.isTerminated());
742 assertEquals(poolSize, ran.get());
743 assertEquals(poolSize, p.getCompletedTaskCount());
744 }
745
746 // Exception Tests
747
748 /**
749 * Constructor throws if corePoolSize argument is less than zero
750 */
751 public void testConstructor1() {
752 try {
753 new CustomTPE(-1, 1, 1L, SECONDS,
754 new ArrayBlockingQueue<Runnable>(10));
755 shouldThrow();
756 } catch (IllegalArgumentException success) {}
757 }
758
759 /**
760 * Constructor throws if maximumPoolSize is less than zero
761 */
762 public void testConstructor2() {
763 try {
764 new CustomTPE(1, -1, 1L, SECONDS,
765 new ArrayBlockingQueue<Runnable>(10));
766 shouldThrow();
767 } catch (IllegalArgumentException success) {}
768 }
769
770 /**
771 * Constructor throws if maximumPoolSize is equal to zero
772 */
773 public void testConstructor3() {
774 try {
775 new CustomTPE(1, 0, 1L, SECONDS,
776 new ArrayBlockingQueue<Runnable>(10));
777 shouldThrow();
778 } catch (IllegalArgumentException success) {}
779 }
780
781 /**
782 * Constructor throws if keepAliveTime is less than zero
783 */
784 public void testConstructor4() {
785 try {
786 new CustomTPE(1, 2, -1L, SECONDS,
787 new ArrayBlockingQueue<Runnable>(10));
788 shouldThrow();
789 } catch (IllegalArgumentException success) {}
790 }
791
792 /**
793 * Constructor throws if corePoolSize is greater than the maximumPoolSize
794 */
795 public void testConstructor5() {
796 try {
797 new CustomTPE(2, 1, 1L, SECONDS,
798 new ArrayBlockingQueue<Runnable>(10));
799 shouldThrow();
800 } catch (IllegalArgumentException success) {}
801 }
802
803 /**
804 * Constructor throws if workQueue is set to null
805 */
806 public void testConstructorNullPointerException() {
807 try {
808 new CustomTPE(1, 2, 1L, SECONDS, null);
809 shouldThrow();
810 } catch (NullPointerException success) {}
811 }
812
813 /**
814 * Constructor throws if corePoolSize argument is less than zero
815 */
816 public void testConstructor6() {
817 try {
818 new CustomTPE(-1, 1, 1L, SECONDS,
819 new ArrayBlockingQueue<Runnable>(10),
820 new SimpleThreadFactory());
821 shouldThrow();
822 } catch (IllegalArgumentException success) {}
823 }
824
825 /**
826 * Constructor throws if maximumPoolSize is less than zero
827 */
828 public void testConstructor7() {
829 try {
830 new CustomTPE(1,-1, 1L, SECONDS,
831 new ArrayBlockingQueue<Runnable>(10),
832 new SimpleThreadFactory());
833 shouldThrow();
834 } catch (IllegalArgumentException success) {}
835 }
836
837 /**
838 * Constructor throws if maximumPoolSize is equal to zero
839 */
840 public void testConstructor8() {
841 try {
842 new CustomTPE(1, 0, 1L, SECONDS,
843 new ArrayBlockingQueue<Runnable>(10),
844 new SimpleThreadFactory());
845 shouldThrow();
846 } catch (IllegalArgumentException success) {}
847 }
848
849 /**
850 * Constructor throws if keepAliveTime is less than zero
851 */
852 public void testConstructor9() {
853 try {
854 new CustomTPE(1, 2, -1L, SECONDS,
855 new ArrayBlockingQueue<Runnable>(10),
856 new SimpleThreadFactory());
857 shouldThrow();
858 } catch (IllegalArgumentException success) {}
859 }
860
861 /**
862 * Constructor throws if corePoolSize is greater than the maximumPoolSize
863 */
864 public void testConstructor10() {
865 try {
866 new CustomTPE(2, 1, 1L, SECONDS,
867 new ArrayBlockingQueue<Runnable>(10),
868 new SimpleThreadFactory());
869 shouldThrow();
870 } catch (IllegalArgumentException success) {}
871 }
872
873 /**
874 * Constructor throws if workQueue is set to null
875 */
876 public void testConstructorNullPointerException2() {
877 try {
878 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
879 shouldThrow();
880 } catch (NullPointerException success) {}
881 }
882
883 /**
884 * Constructor throws if threadFactory is set to null
885 */
886 public void testConstructorNullPointerException3() {
887 try {
888 new CustomTPE(1, 2, 1L, SECONDS,
889 new ArrayBlockingQueue<Runnable>(10),
890 (ThreadFactory) null);
891 shouldThrow();
892 } catch (NullPointerException success) {}
893 }
894
895 /**
896 * Constructor throws if corePoolSize argument is less than zero
897 */
898 public void testConstructor11() {
899 try {
900 new CustomTPE(-1, 1, 1L, SECONDS,
901 new ArrayBlockingQueue<Runnable>(10),
902 new NoOpREHandler());
903 shouldThrow();
904 } catch (IllegalArgumentException success) {}
905 }
906
907 /**
908 * Constructor throws if maximumPoolSize is less than zero
909 */
910 public void testConstructor12() {
911 try {
912 new CustomTPE(1, -1, 1L, SECONDS,
913 new ArrayBlockingQueue<Runnable>(10),
914 new NoOpREHandler());
915 shouldThrow();
916 } catch (IllegalArgumentException success) {}
917 }
918
919 /**
920 * Constructor throws if maximumPoolSize is equal to zero
921 */
922 public void testConstructor13() {
923 try {
924 new CustomTPE(1, 0, 1L, SECONDS,
925 new ArrayBlockingQueue<Runnable>(10),
926 new NoOpREHandler());
927 shouldThrow();
928 } catch (IllegalArgumentException success) {}
929 }
930
931 /**
932 * Constructor throws if keepAliveTime is less than zero
933 */
934 public void testConstructor14() {
935 try {
936 new CustomTPE(1, 2, -1L, SECONDS,
937 new ArrayBlockingQueue<Runnable>(10),
938 new NoOpREHandler());
939 shouldThrow();
940 } catch (IllegalArgumentException success) {}
941 }
942
943 /**
944 * Constructor throws if corePoolSize is greater than the maximumPoolSize
945 */
946 public void testConstructor15() {
947 try {
948 new CustomTPE(2, 1, 1L, SECONDS,
949 new ArrayBlockingQueue<Runnable>(10),
950 new NoOpREHandler());
951 shouldThrow();
952 } catch (IllegalArgumentException success) {}
953 }
954
955 /**
956 * Constructor throws if workQueue is set to null
957 */
958 public void testConstructorNullPointerException4() {
959 try {
960 new CustomTPE(1, 2, 1L, SECONDS,
961 null,
962 new NoOpREHandler());
963 shouldThrow();
964 } catch (NullPointerException success) {}
965 }
966
967 /**
968 * Constructor throws if handler is set to null
969 */
970 public void testConstructorNullPointerException5() {
971 try {
972 new CustomTPE(1, 2, 1L, SECONDS,
973 new ArrayBlockingQueue<Runnable>(10),
974 (RejectedExecutionHandler) null);
975 shouldThrow();
976 } catch (NullPointerException success) {}
977 }
978
979 /**
980 * Constructor throws if corePoolSize argument is less than zero
981 */
982 public void testConstructor16() {
983 try {
984 new CustomTPE(-1, 1, 1L, SECONDS,
985 new ArrayBlockingQueue<Runnable>(10),
986 new SimpleThreadFactory(),
987 new NoOpREHandler());
988 shouldThrow();
989 } catch (IllegalArgumentException success) {}
990 }
991
992 /**
993 * Constructor throws if maximumPoolSize is less than zero
994 */
995 public void testConstructor17() {
996 try {
997 new CustomTPE(1, -1, 1L, SECONDS,
998 new ArrayBlockingQueue<Runnable>(10),
999 new SimpleThreadFactory(),
1000 new NoOpREHandler());
1001 shouldThrow();
1002 } catch (IllegalArgumentException success) {}
1003 }
1004
1005 /**
1006 * Constructor throws if maximumPoolSize is equal to zero
1007 */
1008 public void testConstructor18() {
1009 try {
1010 new CustomTPE(1, 0, 1L, SECONDS,
1011 new ArrayBlockingQueue<Runnable>(10),
1012 new SimpleThreadFactory(),
1013 new NoOpREHandler());
1014 shouldThrow();
1015 } catch (IllegalArgumentException success) {}
1016 }
1017
1018 /**
1019 * Constructor throws if keepAliveTime is less than zero
1020 */
1021 public void testConstructor19() {
1022 try {
1023 new CustomTPE(1, 2, -1L, SECONDS,
1024 new ArrayBlockingQueue<Runnable>(10),
1025 new SimpleThreadFactory(),
1026 new NoOpREHandler());
1027 shouldThrow();
1028 } catch (IllegalArgumentException success) {}
1029 }
1030
1031 /**
1032 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1033 */
1034 public void testConstructor20() {
1035 try {
1036 new CustomTPE(2, 1, 1L, SECONDS,
1037 new ArrayBlockingQueue<Runnable>(10),
1038 new SimpleThreadFactory(),
1039 new NoOpREHandler());
1040 shouldThrow();
1041 } catch (IllegalArgumentException success) {}
1042 }
1043
1044 /**
1045 * Constructor throws if workQueue is null
1046 */
1047 public void testConstructorNullPointerException6() {
1048 try {
1049 new CustomTPE(1, 2, 1L, SECONDS,
1050 null,
1051 new SimpleThreadFactory(),
1052 new NoOpREHandler());
1053 shouldThrow();
1054 } catch (NullPointerException success) {}
1055 }
1056
1057 /**
1058 * Constructor throws if handler is null
1059 */
1060 public void testConstructorNullPointerException7() {
1061 try {
1062 new CustomTPE(1, 2, 1L, SECONDS,
1063 new ArrayBlockingQueue<Runnable>(10),
1064 new SimpleThreadFactory(),
1065 (RejectedExecutionHandler) null);
1066 shouldThrow();
1067 } catch (NullPointerException success) {}
1068 }
1069
1070 /**
1071 * Constructor throws if ThreadFactory is null
1072 */
1073 public void testConstructorNullPointerException8() {
1074 try {
1075 new CustomTPE(1, 2, 1L, SECONDS,
1076 new ArrayBlockingQueue<Runnable>(10),
1077 (ThreadFactory) null,
1078 new NoOpREHandler());
1079 shouldThrow();
1080 } catch (NullPointerException success) {}
1081 }
1082
1083 /**
1084 * execute throws RejectedExecutionException if saturated.
1085 */
1086 public void testSaturatedExecute() {
1087 ThreadPoolExecutor p =
1088 new CustomTPE(1, 1,
1089 LONG_DELAY_MS, MILLISECONDS,
1090 new ArrayBlockingQueue<Runnable>(1));
1091 final CountDownLatch done = new CountDownLatch(1);
1092 try {
1093 Runnable task = new CheckedRunnable() {
1094 public void realRun() throws InterruptedException {
1095 done.await();
1096 }};
1097 for (int i = 0; i < 2; ++i)
1098 p.execute(task);
1099 for (int i = 0; i < 2; ++i) {
1100 try {
1101 p.execute(task);
1102 shouldThrow();
1103 } catch (RejectedExecutionException success) {}
1104 assertTrue(p.getTaskCount() <= 2);
1105 }
1106 } finally {
1107 done.countDown();
1108 joinPool(p);
1109 }
1110 }
1111
1112 /**
1113 * executor using CallerRunsPolicy runs task if saturated.
1114 */
1115 public void testSaturatedExecute2() {
1116 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1117 ThreadPoolExecutor p = new CustomTPE(1, 1,
1118 LONG_DELAY_MS, MILLISECONDS,
1119 new ArrayBlockingQueue<Runnable>(1),
1120 h);
1121 try {
1122 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1123 for (int i = 0; i < tasks.length; ++i)
1124 tasks[i] = new TrackedNoOpRunnable();
1125 TrackedLongRunnable mr = new TrackedLongRunnable();
1126 p.execute(mr);
1127 for (int i = 0; i < tasks.length; ++i)
1128 p.execute(tasks[i]);
1129 for (int i = 1; i < tasks.length; ++i)
1130 assertTrue(tasks[i].done);
1131 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1132 } finally {
1133 joinPool(p);
1134 }
1135 }
1136
1137 /**
1138 * executor using DiscardPolicy drops task if saturated.
1139 */
1140 public void testSaturatedExecute3() {
1141 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1142 ThreadPoolExecutor p =
1143 new CustomTPE(1, 1,
1144 LONG_DELAY_MS, MILLISECONDS,
1145 new ArrayBlockingQueue<Runnable>(1),
1146 h);
1147 try {
1148 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1149 for (int i = 0; i < tasks.length; ++i)
1150 tasks[i] = new TrackedNoOpRunnable();
1151 p.execute(new TrackedLongRunnable());
1152 for (TrackedNoOpRunnable task : tasks)
1153 p.execute(task);
1154 for (TrackedNoOpRunnable task : tasks)
1155 assertFalse(task.done);
1156 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1157 } finally {
1158 joinPool(p);
1159 }
1160 }
1161
1162 /**
1163 * executor using DiscardOldestPolicy drops oldest task if saturated.
1164 */
1165 public void testSaturatedExecute4() {
1166 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1167 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1168 try {
1169 p.execute(new TrackedLongRunnable());
1170 TrackedLongRunnable r2 = new TrackedLongRunnable();
1171 p.execute(r2);
1172 assertTrue(p.getQueue().contains(r2));
1173 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1174 p.execute(r3);
1175 assertFalse(p.getQueue().contains(r2));
1176 assertTrue(p.getQueue().contains(r3));
1177 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1178 } finally {
1179 joinPool(p);
1180 }
1181 }
1182
1183 /**
1184 * execute throws RejectedExecutionException if shutdown
1185 */
1186 public void testRejectedExecutionExceptionOnShutdown() {
1187 ThreadPoolExecutor p =
1188 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1189 try { p.shutdown(); } catch (SecurityException ok) { return; }
1190 try {
1191 p.execute(new NoOpRunnable());
1192 shouldThrow();
1193 } catch (RejectedExecutionException success) {}
1194
1195 joinPool(p);
1196 }
1197
1198 /**
1199 * execute using CallerRunsPolicy drops task on shutdown
1200 */
1201 public void testCallerRunsOnShutdown() {
1202 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1203 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1204
1205 try { p.shutdown(); } catch (SecurityException ok) { return; }
1206 try {
1207 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1208 p.execute(r);
1209 assertFalse(r.done);
1210 } finally {
1211 joinPool(p);
1212 }
1213 }
1214
1215 /**
1216 * execute using DiscardPolicy drops task on shutdown
1217 */
1218 public void testDiscardOnShutdown() {
1219 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1220 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1221
1222 try { p.shutdown(); } catch (SecurityException ok) { return; }
1223 try {
1224 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1225 p.execute(r);
1226 assertFalse(r.done);
1227 } finally {
1228 joinPool(p);
1229 }
1230 }
1231
1232 /**
1233 * execute using DiscardOldestPolicy drops task on shutdown
1234 */
1235 public void testDiscardOldestOnShutdown() {
1236 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1237 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1238
1239 try { p.shutdown(); } catch (SecurityException ok) { return; }
1240 try {
1241 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1242 p.execute(r);
1243 assertFalse(r.done);
1244 } finally {
1245 joinPool(p);
1246 }
1247 }
1248
1249 /**
1250 * execute(null) throws NPE
1251 */
1252 public void testExecuteNull() {
1253 ThreadPoolExecutor p =
1254 new CustomTPE(1, 2, 1L, SECONDS,
1255 new ArrayBlockingQueue<Runnable>(10));
1256 try {
1257 p.execute(null);
1258 shouldThrow();
1259 } catch (NullPointerException success) {}
1260
1261 joinPool(p);
1262 }
1263
1264 /**
1265 * setCorePoolSize of negative value throws IllegalArgumentException
1266 */
1267 public void testCorePoolSizeIllegalArgumentException() {
1268 ThreadPoolExecutor p =
1269 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1270 try {
1271 p.setCorePoolSize(-1);
1272 shouldThrow();
1273 } catch (IllegalArgumentException success) {
1274 } finally {
1275 try { p.shutdown(); } catch (SecurityException ok) { return; }
1276 }
1277 joinPool(p);
1278 }
1279
1280 /**
1281 * setMaximumPoolSize(int) throws IllegalArgumentException
1282 * if given a value less the core pool size
1283 */
1284 public void testMaximumPoolSizeIllegalArgumentException() {
1285 ThreadPoolExecutor p =
1286 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1287 try {
1288 p.setMaximumPoolSize(1);
1289 shouldThrow();
1290 } catch (IllegalArgumentException success) {
1291 } finally {
1292 try { p.shutdown(); } catch (SecurityException ok) { return; }
1293 }
1294 joinPool(p);
1295 }
1296
1297 /**
1298 * setMaximumPoolSize throws IllegalArgumentException
1299 * if given a negative value
1300 */
1301 public void testMaximumPoolSizeIllegalArgumentException2() {
1302 ThreadPoolExecutor p =
1303 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1304 try {
1305 p.setMaximumPoolSize(-1);
1306 shouldThrow();
1307 } catch (IllegalArgumentException success) {
1308 } finally {
1309 try { p.shutdown(); } catch (SecurityException ok) { return; }
1310 }
1311 joinPool(p);
1312 }
1313
1314 /**
1315 * setKeepAliveTime throws IllegalArgumentException
1316 * when given a negative value
1317 */
1318 public void testKeepAliveTimeIllegalArgumentException() {
1319 ThreadPoolExecutor p =
1320 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1321
1322 try {
1323 p.setKeepAliveTime(-1,MILLISECONDS);
1324 shouldThrow();
1325 } catch (IllegalArgumentException success) {
1326 } finally {
1327 try { p.shutdown(); } catch (SecurityException ok) { return; }
1328 }
1329 joinPool(p);
1330 }
1331
1332 /**
1333 * terminated() is called on termination
1334 */
1335 public void testTerminated() {
1336 CustomTPE p = new CustomTPE();
1337 try { p.shutdown(); } catch (SecurityException ok) { return; }
1338 assertTrue(p.terminatedCalled());
1339 joinPool(p);
1340 }
1341
1342 /**
1343 * beforeExecute and afterExecute are called when executing task
1344 */
1345 public void testBeforeAfter() throws InterruptedException {
1346 CustomTPE p = new CustomTPE();
1347 try {
1348 final CountDownLatch done = new CountDownLatch(1);
1349 p.execute(new CheckedRunnable() {
1350 public void realRun() {
1351 done.countDown();
1352 }});
1353 await(p.afterCalled);
1354 assertEquals(0, done.getCount());
1355 assertTrue(p.afterCalled());
1356 assertTrue(p.beforeCalled());
1357 try { p.shutdown(); } catch (SecurityException ok) { return; }
1358 } finally {
1359 joinPool(p);
1360 }
1361 }
1362
1363 /**
1364 * completed submit of callable returns result
1365 */
1366 public void testSubmitCallable() throws Exception {
1367 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1368 try {
1369 Future<String> future = e.submit(new StringTask());
1370 String result = future.get();
1371 assertSame(TEST_STRING, result);
1372 } finally {
1373 joinPool(e);
1374 }
1375 }
1376
1377 /**
1378 * completed submit of runnable returns successfully
1379 */
1380 public void testSubmitRunnable() throws Exception {
1381 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1382 try {
1383 Future<?> future = e.submit(new NoOpRunnable());
1384 future.get();
1385 assertTrue(future.isDone());
1386 } finally {
1387 joinPool(e);
1388 }
1389 }
1390
1391 /**
1392 * completed submit of (runnable, result) returns result
1393 */
1394 public void testSubmitRunnable2() throws Exception {
1395 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1396 try {
1397 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1398 String result = future.get();
1399 assertSame(TEST_STRING, result);
1400 } finally {
1401 joinPool(e);
1402 }
1403 }
1404
1405 /**
1406 * invokeAny(null) throws NPE
1407 */
1408 public void testInvokeAny1() throws Exception {
1409 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1410 try {
1411 e.invokeAny(null);
1412 shouldThrow();
1413 } catch (NullPointerException success) {
1414 } finally {
1415 joinPool(e);
1416 }
1417 }
1418
1419 /**
1420 * invokeAny(empty collection) throws IAE
1421 */
1422 public void testInvokeAny2() throws Exception {
1423 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1424 try {
1425 e.invokeAny(new ArrayList<Callable<String>>());
1426 shouldThrow();
1427 } catch (IllegalArgumentException success) {
1428 } finally {
1429 joinPool(e);
1430 }
1431 }
1432
1433 /**
1434 * invokeAny(c) throws NPE if c has null elements
1435 */
1436 public void testInvokeAny3() throws Exception {
1437 CountDownLatch latch = new CountDownLatch(1);
1438 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1439 List<Callable<String>> l = new ArrayList<Callable<String>>();
1440 l.add(latchAwaitingStringTask(latch));
1441 l.add(null);
1442 try {
1443 e.invokeAny(l);
1444 shouldThrow();
1445 } catch (NullPointerException success) {
1446 } finally {
1447 latch.countDown();
1448 joinPool(e);
1449 }
1450 }
1451
1452 /**
1453 * invokeAny(c) throws ExecutionException if no task completes
1454 */
1455 public void testInvokeAny4() throws Exception {
1456 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1457 List<Callable<String>> l = new ArrayList<Callable<String>>();
1458 l.add(new NPETask());
1459 try {
1460 e.invokeAny(l);
1461 shouldThrow();
1462 } catch (ExecutionException success) {
1463 assertTrue(success.getCause() instanceof NullPointerException);
1464 } finally {
1465 joinPool(e);
1466 }
1467 }
1468
1469 /**
1470 * invokeAny(c) returns result of some task
1471 */
1472 public void testInvokeAny5() throws Exception {
1473 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1474 try {
1475 List<Callable<String>> l = new ArrayList<Callable<String>>();
1476 l.add(new StringTask());
1477 l.add(new StringTask());
1478 String result = e.invokeAny(l);
1479 assertSame(TEST_STRING, result);
1480 } finally {
1481 joinPool(e);
1482 }
1483 }
1484
1485 /**
1486 * invokeAll(null) throws NPE
1487 */
1488 public void testInvokeAll1() throws Exception {
1489 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1490 try {
1491 e.invokeAll(null);
1492 shouldThrow();
1493 } catch (NullPointerException success) {
1494 } finally {
1495 joinPool(e);
1496 }
1497 }
1498
1499 /**
1500 * invokeAll(empty collection) returns empty collection
1501 */
1502 public void testInvokeAll2() throws Exception {
1503 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1504 try {
1505 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1506 assertTrue(r.isEmpty());
1507 } finally {
1508 joinPool(e);
1509 }
1510 }
1511
1512 /**
1513 * invokeAll(c) throws NPE if c has null elements
1514 */
1515 public void testInvokeAll3() throws Exception {
1516 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1517 List<Callable<String>> l = new ArrayList<Callable<String>>();
1518 l.add(new StringTask());
1519 l.add(null);
1520 try {
1521 e.invokeAll(l);
1522 shouldThrow();
1523 } catch (NullPointerException success) {
1524 } finally {
1525 joinPool(e);
1526 }
1527 }
1528
1529 /**
1530 * get of element of invokeAll(c) throws exception on failed task
1531 */
1532 public void testInvokeAll4() throws Exception {
1533 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1534 List<Callable<String>> l = new ArrayList<Callable<String>>();
1535 l.add(new NPETask());
1536 List<Future<String>> futures = e.invokeAll(l);
1537 assertEquals(1, futures.size());
1538 try {
1539 futures.get(0).get();
1540 shouldThrow();
1541 } catch (ExecutionException success) {
1542 assertTrue(success.getCause() instanceof NullPointerException);
1543 } finally {
1544 joinPool(e);
1545 }
1546 }
1547
1548 /**
1549 * invokeAll(c) returns results of all completed tasks
1550 */
1551 public void testInvokeAll5() throws Exception {
1552 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1553 try {
1554 List<Callable<String>> l = new ArrayList<Callable<String>>();
1555 l.add(new StringTask());
1556 l.add(new StringTask());
1557 List<Future<String>> futures = e.invokeAll(l);
1558 assertEquals(2, futures.size());
1559 for (Future<String> future : futures)
1560 assertSame(TEST_STRING, future.get());
1561 } finally {
1562 joinPool(e);
1563 }
1564 }
1565
1566 /**
1567 * timed invokeAny(null) throws NPE
1568 */
1569 public void testTimedInvokeAny1() throws Exception {
1570 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1571 try {
1572 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1573 shouldThrow();
1574 } catch (NullPointerException success) {
1575 } finally {
1576 joinPool(e);
1577 }
1578 }
1579
1580 /**
1581 * timed invokeAny(,,null) throws NPE
1582 */
1583 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1584 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1585 List<Callable<String>> l = new ArrayList<Callable<String>>();
1586 l.add(new StringTask());
1587 try {
1588 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1589 shouldThrow();
1590 } catch (NullPointerException success) {
1591 } finally {
1592 joinPool(e);
1593 }
1594 }
1595
1596 /**
1597 * timed invokeAny(empty collection) throws IAE
1598 */
1599 public void testTimedInvokeAny2() throws Exception {
1600 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1601 try {
1602 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1603 shouldThrow();
1604 } catch (IllegalArgumentException success) {
1605 } finally {
1606 joinPool(e);
1607 }
1608 }
1609
1610 /**
1611 * timed invokeAny(c) throws NPE if c has null elements
1612 */
1613 public void testTimedInvokeAny3() throws Exception {
1614 CountDownLatch latch = new CountDownLatch(1);
1615 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1616 List<Callable<String>> l = new ArrayList<Callable<String>>();
1617 l.add(latchAwaitingStringTask(latch));
1618 l.add(null);
1619 try {
1620 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1621 shouldThrow();
1622 } catch (NullPointerException success) {
1623 } finally {
1624 latch.countDown();
1625 joinPool(e);
1626 }
1627 }
1628
1629 /**
1630 * timed invokeAny(c) throws ExecutionException if no task completes
1631 */
1632 public void testTimedInvokeAny4() throws Exception {
1633 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1634 List<Callable<String>> l = new ArrayList<Callable<String>>();
1635 l.add(new NPETask());
1636 try {
1637 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1638 shouldThrow();
1639 } catch (ExecutionException success) {
1640 assertTrue(success.getCause() instanceof NullPointerException);
1641 } finally {
1642 joinPool(e);
1643 }
1644 }
1645
1646 /**
1647 * timed invokeAny(c) returns result of some task
1648 */
1649 public void testTimedInvokeAny5() throws Exception {
1650 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1651 try {
1652 List<Callable<String>> l = new ArrayList<Callable<String>>();
1653 l.add(new StringTask());
1654 l.add(new StringTask());
1655 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1656 assertSame(TEST_STRING, result);
1657 } finally {
1658 joinPool(e);
1659 }
1660 }
1661
1662 /**
1663 * timed invokeAll(null) throws NPE
1664 */
1665 public void testTimedInvokeAll1() throws Exception {
1666 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1667 try {
1668 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1669 shouldThrow();
1670 } catch (NullPointerException success) {
1671 } finally {
1672 joinPool(e);
1673 }
1674 }
1675
1676 /**
1677 * timed invokeAll(,,null) throws NPE
1678 */
1679 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1680 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1681 List<Callable<String>> l = new ArrayList<Callable<String>>();
1682 l.add(new StringTask());
1683 try {
1684 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1685 shouldThrow();
1686 } catch (NullPointerException success) {
1687 } finally {
1688 joinPool(e);
1689 }
1690 }
1691
1692 /**
1693 * timed invokeAll(empty collection) returns empty collection
1694 */
1695 public void testTimedInvokeAll2() throws Exception {
1696 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1697 try {
1698 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1699 assertTrue(r.isEmpty());
1700 } finally {
1701 joinPool(e);
1702 }
1703 }
1704
1705 /**
1706 * timed invokeAll(c) throws NPE if c has null elements
1707 */
1708 public void testTimedInvokeAll3() throws Exception {
1709 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1710 List<Callable<String>> l = new ArrayList<Callable<String>>();
1711 l.add(new StringTask());
1712 l.add(null);
1713 try {
1714 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1715 shouldThrow();
1716 } catch (NullPointerException success) {
1717 } finally {
1718 joinPool(e);
1719 }
1720 }
1721
1722 /**
1723 * get of element of invokeAll(c) throws exception on failed task
1724 */
1725 public void testTimedInvokeAll4() throws Exception {
1726 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1727 List<Callable<String>> l = new ArrayList<Callable<String>>();
1728 l.add(new NPETask());
1729 List<Future<String>> futures =
1730 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1731 assertEquals(1, futures.size());
1732 try {
1733 futures.get(0).get();
1734 shouldThrow();
1735 } catch (ExecutionException success) {
1736 assertTrue(success.getCause() instanceof NullPointerException);
1737 } finally {
1738 joinPool(e);
1739 }
1740 }
1741
1742 /**
1743 * timed invokeAll(c) returns results of all completed tasks
1744 */
1745 public void testTimedInvokeAll5() throws Exception {
1746 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1747 try {
1748 List<Callable<String>> l = new ArrayList<Callable<String>>();
1749 l.add(new StringTask());
1750 l.add(new StringTask());
1751 List<Future<String>> futures =
1752 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1753 assertEquals(2, futures.size());
1754 for (Future<String> future : futures)
1755 assertSame(TEST_STRING, future.get());
1756 } finally {
1757 joinPool(e);
1758 }
1759 }
1760
1761 /**
1762 * timed invokeAll(c) cancels tasks not completed by timeout
1763 */
1764 public void testTimedInvokeAll6() throws Exception {
1765 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1766 try {
1767 for (long timeout = timeoutMillis();;) {
1768 List<Callable<String>> tasks = new ArrayList<>();
1769 tasks.add(new StringTask("0"));
1770 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1771 tasks.add(new StringTask("2"));
1772 long startTime = System.nanoTime();
1773 List<Future<String>> futures =
1774 e.invokeAll(tasks, timeout, MILLISECONDS);
1775 assertEquals(tasks.size(), futures.size());
1776 assertTrue(millisElapsedSince(startTime) >= timeout);
1777 for (Future future : futures)
1778 assertTrue(future.isDone());
1779 assertTrue(futures.get(1).isCancelled());
1780 try {
1781 assertEquals("0", futures.get(0).get());
1782 assertEquals("2", futures.get(2).get());
1783 break;
1784 } catch (CancellationException retryWithLongerTimeout) {
1785 timeout *= 2;
1786 if (timeout >= LONG_DELAY_MS / 2)
1787 fail("expected exactly one task to be cancelled");
1788 }
1789 }
1790 } finally {
1791 joinPool(e);
1792 }
1793 }
1794
1795 /**
1796 * Execution continues if there is at least one thread even if
1797 * thread factory fails to create more
1798 */
1799 public void testFailingThreadFactory() throws InterruptedException {
1800 final ExecutorService e =
1801 new CustomTPE(100, 100,
1802 LONG_DELAY_MS, MILLISECONDS,
1803 new LinkedBlockingQueue<Runnable>(),
1804 new FailingThreadFactory());
1805 try {
1806 final int TASKS = 100;
1807 final CountDownLatch done = new CountDownLatch(TASKS);
1808 for (int k = 0; k < TASKS; ++k)
1809 e.execute(new CheckedRunnable() {
1810 public void realRun() {
1811 done.countDown();
1812 }});
1813 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1814 } finally {
1815 joinPool(e);
1816 }
1817 }
1818
1819 /**
1820 * allowsCoreThreadTimeOut is by default false.
1821 */
1822 public void testAllowsCoreThreadTimeOut() {
1823 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1824 assertFalse(p.allowsCoreThreadTimeOut());
1825 joinPool(p);
1826 }
1827
1828 /**
1829 * allowCoreThreadTimeOut(true) causes idle threads to time out
1830 */
1831 public void testAllowCoreThreadTimeOut_true() throws Exception {
1832 long keepAliveTime = timeoutMillis();
1833 final ThreadPoolExecutor p =
1834 new CustomTPE(2, 10,
1835 keepAliveTime, MILLISECONDS,
1836 new ArrayBlockingQueue<Runnable>(10));
1837 final CountDownLatch threadStarted = new CountDownLatch(1);
1838 try {
1839 p.allowCoreThreadTimeOut(true);
1840 p.execute(new CheckedRunnable() {
1841 public void realRun() {
1842 threadStarted.countDown();
1843 assertEquals(1, p.getPoolSize());
1844 }});
1845 await(threadStarted);
1846 delay(keepAliveTime);
1847 long startTime = System.nanoTime();
1848 while (p.getPoolSize() > 0
1849 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1850 Thread.yield();
1851 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1852 assertEquals(0, p.getPoolSize());
1853 } finally {
1854 joinPool(p);
1855 }
1856 }
1857
1858 /**
1859 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1860 */
1861 public void testAllowCoreThreadTimeOut_false() throws Exception {
1862 long keepAliveTime = timeoutMillis();
1863 final ThreadPoolExecutor p =
1864 new CustomTPE(2, 10,
1865 keepAliveTime, MILLISECONDS,
1866 new ArrayBlockingQueue<Runnable>(10));
1867 final CountDownLatch threadStarted = new CountDownLatch(1);
1868 try {
1869 p.allowCoreThreadTimeOut(false);
1870 p.execute(new CheckedRunnable() {
1871 public void realRun() throws InterruptedException {
1872 threadStarted.countDown();
1873 assertTrue(p.getPoolSize() >= 1);
1874 }});
1875 delay(2 * keepAliveTime);
1876 assertTrue(p.getPoolSize() >= 1);
1877 } finally {
1878 joinPool(p);
1879 }
1880 }
1881
1882 /**
1883 * get(cancelled task) throws CancellationException
1884 * (in part, a test of CustomTPE itself)
1885 */
1886 public void testGet_cancelled() throws Exception {
1887 final ExecutorService e =
1888 new CustomTPE(1, 1,
1889 LONG_DELAY_MS, MILLISECONDS,
1890 new LinkedBlockingQueue<Runnable>());
1891 try {
1892 final CountDownLatch blockerStarted = new CountDownLatch(1);
1893 final CountDownLatch done = new CountDownLatch(1);
1894 final List<Future<?>> futures = new ArrayList<>();
1895 for (int i = 0; i < 2; i++) {
1896 Runnable r = new CheckedRunnable() { public void realRun()
1897 throws Throwable {
1898 blockerStarted.countDown();
1899 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1900 }};
1901 futures.add(e.submit(r));
1902 }
1903 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1904 for (Future<?> future : futures) future.cancel(false);
1905 for (Future<?> future : futures) {
1906 try {
1907 future.get();
1908 shouldThrow();
1909 } catch (CancellationException success) {}
1910 try {
1911 future.get(LONG_DELAY_MS, MILLISECONDS);
1912 shouldThrow();
1913 } catch (CancellationException success) {}
1914 assertTrue(future.isCancelled());
1915 assertTrue(future.isDone());
1916 }
1917 done.countDown();
1918 } finally {
1919 joinPool(e);
1920 }
1921 }
1922
1923 }