ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ScheduledExecutorSubclassTest.java
Revision: 1.68
Committed: Mon May 29 22:44:27 2017 UTC (6 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.67: +30 -21 lines
Log Message:
more timeout handling rework; remove most uses of MEDIUM_DELAY_MS; randomize timeouts and TimeUnits; write out IAE and ISE

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import static java.util.concurrent.TimeUnit.MILLISECONDS;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import static java.util.concurrent.TimeUnit.SECONDS;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.CancellationException;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Delayed;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.RejectedExecutionHandler;
26 import java.util.concurrent.RunnableScheduledFuture;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeoutException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.stream.Stream;
38
39 import junit.framework.Test;
40 import junit.framework.TestSuite;
41
42 public class ScheduledExecutorSubclassTest extends JSR166TestCase {
43 public static void main(String[] args) {
44 main(suite(), args);
45 }
46 public static Test suite() {
47 return new TestSuite(ScheduledExecutorSubclassTest.class);
48 }
49
50 static class CustomTask<V> implements RunnableScheduledFuture<V> {
51 private final RunnableScheduledFuture<V> task;
52 volatile boolean ran;
53 CustomTask(RunnableScheduledFuture<V> task) { this.task = task; }
54 public boolean isPeriodic() { return task.isPeriodic(); }
55 public void run() {
56 ran = true;
57 task.run();
58 }
59 public long getDelay(TimeUnit unit) { return task.getDelay(unit); }
60 public int compareTo(Delayed t) {
61 return task.compareTo(((CustomTask)t).task);
62 }
63 public boolean cancel(boolean mayInterruptIfRunning) {
64 return task.cancel(mayInterruptIfRunning);
65 }
66 public boolean isCancelled() { return task.isCancelled(); }
67 public boolean isDone() { return task.isDone(); }
68 public V get() throws InterruptedException, ExecutionException {
69 V v = task.get();
70 assertTrue(ran);
71 return v;
72 }
73 public V get(long time, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
74 V v = task.get(time, unit);
75 assertTrue(ran);
76 return v;
77 }
78 }
79
80 public class CustomExecutor extends ScheduledThreadPoolExecutor {
81
82 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable r, RunnableScheduledFuture<V> task) {
83 return new CustomTask<V>(task);
84 }
85
86 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> c, RunnableScheduledFuture<V> task) {
87 return new CustomTask<V>(task);
88 }
89 CustomExecutor(int corePoolSize) { super(corePoolSize); }
90 CustomExecutor(int corePoolSize, RejectedExecutionHandler handler) {
91 super(corePoolSize, handler);
92 }
93
94 CustomExecutor(int corePoolSize, ThreadFactory threadFactory) {
95 super(corePoolSize, threadFactory);
96 }
97 CustomExecutor(int corePoolSize, ThreadFactory threadFactory,
98 RejectedExecutionHandler handler) {
99 super(corePoolSize, threadFactory, handler);
100 }
101
102 }
103
104 /**
105 * execute successfully executes a runnable
106 */
107 public void testExecute() throws InterruptedException {
108 final CustomExecutor p = new CustomExecutor(1);
109 try (PoolCleaner cleaner = cleaner(p)) {
110 final CountDownLatch done = new CountDownLatch(1);
111 final Runnable task = new CheckedRunnable() {
112 public void realRun() { done.countDown(); }};
113 p.execute(task);
114 await(done);
115 }
116 }
117
118 /**
119 * delayed schedule of callable successfully executes after delay
120 */
121 public void testSchedule1() throws Exception {
122 final CountDownLatch done = new CountDownLatch(1);
123 final CustomExecutor p = new CustomExecutor(1);
124 try (PoolCleaner cleaner = cleaner(p, done)) {
125 final long startTime = System.nanoTime();
126 Callable task = new CheckedCallable<Boolean>() {
127 public Boolean realCall() {
128 done.countDown();
129 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
130 return Boolean.TRUE;
131 }};
132 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS);
133 assertSame(Boolean.TRUE, f.get());
134 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
135 }
136 }
137
138 /**
139 * delayed schedule of runnable successfully executes after delay
140 */
141 public void testSchedule3() throws Exception {
142 final CustomExecutor p = new CustomExecutor(1);
143 try (PoolCleaner cleaner = cleaner(p)) {
144 final long startTime = System.nanoTime();
145 final CountDownLatch done = new CountDownLatch(1);
146 Runnable task = new CheckedRunnable() {
147 public void realRun() {
148 done.countDown();
149 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
150 }};
151 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS);
152 await(done);
153 assertNull(f.get(LONG_DELAY_MS, MILLISECONDS));
154 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
155 }
156 }
157
158 /**
159 * scheduleAtFixedRate executes runnable after given initial delay
160 */
161 public void testSchedule4() throws InterruptedException {
162 final CustomExecutor p = new CustomExecutor(1);
163 try (PoolCleaner cleaner = cleaner(p)) {
164 final long startTime = System.nanoTime();
165 final CountDownLatch done = new CountDownLatch(1);
166 Runnable task = new CheckedRunnable() {
167 public void realRun() {
168 done.countDown();
169 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
170 }};
171 ScheduledFuture f =
172 p.scheduleAtFixedRate(task, timeoutMillis(),
173 LONG_DELAY_MS, MILLISECONDS);
174 await(done);
175 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
176 f.cancel(true);
177 }
178 }
179
180 /**
181 * scheduleWithFixedDelay executes runnable after given initial delay
182 */
183 public void testSchedule5() throws InterruptedException {
184 final CustomExecutor p = new CustomExecutor(1);
185 try (PoolCleaner cleaner = cleaner(p)) {
186 final long startTime = System.nanoTime();
187 final CountDownLatch done = new CountDownLatch(1);
188 Runnable task = new CheckedRunnable() {
189 public void realRun() {
190 done.countDown();
191 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
192 }};
193 ScheduledFuture f =
194 p.scheduleWithFixedDelay(task, timeoutMillis(),
195 LONG_DELAY_MS, MILLISECONDS);
196 await(done);
197 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
198 f.cancel(true);
199 }
200 }
201
202 static class RunnableCounter implements Runnable {
203 AtomicInteger count = new AtomicInteger(0);
204 public void run() { count.getAndIncrement(); }
205 }
206
207 /**
208 * scheduleAtFixedRate executes series of tasks at given rate.
209 * Eventually, it must hold that:
210 * cycles - 1 <= elapsedMillis/delay < cycles
211 */
212 public void testFixedRateSequence() throws InterruptedException {
213 final CustomExecutor p = new CustomExecutor(1);
214 try (PoolCleaner cleaner = cleaner(p)) {
215 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
216 final long startTime = System.nanoTime();
217 final int cycles = 8;
218 final CountDownLatch done = new CountDownLatch(cycles);
219 final Runnable task = new CheckedRunnable() {
220 public void realRun() { done.countDown(); }};
221 final ScheduledFuture periodicTask =
222 p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
223 final int totalDelayMillis = (cycles - 1) * delay;
224 await(done, totalDelayMillis + LONG_DELAY_MS);
225 periodicTask.cancel(true);
226 final long elapsedMillis = millisElapsedSince(startTime);
227 assertTrue(elapsedMillis >= totalDelayMillis);
228 if (elapsedMillis <= cycles * delay)
229 return;
230 // else retry with longer delay
231 }
232 fail("unexpected execution rate");
233 }
234 }
235
236 /**
237 * scheduleWithFixedDelay executes series of tasks with given period.
238 * Eventually, it must hold that each task starts at least delay and at
239 * most 2 * delay after the termination of the previous task.
240 */
241 public void testFixedDelaySequence() throws InterruptedException {
242 final CustomExecutor p = new CustomExecutor(1);
243 try (PoolCleaner cleaner = cleaner(p)) {
244 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
245 final long startTime = System.nanoTime();
246 final AtomicLong previous = new AtomicLong(startTime);
247 final AtomicBoolean tryLongerDelay = new AtomicBoolean(false);
248 final int cycles = 8;
249 final CountDownLatch done = new CountDownLatch(cycles);
250 final int d = delay;
251 final Runnable task = new CheckedRunnable() {
252 public void realRun() {
253 long now = System.nanoTime();
254 long elapsedMillis
255 = NANOSECONDS.toMillis(now - previous.get());
256 if (done.getCount() == cycles) { // first execution
257 if (elapsedMillis >= d)
258 tryLongerDelay.set(true);
259 } else {
260 assertTrue(elapsedMillis >= d);
261 if (elapsedMillis >= 2 * d)
262 tryLongerDelay.set(true);
263 }
264 previous.set(now);
265 done.countDown();
266 }};
267 final ScheduledFuture periodicTask =
268 p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS);
269 final int totalDelayMillis = (cycles - 1) * delay;
270 await(done, totalDelayMillis + cycles * LONG_DELAY_MS);
271 periodicTask.cancel(true);
272 final long elapsedMillis = millisElapsedSince(startTime);
273 assertTrue(elapsedMillis >= totalDelayMillis);
274 if (!tryLongerDelay.get())
275 return;
276 // else retry with longer delay
277 }
278 fail("unexpected execution rate");
279 }
280 }
281
282 /**
283 * execute(null) throws NPE
284 */
285 public void testExecuteNull() throws InterruptedException {
286 final CustomExecutor p = new CustomExecutor(1);
287 try (PoolCleaner cleaner = cleaner(p)) {
288 try {
289 p.execute(null);
290 shouldThrow();
291 } catch (NullPointerException success) {}
292 }
293 }
294
295 /**
296 * schedule(null) throws NPE
297 */
298 public void testScheduleNull() throws InterruptedException {
299 final CustomExecutor p = new CustomExecutor(1);
300 try (PoolCleaner cleaner = cleaner(p)) {
301 try {
302 Future f = p.schedule((Callable)null,
303 randomTimeout(), randomTimeUnit());
304 shouldThrow();
305 } catch (NullPointerException success) {}
306 }
307 }
308
309 /**
310 * execute throws RejectedExecutionException if shutdown
311 */
312 public void testSchedule1_RejectedExecutionException() {
313 final CustomExecutor p = new CustomExecutor(1);
314 try (PoolCleaner cleaner = cleaner(p)) {
315 try {
316 p.shutdown();
317 p.schedule(new NoOpRunnable(),
318 randomTimeout(), randomTimeUnit());
319 shouldThrow();
320 } catch (RejectedExecutionException success) {
321 } catch (SecurityException ok) {}
322 }
323 }
324
325 /**
326 * schedule throws RejectedExecutionException if shutdown
327 */
328 public void testSchedule2_RejectedExecutionException() {
329 final CustomExecutor p = new CustomExecutor(1);
330 try (PoolCleaner cleaner = cleaner(p)) {
331 try {
332 p.shutdown();
333 p.schedule(new NoOpCallable(),
334 randomTimeout(), randomTimeUnit());
335 shouldThrow();
336 } catch (RejectedExecutionException success) {
337 } catch (SecurityException ok) {}
338 }
339 }
340
341 /**
342 * schedule callable throws RejectedExecutionException if shutdown
343 */
344 public void testSchedule3_RejectedExecutionException() {
345 final CustomExecutor p = new CustomExecutor(1);
346 try (PoolCleaner cleaner = cleaner(p)) {
347 try {
348 p.shutdown();
349 p.schedule(new NoOpCallable(),
350 randomTimeout(), randomTimeUnit());
351 shouldThrow();
352 } catch (RejectedExecutionException success) {
353 } catch (SecurityException ok) {}
354 }
355 }
356
357 /**
358 * scheduleAtFixedRate throws RejectedExecutionException if shutdown
359 */
360 public void testScheduleAtFixedRate1_RejectedExecutionException() {
361 final CustomExecutor p = new CustomExecutor(1);
362 try (PoolCleaner cleaner = cleaner(p)) {
363 try {
364 p.shutdown();
365 p.scheduleAtFixedRate(new NoOpRunnable(),
366 MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
367 shouldThrow();
368 } catch (RejectedExecutionException success) {
369 } catch (SecurityException ok) {}
370 }
371 }
372
373 /**
374 * scheduleWithFixedDelay throws RejectedExecutionException if shutdown
375 */
376 public void testScheduleWithFixedDelay1_RejectedExecutionException() {
377 final CustomExecutor p = new CustomExecutor(1);
378 try (PoolCleaner cleaner = cleaner(p)) {
379 try {
380 p.shutdown();
381 p.scheduleWithFixedDelay(new NoOpRunnable(),
382 MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
383 shouldThrow();
384 } catch (RejectedExecutionException success) {
385 } catch (SecurityException ok) {}
386 }
387 }
388
389 /**
390 * getActiveCount increases but doesn't overestimate, when a
391 * thread becomes active
392 */
393 public void testGetActiveCount() throws InterruptedException {
394 final CountDownLatch done = new CountDownLatch(1);
395 final ThreadPoolExecutor p = new CustomExecutor(2);
396 try (PoolCleaner cleaner = cleaner(p, done)) {
397 final CountDownLatch threadStarted = new CountDownLatch(1);
398 assertEquals(0, p.getActiveCount());
399 p.execute(new CheckedRunnable() {
400 public void realRun() throws InterruptedException {
401 threadStarted.countDown();
402 assertEquals(1, p.getActiveCount());
403 await(done);
404 }});
405 await(threadStarted);
406 assertEquals(1, p.getActiveCount());
407 }
408 }
409
410 /**
411 * getCompletedTaskCount increases, but doesn't overestimate,
412 * when tasks complete
413 */
414 public void testGetCompletedTaskCount() throws InterruptedException {
415 final ThreadPoolExecutor p = new CustomExecutor(2);
416 try (PoolCleaner cleaner = cleaner(p)) {
417 final CountDownLatch threadStarted = new CountDownLatch(1);
418 final CountDownLatch threadProceed = new CountDownLatch(1);
419 final CountDownLatch threadDone = new CountDownLatch(1);
420 assertEquals(0, p.getCompletedTaskCount());
421 p.execute(new CheckedRunnable() {
422 public void realRun() throws InterruptedException {
423 threadStarted.countDown();
424 assertEquals(0, p.getCompletedTaskCount());
425 await(threadProceed);
426 threadDone.countDown();
427 }});
428 await(threadStarted);
429 assertEquals(0, p.getCompletedTaskCount());
430 threadProceed.countDown();
431 await(threadDone);
432 long startTime = System.nanoTime();
433 while (p.getCompletedTaskCount() != 1) {
434 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
435 fail("timed out");
436 Thread.yield();
437 }
438 }
439 }
440
441 /**
442 * getCorePoolSize returns size given in constructor if not otherwise set
443 */
444 public void testGetCorePoolSize() {
445 final CustomExecutor p = new CustomExecutor(1);
446 try (PoolCleaner cleaner = cleaner(p)) {
447 assertEquals(1, p.getCorePoolSize());
448 }
449 }
450
451 /**
452 * getLargestPoolSize increases, but doesn't overestimate, when
453 * multiple threads active
454 */
455 public void testGetLargestPoolSize() throws InterruptedException {
456 final int THREADS = 3;
457 final CountDownLatch done = new CountDownLatch(1);
458 final ThreadPoolExecutor p = new CustomExecutor(THREADS);
459 try (PoolCleaner cleaner = cleaner(p, done)) {
460 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
461 assertEquals(0, p.getLargestPoolSize());
462 for (int i = 0; i < THREADS; i++)
463 p.execute(new CheckedRunnable() {
464 public void realRun() throws InterruptedException {
465 threadsStarted.countDown();
466 await(done);
467 assertEquals(THREADS, p.getLargestPoolSize());
468 }});
469 await(threadsStarted);
470 assertEquals(THREADS, p.getLargestPoolSize());
471 }
472 assertEquals(THREADS, p.getLargestPoolSize());
473 }
474
475 /**
476 * getPoolSize increases, but doesn't overestimate, when threads
477 * become active
478 */
479 public void testGetPoolSize() throws InterruptedException {
480 final CountDownLatch done = new CountDownLatch(1);
481 final ThreadPoolExecutor p = new CustomExecutor(1);
482 try (PoolCleaner cleaner = cleaner(p, done)) {
483 final CountDownLatch threadStarted = new CountDownLatch(1);
484 assertEquals(0, p.getPoolSize());
485 p.execute(new CheckedRunnable() {
486 public void realRun() throws InterruptedException {
487 threadStarted.countDown();
488 assertEquals(1, p.getPoolSize());
489 await(done);
490 }});
491 await(threadStarted);
492 assertEquals(1, p.getPoolSize());
493 }
494 }
495
496 /**
497 * getTaskCount increases, but doesn't overestimate, when tasks
498 * submitted
499 */
500 public void testGetTaskCount() throws InterruptedException {
501 final int TASKS = 3;
502 final CountDownLatch done = new CountDownLatch(1);
503 final ThreadPoolExecutor p = new CustomExecutor(1);
504 try (PoolCleaner cleaner = cleaner(p, done)) {
505 final CountDownLatch threadStarted = new CountDownLatch(1);
506 assertEquals(0, p.getTaskCount());
507 assertEquals(0, p.getCompletedTaskCount());
508 p.execute(new CheckedRunnable() {
509 public void realRun() throws InterruptedException {
510 threadStarted.countDown();
511 await(done);
512 }});
513 await(threadStarted);
514 assertEquals(1, p.getTaskCount());
515 assertEquals(0, p.getCompletedTaskCount());
516 for (int i = 0; i < TASKS; i++) {
517 assertEquals(1 + i, p.getTaskCount());
518 p.execute(new CheckedRunnable() {
519 public void realRun() throws InterruptedException {
520 threadStarted.countDown();
521 assertEquals(1 + TASKS, p.getTaskCount());
522 await(done);
523 }});
524 }
525 assertEquals(1 + TASKS, p.getTaskCount());
526 assertEquals(0, p.getCompletedTaskCount());
527 }
528 assertEquals(1 + TASKS, p.getTaskCount());
529 assertEquals(1 + TASKS, p.getCompletedTaskCount());
530 }
531
532 /**
533 * getThreadFactory returns factory in constructor if not set
534 */
535 public void testGetThreadFactory() {
536 final ThreadFactory threadFactory = new SimpleThreadFactory();
537 final CustomExecutor p = new CustomExecutor(1, threadFactory);
538 try (PoolCleaner cleaner = cleaner(p)) {
539 assertSame(threadFactory, p.getThreadFactory());
540 }
541 }
542
543 /**
544 * setThreadFactory sets the thread factory returned by getThreadFactory
545 */
546 public void testSetThreadFactory() {
547 final ThreadFactory threadFactory = new SimpleThreadFactory();
548 final CustomExecutor p = new CustomExecutor(1);
549 try (PoolCleaner cleaner = cleaner(p)) {
550 p.setThreadFactory(threadFactory);
551 assertSame(threadFactory, p.getThreadFactory());
552 }
553 }
554
555 /**
556 * setThreadFactory(null) throws NPE
557 */
558 public void testSetThreadFactoryNull() {
559 final CustomExecutor p = new CustomExecutor(1);
560 try (PoolCleaner cleaner = cleaner(p)) {
561 try {
562 p.setThreadFactory(null);
563 shouldThrow();
564 } catch (NullPointerException success) {}
565 }
566 }
567
568 /**
569 * isShutdown is false before shutdown, true after
570 */
571 public void testIsShutdown() {
572 final CustomExecutor p = new CustomExecutor(1);
573 try (PoolCleaner cleaner = cleaner(p)) {
574 assertFalse(p.isShutdown());
575 try { p.shutdown(); } catch (SecurityException ok) { return; }
576 assertTrue(p.isShutdown());
577 }
578 }
579
580 /**
581 * isTerminated is false before termination, true after
582 */
583 public void testIsTerminated() throws InterruptedException {
584 final CountDownLatch done = new CountDownLatch(1);
585 final ThreadPoolExecutor p = new CustomExecutor(1);
586 try (PoolCleaner cleaner = cleaner(p)) {
587 final CountDownLatch threadStarted = new CountDownLatch(1);
588 p.execute(new CheckedRunnable() {
589 public void realRun() throws InterruptedException {
590 assertFalse(p.isTerminated());
591 threadStarted.countDown();
592 await(done);
593 }});
594 await(threadStarted);
595 assertFalse(p.isTerminated());
596 assertFalse(p.isTerminating());
597 done.countDown();
598 try { p.shutdown(); } catch (SecurityException ok) { return; }
599 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
600 assertTrue(p.isTerminated());
601 }
602 }
603
604 /**
605 * isTerminating is not true when running or when terminated
606 */
607 public void testIsTerminating() throws InterruptedException {
608 final CountDownLatch done = new CountDownLatch(1);
609 final ThreadPoolExecutor p = new CustomExecutor(1);
610 try (PoolCleaner cleaner = cleaner(p)) {
611 final CountDownLatch threadStarted = new CountDownLatch(1);
612 assertFalse(p.isTerminating());
613 p.execute(new CheckedRunnable() {
614 public void realRun() throws InterruptedException {
615 assertFalse(p.isTerminating());
616 threadStarted.countDown();
617 await(done);
618 }});
619 await(threadStarted);
620 assertFalse(p.isTerminating());
621 done.countDown();
622 try { p.shutdown(); } catch (SecurityException ok) { return; }
623 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
624 assertTrue(p.isTerminated());
625 assertFalse(p.isTerminating());
626 }
627 }
628
629 /**
630 * getQueue returns the work queue, which contains queued tasks
631 */
632 public void testGetQueue() throws InterruptedException {
633 final CountDownLatch done = new CountDownLatch(1);
634 final ScheduledThreadPoolExecutor p = new CustomExecutor(1);
635 try (PoolCleaner cleaner = cleaner(p, done)) {
636 final CountDownLatch threadStarted = new CountDownLatch(1);
637 ScheduledFuture[] tasks = new ScheduledFuture[5];
638 for (int i = 0; i < tasks.length; i++) {
639 Runnable r = new CheckedRunnable() {
640 public void realRun() throws InterruptedException {
641 threadStarted.countDown();
642 await(done);
643 }};
644 tasks[i] = p.schedule(r, 1, MILLISECONDS);
645 }
646 await(threadStarted);
647 BlockingQueue<Runnable> q = p.getQueue();
648 assertTrue(q.contains(tasks[tasks.length - 1]));
649 assertFalse(q.contains(tasks[0]));
650 }
651 }
652
653 /**
654 * remove(task) removes queued task, and fails to remove active task
655 */
656 public void testRemove() throws InterruptedException {
657 final CountDownLatch done = new CountDownLatch(1);
658 final ScheduledThreadPoolExecutor p = new CustomExecutor(1);
659 try (PoolCleaner cleaner = cleaner(p, done)) {
660 ScheduledFuture[] tasks = new ScheduledFuture[5];
661 final CountDownLatch threadStarted = new CountDownLatch(1);
662 for (int i = 0; i < tasks.length; i++) {
663 Runnable r = new CheckedRunnable() {
664 public void realRun() throws InterruptedException {
665 threadStarted.countDown();
666 await(done);
667 }};
668 tasks[i] = p.schedule(r, 1, MILLISECONDS);
669 }
670 await(threadStarted);
671 BlockingQueue<Runnable> q = p.getQueue();
672 assertFalse(p.remove((Runnable)tasks[0]));
673 assertTrue(q.contains((Runnable)tasks[4]));
674 assertTrue(q.contains((Runnable)tasks[3]));
675 assertTrue(p.remove((Runnable)tasks[4]));
676 assertFalse(p.remove((Runnable)tasks[4]));
677 assertFalse(q.contains((Runnable)tasks[4]));
678 assertTrue(q.contains((Runnable)tasks[3]));
679 assertTrue(p.remove((Runnable)tasks[3]));
680 assertFalse(q.contains((Runnable)tasks[3]));
681 }
682 }
683
684 /**
685 * purge removes cancelled tasks from the queue
686 */
687 public void testPurge() throws InterruptedException {
688 final ScheduledFuture[] tasks = new ScheduledFuture[5];
689 final Runnable releaser = new Runnable() { public void run() {
690 for (ScheduledFuture task : tasks)
691 if (task != null) task.cancel(true); }};
692 final CustomExecutor p = new CustomExecutor(1);
693 try (PoolCleaner cleaner = cleaner(p, releaser)) {
694 for (int i = 0; i < tasks.length; i++)
695 tasks[i] = p.schedule(new SmallPossiblyInterruptedRunnable(),
696 LONG_DELAY_MS, MILLISECONDS);
697 int max = tasks.length;
698 if (tasks[4].cancel(true)) --max;
699 if (tasks[3].cancel(true)) --max;
700 // There must eventually be an interference-free point at
701 // which purge will not fail. (At worst, when queue is empty.)
702 long startTime = System.nanoTime();
703 do {
704 p.purge();
705 long count = p.getTaskCount();
706 if (count == max)
707 return;
708 } while (millisElapsedSince(startTime) < LONG_DELAY_MS);
709 fail("Purge failed to remove cancelled tasks");
710 }
711 }
712
713 /**
714 * shutdownNow returns a list containing tasks that were not run,
715 * and those tasks are drained from the queue
716 */
717 public void testShutdownNow() throws InterruptedException {
718 final int poolSize = 2;
719 final int count = 5;
720 final AtomicInteger ran = new AtomicInteger(0);
721 final CustomExecutor p = new CustomExecutor(poolSize);
722 final CountDownLatch threadsStarted = new CountDownLatch(poolSize);
723 Runnable waiter = new CheckedRunnable() { public void realRun() {
724 threadsStarted.countDown();
725 try {
726 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
727 } catch (InterruptedException success) {}
728 ran.getAndIncrement();
729 }};
730 for (int i = 0; i < count; i++)
731 p.execute(waiter);
732 await(threadsStarted);
733 assertEquals(poolSize, p.getActiveCount());
734 assertEquals(0, p.getCompletedTaskCount());
735 final List<Runnable> queuedTasks;
736 try {
737 queuedTasks = p.shutdownNow();
738 } catch (SecurityException ok) {
739 return; // Allowed in case test doesn't have privs
740 }
741 assertTrue(p.isShutdown());
742 assertTrue(p.getQueue().isEmpty());
743 assertEquals(count - poolSize, queuedTasks.size());
744 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
745 assertTrue(p.isTerminated());
746 assertEquals(poolSize, ran.get());
747 assertEquals(poolSize, p.getCompletedTaskCount());
748 }
749
750 /**
751 * shutdownNow returns a list containing tasks that were not run,
752 * and those tasks are drained from the queue
753 */
754 public void testShutdownNow_delayedTasks() throws InterruptedException {
755 final CustomExecutor p = new CustomExecutor(1);
756 List<ScheduledFuture> tasks = new ArrayList<>();
757 for (int i = 0; i < 3; i++) {
758 Runnable r = new NoOpRunnable();
759 tasks.add(p.schedule(r, 9, SECONDS));
760 tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS));
761 tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS));
762 }
763 if (testImplementationDetails)
764 assertEquals(new HashSet(tasks), new HashSet(p.getQueue()));
765 final List<Runnable> queuedTasks;
766 try {
767 queuedTasks = p.shutdownNow();
768 } catch (SecurityException ok) {
769 return; // Allowed in case test doesn't have privs
770 }
771 assertTrue(p.isShutdown());
772 assertTrue(p.getQueue().isEmpty());
773 if (testImplementationDetails)
774 assertEquals(new HashSet(tasks), new HashSet(queuedTasks));
775 assertEquals(tasks.size(), queuedTasks.size());
776 for (ScheduledFuture task : tasks) {
777 assertFalse(((CustomTask)task).ran);
778 assertFalse(task.isDone());
779 assertFalse(task.isCancelled());
780 }
781 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
782 assertTrue(p.isTerminated());
783 }
784
785 /**
786 * By default, periodic tasks are cancelled at shutdown.
787 * By default, delayed tasks keep running after shutdown.
788 * Check that changing the default values work:
789 * - setExecuteExistingDelayedTasksAfterShutdownPolicy
790 * - setContinueExistingPeriodicTasksAfterShutdownPolicy
791 */
792 public void testShutdown_cancellation() throws Exception {
793 final int poolSize = 4;
794 final CustomExecutor p = new CustomExecutor(poolSize);
795 final BlockingQueue<Runnable> q = p.getQueue();
796 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
797 final long delay = rnd.nextInt(2);
798 final int rounds = rnd.nextInt(1, 3);
799 final boolean effectiveDelayedPolicy;
800 final boolean effectivePeriodicPolicy;
801 final boolean effectiveRemovePolicy;
802
803 if (rnd.nextBoolean())
804 p.setExecuteExistingDelayedTasksAfterShutdownPolicy(
805 effectiveDelayedPolicy = rnd.nextBoolean());
806 else
807 effectiveDelayedPolicy = true;
808 assertEquals(effectiveDelayedPolicy,
809 p.getExecuteExistingDelayedTasksAfterShutdownPolicy());
810
811 if (rnd.nextBoolean())
812 p.setContinueExistingPeriodicTasksAfterShutdownPolicy(
813 effectivePeriodicPolicy = rnd.nextBoolean());
814 else
815 effectivePeriodicPolicy = false;
816 assertEquals(effectivePeriodicPolicy,
817 p.getContinueExistingPeriodicTasksAfterShutdownPolicy());
818
819 if (rnd.nextBoolean())
820 p.setRemoveOnCancelPolicy(
821 effectiveRemovePolicy = rnd.nextBoolean());
822 else
823 effectiveRemovePolicy = false;
824 assertEquals(effectiveRemovePolicy,
825 p.getRemoveOnCancelPolicy());
826
827 final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean();
828
829 // Strategy: Wedge the pool with one wave of "blocker" tasks,
830 // then add a second wave that waits in the queue until unblocked.
831 final AtomicInteger ran = new AtomicInteger(0);
832 final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
833 final CountDownLatch unblock = new CountDownLatch(1);
834 final RuntimeException exception = new RuntimeException();
835
836 class Task implements Runnable {
837 public void run() {
838 try {
839 ran.getAndIncrement();
840 poolBlocked.countDown();
841 await(unblock);
842 } catch (Throwable fail) { threadUnexpectedException(fail); }
843 }
844 }
845
846 class PeriodicTask extends Task {
847 PeriodicTask(int rounds) { this.rounds = rounds; }
848 int rounds;
849 public void run() {
850 if (--rounds == 0) super.run();
851 // throw exception to surely terminate this periodic task,
852 // but in a separate execution and in a detectable way.
853 if (rounds == -1) throw exception;
854 }
855 }
856
857 Runnable task = new Task();
858
859 List<Future<?>> immediates = new ArrayList<>();
860 List<Future<?>> delayeds = new ArrayList<>();
861 List<Future<?>> periodics = new ArrayList<>();
862
863 immediates.add(p.submit(task));
864 delayeds.add(p.schedule(task, delay, MILLISECONDS));
865 periodics.add(p.scheduleAtFixedRate(
866 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
867 periodics.add(p.scheduleWithFixedDelay(
868 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
869
870 await(poolBlocked);
871
872 assertEquals(poolSize, ran.get());
873 assertEquals(poolSize, p.getActiveCount());
874 assertTrue(q.isEmpty());
875
876 // Add second wave of tasks.
877 immediates.add(p.submit(task));
878 delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS));
879 periodics.add(p.scheduleAtFixedRate(
880 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
881 periodics.add(p.scheduleWithFixedDelay(
882 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
883
884 assertEquals(poolSize, q.size());
885 assertEquals(poolSize, ran.get());
886
887 immediates.forEach(
888 f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));
889
890 Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
891 .forEach(f -> assertFalse(f.isDone()));
892
893 try { p.shutdown(); } catch (SecurityException ok) { return; }
894 assertTrue(p.isShutdown());
895 assertTrue(p.isTerminating());
896 assertFalse(p.isTerminated());
897
898 if (rnd.nextBoolean())
899 assertThrows(
900 RejectedExecutionException.class,
901 () -> p.submit(task),
902 () -> p.schedule(task, 1, SECONDS),
903 () -> p.scheduleAtFixedRate(
904 new PeriodicTask(1), 1, 1, SECONDS),
905 () -> p.scheduleWithFixedDelay(
906 new PeriodicTask(2), 1, 1, SECONDS));
907
908 assertTrue(q.contains(immediates.get(1)));
909 assertTrue(!effectiveDelayedPolicy
910 ^ q.contains(delayeds.get(1)));
911 assertTrue(!effectivePeriodicPolicy
912 ^ q.containsAll(periodics.subList(2, 4)));
913
914 immediates.forEach(f -> assertFalse(f.isDone()));
915
916 assertFalse(delayeds.get(0).isDone());
917 if (effectiveDelayedPolicy)
918 assertFalse(delayeds.get(1).isDone());
919 else
920 assertTrue(delayeds.get(1).isCancelled());
921
922 if (effectivePeriodicPolicy)
923 periodics.forEach(
924 f -> {
925 assertFalse(f.isDone());
926 if (!periodicTasksContinue) {
927 assertTrue(f.cancel(false));
928 assertTrue(f.isCancelled());
929 }
930 });
931 else {
932 periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone()));
933 periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled()));
934 }
935
936 unblock.countDown(); // Release all pool threads
937
938 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
939 assertFalse(p.isTerminating());
940 assertTrue(p.isTerminated());
941
942 assertTrue(q.isEmpty());
943
944 Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
945 .forEach(f -> assertTrue(f.isDone()));
946
947 for (Future<?> f : immediates) assertNull(f.get());
948
949 assertNull(delayeds.get(0).get());
950 if (effectiveDelayedPolicy)
951 assertNull(delayeds.get(1).get());
952 else
953 assertTrue(delayeds.get(1).isCancelled());
954
955 if (periodicTasksContinue)
956 periodics.forEach(
957 f -> {
958 try { f.get(); }
959 catch (ExecutionException success) {
960 assertSame(exception, success.getCause());
961 }
962 catch (Throwable fail) { threadUnexpectedException(fail); }
963 });
964 else
965 periodics.forEach(f -> assertTrue(f.isCancelled()));
966
967 assertEquals(poolSize + 1
968 + (effectiveDelayedPolicy ? 1 : 0)
969 + (periodicTasksContinue ? 2 : 0),
970 ran.get());
971 }
972
973 /**
974 * completed submit of callable returns result
975 */
976 public void testSubmitCallable() throws Exception {
977 final ExecutorService e = new CustomExecutor(2);
978 try (PoolCleaner cleaner = cleaner(e)) {
979 Future<String> future = e.submit(new StringTask());
980 String result = future.get();
981 assertSame(TEST_STRING, result);
982 }
983 }
984
985 /**
986 * completed submit of runnable returns successfully
987 */
988 public void testSubmitRunnable() throws Exception {
989 final ExecutorService e = new CustomExecutor(2);
990 try (PoolCleaner cleaner = cleaner(e)) {
991 Future<?> future = e.submit(new NoOpRunnable());
992 future.get();
993 assertTrue(future.isDone());
994 }
995 }
996
997 /**
998 * completed submit of (runnable, result) returns result
999 */
1000 public void testSubmitRunnable2() throws Exception {
1001 final ExecutorService e = new CustomExecutor(2);
1002 try (PoolCleaner cleaner = cleaner(e)) {
1003 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
1004 String result = future.get();
1005 assertSame(TEST_STRING, result);
1006 }
1007 }
1008
1009 /**
1010 * invokeAny(null) throws NPE
1011 */
1012 public void testInvokeAny1() throws Exception {
1013 final ExecutorService e = new CustomExecutor(2);
1014 try (PoolCleaner cleaner = cleaner(e)) {
1015 try {
1016 e.invokeAny(null);
1017 shouldThrow();
1018 } catch (NullPointerException success) {}
1019 }
1020 }
1021
1022 /**
1023 * invokeAny(empty collection) throws IllegalArgumentException
1024 */
1025 public void testInvokeAny2() throws Exception {
1026 final ExecutorService e = new CustomExecutor(2);
1027 try (PoolCleaner cleaner = cleaner(e)) {
1028 try {
1029 e.invokeAny(new ArrayList<Callable<String>>());
1030 shouldThrow();
1031 } catch (IllegalArgumentException success) {}
1032 }
1033 }
1034
1035 /**
1036 * invokeAny(c) throws NPE if c has null elements
1037 */
1038 public void testInvokeAny3() throws Exception {
1039 final CountDownLatch latch = new CountDownLatch(1);
1040 final ExecutorService e = new CustomExecutor(2);
1041 try (PoolCleaner cleaner = cleaner(e)) {
1042 List<Callable<String>> l = new ArrayList<>();
1043 l.add(latchAwaitingStringTask(latch));
1044 l.add(null);
1045 try {
1046 e.invokeAny(l);
1047 shouldThrow();
1048 } catch (NullPointerException success) {}
1049 latch.countDown();
1050 }
1051 }
1052
1053 /**
1054 * invokeAny(c) throws ExecutionException if no task completes
1055 */
1056 public void testInvokeAny4() throws Exception {
1057 final ExecutorService e = new CustomExecutor(2);
1058 try (PoolCleaner cleaner = cleaner(e)) {
1059 List<Callable<String>> l = new ArrayList<>();
1060 l.add(new NPETask());
1061 try {
1062 e.invokeAny(l);
1063 shouldThrow();
1064 } catch (ExecutionException success) {
1065 assertTrue(success.getCause() instanceof NullPointerException);
1066 }
1067 }
1068 }
1069
1070 /**
1071 * invokeAny(c) returns result of some task
1072 */
1073 public void testInvokeAny5() throws Exception {
1074 final ExecutorService e = new CustomExecutor(2);
1075 try (PoolCleaner cleaner = cleaner(e)) {
1076 List<Callable<String>> l = new ArrayList<>();
1077 l.add(new StringTask());
1078 l.add(new StringTask());
1079 String result = e.invokeAny(l);
1080 assertSame(TEST_STRING, result);
1081 }
1082 }
1083
1084 /**
1085 * invokeAll(null) throws NPE
1086 */
1087 public void testInvokeAll1() throws Exception {
1088 final ExecutorService e = new CustomExecutor(2);
1089 try (PoolCleaner cleaner = cleaner(e)) {
1090 try {
1091 e.invokeAll(null);
1092 shouldThrow();
1093 } catch (NullPointerException success) {}
1094 }
1095 }
1096
1097 /**
1098 * invokeAll(empty collection) returns empty list
1099 */
1100 public void testInvokeAll2() throws Exception {
1101 final ExecutorService e = new CustomExecutor(2);
1102 final Collection<Callable<String>> emptyCollection
1103 = Collections.emptyList();
1104 try (PoolCleaner cleaner = cleaner(e)) {
1105 List<Future<String>> r = e.invokeAll(emptyCollection);
1106 assertTrue(r.isEmpty());
1107 }
1108 }
1109
1110 /**
1111 * invokeAll(c) throws NPE if c has null elements
1112 */
1113 public void testInvokeAll3() throws Exception {
1114 final ExecutorService e = new CustomExecutor(2);
1115 try (PoolCleaner cleaner = cleaner(e)) {
1116 List<Callable<String>> l = new ArrayList<>();
1117 l.add(new StringTask());
1118 l.add(null);
1119 try {
1120 e.invokeAll(l);
1121 shouldThrow();
1122 } catch (NullPointerException success) {}
1123 }
1124 }
1125
1126 /**
1127 * get of invokeAll(c) throws exception on failed task
1128 */
1129 public void testInvokeAll4() throws Exception {
1130 final ExecutorService e = new CustomExecutor(2);
1131 try (PoolCleaner cleaner = cleaner(e)) {
1132 List<Callable<String>> l = new ArrayList<>();
1133 l.add(new NPETask());
1134 List<Future<String>> futures = e.invokeAll(l);
1135 assertEquals(1, futures.size());
1136 try {
1137 futures.get(0).get();
1138 shouldThrow();
1139 } catch (ExecutionException success) {
1140 assertTrue(success.getCause() instanceof NullPointerException);
1141 }
1142 }
1143 }
1144
1145 /**
1146 * invokeAll(c) returns results of all completed tasks
1147 */
1148 public void testInvokeAll5() throws Exception {
1149 final ExecutorService e = new CustomExecutor(2);
1150 try (PoolCleaner cleaner = cleaner(e)) {
1151 List<Callable<String>> l = new ArrayList<>();
1152 l.add(new StringTask());
1153 l.add(new StringTask());
1154 List<Future<String>> futures = e.invokeAll(l);
1155 assertEquals(2, futures.size());
1156 for (Future<String> future : futures)
1157 assertSame(TEST_STRING, future.get());
1158 }
1159 }
1160
1161 /**
1162 * timed invokeAny(null) throws NPE
1163 */
1164 public void testTimedInvokeAny1() throws Exception {
1165 final ExecutorService e = new CustomExecutor(2);
1166 try (PoolCleaner cleaner = cleaner(e)) {
1167 try {
1168 e.invokeAny(null, randomTimeout(), randomTimeUnit());
1169 shouldThrow();
1170 } catch (NullPointerException success) {}
1171 }
1172 }
1173
1174 /**
1175 * timed invokeAny(,,null) throws NPE
1176 */
1177 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1178 final ExecutorService e = new CustomExecutor(2);
1179 try (PoolCleaner cleaner = cleaner(e)) {
1180 List<Callable<String>> l = new ArrayList<>();
1181 l.add(new StringTask());
1182 try {
1183 e.invokeAny(l, randomTimeout(), null);
1184 shouldThrow();
1185 } catch (NullPointerException success) {}
1186 }
1187 }
1188
1189 /**
1190 * timed invokeAny(empty collection) throws IllegalArgumentException
1191 */
1192 public void testTimedInvokeAny2() throws Exception {
1193 final ExecutorService e = new CustomExecutor(2);
1194 final Collection<Callable<String>> emptyCollection
1195 = Collections.emptyList();
1196 try (PoolCleaner cleaner = cleaner(e)) {
1197 try {
1198 e.invokeAny(emptyCollection, randomTimeout(), randomTimeUnit());
1199 shouldThrow();
1200 } catch (IllegalArgumentException success) {}
1201 }
1202 }
1203
1204 /**
1205 * timed invokeAny(c) throws NPE if c has null elements
1206 */
1207 public void testTimedInvokeAny3() throws Exception {
1208 CountDownLatch latch = new CountDownLatch(1);
1209 final ExecutorService e = new CustomExecutor(2);
1210 try (PoolCleaner cleaner = cleaner(e)) {
1211 List<Callable<String>> l = new ArrayList<>();
1212 l.add(latchAwaitingStringTask(latch));
1213 l.add(null);
1214 try {
1215 e.invokeAny(l, randomTimeout(), randomTimeUnit());
1216 shouldThrow();
1217 } catch (NullPointerException success) {}
1218 latch.countDown();
1219 }
1220 }
1221
1222 /**
1223 * timed invokeAny(c) throws ExecutionException if no task completes
1224 */
1225 public void testTimedInvokeAny4() throws Exception {
1226 final ExecutorService e = new CustomExecutor(2);
1227 try (PoolCleaner cleaner = cleaner(e)) {
1228 long startTime = System.nanoTime();
1229 List<Callable<String>> l = new ArrayList<>();
1230 l.add(new NPETask());
1231 try {
1232 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1233 shouldThrow();
1234 } catch (ExecutionException success) {
1235 assertTrue(success.getCause() instanceof NullPointerException);
1236 }
1237 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1238 }
1239 }
1240
1241 /**
1242 * timed invokeAny(c) returns result of some task
1243 */
1244 public void testTimedInvokeAny5() throws Exception {
1245 final ExecutorService e = new CustomExecutor(2);
1246 try (PoolCleaner cleaner = cleaner(e)) {
1247 long startTime = System.nanoTime();
1248 List<Callable<String>> l = new ArrayList<>();
1249 l.add(new StringTask());
1250 l.add(new StringTask());
1251 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1252 assertSame(TEST_STRING, result);
1253 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1254 }
1255 }
1256
1257 /**
1258 * timed invokeAll(null) throws NullPointerException
1259 */
1260 public void testTimedInvokeAll1() throws Exception {
1261 final ExecutorService e = new CustomExecutor(2);
1262 try (PoolCleaner cleaner = cleaner(e)) {
1263 try {
1264 e.invokeAll(null, randomTimeout(), randomTimeUnit());
1265 shouldThrow();
1266 } catch (NullPointerException success) {}
1267 }
1268 }
1269
1270 /**
1271 * timed invokeAll(,,null) throws NullPointerException
1272 */
1273 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1274 final ExecutorService e = new CustomExecutor(2);
1275 try (PoolCleaner cleaner = cleaner(e)) {
1276 List<Callable<String>> l = new ArrayList<>();
1277 l.add(new StringTask());
1278 try {
1279 e.invokeAll(l, randomTimeout(), null);
1280 shouldThrow();
1281 } catch (NullPointerException success) {}
1282 }
1283 }
1284
1285 /**
1286 * timed invokeAll(empty collection) returns empty list
1287 */
1288 public void testTimedInvokeAll2() throws Exception {
1289 final ExecutorService e = new CustomExecutor(2);
1290 final Collection<Callable<String>> emptyCollection
1291 = Collections.emptyList();
1292 try (PoolCleaner cleaner = cleaner(e)) {
1293 List<Future<String>> r =
1294 e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit());
1295 assertTrue(r.isEmpty());
1296 }
1297 }
1298
1299 /**
1300 * timed invokeAll(c) throws NPE if c has null elements
1301 */
1302 public void testTimedInvokeAll3() throws Exception {
1303 final ExecutorService e = new CustomExecutor(2);
1304 try (PoolCleaner cleaner = cleaner(e)) {
1305 List<Callable<String>> l = new ArrayList<>();
1306 l.add(new StringTask());
1307 l.add(null);
1308 try {
1309 e.invokeAll(l, randomTimeout(), randomTimeUnit());
1310 shouldThrow();
1311 } catch (NullPointerException success) {}
1312 }
1313 }
1314
1315 /**
1316 * get of element of invokeAll(c) throws exception on failed task
1317 */
1318 public void testTimedInvokeAll4() throws Exception {
1319 final ExecutorService e = new CustomExecutor(2);
1320 final Collection<Callable<String>> c = new ArrayList<>();
1321 c.add(new NPETask());
1322 try (PoolCleaner cleaner = cleaner(e)) {
1323 List<Future<String>> futures =
1324 e.invokeAll(c, LONG_DELAY_MS, MILLISECONDS);
1325 assertEquals(1, futures.size());
1326 try {
1327 futures.get(0).get();
1328 shouldThrow();
1329 } catch (ExecutionException success) {
1330 assertTrue(success.getCause() instanceof NullPointerException);
1331 }
1332 }
1333 }
1334
1335 /**
1336 * timed invokeAll(c) returns results of all completed tasks
1337 */
1338 public void testTimedInvokeAll5() throws Exception {
1339 final ExecutorService e = new CustomExecutor(2);
1340 try (PoolCleaner cleaner = cleaner(e)) {
1341 List<Callable<String>> l = new ArrayList<>();
1342 l.add(new StringTask());
1343 l.add(new StringTask());
1344 List<Future<String>> futures =
1345 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1346 assertEquals(2, futures.size());
1347 for (Future<String> future : futures)
1348 assertSame(TEST_STRING, future.get());
1349 }
1350 }
1351
1352 /**
1353 * timed invokeAll(c) cancels tasks not completed by timeout
1354 */
1355 public void testTimedInvokeAll6() throws Exception {
1356 for (long timeout = timeoutMillis();;) {
1357 final CountDownLatch done = new CountDownLatch(1);
1358 final Callable<String> waiter = new CheckedCallable<String>() {
1359 public String realCall() {
1360 try { done.await(LONG_DELAY_MS, MILLISECONDS); }
1361 catch (InterruptedException ok) {}
1362 return "1"; }};
1363 final ExecutorService p = new CustomExecutor(2);
1364 try (PoolCleaner cleaner = cleaner(p, done)) {
1365 List<Callable<String>> tasks = new ArrayList<>();
1366 tasks.add(new StringTask("0"));
1367 tasks.add(waiter);
1368 tasks.add(new StringTask("2"));
1369 long startTime = System.nanoTime();
1370 List<Future<String>> futures =
1371 p.invokeAll(tasks, timeout, MILLISECONDS);
1372 assertEquals(tasks.size(), futures.size());
1373 assertTrue(millisElapsedSince(startTime) >= timeout);
1374 for (Future future : futures)
1375 assertTrue(future.isDone());
1376 assertTrue(futures.get(1).isCancelled());
1377 try {
1378 assertEquals("0", futures.get(0).get());
1379 assertEquals("2", futures.get(2).get());
1380 break;
1381 } catch (CancellationException retryWithLongerTimeout) {
1382 timeout *= 2;
1383 if (timeout >= LONG_DELAY_MS / 2)
1384 fail("expected exactly one task to be cancelled");
1385 }
1386 }
1387 }
1388 }
1389
1390 }