ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SynchronousQueueTest.java (file contents):
Revision 1.34 by jsr166, Sat May 21 06:24:33 2011 UTC vs.
Revision 1.35 by jsr166, Fri May 27 20:07:24 2011 UTC

# Line 39 | Line 39 | public class SynchronousQueueTest extend
39      /**
40       * Any SynchronousQueue is both empty and full
41       */
42 <    public void testEmptyFull(SynchronousQueue q) {
42 >    public void testEmptyFull()      { testEmptyFull(false); }
43 >    public void testEmptyFull_fair() { testEmptyFull(true); }
44 >    public void testEmptyFull(boolean fair) {
45 >        final SynchronousQueue q = new SynchronousQueue(fair);
46          assertTrue(q.isEmpty());
47          assertEquals(0, q.size());
48          assertEquals(0, q.remainingCapacity());
# Line 47 | Line 50 | public class SynchronousQueueTest extend
50      }
51  
52      /**
50     * A non-fair SynchronousQueue is both empty and full
51     */
52    public void testEmptyFull() {
53        testEmptyFull(new SynchronousQueue());
54    }
55
56    /**
57     * A fair SynchronousQueue is both empty and full
58     */
59    public void testFairEmptyFull() {
60        testEmptyFull(new SynchronousQueue(true));
61    }
62
63    /**
53       * offer(null) throws NPE
54       */
55      public void testOfferNull() {
# Line 164 | Line 153 | public class SynchronousQueueTest extend
153      /**
154       * put blocks interruptibly if no active taker
155       */
156 <    public void testBlockingPut() throws InterruptedException {
157 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
156 >    public void testBlockingPut()      { testBlockingPut(false); }
157 >    public void testBlockingPut_fair() { testBlockingPut(true); }
158 >    public void testBlockingPut(boolean fair) {
159 >        final SynchronousQueue q = new SynchronousQueue(fair);
160 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
161 >        Thread t = newStartedThread(new CheckedRunnable() {
162              public void realRun() throws InterruptedException {
163 <                SynchronousQueue q = new SynchronousQueue();
164 <                q.put(zero);
163 >                Thread.currentThread().interrupt();
164 >                try {
165 >                    q.put(99);
166 >                    shouldThrow();
167 >                } catch (InterruptedException success) {}
168 >                assertFalse(Thread.interrupted());
169 >
170 >                pleaseInterrupt.countDown();
171 >                try {
172 >                    q.put(99);
173 >                    shouldThrow();
174 >                } catch (InterruptedException success) {}
175 >                assertFalse(Thread.interrupted());
176              }});
177  
178 <        t.start();
179 <        delay(SHORT_DELAY_MS);
178 >        await(pleaseInterrupt);
179 >        assertThreadStaysAlive(t);
180          t.interrupt();
181 <        t.join();
181 >        awaitTermination(t);
182 >        assertEquals(0, q.remainingCapacity());
183      }
184  
185      /**
186 <     * put blocks waiting for take
186 >     * put blocks interruptibly waiting for take
187       */
188 <    public void testPutWithTake() throws InterruptedException {
189 <        final SynchronousQueue q = new SynchronousQueue();
190 <        Thread t = new Thread(new CheckedRunnable() {
188 >    public void testPutWithTake()      { testPutWithTake(false); }
189 >    public void testPutWithTake_fair() { testPutWithTake(true); }
190 >    public void testPutWithTake(boolean fair) {
191 >        final SynchronousQueue q = new SynchronousQueue(fair);
192 >        final CountDownLatch pleaseTake = new CountDownLatch(1);
193 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
194 >        Thread t = newStartedThread(new CheckedRunnable() {
195              public void realRun() throws InterruptedException {
196 <                int added = 0;
196 >                pleaseTake.countDown();
197 >                q.put(one);
198 >
199 >                pleaseInterrupt.countDown();
200                  try {
201 <                    while (true) {
202 <                        q.put(added);
203 <                        ++added;
204 <                    }
193 <                } catch (InterruptedException success) {
194 <                    assertEquals(1, added);
195 <                }
201 >                    q.put(99);
202 >                    shouldThrow();
203 >                } catch (InterruptedException success) {}
204 >                assertFalse(Thread.interrupted());
205              }});
206  
207 <        t.start();
208 <        delay(SHORT_DELAY_MS);
209 <        assertEquals(0, q.take());
210 <        delay(SHORT_DELAY_MS);
207 >        await(pleaseTake);
208 >        assertEquals(q.remainingCapacity(), 0);
209 >        try { assertSame(one, q.take()); }
210 >        catch (InterruptedException e) { threadUnexpectedException(e); }
211 >
212 >        await(pleaseInterrupt);
213 >        assertThreadStaysAlive(t);
214          t.interrupt();
215 <        t.join();
215 >        awaitTermination(t);
216 >        assertEquals(q.remainingCapacity(), 0);
217      }
218  
219      /**
220       * timed offer times out if elements not taken
221       */
222 <    public void testTimedOffer(final SynchronousQueue q)
223 <            throws InterruptedException {
222 >    public void testTimedOffer()      { testTimedOffer(false); }
223 >    public void testTimedOffer_fair() { testTimedOffer(true); }
224 >    public void testTimedOffer(boolean fair) {
225 >        final SynchronousQueue q = new SynchronousQueue(fair);
226          final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
227          Thread t = newStartedThread(new CheckedRunnable() {
228              public void realRun() throws InterruptedException {
# Line 222 | Line 237 | public class SynchronousQueueTest extend
237              }});
238  
239          await(pleaseInterrupt);
240 +        assertThreadStaysAlive(t);
241          t.interrupt();
242          awaitTermination(t);
243      }
244  
245      /**
230     * timed offer times out if elements not taken
231     */
232    public void testTimedOffer() throws InterruptedException {
233        testTimedOffer(new SynchronousQueue());
234    }
235
236    /**
237     * timed offer times out if elements not taken
238     */
239    public void testFairTimedOffer() throws InterruptedException {
240        testTimedOffer(new SynchronousQueue(true));
241    }
242
243    /**
244     * put blocks interruptibly if no active taker
245     */
246    public void testFairBlockingPut() throws InterruptedException {
247        Thread t = new Thread(new CheckedInterruptedRunnable() {
248            public void realRun() throws InterruptedException {
249                SynchronousQueue q = new SynchronousQueue(true);
250                q.put(zero);
251            }});
252
253        t.start();
254        delay(SHORT_DELAY_MS);
255        t.interrupt();
256        t.join();
257    }
258
259    /**
260     * put blocks waiting for take
261     */
262    public void testFairPutWithTake() throws InterruptedException {
263        final SynchronousQueue q = new SynchronousQueue(true);
264        Thread t = new Thread(new CheckedRunnable() {
265            public void realRun() throws InterruptedException {
266                int added = 0;
267                try {
268                    while (true) {
269                        q.put(added);
270                        ++added;
271                    }
272                } catch (InterruptedException success) {
273                    assertEquals(1, added);
274                }
275            }});
276
277        t.start();
278        delay(SHORT_DELAY_MS);
279        assertEquals(0, q.take());
280        delay(SHORT_DELAY_MS);
281        t.interrupt();
282        t.join();
283    }
284
285    /**
286     * take blocks interruptibly when empty
287     */
288    public void testFairTakeFromEmpty() throws InterruptedException {
289        final SynchronousQueue q = new SynchronousQueue(true);
290        Thread t = new Thread(new CheckedInterruptedRunnable() {
291            public void realRun() throws InterruptedException {
292                q.take();
293            }});
294
295        t.start();
296        delay(SHORT_DELAY_MS);
297        t.interrupt();
298        t.join();
299    }
300
301    /**
246       * poll return null if no active putter
247       */
248      public void testPoll() {
# Line 319 | Line 263 | public class SynchronousQueueTest extend
263       */
264      public void testTimedPoll() throws InterruptedException {
265          SynchronousQueue q = new SynchronousQueue();
266 <        long t0 = System.nanoTime();
267 <        assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
268 <        assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
325 <    }
326 <
327 <    /**
328 <     * Interrupted timed poll throws InterruptedException instead of
329 <     * returning timeout status
330 <     */
331 <    public void testInterruptedTimedPoll(final SynchronousQueue q)
332 <            throws InterruptedException {
333 <        final CountDownLatch threadStarted = new CountDownLatch(1);
334 <        Thread t = newStartedThread(new CheckedRunnable() {
335 <            public void realRun() throws InterruptedException {
336 <                long t0 = System.nanoTime();
337 <                threadStarted.countDown();
338 <                try {
339 <                    q.poll(LONG_DELAY_MS, MILLISECONDS);
340 <                    shouldThrow();
341 <                } catch (InterruptedException success) {}
342 <                assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
343 <                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
344 <            }});
345 <
346 <        threadStarted.await();
347 <        delay(SHORT_DELAY_MS);
348 <        t.interrupt();
349 <        awaitTermination(t, MEDIUM_DELAY_MS);
350 <    }
351 <
352 <    /**
353 <     * Interrupted timed poll throws InterruptedException instead of
354 <     * returning timeout status
355 <     */
356 <    public void testInterruptedTimedPoll() throws InterruptedException {
357 <        testInterruptedTimedPoll(new SynchronousQueue());
358 <    }
359 <
360 <    /**
361 <     * Interrupted timed poll throws InterruptedException instead of
362 <     * returning timeout status
363 <     */
364 <    public void testFairInterruptedTimedPoll() throws InterruptedException {
365 <        testInterruptedTimedPoll(new SynchronousQueue(true));
266 >        long startTime = System.nanoTime();
267 >        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
268 >        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
269      }
270  
271      /**
# Line 489 | Line 392 | public class SynchronousQueueTest extend
392          assertFalse(q.containsAll(Arrays.asList(ints)));
393      }
394  
492
395      /**
396       * toArray is empty
397       */
# Line 519 | Line 421 | public class SynchronousQueueTest extend
421          } catch (NullPointerException success) {}
422      }
423  
522
424      /**
425       * iterator does not traverse any elements
426       */
# Line 554 | Line 455 | public class SynchronousQueueTest extend
455          assertNotNull(s);
456      }
457  
557
458      /**
459       * offer transfers elements across Executor tasks
460       */
461      public void testOfferInExecutor() {
462          final SynchronousQueue q = new SynchronousQueue();
463          ExecutorService executor = Executors.newFixedThreadPool(2);
464 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
465  
466          executor.execute(new CheckedRunnable() {
467              public void realRun() throws InterruptedException {
468                  assertFalse(q.offer(one));
469 <                assertTrue(q.offer(one, MEDIUM_DELAY_MS, MILLISECONDS));
469 >                threadsStarted.await();
470 >                assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
471                  assertEquals(0, q.remainingCapacity());
472              }});
473  
474          executor.execute(new CheckedRunnable() {
475              public void realRun() throws InterruptedException {
476 <                delay(SMALL_DELAY_MS);
476 >                threadsStarted.await();
477                  assertSame(one, q.take());
478              }});
479  
# Line 579 | Line 481 | public class SynchronousQueueTest extend
481      }
482  
483      /**
484 <     * poll retrieves elements across Executor threads
484 >     * timed poll retrieves elements across Executor threads
485       */
486      public void testPollInExecutor() {
487          final SynchronousQueue q = new SynchronousQueue();
488 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
489          ExecutorService executor = Executors.newFixedThreadPool(2);
490          executor.execute(new CheckedRunnable() {
491              public void realRun() throws InterruptedException {
492                  assertNull(q.poll());
493 <                assertSame(one, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
493 >                threadsStarted.await();
494 >                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
495                  assertTrue(q.isEmpty());
496              }});
497  
498          executor.execute(new CheckedRunnable() {
499              public void realRun() throws InterruptedException {
500 <                delay(SHORT_DELAY_MS);
500 >                threadsStarted.await();
501                  q.put(one);
502              }});
503  
# Line 656 | Line 560 | public class SynchronousQueueTest extend
560       */
561      public void testDrainToWithActivePut() throws InterruptedException {
562          final SynchronousQueue q = new SynchronousQueue();
563 <        Thread t = new Thread(new CheckedRunnable() {
563 >        Thread t = newStartedThread(new CheckedRunnable() {
564              public void realRun() throws InterruptedException {
565 <                q.put(new Integer(1));
565 >                q.put(one);
566              }});
567  
664        t.start();
568          ArrayList l = new ArrayList();
569 <        delay(SHORT_DELAY_MS);
570 <        q.drainTo(l);
571 <        assertTrue(l.size() <= 1);
572 <        if (l.size() > 0)
573 <            assertEquals(l.get(0), new Integer(1));
574 <        t.join();
575 <        assertTrue(l.size() <= 1);
569 >        long startTime = System.nanoTime();
570 >        while (l.isEmpty()) {
571 >            q.drainTo(l);
572 >            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
573 >                fail("timed out");
574 >            Thread.yield();
575 >        }
576 >        assertTrue(l.size() == 1);
577 >        assertSame(one, l.get(0));
578 >        awaitTermination(t);
579      }
580  
581      /**
# Line 699 | Line 605 | public class SynchronousQueueTest extend
605       */
606      public void testDrainToN() throws InterruptedException {
607          final SynchronousQueue q = new SynchronousQueue();
608 <        Thread t1 = new Thread(new CheckedRunnable() {
608 >        Thread t1 = newStartedThread(new CheckedRunnable() {
609              public void realRun() throws InterruptedException {
610                  q.put(one);
611              }});
612  
613 <        Thread t2 = new Thread(new CheckedRunnable() {
613 >        Thread t2 = newStartedThread(new CheckedRunnable() {
614              public void realRun() throws InterruptedException {
615                  q.put(two);
616              }});
617  
712        t1.start();
713        t2.start();
618          ArrayList l = new ArrayList();
619          delay(SHORT_DELAY_MS);
620          q.drainTo(l, 1);
# Line 719 | Line 623 | public class SynchronousQueueTest extend
623          assertEquals(2, l.size());
624          assertTrue(l.contains(one));
625          assertTrue(l.contains(two));
626 <        t1.join();
627 <        t2.join();
626 >        awaitTermination(t1);
627 >        awaitTermination(t2);
628      }
629  
630   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines