--- jsr166/src/test/tck/SynchronousQueueTest.java 2011/05/21 06:24:33 1.34 +++ jsr166/src/test/tck/SynchronousQueueTest.java 2011/05/27 20:07:24 1.35 @@ -39,7 +39,10 @@ public class SynchronousQueueTest extend /** * Any SynchronousQueue is both empty and full */ - public void testEmptyFull(SynchronousQueue q) { + public void testEmptyFull() { testEmptyFull(false); } + public void testEmptyFull_fair() { testEmptyFull(true); } + public void testEmptyFull(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); assertTrue(q.isEmpty()); assertEquals(0, q.size()); assertEquals(0, q.remainingCapacity()); @@ -47,20 +50,6 @@ public class SynchronousQueueTest extend } /** - * A non-fair SynchronousQueue is both empty and full - */ - public void testEmptyFull() { - testEmptyFull(new SynchronousQueue()); - } - - /** - * A fair SynchronousQueue is both empty and full - */ - public void testFairEmptyFull() { - testEmptyFull(new SynchronousQueue(true)); - } - - /** * offer(null) throws NPE */ public void testOfferNull() { @@ -164,50 +153,76 @@ public class SynchronousQueueTest extend /** * put blocks interruptibly if no active taker */ - public void testBlockingPut() throws InterruptedException { - Thread t = new Thread(new CheckedInterruptedRunnable() { + public void testBlockingPut() { testBlockingPut(false); } + public void testBlockingPut_fair() { testBlockingPut(true); } + public void testBlockingPut(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + final CountDownLatch pleaseInterrupt = new CountDownLatch(1); + Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - SynchronousQueue q = new SynchronousQueue(); - q.put(zero); + Thread.currentThread().interrupt(); + try { + q.put(99); + shouldThrow(); + } catch (InterruptedException success) {} + assertFalse(Thread.interrupted()); + + pleaseInterrupt.countDown(); + try { + q.put(99); + shouldThrow(); + } catch (InterruptedException success) {} + assertFalse(Thread.interrupted()); }}); - t.start(); - delay(SHORT_DELAY_MS); + await(pleaseInterrupt); + assertThreadStaysAlive(t); t.interrupt(); - t.join(); + awaitTermination(t); + assertEquals(0, q.remainingCapacity()); } /** - * put blocks waiting for take + * put blocks interruptibly waiting for take */ - public void testPutWithTake() throws InterruptedException { - final SynchronousQueue q = new SynchronousQueue(); - Thread t = new Thread(new CheckedRunnable() { + public void testPutWithTake() { testPutWithTake(false); } + public void testPutWithTake_fair() { testPutWithTake(true); } + public void testPutWithTake(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + final CountDownLatch pleaseTake = new CountDownLatch(1); + final CountDownLatch pleaseInterrupt = new CountDownLatch(1); + Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - int added = 0; + pleaseTake.countDown(); + q.put(one); + + pleaseInterrupt.countDown(); try { - while (true) { - q.put(added); - ++added; - } - } catch (InterruptedException success) { - assertEquals(1, added); - } + q.put(99); + shouldThrow(); + } catch (InterruptedException success) {} + assertFalse(Thread.interrupted()); }}); - t.start(); - delay(SHORT_DELAY_MS); - assertEquals(0, q.take()); - delay(SHORT_DELAY_MS); + await(pleaseTake); + assertEquals(q.remainingCapacity(), 0); + try { assertSame(one, q.take()); } + catch (InterruptedException e) { threadUnexpectedException(e); } + + await(pleaseInterrupt); + assertThreadStaysAlive(t); t.interrupt(); - t.join(); + awaitTermination(t); + assertEquals(q.remainingCapacity(), 0); } /** * timed offer times out if elements not taken */ - public void testTimedOffer(final SynchronousQueue q) - throws InterruptedException { + public void testTimedOffer() { testTimedOffer(false); } + public void testTimedOffer_fair() { testTimedOffer(true); } + public void testTimedOffer(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { @@ -222,83 +237,12 @@ public class SynchronousQueueTest extend }}); await(pleaseInterrupt); + assertThreadStaysAlive(t); t.interrupt(); awaitTermination(t); } /** - * timed offer times out if elements not taken - */ - public void testTimedOffer() throws InterruptedException { - testTimedOffer(new SynchronousQueue()); - } - - /** - * timed offer times out if elements not taken - */ - public void testFairTimedOffer() throws InterruptedException { - testTimedOffer(new SynchronousQueue(true)); - } - - /** - * put blocks interruptibly if no active taker - */ - public void testFairBlockingPut() throws InterruptedException { - Thread t = new Thread(new CheckedInterruptedRunnable() { - public void realRun() throws InterruptedException { - SynchronousQueue q = new SynchronousQueue(true); - q.put(zero); - }}); - - t.start(); - delay(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } - - /** - * put blocks waiting for take - */ - public void testFairPutWithTake() throws InterruptedException { - final SynchronousQueue q = new SynchronousQueue(true); - Thread t = new Thread(new CheckedRunnable() { - public void realRun() throws InterruptedException { - int added = 0; - try { - while (true) { - q.put(added); - ++added; - } - } catch (InterruptedException success) { - assertEquals(1, added); - } - }}); - - t.start(); - delay(SHORT_DELAY_MS); - assertEquals(0, q.take()); - delay(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } - - /** - * take blocks interruptibly when empty - */ - public void testFairTakeFromEmpty() throws InterruptedException { - final SynchronousQueue q = new SynchronousQueue(true); - Thread t = new Thread(new CheckedInterruptedRunnable() { - public void realRun() throws InterruptedException { - q.take(); - }}); - - t.start(); - delay(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } - - /** * poll return null if no active putter */ public void testPoll() { @@ -319,50 +263,9 @@ public class SynchronousQueueTest extend */ public void testTimedPoll() throws InterruptedException { SynchronousQueue q = new SynchronousQueue(); - long t0 = System.nanoTime(); - assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS)); - assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS); - } - - /** - * Interrupted timed poll throws InterruptedException instead of - * returning timeout status - */ - public void testInterruptedTimedPoll(final SynchronousQueue q) - throws InterruptedException { - final CountDownLatch threadStarted = new CountDownLatch(1); - Thread t = newStartedThread(new CheckedRunnable() { - public void realRun() throws InterruptedException { - long t0 = System.nanoTime(); - threadStarted.countDown(); - try { - q.poll(LONG_DELAY_MS, MILLISECONDS); - shouldThrow(); - } catch (InterruptedException success) {} - assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS); - assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS); - }}); - - threadStarted.await(); - delay(SHORT_DELAY_MS); - t.interrupt(); - awaitTermination(t, MEDIUM_DELAY_MS); - } - - /** - * Interrupted timed poll throws InterruptedException instead of - * returning timeout status - */ - public void testInterruptedTimedPoll() throws InterruptedException { - testInterruptedTimedPoll(new SynchronousQueue()); - } - - /** - * Interrupted timed poll throws InterruptedException instead of - * returning timeout status - */ - public void testFairInterruptedTimedPoll() throws InterruptedException { - testInterruptedTimedPoll(new SynchronousQueue(true)); + long startTime = System.nanoTime(); + assertNull(q.poll(timeoutMillis(), MILLISECONDS)); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); } /** @@ -489,7 +392,6 @@ public class SynchronousQueueTest extend assertFalse(q.containsAll(Arrays.asList(ints))); } - /** * toArray is empty */ @@ -519,7 +421,6 @@ public class SynchronousQueueTest extend } catch (NullPointerException success) {} } - /** * iterator does not traverse any elements */ @@ -554,24 +455,25 @@ public class SynchronousQueueTest extend assertNotNull(s); } - /** * offer transfers elements across Executor tasks */ public void testOfferInExecutor() { final SynchronousQueue q = new SynchronousQueue(); ExecutorService executor = Executors.newFixedThreadPool(2); + final CheckedBarrier threadsStarted = new CheckedBarrier(2); executor.execute(new CheckedRunnable() { public void realRun() throws InterruptedException { assertFalse(q.offer(one)); - assertTrue(q.offer(one, MEDIUM_DELAY_MS, MILLISECONDS)); + threadsStarted.await(); + assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); assertEquals(0, q.remainingCapacity()); }}); executor.execute(new CheckedRunnable() { public void realRun() throws InterruptedException { - delay(SMALL_DELAY_MS); + threadsStarted.await(); assertSame(one, q.take()); }}); @@ -579,21 +481,23 @@ public class SynchronousQueueTest extend } /** - * poll retrieves elements across Executor threads + * timed poll retrieves elements across Executor threads */ public void testPollInExecutor() { final SynchronousQueue q = new SynchronousQueue(); + final CheckedBarrier threadsStarted = new CheckedBarrier(2); ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new CheckedRunnable() { public void realRun() throws InterruptedException { assertNull(q.poll()); - assertSame(one, q.poll(MEDIUM_DELAY_MS, MILLISECONDS)); + threadsStarted.await(); + assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); assertTrue(q.isEmpty()); }}); executor.execute(new CheckedRunnable() { public void realRun() throws InterruptedException { - delay(SHORT_DELAY_MS); + threadsStarted.await(); q.put(one); }}); @@ -656,20 +560,22 @@ public class SynchronousQueueTest extend */ public void testDrainToWithActivePut() throws InterruptedException { final SynchronousQueue q = new SynchronousQueue(); - Thread t = new Thread(new CheckedRunnable() { + Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - q.put(new Integer(1)); + q.put(one); }}); - t.start(); ArrayList l = new ArrayList(); - delay(SHORT_DELAY_MS); - q.drainTo(l); - assertTrue(l.size() <= 1); - if (l.size() > 0) - assertEquals(l.get(0), new Integer(1)); - t.join(); - assertTrue(l.size() <= 1); + long startTime = System.nanoTime(); + while (l.isEmpty()) { + q.drainTo(l); + if (millisElapsedSince(startTime) > LONG_DELAY_MS) + fail("timed out"); + Thread.yield(); + } + assertTrue(l.size() == 1); + assertSame(one, l.get(0)); + awaitTermination(t); } /** @@ -699,18 +605,16 @@ public class SynchronousQueueTest extend */ public void testDrainToN() throws InterruptedException { final SynchronousQueue q = new SynchronousQueue(); - Thread t1 = new Thread(new CheckedRunnable() { + Thread t1 = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { q.put(one); }}); - Thread t2 = new Thread(new CheckedRunnable() { + Thread t2 = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { q.put(two); }}); - t1.start(); - t2.start(); ArrayList l = new ArrayList(); delay(SHORT_DELAY_MS); q.drainTo(l, 1); @@ -719,8 +623,8 @@ public class SynchronousQueueTest extend assertEquals(2, l.size()); assertTrue(l.contains(one)); assertTrue(l.contains(two)); - t1.join(); - t2.join(); + awaitTermination(t1); + awaitTermination(t2); } }