ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ScheduledExecutorSubclassTest.java
Revision: 1.74
Committed: Wed Jan 27 01:57:24 2021 UTC (3 years, 3 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.73: +2 -2 lines
Log Message:
use diamond <> pervasively

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import static java.util.concurrent.TimeUnit.MILLISECONDS;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import static java.util.concurrent.TimeUnit.SECONDS;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.CancellationException;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Delayed;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.RejectedExecutionHandler;
26 import java.util.concurrent.RunnableScheduledFuture;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeoutException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.stream.Stream;
38
39 import junit.framework.Test;
40 import junit.framework.TestSuite;
41
42 public class ScheduledExecutorSubclassTest extends JSR166TestCase {
43 public static void main(String[] args) {
44 main(suite(), args);
45 }
46 public static Test suite() {
47 return new TestSuite(ScheduledExecutorSubclassTest.class);
48 }
49
50 static class CustomTask<V> implements RunnableScheduledFuture<V> {
51 private final RunnableScheduledFuture<V> task;
52 volatile boolean ran;
53 CustomTask(RunnableScheduledFuture<V> task) { this.task = task; }
54 public boolean isPeriodic() { return task.isPeriodic(); }
55 public void run() {
56 ran = true;
57 task.run();
58 }
59 public long getDelay(TimeUnit unit) { return task.getDelay(unit); }
60 public int compareTo(Delayed t) {
61 return task.compareTo(((CustomTask)t).task);
62 }
63 public boolean cancel(boolean mayInterruptIfRunning) {
64 return task.cancel(mayInterruptIfRunning);
65 }
66 public boolean isCancelled() { return task.isCancelled(); }
67 public boolean isDone() { return task.isDone(); }
68 public V get() throws InterruptedException, ExecutionException {
69 V v = task.get();
70 assertTrue(ran);
71 return v;
72 }
73 public V get(long time, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
74 V v = task.get(time, unit);
75 assertTrue(ran);
76 return v;
77 }
78 }
79
80 public class CustomExecutor extends ScheduledThreadPoolExecutor {
81
82 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable r, RunnableScheduledFuture<V> task) {
83 return new CustomTask<V>(task);
84 }
85
86 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> c, RunnableScheduledFuture<V> task) {
87 return new CustomTask<V>(task);
88 }
89 CustomExecutor(int corePoolSize) { super(corePoolSize); }
90 CustomExecutor(int corePoolSize, RejectedExecutionHandler handler) {
91 super(corePoolSize, handler);
92 }
93
94 CustomExecutor(int corePoolSize, ThreadFactory threadFactory) {
95 super(corePoolSize, threadFactory);
96 }
97 CustomExecutor(int corePoolSize, ThreadFactory threadFactory,
98 RejectedExecutionHandler handler) {
99 super(corePoolSize, threadFactory, handler);
100 }
101
102 }
103
104 /**
105 * execute successfully executes a runnable
106 */
107 public void testExecute() throws InterruptedException {
108 final CustomExecutor p = new CustomExecutor(1);
109 try (PoolCleaner cleaner = cleaner(p)) {
110 final CountDownLatch done = new CountDownLatch(1);
111 final Runnable task = new CheckedRunnable() {
112 public void realRun() { done.countDown(); }};
113 p.execute(task);
114 await(done);
115 }
116 }
117
118 /**
119 * delayed schedule of callable successfully executes after delay
120 */
121 public void testSchedule1() throws Exception {
122 final CountDownLatch done = new CountDownLatch(1);
123 final CustomExecutor p = new CustomExecutor(1);
124 try (PoolCleaner cleaner = cleaner(p, done)) {
125 final long startTime = System.nanoTime();
126 Callable<Boolean> task = new CheckedCallable<>() {
127 public Boolean realCall() {
128 done.countDown();
129 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
130 return Boolean.TRUE;
131 }};
132 Future<Boolean> f = p.schedule(task, timeoutMillis(), MILLISECONDS);
133 assertSame(Boolean.TRUE, f.get());
134 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
135 }
136 }
137
138 /**
139 * delayed schedule of runnable successfully executes after delay
140 */
141 public void testSchedule3() throws Exception {
142 final CustomExecutor p = new CustomExecutor(1);
143 try (PoolCleaner cleaner = cleaner(p)) {
144 final long startTime = System.nanoTime();
145 final CountDownLatch done = new CountDownLatch(1);
146 Runnable task = new CheckedRunnable() {
147 public void realRun() {
148 done.countDown();
149 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
150 }};
151 Future<?> f = p.schedule(task, timeoutMillis(), MILLISECONDS);
152 await(done);
153 assertNull(f.get(LONG_DELAY_MS, MILLISECONDS));
154 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
155 }
156 }
157
158 /**
159 * scheduleAtFixedRate executes runnable after given initial delay
160 */
161 public void testSchedule4() throws InterruptedException {
162 final CustomExecutor p = new CustomExecutor(1);
163 try (PoolCleaner cleaner = cleaner(p)) {
164 final long startTime = System.nanoTime();
165 final CountDownLatch done = new CountDownLatch(1);
166 Runnable task = new CheckedRunnable() {
167 public void realRun() {
168 done.countDown();
169 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
170 }};
171 ScheduledFuture<?> f =
172 p.scheduleAtFixedRate(task, timeoutMillis(),
173 LONG_DELAY_MS, MILLISECONDS);
174 await(done);
175 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
176 f.cancel(true);
177 }
178 }
179
180 /**
181 * scheduleWithFixedDelay executes runnable after given initial delay
182 */
183 public void testSchedule5() throws InterruptedException {
184 final CustomExecutor p = new CustomExecutor(1);
185 try (PoolCleaner cleaner = cleaner(p)) {
186 final long startTime = System.nanoTime();
187 final CountDownLatch done = new CountDownLatch(1);
188 Runnable task = new CheckedRunnable() {
189 public void realRun() {
190 done.countDown();
191 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
192 }};
193 ScheduledFuture<?> f =
194 p.scheduleWithFixedDelay(task, timeoutMillis(),
195 LONG_DELAY_MS, MILLISECONDS);
196 await(done);
197 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
198 f.cancel(true);
199 }
200 }
201
202 static class RunnableCounter implements Runnable {
203 AtomicInteger count = new AtomicInteger(0);
204 public void run() { count.getAndIncrement(); }
205 }
206
207 /**
208 * scheduleAtFixedRate executes series of tasks at given rate.
209 * Eventually, it must hold that:
210 * cycles - 1 <= elapsedMillis/delay < cycles
211 */
212 public void testFixedRateSequence() throws InterruptedException {
213 final CustomExecutor p = new CustomExecutor(1);
214 try (PoolCleaner cleaner = cleaner(p)) {
215 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
216 final long startTime = System.nanoTime();
217 final int cycles = 8;
218 final CountDownLatch done = new CountDownLatch(cycles);
219 final Runnable task = new CheckedRunnable() {
220 public void realRun() { done.countDown(); }};
221 final ScheduledFuture<?> periodicTask =
222 p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
223 final int totalDelayMillis = (cycles - 1) * delay;
224 await(done, totalDelayMillis + LONG_DELAY_MS);
225 periodicTask.cancel(true);
226 final long elapsedMillis = millisElapsedSince(startTime);
227 assertTrue(elapsedMillis >= totalDelayMillis);
228 if (elapsedMillis <= cycles * delay)
229 return;
230 // else retry with longer delay
231 }
232 fail("unexpected execution rate");
233 }
234 }
235
236 /**
237 * scheduleWithFixedDelay executes series of tasks with given period.
238 * Eventually, it must hold that each task starts at least delay and at
239 * most 2 * delay after the termination of the previous task.
240 */
241 public void testFixedDelaySequence() throws InterruptedException {
242 final CustomExecutor p = new CustomExecutor(1);
243 try (PoolCleaner cleaner = cleaner(p)) {
244 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
245 final long startTime = System.nanoTime();
246 final AtomicLong previous = new AtomicLong(startTime);
247 final AtomicBoolean tryLongerDelay = new AtomicBoolean(false);
248 final int cycles = 8;
249 final CountDownLatch done = new CountDownLatch(cycles);
250 final int d = delay;
251 final Runnable task = new CheckedRunnable() {
252 public void realRun() {
253 long now = System.nanoTime();
254 long elapsedMillis
255 = NANOSECONDS.toMillis(now - previous.get());
256 if (done.getCount() == cycles) { // first execution
257 if (elapsedMillis >= d)
258 tryLongerDelay.set(true);
259 } else {
260 assertTrue(elapsedMillis >= d);
261 if (elapsedMillis >= 2 * d)
262 tryLongerDelay.set(true);
263 }
264 previous.set(now);
265 done.countDown();
266 }};
267 final ScheduledFuture<?> periodicTask =
268 p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS);
269 final int totalDelayMillis = (cycles - 1) * delay;
270 await(done, totalDelayMillis + cycles * LONG_DELAY_MS);
271 periodicTask.cancel(true);
272 final long elapsedMillis = millisElapsedSince(startTime);
273 assertTrue(elapsedMillis >= totalDelayMillis);
274 if (!tryLongerDelay.get())
275 return;
276 // else retry with longer delay
277 }
278 fail("unexpected execution rate");
279 }
280 }
281
282 /**
283 * Submitting null tasks throws NullPointerException
284 */
285 public void testNullTaskSubmission() {
286 final CustomExecutor p = new CustomExecutor(1);
287 try (PoolCleaner cleaner = cleaner(p)) {
288 assertNullTaskSubmissionThrowsNullPointerException(p);
289 }
290 }
291
292 /**
293 * Submitted tasks are rejected when shutdown
294 */
295 public void testSubmittedTasksRejectedWhenShutdown() throws InterruptedException {
296 final CustomExecutor p = new CustomExecutor(2);
297 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
298 final CountDownLatch threadsStarted = new CountDownLatch(p.getCorePoolSize());
299 final CountDownLatch done = new CountDownLatch(1);
300 final Runnable r = () -> {
301 threadsStarted.countDown();
302 for (;;) {
303 try {
304 done.await();
305 return;
306 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
307 }};
308 final Callable<Boolean> c = () -> {
309 threadsStarted.countDown();
310 for (;;) {
311 try {
312 done.await();
313 return Boolean.TRUE;
314 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
315 }};
316
317 try (PoolCleaner cleaner = cleaner(p, done)) {
318 for (int i = p.getCorePoolSize(); i--> 0; ) {
319 switch (rnd.nextInt(4)) {
320 case 0: p.execute(r); break;
321 case 1: assertFalse(p.submit(r).isDone()); break;
322 case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break;
323 case 3: assertFalse(p.submit(c).isDone()); break;
324 }
325 }
326
327 // ScheduledThreadPoolExecutor has an unbounded queue, so never saturated.
328 await(threadsStarted);
329
330 if (rnd.nextBoolean())
331 p.shutdownNow();
332 else
333 p.shutdown();
334 // Pool is shutdown, but not yet terminated
335 assertTaskSubmissionsAreRejected(p);
336 assertFalse(p.isTerminated());
337
338 done.countDown(); // release blocking tasks
339 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
340
341 assertTaskSubmissionsAreRejected(p);
342 }
343 assertEquals(p.getCorePoolSize(), p.getCompletedTaskCount());
344 }
345
346 /**
347 * getActiveCount increases but doesn't overestimate, when a
348 * thread becomes active
349 */
350 public void testGetActiveCount() throws InterruptedException {
351 final CountDownLatch done = new CountDownLatch(1);
352 final ThreadPoolExecutor p = new CustomExecutor(2);
353 try (PoolCleaner cleaner = cleaner(p, done)) {
354 final CountDownLatch threadStarted = new CountDownLatch(1);
355 assertEquals(0, p.getActiveCount());
356 p.execute(new CheckedRunnable() {
357 public void realRun() throws InterruptedException {
358 threadStarted.countDown();
359 assertEquals(1, p.getActiveCount());
360 await(done);
361 }});
362 await(threadStarted);
363 assertEquals(1, p.getActiveCount());
364 }
365 }
366
367 /**
368 * getCompletedTaskCount increases, but doesn't overestimate,
369 * when tasks complete
370 */
371 public void testGetCompletedTaskCount() throws InterruptedException {
372 final ThreadPoolExecutor p = new CustomExecutor(2);
373 try (PoolCleaner cleaner = cleaner(p)) {
374 final CountDownLatch threadStarted = new CountDownLatch(1);
375 final CountDownLatch threadProceed = new CountDownLatch(1);
376 final CountDownLatch threadDone = new CountDownLatch(1);
377 assertEquals(0, p.getCompletedTaskCount());
378 p.execute(new CheckedRunnable() {
379 public void realRun() throws InterruptedException {
380 threadStarted.countDown();
381 assertEquals(0, p.getCompletedTaskCount());
382 await(threadProceed);
383 threadDone.countDown();
384 }});
385 await(threadStarted);
386 assertEquals(0, p.getCompletedTaskCount());
387 threadProceed.countDown();
388 await(threadDone);
389 long startTime = System.nanoTime();
390 while (p.getCompletedTaskCount() != 1) {
391 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
392 fail("timed out");
393 Thread.yield();
394 }
395 }
396 }
397
398 /**
399 * getCorePoolSize returns size given in constructor if not otherwise set
400 */
401 public void testGetCorePoolSize() {
402 final CustomExecutor p = new CustomExecutor(1);
403 try (PoolCleaner cleaner = cleaner(p)) {
404 assertEquals(1, p.getCorePoolSize());
405 }
406 }
407
408 /**
409 * getLargestPoolSize increases, but doesn't overestimate, when
410 * multiple threads active
411 */
412 public void testGetLargestPoolSize() throws InterruptedException {
413 final int THREADS = 3;
414 final CountDownLatch done = new CountDownLatch(1);
415 final ThreadPoolExecutor p = new CustomExecutor(THREADS);
416 try (PoolCleaner cleaner = cleaner(p, done)) {
417 final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
418 assertEquals(0, p.getLargestPoolSize());
419 for (int i = 0; i < THREADS; i++)
420 p.execute(new CheckedRunnable() {
421 public void realRun() throws InterruptedException {
422 threadsStarted.countDown();
423 await(done);
424 assertEquals(THREADS, p.getLargestPoolSize());
425 }});
426 await(threadsStarted);
427 assertEquals(THREADS, p.getLargestPoolSize());
428 }
429 assertEquals(THREADS, p.getLargestPoolSize());
430 }
431
432 /**
433 * getPoolSize increases, but doesn't overestimate, when threads
434 * become active
435 */
436 public void testGetPoolSize() throws InterruptedException {
437 final CountDownLatch done = new CountDownLatch(1);
438 final ThreadPoolExecutor p = new CustomExecutor(1);
439 try (PoolCleaner cleaner = cleaner(p, done)) {
440 final CountDownLatch threadStarted = new CountDownLatch(1);
441 assertEquals(0, p.getPoolSize());
442 p.execute(new CheckedRunnable() {
443 public void realRun() throws InterruptedException {
444 threadStarted.countDown();
445 assertEquals(1, p.getPoolSize());
446 await(done);
447 }});
448 await(threadStarted);
449 assertEquals(1, p.getPoolSize());
450 }
451 }
452
453 /**
454 * getTaskCount increases, but doesn't overestimate, when tasks
455 * submitted
456 */
457 public void testGetTaskCount() throws InterruptedException {
458 final int TASKS = 3;
459 final CountDownLatch done = new CountDownLatch(1);
460 final ThreadPoolExecutor p = new CustomExecutor(1);
461 try (PoolCleaner cleaner = cleaner(p, done)) {
462 final CountDownLatch threadStarted = new CountDownLatch(1);
463 assertEquals(0, p.getTaskCount());
464 assertEquals(0, p.getCompletedTaskCount());
465 p.execute(new CheckedRunnable() {
466 public void realRun() throws InterruptedException {
467 threadStarted.countDown();
468 await(done);
469 }});
470 await(threadStarted);
471 assertEquals(1, p.getTaskCount());
472 assertEquals(0, p.getCompletedTaskCount());
473 for (int i = 0; i < TASKS; i++) {
474 assertEquals(1 + i, p.getTaskCount());
475 p.execute(new CheckedRunnable() {
476 public void realRun() throws InterruptedException {
477 threadStarted.countDown();
478 assertEquals(1 + TASKS, p.getTaskCount());
479 await(done);
480 }});
481 }
482 assertEquals(1 + TASKS, p.getTaskCount());
483 assertEquals(0, p.getCompletedTaskCount());
484 }
485 assertEquals(1 + TASKS, p.getTaskCount());
486 assertEquals(1 + TASKS, p.getCompletedTaskCount());
487 }
488
489 /**
490 * getThreadFactory returns factory in constructor if not set
491 */
492 public void testGetThreadFactory() {
493 final ThreadFactory threadFactory = new SimpleThreadFactory();
494 final CustomExecutor p = new CustomExecutor(1, threadFactory);
495 try (PoolCleaner cleaner = cleaner(p)) {
496 assertSame(threadFactory, p.getThreadFactory());
497 }
498 }
499
500 /**
501 * setThreadFactory sets the thread factory returned by getThreadFactory
502 */
503 public void testSetThreadFactory() {
504 final ThreadFactory threadFactory = new SimpleThreadFactory();
505 final CustomExecutor p = new CustomExecutor(1);
506 try (PoolCleaner cleaner = cleaner(p)) {
507 p.setThreadFactory(threadFactory);
508 assertSame(threadFactory, p.getThreadFactory());
509 }
510 }
511
512 /**
513 * setThreadFactory(null) throws NPE
514 */
515 public void testSetThreadFactoryNull() {
516 final CustomExecutor p = new CustomExecutor(1);
517 try (PoolCleaner cleaner = cleaner(p)) {
518 try {
519 p.setThreadFactory(null);
520 shouldThrow();
521 } catch (NullPointerException success) {}
522 }
523 }
524
525 /**
526 * isShutdown is false before shutdown, true after
527 */
528 public void testIsShutdown() {
529 final CustomExecutor p = new CustomExecutor(1);
530 try (PoolCleaner cleaner = cleaner(p)) {
531 assertFalse(p.isShutdown());
532 try { p.shutdown(); } catch (SecurityException ok) { return; }
533 assertTrue(p.isShutdown());
534 }
535 }
536
537 /**
538 * isTerminated is false before termination, true after
539 */
540 public void testIsTerminated() throws InterruptedException {
541 final CountDownLatch done = new CountDownLatch(1);
542 final ThreadPoolExecutor p = new CustomExecutor(1);
543 try (PoolCleaner cleaner = cleaner(p)) {
544 final CountDownLatch threadStarted = new CountDownLatch(1);
545 p.execute(new CheckedRunnable() {
546 public void realRun() throws InterruptedException {
547 assertFalse(p.isTerminated());
548 threadStarted.countDown();
549 await(done);
550 }});
551 await(threadStarted);
552 assertFalse(p.isTerminated());
553 assertFalse(p.isTerminating());
554 done.countDown();
555 try { p.shutdown(); } catch (SecurityException ok) { return; }
556 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
557 assertTrue(p.isTerminated());
558 }
559 }
560
561 /**
562 * isTerminating is not true when running or when terminated
563 */
564 public void testIsTerminating() throws InterruptedException {
565 final CountDownLatch done = new CountDownLatch(1);
566 final ThreadPoolExecutor p = new CustomExecutor(1);
567 try (PoolCleaner cleaner = cleaner(p)) {
568 final CountDownLatch threadStarted = new CountDownLatch(1);
569 assertFalse(p.isTerminating());
570 p.execute(new CheckedRunnable() {
571 public void realRun() throws InterruptedException {
572 assertFalse(p.isTerminating());
573 threadStarted.countDown();
574 await(done);
575 }});
576 await(threadStarted);
577 assertFalse(p.isTerminating());
578 done.countDown();
579 try { p.shutdown(); } catch (SecurityException ok) { return; }
580 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
581 assertTrue(p.isTerminated());
582 assertFalse(p.isTerminating());
583 }
584 }
585
586 /**
587 * getQueue returns the work queue, which contains queued tasks
588 */
589 public void testGetQueue() throws InterruptedException {
590 final CountDownLatch done = new CountDownLatch(1);
591 final ScheduledThreadPoolExecutor p = new CustomExecutor(1);
592 try (PoolCleaner cleaner = cleaner(p, done)) {
593 final CountDownLatch threadStarted = new CountDownLatch(1);
594 @SuppressWarnings("unchecked")
595 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5];
596 for (int i = 0; i < tasks.length; i++) {
597 Runnable r = new CheckedRunnable() {
598 public void realRun() throws InterruptedException {
599 threadStarted.countDown();
600 await(done);
601 }};
602 tasks[i] = p.schedule(r, 1, MILLISECONDS);
603 }
604 await(threadStarted);
605 BlockingQueue<Runnable> q = p.getQueue();
606 assertTrue(q.contains(tasks[tasks.length - 1]));
607 assertFalse(q.contains(tasks[0]));
608 }
609 }
610
611 /**
612 * remove(task) removes queued task, and fails to remove active task
613 */
614 public void testRemove() throws InterruptedException {
615 final CountDownLatch done = new CountDownLatch(1);
616 final ScheduledThreadPoolExecutor p = new CustomExecutor(1);
617 try (PoolCleaner cleaner = cleaner(p, done)) {
618 @SuppressWarnings("unchecked")
619 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5];
620 final CountDownLatch threadStarted = new CountDownLatch(1);
621 for (int i = 0; i < tasks.length; i++) {
622 Runnable r = new CheckedRunnable() {
623 public void realRun() throws InterruptedException {
624 threadStarted.countDown();
625 await(done);
626 }};
627 tasks[i] = p.schedule(r, 1, MILLISECONDS);
628 }
629 await(threadStarted);
630 BlockingQueue<Runnable> q = p.getQueue();
631 assertFalse(p.remove((Runnable)tasks[0]));
632 assertTrue(q.contains((Runnable)tasks[4]));
633 assertTrue(q.contains((Runnable)tasks[3]));
634 assertTrue(p.remove((Runnable)tasks[4]));
635 assertFalse(p.remove((Runnable)tasks[4]));
636 assertFalse(q.contains((Runnable)tasks[4]));
637 assertTrue(q.contains((Runnable)tasks[3]));
638 assertTrue(p.remove((Runnable)tasks[3]));
639 assertFalse(q.contains((Runnable)tasks[3]));
640 }
641 }
642
643 /**
644 * purge removes cancelled tasks from the queue
645 */
646 public void testPurge() throws InterruptedException {
647 @SuppressWarnings("unchecked")
648 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5];
649 final Runnable releaser = new Runnable() { public void run() {
650 for (ScheduledFuture<?> task : tasks)
651 if (task != null) task.cancel(true); }};
652 final CustomExecutor p = new CustomExecutor(1);
653 try (PoolCleaner cleaner = cleaner(p, releaser)) {
654 for (int i = 0; i < tasks.length; i++)
655 tasks[i] = p.schedule(possiblyInterruptedRunnable(SMALL_DELAY_MS),
656 LONG_DELAY_MS, MILLISECONDS);
657 int max = tasks.length;
658 if (tasks[4].cancel(true)) --max;
659 if (tasks[3].cancel(true)) --max;
660 // There must eventually be an interference-free point at
661 // which purge will not fail. (At worst, when queue is empty.)
662 long startTime = System.nanoTime();
663 do {
664 p.purge();
665 long count = p.getTaskCount();
666 if (count == max)
667 return;
668 } while (millisElapsedSince(startTime) < LONG_DELAY_MS);
669 fail("Purge failed to remove cancelled tasks");
670 }
671 }
672
673 /**
674 * shutdownNow returns a list containing tasks that were not run,
675 * and those tasks are drained from the queue
676 */
677 public void testShutdownNow() throws InterruptedException {
678 final int poolSize = 2;
679 final int count = 5;
680 final AtomicInteger ran = new AtomicInteger(0);
681 final CustomExecutor p = new CustomExecutor(poolSize);
682 final CountDownLatch threadsStarted = new CountDownLatch(poolSize);
683 Runnable waiter = new CheckedRunnable() { public void realRun() {
684 threadsStarted.countDown();
685 try {
686 MILLISECONDS.sleep(LONGER_DELAY_MS);
687 } catch (InterruptedException success) {}
688 ran.getAndIncrement();
689 }};
690 for (int i = 0; i < count; i++)
691 p.execute(waiter);
692 await(threadsStarted);
693 assertEquals(poolSize, p.getActiveCount());
694 assertEquals(0, p.getCompletedTaskCount());
695 final List<Runnable> queuedTasks;
696 try {
697 queuedTasks = p.shutdownNow();
698 } catch (SecurityException ok) {
699 return; // Allowed in case test doesn't have privs
700 }
701 assertTrue(p.isShutdown());
702 assertTrue(p.getQueue().isEmpty());
703 assertEquals(count - poolSize, queuedTasks.size());
704 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
705 assertTrue(p.isTerminated());
706 assertEquals(poolSize, ran.get());
707 assertEquals(poolSize, p.getCompletedTaskCount());
708 }
709
710 /**
711 * shutdownNow returns a list containing the delayed and periodic tasks
712 * that were not run, and those tasks are drained from the queue
713 */
714 public void testShutdownNow_delayedTasks() throws InterruptedException {
715 final CustomExecutor p = new CustomExecutor(1);
716 List<ScheduledFuture<?>> tasks = new ArrayList<>();
717 for (int i = 0; i < 3; i++) {
718 Runnable r = new NoOpRunnable();
719 tasks.add(p.schedule(r, 9, SECONDS));
720 tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS));
721 tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS));
722 }
723 if (testImplementationDetails)
724 assertEquals(new HashSet<Object>(tasks), new HashSet<Object>(p.getQueue()));
725 final List<Runnable> queuedTasks;
726 try {
727 queuedTasks = p.shutdownNow();
728 } catch (SecurityException ok) {
729 return; // Allowed in case test doesn't have privs
730 }
731 assertTrue(p.isShutdown());
732 assertTrue(p.getQueue().isEmpty());
733 if (testImplementationDetails)
734 assertEquals(new HashSet<Object>(tasks), new HashSet<Object>(queuedTasks));
735 assertEquals(tasks.size(), queuedTasks.size());
736 for (ScheduledFuture<?> task : tasks) {
737 assertFalse(((CustomTask)task).ran);
738 assertFalse(task.isDone());
739 assertFalse(task.isCancelled());
740 }
741 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
742 assertTrue(p.isTerminated());
743 }
744
745 /**
746 * By default, periodic tasks are cancelled at shutdown.
747 * By default, delayed tasks keep running after shutdown.
748 * Check that changing the default values works:
749 * - setExecuteExistingDelayedTasksAfterShutdownPolicy
750 * - setContinueExistingPeriodicTasksAfterShutdownPolicy
751 */
752 @SuppressWarnings("FutureReturnValueIgnored")
753 public void testShutdown_cancellation() throws Exception {
    // Randomised scenario: picks the three policies at random, fills the
    // pool and its queue with immediate, delayed and periodic tasks,
    // calls shutdown(), and then checks which tasks stay queued, which
    // are cancelled, and how many task bodies ran in total.
754 final int poolSize = 4;
755 final CustomExecutor p = new CustomExecutor(poolSize);
756 final BlockingQueue<Runnable> q = p.getQueue();
757 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
    // delay is 0 or 1 (ms); rounds is 1 or 2.
758 final long delay = rnd.nextInt(2);
759 final int rounds = rnd.nextInt(1, 3);
760 final boolean effectiveDelayedPolicy;
761 final boolean effectivePeriodicPolicy;
762 final boolean effectiveRemovePolicy;
763
    // Each policy is either set explicitly to a random value or left
    // untouched; the effective* flag holds the value the getter is then
    // expected to report (true / false / false when left untouched).
764 if (rnd.nextBoolean())
765 p.setExecuteExistingDelayedTasksAfterShutdownPolicy(
766 effectiveDelayedPolicy = rnd.nextBoolean());
767 else
768 effectiveDelayedPolicy = true;
769 assertEquals(effectiveDelayedPolicy,
770 p.getExecuteExistingDelayedTasksAfterShutdownPolicy());
771
772 if (rnd.nextBoolean())
773 p.setContinueExistingPeriodicTasksAfterShutdownPolicy(
774 effectivePeriodicPolicy = rnd.nextBoolean());
775 else
776 effectivePeriodicPolicy = false;
777 assertEquals(effectivePeriodicPolicy,
778 p.getContinueExistingPeriodicTasksAfterShutdownPolicy());
779
780 if (rnd.nextBoolean())
781 p.setRemoveOnCancelPolicy(
782 effectiveRemovePolicy = rnd.nextBoolean());
783 else
784 effectiveRemovePolicy = false;
785 assertEquals(effectiveRemovePolicy,
786 p.getRemoveOnCancelPolicy());
787
    // Even when the periodic policy allows continuation, a coin flip
    // decides whether this test lets the periodic tasks continue or
    // cancels them explicitly after shutdown (see the forEach below).
788 final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean();
789
790 // Strategy: Wedge the pool with one wave of "blocker" tasks,
791 // then add a second wave that waits in the queue until unblocked.
792 final AtomicInteger ran = new AtomicInteger(0);
793 final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
794 final CountDownLatch unblock = new CountDownLatch(1);
795 final RuntimeException exception = new RuntimeException();
796
    // Blocker body: counts the run, reports in on poolBlocked, then waits
    // for the unblock latch.
797 class Task implements Runnable {
798 public void run() {
799 try {
800 ran.getAndIncrement();
801 poolBlocked.countDown();
802 await(unblock);
803 } catch (Throwable fail) { threadUnexpectedException(fail); }
804 }
805 }
806
    // Periodic variant: decrements its counter on every execution, runs
    // the blocker body on the execution where the counter reaches 0, and
    // throws the shared exception on the execution after that.
807 class PeriodicTask extends Task {
808 PeriodicTask(int rounds) { this.rounds = rounds; }
809 int rounds;
810 public void run() {
811 if (--rounds == 0) super.run();
812 // throw exception to surely terminate this periodic task,
813 // but in a separate execution and in a detectable way.
814 if (rounds == -1) throw exception;
815 }
816 }
817
818 Runnable task = new Task();
819
820 List<Future<?>> immediates = new ArrayList<>();
821 List<Future<?>> delayeds = new ArrayList<>();
822 List<Future<?>> periodics = new ArrayList<>();
823
    // First wave: four tasks, one per pool thread.
824 immediates.add(p.submit(task));
825 delayeds.add(p.schedule(task, delay, MILLISECONDS));
826 periodics.add(p.scheduleAtFixedRate(
827 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
828 periodics.add(p.scheduleWithFixedDelay(
829 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
830
831 await(poolBlocked);
832
833 assertEquals(poolSize, ran.get());
834 assertEquals(poolSize, p.getActiveCount());
835 assertTrue(q.isEmpty());
836
837 // Add second wave of tasks.
838 immediates.add(p.submit(task));
    // When delayed tasks are not to run after shutdown, the second
    // delayed task is scheduled with LONG_DELAY_MS instead of delay.
839 delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS));
840 periodics.add(p.scheduleAtFixedRate(
841 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
842 periodics.add(p.scheduleWithFixedDelay(
843 new PeriodicTask(rounds), delay, 1, MILLISECONDS));
844
    // The whole second wave is queued and none of it has run.
845 assertEquals(poolSize, q.size());
846 assertEquals(poolSize, ran.get());
847
848 immediates.forEach(
849 f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));
850
851 Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream)
852 .forEach(f -> assertFalse(f.isDone()));
853
854 try { p.shutdown(); } catch (SecurityException ok) { return; }
855 assertTrue(p.isShutdown());
856 assertTrue(p.isTerminating());
857 assertFalse(p.isTerminated());
858
    // Randomly also verify that every submission flavour is now rejected.
859 if (rnd.nextBoolean())
860 assertThrows(
861 RejectedExecutionException.class,
862 () -> p.submit(task),
863 () -> p.schedule(task, 1, SECONDS),
864 () -> p.scheduleAtFixedRate(
865 new PeriodicTask(1), 1, 1, SECONDS),
866 () -> p.scheduleWithFixedDelay(
867 new PeriodicTask(2), 1, 1, SECONDS));
868
    // "!policy ^ contains" is true exactly when policy == contains: a
    // second-wave task must still be queued iff its policy is true.
869 assertTrue(q.contains(immediates.get(1)));
870 assertTrue(!effectiveDelayedPolicy
871 ^ q.contains(delayeds.get(1)));
872 assertTrue(!effectivePeriodicPolicy
873 ^ q.containsAll(periodics.subList(2, 4)));
874
875 immediates.forEach(f -> assertFalse(f.isDone()));
876
877 assertFalse(delayeds.get(0).isDone());
878 if (effectiveDelayedPolicy)
879 assertFalse(delayeds.get(1).isDone());
880 else
881 assertTrue(delayeds.get(1).isCancelled());
882
883 if (effectivePeriodicPolicy)
884 periodics.forEach(
885 f -> {
886 assertFalse(f.isDone());
887 if (!periodicTasksContinue) {
888 assertTrue(f.cancel(false));
889 assertTrue(f.isCancelled());
890 }
891 });
892 else {
893 periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone()));
894 periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled()));
895 }
896
897 unblock.countDown(); // Release all pool threads
898
899 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
900 assertFalse(p.isTerminating());
901 assertTrue(p.isTerminated());
902
903 assertTrue(q.isEmpty());
904
905 Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream)
906 .forEach(f -> assertTrue(f.isDone()));
907
908 for (Future<?> f : immediates) assertNull(f.get());
909
910 assertNull(delayeds.get(0).get());
911 if (effectiveDelayedPolicy)
912 assertNull(delayeds.get(1).get());
913 else
914 assertTrue(delayeds.get(1).isCancelled());
915
    // Periodic tasks that were allowed to continue must have ended by
    // throwing the shared exception; otherwise they must be cancelled.
