ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.74
Committed: Sat May 13 22:49:01 2017 UTC (7 years ago) by jsr166
Branch: MAIN
Changes since 1.73: +1 -1 lines
Log Message:
claw back some millis using assertThreadBlocks

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Queue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.LinkedTransferQueue;
23
24 import junit.framework.Test;
25
26 @SuppressWarnings({"unchecked", "rawtypes"})
27 public class LinkedTransferQueueTest extends JSR166TestCase {
28 public static class Generic extends BlockingQueueTest {
29 protected BlockingQueue emptyCollection() {
30 return new LinkedTransferQueue();
31 }
32 }
33
34 public static void main(String[] args) {
35 main(suite(), args);
36 }
37
38 public static Test suite() {
39 class Implementation implements CollectionImplementation {
40 public Class<?> klazz() { return LinkedTransferQueue.class; }
41 public Collection emptyCollection() { return new LinkedTransferQueue(); }
42 public Object makeElement(int i) { return i; }
43 public boolean isConcurrent() { return true; }
44 public boolean permitsNulls() { return false; }
45 }
46 return newTestSuite(LinkedTransferQueueTest.class,
47 new Generic().testSuite(),
48 CollectionTest.testSuite(new Implementation()));
49 }
50
51 /**
52 * Constructor builds new queue with size being zero and empty
53 * being true
54 */
55 public void testConstructor1() {
56 assertEquals(0, new LinkedTransferQueue().size());
57 assertTrue(new LinkedTransferQueue().isEmpty());
58 }
59
60 /**
61 * Initializing constructor with null collection throws
62 * NullPointerException
63 */
64 public void testConstructor2() {
65 try {
66 new LinkedTransferQueue(null);
67 shouldThrow();
68 } catch (NullPointerException success) {}
69 }
70
71 /**
72 * Initializing from Collection of null elements throws
73 * NullPointerException
74 */
75 public void testConstructor3() {
76 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
77 try {
78 new LinkedTransferQueue(elements);
79 shouldThrow();
80 } catch (NullPointerException success) {}
81 }
82
83 /**
84 * Initializing constructor with a collection containing some null elements
85 * throws NullPointerException
86 */
87 public void testConstructor4() {
88 Integer[] ints = new Integer[SIZE];
89 for (int i = 0; i < SIZE - 1; ++i)
90 ints[i] = i;
91 Collection<Integer> elements = Arrays.asList(ints);
92 try {
93 new LinkedTransferQueue(elements);
94 shouldThrow();
95 } catch (NullPointerException success) {}
96 }
97
98 /**
99 * Queue contains all elements of the collection it is initialized by
100 */
101 public void testConstructor5() {
102 Integer[] ints = new Integer[SIZE];
103 for (int i = 0; i < SIZE; ++i) {
104 ints[i] = i;
105 }
106 List intList = Arrays.asList(ints);
107 LinkedTransferQueue q
108 = new LinkedTransferQueue(intList);
109 assertEquals(q.size(), intList.size());
110 assertEquals(q.toString(), intList.toString());
111 assertTrue(Arrays.equals(q.toArray(),
112 intList.toArray()));
113 assertTrue(Arrays.equals(q.toArray(new Object[0]),
114 intList.toArray(new Object[0])));
115 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
116 intList.toArray(new Object[SIZE])));
117 for (int i = 0; i < SIZE; ++i) {
118 assertEquals(ints[i], q.poll());
119 }
120 }
121
122 /**
123 * remainingCapacity() always returns Integer.MAX_VALUE
124 */
125 public void testRemainingCapacity() {
126 BlockingQueue q = populatedQueue(SIZE);
127 for (int i = 0; i < SIZE; ++i) {
128 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
129 assertEquals(SIZE - i, q.size());
130 assertEquals(i, q.remove());
131 }
132 for (int i = 0; i < SIZE; ++i) {
133 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
134 assertEquals(i, q.size());
135 assertTrue(q.add(i));
136 }
137 }
138
139 /**
140 * addAll(this) throws IllegalArgumentException
141 */
142 public void testAddAllSelf() {
143 LinkedTransferQueue q = populatedQueue(SIZE);
144 try {
145 q.addAll(q);
146 shouldThrow();
147 } catch (IllegalArgumentException success) {}
148 }
149
150 /**
151 * addAll of a collection with any null elements throws
152 * NullPointerException after possibly adding some elements
153 */
154 public void testAddAll3() {
155 LinkedTransferQueue q = new LinkedTransferQueue();
156 Integer[] ints = new Integer[SIZE];
157 for (int i = 0; i < SIZE - 1; ++i)
158 ints[i] = i;
159 try {
160 q.addAll(Arrays.asList(ints));
161 shouldThrow();
162 } catch (NullPointerException success) {}
163 }
164
165 /**
166 * Queue contains all elements, in traversal order, of successful addAll
167 */
168 public void testAddAll5() {
169 Integer[] empty = new Integer[0];
170 Integer[] ints = new Integer[SIZE];
171 for (int i = 0; i < SIZE; ++i) {
172 ints[i] = i;
173 }
174 LinkedTransferQueue q = new LinkedTransferQueue();
175 assertFalse(q.addAll(Arrays.asList(empty)));
176 assertTrue(q.addAll(Arrays.asList(ints)));
177 for (int i = 0; i < SIZE; ++i) {
178 assertEquals(ints[i], q.poll());
179 }
180 }
181
182 /**
183 * all elements successfully put are contained
184 */
185 public void testPut() {
186 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
187 for (int i = 0; i < SIZE; ++i) {
188 assertEquals(i, q.size());
189 q.put(i);
190 assertTrue(q.contains(i));
191 }
192 }
193
194 /**
195 * take retrieves elements in FIFO order
196 */
197 public void testTake() throws InterruptedException {
198 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
199 for (int i = 0; i < SIZE; ++i) {
200 assertEquals(i, (int) q.take());
201 }
202 }
203
204 /**
205 * take removes existing elements until empty, then blocks interruptibly
206 */
207 public void testBlockingTake() throws InterruptedException {
208 final BlockingQueue q = populatedQueue(SIZE);
209 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
210 Thread t = newStartedThread(new CheckedRunnable() {
211 public void realRun() throws InterruptedException {
212 for (int i = 0; i < SIZE; i++) assertEquals(i, q.take());
213
214 Thread.currentThread().interrupt();
215 try {
216 q.take();
217 shouldThrow();
218 } catch (InterruptedException success) {}
219 assertFalse(Thread.interrupted());
220
221 pleaseInterrupt.countDown();
222 try {
223 q.take();
224 shouldThrow();
225 } catch (InterruptedException success) {}
226 assertFalse(Thread.interrupted());
227 }});
228
229 await(pleaseInterrupt);
230 assertThreadBlocks(t, Thread.State.WAITING);
231 t.interrupt();
232 awaitTermination(t);
233 }
234
235 /**
236 * poll succeeds unless empty
237 */
238 public void testPoll() throws InterruptedException {
239 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
240 for (int i = 0; i < SIZE; ++i) {
241 assertEquals(i, (int) q.poll());
242 }
243 assertNull(q.poll());
244 checkEmpty(q);
245 }
246
247 /**
248 * timed poll with zero timeout succeeds when non-empty, else times out
249 */
250 public void testTimedPoll0() throws InterruptedException {
251 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252 for (int i = 0; i < SIZE; ++i) {
253 assertEquals(i, (int) q.poll(0, MILLISECONDS));
254 }
255 assertNull(q.poll(0, MILLISECONDS));
256 checkEmpty(q);
257 }
258
259 /**
260 * timed poll with nonzero timeout succeeds when non-empty, else times out
261 */
262 public void testTimedPoll() throws InterruptedException {
263 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264 long startTime = System.nanoTime();
265 for (int i = 0; i < SIZE; ++i)
266 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
267 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
268
269 startTime = System.nanoTime();
270 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
271 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
272 checkEmpty(q);
273 }
274
275 /**
276 * Interrupted timed poll throws InterruptedException instead of
277 * returning timeout status
278 */
279 public void testInterruptedTimedPoll() throws InterruptedException {
280 final BlockingQueue<Integer> q = populatedQueue(SIZE);
281 final CountDownLatch aboutToWait = new CountDownLatch(1);
282 Thread t = newStartedThread(new CheckedRunnable() {
283 public void realRun() throws InterruptedException {
284 long startTime = System.nanoTime();
285 for (int i = 0; i < SIZE; ++i)
286 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
287 aboutToWait.countDown();
288 try {
289 q.poll(LONG_DELAY_MS, MILLISECONDS);
290 shouldThrow();
291 } catch (InterruptedException success) {}
292 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
293 }});
294
295 await(aboutToWait);
296 assertThreadBlocks(t, Thread.State.TIMED_WAITING);
297 t.interrupt();
298 awaitTermination(t);
299 checkEmpty(q);
300 }
301
302 /**
303 * timed poll after thread interrupted throws InterruptedException
304 * instead of returning timeout status
305 */
306 public void testTimedPollAfterInterrupt() throws InterruptedException {
307 final BlockingQueue<Integer> q = populatedQueue(SIZE);
308 Thread t = newStartedThread(new CheckedRunnable() {
309 public void realRun() throws InterruptedException {
310 long startTime = System.nanoTime();
311 Thread.currentThread().interrupt();
312 for (int i = 0; i < SIZE; ++i)
313 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
314 try {
315 q.poll(LONG_DELAY_MS, MILLISECONDS);
316 shouldThrow();
317 } catch (InterruptedException success) {}
318 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
319 }});
320
321 awaitTermination(t);
322 checkEmpty(q);
323 }
324
325 /**
326 * peek returns next element, or null if empty
327 */
328 public void testPeek() throws InterruptedException {
329 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
330 for (int i = 0; i < SIZE; ++i) {
331 assertEquals(i, (int) q.peek());
332 assertEquals(i, (int) q.poll());
333 assertTrue(q.peek() == null ||
334 i != (int) q.peek());
335 }
336 assertNull(q.peek());
337 checkEmpty(q);
338 }
339
340 /**
341 * element returns next element, or throws NoSuchElementException if empty
342 */
343 public void testElement() throws InterruptedException {
344 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
345 for (int i = 0; i < SIZE; ++i) {
346 assertEquals(i, (int) q.element());
347 assertEquals(i, (int) q.poll());
348 }
349 try {
350 q.element();
351 shouldThrow();
352 } catch (NoSuchElementException success) {}
353 checkEmpty(q);
354 }
355
356 /**
357 * remove removes next element, or throws NoSuchElementException if empty
358 */
359 public void testRemove() throws InterruptedException {
360 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
361 for (int i = 0; i < SIZE; ++i) {
362 assertEquals(i, (int) q.remove());
363 }
364 try {
365 q.remove();
366 shouldThrow();
367 } catch (NoSuchElementException success) {}
368 checkEmpty(q);
369 }
370
371 /**
372 * An add following remove(x) succeeds
373 */
374 public void testRemoveElementAndAdd() throws InterruptedException {
375 LinkedTransferQueue q = new LinkedTransferQueue();
376 assertTrue(q.add(one));
377 assertTrue(q.add(two));
378 assertTrue(q.remove(one));
379 assertTrue(q.remove(two));
380 assertTrue(q.add(three));
381 assertSame(q.take(), three);
382 }
383
384 /**
385 * contains(x) reports true when elements added but not yet removed
386 */
387 public void testContains() {
388 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
389 for (int i = 0; i < SIZE; ++i) {
390 assertTrue(q.contains(i));
391 assertEquals(i, (int) q.poll());
392 assertFalse(q.contains(i));
393 }
394 }
395
396 /**
397 * clear removes all elements
398 */
399 public void testClear() throws InterruptedException {
400 LinkedTransferQueue q = populatedQueue(SIZE);
401 q.clear();
402 checkEmpty(q);
403 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
404 q.add(one);
405 assertFalse(q.isEmpty());
406 assertEquals(1, q.size());
407 assertTrue(q.contains(one));
408 q.clear();
409 checkEmpty(q);
410 }
411
412 /**
413 * containsAll(c) is true when c contains a subset of elements
414 */
415 public void testContainsAll() {
416 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
417 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<>();
418 for (int i = 0; i < SIZE; ++i) {
419 assertTrue(q.containsAll(p));
420 assertFalse(p.containsAll(q));
421 p.add(i);
422 }
423 assertTrue(p.containsAll(q));
424 }
425
426 /**
427 * retainAll(c) retains only those elements of c and reports true
428 * if changed
429 */
430 public void testRetainAll() {
431 LinkedTransferQueue q = populatedQueue(SIZE);
432 LinkedTransferQueue p = populatedQueue(SIZE);
433 for (int i = 0; i < SIZE; ++i) {
434 boolean changed = q.retainAll(p);
435 if (i == 0) {
436 assertFalse(changed);
437 } else {
438 assertTrue(changed);
439 }
440 assertTrue(q.containsAll(p));
441 assertEquals(SIZE - i, q.size());
442 p.remove();
443 }
444 }
445
446 /**
447 * removeAll(c) removes only those elements of c and reports true
448 * if changed
449 */
450 public void testRemoveAll() {
451 for (int i = 1; i < SIZE; ++i) {
452 LinkedTransferQueue q = populatedQueue(SIZE);
453 LinkedTransferQueue p = populatedQueue(i);
454 assertTrue(q.removeAll(p));
455 assertEquals(SIZE - i, q.size());
456 for (int j = 0; j < i; ++j) {
457 assertFalse(q.contains(p.remove()));
458 }
459 }
460 }
461
462 /**
463 * toArray() contains all elements in FIFO order
464 */
465 public void testToArray() {
466 LinkedTransferQueue q = populatedQueue(SIZE);
467 Object[] o = q.toArray();
468 for (int i = 0; i < o.length; i++) {
469 assertSame(o[i], q.poll());
470 }
471 }
472
473 /**
474 * toArray(a) contains all elements in FIFO order
475 */
476 public void testToArray2() {
477 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
478 Integer[] ints = new Integer[SIZE];
479 Integer[] array = q.toArray(ints);
480 assertSame(ints, array);
481 for (int i = 0; i < ints.length; i++) {
482 assertSame(ints[i], q.poll());
483 }
484 }
485
486 /**
487 * toArray(incompatible array type) throws ArrayStoreException
488 */
489 public void testToArray1_BadArg() {
490 LinkedTransferQueue q = populatedQueue(SIZE);
491 try {
492 q.toArray(new String[10]);
493 shouldThrow();
494 } catch (ArrayStoreException success) {}
495 }
496
497 /**
498 * iterator iterates through all elements
499 */
500 public void testIterator() throws InterruptedException {
501 LinkedTransferQueue q = populatedQueue(SIZE);
502 Iterator it = q.iterator();
503 int i;
504 for (i = 0; it.hasNext(); i++)
505 assertTrue(q.contains(it.next()));
506 assertEquals(i, SIZE);
507 assertIteratorExhausted(it);
508
509 it = q.iterator();
510 for (i = 0; it.hasNext(); i++)
511 assertEquals(it.next(), q.take());
512 assertEquals(i, SIZE);
513 assertIteratorExhausted(it);
514 }
515
516 /**
517 * iterator of empty collection has no elements
518 */
519 public void testEmptyIterator() {
520 assertIteratorExhausted(new LinkedTransferQueue().iterator());
521 }
522
523 /**
524 * iterator.remove() removes current element
525 */
526 public void testIteratorRemove() {
527 final LinkedTransferQueue q = new LinkedTransferQueue();
528 q.add(two);
529 q.add(one);
530 q.add(three);
531
532 Iterator it = q.iterator();
533 it.next();
534 it.remove();
535
536 it = q.iterator();
537 assertSame(it.next(), one);
538 assertSame(it.next(), three);
539 assertFalse(it.hasNext());
540 }
541
542 /**
543 * iterator ordering is FIFO
544 */
545 public void testIteratorOrdering() {
546 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
547 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
548 q.add(one);
549 q.add(two);
550 q.add(three);
551 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
552 int k = 0;
553 for (Integer n : q) {
554 assertEquals(++k, (int) n);
555 }
556 assertEquals(3, k);
557 }
558
559 /**
560 * Modifications do not cause iterators to fail
561 */
562 public void testWeaklyConsistentIteration() {
563 final LinkedTransferQueue q = new LinkedTransferQueue();
564 q.add(one);
565 q.add(two);
566 q.add(three);
567 for (Iterator it = q.iterator(); it.hasNext();) {
568 q.remove();
569 it.next();
570 }
571 assertEquals(0, q.size());
572 }
573
574 /**
575 * toString contains toStrings of elements
576 */
577 public void testToString() {
578 LinkedTransferQueue q = populatedQueue(SIZE);
579 String s = q.toString();
580 for (int i = 0; i < SIZE; ++i) {
581 assertTrue(s.contains(String.valueOf(i)));
582 }
583 }
584
585 /**
586 * offer transfers elements across Executor tasks
587 */
588 public void testOfferInExecutor() {
589 final LinkedTransferQueue q = new LinkedTransferQueue();
590 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
591 final ExecutorService executor = Executors.newFixedThreadPool(2);
592 try (PoolCleaner cleaner = cleaner(executor)) {
593
594 executor.execute(new CheckedRunnable() {
595 public void realRun() throws InterruptedException {
596 threadsStarted.await();
597 long startTime = System.nanoTime();
598 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
599 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
600 }});
601
602 executor.execute(new CheckedRunnable() {
603 public void realRun() throws InterruptedException {
604 threadsStarted.await();
605 assertSame(one, q.take());
606 checkEmpty(q);
607 }});
608 }
609 }
610
611 /**
612 * timed poll retrieves elements across Executor threads
613 */
614 public void testPollInExecutor() {
615 final LinkedTransferQueue q = new LinkedTransferQueue();
616 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
617 final ExecutorService executor = Executors.newFixedThreadPool(2);
618 try (PoolCleaner cleaner = cleaner(executor)) {
619
620 executor.execute(new CheckedRunnable() {
621 public void realRun() throws InterruptedException {
622 assertNull(q.poll());
623 threadsStarted.await();
624 long startTime = System.nanoTime();
625 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
626 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
627 checkEmpty(q);
628 }});
629
630 executor.execute(new CheckedRunnable() {
631 public void realRun() throws InterruptedException {
632 threadsStarted.await();
633 q.put(one);
634 }});
635 }
636 }
637
638 /**
639 * A deserialized serialized queue has same elements in same order
640 */
641 public void testSerialization() throws Exception {
642 Queue x = populatedQueue(SIZE);
643 Queue y = serialClone(x);
644
645 assertNotSame(y, x);
646 assertEquals(x.size(), y.size());
647 assertEquals(x.toString(), y.toString());
648 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
649 while (!x.isEmpty()) {
650 assertFalse(y.isEmpty());
651 assertEquals(x.remove(), y.remove());
652 }
653 assertTrue(y.isEmpty());
654 }
655
656 /**
657 * drainTo(c) empties queue into another collection c
658 */
659 public void testDrainTo() {
660 LinkedTransferQueue q = populatedQueue(SIZE);
661 ArrayList l = new ArrayList();
662 q.drainTo(l);
663 assertEquals(0, q.size());
664 assertEquals(SIZE, l.size());
665 for (int i = 0; i < SIZE; ++i) {
666 assertEquals(i, l.get(i));
667 }
668 q.add(zero);
669 q.add(one);
670 assertFalse(q.isEmpty());
671 assertTrue(q.contains(zero));
672 assertTrue(q.contains(one));
673 l.clear();
674 q.drainTo(l);
675 assertEquals(0, q.size());
676 assertEquals(2, l.size());
677 for (int i = 0; i < 2; ++i) {
678 assertEquals(i, l.get(i));
679 }
680 }
681
682 /**
683 * drainTo(c) empties full queue, unblocking a waiting put.
684 */
685 public void testDrainToWithActivePut() throws InterruptedException {
686 final LinkedTransferQueue q = populatedQueue(SIZE);
687 Thread t = newStartedThread(new CheckedRunnable() {
688 public void realRun() {
689 q.put(SIZE + 1);
690 }});
691 ArrayList l = new ArrayList();
692 q.drainTo(l);
693 assertTrue(l.size() >= SIZE);
694 for (int i = 0; i < SIZE; ++i)
695 assertEquals(i, l.get(i));
696 awaitTermination(t);
697 assertTrue(q.size() + l.size() >= SIZE);
698 }
699
700 /**
701 * drainTo(c, n) empties first min(n, size) elements of queue into c
702 */
703 public void testDrainToN() {
704 LinkedTransferQueue q = new LinkedTransferQueue();
705 for (int i = 0; i < SIZE + 2; ++i) {
706 for (int j = 0; j < SIZE; j++) {
707 assertTrue(q.offer(j));
708 }
709 ArrayList l = new ArrayList();
710 q.drainTo(l, i);
711 int k = (i < SIZE) ? i : SIZE;
712 assertEquals(k, l.size());
713 assertEquals(SIZE - k, q.size());
714 for (int j = 0; j < k; ++j)
715 assertEquals(j, l.get(j));
716 do {} while (q.poll() != null);
717 }
718 }
719
720 /**
721 * timed poll() or take() increments the waiting consumer count;
722 * offer(e) decrements the waiting consumer count
723 */
724 public void testWaitingConsumer() throws InterruptedException {
725 final LinkedTransferQueue q = new LinkedTransferQueue();
726 assertEquals(0, q.getWaitingConsumerCount());
727 assertFalse(q.hasWaitingConsumer());
728 final CountDownLatch threadStarted = new CountDownLatch(1);
729
730 Thread t = newStartedThread(new CheckedRunnable() {
731 public void realRun() throws InterruptedException {
732 threadStarted.countDown();
733 long startTime = System.nanoTime();
734 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
735 assertEquals(0, q.getWaitingConsumerCount());
736 assertFalse(q.hasWaitingConsumer());
737 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
738 }});
739
740 threadStarted.await();
741 Callable<Boolean> oneConsumer
742 = new Callable<Boolean>() { public Boolean call() {
743 return q.hasWaitingConsumer()
744 && q.getWaitingConsumerCount() == 1; }};
745 waitForThreadToEnterWaitState(t, oneConsumer);
746
747 assertTrue(q.offer(one));
748 assertEquals(0, q.getWaitingConsumerCount());
749 assertFalse(q.hasWaitingConsumer());
750
751 awaitTermination(t);
752 }
753
754 /**
755 * transfer(null) throws NullPointerException
756 */
757 public void testTransfer1() throws InterruptedException {
758 try {
759 LinkedTransferQueue q = new LinkedTransferQueue();
760 q.transfer(null);
761 shouldThrow();
762 } catch (NullPointerException success) {}
763 }
764
765 /**
766 * transfer waits until a poll occurs. The transfered element
767 * is returned by the associated poll.
768 */
769 public void testTransfer2() throws InterruptedException {
770 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
771 final CountDownLatch threadStarted = new CountDownLatch(1);
772
773 Thread t = newStartedThread(new CheckedRunnable() {
774 public void realRun() throws InterruptedException {
775 threadStarted.countDown();
776 q.transfer(five);
777 checkEmpty(q);
778 }});
779
780 threadStarted.await();
781 Callable<Boolean> oneElement
782 = new Callable<Boolean>() { public Boolean call() {
783 return !q.isEmpty() && q.size() == 1; }};
784 waitForThreadToEnterWaitState(t, oneElement);
785
786 assertSame(five, q.poll());
787 checkEmpty(q);
788 awaitTermination(t);
789 }
790
791 /**
792 * transfer waits until a poll occurs, and then transfers in fifo order
793 */
794 public void testTransfer3() throws InterruptedException {
795 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
796
797 Thread first = newStartedThread(new CheckedRunnable() {
798 public void realRun() throws InterruptedException {
799 q.transfer(four);
800 assertFalse(q.contains(four));
801 assertEquals(1, q.size());
802 }});
803
804 Thread interruptedThread = newStartedThread(
805 new CheckedInterruptedRunnable() {
806 public void realRun() throws InterruptedException {
807 while (q.isEmpty())
808 Thread.yield();
809 q.transfer(five);
810 }});
811
812 while (q.size() < 2)
813 Thread.yield();
814 assertEquals(2, q.size());
815 assertSame(four, q.poll());
816 first.join();
817 assertEquals(1, q.size());
818 interruptedThread.interrupt();
819 interruptedThread.join();
820 checkEmpty(q);
821 }
822
823 /**
824 * transfer waits until a poll occurs, at which point the polling
825 * thread returns the element
826 */
827 public void testTransfer4() throws InterruptedException {
828 final LinkedTransferQueue q = new LinkedTransferQueue();
829
830 Thread t = newStartedThread(new CheckedRunnable() {
831 public void realRun() throws InterruptedException {
832 q.transfer(four);
833 assertFalse(q.contains(four));
834 assertSame(three, q.poll());
835 }});
836
837 while (q.isEmpty())
838 Thread.yield();
839 assertFalse(q.isEmpty());
840 assertEquals(1, q.size());
841 assertTrue(q.offer(three));
842 assertSame(four, q.poll());
843 awaitTermination(t);
844 }
845
846 /**
847 * transfer waits until a take occurs. The transfered element
848 * is returned by the associated take.
849 */
850 public void testTransfer5() throws InterruptedException {
851 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
852
853 Thread t = newStartedThread(new CheckedRunnable() {
854 public void realRun() throws InterruptedException {
855 q.transfer(four);
856 checkEmpty(q);
857 }});
858
859 while (q.isEmpty())
860 Thread.yield();
861 assertFalse(q.isEmpty());
862 assertEquals(1, q.size());
863 assertSame(four, q.take());
864 checkEmpty(q);
865 awaitTermination(t);
866 }
867
868 /**
869 * tryTransfer(null) throws NullPointerException
870 */
871 public void testTryTransfer1() {
872 final LinkedTransferQueue q = new LinkedTransferQueue();
873 try {
874 q.tryTransfer(null);
875 shouldThrow();
876 } catch (NullPointerException success) {}
877 }
878
879 /**
880 * tryTransfer returns false and does not enqueue if there are no
881 * consumers waiting to poll or take.
882 */
883 public void testTryTransfer2() throws InterruptedException {
884 final LinkedTransferQueue q = new LinkedTransferQueue();
885 assertFalse(q.tryTransfer(new Object()));
886 assertFalse(q.hasWaitingConsumer());
887 checkEmpty(q);
888 }
889
890 /**
891 * If there is a consumer waiting in timed poll, tryTransfer
892 * returns true while successfully transfering object.
893 */
894 public void testTryTransfer3() throws InterruptedException {
895 final Object hotPotato = new Object();
896 final LinkedTransferQueue q = new LinkedTransferQueue();
897
898 Thread t = newStartedThread(new CheckedRunnable() {
899 public void realRun() {
900 while (! q.hasWaitingConsumer())
901 Thread.yield();
902 assertTrue(q.hasWaitingConsumer());
903 checkEmpty(q);
904 assertTrue(q.tryTransfer(hotPotato));
905 }});
906
907 long startTime = System.nanoTime();
908 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
909 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
910 checkEmpty(q);
911 awaitTermination(t);
912 }
913
914 /**
915 * If there is a consumer waiting in take, tryTransfer returns
916 * true while successfully transfering object.
917 */
918 public void testTryTransfer4() throws InterruptedException {
919 final Object hotPotato = new Object();
920 final LinkedTransferQueue q = new LinkedTransferQueue();
921
922 Thread t = newStartedThread(new CheckedRunnable() {
923 public void realRun() {
924 while (! q.hasWaitingConsumer())
925 Thread.yield();
926 assertTrue(q.hasWaitingConsumer());
927 checkEmpty(q);
928 assertTrue(q.tryTransfer(hotPotato));
929 }});
930
931 assertSame(q.take(), hotPotato);
932 checkEmpty(q);
933 awaitTermination(t);
934 }
935
936 /**
937 * tryTransfer blocks interruptibly if no takers
938 */
939 public void testTryTransfer5() throws InterruptedException {
940 final LinkedTransferQueue q = new LinkedTransferQueue();
941 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
942 assertTrue(q.isEmpty());
943
944 Thread t = newStartedThread(new CheckedRunnable() {
945 public void realRun() throws InterruptedException {
946 long startTime = System.nanoTime();
947 Thread.currentThread().interrupt();
948 try {
949 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
950 shouldThrow();
951 } catch (InterruptedException success) {}
952 assertFalse(Thread.interrupted());
953
954 pleaseInterrupt.countDown();
955 try {
956 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
957 shouldThrow();
958 } catch (InterruptedException success) {}
959 assertFalse(Thread.interrupted());
960 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
961 }});
962
963 await(pleaseInterrupt);
964 assertThreadStaysAlive(t);
965 t.interrupt();
966 awaitTermination(t);
967 checkEmpty(q);
968 }
969
970 /**
971 * tryTransfer gives up after the timeout and returns false
972 */
973 public void testTryTransfer6() throws InterruptedException {
974 final LinkedTransferQueue q = new LinkedTransferQueue();
975
976 Thread t = newStartedThread(new CheckedRunnable() {
977 public void realRun() throws InterruptedException {
978 long startTime = System.nanoTime();
979 assertFalse(q.tryTransfer(new Object(),
980 timeoutMillis(), MILLISECONDS));
981 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
982 checkEmpty(q);
983 }});
984
985 awaitTermination(t);
986 checkEmpty(q);
987 }
988
989 /**
990 * tryTransfer waits for any elements previously in to be removed
991 * before transfering to a poll or take
992 */
993 public void testTryTransfer7() throws InterruptedException {
994 final LinkedTransferQueue q = new LinkedTransferQueue();
995 assertTrue(q.offer(four));
996
997 Thread t = newStartedThread(new CheckedRunnable() {
998 public void realRun() throws InterruptedException {
999 long startTime = System.nanoTime();
1000 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1001 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1002 checkEmpty(q);
1003 }});
1004
1005 while (q.size() != 2)
1006 Thread.yield();
1007 assertEquals(2, q.size());
1008 assertSame(four, q.poll());
1009 assertSame(five, q.poll());
1010 checkEmpty(q);
1011 awaitTermination(t);
1012 }
1013
1014 /**
1015 * tryTransfer attempts to enqueue into the queue and fails
1016 * returning false not enqueueing and the successive poll is null
1017 */
1018 public void testTryTransfer8() throws InterruptedException {
1019 final LinkedTransferQueue q = new LinkedTransferQueue();
1020 assertTrue(q.offer(four));
1021 assertEquals(1, q.size());
1022 long startTime = System.nanoTime();
1023 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1024 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1025 assertEquals(1, q.size());
1026 assertSame(four, q.poll());
1027 assertNull(q.poll());
1028 checkEmpty(q);
1029 }
1030
1031 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1032 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
1033 checkEmpty(q);
1034 for (int i = 0; i < n; i++) {
1035 assertEquals(i, q.size());
1036 assertTrue(q.offer(i));
1037 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1038 }
1039 assertFalse(q.isEmpty());
1040 return q;
1041 }
1042
1043 /**
1044 * remove(null), contains(null) always return false
1045 */
1046 public void testNeverContainsNull() {
1047 Collection<?>[] qs = {
1048 new LinkedTransferQueue<Object>(),
1049 populatedQueue(2),
1050 };
1051
1052 for (Collection<?> q : qs) {
1053 assertFalse(q.contains(null));
1054 assertFalse(q.remove(null));
1055 }
1056 }
1057 }