ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ScheduledExecutorTest.java
Revision: 1.90
Committed: Tue Mar 28 23:21:24 2017 UTC (7 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.89: +35 -16 lines
Log Message:
testShutdown_cancellation: ensure periodic tasks continue executing if policy set

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 import static java.util.concurrent.TimeUnit.SECONDS;
12
13 import java.util.ArrayList;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.CancellationException;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.RejectedExecutionException;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.ScheduledThreadPoolExecutor;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.ThreadLocalRandom;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.stream.Stream;
33
34 import junit.framework.Test;
35 import junit.framework.TestSuite;
36
37 public class ScheduledExecutorTest extends JSR166TestCase {
38 public static void main(String[] args) {
39 main(suite(), args);
40 }
41 public static Test suite() {
42 return new TestSuite(ScheduledExecutorTest.class);
43 }
44
45 /**
46 * execute successfully executes a runnable
47 */
48 public void testExecute() throws InterruptedException {
49 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
50 try (PoolCleaner cleaner = cleaner(p)) {
51 final CountDownLatch done = new CountDownLatch(1);
52 final Runnable task = new CheckedRunnable() {
53 public void realRun() { done.countDown(); }};
54 p.execute(task);
55 await(done);
56 }
57 }
58
59 /**
60 * delayed schedule of callable successfully executes after delay
61 */
62 public void testSchedule1() throws Exception {
63 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
64 try (PoolCleaner cleaner = cleaner(p)) {
65 final long startTime = System.nanoTime();
66 final CountDownLatch done = new CountDownLatch(1);
67 Callable task = new CheckedCallable<Boolean>() {
68 public Boolean realCall() {
69 done.countDown();
70 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
71 return Boolean.TRUE;
72 }};
73 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS);
74 assertSame(Boolean.TRUE, f.get());
75 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
76 assertTrue(done.await(0L, MILLISECONDS));
77 }
78 }
79
80 /**
81 * delayed schedule of runnable successfully executes after delay
82 */
83 public void testSchedule3() throws Exception {
84 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
85 try (PoolCleaner cleaner = cleaner(p)) {
86 final long startTime = System.nanoTime();
87 final CountDownLatch done = new CountDownLatch(1);
88 Runnable task = new CheckedRunnable() {
89 public void realRun() {
90 done.countDown();
91 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
92 }};
93 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS);
94 await(done);
95 assertNull(f.get(LONG_DELAY_MS, MILLISECONDS));
96 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
97 }
98 }
99
100 /**
101 * scheduleAtFixedRate executes runnable after given initial delay
102 */
103 public void testSchedule4() throws Exception {
104 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
105 try (PoolCleaner cleaner = cleaner(p)) {
106 final long startTime = System.nanoTime();
107 final CountDownLatch done = new CountDownLatch(1);
108 Runnable task = new CheckedRunnable() {
109 public void realRun() {
110 done.countDown();
111 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
112 }};
113 ScheduledFuture f =
114 p.scheduleAtFixedRate(task, timeoutMillis(),
115 LONG_DELAY_MS, MILLISECONDS);
116 await(done);
117 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
118 f.cancel(true);
119 }
120 }
121
122 /**
123 * scheduleWithFixedDelay executes runnable after given initial delay
124 */
125 public void testSchedule5() throws Exception {
126 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
127 try (PoolCleaner cleaner = cleaner(p)) {
128 final long startTime = System.nanoTime();
129 final CountDownLatch done = new CountDownLatch(1);
130 Runnable task = new CheckedRunnable() {
131 public void realRun() {
132 done.countDown();
133 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
134 }};
135 ScheduledFuture f =
136 p.scheduleWithFixedDelay(task, timeoutMillis(),
137 LONG_DELAY_MS, MILLISECONDS);
138 await(done);
139 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
140 f.cancel(true);
141 }
142 }
143
144 static class RunnableCounter implements Runnable {
145 AtomicInteger count = new AtomicInteger(0);
146 public void run() { count.getAndIncrement(); }
147 }
148
149 /**
150 * scheduleAtFixedRate executes series of tasks at given rate.
151 * Eventually, it must hold that:
152 * cycles - 1 <= elapsedMillis/delay < cycles
153 */
154 public void testFixedRateSequence() throws InterruptedException {
155 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
156 try (PoolCleaner cleaner = cleaner(p)) {
157 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
158 final long startTime = System.nanoTime();
159 final int cycles = 8;
160 final CountDownLatch done = new CountDownLatch(cycles);
161 final Runnable task = new CheckedRunnable() {
162 public void realRun() { done.countDown(); }};
163 final ScheduledFuture periodicTask =
164 p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
165 final int totalDelayMillis = (cycles - 1) * delay;
166 await(done, totalDelayMillis + LONG_DELAY_MS);
167 periodicTask.cancel(true);
168 final long elapsedMillis = millisElapsedSince(startTime);
169 assertTrue(elapsedMillis >= totalDelayMillis);
170 if (elapsedMillis <= cycles * delay)
171 return;
172 // else retry with longer delay
173 }
174 fail("unexpected execution rate");
175 }
176 }
177
178 /**
179 * scheduleWithFixedDelay executes series of tasks with given period.
180 * Eventually, it must hold that each task starts at least delay and at
181 * most 2 * delay after the termination of the previous task.
182 */
183 public void testFixedDelaySequence() throws InterruptedException {
184 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
185 try (PoolCleaner cleaner = cleaner(p)) {
186 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
187 final long startTime = System.nanoTime();
188 final AtomicLong previous = new AtomicLong(startTime);
189 final AtomicBoolean tryLongerDelay = new AtomicBoolean(false);
190 final int cycles = 8;
191 final CountDownLatch done = new CountDownLatch(cycles);
192 final int d = delay;
193 final Runnable task = new CheckedRunnable() {
194 public void realRun() {
195 long now = System.nanoTime();
196 long elapsedMillis
197 = NANOSECONDS.toMillis(now - previous.get());
198 if (done.getCount() == cycles) { // first execution
199 if (elapsedMillis >= d)
200 tryLongerDelay.set(true);
201 } else {
202 assertTrue(elapsedMillis >= d);
203 if (elapsedMillis >= 2 * d)
204 tryLongerDelay.set(true);
205 }
206 previous.set(now);
207 done.countDown();
208 }};
209 final ScheduledFuture periodicTask =
210 p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS);
211 final int totalDelayMillis = (cycles - 1) * delay;
212 await(done, totalDelayMillis + cycles * LONG_DELAY_MS);
213 periodicTask.cancel(true);
214 final long elapsedMillis = millisElapsedSince(startTime);
215 assertTrue(elapsedMillis >= totalDelayMillis);
216 if (!tryLongerDelay.get())
217 return;
218 // else retry with longer delay
219 }
220 fail("unexpected execution rate");
221 }
222 }
223
224 /**
225 * execute(null) throws NPE
226 */
227 public void testExecuteNull() throws InterruptedException {
228 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
229 try (PoolCleaner cleaner = cleaner(p)) {
230 try {
231 p.execute(null);
232 shouldThrow();
233 } catch (NullPointerException success) {}
234 }
235 }
236
237 /**
238 * schedule(null) throws NPE
239 */
240 public void testScheduleNull() throws InterruptedException {
241 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
242 try (PoolCleaner cleaner = cleaner(p)) {
243 try {
244 TrackedCallable callable = null;
245 Future f = p.schedule(callable, SHORT_DELAY_MS, MILLISECONDS);
246 shouldThrow();
247 } catch (NullPointerException success) {}
248 }
249 }
250
251 /**
252 * execute throws RejectedExecutionException if shutdown
253 */
254 public void testSchedule1_RejectedExecutionException() throws InterruptedException {
255 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
256 try (PoolCleaner cleaner = cleaner(p)) {
257 try {
258 p.shutdown();
259 p.schedule(new NoOpRunnable(),
260 MEDIUM_DELAY_MS, MILLISECONDS);
261 shouldThrow();
262 } catch (RejectedExecutionException success) {
263 } catch (SecurityException ok) {}
264 }
265 }
266
267 /**
268 * schedule throws RejectedExecutionException if shutdown
269 */
270 public void testSchedule2_RejectedExecutionException() throws InterruptedException {
271 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
272 try (PoolCleaner cleaner = cleaner(p)) {
273 try {
274 p.shutdown();
275 p.schedule(new NoOpCallable(),
276 MEDIUM_DELAY_MS, MILLISECONDS);
277 shouldThrow();
278 } catch (RejectedExecutionException success) {
279 } catch (SecurityException ok) {}
280 }
281 }
282
283 /**
284 * schedule callable throws RejectedExecutionException if shutdown
285 */
286 public void testSchedule3_RejectedExecutionException() throws InterruptedException {
287 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
288 try (PoolCleaner cleaner = cleaner(p)) {
289 try {
290 p.shutdown();
291 p.schedule(new NoOpCallable(),
292 MEDIUM_DELAY_MS, MILLISECONDS);
293 shouldThrow();
294 } catch (RejectedExecutionException success) {
295 } catch (SecurityException ok) {}
296 }
297 }
298
299 /**
300 * scheduleAtFixedRate throws RejectedExecutionException if shutdown
301 */
302 public void testScheduleAtFixedRate1_RejectedExecutionException() throws InterruptedException {
303 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
304 try (PoolCleaner cleaner = cleaner(p)) {
305 try {
306 p.shutdown();
307 p.scheduleAtFixedRate(new NoOpRunnable(),
308 MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
309 shouldThrow();
310 } catch (RejectedExecutionException success) {
311 } catch (SecurityException ok) {}
312 }
313 }
314
315 /**
316 * scheduleWithFixedDelay throws RejectedExecutionException if shutdown
317 */
318 public void testScheduleWithFixedDelay1_RejectedExecutionException() throws InterruptedException {
319 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
320 try (PoolCleaner cleaner = cleaner(p)) {
321 try {
322 p.shutdown();
323 p.scheduleWithFixedDelay(new NoOpRunnable(),
324 MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
325 shouldThrow();
326 } catch (RejectedExecutionException success) {
327 } catch (SecurityException ok) {}
328 }
329 }
330
331 /**
332 * getActiveCount increases but doesn't overestimate, when a
333 * thread becomes active
334 */
335 public void testGetActiveCount() throws InterruptedException {
336 final CountDownLatch done = new CountDownLatch(1);
337 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(2);
338 try (PoolCleaner cleaner = cleaner(p, done)) {
339 final CountDownLatch threadStarted = new CountDownLatch(1);
340 assertEquals(0, p.getActiveCount());
341 p.execute(new CheckedRunnable() {
342 public void realRun() throws InterruptedException {
343 threadStarted.countDown();
344 assertEquals(1, p.getActiveCount());
345 await(done);
346 }});
347 await(threadStarted);
348 assertEquals(1, p.getActiveCount());
349 }
350 }
351
352 /**
353 * getCompletedTaskCount increases, but doesn't overestimate,
354 * when tasks complete
355 */
356 public void testGetCompletedTaskCount() throws InterruptedException {
357 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(2);
358 try (PoolCleaner cleaner = cleaner(p)) {
359 final CountDownLatch threadStarted = new CountDownLatch(1);
360 final CountDownLatch threadProceed = new CountDownLatch(1);
361 final CountDownLatch threadDone = new CountDownLatch(1);
362 assertEquals(0, p.getCompletedTaskCount());
363 p.execute(new CheckedRunnable() {
364 public void realRun() throws InterruptedException {
365 threadStarted.countDown();
366 assertEquals(0, p.getCompletedTaskCount());
367 await(threadProceed);
368 threadDone.countDown();
369 }});
370 await(threadStarted);
371 assertEquals(0, p.getCompletedTaskCount());
372 threadProceed.countDown();
373 await(threadDone);
374 long startTime = System.nanoTime();
375 while (p.getCompletedTaskCount() != 1) {
376 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
377 fail("timed out");
378 Thread.yield();
379 }
380 }
381 }
382
383 /**
384 * getCorePoolSize returns size given in constructor if not otherwise set
385 */
386 public void testGetCorePoolSize() throws InterruptedException {
387 ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
388 try (PoolCleaner cleaner = cleaner(p)) {
389 assertEquals(1, p.getCorePoolSize());
390 }
391 }
392
393 /**
394 * getLargestPoolSize increases, but doesn't overestimate, when
395 * multiple threads active
396 */
397 public void testGetLargestPoolSize() throws InterruptedException {
398 final int THREADS = 3;
399 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(THREADS);
400 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
401 final CountDownLatch done = new CountDownLatch(1);
402 try (PoolCleaner cleaner = cleaner(p, done)) {
403 assertEquals(0, p.getLargestPoolSize());
404 for (int i = 0; i < THREADS; i++)
405 p.execute(new CheckedRunnable() {
406 public void realRun() throws InterruptedException {
407 threadsStarted.countDown();
408 await(done);
409 assertEquals(THREADS, p.getLargestPoolSize());
410 }});
411 await(threadsStarted);
412 assertEquals(THREADS, p.getLargestPoolSize());
413 }
414 assertEquals(THREADS, p.getLargestPoolSize());
415 }
416
417 /**
418 * getPoolSize increases, but doesn't overestimate, when threads
419 * become active
420 */
421 public void testGetPoolSize() throws InterruptedException {
422 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
423 final CountDownLatch threadStarted = new CountDownLatch(1);
424 final CountDownLatch done = new CountDownLatch(1);
425 try (PoolCleaner cleaner = cleaner(p, done)) {
426 assertEquals(0, p.getPoolSize());
427 p.execute(new CheckedRunnable() {
428 public void realRun() throws InterruptedException {
429 threadStarted.countDown();
430 assertEquals(1, p.getPoolSize());
431 await(done);
432 }});
433 await(threadStarted);
434 assertEquals(1, p.getPoolSize());
435 }
436 }
437
438 /**
439 * getTaskCount increases, but doesn't overestimate, when tasks
440 * submitted
441 */
442 public void testGetTaskCount() throws InterruptedException {
443 final int TASKS = 3;
444 final CountDownLatch done = new CountDownLatch(1);
445 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
446 try (PoolCleaner cleaner = cleaner(p, done)) {
447 final CountDownLatch threadStarted = new CountDownLatch(1);
448 assertEquals(0, p.getTaskCount());
449 assertEquals(0, p.getCompletedTaskCount());
450 p.execute(new CheckedRunnable() {
451 public void realRun() throws InterruptedException {
452 threadStarted.countDown();
453 await(done);
454 }});
455 await(threadStarted);
456 assertEquals(1, p.getTaskCount());
457 assertEquals(0, p.getCompletedTaskCount());
458 for (int i = 0; i < TASKS; i++) {
459 assertEquals(1 + i, p.getTaskCount());
460 p.execute(new CheckedRunnable() {
461 public void realRun() throws InterruptedException {
462 threadStarted.countDown();
463 assertEquals(1 + TASKS, p.getTaskCount());
464 await(done);
465 }});
466 }
467 assertEquals(1 + TASKS, p.getTaskCount());
468 assertEquals(0, p.getCompletedTaskCount());
469 }
470 assertEquals(1 + TASKS, p.getTaskCount());
471 assertEquals(1 + TASKS, p.getCompletedTaskCount());
472 }
473
474 /**
475 * getThreadFactory returns factory in constructor if not set
476 */
477 public void testGetThreadFactory() throws InterruptedException {
478 final ThreadFactory threadFactory = new SimpleThreadFactory();
479 final ScheduledThreadPoolExecutor p =
480 new ScheduledThreadPoolExecutor(1, threadFactory);
481 try (PoolCleaner cleaner = cleaner(p)) {
482 assertSame(threadFactory, p.getThreadFactory());
483 }
484 }
485
486 /**
487 * setThreadFactory sets the thread factory returned by getThreadFactory
488 */
489 public void testSetThreadFactory() throws InterruptedException {
490 ThreadFactory threadFactory = new SimpleThreadFactory();
491 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
492 try (PoolCleaner cleaner = cleaner(p)) {
493 p.setThreadFactory(threadFactory);
494 assertSame(threadFactory, p.getThreadFactory());
495 }
496 }
497
498 /**
499 * setThreadFactory(null) throws NPE
500 */
501 public void testSetThreadFactoryNull() throws InterruptedException {
502 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
503 try (PoolCleaner cleaner = cleaner(p)) {
504 try {
505 p.setThreadFactory(null);
506 shouldThrow();
507 } catch (NullPointerException success) {}
508 }
509 }
510
511 /**
512 * The default rejected execution handler is AbortPolicy.
513 */
514 public void testDefaultRejectedExecutionHandler() {
515 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
516 try (PoolCleaner cleaner = cleaner(p)) {
517 assertTrue(p.getRejectedExecutionHandler()
518 instanceof ThreadPoolExecutor.AbortPolicy);
519 }
520 }
521
522 /**
523 * isShutdown is false before shutdown, true after
524 */
525 public void testIsShutdown() {
526 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
527 assertFalse(p.isShutdown());
528 try (PoolCleaner cleaner = cleaner(p)) {
529 try {
530 p.shutdown();
531 assertTrue(p.isShutdown());
532 } catch (SecurityException ok) {}
533 }
534 }
535
536 /**
537 * isTerminated is false before termination, true after
538 */
539 public void testIsTerminated() throws InterruptedException {
540 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
541 try (PoolCleaner cleaner = cleaner(p)) {
542 final CountDownLatch threadStarted = new CountDownLatch(1);
543 final CountDownLatch done = new CountDownLatch(1);
544 assertFalse(p.isTerminated());
545 p.execute(new CheckedRunnable() {
546 public void realRun() throws InterruptedException {
547 assertFalse(p.isTerminated());
548 threadStarted.countDown();
549 await(done);
550 }});
551 await(threadStarted);
552 assertFalse(p.isTerminating());
553 done.countDown();
554 try { p.shutdown(); } catch (SecurityException ok) { return; }
555 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
556 assertTrue(p.isTerminated());
557 }
558 }
559
560 /**
561 * isTerminating is not true when running or when terminated
562 */
563 public void testIsTerminating() throws InterruptedException {
564 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
565 final CountDownLatch threadStarted = new CountDownLatch(1);
566 final CountDownLatch done = new CountDownLatch(1);
567 try (PoolCleaner cleaner = cleaner(p)) {
568 assertFalse(p.isTerminating());
569 p.execute(new CheckedRunnable() {
570 public void realRun() throws InterruptedException {
571 assertFalse(p.isTerminating());
572 threadStarted.countDown();
573 await(done);
574 }});
575 await(threadStarted);
576 assertFalse(p.isTerminating());
577 done.countDown();
578 try { p.shutdown(); } catch (SecurityException ok) { return; }
579 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
580 assertTrue(p.isTerminated());
581 assertFalse(p.isTerminating());
582 }
583 }
584
585 /**
586 * getQueue returns the work queue, which contains queued tasks
587 */
588 public void testGetQueue() throws InterruptedException {
589 final CountDownLatch done = new CountDownLatch(1);
590 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
591 try (PoolCleaner cleaner = cleaner(p, done)) {
592 final CountDownLatch threadStarted = new CountDownLatch(1);
593 ScheduledFuture[] tasks = new ScheduledFuture[5];
594 for (int i = 0; i < tasks.length; i++) {
595 Runnable r = new CheckedRunnable() {
596 public void realRun() throws InterruptedException {
597 threadStarted.countDown();
598 await(done);
599 }};
600 tasks[i] = p.schedule(r, 1, MILLISECONDS);
601 }
602 await(threadStarted);
603 BlockingQueue<Runnable> q = p.getQueue();
604 assertTrue(q.contains(tasks[tasks.length - 1]));
605 assertFalse(q.contains(tasks[0]));
606 }
607 }
608
609 /**
610 * remove(task) removes queued task, and fails to remove active task
611 */
612 public void testRemove() throws InterruptedException {
613 final CountDownLatch done = new CountDownLatch(1);
614 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
615 try (PoolCleaner cleaner = cleaner(p, done)) {
616 ScheduledFuture[] tasks = new ScheduledFuture[5];
617 final CountDownLatch threadStarted = new CountDownLatch(1);
618 for (int i = 0; i < tasks.length; i++) {
619 Runnable r = new CheckedRunnable() {
620 public void realRun() throws InterruptedException {
621 threadStarted.countDown();
622 await(done);
623 }};
624 tasks[i] = p.schedule(r, 1, MILLISECONDS);
625 }
626 await(threadStarted);
627 BlockingQueue<Runnable> q = p.getQueue();
628 assertFalse(p.remove((Runnable)tasks[0]));
629 assertTrue(q.contains((Runnable)tasks[4]));
630 assertTrue(q.contains((Runnable)tasks[3]));
631 assertTrue(p.remove((Runnable)tasks[4]));
632 assertFalse(p.remove((Runnable)tasks[4]));
633 assertFalse(q.contains((Runnable)tasks[4]));
634 assertTrue(q.contains((Runnable)tasks[3]));
635 assertTrue(p.remove((Runnable)tasks[3]));
636 assertFalse(q.contains((Runnable)tasks[3]));
637 }
638 }
639
640 /**
641 * purge eventually removes cancelled tasks from the queue
642 */
643 public void testPurge() throws InterruptedException {
644 final ScheduledFuture[] tasks = new ScheduledFuture[5];
645 final Runnable releaser = new Runnable() { public void run() {
646 for (ScheduledFuture task : tasks)
647 if (task != null) task.cancel(true); }};
648 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
649 try (PoolCleaner cleaner = cleaner(p, releaser)) {
650 for (int i = 0; i < tasks.length; i++)
651 tasks[i] = p.schedule(new SmallPossiblyInterruptedRunnable(),
652 LONG_DELAY_MS, MILLISECONDS);
653 int max = tasks.length;
654 if (tasks[4].cancel(true)) --max;
655 if (tasks[3].cancel(true)) --max;
656 // There must eventually be an interference-free point at
657 // which purge will not fail. (At worst, when queue is empty.)
658 long startTime = System.nanoTime();
659 do {
660 p.purge();
661 long count = p.getTaskCount();
662 if (count == max)
663 return;
664 } while (millisElapsedSince(startTime) < LONG_DELAY_MS);
665 fail("Purge failed to remove cancelled tasks");
666 }
667 }
668
669 /**
670 * shutdownNow returns a list containing tasks that were not run,
671 * and those tasks are drained from the queue
672 */
673 public void testShutdownNow() throws InterruptedException {
674 final int poolSize = 2;
675 final int count = 5;
676 final AtomicInteger ran = new AtomicInteger(0);
677 final ScheduledThreadPoolExecutor p =
678 new ScheduledThreadPoolExecutor(poolSize);
679 final CountDownLatch threadsStarted = new CountDownLatch(poolSize);
680 Runnable waiter = new CheckedRunnable() { public void realRun() {
681 threadsStarted.countDown();
682 try {
683 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
684 } catch (InterruptedException success) {}
685 ran.getAndIncrement();
686 }};
687 for (int i = 0; i < count; i++)
688 p.execute(waiter);
689 await(threadsStarted);
690 assertEquals(poolSize, p.getActiveCount());
691 assertEquals(0, p.getCompletedTaskCount());
692 final List<Runnable> queuedTasks;
693 try {
694 queuedTasks = p.shutdownNow();
695 } catch (SecurityException ok) {
696 return; // Allowed in case test doesn't have privs
697 }
698 assertTrue(p.isShutdown());
699 assertTrue(p.getQueue().isEmpty());
700 assertEquals(count - poolSize, queuedTasks.size());
701 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
702 assertTrue(p.isTerminated());
703 assertEquals(poolSize, ran.get());
704 assertEquals(poolSize, p.getCompletedTaskCount());
705 }
706
707 /**
708 * shutdownNow returns a list containing tasks that were not run,
709 * and those tasks are drained from the queue
710 */
711 public void testShutdownNow_delayedTasks() throws InterruptedException {
712 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
713 List<ScheduledFuture> tasks = new ArrayList<>();
714 for (int i = 0; i < 3; i++) {
715 Runnable r = new NoOpRunnable();
716 tasks.add(p.schedule(r, 9, SECONDS));
717 tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS));
718 tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS));
719 }
720 if (testImplementationDetails)
721 assertEquals(new HashSet(tasks), new HashSet(p.getQueue()));
722 final List<Runnable> queuedTasks;
723 try {
724 queuedTasks = p.shutdownNow();
725 } catch (SecurityException ok) {
726 return; // Allowed in case test doesn't have privs
727 }
728 assertTrue(p.isShutdown());
729 assertTrue(p.getQueue().isEmpty());
730 if (testImplementationDetails)
731 assertEquals(new HashSet(tasks), new HashSet(queuedTasks));
732 assertEquals(tasks.size(), queuedTasks.size());
733 for (ScheduledFuture task : tasks) {
734 assertFalse(task.isDone());
735 assertFalse(task.isCancelled());
736 }
737 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
738 assertTrue(p.isTerminated());
739 }
740
741 /**
742 * By default, periodic tasks are cancelled at shutdown.
743 * By default, delayed tasks keep running after shutdown.
744 * Check that changing the default values work:
745 * - setExecuteExistingDelayedTasksAfterShutdownPolicy
746 * - setContinueExistingPeriodicTasksAfterShutdownPolicy
747 */
748 public void testShutdown_cancellation() throws Exception {
// Overview: randomly configures the three policies, fills all pool threads
// with blocked tasks, queues a second wave, calls shutdown(), and then checks
// which queued tasks were kept, cancelled, or run, per the effective policies.
749 final int poolSize = 6;
750 final ScheduledThreadPoolExecutor p
751 = new ScheduledThreadPoolExecutor(poolSize);
752 final BlockingQueue<Runnable> q = p.getQueue();
753 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
754 final boolean effectiveDelayedPolicy;
755 final boolean effectivePeriodicPolicy;
756 final boolean effectiveRemovePolicy;
757
// Each policy is either set explicitly to a random value or left untouched;
// the effective* variable records the value the getter is then expected to
// return (true / false / false when left untouched).
758 if (rnd.nextBoolean())
759 p.setExecuteExistingDelayedTasksAfterShutdownPolicy(
760 effectiveDelayedPolicy = rnd.nextBoolean());
761 else
762 effectiveDelayedPolicy = true;
763 assertEquals(effectiveDelayedPolicy,
764 p.getExecuteExistingDelayedTasksAfterShutdownPolicy());
765
766 if (rnd.nextBoolean())
767 p.setContinueExistingPeriodicTasksAfterShutdownPolicy(
768 effectivePeriodicPolicy = rnd.nextBoolean());
769 else
770 effectivePeriodicPolicy = false;
771 assertEquals(effectivePeriodicPolicy,
772 p.getContinueExistingPeriodicTasksAfterShutdownPolicy());
773
774 if (rnd.nextBoolean())
775 p.setRemoveOnCancelPolicy(
776 effectiveRemovePolicy = rnd.nextBoolean());
777 else
778 effectiveRemovePolicy = false;
779 assertEquals(effectiveRemovePolicy,
780 p.getRemoveOnCancelPolicy());
781
// True only when the periodic policy is on AND a coin flip says so; when the
// policy is on but this is false, the periodic futures are cancelled below.
782 final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean();
783
784 // Strategy: Wedge the pool with one wave of "blocker" tasks,
785 // then add a second wave that waits in the queue until unblocked.
786 final AtomicInteger ran = new AtomicInteger(0);
787 final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
788 final CountDownLatch unblock = new CountDownLatch(1);
789 final RuntimeException exception = new RuntimeException();
790
// Task: bumps `ran`, counts down `poolBlocked`, then blocks until `unblock`.
791 class Task implements Runnable {
792 public void run() {
793 try {
794 ran.getAndIncrement();
795 poolBlocked.countDown();
796 await(unblock);
797 } catch (Throwable fail) { threadUnexpectedException(fail); }
798 }
799 }
800
// PeriodicTask: behaves as Task on its `rounds`-th invocation and throws
// `exception` on the invocation after that; other invocations do nothing.
801 class PeriodicTask extends Task {
802 PeriodicTask(int rounds) { this.rounds = rounds; }
803 int rounds;
804 public void run() {
805 if (--rounds == 0) super.run();
806 // throw exception to surely terminate this periodic task,
807 // but in a separate execution and in a detectable way.
808 if (rounds == -1) throw exception;
809 }
810 }
811
812 Runnable task = new Task();
813
814 List<Future<?>> immediates = new ArrayList<>();
815 List<Future<?>> delayeds = new ArrayList<>();
816 List<Future<?>> periodics = new ArrayList<>();
817
// First wave: 1 immediate + 1 delayed + 4 periodic = 6 tasks, i.e. poolSize.
818 immediates.add(p.submit(task));
819 delayeds.add(p.schedule(task, 1, MILLISECONDS));
820 for (int rounds : new int[] { 1, 2 }) {
821 periodics.add(p.scheduleAtFixedRate(
822 new PeriodicTask(rounds), 1, 1, MILLISECONDS));
823 periodics.add(p.scheduleWithFixedDelay(
824 new PeriodicTask(rounds), 1, 1, MILLISECONDS));
825 }
826
827 await(poolBlocked);
828
829 assertEquals(poolSize, ran.get());
830 assertTrue(q.isEmpty());
831
832 // Add second wave of tasks.
833 immediates.add(p.submit(task));
// The second delayed task gets a long delay when the delayed policy is off.
834 long delay_ms = effectiveDelayedPolicy ? 1 : LONG_DELAY_MS;
835 delayeds.add(p.schedule(task, delay_ms, MILLISECONDS));
836 for (int rounds : new int[] { 1, 2 }) {
837 periodics.add(p.scheduleAtFixedRate(
838 new PeriodicTask(rounds), 1, 1, MILLISECONDS));
839 periodics.add(p.scheduleWithFixedDelay(
840 new PeriodicTask(rounds), 1, 1, MILLISECONDS));
841 }
842
// The whole second wave is queued and none of it has run yet.
843 assertEquals(poolSize, q.size());
844 assertEquals(poolSize, ran.get());
845
846 immediates.forEach(
847 f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));
848
849 Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
850 .forEach(f -> assertFalse(f.isDone()));
851
852 try { p.shutdown(); } catch (SecurityException ok) { return; }
853 assertTrue(p.isShutdown());
854 assertTrue(p.isTerminating());
855 assertFalse(p.isTerminated());
856
// Randomly also verify that every kind of new submission is now rejected.
857 if (rnd.nextBoolean())
858 assertThrows(
859 RejectedExecutionException.class,
860 () -> p.submit(task),
861 () -> p.schedule(task, 1, SECONDS),
862 () -> p.scheduleAtFixedRate(
863 new PeriodicTask(1), 1, 1, SECONDS),
864 () -> p.scheduleWithFixedDelay(
865 new PeriodicTask(2), 1, 1, SECONDS));
866
// After shutdown: the queued immediate task stays; the queued delayed and
// periodic tasks stay in the queue exactly when their policy is true.
867 assertTrue(q.contains(immediates.get(1)));
868 assertTrue(!effectiveDelayedPolicy
869 ^ q.contains(delayeds.get(1)));
870 assertTrue(!effectivePeriodicPolicy
871 ^ q.containsAll(periodics.subList(4, 8)));
872
873 immediates.forEach(f -> assertFalse(f.isDone()));
874
875 assertFalse(delayeds.get(0).isDone());
876 if (effectiveDelayedPolicy)
877 assertFalse(delayeds.get(1).isDone());
878 else
879 assertTrue(delayeds.get(1).isCancelled());
880
// NOTE(review): the explicit cancel(false) of periodic tasks happens only
// when testImplementationDetails is true, yet the isCancelled() assertion
// near the end of this method is unconditional. Confirm the test is always
// run with testImplementationDetails enabled, or that this is intended.
881 if (testImplementationDetails) {
882 if (effectivePeriodicPolicy)
883 periodics.forEach(
884 f -> {
885 assertFalse(f.isDone());
886 if (!periodicTasksContinue)
887 assertTrue(f.cancel(false));
888 });
889 else {
890 periodics.subList(0, 4).forEach(f -> assertFalse(f.isDone()));
891 periodics.subList(4, 8).forEach(f -> assertTrue(f.isCancelled()));
892 }
893 }
894
895 unblock.countDown(); // Release all pool threads
896
897 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
898 assertFalse(p.isTerminating());
899 assertTrue(p.isTerminated());
900
901 assertTrue(q.isEmpty());
902
903 Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
904 .forEach(f -> assertTrue(f.isDone()));
905
906 for (Future<?> f : immediates) assertNull(f.get());
907
908 assertNull(delayeds.get(0).get());
909 if (effectiveDelayedPolicy)
910 assertNull(delayeds.get(1).get());
911 else
912 assertTrue(delayeds.get(1).isCancelled());
913
// Periodic tasks that continued are expected to end with the ExecutionException
// caused by `exception` thrown from PeriodicTask.run; otherwise to be cancelled.
914 if (periodicTasksContinue)
915 periodics.forEach(
916 f -> {
917 try { f.get(); }
918 catch (ExecutionException success) {
919 assertSame(exception, success.getCause());
920 }
921 catch (Throwable fail) { threadUnexpectedException(fail); }
922 });
923 else
924 periodics.forEach(f -> assertTrue(f.isCancelled()));
925
// Expected Task.run count: first wave (poolSize) + second immediate (1)
// + second delayed (if policy on) + 4 second-wave periodics (if continuing).
926 assertEquals(poolSize + 1
927 + (effectiveDelayedPolicy ? 1 : 0)
928 + (periodicTasksContinue ? 4 : 0),
929 ran.get());
930 }
931
932 /**
933 * completed submit of callable returns result
934 */
935 public void testSubmitCallable() throws Exception {
936 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
937 try (PoolCleaner cleaner = cleaner(e)) {
938 Future<String> future = e.submit(new StringTask());
939 String result = future.get();
940 assertSame(TEST_STRING, result);
941 }
942 }
943
944 /**
945 * completed submit of runnable returns successfully
946 */
947 public void testSubmitRunnable() throws Exception {
948 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
949 try (PoolCleaner cleaner = cleaner(e)) {
950 Future<?> future = e.submit(new NoOpRunnable());
951 future.get();
952 assertTrue(future.isDone());
953 }
954 }
955
956 /**
957 * completed submit of (runnable, result) returns result
958 */
959 public void testSubmitRunnable2() throws Exception {
960 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
961 try (PoolCleaner cleaner = cleaner(e)) {
962 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
963 String result = future.get();
964 assertSame(TEST_STRING, result);
965 }
966 }
967
968 /**
969 * invokeAny(null) throws NPE
970 */
971 public void testInvokeAny1() throws Exception {
972 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
973 try (PoolCleaner cleaner = cleaner(e)) {
974 try {
975 e.invokeAny(null);
976 shouldThrow();
977 } catch (NullPointerException success) {}
978 }
979 }
980
981 /**
982 * invokeAny(empty collection) throws IAE
983 */
984 public void testInvokeAny2() throws Exception {
985 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
986 try (PoolCleaner cleaner = cleaner(e)) {
987 try {
988 e.invokeAny(new ArrayList<Callable<String>>());
989 shouldThrow();
990 } catch (IllegalArgumentException success) {}
991 }
992 }
993
994 /**
995 * invokeAny(c) throws NPE if c has null elements
996 */
997 public void testInvokeAny3() throws Exception {
998 CountDownLatch latch = new CountDownLatch(1);
999 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1000 try (PoolCleaner cleaner = cleaner(e)) {
1001 List<Callable<String>> l = new ArrayList<>();
1002 l.add(latchAwaitingStringTask(latch));
1003 l.add(null);
1004 try {
1005 e.invokeAny(l);
1006 shouldThrow();
1007 } catch (NullPointerException success) {}
1008 latch.countDown();
1009 }
1010 }
1011
1012 /**
1013 * invokeAny(c) throws ExecutionException if no task completes
1014 */
1015 public void testInvokeAny4() throws Exception {
1016 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1017 try (PoolCleaner cleaner = cleaner(e)) {
1018 List<Callable<String>> l = new ArrayList<>();
1019 l.add(new NPETask());
1020 try {
1021 e.invokeAny(l);
1022 shouldThrow();
1023 } catch (ExecutionException success) {
1024 assertTrue(success.getCause() instanceof NullPointerException);
1025 }
1026 }
1027 }
1028
1029 /**
1030 * invokeAny(c) returns result of some task
1031 */
1032 public void testInvokeAny5() throws Exception {
1033 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1034 try (PoolCleaner cleaner = cleaner(e)) {
1035 List<Callable<String>> l = new ArrayList<>();
1036 l.add(new StringTask());
1037 l.add(new StringTask());
1038 String result = e.invokeAny(l);
1039 assertSame(TEST_STRING, result);
1040 }
1041 }
1042
1043 /**
1044 * invokeAll(null) throws NPE
1045 */
1046 public void testInvokeAll1() throws Exception {
1047 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1048 try (PoolCleaner cleaner = cleaner(e)) {
1049 try {
1050 e.invokeAll(null);
1051 shouldThrow();
1052 } catch (NullPointerException success) {}
1053 }
1054 }
1055
1056 /**
1057 * invokeAll(empty collection) returns empty collection
1058 */
1059 public void testInvokeAll2() throws Exception {
1060 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1061 try (PoolCleaner cleaner = cleaner(e)) {
1062 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1063 assertTrue(r.isEmpty());
1064 }
1065 }
1066
1067 /**
1068 * invokeAll(c) throws NPE if c has null elements
1069 */
1070 public void testInvokeAll3() throws Exception {
1071 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1072 try (PoolCleaner cleaner = cleaner(e)) {
1073 List<Callable<String>> l = new ArrayList<>();
1074 l.add(new StringTask());
1075 l.add(null);
1076 try {
1077 e.invokeAll(l);
1078 shouldThrow();
1079 } catch (NullPointerException success) {}
1080 }
1081 }
1082
1083 /**
1084 * get of invokeAll(c) throws exception on failed task
1085 */
1086 public void testInvokeAll4() throws Exception {
1087 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1088 try (PoolCleaner cleaner = cleaner(e)) {
1089 List<Callable<String>> l = new ArrayList<>();
1090 l.add(new NPETask());
1091 List<Future<String>> futures = e.invokeAll(l);
1092 assertEquals(1, futures.size());
1093 try {
1094 futures.get(0).get();
1095 shouldThrow();
1096 } catch (ExecutionException success) {
1097 assertTrue(success.getCause() instanceof NullPointerException);
1098 }
1099 }
1100 }
1101
1102 /**
1103 * invokeAll(c) returns results of all completed tasks
1104 */
1105 public void testInvokeAll5() throws Exception {
1106 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1107 try (PoolCleaner cleaner = cleaner(e)) {
1108 List<Callable<String>> l = new ArrayList<>();
1109 l.add(new StringTask());
1110 l.add(new StringTask());
1111 List<Future<String>> futures = e.invokeAll(l);
1112 assertEquals(2, futures.size());
1113 for (Future<String> future : futures)
1114 assertSame(TEST_STRING, future.get());
1115 }
1116 }
1117
1118 /**
1119 * timed invokeAny(null) throws NPE
1120 */
1121 public void testTimedInvokeAny1() throws Exception {
1122 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1123 try (PoolCleaner cleaner = cleaner(e)) {
1124 try {
1125 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1126 shouldThrow();
1127 } catch (NullPointerException success) {}
1128 }
1129 }
1130
1131 /**
1132 * timed invokeAny(,,null) throws NPE
1133 */
1134 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1135 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1136 try (PoolCleaner cleaner = cleaner(e)) {
1137 List<Callable<String>> l = new ArrayList<>();
1138 l.add(new StringTask());
1139 try {
1140 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1141 shouldThrow();
1142 } catch (NullPointerException success) {}
1143 }
1144 }
1145
1146 /**
1147 * timed invokeAny(empty collection) throws IAE
1148 */
1149 public void testTimedInvokeAny2() throws Exception {
1150 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1151 try (PoolCleaner cleaner = cleaner(e)) {
1152 try {
1153 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1154 shouldThrow();
1155 } catch (IllegalArgumentException success) {}
1156 }
1157 }
1158
1159 /**
1160 * timed invokeAny(c) throws NPE if c has null elements
1161 */
1162 public void testTimedInvokeAny3() throws Exception {
1163 CountDownLatch latch = new CountDownLatch(1);
1164 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1165 try (PoolCleaner cleaner = cleaner(e)) {
1166 List<Callable<String>> l = new ArrayList<>();
1167 l.add(latchAwaitingStringTask(latch));
1168 l.add(null);
1169 try {
1170 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1171 shouldThrow();
1172 } catch (NullPointerException success) {}
1173 latch.countDown();
1174 }
1175 }
1176
1177 /**
1178 * timed invokeAny(c) throws ExecutionException if no task completes
1179 */
1180 public void testTimedInvokeAny4() throws Exception {
1181 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1182 try (PoolCleaner cleaner = cleaner(e)) {
1183 long startTime = System.nanoTime();
1184 List<Callable<String>> l = new ArrayList<>();
1185 l.add(new NPETask());
1186 try {
1187 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1188 shouldThrow();
1189 } catch (ExecutionException success) {
1190 assertTrue(success.getCause() instanceof NullPointerException);
1191 }
1192 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1193 }
1194 }
1195
1196 /**
1197 * timed invokeAny(c) returns result of some task
1198 */
1199 public void testTimedInvokeAny5() throws Exception {
1200 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1201 try (PoolCleaner cleaner = cleaner(e)) {
1202 long startTime = System.nanoTime();
1203 List<Callable<String>> l = new ArrayList<>();
1204 l.add(new StringTask());
1205 l.add(new StringTask());
1206 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1207 assertSame(TEST_STRING, result);
1208 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1209 }
1210 }
1211
1212 /**
1213 * timed invokeAll(null) throws NPE
1214 */
1215 public void testTimedInvokeAll1() throws Exception {
1216 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1217 try (PoolCleaner cleaner = cleaner(e)) {
1218 try {
1219 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1220 shouldThrow();
1221 } catch (NullPointerException success) {}
1222 }
1223 }
1224
1225 /**
1226 * timed invokeAll(,,null) throws NPE
1227 */
1228 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1229 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1230 try (PoolCleaner cleaner = cleaner(e)) {
1231 List<Callable<String>> l = new ArrayList<>();
1232 l.add(new StringTask());
1233 try {
1234 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1235 shouldThrow();
1236 } catch (NullPointerException success) {}
1237 }
1238 }
1239
1240 /**
1241 * timed invokeAll(empty collection) returns empty collection
1242 */
1243 public void testTimedInvokeAll2() throws Exception {
1244 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1245 try (PoolCleaner cleaner = cleaner(e)) {
1246 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(),
1247 MEDIUM_DELAY_MS, MILLISECONDS);
1248 assertTrue(r.isEmpty());
1249 }
1250 }
1251
1252 /**
1253 * timed invokeAll(c) throws NPE if c has null elements
1254 */
1255 public void testTimedInvokeAll3() throws Exception {
1256 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1257 try (PoolCleaner cleaner = cleaner(e)) {
1258 List<Callable<String>> l = new ArrayList<>();
1259 l.add(new StringTask());
1260 l.add(null);
1261 try {
1262 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1263 shouldThrow();
1264 } catch (NullPointerException success) {}
1265 }
1266 }
1267
1268 /**
1269 * get of element of invokeAll(c) throws exception on failed task
1270 */
1271 public void testTimedInvokeAll4() throws Exception {
1272 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1273 try (PoolCleaner cleaner = cleaner(e)) {
1274 List<Callable<String>> l = new ArrayList<>();
1275 l.add(new NPETask());
1276 List<Future<String>> futures =
1277 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1278 assertEquals(1, futures.size());
1279 try {
1280 futures.get(0).get();
1281 shouldThrow();
1282 } catch (ExecutionException success) {
1283 assertTrue(success.getCause() instanceof NullPointerException);
1284 }
1285 }
1286 }
1287
1288 /**
1289 * timed invokeAll(c) returns results of all completed tasks
1290 */
1291 public void testTimedInvokeAll5() throws Exception {
1292 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1293 try (PoolCleaner cleaner = cleaner(e)) {
1294 List<Callable<String>> l = new ArrayList<>();
1295 l.add(new StringTask());
1296 l.add(new StringTask());
1297 List<Future<String>> futures =
1298 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1299 assertEquals(2, futures.size());
1300 for (Future<String> future : futures)
1301 assertSame(TEST_STRING, future.get());
1302 }
1303 }
1304
1305 /**
1306 * timed invokeAll(c) cancels tasks not completed by timeout
1307 */
1308 public void testTimedInvokeAll6() throws Exception {
1309 for (long timeout = timeoutMillis();;) {
1310 final CountDownLatch done = new CountDownLatch(1);
1311 final Callable<String> waiter = new CheckedCallable<String>() {
1312 public String realCall() {
1313 try { done.await(LONG_DELAY_MS, MILLISECONDS); }
1314 catch (InterruptedException ok) {}
1315 return "1"; }};
1316 final ExecutorService p = new ScheduledThreadPoolExecutor(2);
1317 try (PoolCleaner cleaner = cleaner(p, done)) {
1318 List<Callable<String>> tasks = new ArrayList<>();
1319 tasks.add(new StringTask("0"));
1320 tasks.add(waiter);
1321 tasks.add(new StringTask("2"));
1322 long startTime = System.nanoTime();
1323 List<Future<String>> futures =
1324 p.invokeAll(tasks, timeout, MILLISECONDS);
1325 assertEquals(tasks.size(), futures.size());
1326 assertTrue(millisElapsedSince(startTime) >= timeout);
1327 for (Future future : futures)
1328 assertTrue(future.isDone());
1329 assertTrue(futures.get(1).isCancelled());
1330 try {
1331 assertEquals("0", futures.get(0).get());
1332 assertEquals("2", futures.get(2).get());
1333 break;
1334 } catch (CancellationException retryWithLongerTimeout) {
1335 timeout *= 2;
1336 if (timeout >= LONG_DELAY_MS / 2)
1337 fail("expected exactly one task to be cancelled");
1338 }
1339 }
1340 }
1341 }
1342
1343 /**
1344 * A fixed delay task with overflowing period should not prevent a
1345 * one-shot task from executing.
1346 * https://bugs.openjdk.java.net/browse/JDK-8051859
1347 */
1348 public void testScheduleWithFixedDelay_overflow() throws Exception {
1349 final CountDownLatch delayedDone = new CountDownLatch(1);
1350 final CountDownLatch immediateDone = new CountDownLatch(1);
1351 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
1352 try (PoolCleaner cleaner = cleaner(p)) {
1353 final Runnable immediate = new Runnable() { public void run() {
1354 immediateDone.countDown();
1355 }};
1356 final Runnable delayed = new Runnable() { public void run() {
1357 delayedDone.countDown();
1358 p.submit(immediate);
1359 }};
1360 p.scheduleWithFixedDelay(delayed, 0L, Long.MAX_VALUE, SECONDS);
1361 await(delayedDone);
1362 await(immediateDone);
1363 }
1364 }
1365
1366 }