ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SynchronousQueueTest.java (file contents):
Revision 1.28 by jsr166, Thu Oct 28 22:20:47 2010 UTC vs.
Revision 1.35 by jsr166, Fri May 27 20:07:24 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   * Other contributors include Andrew Wright, Jeffrey Hayes,
6   * Pat Fisher, Mike Judd.
7   */
# Line 37 | Line 37 | public class SynchronousQueueTest extend
37      }
38  
39      /**
40 <     * A SynchronousQueue is both empty and full
40 >     * Any SynchronousQueue is both empty and full
41       */
42 <    public void testEmptyFull() {
43 <        SynchronousQueue q = new SynchronousQueue();
44 <        assertTrue(q.isEmpty());
45 <        assertEquals(0, q.size());
46 <        assertEquals(0, q.remainingCapacity());
47 <        assertFalse(q.offer(zero));
48 <    }
49 <
50 <    /**
51 <     * A fair SynchronousQueue is both empty and full
52 <     */
53 <    public void testFairEmptyFull() {
54 <        SynchronousQueue q = new SynchronousQueue(true);
42 >    public void testEmptyFull()      { testEmptyFull(false); }
43 >    public void testEmptyFull_fair() { testEmptyFull(true); }
44 >    public void testEmptyFull(boolean fair) {
45 >        final SynchronousQueue q = new SynchronousQueue(fair);
46          assertTrue(q.isEmpty());
47          assertEquals(0, q.size());
48          assertEquals(0, q.remainingCapacity());
# Line 157 | Line 148 | public class SynchronousQueueTest extend
148              q.put(null);
149              shouldThrow();
150          } catch (NullPointerException success) {}
160     }
161
162    /**
163     * put blocks interruptibly if no active taker
164     */
165    public void testBlockingPut() throws InterruptedException {
166        Thread t = new Thread(new CheckedInterruptedRunnable() {
167            public void realRun() throws InterruptedException {
168                SynchronousQueue q = new SynchronousQueue();
169                q.put(zero);
170            }});
171
172        t.start();
173        Thread.sleep(SHORT_DELAY_MS);
174        t.interrupt();
175        t.join();
151      }
152  
153      /**
154 <     * put blocks waiting for take
154 >     * put blocks interruptibly if no active taker
155       */
156 <    public void testPutWithTake() throws InterruptedException {
157 <        final SynchronousQueue q = new SynchronousQueue();
158 <        Thread t = new Thread(new CheckedRunnable() {
156 >    public void testBlockingPut()      { testBlockingPut(false); }
157 >    public void testBlockingPut_fair() { testBlockingPut(true); }
158 >    public void testBlockingPut(boolean fair) {
159 >        final SynchronousQueue q = new SynchronousQueue(fair);
160 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
161 >        Thread t = newStartedThread(new CheckedRunnable() {
162              public void realRun() throws InterruptedException {
163 <                int added = 0;
163 >                Thread.currentThread().interrupt();
164                  try {
165 <                    while (true) {
166 <                        q.put(added);
167 <                        ++added;
168 <                    }
191 <                } catch (InterruptedException success) {
192 <                    assertEquals(1, added);
193 <                }
194 <            }});
195 <
196 <        t.start();
197 <        Thread.sleep(SHORT_DELAY_MS);
198 <        assertEquals(0, q.take());
199 <        Thread.sleep(SHORT_DELAY_MS);
200 <        t.interrupt();
201 <        t.join();
202 <    }
165 >                    q.put(99);
166 >                    shouldThrow();
167 >                } catch (InterruptedException success) {}
168 >                assertFalse(Thread.interrupted());
169  
170 <    /**
171 <     * timed offer times out if elements not taken
172 <     */
173 <    public void testTimedOffer() throws InterruptedException {
174 <        final SynchronousQueue q = new SynchronousQueue();
175 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
210 <            public void realRun() throws InterruptedException {
211 <                assertFalse(q.offer(new Object(), SHORT_DELAY_MS, MILLISECONDS));
212 <                q.offer(new Object(), LONG_DELAY_MS, MILLISECONDS);
170 >                pleaseInterrupt.countDown();
171 >                try {
172 >                    q.put(99);
173 >                    shouldThrow();
174 >                } catch (InterruptedException success) {}
175 >                assertFalse(Thread.interrupted());
176              }});
177  
178 <        t.start();
179 <        Thread.sleep(SMALL_DELAY_MS);
178 >        await(pleaseInterrupt);
179 >        assertThreadStaysAlive(t);
180          t.interrupt();
181 <        t.join();
181 >        awaitTermination(t);
182 >        assertEquals(0, q.remainingCapacity());
183      }
184  
185      /**
186 <     * put blocks interruptibly if no active taker
186 >     * put blocks interruptibly waiting for take
187       */
188 <    public void testFairBlockingPut() throws InterruptedException {
189 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
188 >    public void testPutWithTake()      { testPutWithTake(false); }
189 >    public void testPutWithTake_fair() { testPutWithTake(true); }
190 >    public void testPutWithTake(boolean fair) {
191 >        final SynchronousQueue q = new SynchronousQueue(fair);
192 >        final CountDownLatch pleaseTake = new CountDownLatch(1);
193 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
194 >        Thread t = newStartedThread(new CheckedRunnable() {
195              public void realRun() throws InterruptedException {
196 <                SynchronousQueue q = new SynchronousQueue(true);
197 <                q.put(zero);
229 <            }});
230 <
231 <        t.start();
232 <        Thread.sleep(SHORT_DELAY_MS);
233 <        t.interrupt();
234 <        t.join();
235 <    }
196 >                pleaseTake.countDown();
197 >                q.put(one);
198  
199 <    /**
238 <     * put blocks waiting for take
239 <     */
240 <    public void testFairPutWithTake() throws InterruptedException {
241 <        final SynchronousQueue q = new SynchronousQueue(true);
242 <        Thread t = new Thread(new CheckedRunnable() {
243 <            public void realRun() throws InterruptedException {
244 <                int added = 0;
199 >                pleaseInterrupt.countDown();
200                  try {
201 <                    while (true) {
202 <                        q.put(added);
203 <                        ++added;
204 <                    }
250 <                } catch (InterruptedException success) {
251 <                    assertEquals(1, added);
252 <                }
201 >                    q.put(99);
202 >                    shouldThrow();
203 >                } catch (InterruptedException success) {}
204 >                assertFalse(Thread.interrupted());
205              }});
206  
207 <        t.start();
208 <        Thread.sleep(SHORT_DELAY_MS);
209 <        assertEquals(0, q.take());
210 <        Thread.sleep(SHORT_DELAY_MS);
259 <        t.interrupt();
260 <        t.join();
261 <    }
207 >        await(pleaseTake);
208 >        assertEquals(q.remainingCapacity(), 0);
209 >        try { assertSame(one, q.take()); }
210 >        catch (InterruptedException e) { threadUnexpectedException(e); }
211  
212 <    /**
213 <     * timed offer times out if elements not taken
265 <     */
266 <    public void testFairTimedOffer() throws InterruptedException {
267 <        final SynchronousQueue q = new SynchronousQueue(true);
268 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
269 <            public void realRun() throws InterruptedException {
270 <                assertFalse(q.offer(new Object(), SHORT_DELAY_MS, MILLISECONDS));
271 <                q.offer(new Object(), LONG_DELAY_MS, MILLISECONDS);
272 <            }});
273 <
274 <        t.start();
275 <        Thread.sleep(SMALL_DELAY_MS);
212 >        await(pleaseInterrupt);
213 >        assertThreadStaysAlive(t);
214          t.interrupt();
215 <        t.join();
215 >        awaitTermination(t);
216 >        assertEquals(q.remainingCapacity(), 0);
217      }
218  
280
219      /**
220 <     * take blocks interruptibly when empty
220 >     * timed offer times out if elements not taken
221       */
222 <    public void testFairTakeFromEmpty() throws InterruptedException {
223 <        final SynchronousQueue q = new SynchronousQueue(true);
224 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
225 <            public void realRun() throws InterruptedException {
226 <                q.take();
222 >    public void testTimedOffer()      { testTimedOffer(false); }
223 >    public void testTimedOffer_fair() { testTimedOffer(true); }
224 >    public void testTimedOffer(boolean fair) {
225 >        final SynchronousQueue q = new SynchronousQueue(fair);
226 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
227 >        Thread t = newStartedThread(new CheckedRunnable() {
228 >            public void realRun() throws InterruptedException {
229 >                long startTime = System.nanoTime();
230 >                assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
231 >                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
232 >                pleaseInterrupt.countDown();
233 >                try {
234 >                    q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
235 >                    shouldThrow();
236 >                } catch (InterruptedException success) {}
237              }});
238  
239 <        t.start();
240 <        Thread.sleep(SHORT_DELAY_MS);
239 >        await(pleaseInterrupt);
240 >        assertThreadStaysAlive(t);
241          t.interrupt();
242 <        t.join();
242 >        awaitTermination(t);
243      }
244  
245      /**
246 <     * poll fails unless active taker
246 >     * poll return null if no active putter
247       */
248      public void testPoll() {
249          SynchronousQueue q = new SynchronousQueue();
# Line 303 | Line 251 | public class SynchronousQueueTest extend
251      }
252  
253      /**
254 <     * timed poll with zero timeout times out if no active taker
254 >     * timed poll with zero timeout times out if no active putter
255       */
256      public void testTimedPoll0() throws InterruptedException {
257          SynchronousQueue q = new SynchronousQueue();
# Line 311 | Line 259 | public class SynchronousQueueTest extend
259      }
260  
261      /**
262 <     * timed poll with nonzero timeout times out if no active taker
262 >     * timed poll with nonzero timeout times out if no active putter
263       */
264      public void testTimedPoll() throws InterruptedException {
265          SynchronousQueue q = new SynchronousQueue();
266 <        assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
267 <    }
268 <
321 <    /**
322 <     * Interrupted timed poll throws InterruptedException instead of
323 <     * returning timeout status
324 <     */
325 <    public void testInterruptedTimedPoll() throws InterruptedException {
326 <        final SynchronousQueue q = new SynchronousQueue();
327 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
328 <            public void realRun() throws InterruptedException {
329 <                q.poll(SMALL_DELAY_MS, MILLISECONDS);
330 <            }});
331 <
332 <        t.start();
333 <        Thread.sleep(SHORT_DELAY_MS);
334 <        t.interrupt();
335 <        t.join();
266 >        long startTime = System.nanoTime();
267 >        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
268 >        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
269      }
270  
271      /**
272 <     * Interrupted timed poll throws InterruptedException instead of
273 <     * returning timeout status
341 <     */
342 <    public void testFairInterruptedTimedPoll() throws InterruptedException {
343 <        Thread t = new Thread(new CheckedInterruptedRunnable() {
344 <            public void realRun() throws InterruptedException {
345 <                SynchronousQueue q = new SynchronousQueue(true);
346 <                q.poll(SMALL_DELAY_MS, MILLISECONDS);
347 <            }});
348 <
349 <        t.start();
350 <        Thread.sleep(SHORT_DELAY_MS);
351 <        t.interrupt();
352 <        t.join();
353 <    }
354 <
355 <    /**
356 <     * timed poll before a delayed offer fails; after offer succeeds;
357 <     * on interruption throws
272 >     * timed poll before a delayed offer times out, returning null;
273 >     * after offer succeeds; on interruption throws
274       */
275      public void testFairTimedPollWithOffer() throws InterruptedException {
276          final SynchronousQueue q = new SynchronousQueue(true);
277 <        Thread t = new Thread(new CheckedRunnable() {
277 >        final CountDownLatch pleaseOffer = new CountDownLatch(1);
278 >        Thread t = newStartedThread(new CheckedRunnable() {
279              public void realRun() throws InterruptedException {
280 +                long t0 = System.nanoTime();
281 +                assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
282 +                assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
283 +
284 +                pleaseOffer.countDown();
285 +                t0 = System.nanoTime();
286 +                assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
287 +                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
288 +
289 +                t0 = System.nanoTime();
290                  try {
364                    assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
365                    assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
291                      q.poll(LONG_DELAY_MS, MILLISECONDS);
292 <                    threadShouldThrow();
292 >                    shouldThrow();
293                  } catch (InterruptedException success) {}
294 +                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
295              }});
296  
297 <        t.start();
298 <        Thread.sleep(SMALL_DELAY_MS);
299 <        assertTrue(q.offer(zero, SHORT_DELAY_MS, MILLISECONDS));
297 >        assertTrue(pleaseOffer.await(MEDIUM_DELAY_MS, MILLISECONDS));
298 >        long t0 = System.nanoTime();
299 >        assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS));
300 >        assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
301 >
302          t.interrupt();
303 <        t.join();
303 >        awaitTermination(t, MEDIUM_DELAY_MS);
304      }
305  
378
306      /**
307 <     * peek returns null
307 >     * peek() returns null if no active putter
308       */
309      public void testPeek() {
310          SynchronousQueue q = new SynchronousQueue();
# Line 385 | Line 312 | public class SynchronousQueueTest extend
312      }
313  
314      /**
315 <     * element throws NSEE
315 >     * element() throws NSEE if no active putter
316       */
317      public void testElement() {
318          SynchronousQueue q = new SynchronousQueue();
# Line 396 | Line 323 | public class SynchronousQueueTest extend
323      }
324  
325      /**
326 <     * remove throws NSEE if no active taker
326 >     * remove() throws NSEE if no active putter
327       */
328      public void testRemove() {
329          SynchronousQueue q = new SynchronousQueue();
# Line 465 | Line 392 | public class SynchronousQueueTest extend
392          assertFalse(q.containsAll(Arrays.asList(ints)));
393      }
394  
468
395      /**
396       * toArray is empty
397       */
# Line 495 | Line 421 | public class SynchronousQueueTest extend
421          } catch (NullPointerException success) {}
422      }
423  
498
424      /**
425       * iterator does not traverse any elements
426       */
# Line 530 | Line 455 | public class SynchronousQueueTest extend
455          assertNotNull(s);
456      }
457  
533
458      /**
459       * offer transfers elements across Executor tasks
460       */
461      public void testOfferInExecutor() {
462          final SynchronousQueue q = new SynchronousQueue();
463          ExecutorService executor = Executors.newFixedThreadPool(2);
464 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
465  
466          executor.execute(new CheckedRunnable() {
467              public void realRun() throws InterruptedException {
468                  assertFalse(q.offer(one));
469 <                assertTrue(q.offer(one, MEDIUM_DELAY_MS, MILLISECONDS));
469 >                threadsStarted.await();
470 >                assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
471                  assertEquals(0, q.remainingCapacity());
472              }});
473  
474          executor.execute(new CheckedRunnable() {
475              public void realRun() throws InterruptedException {
476 <                Thread.sleep(SMALL_DELAY_MS);
476 >                threadsStarted.await();
477                  assertSame(one, q.take());
478              }});
479  
# Line 555 | Line 481 | public class SynchronousQueueTest extend
481      }
482  
483      /**
484 <     * poll retrieves elements across Executor threads
484 >     * timed poll retrieves elements across Executor threads
485       */
486      public void testPollInExecutor() {
487          final SynchronousQueue q = new SynchronousQueue();
488 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
489          ExecutorService executor = Executors.newFixedThreadPool(2);
490          executor.execute(new CheckedRunnable() {
491              public void realRun() throws InterruptedException {
492                  assertNull(q.poll());
493 <                assertSame(one, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
493 >                threadsStarted.await();
494 >                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
495                  assertTrue(q.isEmpty());
496              }});
497  
498          executor.execute(new CheckedRunnable() {
499              public void realRun() throws InterruptedException {
500 <                Thread.sleep(SHORT_DELAY_MS);
500 >                threadsStarted.await();
501                  q.put(one);
502              }});
503  
# Line 632 | Line 560 | public class SynchronousQueueTest extend
560       */
561      public void testDrainToWithActivePut() throws InterruptedException {
562          final SynchronousQueue q = new SynchronousQueue();
563 <        Thread t = new Thread(new CheckedRunnable() {
563 >        Thread t = newStartedThread(new CheckedRunnable() {
564              public void realRun() throws InterruptedException {
565 <                q.put(new Integer(1));
565 >                q.put(one);
566              }});
567  
640        t.start();
568          ArrayList l = new ArrayList();
569 <        Thread.sleep(SHORT_DELAY_MS);
570 <        q.drainTo(l);
571 <        assertTrue(l.size() <= 1);
572 <        if (l.size() > 0)
573 <            assertEquals(l.get(0), new Integer(1));
574 <        t.join();
575 <        assertTrue(l.size() <= 1);
569 >        long startTime = System.nanoTime();
570 >        while (l.isEmpty()) {
571 >            q.drainTo(l);
572 >            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
573 >                fail("timed out");
574 >            Thread.yield();
575 >        }
576 >        assertTrue(l.size() == 1);
577 >        assertSame(one, l.get(0));
578 >        awaitTermination(t);
579      }
580  
581      /**
# Line 675 | Line 605 | public class SynchronousQueueTest extend
605       */
606      public void testDrainToN() throws InterruptedException {
607          final SynchronousQueue q = new SynchronousQueue();
608 <        Thread t1 = new Thread(new CheckedRunnable() {
608 >        Thread t1 = newStartedThread(new CheckedRunnable() {
609              public void realRun() throws InterruptedException {
610                  q.put(one);
611              }});
612  
613 <        Thread t2 = new Thread(new CheckedRunnable() {
613 >        Thread t2 = newStartedThread(new CheckedRunnable() {
614              public void realRun() throws InterruptedException {
615                  q.put(two);
616              }});
617  
688        t1.start();
689        t2.start();
618          ArrayList l = new ArrayList();
619 <        Thread.sleep(SHORT_DELAY_MS);
619 >        delay(SHORT_DELAY_MS);
620          q.drainTo(l, 1);
621          assertEquals(1, l.size());
622          q.drainTo(l, 1);
623          assertEquals(2, l.size());
624          assertTrue(l.contains(one));
625          assertTrue(l.contains(two));
626 <        t1.join();
627 <        t2.join();
626 >        awaitTermination(t1);
627 >        awaitTermination(t2);
628      }
629  
630   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines