ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
(Generate patch)

Comparing jsr166/src/test/tck/SynchronousQueueTest.java (file contents):
Revision 1.11 by jsr166, Mon Nov 16 04:57:10 2009 UTC vs.
Revision 1.35 by jsr166, Fri May 27 20:07:24 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   * Other contributors include Andrew Wright, Jeffrey Hayes,
6   * Pat Fisher, Mike Judd.
7   */
# Line 9 | Line 9
9   import junit.framework.*;
10   import java.util.*;
11   import java.util.concurrent.*;
12 + import static java.util.concurrent.TimeUnit.MILLISECONDS;
13   import java.io.*;
14  
15   public class SynchronousQueueTest extends JSR166TestCase {
16  
17 <    public static void main(String[] args) {
18 <        junit.textui.TestRunner.run (suite());
17 >    public static class Fair extends BlockingQueueTest {
18 >        protected BlockingQueue emptyCollection() {
19 >            return new SynchronousQueue(true);
20 >        }
21      }
22  
23 <    public static Test suite() {
24 <        return new TestSuite(SynchronousQueueTest.class);
23 >    public static class NonFair extends BlockingQueueTest {
24 >        protected BlockingQueue emptyCollection() {
25 >            return new SynchronousQueue(false);
26 >        }
27      }
28  
29 <    /**
30 <     * A SynchronousQueue is both empty and full
31 <     */
32 <    public void testEmptyFull() {
33 <        SynchronousQueue q = new SynchronousQueue();
34 <        assertTrue(q.isEmpty());
35 <        assertEquals(0, q.size());
36 <        assertEquals(0, q.remainingCapacity());
32 <        assertFalse(q.offer(zero));
29 >    public static void main(String[] args) {
30 >        junit.textui.TestRunner.run(suite());
31 >    }
32 >
33 >    public static Test suite() {
34 >        return newTestSuite(SynchronousQueueTest.class,
35 >                            new Fair().testSuite(),
36 >                            new NonFair().testSuite());
37      }
38  
39      /**
40 <     * A fair SynchronousQueue is both empty and full
40 >     * Any SynchronousQueue is both empty and full
41       */
42 <    public void testFairEmptyFull() {
43 <        SynchronousQueue q = new SynchronousQueue(true);
42 >    public void testEmptyFull()      { testEmptyFull(false); }
43 >    public void testEmptyFull_fair() { testEmptyFull(true); }
44 >    public void testEmptyFull(boolean fair) {
45 >        final SynchronousQueue q = new SynchronousQueue(fair);
46          assertTrue(q.isEmpty());
47 <        assertEquals(0, q.size());
47 >        assertEquals(0, q.size());
48          assertEquals(0, q.remainingCapacity());
49          assertFalse(q.offer(zero));
50      }
# Line 47 | Line 53 | public class SynchronousQueueTest extend
53       * offer(null) throws NPE
54       */
55      public void testOfferNull() {
56 <        try {
56 >        try {
57              SynchronousQueue q = new SynchronousQueue();
58              q.offer(null);
59              shouldThrow();
60 <        } catch (NullPointerException success) { }
60 >        } catch (NullPointerException success) {}
61      }
62  
63      /**
64       * add(null) throws NPE
65       */
66      public void testAddNull() {
67 <        try {
67 >        try {
68              SynchronousQueue q = new SynchronousQueue();
69              q.add(null);
70              shouldThrow();
71 <        } catch (NullPointerException success) { }
71 >        } catch (NullPointerException success) {}
72      }
73  
74      /**
# Line 77 | Line 83 | public class SynchronousQueueTest extend
83       * add throws ISE if no active taker
84       */
85      public void testAdd() {
86 <        try {
86 >        try {
87              SynchronousQueue q = new SynchronousQueue();
88              assertEquals(0, q.remainingCapacity());
89              q.add(one);
90              shouldThrow();
91 <        } catch (IllegalStateException success){
86 <        }
91 >        } catch (IllegalStateException success) {}
92      }
93  
94      /**
# Line 94 | Line 99 | public class SynchronousQueueTest extend
99              SynchronousQueue q = new SynchronousQueue();
100              q.addAll(null);
101              shouldThrow();
102 <        }
98 <        catch (NullPointerException success) {}
102 >        } catch (NullPointerException success) {}
103      }
104  
105      /**
# Line 106 | Line 110 | public class SynchronousQueueTest extend
110              SynchronousQueue q = new SynchronousQueue();
111              q.addAll(q);
112              shouldThrow();
113 <        }
110 <        catch (IllegalArgumentException success) {}
113 >        } catch (IllegalArgumentException success) {}
114      }
115  
116      /**
# Line 119 | Line 122 | public class SynchronousQueueTest extend
122              Integer[] ints = new Integer[1];
123              q.addAll(Arrays.asList(ints));
124              shouldThrow();
125 <        }
123 <        catch (NullPointerException success) {}
125 >        } catch (NullPointerException success) {}
126      }
127 +
128      /**
129       * addAll throws ISE if no active taker
130       */
# Line 133 | Line 136 | public class SynchronousQueueTest extend
136                  ints[i] = new Integer(i);
137              q.addAll(Arrays.asList(ints));
138              shouldThrow();
139 <        }
137 <        catch (IllegalStateException success) {}
139 >        } catch (IllegalStateException success) {}
140      }
141  
142      /**
143       * put(null) throws NPE
144       */
145 <    public void testPutNull() {
146 <        try {
145 >    public void testPutNull() throws InterruptedException {
146 >        try {
147              SynchronousQueue q = new SynchronousQueue();
148              q.put(null);
149              shouldThrow();
150 <        }
149 <        catch (NullPointerException success){
150 <        }
151 <        catch (InterruptedException ie) {
152 <            unexpectedException();
153 <        }
154 <     }
155 <
156 <    /**
157 <     * put blocks interruptibly if no active taker
158 <     */
159 <    public void testBlockingPut() {
160 <        Thread t = new Thread(new Runnable() {
161 <                public void run() {
162 <                    try {
163 <                        SynchronousQueue q = new SynchronousQueue();
164 <                        q.put(zero);
165 <                        threadShouldThrow();
166 <                    } catch (InterruptedException ie){
167 <                    }
168 <                }});
169 <        t.start();
170 <        try {
171 <           Thread.sleep(SHORT_DELAY_MS);
172 <           t.interrupt();
173 <           t.join();
174 <        }
175 <        catch (InterruptedException ie) {
176 <            unexpectedException();
177 <        }
150 >        } catch (NullPointerException success) {}
151      }
152  
153      /**
154 <     * put blocks waiting for take
182 <     */
183 <    public void testPutWithTake() {
184 <        final SynchronousQueue q = new SynchronousQueue();
185 <        Thread t = new Thread(new Runnable() {
186 <                public void run() {
187 <                    int added = 0;
188 <                    try {
189 <                        q.put(new Object());
190 <                        ++added;
191 <                        q.put(new Object());
192 <                        ++added;
193 <                        q.put(new Object());
194 <                        ++added;
195 <                        q.put(new Object());
196 <                        ++added;
197 <                        threadShouldThrow();
198 <                    } catch (InterruptedException e){
199 <                        assertTrue(added >= 1);
200 <                    }
201 <                }
202 <            });
203 <        try {
204 <            t.start();
205 <            Thread.sleep(SHORT_DELAY_MS);
206 <            q.take();
207 <            Thread.sleep(SHORT_DELAY_MS);
208 <            t.interrupt();
209 <            t.join();
210 <        } catch (Exception e){
211 <            unexpectedException();
212 <        }
213 <    }
214 <
215 <    /**
216 <     * timed offer times out if elements not taken
154 >     * put blocks interruptibly if no active taker
155       */
156 <    public void testTimedOffer() {
157 <        final SynchronousQueue q = new SynchronousQueue();
158 <        Thread t = new Thread(new Runnable() {
159 <                public void run() {
160 <                    try {
161 <
162 <                        threadAssertFalse(q.offer(new Object(), SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
163 <                        q.offer(new Object(), LONG_DELAY_MS, TimeUnit.MILLISECONDS);
164 <                        threadShouldThrow();
165 <                    } catch (InterruptedException success){}
166 <                }
167 <            });
168 <
231 <        try {
232 <            t.start();
233 <            Thread.sleep(SMALL_DELAY_MS);
234 <            t.interrupt();
235 <            t.join();
236 <        } catch (Exception e){
237 <            unexpectedException();
238 <        }
239 <    }
240 <
156 >    public void testBlockingPut()      { testBlockingPut(false); }
157 >    public void testBlockingPut_fair() { testBlockingPut(true); }
158 >    public void testBlockingPut(boolean fair) {
159 >        final SynchronousQueue q = new SynchronousQueue(fair);
160 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
161 >        Thread t = newStartedThread(new CheckedRunnable() {
162 >            public void realRun() throws InterruptedException {
163 >                Thread.currentThread().interrupt();
164 >                try {
165 >                    q.put(99);
166 >                    shouldThrow();
167 >                } catch (InterruptedException success) {}
168 >                assertFalse(Thread.interrupted());
169  
170 <    /**
171 <     * take blocks interruptibly when empty
172 <     */
173 <    public void testTakeFromEmpty() {
174 <        final SynchronousQueue q = new SynchronousQueue();
175 <        Thread t = new Thread(new Runnable() {
176 <                public void run() {
177 <                    try {
178 <                        q.take();
179 <                        threadShouldThrow();
180 <                    } catch (InterruptedException success){ }
181 <                }
182 <            });
255 <        try {
256 <            t.start();
257 <            Thread.sleep(SHORT_DELAY_MS);
258 <            t.interrupt();
259 <            t.join();
260 <        } catch (Exception e){
261 <            unexpectedException();
262 <        }
170 >                pleaseInterrupt.countDown();
171 >                try {
172 >                    q.put(99);
173 >                    shouldThrow();
174 >                } catch (InterruptedException success) {}
175 >                assertFalse(Thread.interrupted());
176 >            }});
177 >
178 >        await(pleaseInterrupt);
179 >        assertThreadStaysAlive(t);
180 >        t.interrupt();
181 >        awaitTermination(t);
182 >        assertEquals(0, q.remainingCapacity());
183      }
184  
265
185      /**
186 <     * put blocks interruptibly if no active taker
186 >     * put blocks interruptibly waiting for take
187       */
188 <    public void testFairBlockingPut() {
189 <        Thread t = new Thread(new Runnable() {
190 <                public void run() {
191 <                    try {
192 <                        SynchronousQueue q = new SynchronousQueue(true);
193 <                        q.put(zero);
194 <                        threadShouldThrow();
195 <                    } catch (InterruptedException ie){
196 <                    }
197 <                }});
279 <        t.start();
280 <        try {
281 <           Thread.sleep(SHORT_DELAY_MS);
282 <           t.interrupt();
283 <           t.join();
284 <        }
285 <        catch (InterruptedException ie) {
286 <            unexpectedException();
287 <        }
288 <    }
188 >    public void testPutWithTake()      { testPutWithTake(false); }
189 >    public void testPutWithTake_fair() { testPutWithTake(true); }
190 >    public void testPutWithTake(boolean fair) {
191 >        final SynchronousQueue q = new SynchronousQueue(fair);
192 >        final CountDownLatch pleaseTake = new CountDownLatch(1);
193 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
194 >        Thread t = newStartedThread(new CheckedRunnable() {
195 >            public void realRun() throws InterruptedException {
196 >                pleaseTake.countDown();
197 >                q.put(one);
198  
199 <    /**
200 <     * put blocks waiting for take
201 <     */
202 <    public void testFairPutWithTake() {
203 <        final SynchronousQueue q = new SynchronousQueue(true);
204 <        Thread t = new Thread(new Runnable() {
205 <                public void run() {
206 <                    int added = 0;
207 <                    try {
208 <                        q.put(new Object());
209 <                        ++added;
210 <                        q.put(new Object());
211 <                        ++added;
212 <                        q.put(new Object());
213 <                        ++added;
214 <                        q.put(new Object());
215 <                        ++added;
216 <                        threadShouldThrow();
308 <                    } catch (InterruptedException e){
309 <                        assertTrue(added >= 1);
310 <                    }
311 <                }
312 <            });
313 <        try {
314 <            t.start();
315 <            Thread.sleep(SHORT_DELAY_MS);
316 <            q.take();
317 <            Thread.sleep(SHORT_DELAY_MS);
318 <            t.interrupt();
319 <            t.join();
320 <        } catch (Exception e){
321 <            unexpectedException();
322 <        }
199 >                pleaseInterrupt.countDown();
200 >                try {
201 >                    q.put(99);
202 >                    shouldThrow();
203 >                } catch (InterruptedException success) {}
204 >                assertFalse(Thread.interrupted());
205 >            }});
206 >
207 >        await(pleaseTake);
208 >        assertEquals(q.remainingCapacity(), 0);
209 >        try { assertSame(one, q.take()); }
210 >        catch (InterruptedException e) { threadUnexpectedException(e); }
211 >
212 >        await(pleaseInterrupt);
213 >        assertThreadStaysAlive(t);
214 >        t.interrupt();
215 >        awaitTermination(t);
216 >        assertEquals(q.remainingCapacity(), 0);
217      }
218  
219      /**
220       * timed offer times out if elements not taken
221       */
222 <    public void testFairTimedOffer() {
223 <        final SynchronousQueue q = new SynchronousQueue(true);
224 <        Thread t = new Thread(new Runnable() {
225 <                public void run() {
226 <                    try {
227 <
228 <                        threadAssertFalse(q.offer(new Object(), SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
229 <                        q.offer(new Object(), LONG_DELAY_MS, TimeUnit.MILLISECONDS);
230 <                        threadShouldThrow();
231 <                    } catch (InterruptedException success){}
232 <                }
233 <            });
234 <
235 <        try {
236 <            t.start();
237 <            Thread.sleep(SMALL_DELAY_MS);
344 <            t.interrupt();
345 <            t.join();
346 <        } catch (Exception e){
347 <            unexpectedException();
348 <        }
349 <    }
350 <
222 >    public void testTimedOffer()      { testTimedOffer(false); }
223 >    public void testTimedOffer_fair() { testTimedOffer(true); }
224 >    public void testTimedOffer(boolean fair) {
225 >        final SynchronousQueue q = new SynchronousQueue(fair);
226 >        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
227 >        Thread t = newStartedThread(new CheckedRunnable() {
228 >            public void realRun() throws InterruptedException {
229 >                long startTime = System.nanoTime();
230 >                assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
231 >                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
232 >                pleaseInterrupt.countDown();
233 >                try {
234 >                    q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
235 >                    shouldThrow();
236 >                } catch (InterruptedException success) {}
237 >            }});
238  
239 <    /**
240 <     * take blocks interruptibly when empty
241 <     */
242 <    public void testFairTakeFromEmpty() {
356 <        final SynchronousQueue q = new SynchronousQueue(true);
357 <        Thread t = new Thread(new Runnable() {
358 <                public void run() {
359 <                    try {
360 <                        q.take();
361 <                        threadShouldThrow();
362 <                    } catch (InterruptedException success){ }
363 <                }
364 <            });
365 <        try {
366 <            t.start();
367 <            Thread.sleep(SHORT_DELAY_MS);
368 <            t.interrupt();
369 <            t.join();
370 <        } catch (Exception e){
371 <            unexpectedException();
372 <        }
239 >        await(pleaseInterrupt);
240 >        assertThreadStaysAlive(t);
241 >        t.interrupt();
242 >        awaitTermination(t);
243      }
244  
245      /**
246 <     * poll fails unless active taker
246 >     * poll return null if no active putter
247       */
248      public void testPoll() {
249          SynchronousQueue q = new SynchronousQueue();
250 <        assertNull(q.poll());
250 >        assertNull(q.poll());
251      }
252  
253      /**
254 <     * timed pool with zero timeout times out if no active taker
254 >     * timed poll with zero timeout times out if no active putter
255       */
256 <    public void testTimedPoll0() {
257 <        try {
258 <            SynchronousQueue q = new SynchronousQueue();
389 <            assertNull(q.poll(0, TimeUnit.MILLISECONDS));
390 <        } catch (InterruptedException e){
391 <            unexpectedException();
392 <        }
256 >    public void testTimedPoll0() throws InterruptedException {
257 >        SynchronousQueue q = new SynchronousQueue();
258 >        assertNull(q.poll(0, MILLISECONDS));
259      }
260  
261      /**
262 <     * timed pool with nonzero timeout times out if no active taker
262 >     * timed poll with nonzero timeout times out if no active putter
263       */
264 <    public void testTimedPoll() {
265 <        try {
266 <            SynchronousQueue q = new SynchronousQueue();
267 <            assertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
268 <        } catch (InterruptedException e){
403 <            unexpectedException();
404 <        }
405 <    }
406 <
407 <    /**
408 <     * Interrupted timed poll throws InterruptedException instead of
409 <     * returning timeout status
410 <     */
411 <    public void testInterruptedTimedPoll() {
412 <        Thread t = new Thread(new Runnable() {
413 <                public void run() {
414 <                    try {
415 <                        SynchronousQueue q = new SynchronousQueue();
416 <                        assertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
417 <                    } catch (InterruptedException success){
418 <                    }
419 <                }});
420 <        t.start();
421 <        try {
422 <           Thread.sleep(SHORT_DELAY_MS);
423 <           t.interrupt();
424 <           t.join();
425 <        }
426 <        catch (InterruptedException ie) {
427 <            unexpectedException();
428 <        }
264 >    public void testTimedPoll() throws InterruptedException {
265 >        SynchronousQueue q = new SynchronousQueue();
266 >        long startTime = System.nanoTime();
267 >        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
268 >        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
269      }
270  
271      /**
272 <     *  timed poll before a delayed offer fails; after offer succeeds;
273 <     *  on interruption throws
272 >     * timed poll before a delayed offer times out, returning null;
273 >     * after offer succeeds; on interruption throws
274       */
275 <    public void testTimedPollWithOffer() {
276 <        final SynchronousQueue q = new SynchronousQueue();
277 <        Thread t = new Thread(new Runnable() {
278 <                public void run() {
279 <                    try {
280 <                        threadAssertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
281 <                        q.poll(LONG_DELAY_MS, TimeUnit.MILLISECONDS);
282 <                        q.poll(LONG_DELAY_MS, TimeUnit.MILLISECONDS);
283 <                        threadShouldThrow();
284 <                    } catch (InterruptedException success) { }
285 <                }
286 <            });
287 <        try {
448 <            t.start();
449 <            Thread.sleep(SMALL_DELAY_MS);
450 <            assertTrue(q.offer(zero, SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
451 <            t.interrupt();
452 <            t.join();
453 <        } catch (Exception e){
454 <            unexpectedException();
455 <        }
456 <    }
275 >    public void testFairTimedPollWithOffer() throws InterruptedException {
276 >        final SynchronousQueue q = new SynchronousQueue(true);
277 >        final CountDownLatch pleaseOffer = new CountDownLatch(1);
278 >        Thread t = newStartedThread(new CheckedRunnable() {
279 >            public void realRun() throws InterruptedException {
280 >                long t0 = System.nanoTime();
281 >                assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS));
282 >                assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS);
283 >
284 >                pleaseOffer.countDown();
285 >                t0 = System.nanoTime();
286 >                assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
287 >                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
288  
289 <    /**
290 <     * Interrupted timed poll throws InterruptedException instead of
291 <     * returning timeout status
292 <     */
293 <    public void testFairInterruptedTimedPoll() {
294 <        Thread t = new Thread(new Runnable() {
295 <                public void run() {
465 <                    try {
466 <                        SynchronousQueue q = new SynchronousQueue(true);
467 <                        assertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
468 <                    } catch (InterruptedException success){
469 <                    }
470 <                }});
471 <        t.start();
472 <        try {
473 <           Thread.sleep(SHORT_DELAY_MS);
474 <           t.interrupt();
475 <           t.join();
476 <        }
477 <        catch (InterruptedException ie) {
478 <            unexpectedException();
479 <        }
480 <    }
289 >                t0 = System.nanoTime();
290 >                try {
291 >                    q.poll(LONG_DELAY_MS, MILLISECONDS);
292 >                    shouldThrow();
293 >                } catch (InterruptedException success) {}
294 >                assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
295 >            }});
296  
297 <    /**
298 <     *  timed poll before a delayed offer fails; after offer succeeds;
299 <     *  on interruption throws
300 <     */
486 <    public void testFairTimedPollWithOffer() {
487 <        final SynchronousQueue q = new SynchronousQueue(true);
488 <        Thread t = new Thread(new Runnable() {
489 <                public void run() {
490 <                    try {
491 <                        threadAssertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
492 <                        q.poll(LONG_DELAY_MS, TimeUnit.MILLISECONDS);
493 <                        q.poll(LONG_DELAY_MS, TimeUnit.MILLISECONDS);
494 <                        threadShouldThrow();
495 <                    } catch (InterruptedException success) { }
496 <                }
497 <            });
498 <        try {
499 <            t.start();
500 <            Thread.sleep(SMALL_DELAY_MS);
501 <            assertTrue(q.offer(zero, SHORT_DELAY_MS, TimeUnit.MILLISECONDS));
502 <            t.interrupt();
503 <            t.join();
504 <        } catch (Exception e){
505 <            unexpectedException();
506 <        }
507 <    }
297 >        assertTrue(pleaseOffer.await(MEDIUM_DELAY_MS, MILLISECONDS));
298 >        long t0 = System.nanoTime();
299 >        assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS));
300 >        assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
301  
302 +        t.interrupt();
303 +        awaitTermination(t, MEDIUM_DELAY_MS);
304 +    }
305  
306      /**
307 <     * peek returns null
307 >     * peek() returns null if no active putter
308       */
309      public void testPeek() {
310          SynchronousQueue q = new SynchronousQueue();
311 <        assertNull(q.peek());
311 >        assertNull(q.peek());
312      }
313  
314      /**
315 <     * element throws NSEE
315 >     * element() throws NSEE if no active putter
316       */
317      public void testElement() {
318          SynchronousQueue q = new SynchronousQueue();
319          try {
320              q.element();
321              shouldThrow();
322 <        }
527 <        catch (NoSuchElementException success) {}
322 >        } catch (NoSuchElementException success) {}
323      }
324  
325      /**
326 <     * remove throws NSEE if no active taker
326 >     * remove() throws NSEE if no active putter
327       */
328      public void testRemove() {
329          SynchronousQueue q = new SynchronousQueue();
330          try {
331              q.remove();
332              shouldThrow();
333 <        } catch (NoSuchElementException success){
539 <        }
333 >        } catch (NoSuchElementException success) {}
334      }
335  
336      /**
# Line 598 | Line 392 | public class SynchronousQueueTest extend
392          assertFalse(q.containsAll(Arrays.asList(ints)));
393      }
394  
601
395      /**
396       * toArray is empty
397       */
398      public void testToArray() {
399          SynchronousQueue q = new SynchronousQueue();
400 <        Object[] o = q.toArray();
400 >        Object[] o = q.toArray();
401          assertEquals(o.length, 0);
402      }
403  
# Line 613 | Line 406 | public class SynchronousQueueTest extend
406       */
407      public void testToArray2() {
408          SynchronousQueue q = new SynchronousQueue();
409 <        Integer[] ints = new Integer[1];
409 >        Integer[] ints = new Integer[1];
410          assertNull(ints[0]);
411      }
412  
# Line 621 | Line 414 | public class SynchronousQueueTest extend
414       * toArray(null) throws NPE
415       */
416      public void testToArray_BadArg() {
417 <        try {
418 <            SynchronousQueue q = new SynchronousQueue();
419 <            Object o[] = q.toArray(null);
420 <            shouldThrow();
421 <        } catch (NullPointerException success){}
417 >        SynchronousQueue q = new SynchronousQueue();
418 >        try {
419 >            Object o[] = q.toArray(null);
420 >            shouldThrow();
421 >        } catch (NullPointerException success) {}
422      }
423  
631
424      /**
425       * iterator does not traverse any elements
426       */
427      public void testIterator() {
428          SynchronousQueue q = new SynchronousQueue();
429 <        Iterator it = q.iterator();
429 >        Iterator it = q.iterator();
430          assertFalse(it.hasNext());
431          try {
432              Object x = it.next();
433              shouldThrow();
434 <        }
643 <        catch (NoSuchElementException success) {}
434 >        } catch (NoSuchElementException success) {}
435      }
436  
437      /**
# Line 648 | Line 439 | public class SynchronousQueueTest extend
439       */
440      public void testIteratorRemove() {
441          SynchronousQueue q = new SynchronousQueue();
442 <        Iterator it = q.iterator();
442 >        Iterator it = q.iterator();
443          try {
444              it.remove();
445              shouldThrow();
446 <        }
656 <        catch (IllegalStateException success) {}
446 >        } catch (IllegalStateException success) {}
447      }
448  
449      /**
# Line 665 | Line 455 | public class SynchronousQueueTest extend
455          assertNotNull(s);
456      }
457  
668
458      /**
459       * offer transfers elements across Executor tasks
460       */
461      public void testOfferInExecutor() {
462          final SynchronousQueue q = new SynchronousQueue();
463          ExecutorService executor = Executors.newFixedThreadPool(2);
464 <        final Integer one = new Integer(1);
676 <
677 <        executor.execute(new Runnable() {
678 <            public void run() {
679 <                threadAssertFalse(q.offer(one));
680 <                try {
681 <                    threadAssertTrue(q.offer(one, MEDIUM_DELAY_MS, TimeUnit.MILLISECONDS));
682 <                    threadAssertEquals(0, q.remainingCapacity());
683 <                }
684 <                catch (InterruptedException e) {
685 <                    threadUnexpectedException();
686 <                }
687 <            }
688 <        });
464 >        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
465  
466 <        executor.execute(new Runnable() {
467 <            public void run() {
468 <                try {
469 <                    Thread.sleep(SMALL_DELAY_MS);
470 <                    threadAssertEquals(one, q.take());
471 <                }
472 <                catch (InterruptedException e) {
473 <                    threadUnexpectedException();
474 <                }
475 <            }
476 <        });
466 >        executor.execute(new CheckedRunnable() {
467 >            public void realRun() throws InterruptedException {
468 >                assertFalse(q.offer(one));
469 >                threadsStarted.await();
470 >                assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
471 >                assertEquals(0, q.remainingCapacity());
472 >            }});
473 >
474 >        executor.execute(new CheckedRunnable() {
475 >            public void realRun() throws InterruptedException {
476 >                threadsStarted.await();
477 >                assertSame(one, q.take());
478 >            }});
479  
480          joinPool(executor);
703
481      }
482  
483      /**
484 <     * poll retrieves elements across Executor threads
484 >     * timed poll retrieves elements across Executor threads
485       */
486      public void testPollInExecutor() {
487          final SynchronousQueue q = new SynchronousQueue();
488 +        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
489          ExecutorService executor = Executors.newFixedThreadPool(2);
490 <        executor.execute(new Runnable() {
491 <            public void run() {
492 <                threadAssertNull(q.poll());
493 <                try {
494 <                    threadAssertTrue(null != q.poll(MEDIUM_DELAY_MS, TimeUnit.MILLISECONDS));
495 <                    threadAssertTrue(q.isEmpty());
496 <                }
497 <                catch (InterruptedException e) {
498 <                    threadUnexpectedException();
499 <                }
500 <            }
501 <        });
502 <
725 <        executor.execute(new Runnable() {
726 <            public void run() {
727 <                try {
728 <                    Thread.sleep(SMALL_DELAY_MS);
729 <                    q.put(new Integer(1));
730 <                }
731 <                catch (InterruptedException e) {
732 <                    threadUnexpectedException();
733 <                }
734 <            }
735 <        });
490 >        executor.execute(new CheckedRunnable() {
491 >            public void realRun() throws InterruptedException {
492 >                assertNull(q.poll());
493 >                threadsStarted.await();
494 >                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
495 >                assertTrue(q.isEmpty());
496 >            }});
497 >
498 >        executor.execute(new CheckedRunnable() {
499 >            public void realRun() throws InterruptedException {
500 >                threadsStarted.await();
501 >                q.put(one);
502 >            }});
503  
504          joinPool(executor);
505      }
# Line 740 | Line 507 | public class SynchronousQueueTest extend
507      /**
508       * a deserialized serialized queue is usable
509       */
510 <    public void testSerialization() {
510 >    public void testSerialization() throws Exception {
511          SynchronousQueue q = new SynchronousQueue();
512 <        try {
513 <            ByteArrayOutputStream bout = new ByteArrayOutputStream(10000);
514 <            ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(bout));
515 <            out.writeObject(q);
516 <            out.close();
517 <
518 <            ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
519 <            ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(bin));
520 <            SynchronousQueue r = (SynchronousQueue)in.readObject();
521 <            assertEquals(q.size(), r.size());
522 <            while (!q.isEmpty())
756 <                assertEquals(q.remove(), r.remove());
757 <        } catch (Exception e){
758 <            e.printStackTrace();
759 <            unexpectedException();
760 <        }
512 >        ByteArrayOutputStream bout = new ByteArrayOutputStream(10000);
513 >        ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(bout));
514 >        out.writeObject(q);
515 >        out.close();
516 >
517 >        ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
518 >        ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(bin));
519 >        SynchronousQueue r = (SynchronousQueue)in.readObject();
520 >        assertEquals(q.size(), r.size());
521 >        while (!q.isEmpty())
522 >            assertEquals(q.remove(), r.remove());
523      }
524  
525      /**
# Line 768 | Line 530 | public class SynchronousQueueTest extend
530          try {
531              q.drainTo(null);
532              shouldThrow();
533 <        } catch (NullPointerException success) {
772 <        }
533 >        } catch (NullPointerException success) {}
534      }
535  
536      /**
# Line 780 | Line 541 | public class SynchronousQueueTest extend
541          try {
542              q.drainTo(q);
543              shouldThrow();
544 <        } catch (IllegalArgumentException success) {
784 <        }
544 >        } catch (IllegalArgumentException success) {}
545      }
546  
547      /**
# Line 798 | Line 558 | public class SynchronousQueueTest extend
558      /**
559       * drainTo empties queue, unblocking a waiting put.
560       */
561 <    public void testDrainToWithActivePut() {
561 >    public void testDrainToWithActivePut() throws InterruptedException {
562          final SynchronousQueue q = new SynchronousQueue();
563 <        Thread t = new Thread(new Runnable() {
564 <                public void run() {
565 <                    try {
566 <                        q.put(new Integer(1));
567 <                    } catch (InterruptedException ie){
568 <                        threadUnexpectedException();
569 <                    }
570 <                }
811 <            });
812 <        try {
813 <            t.start();
814 <            ArrayList l = new ArrayList();
815 <            Thread.sleep(SHORT_DELAY_MS);
563 >        Thread t = newStartedThread(new CheckedRunnable() {
564 >            public void realRun() throws InterruptedException {
565 >                q.put(one);
566 >            }});
567 >
568 >        ArrayList l = new ArrayList();
569 >        long startTime = System.nanoTime();
570 >        while (l.isEmpty()) {
571              q.drainTo(l);
572 <            assertTrue(l.size() <= 1);
573 <            if (l.size() > 0)
574 <                assertEquals(l.get(0), new Integer(1));
575 <            t.join();
576 <            assertTrue(l.size() <= 1);
577 <        } catch (Exception e){
578 <            unexpectedException();
824 <        }
572 >            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
573 >                fail("timed out");
574 >            Thread.yield();
575 >        }
576 >        assertTrue(l.size() == 1);
577 >        assertSame(one, l.get(0));
578 >        awaitTermination(t);
579      }
580  
581      /**
# Line 832 | Line 586 | public class SynchronousQueueTest extend
586          try {
587              q.drainTo(null, 0);
588              shouldThrow();
589 <        } catch (NullPointerException success) {
836 <        }
589 >        } catch (NullPointerException success) {}
590      }
591  
592      /**
# Line 844 | Line 597 | public class SynchronousQueueTest extend
597          try {
598              q.drainTo(q, 0);
599              shouldThrow();
600 <        } catch (IllegalArgumentException success) {
848 <        }
600 >        } catch (IllegalArgumentException success) {}
601      }
602  
603      /**
604       * drainTo(c, n) empties up to n elements of queue into c
605       */
606 <    public void testDrainToN() {
606 >    public void testDrainToN() throws InterruptedException {
607          final SynchronousQueue q = new SynchronousQueue();
608 <        Thread t1 = new Thread(new Runnable() {
609 <                public void run() {
610 <                    try {
611 <                        q.put(one);
612 <                    } catch (InterruptedException ie){
613 <                        threadUnexpectedException();
614 <                    }
615 <                }
616 <            });
865 <        Thread t2 = new Thread(new Runnable() {
866 <                public void run() {
867 <                    try {
868 <                        q.put(two);
869 <                    } catch (InterruptedException ie){
870 <                        threadUnexpectedException();
871 <                    }
872 <                }
873 <            });
874 <
875 <        try {
876 <            t1.start();
877 <            t2.start();
878 <            ArrayList l = new ArrayList();
879 <            Thread.sleep(SHORT_DELAY_MS);
880 <            q.drainTo(l, 1);
881 <            assertTrue(l.size() == 1);
882 <            q.drainTo(l, 1);
883 <            assertTrue(l.size() == 2);
884 <            assertTrue(l.contains(one));
885 <            assertTrue(l.contains(two));
886 <            t1.join();
887 <            t2.join();
888 <        } catch (Exception e){
889 <            unexpectedException();
890 <        }
891 <    }
608 >        Thread t1 = newStartedThread(new CheckedRunnable() {
609 >            public void realRun() throws InterruptedException {
610 >                q.put(one);
611 >            }});
612 >
613 >        Thread t2 = newStartedThread(new CheckedRunnable() {
614 >            public void realRun() throws InterruptedException {
615 >                q.put(two);
616 >            }});
617  
618 +        ArrayList l = new ArrayList();
619 +        delay(SHORT_DELAY_MS);
620 +        q.drainTo(l, 1);
621 +        assertEquals(1, l.size());
622 +        q.drainTo(l, 1);
623 +        assertEquals(2, l.size());
624 +        assertTrue(l.contains(one));
625 +        assertTrue(l.contains(two));
626 +        awaitTermination(t1);
627 +        awaitTermination(t2);
628 +    }
629  
630   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines