ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SynchronousQueueTest.java (file contents):
Revision 1.19 by jsr166, Tue Dec 1 06:03:49 2009 UTC vs.
Revision 1.35 by jsr166, Fri May 27 20:07:24 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   * Other contributors include Andrew Wright, Jeffrey Hayes,
6   * Pat Fisher, Mike Judd.
7   */
# Line 14 | Line 14 | import java.io.*;
14  
15   public class SynchronousQueueTest extends JSR166TestCase {
16  
17 <    public static void main(String[] args) {
18 <        junit.textui.TestRunner.run (suite());
17 >    public static class Fair extends BlockingQueueTest {
18 >        protected BlockingQueue emptyCollection() {
19 >            return new SynchronousQueue(true);
20 >        }
21      }
22  
23 <    public static Test suite() {
24 <        return new TestSuite(SynchronousQueueTest.class);
23 >    public static class NonFair extends BlockingQueueTest {
24 >        protected BlockingQueue emptyCollection() {
25 >            return new SynchronousQueue(false);
26 >        }
27      }
28  
29 <    /**
30 <     * A SynchronousQueue is both empty and full
31 <     */
32 <    public void testEmptyFull() {
33 <        SynchronousQueue q = new SynchronousQueue();
34 <        assertTrue(q.isEmpty());
35 <        assertEquals(0, q.size());
36 <        assertEquals(0, q.remainingCapacity());
33 <        assertFalse(q.offer(zero));
29 >    public static void main(String[] args) {
30 >        junit.textui.TestRunner.run(suite());
31 >    }
32 >
33 >    public static Test suite() {
34 >        return newTestSuite(SynchronousQueueTest.class,
35 >                            new Fair().testSuite(),
36 >                            new NonFair().testSuite());
37      }
38  
39      /**
40 <     * A fair SynchronousQueue is both empty and full
40 >     * Any SynchronousQueue is both empty and full
41       */
42 <    public void testFairEmptyFull() {
43 <        SynchronousQueue q = new SynchronousQueue(true);
42 >    public void testEmptyFull()      { testEmptyFull(false); }
43 >    public void testEmptyFull_fair() { testEmptyFull(true); }
44 >    public void testEmptyFull(boolean fair) {
45 >        final SynchronousQueue q = new SynchronousQueue(fair);
46          assertTrue(q.isEmpty());
47          assertEquals(0, q.size());
48          assertEquals(0, q.remainingCapacity());
# Line 119 | Line 124 | public class SynchronousQueueTest extend
124              shouldThrow();
125          } catch (NullPointerException success) {}
126      }
127 +
128      /**
129       * addAll throws ISE if no active taker
130       */
# Line 142 | Line 148 | public class SynchronousQueueTest extend
148              q.put(null);
149              shouldThrow();
150          } catch (NullPointerException success) {}
145     }
146
147    /**
148     * put blocks interruptibly if no active taker
149     */
150    public void testBlockingPut() throws InterruptedException {
151        Thread t = new Thread(new CheckedInterruptedRunnable() {
152            public void realRun() throws InterruptedException {
153                SynchronousQueue q = new SynchronousQueue();
154                q.put(zero);
155            }});
156
157        t.start();
158        Thread.sleep(SHORT_DELAY_MS);
159        t.interrupt();
160        t.join();
151      }
152  
153      /**
154 <     * put blocks waiting for take
154 >     * put blocks interruptibly if no active taker
155       */
156 <    public void testPutWithTake() throws InterruptedException {
157 <        final SynchronousQueue q = new SynchronousQueue();
158 <        Thread t = new Thread(new CheckedRunnable() {
156 >    public void testBlockingPut()      { testBlockingPut(false); }
157 >    public void testBlockingPut_fair() { testBlockingPut(true); }
158 >    public void testBlockingPut(boolean fair) {
159 >        final SynchronousQueue q = new SynchronousQueue(fair);
160 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
161 >        Thread t = newStartedThread(new CheckedRunnable() {
162              public void realRun() throws InterruptedException {
163 <                int added = 0;
163 >                Thread.currentThread().interrupt();
164                  try {
165 <                    while (true) {
166 <                        q.put(added);
167 <                        ++added;
168 <                    }
176 <                } catch (InterruptedException success) {
177 <                    assertTrue(added == 1);
178 <                }
179 <            }});
180 <
181 <        t.start();
182 <        Thread.sleep(SHORT_DELAY_MS);
183 <        assertEquals(0, q.take());
184 <        Thread.sleep(SHORT_DELAY_MS);
185 <        t.interrupt();
186 <        t.join();
187 <    }
188 <
189 <    /**
190 <     * timed offer times out if elements not taken
191 <     */
192 <    public void testTimedOffer() throws InterruptedException {
193 <        final SynchronousQueue q = new SynchronousQueue();
194 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
195 <            public void realRun() throws InterruptedException {
196 <                assertFalse(q.offer(new Object(), SHORT_DELAY_MS, MILLISECONDS));
197 <                q.offer(new Object(), LONG_DELAY_MS, MILLISECONDS);
198 <            }});
199 <
200 <        t.start();
201 <        Thread.sleep(SMALL_DELAY_MS);
202 <        t.interrupt();
203 <        t.join();
204 <    }
205 <
165 >                    q.put(99);
166 >                    shouldThrow();
167 >                } catch (InterruptedException success) {}
168 >                assertFalse(Thread.interrupted());
169  
170 <    /**
171 <     * take blocks interruptibly when empty
172 <     */
173 <    public void testTakeFromEmpty() throws InterruptedException {
174 <        final SynchronousQueue q = new SynchronousQueue();
175 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
213 <            public void realRun() throws InterruptedException {
214 <                q.take();
170 >                pleaseInterrupt.countDown();
171 >                try {
172 >                    q.put(99);
173 >                    shouldThrow();
174 >                } catch (InterruptedException success) {}
175 >                assertFalse(Thread.interrupted());
176              }});
177  
178 <        t.start();
179 <        Thread.sleep(SHORT_DELAY_MS);
178 >        await(pleaseInterrupt);
179 >        assertThreadStaysAlive(t);
180          t.interrupt();
181 <        t.join();
181 >        awaitTermination(t);
182 >        assertEquals(0, q.remainingCapacity());
183      }
184  
223
185      /**
186 <     * put blocks interruptibly if no active taker
186 >     * put blocks interruptibly waiting for take
187       */
188 <    public void testFairBlockingPut() throws InterruptedException {
189 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
188 >    public void testPutWithTake()      { testPutWithTake(false); }
189 >    public void testPutWithTake_fair() { testPutWithTake(true); }
190 >    public void testPutWithTake(boolean fair) {
191 >        final SynchronousQueue q = new SynchronousQueue(fair);
192 >        final CountDownLatch pleaseTake = new CountDownLatch(1);
193 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
194 >        Thread t = newStartedThread(new CheckedRunnable() {
195              public void realRun() throws InterruptedException {
196 <                SynchronousQueue q = new SynchronousQueue(true);
197 <                q.put(zero);
232 <            }});
233 <
234 <        t.start();
235 <        Thread.sleep(SHORT_DELAY_MS);
236 <        t.interrupt();
237 <        t.join();
238 <    }
196 >                pleaseTake.countDown();
197 >                q.put(one);
198  
199 <    /**
241 <     * put blocks waiting for take
242 <     */
243 <    public void testFairPutWithTake() throws InterruptedException {
244 <        final SynchronousQueue q = new SynchronousQueue(true);
245 <        Thread t = new Thread(new CheckedRunnable() {
246 <            public void realRun() throws InterruptedException {
247 <                int added = 0;
199 >                pleaseInterrupt.countDown();
200                  try {
201 <                    while (true) {
202 <                        q.put(added);
203 <                        ++added;
204 <                    }
253 <                } catch (InterruptedException success) {
254 <                    assertTrue(added == 1);
255 <                }
201 >                    q.put(99);
202 >                    shouldThrow();
203 >                } catch (InterruptedException success) {}
204 >                assertFalse(Thread.interrupted());
205              }});
206  
207 <        t.start();
208 <        Thread.sleep(SHORT_DELAY_MS);
209 <        assertEquals(0, q.take());
210 <        Thread.sleep(SHORT_DELAY_MS);
262 <        t.interrupt();
263 <        t.join();
264 <    }
265 <
266 <    /**
267 <     * timed offer times out if elements not taken
268 <     */
269 <    public void testFairTimedOffer() throws InterruptedException {
270 <        final SynchronousQueue q = new SynchronousQueue(true);
271 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
272 <            public void realRun() throws InterruptedException {
273 <                assertFalse(q.offer(new Object(), SHORT_DELAY_MS, MILLISECONDS));
274 <                q.offer(new Object(), LONG_DELAY_MS, MILLISECONDS);
275 <            }});
207 >        await(pleaseTake);
208 >        assertEquals(q.remainingCapacity(), 0);
209 >        try { assertSame(one, q.take()); }
210 >        catch (InterruptedException e) { threadUnexpectedException(e); }
211  
212 <        t.start();
213 <        Thread.sleep(SMALL_DELAY_MS);
212 >        await(pleaseInterrupt);
213 >        assertThreadStaysAlive(t);
214          t.interrupt();
215 <        t.join();
215 >        awaitTermination(t);
216 >        assertEquals(q.remainingCapacity(), 0);
217      }
218  
283
219      /**
220 <     * take blocks interruptibly when empty
220 >     * timed offer times out if elements not taken
221       */
222 <    public void testFairTakeFromEmpty() throws InterruptedException {
223 <        final SynchronousQueue q = new SynchronousQueue(true);
224 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
225 <            public void realRun() throws InterruptedException {
226 <                q.take();
222 >    public void testTimedOffer()      { testTimedOffer(false); }
223 >    public void testTimedOffer_fair() { testTimedOffer(true); }
224 >    public void testTimedOffer(boolean fair) {
225 >        final SynchronousQueue q = new SynchronousQueue(fair);
226 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
227 >        Thread t = newStartedThread(new CheckedRunnable() {
228 >            public void realRun() throws InterruptedException {
229 >                long startTime = System.nanoTime();
230 >                assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
231 >                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
232 >                pleaseInterrupt.countDown();
233 >                try {
234 >                    q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
235 >                    shouldThrow();
236 >                } catch (InterruptedException success) {}
237              }});
238  
239 <        t.start();
240 <        Thread.sleep(SHORT_DELAY_MS);
239 >        await(pleaseInterrupt);
240 >        assertThreadStaysAlive(t);
241          t.interrupt();
242 <        t.join();
242 >        awaitTermination(t);
243      }
244  
245      /**
246 <     * poll fails unless active taker
246 >     * poll return null if no active putter
247       */
248      public void testPoll() {
249          SynchronousQueue q = new SynchronousQueue();
# Line 306 | Line 251 | public class SynchronousQueueTest extend
251      }
252  
253      /**
254 <     * timed pool with zero timeout times out if no active taker
254 >     * timed poll with zero timeout times out if no active putter
255       */
256      public void testTimedPoll0() throws InterruptedException {
257          SynchronousQueue q = new SynchronousQueue();
# Line 314 | Line 259 | public class SynchronousQueueTest extend
259      }
260  
261      /**
262 <     * timed pool with nonzero timeout times out if no active taker
262 >     * timed poll with nonzero timeout times out if no active putter
263       */
264      public void testTimedPoll() throws InterruptedException {
265          SynchronousQueue q = new SynchronousQueue();
266 <        assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
266 >        long startTime = System.nanoTime();
267 >        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
268 >        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
269      }
270  
271      /**
272 <     * Interrupted timed poll throws InterruptedException instead of
273 <     * returning timeout status
327 <     */
328 <    public void testInterruptedTimedPoll() throws InterruptedException {
329 <        final SynchronousQueue q = new SynchronousQueue();
330 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
331 <            public void realRun() throws InterruptedException {
332 <                q.poll(SMALL_DELAY_MS, MILLISECONDS);
333 <            }});
334 <
335 <        t.start();
336 <        Thread.sleep(SHORT_DELAY_MS);
337 <        t.interrupt();
338 <        t.join();
339 <    }
340 <
341 <    /**
342 <     *  timed poll before a delayed offer fails; after offer succeeds;
343 <     *  on interruption throws
344 <     */
345 <    public void testTimedPollWithOffer() throws InterruptedException {
346 <        final SynchronousQueue q = new SynchronousQueue();
347 <        Thread t = new Thread(new CheckedRunnable() {
348 <            public void realRun() throws InterruptedException {
349 <                assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
350 <                assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
351 <                try {
352 <                    q.poll(LONG_DELAY_MS, MILLISECONDS);
353 <                    shouldThrow();
354 <                } catch (InterruptedException success) {}
355 <            }});
356 <
357 <        t.start();
358 <        Thread.sleep(SMALL_DELAY_MS);
359 <        assertTrue(q.offer(zero, SHORT_DELAY_MS, MILLISECONDS));
360 <        t.interrupt();
361 <        t.join();
362 <    }
363 <
364 <    /**
365 <     * Interrupted timed poll throws InterruptedException instead of
366 <     * returning timeout status
367 <     */
368 <    public void testFairInterruptedTimedPoll() throws InterruptedException {
369 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
370 <            public void realRun() throws InterruptedException {
371 <                SynchronousQueue q = new SynchronousQueue(true);
372 <                q.poll(SMALL_DELAY_MS, MILLISECONDS);
373 <            }});
374 <
375 <        t.start();
376 <        Thread.sleep(SHORT_DELAY_MS);
377 <        t.interrupt();
378 <        t.join();
379 <    }
380 <
381 <    /**
382 <     *  timed poll before a delayed offer fails; after offer succeeds;
383 <     *  on interruption throws
272 >     * timed poll before a delayed offer times out, returning null;
273 >     * after offer succeeds; on interruption throws
274       */
275      public void testFairTimedPollWithOffer() throws InterruptedException {
276          final SynchronousQueue q = new SynchronousQueue(true);
277 <        Thread t = new Thread(new CheckedRunnable() {
277 >        final CountDownLatch pleaseOffer = new CountDownLatch(1);
278 >        Thread t = newStartedThread(new CheckedRunnable() {
279              public void realRun() throws InterruptedException {
280 +                long t0 = System.nanoTime();
281                  assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
282 +                assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
283 +
284 +                pleaseOffer.countDown();
285 +                t0 = System.nanoTime();
286                  assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
287 +                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
288 +
289 +                t0 = System.nanoTime();
290                  try {
291                      q.poll(LONG_DELAY_MS, MILLISECONDS);
292 <                    threadShouldThrow();
292 >                    shouldThrow();
293                  } catch (InterruptedException success) {}
294 +                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
295              }});
296  
297 <        t.start();
298 <        Thread.sleep(SMALL_DELAY_MS);
299 <        assertTrue(q.offer(zero, SHORT_DELAY_MS, MILLISECONDS));
297 >        assertTrue(pleaseOffer.await(MEDIUM_DELAY_MS, MILLISECONDS));
298 >        long t0 = System.nanoTime();
299 >        assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS));
300 >        assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
301 >
302          t.interrupt();
303 <        t.join();
303 >        awaitTermination(t, MEDIUM_DELAY_MS);
304      }
305  
404
306      /**
307 <     * peek returns null
307 >     * peek() returns null if no active putter
308       */
309      public void testPeek() {
310          SynchronousQueue q = new SynchronousQueue();
# Line 411 | Line 312 | public class SynchronousQueueTest extend
312      }
313  
314      /**
315 <     * element throws NSEE
315 >     * element() throws NSEE if no active putter
316       */
317      public void testElement() {
318          SynchronousQueue q = new SynchronousQueue();
# Line 422 | Line 323 | public class SynchronousQueueTest extend
323      }
324  
325      /**
326 <     * remove throws NSEE if no active taker
326 >     * remove() throws NSEE if no active putter
327       */
328      public void testRemove() {
329          SynchronousQueue q = new SynchronousQueue();
# Line 491 | Line 392 | public class SynchronousQueueTest extend
392          assertFalse(q.containsAll(Arrays.asList(ints)));
393      }
394  
494
395      /**
396       * toArray is empty
397       */
# Line 521 | Line 421 | public class SynchronousQueueTest extend
421          } catch (NullPointerException success) {}
422      }
423  
524
424      /**
425       * iterator does not traverse any elements
426       */
# Line 556 | Line 455 | public class SynchronousQueueTest extend
455          assertNotNull(s);
456      }
457  
559
458      /**
459       * offer transfers elements across Executor tasks
460       */
461      public void testOfferInExecutor() {
462          final SynchronousQueue q = new SynchronousQueue();
463          ExecutorService executor = Executors.newFixedThreadPool(2);
464 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
465  
466          executor.execute(new CheckedRunnable() {
467              public void realRun() throws InterruptedException {
468                  assertFalse(q.offer(one));
469 <                assertTrue(q.offer(one, MEDIUM_DELAY_MS, MILLISECONDS));
469 >                threadsStarted.await();
470 >                assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
471                  assertEquals(0, q.remainingCapacity());
472              }});
473  
474          executor.execute(new CheckedRunnable() {
475              public void realRun() throws InterruptedException {
476 <                Thread.sleep(SMALL_DELAY_MS);
476 >                threadsStarted.await();
477                  assertSame(one, q.take());
478              }});
479  
480          joinPool(executor);
581
481      }
482  
483      /**
484 <     * poll retrieves elements across Executor threads
484 >     * timed poll retrieves elements across Executor threads
485       */
486      public void testPollInExecutor() {
487          final SynchronousQueue q = new SynchronousQueue();
488 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
489          ExecutorService executor = Executors.newFixedThreadPool(2);
490          executor.execute(new CheckedRunnable() {
491              public void realRun() throws InterruptedException {
492                  assertNull(q.poll());
493 <                assertSame(one, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
493 >                threadsStarted.await();
494 >                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
495                  assertTrue(q.isEmpty());
496              }});
497  
498          executor.execute(new CheckedRunnable() {
499              public void realRun() throws InterruptedException {
500 <                Thread.sleep(SHORT_DELAY_MS);
500 >                threadsStarted.await();
501                  q.put(one);
502              }});
503  
# Line 659 | Line 560 | public class SynchronousQueueTest extend
560       */
561      public void testDrainToWithActivePut() throws InterruptedException {
562          final SynchronousQueue q = new SynchronousQueue();
563 <        Thread t = new Thread(new CheckedRunnable() {
563 >        Thread t = newStartedThread(new CheckedRunnable() {
564              public void realRun() throws InterruptedException {
565 <                q.put(new Integer(1));
565 >                q.put(one);
566              }});
567  
667        t.start();
568          ArrayList l = new ArrayList();
569 <        Thread.sleep(SHORT_DELAY_MS);
570 <        q.drainTo(l);
571 <        assertTrue(l.size() <= 1);
572 <        if (l.size() > 0)
573 <            assertEquals(l.get(0), new Integer(1));
574 <        t.join();
575 <        assertTrue(l.size() <= 1);
569 >        long startTime = System.nanoTime();
570 >        while (l.isEmpty()) {
571 >            q.drainTo(l);
572 >            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
573 >                fail("timed out");
574 >            Thread.yield();
575 >        }
576 >        assertTrue(l.size() == 1);
577 >        assertSame(one, l.get(0));
578 >        awaitTermination(t);
579      }
580  
581      /**
# Line 702 | Line 605 | public class SynchronousQueueTest extend
605       */
606      public void testDrainToN() throws InterruptedException {
607          final SynchronousQueue q = new SynchronousQueue();
608 <        Thread t1 = new Thread(new CheckedRunnable() {
608 >        Thread t1 = newStartedThread(new CheckedRunnable() {
609              public void realRun() throws InterruptedException {
610                  q.put(one);
611              }});
612  
613 <        Thread t2 = new Thread(new CheckedRunnable() {
613 >        Thread t2 = newStartedThread(new CheckedRunnable() {
614              public void realRun() throws InterruptedException {
615                  q.put(two);
616              }});
617  
715        t1.start();
716        t2.start();
618          ArrayList l = new ArrayList();
619 <        Thread.sleep(SHORT_DELAY_MS);
619 >        delay(SHORT_DELAY_MS);
620          q.drainTo(l, 1);
621 <        assertTrue(l.size() == 1);
621 >        assertEquals(1, l.size());
622          q.drainTo(l, 1);
623 <        assertTrue(l.size() == 2);
623 >        assertEquals(2, l.size());
624          assertTrue(l.contains(one));
625          assertTrue(l.contains(two));
626 <        t1.join();
627 <        t2.join();
626 >        awaitTermination(t1);
627 >        awaitTermination(t2);
628      }
629  
630   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines