ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/ExecutorCompletionServiceTest.java
Revision: 1.28
Committed: Sat Mar 25 21:41:10 2017 UTC (7 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.27: +1 -1 lines
Log Message:
use await(CountDownLatch) pervasively

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10
11 import java.util.concurrent.ArrayBlockingQueue;
12 import java.util.concurrent.Callable;
13 import java.util.concurrent.CompletionService;
14 import java.util.concurrent.CountDownLatch;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorCompletionService;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.FutureTask;
20 import java.util.concurrent.RunnableFuture;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import junit.framework.Test;
26 import junit.framework.TestSuite;
27
28 public class ExecutorCompletionServiceTest extends JSR166TestCase {
29 public static void main(String[] args) {
30 main(suite(), args);
31 }
32 public static Test suite() {
33 return new TestSuite(ExecutorCompletionServiceTest.class);
34 }
35
36 /**
37 * new ExecutorCompletionService(null) throws NullPointerException
38 */
39 public void testConstructorNPE() {
40 try {
41 new ExecutorCompletionService(null);
42 shouldThrow();
43 } catch (NullPointerException success) {}
44 }
45
46 /**
47 * new ExecutorCompletionService(e, null) throws NullPointerException
48 */
49 public void testConstructorNPE2() {
50 try {
51 new ExecutorCompletionService(cachedThreadPool, null);
52 shouldThrow();
53 } catch (NullPointerException success) {}
54 }
55
56 /**
57 * ecs.submit(null) throws NullPointerException
58 */
59 public void testSubmitNullCallable() {
60 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
61 try {
62 cs.submit((Callable) null);
63 shouldThrow();
64 } catch (NullPointerException success) {}
65 }
66
67 /**
68 * ecs.submit(null, val) throws NullPointerException
69 */
70 public void testSubmitNullRunnable() {
71 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
72 try {
73 cs.submit((Runnable) null, Boolean.TRUE);
74 shouldThrow();
75 } catch (NullPointerException success) {}
76 }
77
78 /**
79 * A taken submitted task is completed
80 */
81 public void testTake()
82 throws InterruptedException, ExecutionException {
83 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
84 cs.submit(new StringTask());
85 Future f = cs.take();
86 assertTrue(f.isDone());
87 assertSame(TEST_STRING, f.get());
88 }
89
90 /**
91 * Take returns the same future object returned by submit
92 */
93 public void testTake2() throws InterruptedException {
94 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
95 Future f1 = cs.submit(new StringTask());
96 Future f2 = cs.take();
97 assertSame(f1, f2);
98 }
99
100 /**
101 * poll returns non-null when the returned task is completed
102 */
103 public void testPoll1()
104 throws InterruptedException, ExecutionException {
105 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
106 assertNull(cs.poll());
107 cs.submit(new StringTask());
108
109 long startTime = System.nanoTime();
110 Future f;
111 while ((f = cs.poll()) == null) {
112 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
113 fail("timed out");
114 Thread.yield();
115 }
116 assertTrue(f.isDone());
117 assertSame(TEST_STRING, f.get());
118 }
119
120 /**
121 * timed poll returns non-null when the returned task is completed
122 */
123 public void testPoll2()
124 throws InterruptedException, ExecutionException {
125 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
126 assertNull(cs.poll());
127 cs.submit(new StringTask());
128
129 long startTime = System.nanoTime();
130 Future f;
131 while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
132 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
133 fail("timed out");
134 Thread.yield();
135 }
136 assertTrue(f.isDone());
137 assertSame(TEST_STRING, f.get());
138 }
139
140 /**
141 * poll returns null before the returned task is completed
142 */
143 public void testPollReturnsNull()
144 throws InterruptedException, ExecutionException {
145 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
146 final CountDownLatch proceed = new CountDownLatch(1);
147 cs.submit(new Callable() { public String call() throws Exception {
148 await(proceed);
149 return TEST_STRING;
150 }});
151 assertNull(cs.poll());
152 assertNull(cs.poll(0L, MILLISECONDS));
153 assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
154 long startTime = System.nanoTime();
155 assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
156 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
157 proceed.countDown();
158 assertSame(TEST_STRING, cs.take().get());
159 }
160
161 /**
162 * successful and failed tasks are both returned
163 */
164 public void testTaskAssortment()
165 throws InterruptedException, ExecutionException {
166 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
167 ArithmeticException ex = new ArithmeticException();
168 for (int i = 0; i < 2; i++) {
169 cs.submit(new StringTask());
170 cs.submit(callableThrowing(ex));
171 cs.submit(runnableThrowing(ex), null);
172 }
173 int normalCompletions = 0;
174 int exceptionalCompletions = 0;
175 for (int i = 0; i < 3 * 2; i++) {
176 try {
177 if (cs.take().get() == TEST_STRING)
178 normalCompletions++;
179 }
180 catch (ExecutionException expected) {
181 assertTrue(expected.getCause() instanceof ArithmeticException);
182 exceptionalCompletions++;
183 }
184 }
185 assertEquals(2 * 1, normalCompletions);
186 assertEquals(2 * 2, exceptionalCompletions);
187 assertNull(cs.poll());
188 }
189
190 /**
191 * Submitting to underlying AES that overrides newTaskFor(Callable)
192 * returns and eventually runs Future returned by newTaskFor.
193 */
194 public void testNewTaskForCallable() throws InterruptedException {
195 final AtomicBoolean done = new AtomicBoolean(false);
196 class MyCallableFuture<V> extends FutureTask<V> {
197 MyCallableFuture(Callable<V> c) { super(c); }
198 @Override protected void done() { done.set(true); }
199 }
200 final ExecutorService e =
201 new ThreadPoolExecutor(1, 1,
202 30L, TimeUnit.SECONDS,
203 new ArrayBlockingQueue<Runnable>(1)) {
204 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
205 return new MyCallableFuture<T>(c);
206 }};
207 CompletionService<String> cs = new ExecutorCompletionService<>(e);
208 try (PoolCleaner cleaner = cleaner(e)) {
209 assertNull(cs.poll());
210 Callable<String> c = new StringTask();
211 Future f1 = cs.submit(c);
212 assertTrue("submit must return MyCallableFuture",
213 f1 instanceof MyCallableFuture);
214 Future f2 = cs.take();
215 assertSame("submit and take must return same objects", f1, f2);
216 assertTrue("completed task must have set done", done.get());
217 }
218 }
219
220 /**
221 * Submitting to underlying AES that overrides newTaskFor(Runnable,T)
222 * returns and eventually runs Future returned by newTaskFor.
223 */
224 public void testNewTaskForRunnable() throws InterruptedException {
225 final AtomicBoolean done = new AtomicBoolean(false);
226 class MyRunnableFuture<V> extends FutureTask<V> {
227 MyRunnableFuture(Runnable t, V r) { super(t, r); }
228 @Override protected void done() { done.set(true); }
229 }
230 final ExecutorService e =
231 new ThreadPoolExecutor(1, 1,
232 30L, TimeUnit.SECONDS,
233 new ArrayBlockingQueue<Runnable>(1)) {
234 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
235 return new MyRunnableFuture<T>(t, r);
236 }};
237 CompletionService<String> cs = new ExecutorCompletionService<>(e);
238 try (PoolCleaner cleaner = cleaner(e)) {
239 assertNull(cs.poll());
240 Runnable r = new NoOpRunnable();
241 Future f1 = cs.submit(r, null);
242 assertTrue("submit must return MyRunnableFuture",
243 f1 instanceof MyRunnableFuture);
244 Future f2 = cs.take();
245 assertSame("submit and take must return same objects", f1, f2);
246 assertTrue("completed task must have set done", done.get());
247 }
248 }
249
250 }