--- jsr166/src/test/tck/SynchronousQueueTest.java 2009/11/16 04:57:10 1.11 +++ jsr166/src/test/tck/SynchronousQueueTest.java 2011/05/30 22:43:20 1.37 @@ -1,549 +1,333 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ * Other contributors include Andrew Wright, Jeffrey Hayes, * Pat Fisher, Mike Judd. */ import junit.framework.*; -import java.util.*; -import java.util.concurrent.*; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.io.*; public class SynchronousQueueTest extends JSR166TestCase { - public static void main(String[] args) { - junit.textui.TestRunner.run (suite()); + public static class Fair extends BlockingQueueTest { + protected BlockingQueue emptyCollection() { + return new SynchronousQueue(true); + } } - public static Test suite() { - return new TestSuite(SynchronousQueueTest.class); + public static class NonFair extends BlockingQueueTest { + protected BlockingQueue emptyCollection() { + return new SynchronousQueue(false); + } } - /** - * A SynchronousQueue is both empty and full - */ - public void testEmptyFull() { - SynchronousQueue q = new SynchronousQueue(); - assertTrue(q.isEmpty()); - assertEquals(0, q.size()); - assertEquals(0, q.remainingCapacity()); - assertFalse(q.offer(zero)); + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public static Test suite() { + return newTestSuite(SynchronousQueueTest.class, + new Fair().testSuite(), + new NonFair().testSuite()); } /** - * A fair SynchronousQueue is both empty and full + * Any SynchronousQueue is both empty and full */ - public void testFairEmptyFull() { - SynchronousQueue q = new SynchronousQueue(true); + public void testEmptyFull() { testEmptyFull(false); } + public void testEmptyFull_fair() { testEmptyFull(true); } + public void testEmptyFull(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); assertTrue(q.isEmpty()); - assertEquals(0, q.size()); + assertEquals(0, q.size()); assertEquals(0, q.remainingCapacity()); assertFalse(q.offer(zero)); } /** - * offer(null) throws NPE - */ - public void testOfferNull() { - try { - SynchronousQueue q = new SynchronousQueue(); - q.offer(null); - shouldThrow(); - } catch (NullPointerException success) { } - } - - /** - * add(null) throws NPE - */ - public void testAddNull() { - try { - SynchronousQueue q = new SynchronousQueue(); - q.add(null); - shouldThrow(); - } catch (NullPointerException success) { } - } - - /** * offer fails if no active taker */ - public void testOffer() { - SynchronousQueue q = new SynchronousQueue(); + public void testOffer() { testOffer(false); } + public void testOffer_fair() { testOffer(true); } + public void testOffer(boolean fair) { + SynchronousQueue q = new SynchronousQueue(fair); assertFalse(q.offer(one)); } /** - * add throws ISE if no active taker + * add throws IllegalStateException if no active taker */ - public void testAdd() { - try { - SynchronousQueue q = new SynchronousQueue(); - assertEquals(0, q.remainingCapacity()); - q.add(one); - shouldThrow(); - } catch (IllegalStateException success){ - } - } - - /** - * addAll(null) throws NPE - */ - public void testAddAll1() { + public void testAdd() { testAdd(false); } + public void testAdd_fair() { testAdd(true); } + public void testAdd(boolean fair) { + SynchronousQueue q = new SynchronousQueue(fair); + assertEquals(0, q.remainingCapacity()); try { - SynchronousQueue q = new SynchronousQueue(); - q.addAll(null); + q.add(one); shouldThrow(); - } - catch (NullPointerException success) {} + } catch (IllegalStateException success) {} } /** - * addAll(this) throws IAE + * addAll(this) throws IllegalArgumentException */ - public void testAddAllSelf() { + public void testAddAll_self() { testAddAll_self(false); } + public void testAddAll_self_fair() { testAddAll_self(true); } + public void testAddAll_self(boolean fair) { + SynchronousQueue q = new SynchronousQueue(fair); try { - SynchronousQueue q = new SynchronousQueue(); q.addAll(q); shouldThrow(); - } - catch (IllegalArgumentException success) {} + } catch (IllegalArgumentException success) {} } /** - * addAll of a collection with null elements throws NPE - */ - public void testAddAll2() { - try { - SynchronousQueue q = new SynchronousQueue(); - Integer[] ints = new Integer[1]; - q.addAll(Arrays.asList(ints)); - shouldThrow(); - } - catch (NullPointerException success) {} - } - /** * addAll throws ISE if no active taker */ - public void testAddAll4() { + public void testAddAll_ISE() { testAddAll_ISE(false); } + public void testAddAll_ISE_fair() { testAddAll_ISE(true); } + public void testAddAll_ISE(boolean fair) { + SynchronousQueue q = new SynchronousQueue(fair); + Integer[] ints = new Integer[1]; + for (int i = 0; i < ints.length; i++) + ints[i] = i; + Collection coll = Arrays.asList(ints); try { - SynchronousQueue q = new SynchronousQueue(); - Integer[] ints = new Integer[1]; - for (int i = 0; i < 1; ++i) - ints[i] = new Integer(i); - q.addAll(Arrays.asList(ints)); + q.addAll(coll); shouldThrow(); - } - catch (IllegalStateException success) {} + } catch (IllegalStateException success) {} } /** - * put(null) throws NPE - */ - public void testPutNull() { - try { - SynchronousQueue q = new SynchronousQueue(); - q.put(null); - shouldThrow(); - } - catch (NullPointerException success){ - } - catch (InterruptedException ie) { - unexpectedException(); - } - } - - /** * put blocks interruptibly if no active taker */ - public void testBlockingPut() { - Thread t = new Thread(new Runnable() { - public void run() { - try { - SynchronousQueue q = new SynchronousQueue(); - q.put(zero); - threadShouldThrow(); - } catch (InterruptedException ie){ - } - }}); - t.start(); - try { - Thread.sleep(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } - catch (InterruptedException ie) { - unexpectedException(); - } - } - - /** - * put blocks waiting for take - */ - public void testPutWithTake() { - final SynchronousQueue q = new SynchronousQueue(); - Thread t = new Thread(new Runnable() { - public void run() { - int added = 0; - try { - q.put(new Object()); - ++added; - q.put(new Object()); - ++added; - q.put(new Object()); - ++added; - q.put(new Object()); - ++added; - threadShouldThrow(); - } catch (InterruptedException e){ - assertTrue(added >= 1); - } - } - }); - try { - t.start(); - Thread.sleep(SHORT_DELAY_MS); - q.take(); - Thread.sleep(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } catch (Exception e){ - unexpectedException(); - } - } - - /** - * timed offer times out if elements not taken - */ - public void testTimedOffer() { - final SynchronousQueue q = new SynchronousQueue(); - Thread t = new Thread(new Runnable() { - public void run() { - try { - - threadAssertFalse(q.offer(new Object(), SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - q.offer(new Object(), LONG_DELAY_MS, TimeUnit.MILLISECONDS); - threadShouldThrow(); - } catch (InterruptedException success){} - } - }); - - try { - t.start(); - Thread.sleep(SMALL_DELAY_MS); - t.interrupt(); - t.join(); - } catch (Exception e){ - unexpectedException(); - } - } - + public void testBlockingPut() { testBlockingPut(false); } + public void testBlockingPut_fair() { testBlockingPut(true); } + public void testBlockingPut(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + final CountDownLatch pleaseInterrupt = new CountDownLatch(1); + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + Thread.currentThread().interrupt(); + try { + q.put(99); + shouldThrow(); + } catch (InterruptedException success) {} + assertFalse(Thread.interrupted()); - /** - * take blocks interruptibly when empty - */ - public void testTakeFromEmpty() { - final SynchronousQueue q = new SynchronousQueue(); - Thread t = new Thread(new Runnable() { - public void run() { - try { - q.take(); - threadShouldThrow(); - } catch (InterruptedException success){ } - } - }); - try { - t.start(); - Thread.sleep(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } catch (Exception e){ - unexpectedException(); - } + pleaseInterrupt.countDown(); + try { + q.put(99); + shouldThrow(); + } catch (InterruptedException success) {} + assertFalse(Thread.interrupted()); + }}); + + await(pleaseInterrupt); + assertThreadStaysAlive(t); + t.interrupt(); + awaitTermination(t); + assertEquals(0, q.remainingCapacity()); } - /** - * put blocks interruptibly if no active taker + * put blocks interruptibly waiting for take */ - public void testFairBlockingPut() { - Thread t = new Thread(new Runnable() { - public void run() { - try { - SynchronousQueue q = new SynchronousQueue(true); - q.put(zero); - threadShouldThrow(); - } catch (InterruptedException ie){ - } - }}); - t.start(); - try { - Thread.sleep(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } - catch (InterruptedException ie) { - unexpectedException(); - } - } + public void testPutWithTake() { testPutWithTake(false); } + public void testPutWithTake_fair() { testPutWithTake(true); } + public void testPutWithTake(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + final CountDownLatch pleaseTake = new CountDownLatch(1); + final CountDownLatch pleaseInterrupt = new CountDownLatch(1); + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + pleaseTake.countDown(); + q.put(one); - /** - * put blocks waiting for take - */ - public void testFairPutWithTake() { - final SynchronousQueue q = new SynchronousQueue(true); - Thread t = new Thread(new Runnable() { - public void run() { - int added = 0; - try { - q.put(new Object()); - ++added; - q.put(new Object()); - ++added; - q.put(new Object()); - ++added; - q.put(new Object()); - ++added; - threadShouldThrow(); - } catch (InterruptedException e){ - assertTrue(added >= 1); - } - } - }); - try { - t.start(); - Thread.sleep(SHORT_DELAY_MS); - q.take(); - Thread.sleep(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } catch (Exception e){ - unexpectedException(); - } + pleaseInterrupt.countDown(); + try { + q.put(99); + shouldThrow(); + } catch (InterruptedException success) {} + assertFalse(Thread.interrupted()); + }}); + + await(pleaseTake); + assertEquals(q.remainingCapacity(), 0); + try { assertSame(one, q.take()); } + catch (InterruptedException e) { threadUnexpectedException(e); } + + await(pleaseInterrupt); + assertThreadStaysAlive(t); + t.interrupt(); + awaitTermination(t); + assertEquals(q.remainingCapacity(), 0); } /** * timed offer times out if elements not taken */ - public void testFairTimedOffer() { - final SynchronousQueue q = new SynchronousQueue(true); - Thread t = new Thread(new Runnable() { - public void run() { - try { - - threadAssertFalse(q.offer(new Object(), SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - q.offer(new Object(), LONG_DELAY_MS, TimeUnit.MILLISECONDS); - threadShouldThrow(); - } catch (InterruptedException success){} - } - }); + public void testTimedOffer() { testTimedOffer(false); } + public void testTimedOffer_fair() { testTimedOffer(true); } + public void testTimedOffer(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + final CountDownLatch pleaseInterrupt = new CountDownLatch(1); + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + long startTime = System.nanoTime(); + assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + pleaseInterrupt.countDown(); + try { + q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + shouldThrow(); + } catch (InterruptedException success) {} + }}); - try { - t.start(); - Thread.sleep(SMALL_DELAY_MS); - t.interrupt(); - t.join(); - } catch (Exception e){ - unexpectedException(); - } + await(pleaseInterrupt); + assertThreadStaysAlive(t); + t.interrupt(); + awaitTermination(t); } - /** - * take blocks interruptibly when empty + * poll return null if no active putter */ - public void testFairTakeFromEmpty() { - final SynchronousQueue q = new SynchronousQueue(true); - Thread t = new Thread(new Runnable() { - public void run() { - try { - q.take(); - threadShouldThrow(); - } catch (InterruptedException success){ } - } - }); - try { - t.start(); - Thread.sleep(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } catch (Exception e){ - unexpectedException(); - } + public void testPoll() { testPoll(false); } + public void testPoll_fair() { testPoll(true); } + public void testPoll(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + assertNull(q.poll()); } /** - * poll fails unless active taker + * timed poll with zero timeout times out if no active putter */ - public void testPoll() { - SynchronousQueue q = new SynchronousQueue(); - assertNull(q.poll()); + public void testTimedPoll0() { testTimedPoll0(false); } + public void testTimedPoll0_fair() { testTimedPoll0(true); } + public void testTimedPoll0(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + try { assertNull(q.poll(0, MILLISECONDS)); } + catch (InterruptedException e) { threadUnexpectedException(e); } } /** - * timed pool with zero timeout times out if no active taker + * timed poll with nonzero timeout times out if no active putter */ - public void testTimedPoll0() { - try { - SynchronousQueue q = new SynchronousQueue(); - assertNull(q.poll(0, TimeUnit.MILLISECONDS)); - } catch (InterruptedException e){ - unexpectedException(); - } + public void testTimedPoll() { testTimedPoll(false); } + public void testTimedPoll_fair() { testTimedPoll(true); } + public void testTimedPoll(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + long startTime = System.nanoTime(); + try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } + catch (InterruptedException e) { threadUnexpectedException(e); } + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); } /** - * timed pool with nonzero timeout times out if no active taker + * timed poll before a delayed offer times out, returning null; + * after offer succeeds; on interruption throws */ - public void testTimedPoll() { - try { - SynchronousQueue q = new SynchronousQueue(); - assertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - } catch (InterruptedException e){ - unexpectedException(); - } - } + public void testTimedPollWithOffer() { testTimedPollWithOffer(false); } + public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); } + public void testTimedPollWithOffer(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + final CountDownLatch pleaseOffer = new CountDownLatch(1); + final CountDownLatch pleaseInterrupt = new CountDownLatch(1); + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + long startTime = System.nanoTime(); + assertNull(q.poll(timeoutMillis(), MILLISECONDS)); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); - /** - * Interrupted timed poll throws InterruptedException instead of - * returning timeout status - */ - public void testInterruptedTimedPoll() { - Thread t = new Thread(new Runnable() { - public void run() { - try { - SynchronousQueue q = new SynchronousQueue(); - assertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - } catch (InterruptedException success){ - } - }}); - t.start(); - try { - Thread.sleep(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } - catch (InterruptedException ie) { - unexpectedException(); - } - } + pleaseOffer.countDown(); + startTime = System.nanoTime(); + assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS)); + assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS); - /** - * timed poll before a delayed offer fails; after offer succeeds; - * on interruption throws - */ - public void testTimedPollWithOffer() { - final SynchronousQueue q = new SynchronousQueue(); - Thread t = new Thread(new Runnable() { - public void run() { - try { - threadAssertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - q.poll(LONG_DELAY_MS, TimeUnit.MILLISECONDS); - q.poll(LONG_DELAY_MS, TimeUnit.MILLISECONDS); - threadShouldThrow(); - } catch (InterruptedException success) { } - } - }); - try { - t.start(); - Thread.sleep(SMALL_DELAY_MS); - assertTrue(q.offer(zero, SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - t.interrupt(); - t.join(); - } catch (Exception e){ - unexpectedException(); - } - } + Thread.currentThread().interrupt(); + try { + q.poll(LONG_DELAY_MS, MILLISECONDS); + shouldThrow(); + } catch (InterruptedException success) {} + assertFalse(Thread.interrupted()); - /** - * Interrupted timed poll throws InterruptedException instead of - * returning timeout status - */ - public void testFairInterruptedTimedPoll() { - Thread t = new Thread(new Runnable() { - public void run() { - try { - SynchronousQueue q = new SynchronousQueue(true); - assertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - } catch (InterruptedException success){ - } - }}); - t.start(); - try { - Thread.sleep(SHORT_DELAY_MS); - t.interrupt(); - t.join(); - } - catch (InterruptedException ie) { - unexpectedException(); - } - } + pleaseInterrupt.countDown(); + try { + q.poll(LONG_DELAY_MS, MILLISECONDS); + shouldThrow(); + } catch (InterruptedException success) {} + assertFalse(Thread.interrupted()); + }}); - /** - * timed poll before a delayed offer fails; after offer succeeds; - * on interruption throws - */ - public void testFairTimedPollWithOffer() { - final SynchronousQueue q = new SynchronousQueue(true); - Thread t = new Thread(new Runnable() { - public void run() { - try { - threadAssertNull(q.poll(SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - q.poll(LONG_DELAY_MS, TimeUnit.MILLISECONDS); - q.poll(LONG_DELAY_MS, TimeUnit.MILLISECONDS); - threadShouldThrow(); - } catch (InterruptedException success) { } - } - }); - try { - t.start(); - Thread.sleep(SMALL_DELAY_MS); - assertTrue(q.offer(zero, SHORT_DELAY_MS, TimeUnit.MILLISECONDS)); - t.interrupt(); - t.join(); - } catch (Exception e){ - unexpectedException(); - } - } + await(pleaseOffer); + long startTime = System.nanoTime(); + try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); } + catch (InterruptedException e) { threadUnexpectedException(e); } + assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS); + await(pleaseInterrupt); + assertThreadStaysAlive(t); + t.interrupt(); + awaitTermination(t); + } /** - * peek returns null + * peek() returns null if no active putter */ - public void testPeek() { - SynchronousQueue q = new SynchronousQueue(); - assertNull(q.peek()); + public void testPeek() { testPeek(false); } + public void testPeek_fair() { testPeek(true); } + public void testPeek(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + assertNull(q.peek()); } /** - * element throws NSEE + * element() throws NoSuchElementException if no active putter */ - public void testElement() { - SynchronousQueue q = new SynchronousQueue(); + public void testElement() { testElement(false); } + public void testElement_fair() { testElement(true); } + public void testElement(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); try { q.element(); shouldThrow(); - } - catch (NoSuchElementException success) {} + } catch (NoSuchElementException success) {} } /** - * remove throws NSEE if no active taker + * remove() throws NoSuchElementException if no active putter */ - public void testRemove() { - SynchronousQueue q = new SynchronousQueue(); + public void testRemove() { testRemove(false); } + public void testRemove_fair() { testRemove(true); } + public void testRemove(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); try { q.remove(); shouldThrow(); - } catch (NoSuchElementException success){ - } + } catch (NoSuchElementException success) {} } /** * remove(x) returns false */ - public void testRemoveElement() { - SynchronousQueue q = new SynchronousQueue(); + public void testRemoveElement() { testRemoveElement(false); } + public void testRemoveElement_fair() { testRemoveElement(true); } + public void testRemoveElement(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); assertFalse(q.remove(zero)); assertTrue(q.isEmpty()); } @@ -551,16 +335,20 @@ public class SynchronousQueueTest extend /** * contains returns false */ - public void testContains() { - SynchronousQueue q = new SynchronousQueue(); + public void testContains() { testContains(false); } + public void testContains_fair() { testContains(true); } + public void testContains(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); assertFalse(q.contains(zero)); } /** * clear ensures isEmpty */ - public void testClear() { - SynchronousQueue q = new SynchronousQueue(); + public void testClear() { testClear(false); } + public void testClear_fair() { testClear(true); } + public void testClear(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); q.clear(); assertTrue(q.isEmpty()); } @@ -568,8 +356,10 @@ public class SynchronousQueueTest extend /** * containsAll returns false unless empty */ - public void testContainsAll() { - SynchronousQueue q = new SynchronousQueue(); + public void testContainsAll() { testContainsAll(false); } + public void testContainsAll_fair() { testContainsAll(true); } + public void testContainsAll(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); Integer[] empty = new Integer[0]; assertTrue(q.containsAll(Arrays.asList(empty))); Integer[] ints = new Integer[1]; ints[0] = zero; @@ -579,8 +369,10 @@ public class SynchronousQueueTest extend /** * retainAll returns false */ - public void testRetainAll() { - SynchronousQueue q = new SynchronousQueue(); + public void testRetainAll() { testRetainAll(false); } + public void testRetainAll_fair() { testRetainAll(true); } + public void testRetainAll(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); Integer[] empty = new Integer[0]; assertFalse(q.retainAll(Arrays.asList(empty))); Integer[] ints = new Integer[1]; ints[0] = zero; @@ -590,149 +382,140 @@ public class SynchronousQueueTest extend /** * removeAll returns false */ - public void testRemoveAll() { - SynchronousQueue q = new SynchronousQueue(); + public void testRemoveAll() { testRemoveAll(false); } + public void testRemoveAll_fair() { testRemoveAll(true); } + public void testRemoveAll(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); Integer[] empty = new Integer[0]; assertFalse(q.removeAll(Arrays.asList(empty))); Integer[] ints = new Integer[1]; ints[0] = zero; assertFalse(q.containsAll(Arrays.asList(ints))); } - /** * toArray is empty */ - public void testToArray() { - SynchronousQueue q = new SynchronousQueue(); - Object[] o = q.toArray(); + public void testToArray() { testToArray(false); } + public void testToArray_fair() { testToArray(true); } + public void testToArray(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + Object[] o = q.toArray(); assertEquals(o.length, 0); } /** * toArray(a) is nulled at position 0 */ - public void testToArray2() { - SynchronousQueue q = new SynchronousQueue(); - Integer[] ints = new Integer[1]; + public void testToArray2() { testToArray2(false); } + public void testToArray2_fair() { testToArray2(true); } + public void testToArray2(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + Integer[] ints = new Integer[1]; assertNull(ints[0]); } /** * toArray(null) throws NPE */ - public void testToArray_BadArg() { - try { - SynchronousQueue q = new SynchronousQueue(); - Object o[] = q.toArray(null); - shouldThrow(); - } catch (NullPointerException success){} + public void testToArray_null() { testToArray_null(false); } + public void testToArray_null_fair() { testToArray_null(true); } + public void testToArray_null(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + try { + Object o[] = q.toArray(null); + shouldThrow(); + } catch (NullPointerException success) {} } - /** * iterator does not traverse any elements */ - public void testIterator() { - SynchronousQueue q = new SynchronousQueue(); - Iterator it = q.iterator(); + public void testIterator() { testIterator(false); } + public void testIterator_fair() { testIterator(true); } + public void testIterator(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + Iterator it = q.iterator(); assertFalse(it.hasNext()); try { Object x = it.next(); shouldThrow(); - } - catch (NoSuchElementException success) {} + } catch (NoSuchElementException success) {} } /** * iterator remove throws ISE */ - public void testIteratorRemove() { - SynchronousQueue q = new SynchronousQueue(); - Iterator it = q.iterator(); + public void testIteratorRemove() { testIteratorRemove(false); } + public void testIteratorRemove_fair() { testIteratorRemove(true); } + public void testIteratorRemove(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + Iterator it = q.iterator(); try { it.remove(); shouldThrow(); - } - catch (IllegalStateException success) {} + } catch (IllegalStateException success) {} } /** * toString returns a non-null string */ - public void testToString() { - SynchronousQueue q = new SynchronousQueue(); + public void testToString() { testToString(false); } + public void testToString_fair() { testToString(true); } + public void testToString(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); String s = q.toString(); assertNotNull(s); } - /** * offer transfers elements across Executor tasks */ - public void testOfferInExecutor() { - final SynchronousQueue q = new SynchronousQueue(); + public void testOfferInExecutor() { testOfferInExecutor(false); } + public void testOfferInExecutor_fair() { testOfferInExecutor(true); } + public void testOfferInExecutor(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); ExecutorService executor = Executors.newFixedThreadPool(2); - final Integer one = new Integer(1); + final CheckedBarrier threadsStarted = new CheckedBarrier(2); - executor.execute(new Runnable() { - public void run() { - threadAssertFalse(q.offer(one)); - try { - threadAssertTrue(q.offer(one, MEDIUM_DELAY_MS, TimeUnit.MILLISECONDS)); - threadAssertEquals(0, q.remainingCapacity()); - } - catch (InterruptedException e) { - threadUnexpectedException(); - } - } - }); - - executor.execute(new Runnable() { - public void run() { - try { - Thread.sleep(SMALL_DELAY_MS); - threadAssertEquals(one, q.take()); - } - catch (InterruptedException e) { - threadUnexpectedException(); - } - } - }); + executor.execute(new CheckedRunnable() { + public void realRun() throws InterruptedException { + assertFalse(q.offer(one)); + threadsStarted.await(); + assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); + assertEquals(0, q.remainingCapacity()); + }}); + + executor.execute(new CheckedRunnable() { + public void realRun() throws InterruptedException { + threadsStarted.await(); + assertSame(one, q.take()); + }}); joinPool(executor); - } /** - * poll retrieves elements across Executor threads + * timed poll retrieves elements across Executor threads */ - public void testPollInExecutor() { - final SynchronousQueue q = new SynchronousQueue(); + public void testPollInExecutor() { testPollInExecutor(false); } + public void testPollInExecutor_fair() { testPollInExecutor(true); } + public void testPollInExecutor(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + final CheckedBarrier threadsStarted = new CheckedBarrier(2); ExecutorService executor = Executors.newFixedThreadPool(2); - executor.execute(new Runnable() { - public void run() { - threadAssertNull(q.poll()); - try { - threadAssertTrue(null != q.poll(MEDIUM_DELAY_MS, TimeUnit.MILLISECONDS)); - threadAssertTrue(q.isEmpty()); - } - catch (InterruptedException e) { - threadUnexpectedException(); - } - } - }); - - executor.execute(new Runnable() { - public void run() { - try { - Thread.sleep(SMALL_DELAY_MS); - q.put(new Integer(1)); - } - catch (InterruptedException e) { - threadUnexpectedException(); - } - } - }); + executor.execute(new CheckedRunnable() { + public void realRun() throws InterruptedException { + assertNull(q.poll()); + threadsStarted.await(); + assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); + assertTrue(q.isEmpty()); + }}); + + executor.execute(new CheckedRunnable() { + public void realRun() throws InterruptedException { + threadsStarted.await(); + q.put(one); + }}); joinPool(executor); } @@ -740,55 +523,24 @@ public class SynchronousQueueTest extend /** * a deserialized serialized queue is usable */ - public void testSerialization() { - SynchronousQueue q = new SynchronousQueue(); - try { - ByteArrayOutputStream bout = new ByteArrayOutputStream(10000); - ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(bout)); - out.writeObject(q); - out.close(); - - ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray()); - ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(bin)); - SynchronousQueue r = (SynchronousQueue)in.readObject(); - assertEquals(q.size(), r.size()); - while (!q.isEmpty()) - assertEquals(q.remove(), r.remove()); - } catch (Exception e){ - e.printStackTrace(); - unexpectedException(); - } - } - - /** - * drainTo(null) throws NPE - */ - public void testDrainToNull() { - SynchronousQueue q = new SynchronousQueue(); - try { - q.drainTo(null); - shouldThrow(); - } catch (NullPointerException success) { - } - } - - /** - * drainTo(this) throws IAE - */ - public void testDrainToSelf() { - SynchronousQueue q = new SynchronousQueue(); - try { - q.drainTo(q); - shouldThrow(); - } catch (IllegalArgumentException success) { - } + public void testSerialization() { testSerialization(false); } + public void testSerialization_fair() { testSerialization(true); } + public void testSerialization(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + final SynchronousQueue r = serialClone(q); + assertTrue(q != r); + assertEquals(q.size(), r.size()); + while (!q.isEmpty()) + assertEquals(q.remove(), r.remove()); } /** * drainTo(c) of empty queue doesn't transfer elements */ - public void testDrainTo() { - SynchronousQueue q = new SynchronousQueue(); + public void testDrainTo() { testDrainTo(false); } + public void testDrainTo_fair() { testDrainTo(true); } + public void testDrainTo(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); ArrayList l = new ArrayList(); q.drainTo(l); assertEquals(q.size(), 0); @@ -798,97 +550,53 @@ public class SynchronousQueueTest extend /** * drainTo empties queue, unblocking a waiting put. */ - public void testDrainToWithActivePut() { - final SynchronousQueue q = new SynchronousQueue(); - Thread t = new Thread(new Runnable() { - public void run() { - try { - q.put(new Integer(1)); - } catch (InterruptedException ie){ - threadUnexpectedException(); - } - } - }); - try { - t.start(); - ArrayList l = new ArrayList(); - Thread.sleep(SHORT_DELAY_MS); - q.drainTo(l); - assertTrue(l.size() <= 1); - if (l.size() > 0) - assertEquals(l.get(0), new Integer(1)); - t.join(); - assertTrue(l.size() <= 1); - } catch (Exception e){ - unexpectedException(); - } - } - - /** - * drainTo(null, n) throws NPE - */ - public void testDrainToNullN() { - SynchronousQueue q = new SynchronousQueue(); - try { - q.drainTo(null, 0); - shouldThrow(); - } catch (NullPointerException success) { - } - } + public void testDrainToWithActivePut() { testDrainToWithActivePut(false); } + public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); } + public void testDrainToWithActivePut(boolean fair) { + final SynchronousQueue q = new SynchronousQueue(fair); + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + q.put(one); + }}); - /** - * drainTo(this, n) throws IAE - */ - public void testDrainToSelfN() { - SynchronousQueue q = new SynchronousQueue(); - try { - q.drainTo(q, 0); - shouldThrow(); - } catch (IllegalArgumentException success) { - } + ArrayList l = new ArrayList(); + long startTime = System.nanoTime(); + while (l.isEmpty()) { + q.drainTo(l); + if (millisElapsedSince(startTime) > LONG_DELAY_MS) + fail("timed out"); + Thread.yield(); + } + assertTrue(l.size() == 1); + assertSame(one, l.get(0)); + awaitTermination(t); } /** * drainTo(c, n) empties up to n elements of queue into c */ - public void testDrainToN() { + public void testDrainToN() throws InterruptedException { final SynchronousQueue q = new SynchronousQueue(); - Thread t1 = new Thread(new Runnable() { - public void run() { - try { - q.put(one); - } catch (InterruptedException ie){ - threadUnexpectedException(); - } - } - }); - Thread t2 = new Thread(new Runnable() { - public void run() { - try { - q.put(two); - } catch (InterruptedException ie){ - threadUnexpectedException(); - } - } - }); + Thread t1 = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + q.put(one); + }}); + + Thread t2 = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + q.put(two); + }}); - try { - t1.start(); - t2.start(); - ArrayList l = new ArrayList(); - Thread.sleep(SHORT_DELAY_MS); - q.drainTo(l, 1); - assertTrue(l.size() == 1); - q.drainTo(l, 1); - assertTrue(l.size() == 2); - assertTrue(l.contains(one)); - assertTrue(l.contains(two)); - t1.join(); - t2.join(); - } catch (Exception e){ - unexpectedException(); - } + ArrayList l = new ArrayList(); + delay(SHORT_DELAY_MS); + q.drainTo(l, 1); + assertEquals(1, l.size()); + q.drainTo(l, 1); + assertEquals(2, l.size()); + assertTrue(l.contains(one)); + assertTrue(l.contains(two)); + awaitTermination(t1); + awaitTermination(t2); } - }