ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.49
Committed: Sun Oct 4 00:30:50 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.48: +5 -5 lines
Log Message:
rejigger pool closing infrastructure

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 try (PoolCleaner<CustomTPE> cleaner =
234 cleaner(new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10)))) {
237 final ThreadPoolExecutor p = cleaner.pool;
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown();
241 }};
242 p.execute(task);
243 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
244 }
245 }
246
247 /**
248 * getActiveCount increases but doesn't overestimate, when a
249 * thread becomes active
250 */
251 public void testGetActiveCount() throws InterruptedException {
252 final ThreadPoolExecutor p =
253 new CustomTPE(2, 2,
254 LONG_DELAY_MS, MILLISECONDS,
255 new ArrayBlockingQueue<Runnable>(10));
256 final CountDownLatch threadStarted = new CountDownLatch(1);
257 final CountDownLatch done = new CountDownLatch(1);
258 try {
259 assertEquals(0, p.getActiveCount());
260 p.execute(new CheckedRunnable() {
261 public void realRun() throws InterruptedException {
262 threadStarted.countDown();
263 assertEquals(1, p.getActiveCount());
264 done.await();
265 }});
266 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
267 assertEquals(1, p.getActiveCount());
268 } finally {
269 done.countDown();
270 joinPool(p);
271 }
272 }
273
274 /**
275 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
276 */
277 public void testPrestartCoreThread() {
278 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
279 assertEquals(0, p.getPoolSize());
280 assertTrue(p.prestartCoreThread());
281 assertEquals(1, p.getPoolSize());
282 assertTrue(p.prestartCoreThread());
283 assertEquals(2, p.getPoolSize());
284 assertFalse(p.prestartCoreThread());
285 assertEquals(2, p.getPoolSize());
286 joinPool(p);
287 }
288
289 /**
290 * prestartAllCoreThreads starts all corePoolSize threads
291 */
292 public void testPrestartAllCoreThreads() {
293 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
294 assertEquals(0, p.getPoolSize());
295 p.prestartAllCoreThreads();
296 assertEquals(2, p.getPoolSize());
297 p.prestartAllCoreThreads();
298 assertEquals(2, p.getPoolSize());
299 joinPool(p);
300 }
301
302 /**
303 * getCompletedTaskCount increases, but doesn't overestimate,
304 * when tasks complete
305 */
306 public void testGetCompletedTaskCount() throws InterruptedException {
307 final ThreadPoolExecutor p =
308 new CustomTPE(2, 2,
309 LONG_DELAY_MS, MILLISECONDS,
310 new ArrayBlockingQueue<Runnable>(10));
311 final CountDownLatch threadStarted = new CountDownLatch(1);
312 final CountDownLatch threadProceed = new CountDownLatch(1);
313 final CountDownLatch threadDone = new CountDownLatch(1);
314 try {
315 assertEquals(0, p.getCompletedTaskCount());
316 p.execute(new CheckedRunnable() {
317 public void realRun() throws InterruptedException {
318 threadStarted.countDown();
319 assertEquals(0, p.getCompletedTaskCount());
320 threadProceed.await();
321 threadDone.countDown();
322 }});
323 await(threadStarted);
324 assertEquals(0, p.getCompletedTaskCount());
325 threadProceed.countDown();
326 threadDone.await();
327 long startTime = System.nanoTime();
328 while (p.getCompletedTaskCount() != 1) {
329 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
330 fail("timed out");
331 Thread.yield();
332 }
333 } finally {
334 joinPool(p);
335 }
336 }
337
338 /**
339 * getCorePoolSize returns size given in constructor if not otherwise set
340 */
341 public void testGetCorePoolSize() {
342 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
343 assertEquals(1, p.getCorePoolSize());
344 joinPool(p);
345 }
346
347 /**
348 * getKeepAliveTime returns value given in constructor if not otherwise set
349 */
350 public void testGetKeepAliveTime() {
351 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
352 assertEquals(1, p.getKeepAliveTime(SECONDS));
353 joinPool(p);
354 }
355
356 /**
357 * getThreadFactory returns factory in constructor if not set
358 */
359 public void testGetThreadFactory() {
360 ThreadFactory tf = new SimpleThreadFactory();
361 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
362 assertSame(tf, p.getThreadFactory());
363 joinPool(p);
364 }
365
366 /**
367 * setThreadFactory sets the thread factory returned by getThreadFactory
368 */
369 public void testSetThreadFactory() {
370 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
371 ThreadFactory tf = new SimpleThreadFactory();
372 p.setThreadFactory(tf);
373 assertSame(tf, p.getThreadFactory());
374 joinPool(p);
375 }
376
377 /**
378 * setThreadFactory(null) throws NPE
379 */
380 public void testSetThreadFactoryNull() {
381 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
382 try {
383 p.setThreadFactory(null);
384 shouldThrow();
385 } catch (NullPointerException success) {
386 } finally {
387 joinPool(p);
388 }
389 }
390
391 /**
392 * getRejectedExecutionHandler returns handler in constructor if not set
393 */
394 public void testGetRejectedExecutionHandler() {
395 RejectedExecutionHandler h = new NoOpREHandler();
396 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
397 assertSame(h, p.getRejectedExecutionHandler());
398 joinPool(p);
399 }
400
401 /**
402 * setRejectedExecutionHandler sets the handler returned by
403 * getRejectedExecutionHandler
404 */
405 public void testSetRejectedExecutionHandler() {
406 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
407 RejectedExecutionHandler h = new NoOpREHandler();
408 p.setRejectedExecutionHandler(h);
409 assertSame(h, p.getRejectedExecutionHandler());
410 joinPool(p);
411 }
412
413 /**
414 * setRejectedExecutionHandler(null) throws NPE
415 */
416 public void testSetRejectedExecutionHandlerNull() {
417 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
418 try {
419 p.setRejectedExecutionHandler(null);
420 shouldThrow();
421 } catch (NullPointerException success) {
422 } finally {
423 joinPool(p);
424 }
425 }
426
427 /**
428 * getLargestPoolSize increases, but doesn't overestimate, when
429 * multiple threads active
430 */
431 public void testGetLargestPoolSize() throws InterruptedException {
432 final int THREADS = 3;
433 final ThreadPoolExecutor p =
434 new CustomTPE(THREADS, THREADS,
435 LONG_DELAY_MS, MILLISECONDS,
436 new ArrayBlockingQueue<Runnable>(10));
437 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
438 final CountDownLatch done = new CountDownLatch(1);
439 try {
440 assertEquals(0, p.getLargestPoolSize());
441 for (int i = 0; i < THREADS; i++)
442 p.execute(new CheckedRunnable() {
443 public void realRun() throws InterruptedException {
444 threadsStarted.countDown();
445 done.await();
446 assertEquals(THREADS, p.getLargestPoolSize());
447 }});
448 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
449 assertEquals(THREADS, p.getLargestPoolSize());
450 } finally {
451 done.countDown();
452 joinPool(p);
453 assertEquals(THREADS, p.getLargestPoolSize());
454 }
455 }
456
457 /**
458 * getMaximumPoolSize returns value given in constructor if not
459 * otherwise set
460 */
461 public void testGetMaximumPoolSize() {
462 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
463 assertEquals(2, p.getMaximumPoolSize());
464 joinPool(p);
465 }
466
467 /**
468 * getPoolSize increases, but doesn't overestimate, when threads
469 * become active
470 */
471 public void testGetPoolSize() throws InterruptedException {
472 final ThreadPoolExecutor p =
473 new CustomTPE(1, 1,
474 LONG_DELAY_MS, MILLISECONDS,
475 new ArrayBlockingQueue<Runnable>(10));
476 final CountDownLatch threadStarted = new CountDownLatch(1);
477 final CountDownLatch done = new CountDownLatch(1);
478 try {
479 assertEquals(0, p.getPoolSize());
480 p.execute(new CheckedRunnable() {
481 public void realRun() throws InterruptedException {
482 threadStarted.countDown();
483 assertEquals(1, p.getPoolSize());
484 done.await();
485 }});
486 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
487 assertEquals(1, p.getPoolSize());
488 } finally {
489 done.countDown();
490 joinPool(p);
491 }
492 }
493
494 /**
495 * getTaskCount increases, but doesn't overestimate, when tasks submitted
496 */
497 public void testGetTaskCount() throws InterruptedException {
498 final ThreadPoolExecutor p =
499 new CustomTPE(1, 1,
500 LONG_DELAY_MS, MILLISECONDS,
501 new ArrayBlockingQueue<Runnable>(10));
502 final CountDownLatch threadStarted = new CountDownLatch(1);
503 final CountDownLatch done = new CountDownLatch(1);
504 try {
505 assertEquals(0, p.getTaskCount());
506 p.execute(new CheckedRunnable() {
507 public void realRun() throws InterruptedException {
508 threadStarted.countDown();
509 assertEquals(1, p.getTaskCount());
510 done.await();
511 }});
512 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
513 assertEquals(1, p.getTaskCount());
514 } finally {
515 done.countDown();
516 joinPool(p);
517 }
518 }
519
520 /**
521 * isShutdown is false before shutdown, true after
522 */
523 public void testIsShutdown() {
524
525 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
526 assertFalse(p.isShutdown());
527 try { p.shutdown(); } catch (SecurityException ok) { return; }
528 assertTrue(p.isShutdown());
529 joinPool(p);
530 }
531
532 /**
533 * isTerminated is false before termination, true after
534 */
535 public void testIsTerminated() throws InterruptedException {
536 final ThreadPoolExecutor p =
537 new CustomTPE(1, 1,
538 LONG_DELAY_MS, MILLISECONDS,
539 new ArrayBlockingQueue<Runnable>(10));
540 final CountDownLatch threadStarted = new CountDownLatch(1);
541 final CountDownLatch done = new CountDownLatch(1);
542 try {
543 assertFalse(p.isTerminating());
544 p.execute(new CheckedRunnable() {
545 public void realRun() throws InterruptedException {
546 assertFalse(p.isTerminating());
547 threadStarted.countDown();
548 done.await();
549 }});
550 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
551 assertFalse(p.isTerminating());
552 done.countDown();
553 } finally {
554 try { p.shutdown(); } catch (SecurityException ok) { return; }
555 }
556 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
557 assertTrue(p.isTerminated());
558 assertFalse(p.isTerminating());
559 }
560
561 /**
562 * isTerminating is not true when running or when terminated
563 */
564 public void testIsTerminating() throws InterruptedException {
565 final ThreadPoolExecutor p =
566 new CustomTPE(1, 1,
567 LONG_DELAY_MS, MILLISECONDS,
568 new ArrayBlockingQueue<Runnable>(10));
569 final CountDownLatch threadStarted = new CountDownLatch(1);
570 final CountDownLatch done = new CountDownLatch(1);
571 try {
572 assertFalse(p.isTerminating());
573 p.execute(new CheckedRunnable() {
574 public void realRun() throws InterruptedException {
575 assertFalse(p.isTerminating());
576 threadStarted.countDown();
577 done.await();
578 }});
579 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
580 assertFalse(p.isTerminating());
581 done.countDown();
582 } finally {
583 try { p.shutdown(); } catch (SecurityException ok) { return; }
584 }
585 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
586 assertTrue(p.isTerminated());
587 assertFalse(p.isTerminating());
588 }
589
590 /**
591 * getQueue returns the work queue, which contains queued tasks
592 */
593 public void testGetQueue() throws InterruptedException {
594 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
595 final ThreadPoolExecutor p =
596 new CustomTPE(1, 1,
597 LONG_DELAY_MS, MILLISECONDS,
598 q);
599 final CountDownLatch threadStarted = new CountDownLatch(1);
600 final CountDownLatch done = new CountDownLatch(1);
601 try {
602 FutureTask[] tasks = new FutureTask[5];
603 for (int i = 0; i < tasks.length; i++) {
604 Callable task = new CheckedCallable<Boolean>() {
605 public Boolean realCall() throws InterruptedException {
606 threadStarted.countDown();
607 assertSame(q, p.getQueue());
608 done.await();
609 return Boolean.TRUE;
610 }};
611 tasks[i] = new FutureTask(task);
612 p.execute(tasks[i]);
613 }
614 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
615 assertSame(q, p.getQueue());
616 assertFalse(q.contains(tasks[0]));
617 assertTrue(q.contains(tasks[tasks.length - 1]));
618 assertEquals(tasks.length - 1, q.size());
619 } finally {
620 done.countDown();
621 joinPool(p);
622 }
623 }
624
625 /**
626 * remove(task) removes queued task, and fails to remove active task
627 */
628 public void testRemove() throws InterruptedException {
629 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
630 final ThreadPoolExecutor p =
631 new CustomTPE(1, 1,
632 LONG_DELAY_MS, MILLISECONDS,
633 q);
634 Runnable[] tasks = new Runnable[6];
635 final CountDownLatch threadStarted = new CountDownLatch(1);
636 final CountDownLatch done = new CountDownLatch(1);
637 try {
638 for (int i = 0; i < tasks.length; i++) {
639 tasks[i] = new CheckedRunnable() {
640 public void realRun() throws InterruptedException {
641 threadStarted.countDown();
642 done.await();
643 }};
644 p.execute(tasks[i]);
645 }
646 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
647 assertFalse(p.remove(tasks[0]));
648 assertTrue(q.contains(tasks[4]));
649 assertTrue(q.contains(tasks[3]));
650 assertTrue(p.remove(tasks[4]));
651 assertFalse(p.remove(tasks[4]));
652 assertFalse(q.contains(tasks[4]));
653 assertTrue(q.contains(tasks[3]));
654 assertTrue(p.remove(tasks[3]));
655 assertFalse(q.contains(tasks[3]));
656 } finally {
657 done.countDown();
658 joinPool(p);
659 }
660 }
661
662 /**
663 * purge removes cancelled tasks from the queue
664 */
665 public void testPurge() throws InterruptedException {
666 final CountDownLatch threadStarted = new CountDownLatch(1);
667 final CountDownLatch done = new CountDownLatch(1);
668 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
669 final ThreadPoolExecutor p =
670 new CustomTPE(1, 1,
671 LONG_DELAY_MS, MILLISECONDS,
672 q);
673 FutureTask[] tasks = new FutureTask[5];
674 try {
675 for (int i = 0; i < tasks.length; i++) {
676 Callable task = new CheckedCallable<Boolean>() {
677 public Boolean realCall() throws InterruptedException {
678 threadStarted.countDown();
679 done.await();
680 return Boolean.TRUE;
681 }};
682 tasks[i] = new FutureTask(task);
683 p.execute(tasks[i]);
684 }
685 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
686 assertEquals(tasks.length, p.getTaskCount());
687 assertEquals(tasks.length - 1, q.size());
688 assertEquals(1L, p.getActiveCount());
689 assertEquals(0L, p.getCompletedTaskCount());
690 tasks[4].cancel(true);
691 tasks[3].cancel(false);
692 p.purge();
693 assertEquals(tasks.length - 3, q.size());
694 assertEquals(tasks.length - 2, p.getTaskCount());
695 p.purge(); // Nothing to do
696 assertEquals(tasks.length - 3, q.size());
697 assertEquals(tasks.length - 2, p.getTaskCount());
698 } finally {
699 done.countDown();
700 joinPool(p);
701 }
702 }
703
704 /**
705 * shutdownNow returns a list containing tasks that were not run,
706 * and those tasks are drained from the queue
707 */
708 public void testShutdownNow() throws InterruptedException {
709 final int poolSize = 2;
710 final int count = 5;
711 final AtomicInteger ran = new AtomicInteger(0);
712 ThreadPoolExecutor p =
713 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
714 new ArrayBlockingQueue<Runnable>(10));
715 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
716 Runnable waiter = new CheckedRunnable() { public void realRun() {
717 threadsStarted.countDown();
718 try {
719 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
720 } catch (InterruptedException success) {}
721 ran.getAndIncrement();
722 }};
723 for (int i = 0; i < count; i++)
724 p.execute(waiter);
725 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
726 assertEquals(poolSize, p.getActiveCount());
727 assertEquals(0, p.getCompletedTaskCount());
728 final List<Runnable> queuedTasks;
729 try {
730 queuedTasks = p.shutdownNow();
731 } catch (SecurityException ok) {
732 return; // Allowed in case test doesn't have privs
733 }
734 assertTrue(p.isShutdown());
735 assertTrue(p.getQueue().isEmpty());
736 assertEquals(count - poolSize, queuedTasks.size());
737 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
738 assertTrue(p.isTerminated());
739 assertEquals(poolSize, ran.get());
740 assertEquals(poolSize, p.getCompletedTaskCount());
741 }
742
743 // Exception Tests
744
745 /**
746 * Constructor throws if corePoolSize argument is less than zero
747 */
748 public void testConstructor1() {
749 try {
750 new CustomTPE(-1, 1, 1L, SECONDS,
751 new ArrayBlockingQueue<Runnable>(10));
752 shouldThrow();
753 } catch (IllegalArgumentException success) {}
754 }
755
756 /**
757 * Constructor throws if maximumPoolSize is less than zero
758 */
759 public void testConstructor2() {
760 try {
761 new CustomTPE(1, -1, 1L, SECONDS,
762 new ArrayBlockingQueue<Runnable>(10));
763 shouldThrow();
764 } catch (IllegalArgumentException success) {}
765 }
766
767 /**
768 * Constructor throws if maximumPoolSize is equal to zero
769 */
770 public void testConstructor3() {
771 try {
772 new CustomTPE(1, 0, 1L, SECONDS,
773 new ArrayBlockingQueue<Runnable>(10));
774 shouldThrow();
775 } catch (IllegalArgumentException success) {}
776 }
777
778 /**
779 * Constructor throws if keepAliveTime is less than zero
780 */
781 public void testConstructor4() {
782 try {
783 new CustomTPE(1, 2, -1L, SECONDS,
784 new ArrayBlockingQueue<Runnable>(10));
785 shouldThrow();
786 } catch (IllegalArgumentException success) {}
787 }
788
789 /**
790 * Constructor throws if corePoolSize is greater than the maximumPoolSize
791 */
792 public void testConstructor5() {
793 try {
794 new CustomTPE(2, 1, 1L, SECONDS,
795 new ArrayBlockingQueue<Runnable>(10));
796 shouldThrow();
797 } catch (IllegalArgumentException success) {}
798 }
799
800 /**
801 * Constructor throws if workQueue is set to null
802 */
803 public void testConstructorNullPointerException() {
804 try {
805 new CustomTPE(1, 2, 1L, SECONDS, null);
806 shouldThrow();
807 } catch (NullPointerException success) {}
808 }
809
810 /**
811 * Constructor throws if corePoolSize argument is less than zero
812 */
813 public void testConstructor6() {
814 try {
815 new CustomTPE(-1, 1, 1L, SECONDS,
816 new ArrayBlockingQueue<Runnable>(10),
817 new SimpleThreadFactory());
818 shouldThrow();
819 } catch (IllegalArgumentException success) {}
820 }
821
822 /**
823 * Constructor throws if maximumPoolSize is less than zero
824 */
825 public void testConstructor7() {
826 try {
827 new CustomTPE(1,-1, 1L, SECONDS,
828 new ArrayBlockingQueue<Runnable>(10),
829 new SimpleThreadFactory());
830 shouldThrow();
831 } catch (IllegalArgumentException success) {}
832 }
833
834 /**
835 * Constructor throws if maximumPoolSize is equal to zero
836 */
837 public void testConstructor8() {
838 try {
839 new CustomTPE(1, 0, 1L, SECONDS,
840 new ArrayBlockingQueue<Runnable>(10),
841 new SimpleThreadFactory());
842 shouldThrow();
843 } catch (IllegalArgumentException success) {}
844 }
845
846 /**
847 * Constructor throws if keepAliveTime is less than zero
848 */
849 public void testConstructor9() {
850 try {
851 new CustomTPE(1, 2, -1L, SECONDS,
852 new ArrayBlockingQueue<Runnable>(10),
853 new SimpleThreadFactory());
854 shouldThrow();
855 } catch (IllegalArgumentException success) {}
856 }
857
858 /**
859 * Constructor throws if corePoolSize is greater than the maximumPoolSize
860 */
861 public void testConstructor10() {
862 try {
863 new CustomTPE(2, 1, 1L, SECONDS,
864 new ArrayBlockingQueue<Runnable>(10),
865 new SimpleThreadFactory());
866 shouldThrow();
867 } catch (IllegalArgumentException success) {}
868 }
869
870 /**
871 * Constructor throws if workQueue is set to null
872 */
873 public void testConstructorNullPointerException2() {
874 try {
875 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
876 shouldThrow();
877 } catch (NullPointerException success) {}
878 }
879
880 /**
881 * Constructor throws if threadFactory is set to null
882 */
883 public void testConstructorNullPointerException3() {
884 try {
885 new CustomTPE(1, 2, 1L, SECONDS,
886 new ArrayBlockingQueue<Runnable>(10),
887 (ThreadFactory) null);
888 shouldThrow();
889 } catch (NullPointerException success) {}
890 }
891
892 /**
893 * Constructor throws if corePoolSize argument is less than zero
894 */
895 public void testConstructor11() {
896 try {
897 new CustomTPE(-1, 1, 1L, SECONDS,
898 new ArrayBlockingQueue<Runnable>(10),
899 new NoOpREHandler());
900 shouldThrow();
901 } catch (IllegalArgumentException success) {}
902 }
903
904 /**
905 * Constructor throws if maximumPoolSize is less than zero
906 */
907 public void testConstructor12() {
908 try {
909 new CustomTPE(1, -1, 1L, SECONDS,
910 new ArrayBlockingQueue<Runnable>(10),
911 new NoOpREHandler());
912 shouldThrow();
913 } catch (IllegalArgumentException success) {}
914 }
915
916 /**
917 * Constructor throws if maximumPoolSize is equal to zero
918 */
919 public void testConstructor13() {
920 try {
921 new CustomTPE(1, 0, 1L, SECONDS,
922 new ArrayBlockingQueue<Runnable>(10),
923 new NoOpREHandler());
924 shouldThrow();
925 } catch (IllegalArgumentException success) {}
926 }
927
928 /**
929 * Constructor throws if keepAliveTime is less than zero
930 */
931 public void testConstructor14() {
932 try {
933 new CustomTPE(1, 2, -1L, SECONDS,
934 new ArrayBlockingQueue<Runnable>(10),
935 new NoOpREHandler());
936 shouldThrow();
937 } catch (IllegalArgumentException success) {}
938 }
939
940 /**
941 * Constructor throws if corePoolSize is greater than the maximumPoolSize
942 */
943 public void testConstructor15() {
944 try {
945 new CustomTPE(2, 1, 1L, SECONDS,
946 new ArrayBlockingQueue<Runnable>(10),
947 new NoOpREHandler());
948 shouldThrow();
949 } catch (IllegalArgumentException success) {}
950 }
951
952 /**
953 * Constructor throws if workQueue is set to null
954 */
955 public void testConstructorNullPointerException4() {
956 try {
957 new CustomTPE(1, 2, 1L, SECONDS,
958 null,
959 new NoOpREHandler());
960 shouldThrow();
961 } catch (NullPointerException success) {}
962 }
963
964 /**
965 * Constructor throws if handler is set to null
966 */
967 public void testConstructorNullPointerException5() {
968 try {
969 new CustomTPE(1, 2, 1L, SECONDS,
970 new ArrayBlockingQueue<Runnable>(10),
971 (RejectedExecutionHandler) null);
972 shouldThrow();
973 } catch (NullPointerException success) {}
974 }
975
976 /**
977 * Constructor throws if corePoolSize argument is less than zero
978 */
979 public void testConstructor16() {
980 try {
981 new CustomTPE(-1, 1, 1L, SECONDS,
982 new ArrayBlockingQueue<Runnable>(10),
983 new SimpleThreadFactory(),
984 new NoOpREHandler());
985 shouldThrow();
986 } catch (IllegalArgumentException success) {}
987 }
988
989 /**
990 * Constructor throws if maximumPoolSize is less than zero
991 */
992 public void testConstructor17() {
993 try {
994 new CustomTPE(1, -1, 1L, SECONDS,
995 new ArrayBlockingQueue<Runnable>(10),
996 new SimpleThreadFactory(),
997 new NoOpREHandler());
998 shouldThrow();
999 } catch (IllegalArgumentException success) {}
1000 }
1001
1002 /**
1003 * Constructor throws if maximumPoolSize is equal to zero
1004 */
1005 public void testConstructor18() {
1006 try {
1007 new CustomTPE(1, 0, 1L, SECONDS,
1008 new ArrayBlockingQueue<Runnable>(10),
1009 new SimpleThreadFactory(),
1010 new NoOpREHandler());
1011 shouldThrow();
1012 } catch (IllegalArgumentException success) {}
1013 }
1014
1015 /**
1016 * Constructor throws if keepAliveTime is less than zero
1017 */
1018 public void testConstructor19() {
1019 try {
1020 new CustomTPE(1, 2, -1L, SECONDS,
1021 new ArrayBlockingQueue<Runnable>(10),
1022 new SimpleThreadFactory(),
1023 new NoOpREHandler());
1024 shouldThrow();
1025 } catch (IllegalArgumentException success) {}
1026 }
1027
1028 /**
1029 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1030 */
1031 public void testConstructor20() {
1032 try {
1033 new CustomTPE(2, 1, 1L, SECONDS,
1034 new ArrayBlockingQueue<Runnable>(10),
1035 new SimpleThreadFactory(),
1036 new NoOpREHandler());
1037 shouldThrow();
1038 } catch (IllegalArgumentException success) {}
1039 }
1040
1041 /**
1042 * Constructor throws if workQueue is null
1043 */
1044 public void testConstructorNullPointerException6() {
1045 try {
1046 new CustomTPE(1, 2, 1L, SECONDS,
1047 null,
1048 new SimpleThreadFactory(),
1049 new NoOpREHandler());
1050 shouldThrow();
1051 } catch (NullPointerException success) {}
1052 }
1053
1054 /**
1055 * Constructor throws if handler is null
1056 */
1057 public void testConstructorNullPointerException7() {
1058 try {
1059 new CustomTPE(1, 2, 1L, SECONDS,
1060 new ArrayBlockingQueue<Runnable>(10),
1061 new SimpleThreadFactory(),
1062 (RejectedExecutionHandler) null);
1063 shouldThrow();
1064 } catch (NullPointerException success) {}
1065 }
1066
1067 /**
1068 * Constructor throws if ThreadFactory is null
1069 */
1070 public void testConstructorNullPointerException8() {
1071 try {
1072 new CustomTPE(1, 2, 1L, SECONDS,
1073 new ArrayBlockingQueue<Runnable>(10),
1074 (ThreadFactory) null,
1075 new NoOpREHandler());
1076 shouldThrow();
1077 } catch (NullPointerException success) {}
1078 }
1079
1080 /**
1081 * execute throws RejectedExecutionException if saturated.
1082 */
1083 public void testSaturatedExecute() {
1084 ThreadPoolExecutor p =
1085 new CustomTPE(1, 1,
1086 LONG_DELAY_MS, MILLISECONDS,
1087 new ArrayBlockingQueue<Runnable>(1));
1088 final CountDownLatch done = new CountDownLatch(1);
1089 try {
1090 Runnable task = new CheckedRunnable() {
1091 public void realRun() throws InterruptedException {
1092 done.await();
1093 }};
1094 for (int i = 0; i < 2; ++i)
1095 p.execute(task);
1096 for (int i = 0; i < 2; ++i) {
1097 try {
1098 p.execute(task);
1099 shouldThrow();
1100 } catch (RejectedExecutionException success) {}
1101 assertTrue(p.getTaskCount() <= 2);
1102 }
1103 } finally {
1104 done.countDown();
1105 joinPool(p);
1106 }
1107 }
1108
1109 /**
1110 * executor using CallerRunsPolicy runs task if saturated.
1111 */
1112 public void testSaturatedExecute2() {
1113 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1114 ThreadPoolExecutor p = new CustomTPE(1, 1,
1115 LONG_DELAY_MS, MILLISECONDS,
1116 new ArrayBlockingQueue<Runnable>(1),
1117 h);
1118 try {
1119 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1120 for (int i = 0; i < tasks.length; ++i)
1121 tasks[i] = new TrackedNoOpRunnable();
1122 TrackedLongRunnable mr = new TrackedLongRunnable();
1123 p.execute(mr);
1124 for (int i = 0; i < tasks.length; ++i)
1125 p.execute(tasks[i]);
1126 for (int i = 1; i < tasks.length; ++i)
1127 assertTrue(tasks[i].done);
1128 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1129 } finally {
1130 joinPool(p);
1131 }
1132 }
1133
1134 /**
1135 * executor using DiscardPolicy drops task if saturated.
1136 */
1137 public void testSaturatedExecute3() {
1138 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1139 ThreadPoolExecutor p =
1140 new CustomTPE(1, 1,
1141 LONG_DELAY_MS, MILLISECONDS,
1142 new ArrayBlockingQueue<Runnable>(1),
1143 h);
1144 try {
1145 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1146 for (int i = 0; i < tasks.length; ++i)
1147 tasks[i] = new TrackedNoOpRunnable();
1148 p.execute(new TrackedLongRunnable());
1149 for (TrackedNoOpRunnable task : tasks)
1150 p.execute(task);
1151 for (TrackedNoOpRunnable task : tasks)
1152 assertFalse(task.done);
1153 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1154 } finally {
1155 joinPool(p);
1156 }
1157 }
1158
1159 /**
1160 * executor using DiscardOldestPolicy drops oldest task if saturated.
1161 */
1162 public void testSaturatedExecute4() {
1163 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1164 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1165 try {
1166 p.execute(new TrackedLongRunnable());
1167 TrackedLongRunnable r2 = new TrackedLongRunnable();
1168 p.execute(r2);
1169 assertTrue(p.getQueue().contains(r2));
1170 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1171 p.execute(r3);
1172 assertFalse(p.getQueue().contains(r2));
1173 assertTrue(p.getQueue().contains(r3));
1174 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1175 } finally {
1176 joinPool(p);
1177 }
1178 }
1179
1180 /**
1181 * execute throws RejectedExecutionException if shutdown
1182 */
1183 public void testRejectedExecutionExceptionOnShutdown() {
1184 ThreadPoolExecutor p =
1185 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1186 try { p.shutdown(); } catch (SecurityException ok) { return; }
1187 try {
1188 p.execute(new NoOpRunnable());
1189 shouldThrow();
1190 } catch (RejectedExecutionException success) {}
1191
1192 joinPool(p);
1193 }
1194
1195 /**
1196 * execute using CallerRunsPolicy drops task on shutdown
1197 */
1198 public void testCallerRunsOnShutdown() {
1199 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1200 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1201
1202 try { p.shutdown(); } catch (SecurityException ok) { return; }
1203 try {
1204 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1205 p.execute(r);
1206 assertFalse(r.done);
1207 } finally {
1208 joinPool(p);
1209 }
1210 }
1211
1212 /**
1213 * execute using DiscardPolicy drops task on shutdown
1214 */
1215 public void testDiscardOnShutdown() {
1216 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1217 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1218
1219 try { p.shutdown(); } catch (SecurityException ok) { return; }
1220 try {
1221 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1222 p.execute(r);
1223 assertFalse(r.done);
1224 } finally {
1225 joinPool(p);
1226 }
1227 }
1228
1229 /**
1230 * execute using DiscardOldestPolicy drops task on shutdown
1231 */
1232 public void testDiscardOldestOnShutdown() {
1233 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1234 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1235
1236 try { p.shutdown(); } catch (SecurityException ok) { return; }
1237 try {
1238 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1239 p.execute(r);
1240 assertFalse(r.done);
1241 } finally {
1242 joinPool(p);
1243 }
1244 }
1245
1246 /**
1247 * execute(null) throws NPE
1248 */
1249 public void testExecuteNull() {
1250 ThreadPoolExecutor p =
1251 new CustomTPE(1, 2, 1L, SECONDS,
1252 new ArrayBlockingQueue<Runnable>(10));
1253 try {
1254 p.execute(null);
1255 shouldThrow();
1256 } catch (NullPointerException success) {}
1257
1258 joinPool(p);
1259 }
1260
1261 /**
1262 * setCorePoolSize of negative value throws IllegalArgumentException
1263 */
1264 public void testCorePoolSizeIllegalArgumentException() {
1265 ThreadPoolExecutor p =
1266 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1267 try {
1268 p.setCorePoolSize(-1);
1269 shouldThrow();
1270 } catch (IllegalArgumentException success) {
1271 } finally {
1272 try { p.shutdown(); } catch (SecurityException ok) { return; }
1273 }
1274 joinPool(p);
1275 }
1276
1277 /**
1278 * setMaximumPoolSize(int) throws IllegalArgumentException
1279 * if given a value less the core pool size
1280 */
1281 public void testMaximumPoolSizeIllegalArgumentException() {
1282 ThreadPoolExecutor p =
1283 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1284 try {
1285 p.setMaximumPoolSize(1);
1286 shouldThrow();
1287 } catch (IllegalArgumentException success) {
1288 } finally {
1289 try { p.shutdown(); } catch (SecurityException ok) { return; }
1290 }
1291 joinPool(p);
1292 }
1293
1294 /**
1295 * setMaximumPoolSize throws IllegalArgumentException
1296 * if given a negative value
1297 */
1298 public void testMaximumPoolSizeIllegalArgumentException2() {
1299 ThreadPoolExecutor p =
1300 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1301 try {
1302 p.setMaximumPoolSize(-1);
1303 shouldThrow();
1304 } catch (IllegalArgumentException success) {
1305 } finally {
1306 try { p.shutdown(); } catch (SecurityException ok) { return; }
1307 }
1308 joinPool(p);
1309 }
1310
1311 /**
1312 * setKeepAliveTime throws IllegalArgumentException
1313 * when given a negative value
1314 */
1315 public void testKeepAliveTimeIllegalArgumentException() {
1316 ThreadPoolExecutor p =
1317 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1318
1319 try {
1320 p.setKeepAliveTime(-1,MILLISECONDS);
1321 shouldThrow();
1322 } catch (IllegalArgumentException success) {
1323 } finally {
1324 try { p.shutdown(); } catch (SecurityException ok) { return; }
1325 }
1326 joinPool(p);
1327 }
1328
1329 /**
1330 * terminated() is called on termination
1331 */
1332 public void testTerminated() {
1333 CustomTPE p = new CustomTPE();
1334 try { p.shutdown(); } catch (SecurityException ok) { return; }
1335 assertTrue(p.terminatedCalled());
1336 joinPool(p);
1337 }
1338
1339 /**
1340 * beforeExecute and afterExecute are called when executing task
1341 */
1342 public void testBeforeAfter() throws InterruptedException {
1343 CustomTPE p = new CustomTPE();
1344 try {
1345 final CountDownLatch done = new CountDownLatch(1);
1346 p.execute(new CheckedRunnable() {
1347 public void realRun() {
1348 done.countDown();
1349 }});
1350 await(p.afterCalled);
1351 assertEquals(0, done.getCount());
1352 assertTrue(p.afterCalled());
1353 assertTrue(p.beforeCalled());
1354 try { p.shutdown(); } catch (SecurityException ok) { return; }
1355 } finally {
1356 joinPool(p);
1357 }
1358 }
1359
1360 /**
1361 * completed submit of callable returns result
1362 */
1363 public void testSubmitCallable() throws Exception {
1364 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1365 try {
1366 Future<String> future = e.submit(new StringTask());
1367 String result = future.get();
1368 assertSame(TEST_STRING, result);
1369 } finally {
1370 joinPool(e);
1371 }
1372 }
1373
1374 /**
1375 * completed submit of runnable returns successfully
1376 */
1377 public void testSubmitRunnable() throws Exception {
1378 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1379 try {
1380 Future<?> future = e.submit(new NoOpRunnable());
1381 future.get();
1382 assertTrue(future.isDone());
1383 } finally {
1384 joinPool(e);
1385 }
1386 }
1387
1388 /**
1389 * completed submit of (runnable, result) returns result
1390 */
1391 public void testSubmitRunnable2() throws Exception {
1392 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1393 try {
1394 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1395 String result = future.get();
1396 assertSame(TEST_STRING, result);
1397 } finally {
1398 joinPool(e);
1399 }
1400 }
1401
1402 /**
1403 * invokeAny(null) throws NPE
1404 */
1405 public void testInvokeAny1() throws Exception {
1406 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1407 try {
1408 e.invokeAny(null);
1409 shouldThrow();
1410 } catch (NullPointerException success) {
1411 } finally {
1412 joinPool(e);
1413 }
1414 }
1415
1416 /**
1417 * invokeAny(empty collection) throws IAE
1418 */
1419 public void testInvokeAny2() throws Exception {
1420 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1421 try {
1422 e.invokeAny(new ArrayList<Callable<String>>());
1423 shouldThrow();
1424 } catch (IllegalArgumentException success) {
1425 } finally {
1426 joinPool(e);
1427 }
1428 }
1429
1430 /**
1431 * invokeAny(c) throws NPE if c has null elements
1432 */
1433 public void testInvokeAny3() throws Exception {
1434 CountDownLatch latch = new CountDownLatch(1);
1435 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1436 List<Callable<String>> l = new ArrayList<Callable<String>>();
1437 l.add(latchAwaitingStringTask(latch));
1438 l.add(null);
1439 try {
1440 e.invokeAny(l);
1441 shouldThrow();
1442 } catch (NullPointerException success) {
1443 } finally {
1444 latch.countDown();
1445 joinPool(e);
1446 }
1447 }
1448
1449 /**
1450 * invokeAny(c) throws ExecutionException if no task completes
1451 */
1452 public void testInvokeAny4() throws Exception {
1453 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1454 List<Callable<String>> l = new ArrayList<Callable<String>>();
1455 l.add(new NPETask());
1456 try {
1457 e.invokeAny(l);
1458 shouldThrow();
1459 } catch (ExecutionException success) {
1460 assertTrue(success.getCause() instanceof NullPointerException);
1461 } finally {
1462 joinPool(e);
1463 }
1464 }
1465
1466 /**
1467 * invokeAny(c) returns result of some task
1468 */
1469 public void testInvokeAny5() throws Exception {
1470 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1471 try {
1472 List<Callable<String>> l = new ArrayList<Callable<String>>();
1473 l.add(new StringTask());
1474 l.add(new StringTask());
1475 String result = e.invokeAny(l);
1476 assertSame(TEST_STRING, result);
1477 } finally {
1478 joinPool(e);
1479 }
1480 }
1481
1482 /**
1483 * invokeAll(null) throws NPE
1484 */
1485 public void testInvokeAll1() throws Exception {
1486 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1487 try {
1488 e.invokeAll(null);
1489 shouldThrow();
1490 } catch (NullPointerException success) {
1491 } finally {
1492 joinPool(e);
1493 }
1494 }
1495
1496 /**
1497 * invokeAll(empty collection) returns empty collection
1498 */
1499 public void testInvokeAll2() throws Exception {
1500 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1501 try {
1502 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1503 assertTrue(r.isEmpty());
1504 } finally {
1505 joinPool(e);
1506 }
1507 }
1508
1509 /**
1510 * invokeAll(c) throws NPE if c has null elements
1511 */
1512 public void testInvokeAll3() throws Exception {
1513 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1514 List<Callable<String>> l = new ArrayList<Callable<String>>();
1515 l.add(new StringTask());
1516 l.add(null);
1517 try {
1518 e.invokeAll(l);
1519 shouldThrow();
1520 } catch (NullPointerException success) {
1521 } finally {
1522 joinPool(e);
1523 }
1524 }
1525
1526 /**
1527 * get of element of invokeAll(c) throws exception on failed task
1528 */
1529 public void testInvokeAll4() throws Exception {
1530 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1531 List<Callable<String>> l = new ArrayList<Callable<String>>();
1532 l.add(new NPETask());
1533 List<Future<String>> futures = e.invokeAll(l);
1534 assertEquals(1, futures.size());
1535 try {
1536 futures.get(0).get();
1537 shouldThrow();
1538 } catch (ExecutionException success) {
1539 assertTrue(success.getCause() instanceof NullPointerException);
1540 } finally {
1541 joinPool(e);
1542 }
1543 }
1544
1545 /**
1546 * invokeAll(c) returns results of all completed tasks
1547 */
1548 public void testInvokeAll5() throws Exception {
1549 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1550 try {
1551 List<Callable<String>> l = new ArrayList<Callable<String>>();
1552 l.add(new StringTask());
1553 l.add(new StringTask());
1554 List<Future<String>> futures = e.invokeAll(l);
1555 assertEquals(2, futures.size());
1556 for (Future<String> future : futures)
1557 assertSame(TEST_STRING, future.get());
1558 } finally {
1559 joinPool(e);
1560 }
1561 }
1562
1563 /**
1564 * timed invokeAny(null) throws NPE
1565 */
1566 public void testTimedInvokeAny1() throws Exception {
1567 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1568 try {
1569 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1570 shouldThrow();
1571 } catch (NullPointerException success) {
1572 } finally {
1573 joinPool(e);
1574 }
1575 }
1576
1577 /**
1578 * timed invokeAny(,,null) throws NPE
1579 */
1580 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1581 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1582 List<Callable<String>> l = new ArrayList<Callable<String>>();
1583 l.add(new StringTask());
1584 try {
1585 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1586 shouldThrow();
1587 } catch (NullPointerException success) {
1588 } finally {
1589 joinPool(e);
1590 }
1591 }
1592
1593 /**
1594 * timed invokeAny(empty collection) throws IAE
1595 */
1596 public void testTimedInvokeAny2() throws Exception {
1597 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1598 try {
1599 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1600 shouldThrow();
1601 } catch (IllegalArgumentException success) {
1602 } finally {
1603 joinPool(e);
1604 }
1605 }
1606
1607 /**
1608 * timed invokeAny(c) throws NPE if c has null elements
1609 */
1610 public void testTimedInvokeAny3() throws Exception {
1611 CountDownLatch latch = new CountDownLatch(1);
1612 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1613 List<Callable<String>> l = new ArrayList<Callable<String>>();
1614 l.add(latchAwaitingStringTask(latch));
1615 l.add(null);
1616 try {
1617 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1618 shouldThrow();
1619 } catch (NullPointerException success) {
1620 } finally {
1621 latch.countDown();
1622 joinPool(e);
1623 }
1624 }
1625
1626 /**
1627 * timed invokeAny(c) throws ExecutionException if no task completes
1628 */
1629 public void testTimedInvokeAny4() throws Exception {
1630 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1631 List<Callable<String>> l = new ArrayList<Callable<String>>();
1632 l.add(new NPETask());
1633 try {
1634 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1635 shouldThrow();
1636 } catch (ExecutionException success) {
1637 assertTrue(success.getCause() instanceof NullPointerException);
1638 } finally {
1639 joinPool(e);
1640 }
1641 }
1642
1643 /**
1644 * timed invokeAny(c) returns result of some task
1645 */
1646 public void testTimedInvokeAny5() throws Exception {
1647 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1648 try {
1649 List<Callable<String>> l = new ArrayList<Callable<String>>();
1650 l.add(new StringTask());
1651 l.add(new StringTask());
1652 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1653 assertSame(TEST_STRING, result);
1654 } finally {
1655 joinPool(e);
1656 }
1657 }
1658
1659 /**
1660 * timed invokeAll(null) throws NPE
1661 */
1662 public void testTimedInvokeAll1() throws Exception {
1663 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1664 try {
1665 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1666 shouldThrow();
1667 } catch (NullPointerException success) {
1668 } finally {
1669 joinPool(e);
1670 }
1671 }
1672
1673 /**
1674 * timed invokeAll(,,null) throws NPE
1675 */
1676 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1677 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1678 List<Callable<String>> l = new ArrayList<Callable<String>>();
1679 l.add(new StringTask());
1680 try {
1681 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1682 shouldThrow();
1683 } catch (NullPointerException success) {
1684 } finally {
1685 joinPool(e);
1686 }
1687 }
1688
1689 /**
1690 * timed invokeAll(empty collection) returns empty collection
1691 */
1692 public void testTimedInvokeAll2() throws Exception {
1693 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1694 try {
1695 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1696 assertTrue(r.isEmpty());
1697 } finally {
1698 joinPool(e);
1699 }
1700 }
1701
1702 /**
1703 * timed invokeAll(c) throws NPE if c has null elements
1704 */
1705 public void testTimedInvokeAll3() throws Exception {
1706 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1707 List<Callable<String>> l = new ArrayList<Callable<String>>();
1708 l.add(new StringTask());
1709 l.add(null);
1710 try {
1711 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1712 shouldThrow();
1713 } catch (NullPointerException success) {
1714 } finally {
1715 joinPool(e);
1716 }
1717 }
1718
1719 /**
1720 * get of element of invokeAll(c) throws exception on failed task
1721 */
1722 public void testTimedInvokeAll4() throws Exception {
1723 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1724 List<Callable<String>> l = new ArrayList<Callable<String>>();
1725 l.add(new NPETask());
1726 List<Future<String>> futures =
1727 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1728 assertEquals(1, futures.size());
1729 try {
1730 futures.get(0).get();
1731 shouldThrow();
1732 } catch (ExecutionException success) {
1733 assertTrue(success.getCause() instanceof NullPointerException);
1734 } finally {
1735 joinPool(e);
1736 }
1737 }
1738
1739 /**
1740 * timed invokeAll(c) returns results of all completed tasks
1741 */
1742 public void testTimedInvokeAll5() throws Exception {
1743 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1744 try {
1745 List<Callable<String>> l = new ArrayList<Callable<String>>();
1746 l.add(new StringTask());
1747 l.add(new StringTask());
1748 List<Future<String>> futures =
1749 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1750 assertEquals(2, futures.size());
1751 for (Future<String> future : futures)
1752 assertSame(TEST_STRING, future.get());
1753 } finally {
1754 joinPool(e);
1755 }
1756 }
1757
1758 /**
1759 * timed invokeAll(c) cancels tasks not completed by timeout
1760 */
1761 public void testTimedInvokeAll6() throws Exception {
1762 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1763 try {
1764 for (long timeout = timeoutMillis();;) {
1765 List<Callable<String>> tasks = new ArrayList<>();
1766 tasks.add(new StringTask("0"));
1767 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1768 tasks.add(new StringTask("2"));
1769 long startTime = System.nanoTime();
1770 List<Future<String>> futures =
1771 e.invokeAll(tasks, timeout, MILLISECONDS);
1772 assertEquals(tasks.size(), futures.size());
1773 assertTrue(millisElapsedSince(startTime) >= timeout);
1774 for (Future future : futures)
1775 assertTrue(future.isDone());
1776 assertTrue(futures.get(1).isCancelled());
1777 try {
1778 assertEquals("0", futures.get(0).get());
1779 assertEquals("2", futures.get(2).get());
1780 break;
1781 } catch (CancellationException retryWithLongerTimeout) {
1782 timeout *= 2;
1783 if (timeout >= LONG_DELAY_MS / 2)
1784 fail("expected exactly one task to be cancelled");
1785 }
1786 }
1787 } finally {
1788 joinPool(e);
1789 }
1790 }
1791
1792 /**
1793 * Execution continues if there is at least one thread even if
1794 * thread factory fails to create more
1795 */
1796 public void testFailingThreadFactory() throws InterruptedException {
1797 final ExecutorService e =
1798 new CustomTPE(100, 100,
1799 LONG_DELAY_MS, MILLISECONDS,
1800 new LinkedBlockingQueue<Runnable>(),
1801 new FailingThreadFactory());
1802 try {
1803 final int TASKS = 100;
1804 final CountDownLatch done = new CountDownLatch(TASKS);
1805 for (int k = 0; k < TASKS; ++k)
1806 e.execute(new CheckedRunnable() {
1807 public void realRun() {
1808 done.countDown();
1809 }});
1810 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1811 } finally {
1812 joinPool(e);
1813 }
1814 }
1815
1816 /**
1817 * allowsCoreThreadTimeOut is by default false.
1818 */
1819 public void testAllowsCoreThreadTimeOut() {
1820 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1821 assertFalse(p.allowsCoreThreadTimeOut());
1822 joinPool(p);
1823 }
1824
1825 /**
1826 * allowCoreThreadTimeOut(true) causes idle threads to time out
1827 */
1828 public void testAllowCoreThreadTimeOut_true() throws Exception {
1829 long keepAliveTime = timeoutMillis();
1830 final ThreadPoolExecutor p =
1831 new CustomTPE(2, 10,
1832 keepAliveTime, MILLISECONDS,
1833 new ArrayBlockingQueue<Runnable>(10));
1834 final CountDownLatch threadStarted = new CountDownLatch(1);
1835 try {
1836 p.allowCoreThreadTimeOut(true);
1837 p.execute(new CheckedRunnable() {
1838 public void realRun() {
1839 threadStarted.countDown();
1840 assertEquals(1, p.getPoolSize());
1841 }});
1842 await(threadStarted);
1843 delay(keepAliveTime);
1844 long startTime = System.nanoTime();
1845 while (p.getPoolSize() > 0
1846 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1847 Thread.yield();
1848 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1849 assertEquals(0, p.getPoolSize());
1850 } finally {
1851 joinPool(p);
1852 }
1853 }
1854
1855 /**
1856 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1857 */
1858 public void testAllowCoreThreadTimeOut_false() throws Exception {
1859 long keepAliveTime = timeoutMillis();
1860 final ThreadPoolExecutor p =
1861 new CustomTPE(2, 10,
1862 keepAliveTime, MILLISECONDS,
1863 new ArrayBlockingQueue<Runnable>(10));
1864 final CountDownLatch threadStarted = new CountDownLatch(1);
1865 try {
1866 p.allowCoreThreadTimeOut(false);
1867 p.execute(new CheckedRunnable() {
1868 public void realRun() throws InterruptedException {
1869 threadStarted.countDown();
1870 assertTrue(p.getPoolSize() >= 1);
1871 }});
1872 delay(2 * keepAliveTime);
1873 assertTrue(p.getPoolSize() >= 1);
1874 } finally {
1875 joinPool(p);
1876 }
1877 }
1878
1879 /**
1880 * get(cancelled task) throws CancellationException
1881 * (in part, a test of CustomTPE itself)
1882 */
1883 public void testGet_cancelled() throws Exception {
1884 final ExecutorService e =
1885 new CustomTPE(1, 1,
1886 LONG_DELAY_MS, MILLISECONDS,
1887 new LinkedBlockingQueue<Runnable>());
1888 try {
1889 final CountDownLatch blockerStarted = new CountDownLatch(1);
1890 final CountDownLatch done = new CountDownLatch(1);
1891 final List<Future<?>> futures = new ArrayList<>();
1892 for (int i = 0; i < 2; i++) {
1893 Runnable r = new CheckedRunnable() { public void realRun()
1894 throws Throwable {
1895 blockerStarted.countDown();
1896 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1897 }};
1898 futures.add(e.submit(r));
1899 }
1900 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1901 for (Future<?> future : futures) future.cancel(false);
1902 for (Future<?> future : futures) {
1903 try {
1904 future.get();
1905 shouldThrow();
1906 } catch (CancellationException success) {}
1907 try {
1908 future.get(LONG_DELAY_MS, MILLISECONDS);
1909 shouldThrow();
1910 } catch (CancellationException success) {}
1911 assertTrue(future.isCancelled());
1912 assertTrue(future.isDone());
1913 }
1914 done.countDown();
1915 } finally {
1916 joinPool(e);
1917 }
1918 }
1919
1920 }