254 |
|
* take removes existing elements until empty, then blocks interruptibly |
255 |
|
*/ |
256 |
|
public void testBlockingTake() throws InterruptedException { |
257 |
< |
final BlockingQueue<Integer> q = populatedQueue(SIZE); |
258 |
< |
final CountDownLatch aboutToWait = new CountDownLatch(1); |
257 |
> |
final BlockingQueue q = populatedQueue(SIZE); |
258 |
> |
final CountDownLatch pleaseInterrupt = new CountDownLatch(1); |
259 |
|
Thread t = newStartedThread(new CheckedRunnable() { |
260 |
|
public void realRun() throws InterruptedException { |
261 |
|
for (int i = 0; i < SIZE; ++i) { |
262 |
< |
assertEquals(i, (int) q.take()); |
262 |
> |
assertEquals(i, q.take()); |
263 |
|
} |
264 |
< |
aboutToWait.countDown(); |
264 |
> |
|
265 |
> |
Thread.currentThread().interrupt(); |
266 |
> |
try { |
267 |
> |
q.take(); |
268 |
> |
shouldThrow(); |
269 |
> |
} catch (InterruptedException success) {} |
270 |
> |
assertFalse(Thread.interrupted()); |
271 |
> |
|
272 |
> |
pleaseInterrupt.countDown(); |
273 |
|
try { |
274 |
|
q.take(); |
275 |
|
shouldThrow(); |
276 |
|
} catch (InterruptedException success) {} |
277 |
+ |
assertFalse(Thread.interrupted()); |
278 |
|
}}); |
279 |
|
|
280 |
< |
aboutToWait.await(); |
281 |
< |
waitForThreadToEnterWaitState(t, SMALL_DELAY_MS); |
280 |
> |
await(pleaseInterrupt); |
281 |
> |
assertThreadStaysAlive(t); |
282 |
|
t.interrupt(); |
283 |
< |
awaitTermination(t, MEDIUM_DELAY_MS); |
275 |
< |
checkEmpty(q); |
283 |
> |
awaitTermination(t); |
284 |
|
} |
285 |
|
|
286 |
|
/** |
313 |
|
public void testTimedPoll() throws InterruptedException { |
314 |
|
LinkedTransferQueue<Integer> q = populatedQueue(SIZE); |
315 |
|
for (int i = 0; i < SIZE; ++i) { |
316 |
< |
long t0 = System.nanoTime(); |
317 |
< |
assertEquals(i, (int) q.poll(SMALL_DELAY_MS, MILLISECONDS)); |
318 |
< |
assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS); |
319 |
< |
} |
320 |
< |
long t0 = System.nanoTime(); |
321 |
< |
assertNull(q.poll(SHORT_DELAY_MS, MILLISECONDS)); |
322 |
< |
assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS); |
316 |
> |
long startTime = System.nanoTime(); |
317 |
> |
assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); |
318 |
> |
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); |
319 |
> |
} |
320 |
> |
long startTime = System.nanoTime(); |
321 |
> |
assertNull(q.poll(timeoutMillis(), MILLISECONDS)); |
322 |
> |
assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); |
323 |
|
checkEmpty(q); |
324 |
|
} |
325 |
|
|
654 |
|
LinkedTransferQueue q = populatedQueue(SIZE); |
655 |
|
String s = q.toString(); |
656 |
|
for (int i = 0; i < SIZE; ++i) { |
657 |
< |
assertTrue(s.indexOf(String.valueOf(i)) >= 0); |
657 |
> |
assertTrue(s.contains(String.valueOf(i))); |
658 |
|
} |
659 |
|
} |
660 |
|
|
663 |
|
*/ |
664 |
|
public void testOfferInExecutor() { |
665 |
|
final LinkedTransferQueue q = new LinkedTransferQueue(); |
666 |
< |
final CountDownLatch threadsStarted = new CountDownLatch(2); |
666 |
> |
final CheckedBarrier threadsStarted = new CheckedBarrier(2); |
667 |
|
ExecutorService executor = Executors.newFixedThreadPool(2); |
668 |
|
|
669 |
|
executor.execute(new CheckedRunnable() { |
670 |
|
public void realRun() throws InterruptedException { |
663 |
– |
threadsStarted.countDown(); |
671 |
|
threadsStarted.await(); |
672 |
< |
assertTrue(q.offer(one, MEDIUM_DELAY_MS, MILLISECONDS)); |
672 |
> |
assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); |
673 |
|
}}); |
674 |
|
|
675 |
|
executor.execute(new CheckedRunnable() { |
676 |
|
public void realRun() throws InterruptedException { |
670 |
– |
threadsStarted.countDown(); |
677 |
|
threadsStarted.await(); |
678 |
|
assertSame(one, q.take()); |
679 |
|
checkEmpty(q); |
687 |
|
*/ |
688 |
|
public void testPollInExecutor() { |
689 |
|
final LinkedTransferQueue q = new LinkedTransferQueue(); |
690 |
< |
final CountDownLatch threadsStarted = new CountDownLatch(2); |
690 |
> |
final CheckedBarrier threadsStarted = new CheckedBarrier(2); |
691 |
|
ExecutorService executor = Executors.newFixedThreadPool(2); |
692 |
|
|
693 |
|
executor.execute(new CheckedRunnable() { |
694 |
|
public void realRun() throws InterruptedException { |
695 |
|
assertNull(q.poll()); |
690 |
– |
threadsStarted.countDown(); |
696 |
|
threadsStarted.await(); |
697 |
< |
assertSame(one, q.poll(SMALL_DELAY_MS, MILLISECONDS)); |
697 |
> |
assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); |
698 |
|
checkEmpty(q); |
699 |
|
}}); |
700 |
|
|
701 |
|
executor.execute(new CheckedRunnable() { |
702 |
|
public void realRun() throws InterruptedException { |
698 |
– |
threadsStarted.countDown(); |
703 |
|
threadsStarted.await(); |
704 |
|
q.put(one); |
705 |
|
}}); |
1055 |
|
} |
1056 |
|
|
1057 |
|
/** |
1058 |
< |
* tryTransfer waits the amount given, and throws |
1055 |
< |
* InterruptedException when interrupted. |
1058 |
> |
* tryTransfer blocks interruptibly if no takers |
1059 |
|
*/ |
1060 |
|
public void testTryTransfer5() throws InterruptedException { |
1061 |
|
final LinkedTransferQueue q = new LinkedTransferQueue(); |
1062 |
< |
final CountDownLatch threadStarted = new CountDownLatch(1); |
1062 |
> |
final CountDownLatch pleaseInterrupt = new CountDownLatch(1); |
1063 |
|
assertTrue(q.isEmpty()); |
1064 |
|
|
1065 |
|
Thread t = newStartedThread(new CheckedRunnable() { |
1066 |
|
public void realRun() throws InterruptedException { |
1067 |
< |
long t0 = System.nanoTime(); |
1065 |
< |
threadStarted.countDown(); |
1067 |
> |
Thread.currentThread().interrupt(); |
1068 |
|
try { |
1069 |
|
q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS); |
1070 |
|
shouldThrow(); |
1071 |
|
} catch (InterruptedException success) {} |
1072 |
< |
assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS); |
1073 |
< |
assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS); |
1072 |
> |
assertFalse(Thread.interrupted()); |
1073 |
> |
|
1074 |
> |
pleaseInterrupt.countDown(); |
1075 |
> |
try { |
1076 |
> |
q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS); |
1077 |
> |
shouldThrow(); |
1078 |
> |
} catch (InterruptedException success) {} |
1079 |
> |
assertFalse(Thread.interrupted()); |
1080 |
|
}}); |
1081 |
|
|
1082 |
< |
threadStarted.await(); |
1083 |
< |
while (q.isEmpty()) |
1076 |
< |
Thread.yield(); |
1077 |
< |
delay(SHORT_DELAY_MS); |
1082 |
> |
await(pleaseInterrupt); |
1083 |
> |
assertThreadStaysAlive(t); |
1084 |
|
t.interrupt(); |
1085 |
< |
awaitTermination(t, MEDIUM_DELAY_MS); |
1085 |
> |
awaitTermination(t); |
1086 |
|
checkEmpty(q); |
1087 |
|
} |
1088 |
|
|
1096 |
|
public void realRun() throws InterruptedException { |
1097 |
|
long t0 = System.nanoTime(); |
1098 |
|
assertFalse(q.tryTransfer(new Object(), |
1099 |
< |
SHORT_DELAY_MS, MILLISECONDS)); |
1100 |
< |
assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS); |
1099 |
> |
timeoutMillis(), MILLISECONDS)); |
1100 |
> |
assertTrue(millisElapsedSince(t0) >= timeoutMillis()); |
1101 |
|
checkEmpty(q); |
1102 |
|
}}); |
1103 |
|
|
1104 |
< |
awaitTermination(t, MEDIUM_DELAY_MS); |
1104 |
> |
awaitTermination(t); |
1105 |
|
checkEmpty(q); |
1106 |
|
} |
1107 |
|
|
1137 |
|
assertTrue(q.offer(four)); |
1138 |
|
assertEquals(1, q.size()); |
1139 |
|
long t0 = System.nanoTime(); |
1140 |
< |
assertFalse(q.tryTransfer(five, SHORT_DELAY_MS, MILLISECONDS)); |
1141 |
< |
assertTrue(millisElapsedSince(t0) >= SHORT_DELAY_MS); |
1140 |
> |
assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS)); |
1141 |
> |
assertTrue(millisElapsedSince(t0) >= timeoutMillis()); |
1142 |
|
assertEquals(1, q.size()); |
1143 |
|
assertSame(four, q.poll()); |
1144 |
|
assertNull(q.poll()); |