916 if (periodicTasksContinue)
917 periodics.forEach(
918 f -> {
919 try { f.get(); }
920 catch (ExecutionException success) {
921 assertSame(exception, success.getCause());
922 }
923 catch (Throwable fail) { threadUnexpectedException(fail); }
924 });
925 else
926 periodics.forEach(f -> assertTrue(f.isCancelled()));
927
    // Expected runs of the blocker body: the first wave (poolSize), the
    // second immediate task, plus the second delayed task and the two
    // second-wave periodic tasks when they were allowed to run.
928 assertEquals(poolSize + 1
929 + (effectiveDelayedPolicy ? 1 : 0)
930 + (periodicTasksContinue ? 2 : 0),
931 ran.get());
932 }
933
934 /**
935 * completed submit of callable returns result
936 */
937 public void testSubmitCallable() throws Exception {
938 final ExecutorService e = new CustomExecutor(2);
939 try (PoolCleaner cleaner = cleaner(e)) {
940 Future<String> future = e.submit(new StringTask());
941 String result = future.get();
942 assertSame(TEST_STRING, result);
943 }
944 }
945
946 /**
947 * completed submit of runnable returns successfully
948 */
949 public void testSubmitRunnable() throws Exception {
950 final ExecutorService e = new CustomExecutor(2);
951 try (PoolCleaner cleaner = cleaner(e)) {
952 Future<?> future = e.submit(new NoOpRunnable());
953 future.get();
954 assertTrue(future.isDone());
955 }
956 }
957
958 /**
959 * completed submit of (runnable, result) returns result
960 */
961 public void testSubmitRunnable2() throws Exception {
962 final ExecutorService e = new CustomExecutor(2);
963 try (PoolCleaner cleaner = cleaner(e)) {
964 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING);
965 String result = future.get();
966 assertSame(TEST_STRING, result);
967 }
968 }
969
970 /**
971 * invokeAny(null) throws NPE
972 */
973 public void testInvokeAny1() throws Exception {
974 final ExecutorService e = new CustomExecutor(2);
975 try (PoolCleaner cleaner = cleaner(e)) {
976 try {
977 e.invokeAny(null);
978 shouldThrow();
979 } catch (NullPointerException success) {}
980 }
981 }
982
983 /**
984 * invokeAny(empty collection) throws IllegalArgumentException
985 */
986 public void testInvokeAny2() throws Exception {
987 final ExecutorService e = new CustomExecutor(2);
988 try (PoolCleaner cleaner = cleaner(e)) {
989 try {
990 e.invokeAny(new ArrayList<Callable<String>>());
991 shouldThrow();
992 } catch (IllegalArgumentException success) {}
993 }
994 }
995
996 /**
997 * invokeAny(c) throws NPE if c has null elements
998 */
999 public void testInvokeAny3() throws Exception {
1000 final CountDownLatch latch = new CountDownLatch(1);
1001 final ExecutorService e = new CustomExecutor(2);
1002 try (PoolCleaner cleaner = cleaner(e)) {
1003 List<Callable<String>> l = new ArrayList<>();
1004 l.add(latchAwaitingStringTask(latch));
1005 l.add(null);
1006 try {
1007 e.invokeAny(l);
1008 shouldThrow();
1009 } catch (NullPointerException success) {}
1010 latch.countDown();
1011 }
1012 }
1013
1014 /**
1015 * invokeAny(c) throws ExecutionException if no task completes
1016 */
1017 public void testInvokeAny4() throws Exception {
1018 final ExecutorService e = new CustomExecutor(2);
1019 try (PoolCleaner cleaner = cleaner(e)) {
1020 List<Callable<String>> l = new ArrayList<>();
1021 l.add(new NPETask());
1022 try {
1023 e.invokeAny(l);
1024 shouldThrow();
1025 } catch (ExecutionException success) {
1026 assertTrue(success.getCause() instanceof NullPointerException);
1027 }
1028 }
1029 }
1030
1031 /**
1032 * invokeAny(c) returns result of some task
1033 */
1034 public void testInvokeAny5() throws Exception {
1035 final ExecutorService e = new CustomExecutor(2);
1036 try (PoolCleaner cleaner = cleaner(e)) {
1037 List<Callable<String>> l = new ArrayList<>();
1038 l.add(new StringTask());
1039 l.add(new StringTask());
1040 String result = e.invokeAny(l);
1041 assertSame(TEST_STRING, result);
1042 }
1043 }
1044
1045 /**
1046 * invokeAll(null) throws NPE
1047 */
1048 public void testInvokeAll1() throws Exception {
1049 final ExecutorService e = new CustomExecutor(2);
1050 try (PoolCleaner cleaner = cleaner(e)) {
1051 try {
1052 e.invokeAll(null);
1053 shouldThrow();
1054 } catch (NullPointerException success) {}
1055 }
1056 }
1057
1058 /**
1059 * invokeAll(empty collection) returns empty list
1060 */
1061 public void testInvokeAll2() throws Exception {
1062 final ExecutorService e = new CustomExecutor(2);
1063 final Collection<Callable<String>> emptyCollection
1064 = Collections.emptyList();
1065 try (PoolCleaner cleaner = cleaner(e)) {
1066 List<Future<String>> r = e.invokeAll(emptyCollection);
1067 assertTrue(r.isEmpty());
1068 }
1069 }
1070
1071 /**
1072 * invokeAll(c) throws NPE if c has null elements
1073 */
1074 public void testInvokeAll3() throws Exception {
1075 final ExecutorService e = new CustomExecutor(2);
1076 try (PoolCleaner cleaner = cleaner(e)) {
1077 List<Callable<String>> l = new ArrayList<>();
1078 l.add(new StringTask());
1079 l.add(null);
1080 try {
1081 e.invokeAll(l);
1082 shouldThrow();
1083 } catch (NullPointerException success) {}
1084 }
1085 }
1086
1087 /**
1088 * get of invokeAll(c) throws exception on failed task
1089 */
1090 public void testInvokeAll4() throws Exception {
1091 final ExecutorService e = new CustomExecutor(2);
1092 try (PoolCleaner cleaner = cleaner(e)) {
1093 List<Callable<String>> l = new ArrayList<>();
1094 l.add(new NPETask());
1095 List<Future<String>> futures = e.invokeAll(l);
1096 assertEquals(1, futures.size());
1097 try {
1098 futures.get(0).get();
1099 shouldThrow();
1100 } catch (ExecutionException success) {
1101 assertTrue(success.getCause() instanceof NullPointerException);
1102 }
1103 }
1104 }
1105
1106 /**
1107 * invokeAll(c) returns results of all completed tasks
1108 */
1109 public void testInvokeAll5() throws Exception {
1110 final ExecutorService e = new CustomExecutor(2);
1111 try (PoolCleaner cleaner = cleaner(e)) {
1112 List<Callable<String>> l = new ArrayList<>();
1113 l.add(new StringTask());
1114 l.add(new StringTask());
1115 List<Future<String>> futures = e.invokeAll(l);
1116 assertEquals(2, futures.size());
1117 for (Future<String> future : futures)
1118 assertSame(TEST_STRING, future.get());
1119 }
1120 }
1121
1122 /**
1123 * timed invokeAny(null) throws NPE
1124 */
1125 public void testTimedInvokeAny1() throws Exception {
1126 final ExecutorService e = new CustomExecutor(2);
1127 try (PoolCleaner cleaner = cleaner(e)) {
1128 try {
1129 e.invokeAny(null, randomTimeout(), randomTimeUnit());
1130 shouldThrow();
1131 } catch (NullPointerException success) {}
1132 }
1133 }
1134
1135 /**
1136 * timed invokeAny(,,null) throws NPE
1137 */
1138 public void testTimedInvokeAnyNullTimeUnit() throws Exception {
1139 final ExecutorService e = new CustomExecutor(2);
1140 try (PoolCleaner cleaner = cleaner(e)) {
1141 List<Callable<String>> l = new ArrayList<>();
1142 l.add(new StringTask());
1143 try {
1144 e.invokeAny(l, randomTimeout(), null);
1145 shouldThrow();
1146 } catch (NullPointerException success) {}
1147 }
1148 }
1149
1150 /**
1151 * timed invokeAny(empty collection) throws IllegalArgumentException
1152 */
1153 public void testTimedInvokeAny2() throws Exception {
1154 final ExecutorService e = new CustomExecutor(2);
1155 final Collection<Callable<String>> emptyCollection
1156 = Collections.emptyList();
1157 try (PoolCleaner cleaner = cleaner(e)) {
1158 try {
1159 e.invokeAny(emptyCollection, randomTimeout(), randomTimeUnit());
1160 shouldThrow();
1161 } catch (IllegalArgumentException success) {}
1162 }
1163 }
1164
1165 /**
1166 * timed invokeAny(c) throws NPE if c has null elements
1167 */
1168 public void testTimedInvokeAny3() throws Exception {
1169 CountDownLatch latch = new CountDownLatch(1);
1170 final ExecutorService e = new CustomExecutor(2);
1171 try (PoolCleaner cleaner = cleaner(e)) {
1172 List<Callable<String>> l = new ArrayList<>();
1173 l.add(latchAwaitingStringTask(latch));
1174 l.add(null);
1175 try {
1176 e.invokeAny(l, randomTimeout(), randomTimeUnit());
1177 shouldThrow();
1178 } catch (NullPointerException success) {}
1179 latch.countDown();
1180 }
1181 }
1182
1183 /**
1184 * timed invokeAny(c) throws ExecutionException if no task completes
1185 */
1186 public void testTimedInvokeAny4() throws Exception {
1187 final ExecutorService e = new CustomExecutor(2);
1188 try (PoolCleaner cleaner = cleaner(e)) {
1189 long startTime = System.nanoTime();
1190 List<Callable<String>> l = new ArrayList<>();
1191 l.add(new NPETask());
1192 try {
1193 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1194 shouldThrow();
1195 } catch (ExecutionException success) {
1196 assertTrue(success.getCause() instanceof NullPointerException);
1197 }
1198 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1199 }
1200 }
1201
1202 /**
1203 * timed invokeAny(c) returns result of some task
1204 */
1205 public void testTimedInvokeAny5() throws Exception {
1206 final ExecutorService e = new CustomExecutor(2);
1207 try (PoolCleaner cleaner = cleaner(e)) {
1208 long startTime = System.nanoTime();
1209 List<Callable<String>> l = new ArrayList<>();
1210 l.add(new StringTask());
1211 l.add(new StringTask());
1212 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
1213 assertSame(TEST_STRING, result);
1214 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1215 }
1216 }
1217
1218 /**
1219 * timed invokeAll(null) throws NullPointerException
1220 */
1221 public void testTimedInvokeAll1() throws Exception {
1222 final ExecutorService e = new CustomExecutor(2);
1223 try (PoolCleaner cleaner = cleaner(e)) {
1224 try {
1225 e.invokeAll(null, randomTimeout(), randomTimeUnit());
1226 shouldThrow();
1227 } catch (NullPointerException success) {}
1228 }
1229 }
1230
1231 /**
1232 * timed invokeAll(,,null) throws NullPointerException
1233 */
1234 public void testTimedInvokeAllNullTimeUnit() throws Exception {
1235 final ExecutorService e = new CustomExecutor(2);
1236 try (PoolCleaner cleaner = cleaner(e)) {
1237 List<Callable<String>> l = new ArrayList<>();
1238 l.add(new StringTask());
1239 try {
1240 e.invokeAll(l, randomTimeout(), null);
1241 shouldThrow();
1242 } catch (NullPointerException success) {}
1243 }
1244 }
1245
1246 /**
1247 * timed invokeAll(empty collection) returns empty list
1248 */
1249 public void testTimedInvokeAll2() throws Exception {
1250 final ExecutorService e = new CustomExecutor(2);
1251 final Collection<Callable<String>> emptyCollection
1252 = Collections.emptyList();
1253 try (PoolCleaner cleaner = cleaner(e)) {
1254 List<Future<String>> r =
1255 e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit());
1256 assertTrue(r.isEmpty());
1257 }
1258 }
1259
1260 /**
1261 * timed invokeAll(c) throws NPE if c has null elements
1262 */
1263 public void testTimedInvokeAll3() throws Exception {
1264 final ExecutorService e = new CustomExecutor(2);
1265 try (PoolCleaner cleaner = cleaner(e)) {
1266 List<Callable<String>> l = new ArrayList<>();
1267 l.add(new StringTask());
1268 l.add(null);
1269 try {
1270 e.invokeAll(l, randomTimeout(), randomTimeUnit());
1271 shouldThrow();
1272 } catch (NullPointerException success) {}
1273 }
1274 }
1275
1276 /**
1277 * get of element of invokeAll(c) throws exception on failed task
1278 */
1279 public void testTimedInvokeAll4() throws Exception {
1280 final ExecutorService e = new CustomExecutor(2);
1281 final Collection<Callable<String>> c = new ArrayList<>();
1282 c.add(new NPETask());
1283 try (PoolCleaner cleaner = cleaner(e)) {
1284 List<Future<String>> futures =
1285 e.invokeAll(c, LONG_DELAY_MS, MILLISECONDS);
1286 assertEquals(1, futures.size());
1287 try {
1288 futures.get(0).get();
1289 shouldThrow();
1290 } catch (ExecutionException success) {
1291 assertTrue(success.getCause() instanceof NullPointerException);
1292 }
1293 }
1294 }
1295
1296 /**
1297 * timed invokeAll(c) returns results of all completed tasks
1298 */
1299 public void testTimedInvokeAll5() throws Exception {
1300 final ExecutorService e = new CustomExecutor(2);
1301 try (PoolCleaner cleaner = cleaner(e)) {
1302 List<Callable<String>> l = new ArrayList<>();
1303 l.add(new StringTask());
1304 l.add(new StringTask());
1305 List<Future<String>> futures =
1306 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
1307 assertEquals(2, futures.size());
1308 for (Future<String> future : futures)
1309 assertSame(TEST_STRING, future.get());
1310 }
1311 }
1312
1313 /**
1314 * timed invokeAll(c) cancels tasks not completed by timeout
1315 */
1316 public void testTimedInvokeAll6() throws Exception {
1317 for (long timeout = timeoutMillis();;) {
1318 final CountDownLatch done = new CountDownLatch(1);
1319 final Callable<String> waiter = new CheckedCallable<>() {
1320 public String realCall() {
1321 try { done.await(LONG_DELAY_MS, MILLISECONDS); }
1322 catch (InterruptedException ok) {}
1323 return "1"; }};
1324 final ExecutorService p = new CustomExecutor(2);
1325 try (PoolCleaner cleaner = cleaner(p, done)) {
1326 List<Callable<String>> tasks = new ArrayList<>();
1327 tasks.add(new StringTask("0"));
1328 tasks.add(waiter);
1329 tasks.add(new StringTask("2"));
1330 long startTime = System.nanoTime();
1331 List<Future<String>> futures =
1332 p.invokeAll(tasks, timeout, MILLISECONDS);
1333 assertEquals(tasks.size(), futures.size());
1334 assertTrue(millisElapsedSince(startTime) >= timeout);
1335 for (Future<?> future : futures)
1336 assertTrue(future.isDone());
1337 assertTrue(futures.get(1).isCancelled());
1338 try {
1339 assertEquals("0", futures.get(0).get());
1340 assertEquals("2", futures.get(2).get());
1341 break;
1342 } catch (CancellationException retryWithLongerTimeout) {
1343 timeout *= 2;
1344 if (timeout >= LONG_DELAY_MS / 2)
1345 fail("expected exactly one task to be cancelled");
1346 }
1347 }
1348 }
1349 }
1350
1351 }