ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ThreadPoolExecutorSubclassTest.java
Revision: 1.51
Committed: Sun Oct 4 00:59:09 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.50: +2 -4 lines
Log Message:
rejigger pool closing infrastructure

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.SECONDS;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.FutureTask;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.RunnableFuture;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import junit.framework.Test;
38 import junit.framework.TestSuite;
39
40 public class ThreadPoolExecutorSubclassTest extends JSR166TestCase {
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44 public static Test suite() {
45 return new TestSuite(ThreadPoolExecutorSubclassTest.class);
46 }
47
48 static class CustomTask<V> implements RunnableFuture<V> {
49 final Callable<V> callable;
50 final ReentrantLock lock = new ReentrantLock();
51 final Condition cond = lock.newCondition();
52 boolean done;
53 boolean cancelled;
54 V result;
55 Thread thread;
56 Exception exception;
57 CustomTask(Callable<V> c) {
58 if (c == null) throw new NullPointerException();
59 callable = c;
60 }
61 CustomTask(final Runnable r, final V res) {
62 if (r == null) throw new NullPointerException();
63 callable = new Callable<V>() {
64 public V call() throws Exception { r.run(); return res; }};
65 }
66 public boolean isDone() {
67 lock.lock(); try { return done; } finally { lock.unlock() ; }
68 }
69 public boolean isCancelled() {
70 lock.lock(); try { return cancelled; } finally { lock.unlock() ; }
71 }
72 public boolean cancel(boolean mayInterrupt) {
73 lock.lock();
74 try {
75 if (!done) {
76 cancelled = true;
77 done = true;
78 if (mayInterrupt && thread != null)
79 thread.interrupt();
80 return true;
81 }
82 return false;
83 }
84 finally { lock.unlock() ; }
85 }
86 public void run() {
87 lock.lock();
88 try {
89 if (done)
90 return;
91 thread = Thread.currentThread();
92 }
93 finally { lock.unlock() ; }
94 V v = null;
95 Exception e = null;
96 try {
97 v = callable.call();
98 }
99 catch (Exception ex) {
100 e = ex;
101 }
102 lock.lock();
103 try {
104 if (!done) {
105 result = v;
106 exception = e;
107 done = true;
108 thread = null;
109 cond.signalAll();
110 }
111 }
112 finally { lock.unlock(); }
113 }
114 public V get() throws InterruptedException, ExecutionException {
115 lock.lock();
116 try {
117 while (!done)
118 cond.await();
119 if (cancelled)
120 throw new CancellationException();
121 if (exception != null)
122 throw new ExecutionException(exception);
123 return result;
124 }
125 finally { lock.unlock(); }
126 }
127 public V get(long timeout, TimeUnit unit)
128 throws InterruptedException, ExecutionException, TimeoutException {
129 long nanos = unit.toNanos(timeout);
130 lock.lock();
131 try {
132 while (!done) {
133 if (nanos <= 0L)
134 throw new TimeoutException();
135 nanos = cond.awaitNanos(nanos);
136 }
137 if (cancelled)
138 throw new CancellationException();
139 if (exception != null)
140 throw new ExecutionException(exception);
141 return result;
142 }
143 finally { lock.unlock(); }
144 }
145 }
146
147 static class CustomTPE extends ThreadPoolExecutor {
148 protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
149 return new CustomTask<V>(c);
150 }
151 protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
152 return new CustomTask<V>(r, v);
153 }
154
155 CustomTPE(int corePoolSize,
156 int maximumPoolSize,
157 long keepAliveTime,
158 TimeUnit unit,
159 BlockingQueue<Runnable> workQueue) {
160 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
161 workQueue);
162 }
163 CustomTPE(int corePoolSize,
164 int maximumPoolSize,
165 long keepAliveTime,
166 TimeUnit unit,
167 BlockingQueue<Runnable> workQueue,
168 ThreadFactory threadFactory) {
169 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
170 threadFactory);
171 }
172
173 CustomTPE(int corePoolSize,
174 int maximumPoolSize,
175 long keepAliveTime,
176 TimeUnit unit,
177 BlockingQueue<Runnable> workQueue,
178 RejectedExecutionHandler handler) {
179 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
180 handler);
181 }
182 CustomTPE(int corePoolSize,
183 int maximumPoolSize,
184 long keepAliveTime,
185 TimeUnit unit,
186 BlockingQueue<Runnable> workQueue,
187 ThreadFactory threadFactory,
188 RejectedExecutionHandler handler) {
189 super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
190 workQueue, threadFactory, handler);
191 }
192
193 final CountDownLatch beforeCalled = new CountDownLatch(1);
194 final CountDownLatch afterCalled = new CountDownLatch(1);
195 final CountDownLatch terminatedCalled = new CountDownLatch(1);
196
197 public CustomTPE() {
198 super(1, 1, LONG_DELAY_MS, MILLISECONDS, new SynchronousQueue<Runnable>());
199 }
200 protected void beforeExecute(Thread t, Runnable r) {
201 beforeCalled.countDown();
202 }
203 protected void afterExecute(Runnable r, Throwable t) {
204 afterCalled.countDown();
205 }
206 protected void terminated() {
207 terminatedCalled.countDown();
208 }
209
210 public boolean beforeCalled() {
211 return beforeCalled.getCount() == 0;
212 }
213 public boolean afterCalled() {
214 return afterCalled.getCount() == 0;
215 }
216 public boolean terminatedCalled() {
217 return terminatedCalled.getCount() == 0;
218 }
219 }
220
221 static class FailingThreadFactory implements ThreadFactory {
222 int calls = 0;
223 public Thread newThread(Runnable r) {
224 if (++calls > 1) return null;
225 return new Thread(r);
226 }
227 }
228
229 /**
230 * execute successfully executes a runnable
231 */
232 public void testExecute() throws InterruptedException {
233 final ThreadPoolExecutor p =
234 new CustomTPE(1, 1,
235 2 * LONG_DELAY_MS, MILLISECONDS,
236 new ArrayBlockingQueue<Runnable>(10));
237 try (PoolCleaner cleaner = cleaner(p)) {
238 final CountDownLatch done = new CountDownLatch(1);
239 final Runnable task = new CheckedRunnable() {
240 public void realRun() { done.countDown(); }};
241 p.execute(task);
242 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
243 }
244 }
245
246 /**
247 * getActiveCount increases but doesn't overestimate, when a
248 * thread becomes active
249 */
250 public void testGetActiveCount() throws InterruptedException {
251 final ThreadPoolExecutor p =
252 new CustomTPE(2, 2,
253 LONG_DELAY_MS, MILLISECONDS,
254 new ArrayBlockingQueue<Runnable>(10));
255 final CountDownLatch threadStarted = new CountDownLatch(1);
256 final CountDownLatch done = new CountDownLatch(1);
257 try (PoolCleaner cleaner = cleaner(p)) {
258 assertEquals(0, p.getActiveCount());
259 p.execute(new CheckedRunnable() {
260 public void realRun() throws InterruptedException {
261 threadStarted.countDown();
262 assertEquals(1, p.getActiveCount());
263 done.await();
264 }});
265 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
266 assertEquals(1, p.getActiveCount());
267 done.countDown();
268 }
269 }
270
271 /**
272 * prestartCoreThread starts a thread if under corePoolSize, else doesn't
273 */
274 public void testPrestartCoreThread() {
275 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
276 assertEquals(0, p.getPoolSize());
277 assertTrue(p.prestartCoreThread());
278 assertEquals(1, p.getPoolSize());
279 assertTrue(p.prestartCoreThread());
280 assertEquals(2, p.getPoolSize());
281 assertFalse(p.prestartCoreThread());
282 assertEquals(2, p.getPoolSize());
283 joinPool(p);
284 }
285
286 /**
287 * prestartAllCoreThreads starts all corePoolSize threads
288 */
289 public void testPrestartAllCoreThreads() {
290 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
291 assertEquals(0, p.getPoolSize());
292 p.prestartAllCoreThreads();
293 assertEquals(2, p.getPoolSize());
294 p.prestartAllCoreThreads();
295 assertEquals(2, p.getPoolSize());
296 joinPool(p);
297 }
298
299 /**
300 * getCompletedTaskCount increases, but doesn't overestimate,
301 * when tasks complete
302 */
303 public void testGetCompletedTaskCount() throws InterruptedException {
304 final ThreadPoolExecutor p =
305 new CustomTPE(2, 2,
306 LONG_DELAY_MS, MILLISECONDS,
307 new ArrayBlockingQueue<Runnable>(10));
308 final CountDownLatch threadStarted = new CountDownLatch(1);
309 final CountDownLatch threadProceed = new CountDownLatch(1);
310 final CountDownLatch threadDone = new CountDownLatch(1);
311 try {
312 assertEquals(0, p.getCompletedTaskCount());
313 p.execute(new CheckedRunnable() {
314 public void realRun() throws InterruptedException {
315 threadStarted.countDown();
316 assertEquals(0, p.getCompletedTaskCount());
317 threadProceed.await();
318 threadDone.countDown();
319 }});
320 await(threadStarted);
321 assertEquals(0, p.getCompletedTaskCount());
322 threadProceed.countDown();
323 threadDone.await();
324 long startTime = System.nanoTime();
325 while (p.getCompletedTaskCount() != 1) {
326 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
327 fail("timed out");
328 Thread.yield();
329 }
330 } finally {
331 joinPool(p);
332 }
333 }
334
335 /**
336 * getCorePoolSize returns size given in constructor if not otherwise set
337 */
338 public void testGetCorePoolSize() {
339 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
340 assertEquals(1, p.getCorePoolSize());
341 joinPool(p);
342 }
343
344 /**
345 * getKeepAliveTime returns value given in constructor if not otherwise set
346 */
347 public void testGetKeepAliveTime() {
348 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
349 assertEquals(1, p.getKeepAliveTime(SECONDS));
350 joinPool(p);
351 }
352
353 /**
354 * getThreadFactory returns factory in constructor if not set
355 */
356 public void testGetThreadFactory() {
357 ThreadFactory tf = new SimpleThreadFactory();
358 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), tf, new NoOpREHandler());
359 assertSame(tf, p.getThreadFactory());
360 joinPool(p);
361 }
362
363 /**
364 * setThreadFactory sets the thread factory returned by getThreadFactory
365 */
366 public void testSetThreadFactory() {
367 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
368 ThreadFactory tf = new SimpleThreadFactory();
369 p.setThreadFactory(tf);
370 assertSame(tf, p.getThreadFactory());
371 joinPool(p);
372 }
373
374 /**
375 * setThreadFactory(null) throws NPE
376 */
377 public void testSetThreadFactoryNull() {
378 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
379 try {
380 p.setThreadFactory(null);
381 shouldThrow();
382 } catch (NullPointerException success) {
383 } finally {
384 joinPool(p);
385 }
386 }
387
388 /**
389 * getRejectedExecutionHandler returns handler in constructor if not set
390 */
391 public void testGetRejectedExecutionHandler() {
392 RejectedExecutionHandler h = new NoOpREHandler();
393 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), h);
394 assertSame(h, p.getRejectedExecutionHandler());
395 joinPool(p);
396 }
397
398 /**
399 * setRejectedExecutionHandler sets the handler returned by
400 * getRejectedExecutionHandler
401 */
402 public void testSetRejectedExecutionHandler() {
403 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
404 RejectedExecutionHandler h = new NoOpREHandler();
405 p.setRejectedExecutionHandler(h);
406 assertSame(h, p.getRejectedExecutionHandler());
407 joinPool(p);
408 }
409
410 /**
411 * setRejectedExecutionHandler(null) throws NPE
412 */
413 public void testSetRejectedExecutionHandlerNull() {
414 ThreadPoolExecutor p = new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
415 try {
416 p.setRejectedExecutionHandler(null);
417 shouldThrow();
418 } catch (NullPointerException success) {
419 } finally {
420 joinPool(p);
421 }
422 }
423
424 /**
425 * getLargestPoolSize increases, but doesn't overestimate, when
426 * multiple threads active
427 */
428 public void testGetLargestPoolSize() throws InterruptedException {
429 final int THREADS = 3;
430 final ThreadPoolExecutor p =
431 new CustomTPE(THREADS, THREADS,
432 LONG_DELAY_MS, MILLISECONDS,
433 new ArrayBlockingQueue<Runnable>(10));
434 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
435 final CountDownLatch done = new CountDownLatch(1);
436 try {
437 assertEquals(0, p.getLargestPoolSize());
438 for (int i = 0; i < THREADS; i++)
439 p.execute(new CheckedRunnable() {
440 public void realRun() throws InterruptedException {
441 threadsStarted.countDown();
442 done.await();
443 assertEquals(THREADS, p.getLargestPoolSize());
444 }});
445 assertTrue(threadsStarted.await(SMALL_DELAY_MS, MILLISECONDS));
446 assertEquals(THREADS, p.getLargestPoolSize());
447 } finally {
448 done.countDown();
449 joinPool(p);
450 assertEquals(THREADS, p.getLargestPoolSize());
451 }
452 }
453
454 /**
455 * getMaximumPoolSize returns value given in constructor if not
456 * otherwise set
457 */
458 public void testGetMaximumPoolSize() {
459 ThreadPoolExecutor p = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
460 assertEquals(2, p.getMaximumPoolSize());
461 joinPool(p);
462 }
463
464 /**
465 * getPoolSize increases, but doesn't overestimate, when threads
466 * become active
467 */
468 public void testGetPoolSize() throws InterruptedException {
469 final ThreadPoolExecutor p =
470 new CustomTPE(1, 1,
471 LONG_DELAY_MS, MILLISECONDS,
472 new ArrayBlockingQueue<Runnable>(10));
473 final CountDownLatch threadStarted = new CountDownLatch(1);
474 final CountDownLatch done = new CountDownLatch(1);
475 try {
476 assertEquals(0, p.getPoolSize());
477 p.execute(new CheckedRunnable() {
478 public void realRun() throws InterruptedException {
479 threadStarted.countDown();
480 assertEquals(1, p.getPoolSize());
481 done.await();
482 }});
483 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
484 assertEquals(1, p.getPoolSize());
485 } finally {
486 done.countDown();
487 joinPool(p);
488 }
489 }
490
491 /**
492 * getTaskCount increases, but doesn't overestimate, when tasks submitted
493 */
494 public void testGetTaskCount() throws InterruptedException {
495 final ThreadPoolExecutor p =
496 new CustomTPE(1, 1,
497 LONG_DELAY_MS, MILLISECONDS,
498 new ArrayBlockingQueue<Runnable>(10));
499 final CountDownLatch threadStarted = new CountDownLatch(1);
500 final CountDownLatch done = new CountDownLatch(1);
501 try {
502 assertEquals(0, p.getTaskCount());
503 p.execute(new CheckedRunnable() {
504 public void realRun() throws InterruptedException {
505 threadStarted.countDown();
506 assertEquals(1, p.getTaskCount());
507 done.await();
508 }});
509 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
510 assertEquals(1, p.getTaskCount());
511 } finally {
512 done.countDown();
513 joinPool(p);
514 }
515 }
516
517 /**
518 * isShutdown is false before shutdown, true after
519 */
520 public void testIsShutdown() {
521
522 ThreadPoolExecutor p = new CustomTPE(1, 1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
523 assertFalse(p.isShutdown());
524 try { p.shutdown(); } catch (SecurityException ok) { return; }
525 assertTrue(p.isShutdown());
526 joinPool(p);
527 }
528
529 /**
530 * isTerminated is false before termination, true after
531 */
532 public void testIsTerminated() throws InterruptedException {
533 final ThreadPoolExecutor p =
534 new CustomTPE(1, 1,
535 LONG_DELAY_MS, MILLISECONDS,
536 new ArrayBlockingQueue<Runnable>(10));
537 final CountDownLatch threadStarted = new CountDownLatch(1);
538 final CountDownLatch done = new CountDownLatch(1);
539 try {
540 assertFalse(p.isTerminating());
541 p.execute(new CheckedRunnable() {
542 public void realRun() throws InterruptedException {
543 assertFalse(p.isTerminating());
544 threadStarted.countDown();
545 done.await();
546 }});
547 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
548 assertFalse(p.isTerminating());
549 done.countDown();
550 } finally {
551 try { p.shutdown(); } catch (SecurityException ok) { return; }
552 }
553 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
554 assertTrue(p.isTerminated());
555 assertFalse(p.isTerminating());
556 }
557
558 /**
559 * isTerminating is not true when running or when terminated
560 */
561 public void testIsTerminating() throws InterruptedException {
562 final ThreadPoolExecutor p =
563 new CustomTPE(1, 1,
564 LONG_DELAY_MS, MILLISECONDS,
565 new ArrayBlockingQueue<Runnable>(10));
566 final CountDownLatch threadStarted = new CountDownLatch(1);
567 final CountDownLatch done = new CountDownLatch(1);
568 try {
569 assertFalse(p.isTerminating());
570 p.execute(new CheckedRunnable() {
571 public void realRun() throws InterruptedException {
572 assertFalse(p.isTerminating());
573 threadStarted.countDown();
574 done.await();
575 }});
576 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
577 assertFalse(p.isTerminating());
578 done.countDown();
579 } finally {
580 try { p.shutdown(); } catch (SecurityException ok) { return; }
581 }
582 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
583 assertTrue(p.isTerminated());
584 assertFalse(p.isTerminating());
585 }
586
587 /**
588 * getQueue returns the work queue, which contains queued tasks
589 */
590 public void testGetQueue() throws InterruptedException {
591 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
592 final ThreadPoolExecutor p =
593 new CustomTPE(1, 1,
594 LONG_DELAY_MS, MILLISECONDS,
595 q);
596 final CountDownLatch threadStarted = new CountDownLatch(1);
597 final CountDownLatch done = new CountDownLatch(1);
598 try {
599 FutureTask[] tasks = new FutureTask[5];
600 for (int i = 0; i < tasks.length; i++) {
601 Callable task = new CheckedCallable<Boolean>() {
602 public Boolean realCall() throws InterruptedException {
603 threadStarted.countDown();
604 assertSame(q, p.getQueue());
605 done.await();
606 return Boolean.TRUE;
607 }};
608 tasks[i] = new FutureTask(task);
609 p.execute(tasks[i]);
610 }
611 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
612 assertSame(q, p.getQueue());
613 assertFalse(q.contains(tasks[0]));
614 assertTrue(q.contains(tasks[tasks.length - 1]));
615 assertEquals(tasks.length - 1, q.size());
616 } finally {
617 done.countDown();
618 joinPool(p);
619 }
620 }
621
622 /**
623 * remove(task) removes queued task, and fails to remove active task
624 */
625 public void testRemove() throws InterruptedException {
626 BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
627 final ThreadPoolExecutor p =
628 new CustomTPE(1, 1,
629 LONG_DELAY_MS, MILLISECONDS,
630 q);
631 Runnable[] tasks = new Runnable[6];
632 final CountDownLatch threadStarted = new CountDownLatch(1);
633 final CountDownLatch done = new CountDownLatch(1);
634 try {
635 for (int i = 0; i < tasks.length; i++) {
636 tasks[i] = new CheckedRunnable() {
637 public void realRun() throws InterruptedException {
638 threadStarted.countDown();
639 done.await();
640 }};
641 p.execute(tasks[i]);
642 }
643 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
644 assertFalse(p.remove(tasks[0]));
645 assertTrue(q.contains(tasks[4]));
646 assertTrue(q.contains(tasks[3]));
647 assertTrue(p.remove(tasks[4]));
648 assertFalse(p.remove(tasks[4]));
649 assertFalse(q.contains(tasks[4]));
650 assertTrue(q.contains(tasks[3]));
651 assertTrue(p.remove(tasks[3]));
652 assertFalse(q.contains(tasks[3]));
653 } finally {
654 done.countDown();
655 joinPool(p);
656 }
657 }
658
659 /**
660 * purge removes cancelled tasks from the queue
661 */
662 public void testPurge() throws InterruptedException {
663 final CountDownLatch threadStarted = new CountDownLatch(1);
664 final CountDownLatch done = new CountDownLatch(1);
665 final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
666 final ThreadPoolExecutor p =
667 new CustomTPE(1, 1,
668 LONG_DELAY_MS, MILLISECONDS,
669 q);
670 FutureTask[] tasks = new FutureTask[5];
671 try {
672 for (int i = 0; i < tasks.length; i++) {
673 Callable task = new CheckedCallable<Boolean>() {
674 public Boolean realCall() throws InterruptedException {
675 threadStarted.countDown();
676 done.await();
677 return Boolean.TRUE;
678 }};
679 tasks[i] = new FutureTask(task);
680 p.execute(tasks[i]);
681 }
682 assertTrue(threadStarted.await(SMALL_DELAY_MS, MILLISECONDS));
683 assertEquals(tasks.length, p.getTaskCount());
684 assertEquals(tasks.length - 1, q.size());
685 assertEquals(1L, p.getActiveCount());
686 assertEquals(0L, p.getCompletedTaskCount());
687 tasks[4].cancel(true);
688 tasks[3].cancel(false);
689 p.purge();
690 assertEquals(tasks.length - 3, q.size());
691 assertEquals(tasks.length - 2, p.getTaskCount());
692 p.purge(); // Nothing to do
693 assertEquals(tasks.length - 3, q.size());
694 assertEquals(tasks.length - 2, p.getTaskCount());
695 } finally {
696 done.countDown();
697 joinPool(p);
698 }
699 }
700
701 /**
702 * shutdownNow returns a list containing tasks that were not run,
703 * and those tasks are drained from the queue
704 */
705 public void testShutdownNow() throws InterruptedException {
706 final int poolSize = 2;
707 final int count = 5;
708 final AtomicInteger ran = new AtomicInteger(0);
709 ThreadPoolExecutor p =
710 new CustomTPE(poolSize, poolSize, LONG_DELAY_MS, MILLISECONDS,
711 new ArrayBlockingQueue<Runnable>(10));
712 CountDownLatch threadsStarted = new CountDownLatch(poolSize);
713 Runnable waiter = new CheckedRunnable() { public void realRun() {
714 threadsStarted.countDown();
715 try {
716 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
717 } catch (InterruptedException success) {}
718 ran.getAndIncrement();
719 }};
720 for (int i = 0; i < count; i++)
721 p.execute(waiter);
722 assertTrue(threadsStarted.await(LONG_DELAY_MS, MILLISECONDS));
723 assertEquals(poolSize, p.getActiveCount());
724 assertEquals(0, p.getCompletedTaskCount());
725 final List<Runnable> queuedTasks;
726 try {
727 queuedTasks = p.shutdownNow();
728 } catch (SecurityException ok) {
729 return; // Allowed in case test doesn't have privs
730 }
731 assertTrue(p.isShutdown());
732 assertTrue(p.getQueue().isEmpty());
733 assertEquals(count - poolSize, queuedTasks.size());
734 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
735 assertTrue(p.isTerminated());
736 assertEquals(poolSize, ran.get());
737 assertEquals(poolSize, p.getCompletedTaskCount());
738 }
739
740 // Exception Tests
741
742 /**
743 * Constructor throws if corePoolSize argument is less than zero
744 */
745 public void testConstructor1() {
746 try {
747 new CustomTPE(-1, 1, 1L, SECONDS,
748 new ArrayBlockingQueue<Runnable>(10));
749 shouldThrow();
750 } catch (IllegalArgumentException success) {}
751 }
752
753 /**
754 * Constructor throws if maximumPoolSize is less than zero
755 */
756 public void testConstructor2() {
757 try {
758 new CustomTPE(1, -1, 1L, SECONDS,
759 new ArrayBlockingQueue<Runnable>(10));
760 shouldThrow();
761 } catch (IllegalArgumentException success) {}
762 }
763
764 /**
765 * Constructor throws if maximumPoolSize is equal to zero
766 */
767 public void testConstructor3() {
768 try {
769 new CustomTPE(1, 0, 1L, SECONDS,
770 new ArrayBlockingQueue<Runnable>(10));
771 shouldThrow();
772 } catch (IllegalArgumentException success) {}
773 }
774
775 /**
776 * Constructor throws if keepAliveTime is less than zero
777 */
778 public void testConstructor4() {
779 try {
780 new CustomTPE(1, 2, -1L, SECONDS,
781 new ArrayBlockingQueue<Runnable>(10));
782 shouldThrow();
783 } catch (IllegalArgumentException success) {}
784 }
785
786 /**
787 * Constructor throws if corePoolSize is greater than the maximumPoolSize
788 */
789 public void testConstructor5() {
790 try {
791 new CustomTPE(2, 1, 1L, SECONDS,
792 new ArrayBlockingQueue<Runnable>(10));
793 shouldThrow();
794 } catch (IllegalArgumentException success) {}
795 }
796
797 /**
798 * Constructor throws if workQueue is set to null
799 */
800 public void testConstructorNullPointerException() {
801 try {
802 new CustomTPE(1, 2, 1L, SECONDS, null);
803 shouldThrow();
804 } catch (NullPointerException success) {}
805 }
806
807 /**
808 * Constructor throws if corePoolSize argument is less than zero
809 */
810 public void testConstructor6() {
811 try {
812 new CustomTPE(-1, 1, 1L, SECONDS,
813 new ArrayBlockingQueue<Runnable>(10),
814 new SimpleThreadFactory());
815 shouldThrow();
816 } catch (IllegalArgumentException success) {}
817 }
818
819 /**
820 * Constructor throws if maximumPoolSize is less than zero
821 */
822 public void testConstructor7() {
823 try {
824 new CustomTPE(1,-1, 1L, SECONDS,
825 new ArrayBlockingQueue<Runnable>(10),
826 new SimpleThreadFactory());
827 shouldThrow();
828 } catch (IllegalArgumentException success) {}
829 }
830
831 /**
832 * Constructor throws if maximumPoolSize is equal to zero
833 */
834 public void testConstructor8() {
835 try {
836 new CustomTPE(1, 0, 1L, SECONDS,
837 new ArrayBlockingQueue<Runnable>(10),
838 new SimpleThreadFactory());
839 shouldThrow();
840 } catch (IllegalArgumentException success) {}
841 }
842
843 /**
844 * Constructor throws if keepAliveTime is less than zero
845 */
846 public void testConstructor9() {
847 try {
848 new CustomTPE(1, 2, -1L, SECONDS,
849 new ArrayBlockingQueue<Runnable>(10),
850 new SimpleThreadFactory());
851 shouldThrow();
852 } catch (IllegalArgumentException success) {}
853 }
854
855 /**
856 * Constructor throws if corePoolSize is greater than the maximumPoolSize
857 */
858 public void testConstructor10() {
859 try {
860 new CustomTPE(2, 1, 1L, SECONDS,
861 new ArrayBlockingQueue<Runnable>(10),
862 new SimpleThreadFactory());
863 shouldThrow();
864 } catch (IllegalArgumentException success) {}
865 }
866
867 /**
868 * Constructor throws if workQueue is set to null
869 */
870 public void testConstructorNullPointerException2() {
871 try {
872 new CustomTPE(1, 2, 1L, SECONDS, null, new SimpleThreadFactory());
873 shouldThrow();
874 } catch (NullPointerException success) {}
875 }
876
877 /**
878 * Constructor throws if threadFactory is set to null
879 */
880 public void testConstructorNullPointerException3() {
881 try {
882 new CustomTPE(1, 2, 1L, SECONDS,
883 new ArrayBlockingQueue<Runnable>(10),
884 (ThreadFactory) null);
885 shouldThrow();
886 } catch (NullPointerException success) {}
887 }
888
889 /**
890 * Constructor throws if corePoolSize argument is less than zero
891 */
892 public void testConstructor11() {
893 try {
894 new CustomTPE(-1, 1, 1L, SECONDS,
895 new ArrayBlockingQueue<Runnable>(10),
896 new NoOpREHandler());
897 shouldThrow();
898 } catch (IllegalArgumentException success) {}
899 }
900
901 /**
902 * Constructor throws if maximumPoolSize is less than zero
903 */
904 public void testConstructor12() {
905 try {
906 new CustomTPE(1, -1, 1L, SECONDS,
907 new ArrayBlockingQueue<Runnable>(10),
908 new NoOpREHandler());
909 shouldThrow();
910 } catch (IllegalArgumentException success) {}
911 }
912
913 /**
914 * Constructor throws if maximumPoolSize is equal to zero
915 */
916 public void testConstructor13() {
917 try {
918 new CustomTPE(1, 0, 1L, SECONDS,
919 new ArrayBlockingQueue<Runnable>(10),
920 new NoOpREHandler());
921 shouldThrow();
922 } catch (IllegalArgumentException success) {}
923 }
924
925 /**
926 * Constructor throws if keepAliveTime is less than zero
927 */
928 public void testConstructor14() {
929 try {
930 new CustomTPE(1, 2, -1L, SECONDS,
931 new ArrayBlockingQueue<Runnable>(10),
932 new NoOpREHandler());
933 shouldThrow();
934 } catch (IllegalArgumentException success) {}
935 }
936
937 /**
938 * Constructor throws if corePoolSize is greater than the maximumPoolSize
939 */
940 public void testConstructor15() {
941 try {
942 new CustomTPE(2, 1, 1L, SECONDS,
943 new ArrayBlockingQueue<Runnable>(10),
944 new NoOpREHandler());
945 shouldThrow();
946 } catch (IllegalArgumentException success) {}
947 }
948
949 /**
950 * Constructor throws if workQueue is set to null
951 */
952 public void testConstructorNullPointerException4() {
953 try {
954 new CustomTPE(1, 2, 1L, SECONDS,
955 null,
956 new NoOpREHandler());
957 shouldThrow();
958 } catch (NullPointerException success) {}
959 }
960
961 /**
962 * Constructor throws if handler is set to null
963 */
964 public void testConstructorNullPointerException5() {
965 try {
966 new CustomTPE(1, 2, 1L, SECONDS,
967 new ArrayBlockingQueue<Runnable>(10),
968 (RejectedExecutionHandler) null);
969 shouldThrow();
970 } catch (NullPointerException success) {}
971 }
972
973 /**
974 * Constructor throws if corePoolSize argument is less than zero
975 */
976 public void testConstructor16() {
977 try {
978 new CustomTPE(-1, 1, 1L, SECONDS,
979 new ArrayBlockingQueue<Runnable>(10),
980 new SimpleThreadFactory(),
981 new NoOpREHandler());
982 shouldThrow();
983 } catch (IllegalArgumentException success) {}
984 }
985
986 /**
987 * Constructor throws if maximumPoolSize is less than zero
988 */
989 public void testConstructor17() {
990 try {
991 new CustomTPE(1, -1, 1L, SECONDS,
992 new ArrayBlockingQueue<Runnable>(10),
993 new SimpleThreadFactory(),
994 new NoOpREHandler());
995 shouldThrow();
996 } catch (IllegalArgumentException success) {}
997 }
998
999 /**
1000 * Constructor throws if maximumPoolSize is equal to zero
1001 */
1002 public void testConstructor18() {
1003 try {
1004 new CustomTPE(1, 0, 1L, SECONDS,
1005 new ArrayBlockingQueue<Runnable>(10),
1006 new SimpleThreadFactory(),
1007 new NoOpREHandler());
1008 shouldThrow();
1009 } catch (IllegalArgumentException success) {}
1010 }
1011
1012 /**
1013 * Constructor throws if keepAliveTime is less than zero
1014 */
1015 public void testConstructor19() {
1016 try {
1017 new CustomTPE(1, 2, -1L, SECONDS,
1018 new ArrayBlockingQueue<Runnable>(10),
1019 new SimpleThreadFactory(),
1020 new NoOpREHandler());
1021 shouldThrow();
1022 } catch (IllegalArgumentException success) {}
1023 }
1024
1025 /**
1026 * Constructor throws if corePoolSize is greater than the maximumPoolSize
1027 */
1028 public void testConstructor20() {
1029 try {
1030 new CustomTPE(2, 1, 1L, SECONDS,
1031 new ArrayBlockingQueue<Runnable>(10),
1032 new SimpleThreadFactory(),
1033 new NoOpREHandler());
1034 shouldThrow();
1035 } catch (IllegalArgumentException success) {}
1036 }
1037
1038 /**
1039 * Constructor throws if workQueue is null
1040 */
1041 public void testConstructorNullPointerException6() {
1042 try {
1043 new CustomTPE(1, 2, 1L, SECONDS,
1044 null,
1045 new SimpleThreadFactory(),
1046 new NoOpREHandler());
1047 shouldThrow();
1048 } catch (NullPointerException success) {}
1049 }
1050
1051 /**
1052 * Constructor throws if handler is null
1053 */
1054 public void testConstructorNullPointerException7() {
1055 try {
1056 new CustomTPE(1, 2, 1L, SECONDS,
1057 new ArrayBlockingQueue<Runnable>(10),
1058 new SimpleThreadFactory(),
1059 (RejectedExecutionHandler) null);
1060 shouldThrow();
1061 } catch (NullPointerException success) {}
1062 }
1063
1064 /**
1065 * Constructor throws if ThreadFactory is null
1066 */
1067 public void testConstructorNullPointerException8() {
1068 try {
1069 new CustomTPE(1, 2, 1L, SECONDS,
1070 new ArrayBlockingQueue<Runnable>(10),
1071 (ThreadFactory) null,
1072 new NoOpREHandler());
1073 shouldThrow();
1074 } catch (NullPointerException success) {}
1075 }
1076
1077 /**
1078 * execute throws RejectedExecutionException if saturated.
1079 */
1080 public void testSaturatedExecute() {
1081 ThreadPoolExecutor p =
1082 new CustomTPE(1, 1,
1083 LONG_DELAY_MS, MILLISECONDS,
1084 new ArrayBlockingQueue<Runnable>(1));
1085 final CountDownLatch done = new CountDownLatch(1);
1086 try {
1087 Runnable task = new CheckedRunnable() {
1088 public void realRun() throws InterruptedException {
1089 done.await();
1090 }};
1091 for (int i = 0; i < 2; ++i)
1092 p.execute(task);
1093 for (int i = 0; i < 2; ++i) {
1094 try {
1095 p.execute(task);
1096 shouldThrow();
1097 } catch (RejectedExecutionException success) {}
1098 assertTrue(p.getTaskCount() <= 2);
1099 }
1100 } finally {
1101 done.countDown();
1102 joinPool(p);
1103 }
1104 }
1105
1106 /**
1107 * executor using CallerRunsPolicy runs task if saturated.
1108 */
1109 public void testSaturatedExecute2() {
1110 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1111 ThreadPoolExecutor p = new CustomTPE(1, 1,
1112 LONG_DELAY_MS, MILLISECONDS,
1113 new ArrayBlockingQueue<Runnable>(1),
1114 h);
1115 try {
1116 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1117 for (int i = 0; i < tasks.length; ++i)
1118 tasks[i] = new TrackedNoOpRunnable();
1119 TrackedLongRunnable mr = new TrackedLongRunnable();
1120 p.execute(mr);
1121 for (int i = 0; i < tasks.length; ++i)
1122 p.execute(tasks[i]);
1123 for (int i = 1; i < tasks.length; ++i)
1124 assertTrue(tasks[i].done);
1125 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1126 } finally {
1127 joinPool(p);
1128 }
1129 }
1130
1131 /**
1132 * executor using DiscardPolicy drops task if saturated.
1133 */
1134 public void testSaturatedExecute3() {
1135 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1136 ThreadPoolExecutor p =
1137 new CustomTPE(1, 1,
1138 LONG_DELAY_MS, MILLISECONDS,
1139 new ArrayBlockingQueue<Runnable>(1),
1140 h);
1141 try {
1142 TrackedNoOpRunnable[] tasks = new TrackedNoOpRunnable[5];
1143 for (int i = 0; i < tasks.length; ++i)
1144 tasks[i] = new TrackedNoOpRunnable();
1145 p.execute(new TrackedLongRunnable());
1146 for (TrackedNoOpRunnable task : tasks)
1147 p.execute(task);
1148 for (TrackedNoOpRunnable task : tasks)
1149 assertFalse(task.done);
1150 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1151 } finally {
1152 joinPool(p);
1153 }
1154 }
1155
1156 /**
1157 * executor using DiscardOldestPolicy drops oldest task if saturated.
1158 */
1159 public void testSaturatedExecute4() {
1160 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1161 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1162 try {
1163 p.execute(new TrackedLongRunnable());
1164 TrackedLongRunnable r2 = new TrackedLongRunnable();
1165 p.execute(r2);
1166 assertTrue(p.getQueue().contains(r2));
1167 TrackedNoOpRunnable r3 = new TrackedNoOpRunnable();
1168 p.execute(r3);
1169 assertFalse(p.getQueue().contains(r2));
1170 assertTrue(p.getQueue().contains(r3));
1171 try { p.shutdownNow(); } catch (SecurityException ok) { return; }
1172 } finally {
1173 joinPool(p);
1174 }
1175 }
1176
1177 /**
1178 * execute throws RejectedExecutionException if shutdown
1179 */
1180 public void testRejectedExecutionExceptionOnShutdown() {
1181 ThreadPoolExecutor p =
1182 new CustomTPE(1,1,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(1));
1183 try { p.shutdown(); } catch (SecurityException ok) { return; }
1184 try {
1185 p.execute(new NoOpRunnable());
1186 shouldThrow();
1187 } catch (RejectedExecutionException success) {}
1188
1189 joinPool(p);
1190 }
1191
1192 /**
1193 * execute using CallerRunsPolicy drops task on shutdown
1194 */
1195 public void testCallerRunsOnShutdown() {
1196 RejectedExecutionHandler h = new CustomTPE.CallerRunsPolicy();
1197 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1198
1199 try { p.shutdown(); } catch (SecurityException ok) { return; }
1200 try {
1201 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1202 p.execute(r);
1203 assertFalse(r.done);
1204 } finally {
1205 joinPool(p);
1206 }
1207 }
1208
1209 /**
1210 * execute using DiscardPolicy drops task on shutdown
1211 */
1212 public void testDiscardOnShutdown() {
1213 RejectedExecutionHandler h = new CustomTPE.DiscardPolicy();
1214 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1215
1216 try { p.shutdown(); } catch (SecurityException ok) { return; }
1217 try {
1218 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1219 p.execute(r);
1220 assertFalse(r.done);
1221 } finally {
1222 joinPool(p);
1223 }
1224 }
1225
1226 /**
1227 * execute using DiscardOldestPolicy drops task on shutdown
1228 */
1229 public void testDiscardOldestOnShutdown() {
1230 RejectedExecutionHandler h = new CustomTPE.DiscardOldestPolicy();
1231 ThreadPoolExecutor p = new CustomTPE(1,1, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), h);
1232
1233 try { p.shutdown(); } catch (SecurityException ok) { return; }
1234 try {
1235 TrackedNoOpRunnable r = new TrackedNoOpRunnable();
1236 p.execute(r);
1237 assertFalse(r.done);
1238 } finally {
1239 joinPool(p);
1240 }
1241 }
1242
1243 /**
1244 * execute(null) throws NPE
1245 */
1246 public void testExecuteNull() {
1247 ThreadPoolExecutor p =
1248 new CustomTPE(1, 2, 1L, SECONDS,
1249 new ArrayBlockingQueue<Runnable>(10));
1250 try {
1251 p.execute(null);
1252 shouldThrow();
1253 } catch (NullPointerException success) {}
1254
1255 joinPool(p);
1256 }
1257
1258 /**
1259 * setCorePoolSize of negative value throws IllegalArgumentException
1260 */
1261 public void testCorePoolSizeIllegalArgumentException() {
1262 ThreadPoolExecutor p =
1263 new CustomTPE(1,2,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1264 try {
1265 p.setCorePoolSize(-1);
1266 shouldThrow();
1267 } catch (IllegalArgumentException success) {
1268 } finally {
1269 try { p.shutdown(); } catch (SecurityException ok) { return; }
1270 }
1271 joinPool(p);
1272 }
1273
1274 /**
1275 * setMaximumPoolSize(int) throws IllegalArgumentException
1276 * if given a value less the core pool size
1277 */
1278 public void testMaximumPoolSizeIllegalArgumentException() {
1279 ThreadPoolExecutor p =
1280 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1281 try {
1282 p.setMaximumPoolSize(1);
1283 shouldThrow();
1284 } catch (IllegalArgumentException success) {
1285 } finally {
1286 try { p.shutdown(); } catch (SecurityException ok) { return; }
1287 }
1288 joinPool(p);
1289 }
1290
1291 /**
1292 * setMaximumPoolSize throws IllegalArgumentException
1293 * if given a negative value
1294 */
1295 public void testMaximumPoolSizeIllegalArgumentException2() {
1296 ThreadPoolExecutor p =
1297 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1298 try {
1299 p.setMaximumPoolSize(-1);
1300 shouldThrow();
1301 } catch (IllegalArgumentException success) {
1302 } finally {
1303 try { p.shutdown(); } catch (SecurityException ok) { return; }
1304 }
1305 joinPool(p);
1306 }
1307
1308 /**
1309 * setKeepAliveTime throws IllegalArgumentException
1310 * when given a negative value
1311 */
1312 public void testKeepAliveTimeIllegalArgumentException() {
1313 ThreadPoolExecutor p =
1314 new CustomTPE(2,3,LONG_DELAY_MS, MILLISECONDS,new ArrayBlockingQueue<Runnable>(10));
1315
1316 try {
1317 p.setKeepAliveTime(-1,MILLISECONDS);
1318 shouldThrow();
1319 } catch (IllegalArgumentException success) {
1320 } finally {
1321 try { p.shutdown(); } catch (SecurityException ok) { return; }
1322 }
1323 joinPool(p);
1324 }
1325
1326 /**
1327 * terminated() is called on termination
1328 */
1329 public void testTerminated() {
1330 CustomTPE p = new CustomTPE();
1331 try { p.shutdown(); } catch (SecurityException ok) { return; }
1332 assertTrue(p.terminatedCalled());
1333 joinPool(p);
1334 }
1335
1336 /**
1337 * beforeExecute and afterExecute are called when executing task
1338 */
1339 public void testBeforeAfter() throws InterruptedException {
1340 CustomTPE p = new CustomTPE();
1341 try {
1342 final CountDownLatch done = new CountDownLatch(1);
1343 p.execute(new CheckedRunnable() {
1344 public void realRun() {
1345 done.countDown();
1346 }});
1347 await(p.afterCalled);
1348 assertEquals(0, done.getCount());
1349 assertTrue(p.afterCalled());
1350 assertTrue(p.beforeCalled());
1351 try { p.shutdown(); } catch (SecurityException ok) { return; }
1352 } finally {
1353 joinPool(p);
1354 }
1355 }
1356
1357 /**
1358 * completed submit of callable returns result
1359 */
1360 public void testSubmitCallable() throws Exception {
1361 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1362 try {
1363 Future<String> future = e.submit(new StringTask());
1364 String result = future.get();
1365 assertSame(TEST_STRING, result);
1366 } finally {
1367 joinPool(e);
1368 }
1369 }
1370
1371 /**
1372 * completed submit of runnable returns successfully
1373 */
1374 public void testSubmitRunnable() throws Exception {
1375 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1376 try {
1377 Future<?> future = e.submit(new NoOpRunnable());
1378 future.get();
1379 assertTrue(future.isDone());
1380 } finally {
1381 joinPool(e);
1382 }
1383 }
1384
1385 /**
1386 * completed submit of (runnable, result) returns result
1387 */
1388 public void testSubmitRunnable2() throws Exception {
1389 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1390 try {
1391 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1392 String result = future.get();
1393 assertSame(TEST_STRING, result);
1394 } finally {
1395 joinPool(e);
1396 }
1397 }
1398
1399 /**
1400 * invokeAny(null) throws NPE
1401 */
1402 public void testInvokeAny1() throws Exception {
1403 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1404 try {
1405 e.invokeAny(null);
1406 shouldThrow();
1407 } catch (NullPointerException success) {
1408 } finally {
1409 joinPool(e);
1410 }
1411 }
1412
1413 /**
1414 * invokeAny(empty collection) throws IAE
1415 */
1416 public void testInvokeAny2() throws Exception {
1417 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1418 try {
1419 e.invokeAny(new ArrayList<Callable<String>>());
1420 shouldThrow();
1421 } catch (IllegalArgumentException success) {
1422 } finally {
1423 joinPool(e);
1424 }
1425 }
1426
1427 /**
1428 * invokeAny(c) throws NPE if c has null elements
1429 */
1430 public void testInvokeAny3() throws Exception {
1431 CountDownLatch latch = new CountDownLatch(1);
1432 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1433 List<Callable<String>> l = new ArrayList<Callable<String>>();
1434 l.add(latchAwaitingStringTask(latch));
1435 l.add(null);
1436 try {
1437 e.invokeAny(l);
1438 shouldThrow();
1439 } catch (NullPointerException success) {
1440 } finally {
1441 latch.countDown();
1442 joinPool(e);
1443 }
1444 }
1445
1446 /**
1447 * invokeAny(c) throws ExecutionException if no task completes
1448 */
1449 public void testInvokeAny4() throws Exception {
1450 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1451 List<Callable<String>> l = new ArrayList<Callable<String>>();
1452 l.add(new NPETask());
1453 try {
1454 e.invokeAny(l);
1455 shouldThrow();
1456 } catch (ExecutionException success) {
1457 assertTrue(success.getCause() instanceof NullPointerException);
1458 } finally {
1459 joinPool(e);
1460 }
1461 }
1462
1463 /**
1464 * invokeAny(c) returns result of some task
1465 */
1466 public void testInvokeAny5() throws Exception {
1467 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1468 try {
1469 List<Callable<String>> l = new ArrayList<Callable<String>>();
1470 l.add(new StringTask());
1471 l.add(new StringTask());
1472 String result = e.invokeAny(l);
1473 assertSame(TEST_STRING, result);
1474 } finally {
1475 joinPool(e);
1476 }
1477 }
1478
1479 /**
1480 * invokeAll(null) throws NPE
1481 */
1482 public void testInvokeAll1() throws Exception {
1483 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1484 try {
1485 e.invokeAll(null);
1486 shouldThrow();
1487 } catch (NullPointerException success) {
1488 } finally {
1489 joinPool(e);
1490 }
1491 }
1492
1493 /**
1494 * invokeAll(empty collection) returns empty collection
1495 */
1496 public void testInvokeAll2() throws Exception {
1497 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1498 try {
1499 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1500 assertTrue(r.isEmpty());
1501 } finally {
1502 joinPool(e);
1503 }
1504 }
1505
1506 /**
1507 * invokeAll(c) throws NPE if c has null elements
1508 */
1509 public void testInvokeAll3() throws Exception {
1510 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1511 List<Callable<String>> l = new ArrayList<Callable<String>>();
1512 l.add(new StringTask());
1513 l.add(null);
1514 try {
1515 e.invokeAll(l);
1516 shouldThrow();
1517 } catch (NullPointerException success) {
1518 } finally {
1519 joinPool(e);
1520 }
1521 }
1522
1523 /**
1524 * get of element of invokeAll(c) throws exception on failed task
1525 */
1526 public void testInvokeAll4() throws Exception {
1527 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1528 List<Callable<String>> l = new ArrayList<Callable<String>>();
1529 l.add(new NPETask());
1530 List<Future<String>> futures = e.invokeAll(l);
1531 assertEquals(1, futures.size());
1532 try {
1533 futures.get(0).get();
1534 shouldThrow();
1535 } catch (ExecutionException success) {
1536 assertTrue(success.getCause() instanceof NullPointerException);
1537 } finally {
1538 joinPool(e);
1539 }
1540 }
1541
1542 /**
1543 * invokeAll(c) returns results of all completed tasks
1544 */
1545 public void testInvokeAll5() throws Exception {
1546 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1547 try {
1548 List<Callable<String>> l = new ArrayList<Callable<String>>();
1549 l.add(new StringTask());
1550 l.add(new StringTask());
1551 List<Future<String>> futures = e.invokeAll(l);
1552 assertEquals(2, futures.size());
1553 for (Future<String> future : futures)
1554 assertSame(TEST_STRING, future.get());
1555 } finally {
1556 joinPool(e);
1557 }
1558 }
1559
1560 /**
1561 * timed invokeAny(null) throws NPE
1562 */
1563 public void testTimedInvokeAny1() throws Exception {
1564 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1565 try {
1566 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1567 shouldThrow();
1568 } catch (NullPointerException success) {
1569 } finally {
1570 joinPool(e);
1571 }
1572 }
1573
1574 /**
1575 * timed invokeAny(,,null) throws NPE
1576 */
1577 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1578 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1579 List<Callable<String>> l = new ArrayList<Callable<String>>();
1580 l.add(new StringTask());
1581 try {
1582 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1583 shouldThrow();
1584 } catch (NullPointerException success) {
1585 } finally {
1586 joinPool(e);
1587 }
1588 }
1589
1590 /**
1591 * timed invokeAny(empty collection) throws IAE
1592 */
1593 public void testTimedInvokeAny2() throws Exception {
1594 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1595 try {
1596 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1597 shouldThrow();
1598 } catch (IllegalArgumentException success) {
1599 } finally {
1600 joinPool(e);
1601 }
1602 }
1603
1604 /**
1605 * timed invokeAny(c) throws NPE if c has null elements
1606 */
1607 public void testTimedInvokeAny3() throws Exception {
1608 CountDownLatch latch = new CountDownLatch(1);
1609 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1610 List<Callable<String>> l = new ArrayList<Callable<String>>();
1611 l.add(latchAwaitingStringTask(latch));
1612 l.add(null);
1613 try {
1614 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1615 shouldThrow();
1616 } catch (NullPointerException success) {
1617 } finally {
1618 latch.countDown();
1619 joinPool(e);
1620 }
1621 }
1622
1623 /**
1624 * timed invokeAny(c) throws ExecutionException if no task completes
1625 */
1626 public void testTimedInvokeAny4() throws Exception {
1627 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1628 List<Callable<String>> l = new ArrayList<Callable<String>>();
1629 l.add(new NPETask());
1630 try {
1631 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1632 shouldThrow();
1633 } catch (ExecutionException success) {
1634 assertTrue(success.getCause() instanceof NullPointerException);
1635 } finally {
1636 joinPool(e);
1637 }
1638 }
1639
1640 /**
1641 * timed invokeAny(c) returns result of some task
1642 */
1643 public void testTimedInvokeAny5() throws Exception {
1644 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1645 try {
1646 List<Callable<String>> l = new ArrayList<Callable<String>>();
1647 l.add(new StringTask());
1648 l.add(new StringTask());
1649 String result = e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1650 assertSame(TEST_STRING, result);
1651 } finally {
1652 joinPool(e);
1653 }
1654 }
1655
1656 /**
1657 * timed invokeAll(null) throws NPE
1658 */
1659 public void testTimedInvokeAll1() throws Exception {
1660 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1661 try {
1662 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1663 shouldThrow();
1664 } catch (NullPointerException success) {
1665 } finally {
1666 joinPool(e);
1667 }
1668 }
1669
1670 /**
1671 * timed invokeAll(,,null) throws NPE
1672 */
1673 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1674 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1675 List<Callable<String>> l = new ArrayList<Callable<String>>();
1676 l.add(new StringTask());
1677 try {
1678 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1679 shouldThrow();
1680 } catch (NullPointerException success) {
1681 } finally {
1682 joinPool(e);
1683 }
1684 }
1685
1686 /**
1687 * timed invokeAll(empty collection) returns empty collection
1688 */
1689 public void testTimedInvokeAll2() throws Exception {
1690 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1691 try {
1692 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1693 assertTrue(r.isEmpty());
1694 } finally {
1695 joinPool(e);
1696 }
1697 }
1698
1699 /**
1700 * timed invokeAll(c) throws NPE if c has null elements
1701 */
1702 public void testTimedInvokeAll3() throws Exception {
1703 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1704 List<Callable<String>> l = new ArrayList<Callable<String>>();
1705 l.add(new StringTask());
1706 l.add(null);
1707 try {
1708 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1709 shouldThrow();
1710 } catch (NullPointerException success) {
1711 } finally {
1712 joinPool(e);
1713 }
1714 }
1715
1716 /**
1717 * get of element of invokeAll(c) throws exception on failed task
1718 */
1719 public void testTimedInvokeAll4() throws Exception {
1720 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1721 List<Callable<String>> l = new ArrayList<Callable<String>>();
1722 l.add(new NPETask());
1723 List<Future<String>> futures =
1724 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1725 assertEquals(1, futures.size());
1726 try {
1727 futures.get(0).get();
1728 shouldThrow();
1729 } catch (ExecutionException success) {
1730 assertTrue(success.getCause() instanceof NullPointerException);
1731 } finally {
1732 joinPool(e);
1733 }
1734 }
1735
1736 /**
1737 * timed invokeAll(c) returns results of all completed tasks
1738 */
1739 public void testTimedInvokeAll5() throws Exception {
1740 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1741 try {
1742 List<Callable<String>> l = new ArrayList<Callable<String>>();
1743 l.add(new StringTask());
1744 l.add(new StringTask());
1745 List<Future<String>> futures =
1746 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1747 assertEquals(2, futures.size());
1748 for (Future<String> future : futures)
1749 assertSame(TEST_STRING, future.get());
1750 } finally {
1751 joinPool(e);
1752 }
1753 }
1754
1755 /**
1756 * timed invokeAll(c) cancels tasks not completed by timeout
1757 */
1758 public void testTimedInvokeAll6() throws Exception {
1759 ExecutorService e = new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1760 try {
1761 for (long timeout = timeoutMillis();;) {
1762 List<Callable<String>> tasks = new ArrayList<>();
1763 tasks.add(new StringTask("0"));
1764 tasks.add(Executors.callable(new LongPossiblyInterruptedRunnable(), TEST_STRING));
1765 tasks.add(new StringTask("2"));
1766 long startTime = System.nanoTime();
1767 List<Future<String>> futures =
1768 e.invokeAll(tasks, timeout, MILLISECONDS);
1769 assertEquals(tasks.size(), futures.size());
1770 assertTrue(millisElapsedSince(startTime) >= timeout);
1771 for (Future future : futures)
1772 assertTrue(future.isDone());
1773 assertTrue(futures.get(1).isCancelled());
1774 try {
1775 assertEquals("0", futures.get(0).get());
1776 assertEquals("2", futures.get(2).get());
1777 break;
1778 } catch (CancellationException retryWithLongerTimeout) {
1779 timeout *= 2;
1780 if (timeout >= LONG_DELAY_MS / 2)
1781 fail("expected exactly one task to be cancelled");
1782 }
1783 }
1784 } finally {
1785 joinPool(e);
1786 }
1787 }
1788
1789 /**
1790 * Execution continues if there is at least one thread even if
1791 * thread factory fails to create more
1792 */
1793 public void testFailingThreadFactory() throws InterruptedException {
1794 final ExecutorService e =
1795 new CustomTPE(100, 100,
1796 LONG_DELAY_MS, MILLISECONDS,
1797 new LinkedBlockingQueue<Runnable>(),
1798 new FailingThreadFactory());
1799 try {
1800 final int TASKS = 100;
1801 final CountDownLatch done = new CountDownLatch(TASKS);
1802 for (int k = 0; k < TASKS; ++k)
1803 e.execute(new CheckedRunnable() {
1804 public void realRun() {
1805 done.countDown();
1806 }});
1807 assertTrue(done.await(LONG_DELAY_MS, MILLISECONDS));
1808 } finally {
1809 joinPool(e);
1810 }
1811 }
1812
1813 /**
1814 * allowsCoreThreadTimeOut is by default false.
1815 */
1816 public void testAllowsCoreThreadTimeOut() {
1817 ThreadPoolExecutor p = new CustomTPE(2, 2, 1000, MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
1818 assertFalse(p.allowsCoreThreadTimeOut());
1819 joinPool(p);
1820 }
1821
1822 /**
1823 * allowCoreThreadTimeOut(true) causes idle threads to time out
1824 */
1825 public void testAllowCoreThreadTimeOut_true() throws Exception {
1826 long keepAliveTime = timeoutMillis();
1827 final ThreadPoolExecutor p =
1828 new CustomTPE(2, 10,
1829 keepAliveTime, MILLISECONDS,
1830 new ArrayBlockingQueue<Runnable>(10));
1831 final CountDownLatch threadStarted = new CountDownLatch(1);
1832 try {
1833 p.allowCoreThreadTimeOut(true);
1834 p.execute(new CheckedRunnable() {
1835 public void realRun() {
1836 threadStarted.countDown();
1837 assertEquals(1, p.getPoolSize());
1838 }});
1839 await(threadStarted);
1840 delay(keepAliveTime);
1841 long startTime = System.nanoTime();
1842 while (p.getPoolSize() > 0
1843 && millisElapsedSince(startTime) < LONG_DELAY_MS)
1844 Thread.yield();
1845 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1846 assertEquals(0, p.getPoolSize());
1847 } finally {
1848 joinPool(p);
1849 }
1850 }
1851
1852 /**
1853 * allowCoreThreadTimeOut(false) causes idle threads not to time out
1854 */
1855 public void testAllowCoreThreadTimeOut_false() throws Exception {
1856 long keepAliveTime = timeoutMillis();
1857 final ThreadPoolExecutor p =
1858 new CustomTPE(2, 10,
1859 keepAliveTime, MILLISECONDS,
1860 new ArrayBlockingQueue<Runnable>(10));
1861 final CountDownLatch threadStarted = new CountDownLatch(1);
1862 try {
1863 p.allowCoreThreadTimeOut(false);
1864 p.execute(new CheckedRunnable() {
1865 public void realRun() throws InterruptedException {
1866 threadStarted.countDown();
1867 assertTrue(p.getPoolSize() >= 1);
1868 }});
1869 delay(2 * keepAliveTime);
1870 assertTrue(p.getPoolSize() >= 1);
1871 } finally {
1872 joinPool(p);
1873 }
1874 }
1875
1876 /**
1877 * get(cancelled task) throws CancellationException
1878 * (in part, a test of CustomTPE itself)
1879 */
1880 public void testGet_cancelled() throws Exception {
1881 final ExecutorService e =
1882 new CustomTPE(1, 1,
1883 LONG_DELAY_MS, MILLISECONDS,
1884 new LinkedBlockingQueue<Runnable>());
1885 try {
1886 final CountDownLatch blockerStarted = new CountDownLatch(1);
1887 final CountDownLatch done = new CountDownLatch(1);
1888 final List<Future<?>> futures = new ArrayList<>();
1889 for (int i = 0; i < 2; i++) {
1890 Runnable r = new CheckedRunnable() { public void realRun()
1891 throws Throwable {
1892 blockerStarted.countDown();
1893 assertTrue(done.await(2 * LONG_DELAY_MS, MILLISECONDS));
1894 }};
1895 futures.add(e.submit(r));
1896 }
1897 assertTrue(blockerStarted.await(LONG_DELAY_MS, MILLISECONDS));
1898 for (Future<?> future : futures) future.cancel(false);
1899 for (Future<?> future : futures) {
1900 try {
1901 future.get();
1902 shouldThrow();
1903 } catch (CancellationException success) {}
1904 try {
1905 future.get(LONG_DELAY_MS, MILLISECONDS);
1906 shouldThrow();
1907 } catch (CancellationException success) {}
1908 assertTrue(future.isCancelled());
1909 assertTrue(future.isDone());
1910 }
1911 done.countDown();
1912 } finally {
1913 joinPool(e);
1914 }
1915 }
1916
1917 }