ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ScheduledExecutorTest.java
Revision: 1.93
Committed: Mon May 29 19:15:03 2017 UTC (6 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.92: +3 -3 lines
Log Message:
more timeout handling rework; remove most uses of SMALL_DELAY_MS; randomize timeouts and TimeUnits; remove hardcoded 5 second timeouts

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 import static java.util.concurrent.TimeUnit.SECONDS;
12
13 import java.util.ArrayList;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.CancellationException;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.RejectedExecutionException;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.ScheduledThreadPoolExecutor;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.ThreadLocalRandom;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.stream.Stream;
33
34 import junit.framework.Test;
35 import junit.framework.TestSuite;
36
37 public class ScheduledExecutorTest extends JSR166TestCase {
38 public static void main(String[] args) {
39 main(suite(), args);
40 }
41 public static Test suite() {
42 return new TestSuite(ScheduledExecutorTest.class);
43 }
44
45 /**
46 * execute successfully executes a runnable
47 */
48 public void testExecute() throws InterruptedException {
49 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
50 try (PoolCleaner cleaner = cleaner(p)) {
51 final CountDownLatch done = new CountDownLatch(1);
52 final Runnable task = new CheckedRunnable() {
53 public void realRun() { done.countDown(); }};
54 p.execute(task);
55 await(done);
56 }
57 }
58
59 /**
60 * delayed schedule of callable successfully executes after delay
61 */
62 public void testSchedule1() throws Exception {
63 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
64 try (PoolCleaner cleaner = cleaner(p)) {
65 final long startTime = System.nanoTime();
66 final CountDownLatch done = new CountDownLatch(1);
67 Callable task = new CheckedCallable<Boolean>() {
68 public Boolean realCall() {
69 done.countDown();
70 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
71 return Boolean.TRUE;
72 }};
73 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS);
74 assertSame(Boolean.TRUE, f.get());
75 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
76 assertEquals(0L, done.getCount());
77 }
78 }
79
80 /**
81 * delayed schedule of runnable successfully executes after delay
82 */
83 public void testSchedule3() throws Exception {
84 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
85 try (PoolCleaner cleaner = cleaner(p)) {
86 final long startTime = System.nanoTime();
87 final CountDownLatch done = new CountDownLatch(1);
88 Runnable task = new CheckedRunnable() {
89 public void realRun() {
90 done.countDown();
91 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
92 }};
93 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS);
94 await(done);
95 assertNull(f.get(LONG_DELAY_MS, MILLISECONDS));
96 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
97 }
98 }
99
100 /**
101 * scheduleAtFixedRate executes runnable after given initial delay
102 */
103 public void testSchedule4() throws Exception {
104 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
105 try (PoolCleaner cleaner = cleaner(p)) {
106 final long startTime = System.nanoTime();
107 final CountDownLatch done = new CountDownLatch(1);
108 Runnable task = new CheckedRunnable() {
109 public void realRun() {
110 done.countDown();
111 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
112 }};
113 ScheduledFuture f =
114 p.scheduleAtFixedRate(task, timeoutMillis(),
115 LONG_DELAY_MS, MILLISECONDS);
116 await(done);
117 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
118 f.cancel(true);
119 }
120 }
121
122 /**
123 * scheduleWithFixedDelay executes runnable after given initial delay
124 */
125 public void testSchedule5() throws Exception {
126 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
127 try (PoolCleaner cleaner = cleaner(p)) {
128 final long startTime = System.nanoTime();
129 final CountDownLatch done = new CountDownLatch(1);
130 Runnable task = new CheckedRunnable() {
131 public void realRun() {
132 done.countDown();
133 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
134 }};
135 ScheduledFuture f =
136 p.scheduleWithFixedDelay(task, timeoutMillis(),
137 LONG_DELAY_MS, MILLISECONDS);
138 await(done);
139 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
140 f.cancel(true);
141 }
142 }
143
144 static class RunnableCounter implements Runnable {
145 AtomicInteger count = new AtomicInteger(0);
146 public void run() { count.getAndIncrement(); }
147 }
148
149 /**
150 * scheduleAtFixedRate executes series of tasks at given rate.
151 * Eventually, it must hold that:
152 * cycles - 1 <= elapsedMillis/delay < cycles
153 */
154 public void testFixedRateSequence() throws InterruptedException {
155 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
156 try (PoolCleaner cleaner = cleaner(p)) {
157 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
158 final long startTime = System.nanoTime();
159 final int cycles = 8;
160 final CountDownLatch done = new CountDownLatch(cycles);
161 final Runnable task = new CheckedRunnable() {
162 public void realRun() { done.countDown(); }};
163 final ScheduledFuture periodicTask =
164 p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
165 final int totalDelayMillis = (cycles - 1) * delay;
166 await(done, totalDelayMillis + LONG_DELAY_MS);
167 periodicTask.cancel(true);
168 final long elapsedMillis = millisElapsedSince(startTime);
169 assertTrue(elapsedMillis >= totalDelayMillis);
170 if (elapsedMillis <= cycles * delay)
171 return;
172 // else retry with longer delay
173 }
174 fail("unexpected execution rate");
175 }
176 }
177
178 /**
179 * scheduleWithFixedDelay executes series of tasks with given period.
180 * Eventually, it must hold that each task starts at least delay and at
181 * most 2 * delay after the termination of the previous task.
182 */
183 public void testFixedDelaySequence() throws InterruptedException {
184 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
185 try (PoolCleaner cleaner = cleaner(p)) {
186 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
187 final long startTime = System.nanoTime();
188 final AtomicLong previous = new AtomicLong(startTime);
189 final AtomicBoolean tryLongerDelay = new AtomicBoolean(false);
190 final int cycles = 8;
191 final CountDownLatch done = new CountDownLatch(cycles);
192 final int d = delay;
193 final Runnable task = new CheckedRunnable() {
194 public void realRun() {
195 long now = System.nanoTime();
196 long elapsedMillis
197 = NANOSECONDS.toMillis(now - previous.get());
198 if (done.getCount() == cycles) { // first execution
199 if (elapsedMillis >= d)
200 tryLongerDelay.set(true);
201 } else {
202 assertTrue(elapsedMillis >= d);
203 if (elapsedMillis >= 2 * d)
204 tryLongerDelay.set(true);
205 }
206 previous.set(now);
207 done.countDown();
208 }};
209 final ScheduledFuture periodicTask =
210 p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS);
211 final int totalDelayMillis = (cycles - 1) * delay;
212 await(done, totalDelayMillis + cycles * LONG_DELAY_MS);
213 periodicTask.cancel(true);
214 final long elapsedMillis = millisElapsedSince(startTime);
215 assertTrue(elapsedMillis >= totalDelayMillis);
216 if (!tryLongerDelay.get())
217 return;
218 // else retry with longer delay
219 }
220 fail("unexpected execution rate");
221 }
222 }
223
224 /**
225 * execute(null) throws NPE
226 */
227 public void testExecuteNull() throws InterruptedException {
228 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
229 try (PoolCleaner cleaner = cleaner(p)) {
230 try {
231 p.execute(null);
232 shouldThrow();
233 } catch (NullPointerException success) {}
234 }
235 }
236
237 /**
238 * schedule(null) throws NPE
239 */
240 public void testScheduleNull() throws InterruptedException {
241 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
242 try (PoolCleaner cleaner = cleaner(p)) {
243 try {
244 Future f = p.schedule((Callable)null,
245 randomTimeout(), randomTimeUnit());
246 shouldThrow();
247 } catch (NullPointerException success) {}
248 }
249 }
250
251 /**
252 * execute throws RejectedExecutionException if shutdown
253 */
254 public void testSchedule1_RejectedExecutionException() throws InterruptedException {
255 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
256 try (PoolCleaner cleaner = cleaner(p)) {
257 try {
258 p.shutdown();
259 p.schedule(new NoOpRunnable(),
260 MEDIUM_DELAY_MS, MILLISECONDS);
261 shouldThrow();
262 } catch (RejectedExecutionException success) {
263 } catch (SecurityException ok) {}
264 }
265 }
266
267 /**
268 * schedule throws RejectedExecutionException if shutdown
269 */
270 public void testSchedule2_RejectedExecutionException() throws InterruptedException {
271 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
272 try (PoolCleaner cleaner = cleaner(p)) {
273 try {
274 p.shutdown();
275 p.schedule(new NoOpCallable(),
276 MEDIUM_DELAY_MS, MILLISECONDS);
277 shouldThrow();
278 } catch (RejectedExecutionException success) {
279 } catch (SecurityException ok) {}
280 }
281 }
282
283 /**
284 * schedule callable throws RejectedExecutionException if shutdown
285 */
286 public void testSchedule3_RejectedExecutionException() throws InterruptedException {
287 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
288 try (PoolCleaner cleaner = cleaner(p)) {
289 try {
290 p.shutdown();
291 p.schedule(new NoOpCallable(),
292 MEDIUM_DELAY_MS, MILLISECONDS);
293 shouldThrow();
294 } catch (RejectedExecutionException success) {
295 } catch (SecurityException ok) {}
296 }
297 }
298
299 /**
300 * scheduleAtFixedRate throws RejectedExecutionException if shutdown
301 */
302 public void testScheduleAtFixedRate1_RejectedExecutionException() throws InterruptedException {
303 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
304 try (PoolCleaner cleaner = cleaner(p)) {
305 try {
306 p.shutdown();
307 p.scheduleAtFixedRate(new NoOpRunnable(),
308 MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
309 shouldThrow();
310 } catch (RejectedExecutionException success) {
311 } catch (SecurityException ok) {}
312 }
313 }
314
315 /**
316 * scheduleWithFixedDelay throws RejectedExecutionException if shutdown
317 */
318 public void testScheduleWithFixedDelay1_RejectedExecutionException() throws InterruptedException {
319 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
320 try (PoolCleaner cleaner = cleaner(p)) {
321 try {
322 p.shutdown();
323 p.scheduleWithFixedDelay(new NoOpRunnable(),
324 MEDIUM_DELAY_MS, MEDIUM_DELAY_MS, MILLISECONDS);
325 shouldThrow();
326 } catch (RejectedExecutionException success) {
327 } catch (SecurityException ok) {}
328 }
329 }
330
331 /**
332 * getActiveCount increases but doesn't overestimate, when a
333 * thread becomes active
334 */
335 public void testGetActiveCount() throws InterruptedException {
336 final CountDownLatch done = new CountDownLatch(1);
337 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(2);
338 try (PoolCleaner cleaner = cleaner(p, done)) {
339 final CountDownLatch threadStarted = new CountDownLatch(1);
340 assertEquals(0, p.getActiveCount());
341 p.execute(new CheckedRunnable() {
342 public void realRun() throws InterruptedException {
343 threadStarted.countDown();
344 assertEquals(1, p.getActiveCount());
345 await(done);
346 }});
347 await(threadStarted);
348 assertEquals(1, p.getActiveCount());
349 }
350 }
351
352 /**
353 * getCompletedTaskCount increases, but doesn't overestimate,
354 * when tasks complete
355 */
356 public void testGetCompletedTaskCount() throws InterruptedException {
357 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(2);
358 try (PoolCleaner cleaner = cleaner(p)) {
359 final CountDownLatch threadStarted = new CountDownLatch(1);
360 final CountDownLatch threadProceed = new CountDownLatch(1);
361 final CountDownLatch threadDone = new CountDownLatch(1);
362 assertEquals(0, p.getCompletedTaskCount());
363 p.execute(new CheckedRunnable() {
364 public void realRun() throws InterruptedException {
365 threadStarted.countDown();
366 assertEquals(0, p.getCompletedTaskCount());
367 await(threadProceed);
368 threadDone.countDown();
369 }});
370 await(threadStarted);
371 assertEquals(0, p.getCompletedTaskCount());
372 threadProceed.countDown();
373 await(threadDone);
374 long startTime = System.nanoTime();
375 while (p.getCompletedTaskCount() != 1) {
376 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
377 fail("timed out");
378 Thread.yield();
379 }
380 }
381 }
382
383 /**
384 * getCorePoolSize returns size given in constructor if not otherwise set
385 */
386 public void testGetCorePoolSize() throws InterruptedException {
387 ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
388 try (PoolCleaner cleaner = cleaner(p)) {
389 assertEquals(1, p.getCorePoolSize());
390 }
391 }
392
393 /**
394 * getLargestPoolSize increases, but doesn't overestimate, when
395 * multiple threads active
396 */
397 public void testGetLargestPoolSize() throws InterruptedException {
398 final int THREADS = 3;
399 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(THREADS);
400 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
401 final CountDownLatch done = new CountDownLatch(1);
402 try (PoolCleaner cleaner = cleaner(p, done)) {
403 assertEquals(0, p.getLargestPoolSize());
404 for (int i = 0; i < THREADS; i++)
405 p.execute(new CheckedRunnable() {
406 public void realRun() throws InterruptedException {
407 threadsStarted.countDown();
408 await(done);
409 assertEquals(THREADS, p.getLargestPoolSize());
410 }});
411 await(threadsStarted);
412 assertEquals(THREADS, p.getLargestPoolSize());
413 }
414 assertEquals(THREADS, p.getLargestPoolSize());
415 }
416
417 /**
418 * getPoolSize increases, but doesn't overestimate, when threads
419 * become active
420 */
421 public void testGetPoolSize() throws InterruptedException {
422 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
423 final CountDownLatch threadStarted = new CountDownLatch(1);
424 final CountDownLatch done = new CountDownLatch(1);
425 try (PoolCleaner cleaner = cleaner(p, done)) {
426 assertEquals(0, p.getPoolSize());
427 p.execute(new CheckedRunnable() {
428 public void realRun() throws InterruptedException {
429 threadStarted.countDown();
430 assertEquals(1, p.getPoolSize());
431 await(done);
432 }});
433 await(threadStarted);
434 assertEquals(1, p.getPoolSize());
435 }
436 }
437
438 /**
439 * getTaskCount increases, but doesn't overestimate, when tasks
440 * submitted
441 */
442 public void testGetTaskCount() throws InterruptedException {
443 final int TASKS = 3;
444 final CountDownLatch done = new CountDownLatch(1);
445 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
446 try (PoolCleaner cleaner = cleaner(p, done)) {
447 final CountDownLatch threadStarted = new CountDownLatch(1);
448 assertEquals(0, p.getTaskCount());
449 assertEquals(0, p.getCompletedTaskCount());
450 p.execute(new CheckedRunnable() {
451 public void realRun() throws InterruptedException {
452 threadStarted.countDown();
453 await(done);
454 }});
455 await(threadStarted);
456 assertEquals(1, p.getTaskCount());
457 assertEquals(0, p.getCompletedTaskCount());
458 for (int i = 0; i < TASKS; i++) {
459 assertEquals(1 + i, p.getTaskCount());
460 p.execute(new CheckedRunnable() {
461 public void realRun() throws InterruptedException {
462 threadStarted.countDown();
463 assertEquals(1 + TASKS, p.getTaskCount());
464 await(done);
465 }});
466 }
467 assertEquals(1 + TASKS, p.getTaskCount());
468 assertEquals(0, p.getCompletedTaskCount());
469 }
470 assertEquals(1 + TASKS, p.getTaskCount());
471 assertEquals(1 + TASKS, p.getCompletedTaskCount());
472 }
473
474 /**
475 * getThreadFactory returns factory in constructor if not set
476 */
477 public void testGetThreadFactory() throws InterruptedException {
478 final ThreadFactory threadFactory = new SimpleThreadFactory();
479 final ScheduledThreadPoolExecutor p =
480 new ScheduledThreadPoolExecutor(1, threadFactory);
481 try (PoolCleaner cleaner = cleaner(p)) {
482 assertSame(threadFactory, p.getThreadFactory());
483 }
484 }
485
486 /**
487 * setThreadFactory sets the thread factory returned by getThreadFactory
488 */
489 public void testSetThreadFactory() throws InterruptedException {
490 ThreadFactory threadFactory = new SimpleThreadFactory();
491 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
492 try (PoolCleaner cleaner = cleaner(p)) {
493 p.setThreadFactory(threadFactory);
494 assertSame(threadFactory, p.getThreadFactory());
495 }
496 }
497
498 /**
499 * setThreadFactory(null) throws NPE
500 */
501 public void testSetThreadFactoryNull() throws InterruptedException {
502 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
503 try (PoolCleaner cleaner = cleaner(p)) {
504 try {
505 p.setThreadFactory(null);
506 shouldThrow();
507 } catch (NullPointerException success) {}
508 }
509 }
510
511 /**
512 * The default rejected execution handler is AbortPolicy.
513 */
514 public void testDefaultRejectedExecutionHandler() {
515 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
516 try (PoolCleaner cleaner = cleaner(p)) {
517 assertTrue(p.getRejectedExecutionHandler()
518 instanceof ThreadPoolExecutor.AbortPolicy);
519 }
520 }
521
522 /**
523 * isShutdown is false before shutdown, true after
524 */
525 public void testIsShutdown() {
526 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
527 assertFalse(p.isShutdown());
528 try (PoolCleaner cleaner = cleaner(p)) {
529 try {
530 p.shutdown();
531 assertTrue(p.isShutdown());
532 } catch (SecurityException ok) {}
533 }
534 }
535
536 /**
537 * isTerminated is false before termination, true after
538 */
539 public void testIsTerminated() throws InterruptedException {
540 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
541 try (PoolCleaner cleaner = cleaner(p)) {
542 final CountDownLatch threadStarted = new CountDownLatch(1);
543 final CountDownLatch done = new CountDownLatch(1);
544 assertFalse(p.isTerminated());
545 p.execute(new CheckedRunnable() {
546 public void realRun() throws InterruptedException {
547 assertFalse(p.isTerminated());
548 threadStarted.countDown();
549 await(done);
550 }});
551 await(threadStarted);
552 assertFalse(p.isTerminating());
553 done.countDown();
554 try { p.shutdown(); } catch (SecurityException ok) { return; }
555 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
556 assertTrue(p.isTerminated());
557 }
558 }
559
560 /**
561 * isTerminating is not true when running or when terminated
562 */
563 public void testIsTerminating() throws InterruptedException {
564 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
565 final CountDownLatch threadStarted = new CountDownLatch(1);
566 final CountDownLatch done = new CountDownLatch(1);
567 try (PoolCleaner cleaner = cleaner(p)) {
568 assertFalse(p.isTerminating());
569 p.execute(new CheckedRunnable() {
570 public void realRun() throws InterruptedException {
571 assertFalse(p.isTerminating());
572 threadStarted.countDown();
573 await(done);
574 }});
575 await(threadStarted);
576 assertFalse(p.isTerminating());
577 done.countDown();
578 try { p.shutdown(); } catch (SecurityException ok) { return; }
579 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
580 assertTrue(p.isTerminated());
581 assertFalse(p.isTerminating());
582 }
583 }
584
585 /**
586 * getQueue returns the work queue, which contains queued tasks
587 */
588 public void testGetQueue() throws InterruptedException {
589 final CountDownLatch done = new CountDownLatch(1);
590 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
591 try (PoolCleaner cleaner = cleaner(p, done)) {
592 final CountDownLatch threadStarted = new CountDownLatch(1);
593 ScheduledFuture[] tasks = new ScheduledFuture[5];
594 for (int i = 0; i < tasks.length; i++) {
595 Runnable r = new CheckedRunnable() {
596 public void realRun() throws InterruptedException {
597 threadStarted.countDown();
598 await(done);
599 }};
600 tasks[i] = p.schedule(r, 1, MILLISECONDS);
601 }
602 await(threadStarted);
603 BlockingQueue<Runnable> q = p.getQueue();
604 assertTrue(q.contains(tasks[tasks.length - 1]));
605 assertFalse(q.contains(tasks[0]));
606 }
607 }
608
609 /**
610 * remove(task) removes queued task, and fails to remove active task
611 */
612 public void testRemove() throws InterruptedException {
613 final CountDownLatch done = new CountDownLatch(1);
614 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
615 try (PoolCleaner cleaner = cleaner(p, done)) {
616 ScheduledFuture[] tasks = new ScheduledFuture[5];
617 final CountDownLatch threadStarted = new CountDownLatch(1);
618 for (int i = 0; i < tasks.length; i++) {
619 Runnable r = new CheckedRunnable() {
620 public void realRun() throws InterruptedException {
621 threadStarted.countDown();
622 await(done);
623 }};
624 tasks[i] = p.schedule(r, 1, MILLISECONDS);
625 }
626 await(threadStarted);
627 BlockingQueue<Runnable> q = p.getQueue();
628 assertFalse(p.remove((Runnable)tasks[0]));
629 assertTrue(q.contains((Runnable)tasks[4]));
630 assertTrue(q.contains((Runnable)tasks[3]));
631 assertTrue(p.remove((Runnable)tasks[4]));
632 assertFalse(p.remove((Runnable)tasks[4]));
633 assertFalse(q.contains((Runnable)tasks[4]));
634 assertTrue(q.contains((Runnable)tasks[3]));
635 assertTrue(p.remove((Runnable)tasks[3]));
636 assertFalse(q.contains((Runnable)tasks[3]));
637 }
638 }
639
640 /**
641 * purge eventually removes cancelled tasks from the queue
642 */
643 public void testPurge() throws InterruptedException {
644 final ScheduledFuture[] tasks = new ScheduledFuture[5];
645 final Runnable releaser = new Runnable() { public void run() {
646 for (ScheduledFuture task : tasks)
647 if (task != null) task.cancel(true); }};
648 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
649 try (PoolCleaner cleaner = cleaner(p, releaser)) {
650 for (int i = 0; i < tasks.length; i++)
651 tasks[i] = p.schedule(new SmallPossiblyInterruptedRunnable(),
652 LONG_DELAY_MS, MILLISECONDS);
653 int max = tasks.length;
654 if (tasks[4].cancel(true)) --max;
655 if (tasks[3].cancel(true)) --max;
656 // There must eventually be an interference-free point at
657 // which purge will not fail. (At worst, when queue is empty.)
658 long startTime = System.nanoTime();
659 do {
660 p.purge();
661 long count = p.getTaskCount();
662 if (count == max)
663 return;
664 } while (millisElapsedSince(startTime) < LONG_DELAY_MS);
665 fail("Purge failed to remove cancelled tasks");
666 }
667 }
668
669 /**
670 * shutdownNow returns a list containing tasks that were not run,
671 * and those tasks are drained from the queue
672 */
673 public void testShutdownNow() throws InterruptedException {
674 final int poolSize = 2;
675 final int count = 5;
676 final AtomicInteger ran = new AtomicInteger(0);
677 final ScheduledThreadPoolExecutor p =
678 new ScheduledThreadPoolExecutor(poolSize);
679 final CountDownLatch threadsStarted = new CountDownLatch(poolSize);
680 Runnable waiter = new CheckedRunnable() { public void realRun() {
681 threadsStarted.countDown();
682 try {
683 MILLISECONDS.sleep(2 * LONG_DELAY_MS);
684 } catch (InterruptedException success) {}
685 ran.getAndIncrement();
686 }};
687 for (int i = 0; i < count; i++)
688 p.execute(waiter);
689 await(threadsStarted);
690 assertEquals(poolSize, p.getActiveCount());
691 assertEquals(0, p.getCompletedTaskCount());
692 final List<Runnable> queuedTasks;
693 try {
694 queuedTasks = p.shutdownNow();
695 } catch (SecurityException ok) {
696 return; // Allowed in case test doesn't have privs
697 }
698 assertTrue(p.isShutdown());
699 assertTrue(p.getQueue().isEmpty());
700 assertEquals(count - poolSize, queuedTasks.size());
701 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
702 assertTrue(p.isTerminated());
703 assertEquals(poolSize, ran.get());
704 assertEquals(poolSize, p.getCompletedTaskCount());
705 }
706
707 /**
708 * shutdownNow returns a list containing tasks that were not run,
709 * and those tasks are drained from the queue
710 */
711 public void testShutdownNow_delayedTasks() throws InterruptedException {
712 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
713 List<ScheduledFuture> tasks = new ArrayList<>();
714 for (int i = 0; i < 3; i++) {
715 Runnable r = new NoOpRunnable();
716 tasks.add(p.schedule(r, 9, SECONDS));
717 tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS));
718 tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS));
719 }
720 if (testImplementationDetails)
721 assertEquals(new HashSet(tasks), new HashSet(p.getQueue()));
722 final List<Runnable> queuedTasks;
723 try {
724 queuedTasks = p.shutdownNow();
725 } catch (SecurityException ok) {
726 return; // Allowed in case test doesn't have privs
727 }
728 assertTrue(p.isShutdown());
729 assertTrue(p.getQueue().isEmpty());
730 if (testImplementationDetails)
731 assertEquals(new HashSet(tasks), new HashSet(queuedTasks));
732 assertEquals(tasks.size(), queuedTasks.size());
733 for (ScheduledFuture task : tasks) {
734 assertFalse(task.isDone());
735 assertFalse(task.isCancelled());
736 }
737 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
738 assertTrue(p.isTerminated());
739 }
740
741 /**
742 * By default, periodic tasks are cancelled at shutdown.
743 * By default, delayed tasks keep running after shutdown.
744 * Check that changing the default values work:
745 * - setExecuteExistingDelayedTasksAfterShutdownPolicy
746 * - setContinueExistingPeriodicTasksAfterShutdownPolicy
747 */
748 public void testShutdown_cancellation() throws Exception {
749 final int poolSize = 4;
750 final ScheduledThreadPoolExecutor p
751 = new ScheduledThreadPoolExecutor(poolSize);
752 final BlockingQueue<Runnable> q = p.getQueue();
753 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
754 final long delay = rnd.nextInt(2);
755 final int rounds = rnd.nextInt(1, 3);
756 final boolean effectiveDelayedPolicy;
757 final boolean effectivePeriodicPolicy;
758 final boolean effectiveRemovePolicy;
759
760 if (rnd.nextBoolean())
761 p.setExecuteExistingDelayedTasksAfterShutdownPolicy(
762 effectiveDelayedPolicy = rnd.nextBoolean());
763 else
764 effectiveDelayedPolicy = true;
765 assertEquals(effectiveDelayedPolicy,
766 p.getExecuteExistingDelayedTasksAfterShutdownPolicy());
767
768 if (rnd.nextBoolean())
769 p.setContinueExistingPeriodicTasksAfterShutdownPolicy(
770 effectivePeriodicPolicy = rnd.nextBoolean());
771 else
772 effectivePeriodicPolicy = false;
773 assertEquals(effectivePeriodicPolicy,
774 p.getContinueExistingPeriodicTasksAfterShutdownPolicy());
775
776 if (rnd.nextBoolean())
777 p.setRemoveOnCancelPolicy(
778 effectiveRemovePolicy = rnd.nextBoolean());
779 else
780 effectiveRemovePolicy = false;
781 assertEquals(effectiveRemovePolicy,
782 p.getRemoveOnCancelPolicy());
783
784 final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean();
785
786 // Strategy: Wedge the pool with one wave of "blocker" tasks,
787 // then add a second wave that waits in the queue until unblocked.
788 final AtomicInteger ran = new AtomicInteger(0);
789 final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
790 final CountDownLatch unblock = new CountDownLatch(1);
791 final RuntimeException exception = new RuntimeException();
792
793 class Task implements Runnable {
794 public void run() {
795 try {
796 ran.getAndIncrement();
797 poolBlocked.countDown();
798 await(unblock);
799 } catch (Throwable fail) { threadUnexpectedException(fail); }
800 }
801 }
802
803 class PeriodicTask extends Task {
804 PeriodicTask(int rounds) { this.rounds = rounds; }
805 int rounds;
806 public void run() {
807 if (--rounds == 0) super.run();
808 // throw exception to surely terminate this periodic task,
809 // but in a separate execution and in a detectable way.
810 if (rounds == -1) throw exception;
811 }
812 }
813
814 Runnable task = new Task();
815
816 List<Future<?>> immediates = new ArrayList<>();
817 List<Future<?>> delayeds = new ArrayList<>();
818 List<Future<?>> periodics = new ArrayList<>();
819
820 immediates.add(p.submit(task));
821 delayeds.add(p.schedule(task, delay, MILLISECONDS));
822 periodics.add(p.scheduleAtFixedRate(
823 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
824 periodics.add(p.scheduleWithFixedDelay(
825 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
826
827 await(poolBlocked);
828
829 assertEquals(poolSize, ran.get());
830 assertEquals(poolSize, p.getActiveCount());
831 assertTrue(q.isEmpty());
832
833 // Add second wave of tasks.
834 immediates.add(p.submit(task));
835 delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS));
836 periodics.add(p.scheduleAtFixedRate(
837 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
838 periodics.add(p.scheduleWithFixedDelay(
839 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
840
841 assertEquals(poolSize, q.size());
842 assertEquals(poolSize, ran.get());
843
844 immediates.forEach(
845 f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));
846
847 Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
848 .forEach(f -> assertFalse(f.isDone()));
849
850 try { p.shutdown(); } catch (SecurityException ok) { return; }
851 assertTrue(p.isShutdown());
852 assertTrue(p.isTerminating());
853 assertFalse(p.isTerminated());
854
855 if (rnd.nextBoolean())
856 assertThrows(
857 RejectedExecutionException.class,
858 () -> p.submit(task),
859 () -> p.schedule(task, 1, SECONDS),
860 () -> p.scheduleAtFixedRate(
861 new PeriodicTask(1), 1, 1, SECONDS),
862 () -> p.scheduleWithFixedDelay(
863 new PeriodicTask(2), 1, 1, SECONDS));
864
865 assertTrue(q.contains(immediates.get(1)));
866 assertTrue(!effectiveDelayedPolicy
867 ^ q.contains(delayeds.get(1)));
868 assertTrue(!effectivePeriodicPolicy
869 ^ q.containsAll(periodics.subList(2, 4)));
870
871 immediates.forEach(f -> assertFalse(f.isDone()));
872
873 assertFalse(delayeds.get(0).isDone());
874 if (effectiveDelayedPolicy)
875 assertFalse(delayeds.get(1).isDone());
876 else
877 assertTrue(delayeds.get(1).isCancelled());
878
879 if (effectivePeriodicPolicy)
880 periodics.forEach(
881 f -> {
882 assertFalse(f.isDone());
883 if (!periodicTasksContinue) {
884 assertTrue(f.cancel(false));
885 assertTrue(f.isCancelled());
886 }
887 });
888 else {
889 periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone()));
890 periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled()));
891 }
892
893 unblock.countDown(); // Release all pool threads
894
895 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
896 assertFalse(p.isTerminating());
897 assertTrue(p.isTerminated());
898
899 assertTrue(q.isEmpty());
900
901 Stream.of(immediates, delayeds, periodics).flatMap(c -> c.stream())
902 .forEach(f -> assertTrue(f.isDone()));
903
904 for (Future<?> f : immediates) assertNull(f.get());
905
906 assertNull(delayeds.get(0).get());
907 if (effectiveDelayedPolicy)
908 assertNull(delayeds.get(1).get());
909 else
910 assertTrue(delayeds.get(1).isCancelled());
911
912 if (periodicTasksContinue)
913 periodics.forEach(
914 f -> {
915 try { f.get(); }
916 catch (ExecutionException success) {
917 assertSame(exception, success.getCause());
918 }
919 catch (Throwable fail) { threadUnexpectedException(fail); }
920 });
921 else
922 periodics.forEach(f -> assertTrue(f.isCancelled()));
923
924 assertEquals(poolSize + 1
925 + (effectiveDelayedPolicy ? 1 : 0)
926 + (periodicTasksContinue ? 2 : 0),
927 ran.get());
928 }
929
930 /**
931 * completed submit of callable returns result
932 */
933 public void testSubmitCallable() throws Exception {
934 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
935 try (PoolCleaner cleaner = cleaner(e)) {
936 Future<String> future = e.submit(new StringTask());
937 String result = future.get();
938 assertSame(TEST_STRING, result);
939 }
940 }
941
942 /**
943 * completed submit of runnable returns successfully
944 */
945 public void testSubmitRunnable() throws Exception {
946 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
947 try (PoolCleaner cleaner = cleaner(e)) {
948 Future<?> future = e.submit(new NoOpRunnable());
949 future.get();
950 assertTrue(future.isDone());
951 }
952 }
953
954 /**
955 * completed submit of (runnable, result) returns result
956 */
957 public void testSubmitRunnable2() throws Exception {
958 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
959 try (PoolCleaner cleaner = cleaner(e)) {
960 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
961 String result = future.get();
962 assertSame(TEST_STRING, result);
963 }
964 }
965
966 /**
967 * invokeAny(null) throws NPE
968 */
969 public void testInvokeAny1() throws Exception {
970 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
971 try (PoolCleaner cleaner = cleaner(e)) {
972 try {
973 e.invokeAny(null);
974 shouldThrow();
975 } catch (NullPointerException success) {}
976 }
977 }
978
979 /**
980 * invokeAny(empty collection) throws IAE
981 */
982 public void testInvokeAny2() throws Exception {
983 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
984 try (PoolCleaner cleaner = cleaner(e)) {
985 try {
986 e.invokeAny(new ArrayList<Callable<String>>());
987 shouldThrow();
988 } catch (IllegalArgumentException success) {}
989 }
990 }
991
992 /**
993 * invokeAny(c) throws NPE if c has null elements
994 */
995 public void testInvokeAny3() throws Exception {
996 CountDownLatch latch = new CountDownLatch(1);
997 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
998 try (PoolCleaner cleaner = cleaner(e)) {
999 List<Callable<String>> l = new ArrayList<>();
1000 l.add(latchAwaitingStringTask(latch));
1001 l.add(null);
1002 try {
1003 e.invokeAny(l);
1004 shouldThrow();
1005 } catch (NullPointerException success) {}
1006 latch.countDown();
1007 }
1008 }
1009
1010 /**
1011 * invokeAny(c) throws ExecutionException if no task completes
1012 */
1013 public void testInvokeAny4() throws Exception {
1014 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1015 try (PoolCleaner cleaner = cleaner(e)) {
1016 List<Callable<String>> l = new ArrayList<>();
1017 l.add(new NPETask());
1018 try {
1019 e.invokeAny(l);
1020 shouldThrow();
1021 } catch (ExecutionException success) {
1022 assertTrue(success.getCause() instanceof NullPointerException);
1023 }
1024 }
1025 }
1026
1027 /**
1028 * invokeAny(c) returns result of some task
1029 */
1030 public void testInvokeAny5() throws Exception {
1031 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1032 try (PoolCleaner cleaner = cleaner(e)) {
1033 List<Callable<String>> l = new ArrayList<>();
1034 l.add(new StringTask());
1035 l.add(new StringTask());
1036 String result = e.invokeAny(l);
1037 assertSame(TEST_STRING, result);
1038 }
1039 }
1040
1041 /**
1042 * invokeAll(null) throws NPE
1043 */
1044 public void testInvokeAll1() throws Exception {
1045 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1046 try (PoolCleaner cleaner = cleaner(e)) {
1047 try {
1048 e.invokeAll(null);
1049 shouldThrow();
1050 } catch (NullPointerException success) {}
1051 }
1052 }
1053
1054 /**
1055 * invokeAll(empty collection) returns empty collection
1056 */
1057 public void testInvokeAll2() throws Exception {
1058 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1059 try (PoolCleaner cleaner = cleaner(e)) {
1060 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>());
1061 assertTrue(r.isEmpty());
1062 }
1063 }
1064
1065 /**
1066 * invokeAll(c) throws NPE if c has null elements
1067 */
1068 public void testInvokeAll3() throws Exception {
1069 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1070 try (PoolCleaner cleaner = cleaner(e)) {
1071 List<Callable<String>> l = new ArrayList<>();
1072 l.add(new StringTask());
1073 l.add(null);
1074 try {
1075 e.invokeAll(l);
1076 shouldThrow();
1077 } catch (NullPointerException success) {}
1078 }
1079 }
1080
1081 /**
1082 * get of invokeAll(c) throws exception on failed task
1083 */
1084 public void testInvokeAll4() throws Exception {
1085 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1086 try (PoolCleaner cleaner = cleaner(e)) {
1087 List<Callable<String>> l = new ArrayList<>();
1088 l.add(new NPETask());
1089 List<Future<String>> futures = e.invokeAll(l);
1090 assertEquals(1, futures.size());
1091 try {
1092 futures.get(0).get();
1093 shouldThrow();
1094 } catch (ExecutionException success) {
1095 assertTrue(success.getCause() instanceof NullPointerException);
1096 }
1097 }
1098 }
1099
1100 /**
1101 * invokeAll(c) returns results of all completed tasks
1102 */
1103 public void testInvokeAll5() throws Exception {
1104 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1105 try (PoolCleaner cleaner = cleaner(e)) {
1106 List<Callable<String>> l = new ArrayList<>();
1107 l.add(new StringTask());
1108 l.add(new StringTask());
1109 List<Future<String>> futures = e.invokeAll(l);
1110 assertEquals(2, futures.size());
1111 for (Future<String> future : futures)
1112 assertSame(TEST_STRING, future.get());
1113 }
1114 }
1115
1116 /**
1117 * timed invokeAny(null) throws NPE
1118 */
1119 public void testTimedInvokeAny1() throws Exception {
1120 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1121 try (PoolCleaner cleaner = cleaner(e)) {
1122 try {
1123 e.invokeAny(null, MEDIUM_DELAY_MS, MILLISECONDS);
1124 shouldThrow();
1125 } catch (NullPointerException success) {}
1126 }
1127 }
1128
1129 /**
1130 * timed invokeAny(,,null) throws NPE
1131 */
1132 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1133 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1134 try (PoolCleaner cleaner = cleaner(e)) {
1135 List<Callable<String>> l = new ArrayList<>();
1136 l.add(new StringTask());
1137 try {
1138 e.invokeAny(l, MEDIUM_DELAY_MS, null);
1139 shouldThrow();
1140 } catch (NullPointerException success) {}
1141 }
1142 }
1143
1144 /**
1145 * timed invokeAny(empty collection) throws IAE
1146 */
1147 public void testTimedInvokeAny2() throws Exception {
1148 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1149 try (PoolCleaner cleaner = cleaner(e)) {
1150 try {
1151 e.invokeAny(new ArrayList<Callable<String>>(), MEDIUM_DELAY_MS, MILLISECONDS);
1152 shouldThrow();
1153 } catch (IllegalArgumentException success) {}
1154 }
1155 }
1156
1157 /**
1158 * timed invokeAny(c) throws NPE if c has null elements
1159 */
1160 public void testTimedInvokeAny3() throws Exception {
1161 CountDownLatch latch = new CountDownLatch(1);
1162 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1163 try (PoolCleaner cleaner = cleaner(e)) {
1164 List<Callable<String>> l = new ArrayList<>();
1165 l.add(latchAwaitingStringTask(latch));
1166 l.add(null);
1167 try {
1168 e.invokeAny(l, MEDIUM_DELAY_MS, MILLISECONDS);
1169 shouldThrow();
1170 } catch (NullPointerException success) {}
1171 latch.countDown();
1172 }
1173 }
1174
1175 /**
1176 * timed invokeAny(c) throws ExecutionException if no task completes
1177 */
1178 public void testTimedInvokeAny4() throws Exception {
1179 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1180 try (PoolCleaner cleaner = cleaner(e)) {
1181 long startTime = System.nanoTime();
1182 List<Callable<String>> l = new ArrayList<>();
1183 l.add(new NPETask());
1184 try {
1185 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1186 shouldThrow();
1187 } catch (ExecutionException success) {
1188 assertTrue(success.getCause() instanceof NullPointerException);
1189 }
1190 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1191 }
1192 }
1193
1194 /**
1195 * timed invokeAny(c) returns result of some task
1196 */
1197 public void testTimedInvokeAny5() throws Exception {
1198 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1199 try (PoolCleaner cleaner = cleaner(e)) {
1200 long startTime = System.nanoTime();
1201 List<Callable<String>> l = new ArrayList<>();
1202 l.add(new StringTask());
1203 l.add(new StringTask());
1204 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1205 assertSame(TEST_STRING, result);
1206 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1207 }
1208 }
1209
1210 /**
1211 * timed invokeAll(null) throws NPE
1212 */
1213 public void testTimedInvokeAll1() throws Exception {
1214 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1215 try (PoolCleaner cleaner = cleaner(e)) {
1216 try {
1217 e.invokeAll(null, MEDIUM_DELAY_MS, MILLISECONDS);
1218 shouldThrow();
1219 } catch (NullPointerException success) {}
1220 }
1221 }
1222
1223 /**
1224 * timed invokeAll(,,null) throws NPE
1225 */
1226 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1227 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1228 try (PoolCleaner cleaner = cleaner(e)) {
1229 List<Callable<String>> l = new ArrayList<>();
1230 l.add(new StringTask());
1231 try {
1232 e.invokeAll(l, MEDIUM_DELAY_MS, null);
1233 shouldThrow();
1234 } catch (NullPointerException success) {}
1235 }
1236 }
1237
1238 /**
1239 * timed invokeAll(empty collection) returns empty collection
1240 */
1241 public void testTimedInvokeAll2() throws Exception {
1242 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1243 try (PoolCleaner cleaner = cleaner(e)) {
1244 List<Future<String>> r = e.invokeAll(new ArrayList<Callable<String>>(),
1245 MEDIUM_DELAY_MS, MILLISECONDS);
1246 assertTrue(r.isEmpty());
1247 }
1248 }
1249
1250 /**
1251 * timed invokeAll(c) throws NPE if c has null elements
1252 */
1253 public void testTimedInvokeAll3() throws Exception {
1254 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1255 try (PoolCleaner cleaner = cleaner(e)) {
1256 List<Callable<String>> l = new ArrayList<>();
1257 l.add(new StringTask());
1258 l.add(null);
1259 try {
1260 e.invokeAll(l, MEDIUM_DELAY_MS, MILLISECONDS);
1261 shouldThrow();
1262 } catch (NullPointerException success) {}
1263 }
1264 }
1265
1266 /**
1267 * get of element of invokeAll(c) throws exception on failed task
1268 */
1269 public void testTimedInvokeAll4() throws Exception {
1270 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1271 try (PoolCleaner cleaner = cleaner(e)) {
1272 List<Callable<String>> l = new ArrayList<>();
1273 l.add(new NPETask());
1274 List<Future<String>> futures =
1275 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1276 assertEquals(1, futures.size());
1277 try {
1278 futures.get(0).get();
1279 shouldThrow();
1280 } catch (ExecutionException success) {
1281 assertTrue(success.getCause() instanceof NullPointerException);
1282 }
1283 }
1284 }
1285
1286 /**
1287 * timed invokeAll(c) returns results of all completed tasks
1288 */
1289 public void testTimedInvokeAll5() throws Exception {
1290 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1291 try (PoolCleaner cleaner = cleaner(e)) {
1292 List<Callable<String>> l = new ArrayList<>();
1293 l.add(new StringTask());
1294 l.add(new StringTask());
1295 List<Future<String>> futures =
1296 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1297 assertEquals(2, futures.size());
1298 for (Future<String> future : futures)
1299 assertSame(TEST_STRING, future.get());
1300 }
1301 }
1302
1303 /**
1304 * timed invokeAll(c) cancels tasks not completed by timeout
1305 */
1306 public void testTimedInvokeAll6() throws Exception {
1307 for (long timeout = timeoutMillis();;) {
1308 final CountDownLatch done = new CountDownLatch(1);
1309 final Callable<String> waiter = new CheckedCallable<String>() {
1310 public String realCall() {
1311 try { done.await(LONG_DELAY_MS, MILLISECONDS); }
1312 catch (InterruptedException ok) {}
1313 return "1"; }};
1314 final ExecutorService p = new ScheduledThreadPoolExecutor(2);
1315 try (PoolCleaner cleaner = cleaner(p, done)) {
1316 List<Callable<String>> tasks = new ArrayList<>();
1317 tasks.add(new StringTask("0"));
1318 tasks.add(waiter);
1319 tasks.add(new StringTask("2"));
1320 long startTime = System.nanoTime();
1321 List<Future<String>> futures =
1322 p.invokeAll(tasks, timeout, MILLISECONDS);
1323 assertEquals(tasks.size(), futures.size());
1324 assertTrue(millisElapsedSince(startTime) >= timeout);
1325 for (Future future : futures)
1326 assertTrue(future.isDone());
1327 assertTrue(futures.get(1).isCancelled());
1328 try {
1329 assertEquals("0", futures.get(0).get());
1330 assertEquals("2", futures.get(2).get());
1331 break;
1332 } catch (CancellationException retryWithLongerTimeout) {
1333 timeout *= 2;
1334 if (timeout >= LONG_DELAY_MS / 2)
1335 fail("expected exactly one task to be cancelled");
1336 }
1337 }
1338 }
1339 }
1340
1341 /**
1342 * A fixed delay task with overflowing period should not prevent a
1343 * one-shot task from executing.
1344 * https://bugs.openjdk.java.net/browse/JDK-8051859
1345 */
1346 public void testScheduleWithFixedDelay_overflow() throws Exception {
1347 final CountDownLatch delayedDone = new CountDownLatch(1);
1348 final CountDownLatch immediateDone = new CountDownLatch(1);
1349 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
1350 try (PoolCleaner cleaner = cleaner(p)) {
1351 final Runnable immediate = new Runnable() { public void run() {
1352 immediateDone.countDown();
1353 }};
1354 final Runnable delayed = new Runnable() { public void run() {
1355 delayedDone.countDown();
1356 p.submit(immediate);
1357 }};
1358 p.scheduleWithFixedDelay(delayed, 0L, Long.MAX_VALUE, SECONDS);
1359 await(delayedDone);
1360 await(immediateDone);
1361 }
1362 }
1363
1364 }