ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SynchronousQueueTest.java (file contents):
Revision 1.14 by jsr166, Sat Nov 21 02:07:27 2009 UTC vs.
Revision 1.35 by jsr166, Fri May 27 20:07:24 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   * Other contributors include Andrew Wright, Jeffrey Hayes,
6   * Pat Fisher, Mike Judd.
7   */
# Line 14 | Line 14 | import java.io.*;
14  
15   public class SynchronousQueueTest extends JSR166TestCase {
16  
17 <    public static void main(String[] args) {
18 <        junit.textui.TestRunner.run (suite());
17 >    public static class Fair extends BlockingQueueTest {
18 >        protected BlockingQueue emptyCollection() {
19 >            return new SynchronousQueue(true);
20 >        }
21      }
22  
23 <    public static Test suite() {
24 <        return new TestSuite(SynchronousQueueTest.class);
23 >    public static class NonFair extends BlockingQueueTest {
24 >        protected BlockingQueue emptyCollection() {
25 >            return new SynchronousQueue(false);
26 >        }
27      }
28  
29 <    /**
30 <     * A SynchronousQueue is both empty and full
31 <     */
32 <    public void testEmptyFull() {
33 <        SynchronousQueue q = new SynchronousQueue();
34 <        assertTrue(q.isEmpty());
35 <        assertEquals(0, q.size());
36 <        assertEquals(0, q.remainingCapacity());
33 <        assertFalse(q.offer(zero));
29 >    public static void main(String[] args) {
30 >        junit.textui.TestRunner.run(suite());
31 >    }
32 >
33 >    public static Test suite() {
34 >        return newTestSuite(SynchronousQueueTest.class,
35 >                            new Fair().testSuite(),
36 >                            new NonFair().testSuite());
37      }
38  
39      /**
40 <     * A fair SynchronousQueue is both empty and full
40 >     * Any SynchronousQueue is both empty and full
41       */
42 <    public void testFairEmptyFull() {
43 <        SynchronousQueue q = new SynchronousQueue(true);
42 >    public void testEmptyFull()      { testEmptyFull(false); }
43 >    public void testEmptyFull_fair() { testEmptyFull(true); }
44 >    public void testEmptyFull(boolean fair) {
45 >        final SynchronousQueue q = new SynchronousQueue(fair);
46          assertTrue(q.isEmpty());
47          assertEquals(0, q.size());
48          assertEquals(0, q.remainingCapacity());
# Line 119 | Line 124 | public class SynchronousQueueTest extend
124              shouldThrow();
125          } catch (NullPointerException success) {}
126      }
127 +
128      /**
129       * addAll throws ISE if no active taker
130       */
# Line 142 | Line 148 | public class SynchronousQueueTest extend
148              q.put(null);
149              shouldThrow();
150          } catch (NullPointerException success) {}
145     }
146
147    /**
148     * put blocks interruptibly if no active taker
149     */
150    public void testBlockingPut() throws InterruptedException {
151        Thread t = new Thread(new CheckedInterruptedRunnable() {
152            public void realRun() throws InterruptedException {
153                SynchronousQueue q = new SynchronousQueue();
154                q.put(zero);
155            }});
156
157        t.start();
158        Thread.sleep(SHORT_DELAY_MS);
159        t.interrupt();
160        t.join();
151      }
152  
153      /**
154 <     * put blocks waiting for take
154 >     * put blocks interruptibly if no active taker
155       */
156 <    public void testPutWithTake() throws InterruptedException {
157 <        final SynchronousQueue q = new SynchronousQueue();
158 <        Thread t = new Thread(new CheckedRunnable() {
156 >    public void testBlockingPut()      { testBlockingPut(false); }
157 >    public void testBlockingPut_fair() { testBlockingPut(true); }
158 >    public void testBlockingPut(boolean fair) {
159 >        final SynchronousQueue q = new SynchronousQueue(fair);
160 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
161 >        Thread t = newStartedThread(new CheckedRunnable() {
162              public void realRun() throws InterruptedException {
163 <                int added = 0;
163 >                Thread.currentThread().interrupt();
164                  try {
165 <                    q.put(new Object());
166 <                    ++added;
167 <                    q.put(new Object());
168 <                    ++added;
176 <                    q.put(new Object());
177 <                    ++added;
178 <                    q.put(new Object());
179 <                    ++added;
180 <                    threadShouldThrow();
181 <                } catch (InterruptedException success) {
182 <                    assertTrue(added >= 1);
183 <                }
184 <            }});
185 <
186 <        t.start();
187 <        Thread.sleep(SHORT_DELAY_MS);
188 <        q.take();
189 <        Thread.sleep(SHORT_DELAY_MS);
190 <        t.interrupt();
191 <        t.join();
192 <    }
165 >                    q.put(99);
166 >                    shouldThrow();
167 >                } catch (InterruptedException success) {}
168 >                assertFalse(Thread.interrupted());
169  
170 <    /**
171 <     * timed offer times out if elements not taken
172 <     */
173 <    public void testTimedOffer() throws InterruptedException {
174 <        final SynchronousQueue q = new SynchronousQueue();
175 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
200 <            public void realRun() throws InterruptedException {
201 <                threadAssertFalse(q.offer(new Object(), SHORT_DELAY_MS, MILLISECONDS));
202 <                q.offer(new Object(), LONG_DELAY_MS, MILLISECONDS);
170 >                pleaseInterrupt.countDown();
171 >                try {
172 >                    q.put(99);
173 >                    shouldThrow();
174 >                } catch (InterruptedException success) {}
175 >                assertFalse(Thread.interrupted());
176              }});
177  
178 <        t.start();
179 <        Thread.sleep(SMALL_DELAY_MS);
178 >        await(pleaseInterrupt);
179 >        assertThreadStaysAlive(t);
180          t.interrupt();
181 <        t.join();
181 >        awaitTermination(t);
182 >        assertEquals(0, q.remainingCapacity());
183      }
184  
211
185      /**
186 <     * take blocks interruptibly when empty
186 >     * put blocks interruptibly waiting for take
187       */
188 <    public void testTakeFromEmpty() throws InterruptedException {
189 <        final SynchronousQueue q = new SynchronousQueue();
190 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
188 >    public void testPutWithTake()      { testPutWithTake(false); }
189 >    public void testPutWithTake_fair() { testPutWithTake(true); }
190 >    public void testPutWithTake(boolean fair) {
191 >        final SynchronousQueue q = new SynchronousQueue(fair);
192 >        final CountDownLatch pleaseTake = new CountDownLatch(1);
193 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
194 >        Thread t = newStartedThread(new CheckedRunnable() {
195              public void realRun() throws InterruptedException {
196 <                q.take();
197 <            }});
221 <
222 <        t.start();
223 <        Thread.sleep(SHORT_DELAY_MS);
224 <        t.interrupt();
225 <        t.join();
226 <    }
227 <
196 >                pleaseTake.countDown();
197 >                q.put(one);
198  
199 <    /**
200 <     * put blocks interruptibly if no active taker
201 <     */
202 <    public void testFairBlockingPut() throws InterruptedException {
203 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
204 <            public void realRun() throws InterruptedException {
235 <                SynchronousQueue q = new SynchronousQueue(true);
236 <                q.put(zero);
199 >                pleaseInterrupt.countDown();
200 >                try {
201 >                    q.put(99);
202 >                    shouldThrow();
203 >                } catch (InterruptedException success) {}
204 >                assertFalse(Thread.interrupted());
205              }});
206  
207 <        t.start();
208 <        Thread.sleep(SHORT_DELAY_MS);
209 <        t.interrupt();
210 <        t.join();
243 <    }
207 >        await(pleaseTake);
208 >        assertEquals(q.remainingCapacity(), 0);
209 >        try { assertSame(one, q.take()); }
210 >        catch (InterruptedException e) { threadUnexpectedException(e); }
211  
212 <    /**
213 <     * put blocks waiting for take
247 <     */
248 <    public void testFairPutWithTake() throws InterruptedException {
249 <        final SynchronousQueue q = new SynchronousQueue(true);
250 <        Thread t = new Thread(new CheckedRunnable() {
251 <            public void realRun() throws InterruptedException {
252 <                int added = 0;
253 <                try {
254 <                    q.put(new Object());
255 <                    ++added;
256 <                    q.put(new Object());
257 <                    ++added;
258 <                    q.put(new Object());
259 <                    ++added;
260 <                    q.put(new Object());
261 <                    ++added;
262 <                    threadShouldThrow();
263 <                } catch (InterruptedException success) {
264 <                    assertTrue(added >= 1);
265 <                }
266 <            }});
267 <
268 <        t.start();
269 <        Thread.sleep(SHORT_DELAY_MS);
270 <        q.take();
271 <        Thread.sleep(SHORT_DELAY_MS);
212 >        await(pleaseInterrupt);
213 >        assertThreadStaysAlive(t);
214          t.interrupt();
215 <        t.join();
215 >        awaitTermination(t);
216 >        assertEquals(q.remainingCapacity(), 0);
217      }
218  
219      /**
220       * timed offer times out if elements not taken
221       */
222 <    public void testFairTimedOffer() throws InterruptedException {
223 <        final SynchronousQueue q = new SynchronousQueue(true);
224 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
225 <            public void realRun() throws InterruptedException {
226 <                threadAssertFalse(q.offer(new Object(), SHORT_DELAY_MS, MILLISECONDS));
227 <                q.offer(new Object(), LONG_DELAY_MS, MILLISECONDS);
228 <            }});
229 <
230 <        t.start();
231 <        Thread.sleep(SMALL_DELAY_MS);
232 <        t.interrupt();
233 <        t.join();
234 <    }
235 <
236 <
294 <    /**
295 <     * take blocks interruptibly when empty
296 <     */
297 <    public void testFairTakeFromEmpty() throws InterruptedException {
298 <        final SynchronousQueue q = new SynchronousQueue(true);
299 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
300 <            public void realRun() throws InterruptedException {
301 <                q.take();
222 >    public void testTimedOffer()      { testTimedOffer(false); }
223 >    public void testTimedOffer_fair() { testTimedOffer(true); }
224 >    public void testTimedOffer(boolean fair) {
225 >        final SynchronousQueue q = new SynchronousQueue(fair);
226 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
227 >        Thread t = newStartedThread(new CheckedRunnable() {
228 >            public void realRun() throws InterruptedException {
229 >                long startTime = System.nanoTime();
230 >                assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
231 >                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
232 >                pleaseInterrupt.countDown();
233 >                try {
234 >                    q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
235 >                    shouldThrow();
236 >                } catch (InterruptedException success) {}
237              }});
238  
239 <        t.start();
240 <        Thread.sleep(SHORT_DELAY_MS);
239 >        await(pleaseInterrupt);
240 >        assertThreadStaysAlive(t);
241          t.interrupt();
242 <        t.join();
242 >        awaitTermination(t);
243      }
244  
245      /**
246 <     * poll fails unless active taker
246 >     * poll return null if no active putter
247       */
248      public void testPoll() {
249          SynchronousQueue q = new SynchronousQueue();
# Line 316 | Line 251 | public class SynchronousQueueTest extend
251      }
252  
253      /**
254 <     * timed pool with zero timeout times out if no active taker
254 >     * timed poll with zero timeout times out if no active putter
255       */
256      public void testTimedPoll0() throws InterruptedException {
257          SynchronousQueue q = new SynchronousQueue();
# Line 324 | Line 259 | public class SynchronousQueueTest extend
259      }
260  
261      /**
262 <     * timed pool with nonzero timeout times out if no active taker
262 >     * timed poll with nonzero timeout times out if no active putter
263       */
264      public void testTimedPoll() throws InterruptedException {
265          SynchronousQueue q = new SynchronousQueue();
266 <        assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
267 <    }
268 <
334 <    /**
335 <     * Interrupted timed poll throws InterruptedException instead of
336 <     * returning timeout status
337 <     */
338 <    public void testInterruptedTimedPoll() throws InterruptedException {
339 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
340 <            public void realRun() throws InterruptedException {
341 <                SynchronousQueue q = new SynchronousQueue();
342 <                q.poll(SMALL_DELAY_MS, MILLISECONDS);
343 <            }});
344 <
345 <        t.start();
346 <        Thread.sleep(SHORT_DELAY_MS);
347 <        t.interrupt();
348 <        t.join();
349 <    }
350 <
351 <    /**
352 <     *  timed poll before a delayed offer fails; after offer succeeds;
353 <     *  on interruption throws
354 <     */
355 <    public void testTimedPollWithOffer() throws InterruptedException {
356 <        final SynchronousQueue q = new SynchronousQueue();
357 <        Thread t = new Thread(new CheckedRunnable() {
358 <            public void realRun() throws InterruptedException {
359 <                threadAssertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
360 <                assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
361 <                try {
362 <                    q.poll(LONG_DELAY_MS, MILLISECONDS);
363 <                    threadShouldThrow();
364 <                } catch (InterruptedException success) {}
365 <            }});
366 <
367 <        t.start();
368 <        Thread.sleep(SMALL_DELAY_MS);
369 <        assertTrue(q.offer(zero, SHORT_DELAY_MS, MILLISECONDS));
370 <        t.interrupt();
371 <        t.join();
266 >        long startTime = System.nanoTime();
267 >        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
268 >        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
269      }
270  
271      /**
272 <     * Interrupted timed poll throws InterruptedException instead of
273 <     * returning timeout status
377 <     */
378 <    public void testFairInterruptedTimedPoll() throws InterruptedException {
379 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
380 <            public void realRun() throws InterruptedException {
381 <                SynchronousQueue q = new SynchronousQueue(true);
382 <                q.poll(SMALL_DELAY_MS, MILLISECONDS);
383 <            }});
384 <
385 <        t.start();
386 <        Thread.sleep(SHORT_DELAY_MS);
387 <        t.interrupt();
388 <        t.join();
389 <    }
390 <
391 <    /**
392 <     *  timed poll before a delayed offer fails; after offer succeeds;
393 <     *  on interruption throws
272 >     * timed poll before a delayed offer times out, returning null;
273 >     * after offer succeeds; on interruption throws
274       */
275      public void testFairTimedPollWithOffer() throws InterruptedException {
276          final SynchronousQueue q = new SynchronousQueue(true);
277 <        Thread t = new Thread(new CheckedRunnable() {
277 >        final CountDownLatch pleaseOffer = new CountDownLatch(1);
278 >        Thread t = newStartedThread(new CheckedRunnable() {
279              public void realRun() throws InterruptedException {
280 <                threadAssertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
280 >                long t0 = System.nanoTime();
281 >                assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
282 >                assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
283 >
284 >                pleaseOffer.countDown();
285 >                t0 = System.nanoTime();
286                  assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
287 +                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
288 +
289 +                t0 = System.nanoTime();
290                  try {
291                      q.poll(LONG_DELAY_MS, MILLISECONDS);
292 <                    threadShouldThrow();
292 >                    shouldThrow();
293                  } catch (InterruptedException success) {}
294 +                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
295              }});
296  
297 <        t.start();
298 <        Thread.sleep(SMALL_DELAY_MS);
299 <        assertTrue(q.offer(zero, SHORT_DELAY_MS, MILLISECONDS));
297 >        assertTrue(pleaseOffer.await(MEDIUM_DELAY_MS, MILLISECONDS));
298 >        long t0 = System.nanoTime();
299 >        assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS));
300 >        assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
301 >
302          t.interrupt();
303 <        t.join();
303 >        awaitTermination(t, MEDIUM_DELAY_MS);
304      }
305  
414
306      /**
307 <     * peek returns null
307 >     * peek() returns null if no active putter
308       */
309      public void testPeek() {
310          SynchronousQueue q = new SynchronousQueue();
# Line 421 | Line 312 | public class SynchronousQueueTest extend
312      }
313  
314      /**
315 <     * element throws NSEE
315 >     * element() throws NSEE if no active putter
316       */
317      public void testElement() {
318          SynchronousQueue q = new SynchronousQueue();
# Line 432 | Line 323 | public class SynchronousQueueTest extend
323      }
324  
325      /**
326 <     * remove throws NSEE if no active taker
326 >     * remove() throws NSEE if no active putter
327       */
328      public void testRemove() {
329          SynchronousQueue q = new SynchronousQueue();
330          try {
331              q.remove();
332              shouldThrow();
333 <        } catch (NoSuchElementException success) {
443 <        }
333 >        } catch (NoSuchElementException success) {}
334      }
335  
336      /**
# Line 502 | Line 392 | public class SynchronousQueueTest extend
392          assertFalse(q.containsAll(Arrays.asList(ints)));
393      }
394  
505
395      /**
396       * toArray is empty
397       */
# Line 525 | Line 414 | public class SynchronousQueueTest extend
414       * toArray(null) throws NPE
415       */
416      public void testToArray_BadArg() {
417 +        SynchronousQueue q = new SynchronousQueue();
418          try {
529            SynchronousQueue q = new SynchronousQueue();
419              Object o[] = q.toArray(null);
420              shouldThrow();
421          } catch (NullPointerException success) {}
422      }
423  
535
424      /**
425       * iterator does not traverse any elements
426       */
# Line 567 | Line 455 | public class SynchronousQueueTest extend
455          assertNotNull(s);
456      }
457  
570
458      /**
459       * offer transfers elements across Executor tasks
460       */
461      public void testOfferInExecutor() {
462          final SynchronousQueue q = new SynchronousQueue();
463          ExecutorService executor = Executors.newFixedThreadPool(2);
464 <        final Integer one = new Integer(1);
464 >        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
465  
466          executor.execute(new CheckedRunnable() {
467              public void realRun() throws InterruptedException {
468 <                threadAssertFalse(q.offer(one));
469 <                threadAssertTrue(q.offer(one, MEDIUM_DELAY_MS, MILLISECONDS));
470 <                threadAssertEquals(0, q.remainingCapacity());
468 >                assertFalse(q.offer(one));
469 >                threadsStarted.await();
470 >                assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
471 >                assertEquals(0, q.remainingCapacity());
472              }});
473  
474          executor.execute(new CheckedRunnable() {
475              public void realRun() throws InterruptedException {
476 <                Thread.sleep(SMALL_DELAY_MS);
477 <                threadAssertEquals(one, q.take());
476 >                threadsStarted.await();
477 >                assertSame(one, q.take());
478              }});
479  
480          joinPool(executor);
593
481      }
482  
483      /**
484 <     * poll retrieves elements across Executor threads
484 >     * timed poll retrieves elements across Executor threads
485       */
486      public void testPollInExecutor() {
487          final SynchronousQueue q = new SynchronousQueue();
488 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
489          ExecutorService executor = Executors.newFixedThreadPool(2);
490          executor.execute(new CheckedRunnable() {
491              public void realRun() throws InterruptedException {
492 <                threadAssertNull(q.poll());
493 <                threadAssertTrue(null != q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
494 <                threadAssertTrue(q.isEmpty());
492 >                assertNull(q.poll());
493 >                threadsStarted.await();
494 >                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
495 >                assertTrue(q.isEmpty());
496              }});
497  
498          executor.execute(new CheckedRunnable() {
499              public void realRun() throws InterruptedException {
500 <                Thread.sleep(SMALL_DELAY_MS);
501 <                q.put(new Integer(1));
500 >                threadsStarted.await();
501 >                q.put(one);
502              }});
503  
504          joinPool(executor);
# Line 671 | Line 560 | public class SynchronousQueueTest extend
560       */
561      public void testDrainToWithActivePut() throws InterruptedException {
562          final SynchronousQueue q = new SynchronousQueue();
563 <        Thread t = new Thread(new CheckedRunnable() {
563 >        Thread t = newStartedThread(new CheckedRunnable() {
564              public void realRun() throws InterruptedException {
565 <                q.put(new Integer(1));
565 >                q.put(one);
566              }});
567  
679        t.start();
568          ArrayList l = new ArrayList();
569 <        Thread.sleep(SHORT_DELAY_MS);
570 <        q.drainTo(l);
571 <        assertTrue(l.size() <= 1);
572 <        if (l.size() > 0)
573 <            assertEquals(l.get(0), new Integer(1));
574 <        t.join();
575 <        assertTrue(l.size() <= 1);
569 >        long startTime = System.nanoTime();
570 >        while (l.isEmpty()) {
571 >            q.drainTo(l);
572 >            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
573 >                fail("timed out");
574 >            Thread.yield();
575 >        }
576 >        assertTrue(l.size() == 1);
577 >        assertSame(one, l.get(0));
578 >        awaitTermination(t);
579      }
580  
581      /**
# Line 706 | Line 597 | public class SynchronousQueueTest extend
597          try {
598              q.drainTo(q, 0);
599              shouldThrow();
600 <        } catch (IllegalArgumentException success) {
710 <        }
600 >        } catch (IllegalArgumentException success) {}
601      }
602  
603      /**
# Line 715 | Line 605 | public class SynchronousQueueTest extend
605       */
606      public void testDrainToN() throws InterruptedException {
607          final SynchronousQueue q = new SynchronousQueue();
608 <        Thread t1 = new Thread(new CheckedRunnable() {
608 >        Thread t1 = newStartedThread(new CheckedRunnable() {
609              public void realRun() throws InterruptedException {
610                  q.put(one);
611              }});
612  
613 <        Thread t2 = new Thread(new CheckedRunnable() {
613 >        Thread t2 = newStartedThread(new CheckedRunnable() {
614              public void realRun() throws InterruptedException {
615                  q.put(two);
616              }});
617  
728        t1.start();
729        t2.start();
618          ArrayList l = new ArrayList();
619 <        Thread.sleep(SHORT_DELAY_MS);
619 >        delay(SHORT_DELAY_MS);
620          q.drainTo(l, 1);
621 <        assertTrue(l.size() == 1);
621 >        assertEquals(1, l.size());
622          q.drainTo(l, 1);
623 <        assertTrue(l.size() == 2);
623 >        assertEquals(2, l.size());
624          assertTrue(l.contains(one));
625          assertTrue(l.contains(two));
626 <        t1.join();
627 <        t2.join();
626 >        awaitTermination(t1);
627 >        awaitTermination(t2);
628      }
629  
630   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines