ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ScheduledExecutorTest.java
Revision: 1.100
Committed: Wed Jan 27 01:57:24 2021 UTC (3 years, 3 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.99: +2 -2 lines
Log Message:
use diamond <> pervasively

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 import static java.util.concurrent.TimeUnit.SECONDS;
12
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.CancellationException;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.ThreadLocalRandom;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34 import java.util.stream.Stream;
35
36 import junit.framework.Test;
37 import junit.framework.TestSuite;
38
39 public class ScheduledExecutorTest extends JSR166TestCase {
40 public static void main(String[] args) {
41 main(suite(), args);
42 }
43 public static Test suite() {
44 return new TestSuite(ScheduledExecutorTest.class);
45 }
46
47 /**
48 * execute successfully executes a runnable
49 */
50 public void testExecute() throws InterruptedException {
51 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
52 try (PoolCleaner cleaner = cleaner(p)) {
53 final CountDownLatch done = new CountDownLatch(1);
54 final Runnable task = new CheckedRunnable() {
55 public void realRun() { done.countDown(); }};
56 p.execute(task);
57 await(done);
58 }
59 }
60
61 /**
62 * delayed schedule of callable successfully executes after delay
63 */
64 public void testSchedule1() throws Exception {
65 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
66 try (PoolCleaner cleaner = cleaner(p)) {
67 final long startTime = System.nanoTime();
68 final CountDownLatch done = new CountDownLatch(1);
69 Callable<Boolean> task = new CheckedCallable<>() {
70 public Boolean realCall() {
71 done.countDown();
72 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
73 return Boolean.TRUE;
74 }};
75 Future<Boolean> f = p.schedule(task, timeoutMillis(), MILLISECONDS);
76 assertSame(Boolean.TRUE, f.get());
77 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
78 assertEquals(0L, done.getCount());
79 }
80 }
81
82 /**
83 * delayed schedule of runnable successfully executes after delay
84 */
85 public void testSchedule3() throws Exception {
86 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
87 try (PoolCleaner cleaner = cleaner(p)) {
88 final long startTime = System.nanoTime();
89 final CountDownLatch done = new CountDownLatch(1);
90 Runnable task = new CheckedRunnable() {
91 public void realRun() {
92 done.countDown();
93 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
94 }};
95 Future<?> f = p.schedule(task, timeoutMillis(), MILLISECONDS);
96 await(done);
97 assertNull(f.get(LONG_DELAY_MS, MILLISECONDS));
98 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
99 }
100 }
101
102 /**
103 * scheduleAtFixedRate executes runnable after given initial delay
104 */
105 public void testSchedule4() throws Exception {
106 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
107 try (PoolCleaner cleaner = cleaner(p)) {
108 final long startTime = System.nanoTime();
109 final CountDownLatch done = new CountDownLatch(1);
110 Runnable task = new CheckedRunnable() {
111 public void realRun() {
112 done.countDown();
113 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
114 }};
115 ScheduledFuture<?> f =
116 p.scheduleAtFixedRate(task, timeoutMillis(),
117 LONG_DELAY_MS, MILLISECONDS);
118 await(done);
119 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
120 f.cancel(true);
121 }
122 }
123
124 /**
125 * scheduleWithFixedDelay executes runnable after given initial delay
126 */
127 public void testSchedule5() throws Exception {
128 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
129 try (PoolCleaner cleaner = cleaner(p)) {
130 final long startTime = System.nanoTime();
131 final CountDownLatch done = new CountDownLatch(1);
132 Runnable task = new CheckedRunnable() {
133 public void realRun() {
134 done.countDown();
135 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
136 }};
137 ScheduledFuture<?> f =
138 p.scheduleWithFixedDelay(task, timeoutMillis(),
139 LONG_DELAY_MS, MILLISECONDS);
140 await(done);
141 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
142 f.cancel(true);
143 }
144 }
145
146 static class RunnableCounter implements Runnable {
147 AtomicInteger count = new AtomicInteger(0);
148 public void run() { count.getAndIncrement(); }
149 }
150
151 /**
152 * scheduleAtFixedRate executes series of tasks at given rate.
153 * Eventually, it must hold that:
154 * cycles - 1 <= elapsedMillis/delay < cycles
155 */
156 public void testFixedRateSequence() throws InterruptedException {
157 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
158 try (PoolCleaner cleaner = cleaner(p)) {
159 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
160 final long startTime = System.nanoTime();
161 final int cycles = 8;
162 final CountDownLatch done = new CountDownLatch(cycles);
163 final Runnable task = new CheckedRunnable() {
164 public void realRun() { done.countDown(); }};
165 final ScheduledFuture<?> periodicTask =
166 p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
167 final int totalDelayMillis = (cycles - 1) * delay;
168 await(done, totalDelayMillis + LONG_DELAY_MS);
169 periodicTask.cancel(true);
170 final long elapsedMillis = millisElapsedSince(startTime);
171 assertTrue(elapsedMillis >= totalDelayMillis);
172 if (elapsedMillis <= cycles * delay)
173 return;
174 // else retry with longer delay
175 }
176 fail("unexpected execution rate");
177 }
178 }
179
180 /**
181 * scheduleWithFixedDelay executes series of tasks with given period.
182 * Eventually, it must hold that each task starts at least delay and at
183 * most 2 * delay after the termination of the previous task.
184 */
185 public void testFixedDelaySequence() throws InterruptedException {
186 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
187 try (PoolCleaner cleaner = cleaner(p)) {
188 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
189 final long startTime = System.nanoTime();
190 final AtomicLong previous = new AtomicLong(startTime);
191 final AtomicBoolean tryLongerDelay = new AtomicBoolean(false);
192 final int cycles = 8;
193 final CountDownLatch done = new CountDownLatch(cycles);
194 final int d = delay;
195 final Runnable task = new CheckedRunnable() {
196 public void realRun() {
197 long now = System.nanoTime();
198 long elapsedMillis
199 = NANOSECONDS.toMillis(now - previous.get());
200 if (done.getCount() == cycles) { // first execution
201 if (elapsedMillis >= d)
202 tryLongerDelay.set(true);
203 } else {
204 assertTrue(elapsedMillis >= d);
205 if (elapsedMillis >= 2 * d)
206 tryLongerDelay.set(true);
207 }
208 previous.set(now);
209 done.countDown();
210 }};
211 final ScheduledFuture<?> periodicTask =
212 p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS);
213 final int totalDelayMillis = (cycles - 1) * delay;
214 await(done, totalDelayMillis + cycles * LONG_DELAY_MS);
215 periodicTask.cancel(true);
216 final long elapsedMillis = millisElapsedSince(startTime);
217 assertTrue(elapsedMillis >= totalDelayMillis);
218 if (!tryLongerDelay.get())
219 return;
220 // else retry with longer delay
221 }
222 fail("unexpected execution rate");
223 }
224 }
225
226 /**
227 * Submitting null tasks throws NullPointerException
228 */
229 public void testNullTaskSubmission() {
230 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
231 try (PoolCleaner cleaner = cleaner(p)) {
232 assertNullTaskSubmissionThrowsNullPointerException(p);
233 }
234 }
235
236 /**
237 * Submitted tasks are rejected when shutdown
238 */
239 public void testSubmittedTasksRejectedWhenShutdown() throws InterruptedException {
240 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
241 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
242 final CountDownLatch threadsStarted = new CountDownLatch(p.getCorePoolSize());
243 final CountDownLatch done = new CountDownLatch(1);
244 final Runnable r = () -> {
245 threadsStarted.countDown();
246 for (;;) {
247 try {
248 done.await();
249 return;
250 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
251 }};
252 final Callable<Boolean> c = () -> {
253 threadsStarted.countDown();
254 for (;;) {
255 try {
256 done.await();
257 return Boolean.TRUE;
258 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
259 }};
260
261 try (PoolCleaner cleaner = cleaner(p, done)) {
262 for (int i = p.getCorePoolSize(); i--> 0; ) {
263 switch (rnd.nextInt(4)) {
264 case 0: p.execute(r); break;
265 case 1: assertFalse(p.submit(r).isDone()); break;
266 case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break;
267 case 3: assertFalse(p.submit(c).isDone()); break;
268 }
269 }
270
271 // ScheduledThreadPoolExecutor has an unbounded queue, so never saturated.
272 await(threadsStarted);
273
274 if (rnd.nextBoolean())
275 p.shutdownNow();
276 else
277 p.shutdown();
278 // Pool is shutdown, but not yet terminated
279 assertTaskSubmissionsAreRejected(p);
280 assertFalse(p.isTerminated());
281
282 done.countDown(); // release blocking tasks
283 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
284
285 assertTaskSubmissionsAreRejected(p);
286 }
287 assertEquals(p.getCorePoolSize(), p.getCompletedTaskCount());
288 }
289
290 /**
291 * getActiveCount increases but doesn't overestimate, when a
292 * thread becomes active
293 */
294 public void testGetActiveCount() throws InterruptedException {
295 final CountDownLatch done = new CountDownLatch(1);
296 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(2);
297 try (PoolCleaner cleaner = cleaner(p, done)) {
298 final CountDownLatch threadStarted = new CountDownLatch(1);
299 assertEquals(0, p.getActiveCount());
300 p.execute(new CheckedRunnable() {
301 public void realRun() throws InterruptedException {
302 threadStarted.countDown();
303 assertEquals(1, p.getActiveCount());
304 await(done);
305 }});
306 await(threadStarted);
307 assertEquals(1, p.getActiveCount());
308 }
309 }
310
311 /**
312 * getCompletedTaskCount increases, but doesn't overestimate,
313 * when tasks complete
314 */
315 public void testGetCompletedTaskCount() throws InterruptedException {
316 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(2);
317 try (PoolCleaner cleaner = cleaner(p)) {
318 final CountDownLatch threadStarted = new CountDownLatch(1);
319 final CountDownLatch threadProceed = new CountDownLatch(1);
320 final CountDownLatch threadDone = new CountDownLatch(1);
321 assertEquals(0, p.getCompletedTaskCount());
322 p.execute(new CheckedRunnable() {
323 public void realRun() throws InterruptedException {
324 threadStarted.countDown();
325 assertEquals(0, p.getCompletedTaskCount());
326 await(threadProceed);
327 threadDone.countDown();
328 }});
329 await(threadStarted);
330 assertEquals(0, p.getCompletedTaskCount());
331 threadProceed.countDown();
332 await(threadDone);
333 long startTime = System.nanoTime();
334 while (p.getCompletedTaskCount() != 1) {
335 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
336 fail("timed out");
337 Thread.yield();
338 }
339 }
340 }
341
342 /**
343 * getCorePoolSize returns size given in constructor if not otherwise set
344 */
345 public void testGetCorePoolSize() throws InterruptedException {
346 ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
347 try (PoolCleaner cleaner = cleaner(p)) {
348 assertEquals(1, p.getCorePoolSize());
349 }
350 }
351
352 /**
353 * getLargestPoolSize increases, but doesn't overestimate, when
354 * multiple threads active
355 */
356 public void testGetLargestPoolSize() throws InterruptedException {
357 final int THREADS = 3;
358 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(THREADS);
359 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
360 final CountDownLatch done = new CountDownLatch(1);
361 try (PoolCleaner cleaner = cleaner(p, done)) {
362 assertEquals(0, p.getLargestPoolSize());
363 for (int i = 0; i < THREADS; i++)
364 p.execute(new CheckedRunnable() {
365 public void realRun() throws InterruptedException {
366 threadsStarted.countDown();
367 await(done);
368 assertEquals(THREADS, p.getLargestPoolSize());
369 }});
370 await(threadsStarted);
371 assertEquals(THREADS, p.getLargestPoolSize());
372 }
373 assertEquals(THREADS, p.getLargestPoolSize());
374 }
375
376 /**
377 * getPoolSize increases, but doesn't overestimate, when threads
378 * become active
379 */
380 public void testGetPoolSize() throws InterruptedException {
381 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
382 final CountDownLatch threadStarted = new CountDownLatch(1);
383 final CountDownLatch done = new CountDownLatch(1);
384 try (PoolCleaner cleaner = cleaner(p, done)) {
385 assertEquals(0, p.getPoolSize());
386 p.execute(new CheckedRunnable() {
387 public void realRun() throws InterruptedException {
388 threadStarted.countDown();
389 assertEquals(1, p.getPoolSize());
390 await(done);
391 }});
392 await(threadStarted);
393 assertEquals(1, p.getPoolSize());
394 }
395 }
396
397 /**
398 * getTaskCount increases, but doesn't overestimate, when tasks
399 * submitted
400 */
401 public void testGetTaskCount() throws InterruptedException {
402 final int TASKS = 3;
403 final CountDownLatch done = new CountDownLatch(1);
404 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
405 try (PoolCleaner cleaner = cleaner(p, done)) {
406 final CountDownLatch threadStarted = new CountDownLatch(1);
407 assertEquals(0, p.getTaskCount());
408 assertEquals(0, p.getCompletedTaskCount());
409 p.execute(new CheckedRunnable() {
410 public void realRun() throws InterruptedException {
411 threadStarted.countDown();
412 await(done);
413 }});
414 await(threadStarted);
415 assertEquals(1, p.getTaskCount());
416 assertEquals(0, p.getCompletedTaskCount());
417 for (int i = 0; i < TASKS; i++) {
418 assertEquals(1 + i, p.getTaskCount());
419 p.execute(new CheckedRunnable() {
420 public void realRun() throws InterruptedException {
421 threadStarted.countDown();
422 assertEquals(1 + TASKS, p.getTaskCount());
423 await(done);
424 }});
425 }
426 assertEquals(1 + TASKS, p.getTaskCount());
427 assertEquals(0, p.getCompletedTaskCount());
428 }
429 assertEquals(1 + TASKS, p.getTaskCount());
430 assertEquals(1 + TASKS, p.getCompletedTaskCount());
431 }
432
433 /**
434 * getThreadFactory returns factory in constructor if not set
435 */
436 public void testGetThreadFactory() throws InterruptedException {
437 final ThreadFactory threadFactory = new SimpleThreadFactory();
438 final ScheduledThreadPoolExecutor p =
439 new ScheduledThreadPoolExecutor(1, threadFactory);
440 try (PoolCleaner cleaner = cleaner(p)) {
441 assertSame(threadFactory, p.getThreadFactory());
442 }
443 }
444
445 /**
446 * setThreadFactory sets the thread factory returned by getThreadFactory
447 */
448 public void testSetThreadFactory() throws InterruptedException {
449 ThreadFactory threadFactory = new SimpleThreadFactory();
450 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
451 try (PoolCleaner cleaner = cleaner(p)) {
452 p.setThreadFactory(threadFactory);
453 assertSame(threadFactory, p.getThreadFactory());
454 }
455 }
456
457 /**
458 * setThreadFactory(null) throws NPE
459 */
460 public void testSetThreadFactoryNull() throws InterruptedException {
461 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
462 try (PoolCleaner cleaner = cleaner(p)) {
463 try {
464 p.setThreadFactory(null);
465 shouldThrow();
466 } catch (NullPointerException success) {}
467 }
468 }
469
470 /**
471 * The default rejected execution handler is AbortPolicy.
472 */
473 public void testDefaultRejectedExecutionHandler() {
474 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
475 try (PoolCleaner cleaner = cleaner(p)) {
476 assertTrue(p.getRejectedExecutionHandler()
477 instanceof ThreadPoolExecutor.AbortPolicy);
478 }
479 }
480
481 /**
482 * isShutdown is false before shutdown, true after
483 */
484 public void testIsShutdown() {
485 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
486 assertFalse(p.isShutdown());
487 try (PoolCleaner cleaner = cleaner(p)) {
488 try {
489 p.shutdown();
490 assertTrue(p.isShutdown());
491 } catch (SecurityException ok) {}
492 }
493 }
494
495 /**
496 * isTerminated is false before termination, true after
497 */
498 public void testIsTerminated() throws InterruptedException {
499 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
500 try (PoolCleaner cleaner = cleaner(p)) {
501 final CountDownLatch threadStarted = new CountDownLatch(1);
502 final CountDownLatch done = new CountDownLatch(1);
503 assertFalse(p.isTerminated());
504 p.execute(new CheckedRunnable() {
505 public void realRun() throws InterruptedException {
506 assertFalse(p.isTerminated());
507 threadStarted.countDown();
508 await(done);
509 }});
510 await(threadStarted);
511 assertFalse(p.isTerminating());
512 done.countDown();
513 try { p.shutdown(); } catch (SecurityException ok) { return; }
514 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
515 assertTrue(p.isTerminated());
516 }
517 }
518
519 /**
520 * isTerminating is not true when running or when terminated
521 */
522 public void testIsTerminating() throws InterruptedException {
523 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
524 final CountDownLatch threadStarted = new CountDownLatch(1);
525 final CountDownLatch done = new CountDownLatch(1);
526 try (PoolCleaner cleaner = cleaner(p)) {
527 assertFalse(p.isTerminating());
528 p.execute(new CheckedRunnable() {
529 public void realRun() throws InterruptedException {
530 assertFalse(p.isTerminating());
531 threadStarted.countDown();
532 await(done);
533 }});
534 await(threadStarted);
535 assertFalse(p.isTerminating());
536 done.countDown();
537 try { p.shutdown(); } catch (SecurityException ok) { return; }
538 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
539 assertTrue(p.isTerminated());
540 assertFalse(p.isTerminating());
541 }
542 }
543
544 /**
545 * getQueue returns the work queue, which contains queued tasks
546 */
547 public void testGetQueue() throws InterruptedException {
548 final CountDownLatch done = new CountDownLatch(1);
549 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
550 try (PoolCleaner cleaner = cleaner(p, done)) {
551 final CountDownLatch threadStarted = new CountDownLatch(1);
552 @SuppressWarnings("unchecked")
553 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5];
554 for (int i = 0; i < tasks.length; i++) {
555 Runnable r = new CheckedRunnable() {
556 public void realRun() throws InterruptedException {
557 threadStarted.countDown();
558 await(done);
559 }};
560 tasks[i] = p.schedule(r, 1, MILLISECONDS);
561 }
562 await(threadStarted);
563 BlockingQueue<Runnable> q = p.getQueue();
564 assertTrue(q.contains(tasks[tasks.length - 1]));
565 assertFalse(q.contains(tasks[0]));
566 }
567 }
568
569 /**
570 * remove(task) removes queued task, and fails to remove active task
571 */
572 public void testRemove() throws InterruptedException {
573 final CountDownLatch done = new CountDownLatch(1);
574 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
575 try (PoolCleaner cleaner = cleaner(p, done)) {
576 @SuppressWarnings("unchecked")
577 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5];
578 final CountDownLatch threadStarted = new CountDownLatch(1);
579 for (int i = 0; i < tasks.length; i++) {
580 Runnable r = new CheckedRunnable() {
581 public void realRun() throws InterruptedException {
582 threadStarted.countDown();
583 await(done);
584 }};
585 tasks[i] = p.schedule(r, 1, MILLISECONDS);
586 }
587 await(threadStarted);
588 BlockingQueue<Runnable> q = p.getQueue();
589 assertFalse(p.remove((Runnable)tasks[0]));
590 assertTrue(q.contains((Runnable)tasks[4]));
591 assertTrue(q.contains((Runnable)tasks[3]));
592 assertTrue(p.remove((Runnable)tasks[4]));
593 assertFalse(p.remove((Runnable)tasks[4]));
594 assertFalse(q.contains((Runnable)tasks[4]));
595 assertTrue(q.contains((Runnable)tasks[3]));
596 assertTrue(p.remove((Runnable)tasks[3]));
597 assertFalse(q.contains((Runnable)tasks[3]));
598 }
599 }
600
601 /**
602 * purge eventually removes cancelled tasks from the queue
603 */
604 public void testPurge() throws InterruptedException {
605 @SuppressWarnings("unchecked")
606 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5];
607 final Runnable releaser = new Runnable() { public void run() {
608 for (ScheduledFuture<?> task : tasks)
609 if (task != null) task.cancel(true); }};
610 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
611 try (PoolCleaner cleaner = cleaner(p, releaser)) {
612 for (int i = 0; i < tasks.length; i++)
613 tasks[i] = p.schedule(possiblyInterruptedRunnable(SMALL_DELAY_MS),
614 LONG_DELAY_MS, MILLISECONDS);
615 int max = tasks.length;
616 if (tasks[4].cancel(true)) --max;
617 if (tasks[3].cancel(true)) --max;
618 // There must eventually be an interference-free point at
619 // which purge will not fail. (At worst, when queue is empty.)
620 long startTime = System.nanoTime();
621 do {
622 p.purge();
623 long count = p.getTaskCount();
624 if (count == max)
625 return;
626 } while (millisElapsedSince(startTime) < LONG_DELAY_MS);
627 fail("Purge failed to remove cancelled tasks");
628 }
629 }
630
631 /**
632 * shutdownNow returns a list containing tasks that were not run,
633 * and those tasks are drained from the queue
634 */
635 public void testShutdownNow() throws InterruptedException {
636 final int poolSize = 2;
637 final int count = 5;
638 final AtomicInteger ran = new AtomicInteger(0);
639 final ScheduledThreadPoolExecutor p =
640 new ScheduledThreadPoolExecutor(poolSize);
641 final CountDownLatch threadsStarted = new CountDownLatch(poolSize);
642 Runnable waiter = new CheckedRunnable() { public void realRun() {
643 threadsStarted.countDown();
644 try {
645 MILLISECONDS.sleep(LONGER_DELAY_MS);
646 } catch (InterruptedException success) {}
647 ran.getAndIncrement();
648 }};
649 for (int i = 0; i < count; i++)
650 p.execute(waiter);
651 await(threadsStarted);
652 assertEquals(poolSize, p.getActiveCount());
653 assertEquals(0, p.getCompletedTaskCount());
654 final List<Runnable> queuedTasks;
655 try {
656 queuedTasks = p.shutdownNow();
657 } catch (SecurityException ok) {
658 return; // Allowed in case test doesn't have privs
659 }
660 assertTrue(p.isShutdown());
661 assertTrue(p.getQueue().isEmpty());
662 assertEquals(count - poolSize, queuedTasks.size());
663 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
664 assertTrue(p.isTerminated());
665 assertEquals(poolSize, ran.get());
666 assertEquals(poolSize, p.getCompletedTaskCount());
667 }
668
669 /**
670 * shutdownNow returns a list containing tasks that were not run,
671 * and those tasks are drained from the queue
672 */
673 public void testShutdownNow_delayedTasks() throws InterruptedException {
674 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
675 List<ScheduledFuture<?>> tasks = new ArrayList<>();
676 for (int i = 0; i < 3; i++) {
677 Runnable r = new NoOpRunnable();
678 tasks.add(p.schedule(r, 9, SECONDS));
679 tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS));
680 tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS));
681 }
682 if (testImplementationDetails)
683 assertEquals(new HashSet<Object>(tasks), new HashSet<Object>(p.getQueue()));
684 final List<Runnable> queuedTasks;
685 try {
686 queuedTasks = p.shutdownNow();
687 } catch (SecurityException ok) {
688 return; // Allowed in case test doesn't have privs
689 }
690 assertTrue(p.isShutdown());
691 assertTrue(p.getQueue().isEmpty());
692 if (testImplementationDetails)
693 assertEquals(new HashSet<Object>(tasks), new HashSet<Object>(queuedTasks));
694 assertEquals(tasks.size(), queuedTasks.size());
695 for (ScheduledFuture<?> task : tasks) {
696 assertFalse(task.isDone());
697 assertFalse(task.isCancelled());
698 }
699 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
700 assertTrue(p.isTerminated());
701 }
702
703 /**
704 * By default, periodic tasks are cancelled at shutdown.
705 * By default, delayed tasks keep running after shutdown.
706 * Check that changing the default values work:
707 * - setExecuteExistingDelayedTasksAfterShutdownPolicy
708 * - setContinueExistingPeriodicTasksAfterShutdownPolicy
709 */
710 @SuppressWarnings("FutureReturnValueIgnored")
711 public void testShutdown_cancellation() throws Exception {
712 final int poolSize = 4;
713 final ScheduledThreadPoolExecutor p
714 = new ScheduledThreadPoolExecutor(poolSize);
715 final BlockingQueue<Runnable> q = p.getQueue();
716 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
717 final long delay = rnd.nextInt(2);
718 final int rounds = rnd.nextInt(1, 3);
719 final boolean effectiveDelayedPolicy;
720 final boolean effectivePeriodicPolicy;
721 final boolean effectiveRemovePolicy;
722
723 if (rnd.nextBoolean())
724 p.setExecuteExistingDelayedTasksAfterShutdownPolicy(
725 effectiveDelayedPolicy = rnd.nextBoolean());
726 else
727 effectiveDelayedPolicy = true;
728 assertEquals(effectiveDelayedPolicy,
729 p.getExecuteExistingDelayedTasksAfterShutdownPolicy());
730
731 if (rnd.nextBoolean())
732 p.setContinueExistingPeriodicTasksAfterShutdownPolicy(
733 effectivePeriodicPolicy = rnd.nextBoolean());
734 else
735 effectivePeriodicPolicy = false;
736 assertEquals(effectivePeriodicPolicy,
737 p.getContinueExistingPeriodicTasksAfterShutdownPolicy());
738
739 if (rnd.nextBoolean())
740 p.setRemoveOnCancelPolicy(
741 effectiveRemovePolicy = rnd.nextBoolean());
742 else
743 effectiveRemovePolicy = false;
744 assertEquals(effectiveRemovePolicy,
745 p.getRemoveOnCancelPolicy());
746
747 final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean();
748
749 // Strategy: Wedge the pool with one wave of "blocker" tasks,
750 // then add a second wave that waits in the queue until unblocked.
751 final AtomicInteger ran = new AtomicInteger(0);
752 final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
753 final CountDownLatch unblock = new CountDownLatch(1);
754 final RuntimeException exception = new RuntimeException();
755
756 class Task implements Runnable {
757 public void run() {
758 try {
759 ran.getAndIncrement();
760 poolBlocked.countDown();
761 await(unblock);
762 } catch (Throwable fail) { threadUnexpectedException(fail); }
763 }
764 }
765
766 class PeriodicTask extends Task {
767 PeriodicTask(int rounds) { this.rounds = rounds; }
768 int rounds;
769 public void run() {
770 if (--rounds == 0) super.run();
771 // throw exception to surely terminate this periodic task,
772 // but in a separate execution and in a detectable way.
773 if (rounds == -1) throw exception;
774 }
775 }
776
777 Runnable task = new Task();
778
779 List<Future<?>> immediates = new ArrayList<>();
780 List<Future<?>> delayeds = new ArrayList<>();
781 List<Future<?>> periodics = new ArrayList<>();
782
783 immediates.add(p.submit(task));
784 delayeds.add(p.schedule(task, delay, MILLISECONDS));
785 periodics.add(p.scheduleAtFixedRate(
786 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
787 periodics.add(p.scheduleWithFixedDelay(
788 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
789
790 await(poolBlocked);
791
792 assertEquals(poolSize, ran.get());
793 assertEquals(poolSize, p.getActiveCount());
794 assertTrue(q.isEmpty());
795
796 // Add second wave of tasks.
797 immediates.add(p.submit(task));
798 delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS));
799 periodics.add(p.scheduleAtFixedRate(
800 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
801 periodics.add(p.scheduleWithFixedDelay(
802 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
803
804 assertEquals(poolSize, q.size());
805 assertEquals(poolSize, ran.get());
806
807 immediates.forEach(
808 f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));
809
810 Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream)
811 .forEach(f -> assertFalse(f.isDone()));
812
813 try { p.shutdown(); } catch (SecurityException ok) { return; }
814 assertTrue(p.isShutdown());
815 assertTrue(p.isTerminating());
816 assertFalse(p.isTerminated());
817
818 if (rnd.nextBoolean())
819 assertThrows(
820 RejectedExecutionException.class,
821 () -> p.submit(task),
822 () -> p.schedule(task, 1, SECONDS),
823 () -> p.scheduleAtFixedRate(
824 new PeriodicTask(1), 1, 1, SECONDS),
825 () -> p.scheduleWithFixedDelay(
826 new PeriodicTask(2), 1, 1, SECONDS));
827
828 assertTrue(q.contains(immediates.get(1)));
829 assertTrue(!effectiveDelayedPolicy
830 ^ q.contains(delayeds.get(1)));
831 assertTrue(!effectivePeriodicPolicy
832 ^ q.containsAll(periodics.subList(2, 4)));
833
834 immediates.forEach(f -> assertFalse(f.isDone()));
835
836 assertFalse(delayeds.get(0).isDone());
837 if (effectiveDelayedPolicy)
838 assertFalse(delayeds.get(1).isDone());
839 else
840 assertTrue(delayeds.get(1).isCancelled());
841
842 if (effectivePeriodicPolicy)
843 periodics.forEach(
844 f -> {
845 assertFalse(f.isDone());
846 if (!periodicTasksContinue) {
847 assertTrue(f.cancel(false));
848 assertTrue(f.isCancelled());
849 }
850 });
851 else {
852 periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone()));
853 periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled()));
854 }
855
856 unblock.countDown(); // Release all pool threads
857
858 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
859 assertFalse(p.isTerminating());
860 assertTrue(p.isTerminated());
861
862 assertTrue(q.isEmpty());
863
864 Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream)
865 .forEach(f -> assertTrue(f.isDone()));
866
867 for (Future<?> f : immediates) assertNull(f.get());
868
869 assertNull(delayeds.get(0).get());
870 if (effectiveDelayedPolicy)
871 assertNull(delayeds.get(1).get());
872 else
873 assertTrue(delayeds.get(1).isCancelled());
874
875 if (periodicTasksContinue)
876 periodics.forEach(
877 f -> {
878 try { f.get(); }
879 catch (ExecutionException success) {
880 assertSame(exception, success.getCause());
881 }
882 catch (Throwable fail) { threadUnexpectedException(fail); }
883 });
884 else
885 periodics.forEach(f -> assertTrue(f.isCancelled()));
886
887 assertEquals(poolSize + 1
888 + (effectiveDelayedPolicy ? 1 : 0)
889 + (periodicTasksContinue ? 2 : 0),
890 ran.get());
891 }
892
893 /**
894 * completed submit of callable returns result
895 */
896 public void testSubmitCallable() throws Exception {
897 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
898 try (PoolCleaner cleaner = cleaner(e)) {
899 Future<String> future = e.submit(new StringTask());
900 String result = future.get();
901 assertSame(TEST_STRING, result);
902 }
903 }
904
905 /**
906 * completed submit of runnable returns successfully
907 */
908 public void testSubmitRunnable() throws Exception {
909 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
910 try (PoolCleaner cleaner = cleaner(e)) {
911 Future<?> future = e.submit(new NoOpRunnable());
912 future.get();
913 assertTrue(future.isDone());
914 }
915 }
916
917 /**
918 * completed submit of (runnable, result) returns result
919 */
920 public void testSubmitRunnable2() throws Exception {
921 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
922 try (PoolCleaner cleaner = cleaner(e)) {
923 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
924 String result = future.get();
925 assertSame(TEST_STRING, result);
926 }
927 }
928
929 /**
930 * invokeAny(null) throws NullPointerException
931 */
932 public void testInvokeAny1() throws Exception {
933 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
934 try (PoolCleaner cleaner = cleaner(e)) {
935 try {
936 e.invokeAny(null);
937 shouldThrow();
938 } catch (NullPointerException success) {}
939 }
940 }
941
942 /**
943 * invokeAny(empty collection) throws IllegalArgumentException
944 */
945 public void testInvokeAny2() throws Exception {
946 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
947 try (PoolCleaner cleaner = cleaner(e)) {
948 try {
949 e.invokeAny(new ArrayList<Callable<String>>());
950 shouldThrow();
951 } catch (IllegalArgumentException success) {}
952 }
953 }
954
955 /**
956 * invokeAny(c) throws NullPointerException if c has null elements
957 */
958 public void testInvokeAny3() throws Exception {
959 CountDownLatch latch = new CountDownLatch(1);
960 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
961 try (PoolCleaner cleaner = cleaner(e)) {
962 List<Callable<String>> l = new ArrayList<>();
963 l.add(latchAwaitingStringTask(latch));
964 l.add(null);
965 try {
966 e.invokeAny(l);
967 shouldThrow();
968 } catch (NullPointerException success) {}
969 latch.countDown();
970 }
971 }
972
973 /**
974 * invokeAny(c) throws ExecutionException if no task completes
975 */
976 public void testInvokeAny4() throws Exception {
977 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
978 try (PoolCleaner cleaner = cleaner(e)) {
979 List<Callable<String>> l = new ArrayList<>();
980 l.add(new NPETask());
981 try {
982 e.invokeAny(l);
983 shouldThrow();
984 } catch (ExecutionException success) {
985 assertTrue(success.getCause() instanceof NullPointerException);
986 }
987 }
988 }
989
990 /**
991 * invokeAny(c) returns result of some task
992 */
993 public void testInvokeAny5() throws Exception {
994 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
995 try (PoolCleaner cleaner = cleaner(e)) {
996 List<Callable<String>> l = new ArrayList<>();
997 l.add(new StringTask());
998 l.add(new StringTask());
999 String result = e.invokeAny(l);
1000 assertSame(TEST_STRING, result);
1001 }
1002 }
1003
1004 /**
1005 * invokeAll(null) throws NPE
1006 */
1007 public void testInvokeAll1() throws Exception {
1008 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1009 try (PoolCleaner cleaner = cleaner(e)) {
1010 try {
1011 e.invokeAll(null);
1012 shouldThrow();
1013 } catch (NullPointerException success) {}
1014 }
1015 }
1016
1017 /**
1018 * invokeAll(empty collection) returns empty list
1019 */
1020 public void testInvokeAll2() throws Exception {
1021 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1022 final Collection<Callable<String>> emptyCollection
1023 = Collections.emptyList();
1024 try (PoolCleaner cleaner = cleaner(e)) {
1025 List<Future<String>> r = e.invokeAll(emptyCollection);
1026 assertTrue(r.isEmpty());
1027 }
1028 }
1029
1030 /**
1031 * invokeAll(c) throws NPE if c has null elements
1032 */
1033 public void testInvokeAll3() throws Exception {
1034 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1035 try (PoolCleaner cleaner = cleaner(e)) {
1036 List<Callable<String>> l = new ArrayList<>();
1037 l.add(new StringTask());
1038 l.add(null);
1039 try {
1040 e.invokeAll(l);
1041 shouldThrow();
1042 } catch (NullPointerException success) {}
1043 }
1044 }
1045
1046 /**
1047 * get of invokeAll(c) throws exception on failed task
1048 */
1049 public void testInvokeAll4() throws Exception {
1050 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1051 try (PoolCleaner cleaner = cleaner(e)) {
1052 List<Callable<String>> l = new ArrayList<>();
1053 l.add(new NPETask());
1054 List<Future<String>> futures = e.invokeAll(l);
1055 assertEquals(1, futures.size());
1056 try {
1057 futures.get(0).get();
1058 shouldThrow();
1059 } catch (ExecutionException success) {
1060 assertTrue(success.getCause() instanceof NullPointerException);
1061 }
1062 }
1063 }
1064
1065 /**
1066 * invokeAll(c) returns results of all completed tasks
1067 */
1068 public void testInvokeAll5() throws Exception {
1069 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1070 try (PoolCleaner cleaner = cleaner(e)) {
1071 List<Callable<String>> l = new ArrayList<>();
1072 l.add(new StringTask());
1073 l.add(new StringTask());
1074 List<Future<String>> futures = e.invokeAll(l);
1075 assertEquals(2, futures.size());
1076 for (Future<String> future : futures)
1077 assertSame(TEST_STRING, future.get());
1078 }
1079 }
1080
1081 /**
1082 * timed invokeAny(null) throws NPE
1083 */
1084 public void testTimedInvokeAny1() throws Exception {
1085 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1086 try (PoolCleaner cleaner = cleaner(e)) {
1087 try {
1088 e.invokeAny(null, randomTimeout(), randomTimeUnit());
1089 shouldThrow();
1090 } catch (NullPointerException success) {}
1091 }
1092 }
1093
1094 /**
1095 * timed invokeAny(,,null) throws NullPointerException
1096 */
1097 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1098 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1099 try (PoolCleaner cleaner = cleaner(e)) {
1100 List<Callable<String>> l = new ArrayList<>();
1101 l.add(new StringTask());
1102 try {
1103 e.invokeAny(l, randomTimeout(), null);
1104 shouldThrow();
1105 } catch (NullPointerException success) {}
1106 }
1107 }
1108
1109 /**
1110 * timed invokeAny(empty collection) throws IllegalArgumentException
1111 */
1112 public void testTimedInvokeAny2() throws Exception {
1113 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1114 final Collection<Callable<String>> emptyCollection
1115 = Collections.emptyList();
1116 try (PoolCleaner cleaner = cleaner(e)) {
1117 try {
1118 e.invokeAny(emptyCollection, randomTimeout(), randomTimeUnit());
1119 shouldThrow();
1120 } catch (IllegalArgumentException success) {}
1121 }
1122 }
1123
1124 /**
1125 * timed invokeAny(c) throws NPE if c has null elements
1126 */
1127 public void testTimedInvokeAny3() throws Exception {
1128 CountDownLatch latch = new CountDownLatch(1);
1129 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1130 try (PoolCleaner cleaner = cleaner(e)) {
1131 List<Callable<String>> l = new ArrayList<>();
1132 l.add(latchAwaitingStringTask(latch));
1133 l.add(null);
1134 try {
1135 e.invokeAny(l, randomTimeout(), randomTimeUnit());
1136 shouldThrow();
1137 } catch (NullPointerException success) {}
1138 latch.countDown();
1139 }
1140 }
1141
1142 /**
1143 * timed invokeAny(c) throws ExecutionException if no task completes
1144 */
1145 public void testTimedInvokeAny4() throws Exception {
1146 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1147 try (PoolCleaner cleaner = cleaner(e)) {
1148 long startTime = System.nanoTime();
1149 List<Callable<String>> l = new ArrayList<>();
1150 l.add(new NPETask());
1151 try {
1152 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1153 shouldThrow();
1154 } catch (ExecutionException success) {
1155 assertTrue(success.getCause() instanceof NullPointerException);
1156 }
1157 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1158 }
1159 }
1160
1161 /**
1162 * timed invokeAny(c) returns result of some task
1163 */
1164 public void testTimedInvokeAny5() throws Exception {
1165 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1166 try (PoolCleaner cleaner = cleaner(e)) {
1167 long startTime = System.nanoTime();
1168 List<Callable<String>> l = new ArrayList<>();
1169 l.add(new StringTask());
1170 l.add(new StringTask());
1171 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1172 assertSame(TEST_STRING, result);
1173 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1174 }
1175 }
1176
1177 /**
1178 * timed invokeAll(null) throws NPE
1179 */
1180 public void testTimedInvokeAll1() throws Exception {
1181 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1182 try (PoolCleaner cleaner = cleaner(e)) {
1183 try {
1184 e.invokeAll(null, randomTimeout(), randomTimeUnit());
1185 shouldThrow();
1186 } catch (NullPointerException success) {}
1187 }
1188 }
1189
1190 /**
1191 * timed invokeAll(,,null) throws NPE
1192 */
1193 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1194 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1195 try (PoolCleaner cleaner = cleaner(e)) {
1196 List<Callable<String>> l = new ArrayList<>();
1197 l.add(new StringTask());
1198 try {
1199 e.invokeAll(l, randomTimeout(), null);
1200 shouldThrow();
1201 } catch (NullPointerException success) {}
1202 }
1203 }
1204
1205 /**
1206 * timed invokeAll(empty collection) returns empty list
1207 */
1208 public void testTimedInvokeAll2() throws Exception {
1209 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1210 final Collection<Callable<String>> emptyCollection
1211 = Collections.emptyList();
1212 try (PoolCleaner cleaner = cleaner(e)) {
1213 List<Future<String>> r =
1214 e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit());
1215 assertTrue(r.isEmpty());
1216 }
1217 }
1218
1219 /**
1220 * timed invokeAll(c) throws NPE if c has null elements
1221 */
1222 public void testTimedInvokeAll3() throws Exception {
1223 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1224 try (PoolCleaner cleaner = cleaner(e)) {
1225 List<Callable<String>> l = new ArrayList<>();
1226 l.add(new StringTask());
1227 l.add(null);
1228 try {
1229 e.invokeAll(l, randomTimeout(), randomTimeUnit());
1230 shouldThrow();
1231 } catch (NullPointerException success) {}
1232 }
1233 }
1234
1235 /**
1236 * get of element of invokeAll(c) throws exception on failed task
1237 */
1238 public void testTimedInvokeAll4() throws Exception {
1239 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1240 try (PoolCleaner cleaner = cleaner(e)) {
1241 List<Callable<String>> l = new ArrayList<>();
1242 l.add(new NPETask());
1243 List<Future<String>> futures =
1244 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1245 assertEquals(1, futures.size());
1246 try {
1247 futures.get(0).get();
1248 shouldThrow();
1249 } catch (ExecutionException success) {
1250 assertTrue(success.getCause() instanceof NullPointerException);
1251 }
1252 }
1253 }
1254
1255 /**
1256 * timed invokeAll(c) returns results of all completed tasks
1257 */
1258 public void testTimedInvokeAll5() throws Exception {
1259 final ExecutorService e = new ScheduledThreadPoolExecutor(2);
1260 try (PoolCleaner cleaner = cleaner(e)) {
1261 List<Callable<String>> l = new ArrayList<>();
1262 l.add(new StringTask());
1263 l.add(new StringTask());
1264 List<Future<String>> futures =
1265 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1266 assertEquals(2, futures.size());
1267 for (Future<String> future : futures)
1268 assertSame(TEST_STRING, future.get());
1269 }
1270 }
1271
1272 /**
1273 * timed invokeAll(c) cancels tasks not completed by timeout
1274 */
1275 public void testTimedInvokeAll6() throws Exception {
1276 for (long timeout = timeoutMillis();;) {
1277 final CountDownLatch done = new CountDownLatch(1);
1278 final Callable<String> waiter = new CheckedCallable<>() {
1279 public String realCall() {
1280 try { done.await(LONG_DELAY_MS, MILLISECONDS); }
1281 catch (InterruptedException ok) {}
1282 return "1"; }};
1283 final ExecutorService p = new ScheduledThreadPoolExecutor(2);
1284 try (PoolCleaner cleaner = cleaner(p, done)) {
1285 List<Callable<String>> tasks = new ArrayList<>();
1286 tasks.add(new StringTask("0"));
1287 tasks.add(waiter);
1288 tasks.add(new StringTask("2"));
1289 long startTime = System.nanoTime();
1290 List<Future<String>> futures =
1291 p.invokeAll(tasks, timeout, MILLISECONDS);
1292 assertEquals(tasks.size(), futures.size());
1293 assertTrue(millisElapsedSince(startTime) >= timeout);
1294 for (Future<?> future : futures)
1295 assertTrue(future.isDone());
1296 assertTrue(futures.get(1).isCancelled());
1297 try {
1298 assertEquals("0", futures.get(0).get());
1299 assertEquals("2", futures.get(2).get());
1300 break;
1301 } catch (CancellationException retryWithLongerTimeout) {
1302 timeout *= 2;
1303 if (timeout >= LONG_DELAY_MS / 2)
1304 fail("expected exactly one task to be cancelled");
1305 }
1306 }
1307 }
1308 }
1309
1310 /**
1311 * A fixed delay task with overflowing period should not prevent a
1312 * one-shot task from executing.
1313 * https://bugs.openjdk.java.net/browse/JDK-8051859
1314 */
1315 @SuppressWarnings("FutureReturnValueIgnored")
1316 public void testScheduleWithFixedDelay_overflow() throws Exception {
1317 final CountDownLatch delayedDone = new CountDownLatch(1);
1318 final CountDownLatch immediateDone = new CountDownLatch(1);
1319 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
1320 try (PoolCleaner cleaner = cleaner(p)) {
1321 final Runnable delayed = () -> {
1322 delayedDone.countDown();
1323 p.submit(() -> immediateDone.countDown());
1324 };
1325 p.scheduleWithFixedDelay(delayed, 0L, Long.MAX_VALUE, SECONDS);
1326 await(delayedDone);
1327 await(immediateDone);
1328 }
1329 }
1330
1331 }