ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SynchronousQueueTest.java (file contents):
Revision 1.29 by jsr166, Thu Nov 18 20:18:43 2010 UTC vs.
Revision 1.35 by jsr166, Fri May 27 20:07:24 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   * Other contributors include Andrew Wright, Jeffrey Hayes,
6   * Pat Fisher, Mike Judd.
7   */
# Line 39 | Line 39 | public class SynchronousQueueTest extend
39      /**
40       * Any SynchronousQueue is both empty and full
41       */
42 <    public void testEmptyFull(SynchronousQueue q) {
42 >    public void testEmptyFull()      { testEmptyFull(false); }
43 >    public void testEmptyFull_fair() { testEmptyFull(true); }
44 >    public void testEmptyFull(boolean fair) {
45 >        final SynchronousQueue q = new SynchronousQueue(fair);
46          assertTrue(q.isEmpty());
47          assertEquals(0, q.size());
48          assertEquals(0, q.remainingCapacity());
# Line 47 | Line 50 | public class SynchronousQueueTest extend
50      }
51  
52      /**
50     * A non-fair SynchronousQueue is both empty and full
51     */
52    public void testEmptyFull() {
53        testEmptyFull(new SynchronousQueue());
54    }
55
56    /**
57     * A fair SynchronousQueue is both empty and full
58     */
59    public void testFairEmptyFull() {
60        testEmptyFull(new SynchronousQueue(true));
61    }
62
63    /**
53       * offer(null) throws NPE
54       */
55      public void testOfferNull() {
# Line 159 | Line 148 | public class SynchronousQueueTest extend
148              q.put(null);
149              shouldThrow();
150          } catch (NullPointerException success) {}
151 <     }
151 >    }
152  
153      /**
154       * put blocks interruptibly if no active taker
155       */
156 <    public void testBlockingPut() throws InterruptedException {
157 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
156 >    public void testBlockingPut()      { testBlockingPut(false); }
157 >    public void testBlockingPut_fair() { testBlockingPut(true); }
158 >    public void testBlockingPut(boolean fair) {
159 >        final SynchronousQueue q = new SynchronousQueue(fair);
160 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
161 >        Thread t = newStartedThread(new CheckedRunnable() {
162              public void realRun() throws InterruptedException {
163 <                SynchronousQueue q = new SynchronousQueue();
164 <                q.put(zero);
165 <            }});
166 <
167 <        t.start();
168 <        Thread.sleep(SHORT_DELAY_MS);
176 <        t.interrupt();
177 <        t.join();
178 <    }
163 >                Thread.currentThread().interrupt();
164 >                try {
165 >                    q.put(99);
166 >                    shouldThrow();
167 >                } catch (InterruptedException success) {}
168 >                assertFalse(Thread.interrupted());
169  
170 <    /**
181 <     * put blocks waiting for take
182 <     */
183 <    public void testPutWithTake() throws InterruptedException {
184 <        final SynchronousQueue q = new SynchronousQueue();
185 <        Thread t = new Thread(new CheckedRunnable() {
186 <            public void realRun() throws InterruptedException {
187 <                int added = 0;
170 >                pleaseInterrupt.countDown();
171                  try {
172 <                    while (true) {
173 <                        q.put(added);
174 <                        ++added;
175 <                    }
193 <                } catch (InterruptedException success) {
194 <                    assertEquals(1, added);
195 <                }
172 >                    q.put(99);
173 >                    shouldThrow();
174 >                } catch (InterruptedException success) {}
175 >                assertFalse(Thread.interrupted());
176              }});
177  
178 <        t.start();
179 <        Thread.sleep(SHORT_DELAY_MS);
200 <        assertEquals(0, q.take());
201 <        Thread.sleep(SHORT_DELAY_MS);
178 >        await(pleaseInterrupt);
179 >        assertThreadStaysAlive(t);
180          t.interrupt();
181 <        t.join();
181 >        awaitTermination(t);
182 >        assertEquals(0, q.remainingCapacity());
183      }
184  
185      /**
186 <     * timed offer times out if elements not taken
186 >     * put blocks interruptibly waiting for take
187       */
188 <    public void testTimedOffer(final SynchronousQueue q)
189 <            throws InterruptedException {
188 >    public void testPutWithTake()      { testPutWithTake(false); }
189 >    public void testPutWithTake_fair() { testPutWithTake(true); }
190 >    public void testPutWithTake(boolean fair) {
191 >        final SynchronousQueue q = new SynchronousQueue(fair);
192 >        final CountDownLatch pleaseTake = new CountDownLatch(1);
193          final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
194          Thread t = newStartedThread(new CheckedRunnable() {
195              public void realRun() throws InterruptedException {
196 <                long t0 = System.nanoTime();
197 <                assertFalse(q.offer(new Object(), SHORT_DELAY_MS, MILLISECONDS));
198 <                assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
196 >                pleaseTake.countDown();
197 >                q.put(one);
198 >
199                  pleaseInterrupt.countDown();
218                t0 = System.nanoTime();
200                  try {
201 <                    q.offer(new Object(), LONG_DELAY_MS, MILLISECONDS);
201 >                    q.put(99);
202                      shouldThrow();
203                  } catch (InterruptedException success) {}
204 <                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
204 >                assertFalse(Thread.interrupted());
205              }});
206  
207 <        assertTrue(pleaseInterrupt.await(MEDIUM_DELAY_MS, MILLISECONDS));
208 <        t.interrupt();
209 <        awaitTermination(t, MEDIUM_DELAY_MS);
210 <    }
230 <
231 <    /**
232 <     * timed offer times out if elements not taken
233 <     */
234 <    public void testTimedOffer() throws InterruptedException {
235 <        testTimedOffer(new SynchronousQueue());
236 <    }
237 <
238 <    /**
239 <     * timed offer times out if elements not taken
240 <     */
241 <    public void testFairTimedOffer() throws InterruptedException {
242 <        testTimedOffer(new SynchronousQueue(true));
243 <    }
244 <
245 <    /**
246 <     * put blocks interruptibly if no active taker
247 <     */
248 <    public void testFairBlockingPut() throws InterruptedException {
249 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
250 <            public void realRun() throws InterruptedException {
251 <                SynchronousQueue q = new SynchronousQueue(true);
252 <                q.put(zero);
253 <            }});
207 >        await(pleaseTake);
208 >        assertEquals(q.remainingCapacity(), 0);
209 >        try { assertSame(one, q.take()); }
210 >        catch (InterruptedException e) { threadUnexpectedException(e); }
211  
212 <        t.start();
213 <        Thread.sleep(SHORT_DELAY_MS);
212 >        await(pleaseInterrupt);
213 >        assertThreadStaysAlive(t);
214          t.interrupt();
215 <        t.join();
215 >        awaitTermination(t);
216 >        assertEquals(q.remainingCapacity(), 0);
217      }
218  
219      /**
220 <     * put blocks waiting for take
220 >     * timed offer times out if elements not taken
221       */
222 <    public void testFairPutWithTake() throws InterruptedException {
223 <        final SynchronousQueue q = new SynchronousQueue(true);
224 <        Thread t = new Thread(new CheckedRunnable() {
222 >    public void testTimedOffer()      { testTimedOffer(false); }
223 >    public void testTimedOffer_fair() { testTimedOffer(true); }
224 >    public void testTimedOffer(boolean fair) {
225 >        final SynchronousQueue q = new SynchronousQueue(fair);
226 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
227 >        Thread t = newStartedThread(new CheckedRunnable() {
228              public void realRun() throws InterruptedException {
229 <                int added = 0;
229 >                long startTime = System.nanoTime();
230 >                assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
231 >                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
232 >                pleaseInterrupt.countDown();
233                  try {
234 <                    while (true) {
235 <                        q.put(added);
236 <                        ++added;
273 <                    }
274 <                } catch (InterruptedException success) {
275 <                    assertEquals(1, added);
276 <                }
277 <            }});
278 <
279 <        t.start();
280 <        Thread.sleep(SHORT_DELAY_MS);
281 <        assertEquals(0, q.take());
282 <        Thread.sleep(SHORT_DELAY_MS);
283 <        t.interrupt();
284 <        t.join();
285 <    }
286 <
287 <    /**
288 <     * take blocks interruptibly when empty
289 <     */
290 <    public void testFairTakeFromEmpty() throws InterruptedException {
291 <        final SynchronousQueue q = new SynchronousQueue(true);
292 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
293 <            public void realRun() throws InterruptedException {
294 <                q.take();
234 >                    q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
235 >                    shouldThrow();
236 >                } catch (InterruptedException success) {}
237              }});
238  
239 <        t.start();
240 <        Thread.sleep(SHORT_DELAY_MS);
239 >        await(pleaseInterrupt);
240 >        assertThreadStaysAlive(t);
241          t.interrupt();
242 <        t.join();
242 >        awaitTermination(t);
243      }
244  
245      /**
# Line 321 | Line 263 | public class SynchronousQueueTest extend
263       */
264      public void testTimedPoll() throws InterruptedException {
265          SynchronousQueue q = new SynchronousQueue();
266 <        long t0 = System.nanoTime();
267 <        assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
268 <        assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
327 <    }
328 <
329 <    /**
330 <     * Interrupted timed poll throws InterruptedException instead of
331 <     * returning timeout status
332 <     */
333 <    public void testInterruptedTimedPoll(final SynchronousQueue q)
334 <            throws InterruptedException {
335 <        final CountDownLatch threadStarted = new CountDownLatch(1);
336 <        Thread t = newStartedThread(new CheckedRunnable() {
337 <            public void realRun() throws InterruptedException {
338 <                long t0 = System.nanoTime();
339 <                threadStarted.countDown();
340 <                try {
341 <                    q.poll(LONG_DELAY_MS, MILLISECONDS);
342 <                    shouldThrow();
343 <                } catch (InterruptedException success) {}
344 <                assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
345 <                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
346 <            }});
347 <
348 <        threadStarted.await();
349 <        Thread.sleep(SHORT_DELAY_MS);
350 <        t.interrupt();
351 <        awaitTermination(t, MEDIUM_DELAY_MS);
352 <    }
353 <
354 <    /**
355 <     * Interrupted timed poll throws InterruptedException instead of
356 <     * returning timeout status
357 <     */
358 <    public void testInterruptedTimedPoll() throws InterruptedException {
359 <        testInterruptedTimedPoll(new SynchronousQueue());
360 <    }
361 <
362 <    /**
363 <     * Interrupted timed poll throws InterruptedException instead of
364 <     * returning timeout status
365 <     */
366 <    public void testFairInterruptedTimedPoll() throws InterruptedException {
367 <        testInterruptedTimedPoll(new SynchronousQueue(true));
266 >        long startTime = System.nanoTime();
267 >        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
268 >        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
269      }
270  
271      /**
# Line 397 | Line 298 | public class SynchronousQueueTest extend
298          long t0 = System.nanoTime();
299          assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS));
300          assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
301 <        
301 >
302          t.interrupt();
303          awaitTermination(t, MEDIUM_DELAY_MS);
304      }
# Line 491 | Line 392 | public class SynchronousQueueTest extend
392          assertFalse(q.containsAll(Arrays.asList(ints)));
393      }
394  
494
395      /**
396       * toArray is empty
397       */
# Line 521 | Line 421 | public class SynchronousQueueTest extend
421          } catch (NullPointerException success) {}
422      }
423  
524
424      /**
425       * iterator does not traverse any elements
426       */
# Line 556 | Line 455 | public class SynchronousQueueTest extend
455          assertNotNull(s);
456      }
457  
559
458      /**
459       * offer transfers elements across Executor tasks
460       */
461      public void testOfferInExecutor() {
462          final SynchronousQueue q = new SynchronousQueue();
463          ExecutorService executor = Executors.newFixedThreadPool(2);
464 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
465  
466          executor.execute(new CheckedRunnable() {
467              public void realRun() throws InterruptedException {
468                  assertFalse(q.offer(one));
469 <                assertTrue(q.offer(one, MEDIUM_DELAY_MS, MILLISECONDS));
469 >                threadsStarted.await();
470 >                assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
471                  assertEquals(0, q.remainingCapacity());
472              }});
473  
474          executor.execute(new CheckedRunnable() {
475              public void realRun() throws InterruptedException {
476 <                Thread.sleep(SMALL_DELAY_MS);
476 >                threadsStarted.await();
477                  assertSame(one, q.take());
478              }});
479  
# Line 581 | Line 481 | public class SynchronousQueueTest extend
481      }
482  
483      /**
484 <     * poll retrieves elements across Executor threads
484 >     * timed poll retrieves elements across Executor threads
485       */
486      public void testPollInExecutor() {
487          final SynchronousQueue q = new SynchronousQueue();
488 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
489          ExecutorService executor = Executors.newFixedThreadPool(2);
490          executor.execute(new CheckedRunnable() {
491              public void realRun() throws InterruptedException {
492                  assertNull(q.poll());
493 <                assertSame(one, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
493 >                threadsStarted.await();
494 >                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
495                  assertTrue(q.isEmpty());
496              }});
497  
498          executor.execute(new CheckedRunnable() {
499              public void realRun() throws InterruptedException {
500 <                Thread.sleep(SHORT_DELAY_MS);
500 >                threadsStarted.await();
501                  q.put(one);
502              }});
503  
# Line 658 | Line 560 | public class SynchronousQueueTest extend
560       */
561      public void testDrainToWithActivePut() throws InterruptedException {
562          final SynchronousQueue q = new SynchronousQueue();
563 <        Thread t = new Thread(new CheckedRunnable() {
563 >        Thread t = newStartedThread(new CheckedRunnable() {
564              public void realRun() throws InterruptedException {
565 <                q.put(new Integer(1));
565 >                q.put(one);
566              }});
567  
666        t.start();
568          ArrayList l = new ArrayList();
569 <        Thread.sleep(SHORT_DELAY_MS);
570 <        q.drainTo(l);
571 <        assertTrue(l.size() <= 1);
572 <        if (l.size() > 0)
573 <            assertEquals(l.get(0), new Integer(1));
574 <        t.join();
575 <        assertTrue(l.size() <= 1);
569 >        long startTime = System.nanoTime();
570 >        while (l.isEmpty()) {
571 >            q.drainTo(l);
572 >            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
573 >                fail("timed out");
574 >            Thread.yield();
575 >        }
576 >        assertTrue(l.size() == 1);
577 >        assertSame(one, l.get(0));
578 >        awaitTermination(t);
579      }
580  
581      /**
# Line 701 | Line 605 | public class SynchronousQueueTest extend
605       */
606      public void testDrainToN() throws InterruptedException {
607          final SynchronousQueue q = new SynchronousQueue();
608 <        Thread t1 = new Thread(new CheckedRunnable() {
608 >        Thread t1 = newStartedThread(new CheckedRunnable() {
609              public void realRun() throws InterruptedException {
610                  q.put(one);
611              }});
612  
613 <        Thread t2 = new Thread(new CheckedRunnable() {
613 >        Thread t2 = newStartedThread(new CheckedRunnable() {
614              public void realRun() throws InterruptedException {
615                  q.put(two);
616              }});
617  
714        t1.start();
715        t2.start();
618          ArrayList l = new ArrayList();
619 <        Thread.sleep(SHORT_DELAY_MS);
619 >        delay(SHORT_DELAY_MS);
620          q.drainTo(l, 1);
621          assertEquals(1, l.size());
622          q.drainTo(l, 1);
623          assertEquals(2, l.size());
624          assertTrue(l.contains(one));
625          assertTrue(l.contains(two));
626 <        t1.join();
627 <        t2.join();
626 >        awaitTermination(t1);
627 >        awaitTermination(t2);
628      }
629  
630   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines