ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.82
Committed: Sun Aug 11 22:29:27 2019 UTC (4 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.81: +8 -7 lines
Log Message:
more assertions; more interleavings

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Queue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.LinkedTransferQueue;
23
24 import junit.framework.Test;
25
26 @SuppressWarnings({"unchecked", "rawtypes"})
27 public class LinkedTransferQueueTest extends JSR166TestCase {
28 public static class Generic extends BlockingQueueTest {
29 protected BlockingQueue emptyCollection() {
30 return new LinkedTransferQueue();
31 }
32 }
33
34 public static void main(String[] args) {
35 main(suite(), args);
36 }
37
38 public static Test suite() {
39 class Implementation implements CollectionImplementation {
40 public Class<?> klazz() { return LinkedTransferQueue.class; }
41 public Collection emptyCollection() { return new LinkedTransferQueue(); }
42 public Object makeElement(int i) { return i; }
43 public boolean isConcurrent() { return true; }
44 public boolean permitsNulls() { return false; }
45 }
46 return newTestSuite(LinkedTransferQueueTest.class,
47 new Generic().testSuite(),
48 CollectionTest.testSuite(new Implementation()));
49 }
50
51 /**
52 * Constructor builds new queue with size being zero and empty
53 * being true
54 */
55 public void testConstructor1() {
56 assertEquals(0, new LinkedTransferQueue().size());
57 assertTrue(new LinkedTransferQueue().isEmpty());
58 }
59
60 /**
61 * Initializing constructor with null collection throws
62 * NullPointerException
63 */
64 public void testConstructor2() {
65 try {
66 new LinkedTransferQueue(null);
67 shouldThrow();
68 } catch (NullPointerException success) {}
69 }
70
71 /**
72 * Initializing from Collection of null elements throws
73 * NullPointerException
74 */
75 public void testConstructor3() {
76 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
77 try {
78 new LinkedTransferQueue(elements);
79 shouldThrow();
80 } catch (NullPointerException success) {}
81 }
82
83 /**
84 * Initializing constructor with a collection containing some null elements
85 * throws NullPointerException
86 */
87 public void testConstructor4() {
88 Integer[] ints = new Integer[SIZE];
89 for (int i = 0; i < SIZE - 1; ++i)
90 ints[i] = i;
91 Collection<Integer> elements = Arrays.asList(ints);
92 try {
93 new LinkedTransferQueue(elements);
94 shouldThrow();
95 } catch (NullPointerException success) {}
96 }
97
98 /**
99 * Queue contains all elements of the collection it is initialized by
100 */
101 public void testConstructor5() {
102 Integer[] ints = new Integer[SIZE];
103 for (int i = 0; i < SIZE; ++i) {
104 ints[i] = i;
105 }
106 List intList = Arrays.asList(ints);
107 LinkedTransferQueue q
108 = new LinkedTransferQueue(intList);
109 assertEquals(q.size(), intList.size());
110 assertEquals(q.toString(), intList.toString());
111 assertTrue(Arrays.equals(q.toArray(),
112 intList.toArray()));
113 assertTrue(Arrays.equals(q.toArray(new Object[0]),
114 intList.toArray(new Object[0])));
115 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
116 intList.toArray(new Object[SIZE])));
117 for (int i = 0; i < SIZE; ++i) {
118 assertEquals(ints[i], q.poll());
119 }
120 }
121
122 /**
123 * remainingCapacity() always returns Integer.MAX_VALUE
124 */
125 public void testRemainingCapacity() {
126 BlockingQueue q = populatedQueue(SIZE);
127 for (int i = 0; i < SIZE; ++i) {
128 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
129 assertEquals(SIZE - i, q.size());
130 assertEquals(i, q.remove());
131 }
132 for (int i = 0; i < SIZE; ++i) {
133 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
134 assertEquals(i, q.size());
135 assertTrue(q.add(i));
136 }
137 }
138
139 /**
140 * addAll(this) throws IllegalArgumentException
141 */
142 public void testAddAllSelf() {
143 LinkedTransferQueue q = populatedQueue(SIZE);
144 try {
145 q.addAll(q);
146 shouldThrow();
147 } catch (IllegalArgumentException success) {}
148 }
149
150 /**
151 * addAll of a collection with any null elements throws
152 * NullPointerException after possibly adding some elements
153 */
154 public void testAddAll3() {
155 LinkedTransferQueue q = new LinkedTransferQueue();
156 Integer[] ints = new Integer[SIZE];
157 for (int i = 0; i < SIZE - 1; ++i)
158 ints[i] = i;
159 try {
160 q.addAll(Arrays.asList(ints));
161 shouldThrow();
162 } catch (NullPointerException success) {}
163 }
164
165 /**
166 * Queue contains all elements, in traversal order, of successful addAll
167 */
168 public void testAddAll5() {
169 Integer[] empty = new Integer[0];
170 Integer[] ints = new Integer[SIZE];
171 for (int i = 0; i < SIZE; ++i) {
172 ints[i] = i;
173 }
174 LinkedTransferQueue q = new LinkedTransferQueue();
175 assertFalse(q.addAll(Arrays.asList(empty)));
176 assertTrue(q.addAll(Arrays.asList(ints)));
177 for (int i = 0; i < SIZE; ++i) {
178 assertEquals(ints[i], q.poll());
179 }
180 }
181
182 /**
183 * all elements successfully put are contained
184 */
185 public void testPut() {
186 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
187 for (int i = 0; i < SIZE; ++i) {
188 assertEquals(i, q.size());
189 q.put(i);
190 assertTrue(q.contains(i));
191 }
192 }
193
194 /**
195 * take retrieves elements in FIFO order
196 */
197 public void testTake() throws InterruptedException {
198 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
199 for (int i = 0; i < SIZE; ++i) {
200 assertEquals(i, (int) q.take());
201 }
202 }
203
204 /**
205 * take removes existing elements until empty, then blocks interruptibly
206 */
207 public void testBlockingTake() throws InterruptedException {
208 final BlockingQueue q = populatedQueue(SIZE);
209 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
210 Thread t = newStartedThread(new CheckedRunnable() {
211 public void realRun() throws InterruptedException {
212 for (int i = 0; i < SIZE; i++) assertEquals(i, q.take());
213
214 Thread.currentThread().interrupt();
215 try {
216 q.take();
217 shouldThrow();
218 } catch (InterruptedException success) {}
219 assertFalse(Thread.interrupted());
220
221 pleaseInterrupt.countDown();
222 try {
223 q.take();
224 shouldThrow();
225 } catch (InterruptedException success) {}
226 assertFalse(Thread.interrupted());
227 }});
228
229 await(pleaseInterrupt);
230 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
231 t.interrupt();
232 awaitTermination(t);
233 }
234
235 /**
236 * poll succeeds unless empty
237 */
238 public void testPoll() throws InterruptedException {
239 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
240 for (int i = 0; i < SIZE; ++i) {
241 assertEquals(i, (int) q.poll());
242 }
243 assertNull(q.poll());
244 checkEmpty(q);
245 }
246
247 /**
248 * timed poll with zero timeout succeeds when non-empty, else times out
249 */
250 public void testTimedPoll0() throws InterruptedException {
251 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252 for (int i = 0; i < SIZE; ++i) {
253 assertEquals(i, (int) q.poll(0, MILLISECONDS));
254 }
255 assertNull(q.poll(0, MILLISECONDS));
256 checkEmpty(q);
257 }
258
259 /**
260 * timed poll with nonzero timeout succeeds when non-empty, else times out
261 */
262 public void testTimedPoll() throws InterruptedException {
263 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264 long startTime = System.nanoTime();
265 for (int i = 0; i < SIZE; ++i)
266 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
267 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
268
269 startTime = System.nanoTime();
270 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
271 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
272 checkEmpty(q);
273 }
274
275 /**
276 * Interrupted timed poll throws InterruptedException instead of
277 * returning timeout status
278 */
279 public void testInterruptedTimedPoll() throws InterruptedException {
280 final BlockingQueue<Integer> q = populatedQueue(SIZE);
281 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
282 Thread t = newStartedThread(new CheckedRunnable() {
283 public void realRun() throws InterruptedException {
284 long startTime = System.nanoTime();
285 for (int i = 0; i < SIZE; i++)
286 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
287
288 Thread.currentThread().interrupt();
289 try {
290 q.poll(randomTimeout(), randomTimeUnit());
291 shouldThrow();
292 } catch (InterruptedException success) {}
293 assertFalse(Thread.interrupted());
294
295 pleaseInterrupt.countDown();
296 try {
297 q.poll(LONG_DELAY_MS, MILLISECONDS);
298 shouldThrow();
299 } catch (InterruptedException success) {}
300 assertFalse(Thread.interrupted());
301
302 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
303 }});
304
305 await(pleaseInterrupt);
306 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
307 t.interrupt();
308 awaitTermination(t);
309 checkEmpty(q);
310 }
311
312 /**
313 * timed poll after thread interrupted throws InterruptedException
314 * instead of returning timeout status
315 */
316 public void testTimedPollAfterInterrupt() throws InterruptedException {
317 final BlockingQueue<Integer> q = populatedQueue(SIZE);
318 Thread t = newStartedThread(new CheckedRunnable() {
319 public void realRun() throws InterruptedException {
320 long startTime = System.nanoTime();
321 Thread.currentThread().interrupt();
322 for (int i = 0; i < SIZE; ++i)
323 assertEquals(i, (int) q.poll(randomTimeout(), randomTimeUnit()));
324 try {
325 q.poll(randomTimeout(), randomTimeUnit());
326 shouldThrow();
327 } catch (InterruptedException success) {}
328 assertFalse(Thread.interrupted());
329 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
330 }});
331
332 awaitTermination(t);
333 checkEmpty(q);
334 }
335
336 /**
337 * peek returns next element, or null if empty
338 */
339 public void testPeek() throws InterruptedException {
340 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
341 for (int i = 0; i < SIZE; ++i) {
342 assertEquals(i, (int) q.peek());
343 assertEquals(i, (int) q.poll());
344 assertTrue(q.peek() == null ||
345 i != (int) q.peek());
346 }
347 assertNull(q.peek());
348 checkEmpty(q);
349 }
350
351 /**
352 * element returns next element, or throws NoSuchElementException if empty
353 */
354 public void testElement() throws InterruptedException {
355 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
356 for (int i = 0; i < SIZE; ++i) {
357 assertEquals(i, (int) q.element());
358 assertEquals(i, (int) q.poll());
359 }
360 try {
361 q.element();
362 shouldThrow();
363 } catch (NoSuchElementException success) {}
364 checkEmpty(q);
365 }
366
367 /**
368 * remove removes next element, or throws NoSuchElementException if empty
369 */
370 public void testRemove() throws InterruptedException {
371 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
372 for (int i = 0; i < SIZE; ++i) {
373 assertEquals(i, (int) q.remove());
374 }
375 try {
376 q.remove();
377 shouldThrow();
378 } catch (NoSuchElementException success) {}
379 checkEmpty(q);
380 }
381
382 /**
383 * An add following remove(x) succeeds
384 */
385 public void testRemoveElementAndAdd() throws InterruptedException {
386 LinkedTransferQueue q = new LinkedTransferQueue();
387 assertTrue(q.add(one));
388 assertTrue(q.add(two));
389 assertTrue(q.remove(one));
390 assertTrue(q.remove(two));
391 assertTrue(q.add(three));
392 assertSame(q.take(), three);
393 }
394
395 /**
396 * contains(x) reports true when elements added but not yet removed
397 */
398 public void testContains() {
399 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
400 for (int i = 0; i < SIZE; ++i) {
401 assertTrue(q.contains(i));
402 assertEquals(i, (int) q.poll());
403 assertFalse(q.contains(i));
404 }
405 }
406
407 /**
408 * clear removes all elements
409 */
410 public void testClear() throws InterruptedException {
411 LinkedTransferQueue q = populatedQueue(SIZE);
412 q.clear();
413 checkEmpty(q);
414 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
415 q.add(one);
416 assertFalse(q.isEmpty());
417 assertEquals(1, q.size());
418 assertTrue(q.contains(one));
419 q.clear();
420 checkEmpty(q);
421 }
422
423 /**
424 * containsAll(c) is true when c contains a subset of elements
425 */
426 public void testContainsAll() {
427 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
428 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<>();
429 for (int i = 0; i < SIZE; ++i) {
430 assertTrue(q.containsAll(p));
431 assertFalse(p.containsAll(q));
432 p.add(i);
433 }
434 assertTrue(p.containsAll(q));
435 }
436
437 /**
438 * retainAll(c) retains only those elements of c and reports true
439 * if changed
440 */
441 public void testRetainAll() {
442 LinkedTransferQueue q = populatedQueue(SIZE);
443 LinkedTransferQueue p = populatedQueue(SIZE);
444 for (int i = 0; i < SIZE; ++i) {
445 boolean changed = q.retainAll(p);
446 if (i == 0) {
447 assertFalse(changed);
448 } else {
449 assertTrue(changed);
450 }
451 assertTrue(q.containsAll(p));
452 assertEquals(SIZE - i, q.size());
453 p.remove();
454 }
455 }
456
457 /**
458 * removeAll(c) removes only those elements of c and reports true
459 * if changed
460 */
461 public void testRemoveAll() {
462 for (int i = 1; i < SIZE; ++i) {
463 LinkedTransferQueue q = populatedQueue(SIZE);
464 LinkedTransferQueue p = populatedQueue(i);
465 assertTrue(q.removeAll(p));
466 assertEquals(SIZE - i, q.size());
467 for (int j = 0; j < i; ++j) {
468 assertFalse(q.contains(p.remove()));
469 }
470 }
471 }
472
473 /**
474 * toArray() contains all elements in FIFO order
475 */
476 public void testToArray() {
477 LinkedTransferQueue q = populatedQueue(SIZE);
478 Object[] a = q.toArray();
479 assertSame(Object[].class, a.getClass());
480 for (Object o : a)
481 assertSame(o, q.poll());
482 assertTrue(q.isEmpty());
483 }
484
485 /**
486 * toArray(a) contains all elements in FIFO order
487 */
488 public void testToArray2() {
489 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
490 Integer[] ints = new Integer[SIZE];
491 Integer[] array = q.toArray(ints);
492 assertSame(ints, array);
493 for (Integer o : ints)
494 assertSame(o, q.poll());
495 assertTrue(q.isEmpty());
496 }
497
498 /**
499 * toArray(incompatible array type) throws ArrayStoreException
500 */
501 public void testToArray1_BadArg() {
502 LinkedTransferQueue q = populatedQueue(SIZE);
503 try {
504 q.toArray(new String[10]);
505 shouldThrow();
506 } catch (ArrayStoreException success) {}
507 }
508
509 /**
510 * iterator iterates through all elements
511 */
512 public void testIterator() throws InterruptedException {
513 LinkedTransferQueue q = populatedQueue(SIZE);
514 Iterator it = q.iterator();
515 int i;
516 for (i = 0; it.hasNext(); i++)
517 assertTrue(q.contains(it.next()));
518 assertEquals(i, SIZE);
519 assertIteratorExhausted(it);
520
521 it = q.iterator();
522 for (i = 0; it.hasNext(); i++)
523 assertEquals(it.next(), q.take());
524 assertEquals(i, SIZE);
525 assertIteratorExhausted(it);
526 }
527
528 /**
529 * iterator of empty collection has no elements
530 */
531 public void testEmptyIterator() {
532 assertIteratorExhausted(new LinkedTransferQueue().iterator());
533 }
534
535 /**
536 * iterator.remove() removes current element
537 */
538 public void testIteratorRemove() {
539 final LinkedTransferQueue q = new LinkedTransferQueue();
540 q.add(two);
541 q.add(one);
542 q.add(three);
543
544 Iterator it = q.iterator();
545 it.next();
546 it.remove();
547
548 it = q.iterator();
549 assertSame(it.next(), one);
550 assertSame(it.next(), three);
551 assertFalse(it.hasNext());
552 }
553
554 /**
555 * iterator ordering is FIFO
556 */
557 public void testIteratorOrdering() {
558 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
559 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
560 q.add(one);
561 q.add(two);
562 q.add(three);
563 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
564 int k = 0;
565 for (Integer n : q) {
566 assertEquals(++k, (int) n);
567 }
568 assertEquals(3, k);
569 }
570
571 /**
572 * Modifications do not cause iterators to fail
573 */
574 public void testWeaklyConsistentIteration() {
575 final LinkedTransferQueue q = new LinkedTransferQueue();
576 q.add(one);
577 q.add(two);
578 q.add(three);
579 for (Iterator it = q.iterator(); it.hasNext();) {
580 q.remove();
581 it.next();
582 }
583 assertEquals(0, q.size());
584 }
585
586 /**
587 * toString contains toStrings of elements
588 */
589 public void testToString() {
590 LinkedTransferQueue q = populatedQueue(SIZE);
591 String s = q.toString();
592 for (int i = 0; i < SIZE; ++i) {
593 assertTrue(s.contains(String.valueOf(i)));
594 }
595 }
596
597 /**
598 * offer transfers elements across Executor tasks
599 */
600 public void testOfferInExecutor() {
601 final LinkedTransferQueue q = new LinkedTransferQueue();
602 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
603 final ExecutorService executor = Executors.newFixedThreadPool(2);
604 try (PoolCleaner cleaner = cleaner(executor)) {
605
606 executor.execute(new CheckedRunnable() {
607 public void realRun() throws InterruptedException {
608 threadsStarted.await();
609 long startTime = System.nanoTime();
610 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
611 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
612 }});
613
614 executor.execute(new CheckedRunnable() {
615 public void realRun() throws InterruptedException {
616 threadsStarted.await();
617 assertSame(one, q.take());
618 checkEmpty(q);
619 }});
620 }
621 }
622
623 /**
624 * timed poll retrieves elements across Executor threads
625 */
626 public void testPollInExecutor() {
627 final LinkedTransferQueue q = new LinkedTransferQueue();
628 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
629 final ExecutorService executor = Executors.newFixedThreadPool(2);
630 try (PoolCleaner cleaner = cleaner(executor)) {
631
632 executor.execute(new CheckedRunnable() {
633 public void realRun() throws InterruptedException {
634 assertNull(q.poll());
635 threadsStarted.await();
636 long startTime = System.nanoTime();
637 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
638 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
639 checkEmpty(q);
640 }});
641
642 executor.execute(new CheckedRunnable() {
643 public void realRun() throws InterruptedException {
644 threadsStarted.await();
645 q.put(one);
646 }});
647 }
648 }
649
650 /**
651 * A deserialized/reserialized queue has same elements in same order
652 */
653 public void testSerialization() throws Exception {
654 Queue x = populatedQueue(SIZE);
655 Queue y = serialClone(x);
656
657 assertNotSame(y, x);
658 assertEquals(x.size(), y.size());
659 assertEquals(x.toString(), y.toString());
660 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
661 while (!x.isEmpty()) {
662 assertFalse(y.isEmpty());
663 assertEquals(x.remove(), y.remove());
664 }
665 assertTrue(y.isEmpty());
666 }
667
668 /**
669 * drainTo(c) empties queue into another collection c
670 */
671 public void testDrainTo() {
672 LinkedTransferQueue q = populatedQueue(SIZE);
673 ArrayList l = new ArrayList();
674 q.drainTo(l);
675 assertEquals(0, q.size());
676 assertEquals(SIZE, l.size());
677 for (int i = 0; i < SIZE; ++i) {
678 assertEquals(i, l.get(i));
679 }
680 q.add(zero);
681 q.add(one);
682 assertFalse(q.isEmpty());
683 assertTrue(q.contains(zero));
684 assertTrue(q.contains(one));
685 l.clear();
686 q.drainTo(l);
687 assertEquals(0, q.size());
688 assertEquals(2, l.size());
689 for (int i = 0; i < 2; ++i) {
690 assertEquals(i, l.get(i));
691 }
692 }
693
694 /**
695 * drainTo(c) empties full queue, unblocking a waiting put.
696 */
697 public void testDrainToWithActivePut() throws InterruptedException {
698 final LinkedTransferQueue q = populatedQueue(SIZE);
699 Thread t = newStartedThread(new CheckedRunnable() {
700 public void realRun() {
701 q.put(SIZE + 1);
702 }});
703 ArrayList l = new ArrayList();
704 q.drainTo(l);
705 assertTrue(l.size() >= SIZE);
706 for (int i = 0; i < SIZE; ++i)
707 assertEquals(i, l.get(i));
708 awaitTermination(t);
709 assertTrue(q.size() + l.size() >= SIZE);
710 }
711
712 /**
713 * drainTo(c, n) empties first min(n, size) elements of queue into c
714 */
715 public void testDrainToN() {
716 LinkedTransferQueue q = new LinkedTransferQueue();
717 for (int i = 0; i < SIZE + 2; ++i) {
718 for (int j = 0; j < SIZE; j++) {
719 assertTrue(q.offer(j));
720 }
721 ArrayList l = new ArrayList();
722 q.drainTo(l, i);
723 int k = (i < SIZE) ? i : SIZE;
724 assertEquals(k, l.size());
725 assertEquals(SIZE - k, q.size());
726 for (int j = 0; j < k; ++j)
727 assertEquals(j, l.get(j));
728 do {} while (q.poll() != null);
729 }
730 }
731
732 /**
733 * timed poll() or take() increments the waiting consumer count;
734 * offer(e) decrements the waiting consumer count
735 */
736 public void testWaitingConsumer() throws InterruptedException {
737 final LinkedTransferQueue q = new LinkedTransferQueue();
738 assertEquals(0, q.getWaitingConsumerCount());
739 assertFalse(q.hasWaitingConsumer());
740 final CountDownLatch threadStarted = new CountDownLatch(1);
741
742 Thread t = newStartedThread(new CheckedRunnable() {
743 public void realRun() throws InterruptedException {
744 threadStarted.countDown();
745 long startTime = System.nanoTime();
746 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
747 assertEquals(0, q.getWaitingConsumerCount());
748 assertFalse(q.hasWaitingConsumer());
749 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
750 }});
751
752 threadStarted.await();
753 Callable<Boolean> oneConsumer
754 = new Callable<Boolean>() { public Boolean call() {
755 return q.hasWaitingConsumer()
756 && q.getWaitingConsumerCount() == 1; }};
757 waitForThreadToEnterWaitState(t, oneConsumer);
758
759 assertTrue(q.offer(one));
760 assertEquals(0, q.getWaitingConsumerCount());
761 assertFalse(q.hasWaitingConsumer());
762
763 awaitTermination(t);
764 }
765
766 /**
767 * transfer(null) throws NullPointerException
768 */
769 public void testTransfer1() throws InterruptedException {
770 try {
771 LinkedTransferQueue q = new LinkedTransferQueue();
772 q.transfer(null);
773 shouldThrow();
774 } catch (NullPointerException success) {}
775 }
776
777 /**
778 * transfer waits until a poll occurs. The transferred element
779 * is returned by the associated poll.
780 */
781 public void testTransfer2() throws InterruptedException {
782 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
783 final CountDownLatch threadStarted = new CountDownLatch(1);
784
785 Thread t = newStartedThread(new CheckedRunnable() {
786 public void realRun() throws InterruptedException {
787 threadStarted.countDown();
788 q.transfer(five);
789 checkEmpty(q);
790 }});
791
792 threadStarted.await();
793 Callable<Boolean> oneElement
794 = new Callable<Boolean>() { public Boolean call() {
795 return !q.isEmpty() && q.size() == 1; }};
796 waitForThreadToEnterWaitState(t, oneElement);
797
798 assertSame(five, q.poll());
799 checkEmpty(q);
800 awaitTermination(t);
801 }
802
803 /**
804 * transfer waits until a poll occurs, and then transfers in fifo order
805 */
806 public void testTransfer3() throws InterruptedException {
807 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
808
809 Thread first = newStartedThread(new CheckedRunnable() {
810 public void realRun() throws InterruptedException {
811 q.transfer(four);
812 assertFalse(q.contains(four));
813 assertEquals(1, q.size());
814 }});
815
816 Thread interruptedThread = newStartedThread(
817 new CheckedInterruptedRunnable() {
818 public void realRun() throws InterruptedException {
819 while (q.isEmpty())
820 Thread.yield();
821 q.transfer(five);
822 }});
823
824 while (q.size() < 2)
825 Thread.yield();
826 assertEquals(2, q.size());
827 assertSame(four, q.poll());
828 first.join();
829 assertEquals(1, q.size());
830 interruptedThread.interrupt();
831 interruptedThread.join();
832 checkEmpty(q);
833 }
834
835 /**
836 * transfer waits until a poll occurs, at which point the polling
837 * thread returns the element
838 */
839 public void testTransfer4() throws InterruptedException {
840 final LinkedTransferQueue q = new LinkedTransferQueue();
841
842 Thread t = newStartedThread(new CheckedRunnable() {
843 public void realRun() throws InterruptedException {
844 q.transfer(four);
845 assertFalse(q.contains(four));
846 assertSame(three, q.poll());
847 }});
848
849 while (q.isEmpty())
850 Thread.yield();
851 assertFalse(q.isEmpty());
852 assertEquals(1, q.size());
853 assertTrue(q.offer(three));
854 assertSame(four, q.poll());
855 awaitTermination(t);
856 }
857
858 /**
859 * transfer waits until a take occurs. The transferred element
860 * is returned by the associated take.
861 */
862 public void testTransfer5() throws InterruptedException {
863 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
864
865 Thread t = newStartedThread(new CheckedRunnable() {
866 public void realRun() throws InterruptedException {
867 q.transfer(four);
868 checkEmpty(q);
869 }});
870
871 while (q.isEmpty())
872 Thread.yield();
873 assertFalse(q.isEmpty());
874 assertEquals(1, q.size());
875 assertSame(four, q.take());
876 checkEmpty(q);
877 awaitTermination(t);
878 }
879
880 /**
881 * tryTransfer(null) throws NullPointerException
882 */
883 public void testTryTransfer1() {
884 final LinkedTransferQueue q = new LinkedTransferQueue();
885 try {
886 q.tryTransfer(null);
887 shouldThrow();
888 } catch (NullPointerException success) {}
889 }
890
891 /**
892 * tryTransfer returns false and does not enqueue if there are no
893 * consumers waiting to poll or take.
894 */
895 public void testTryTransfer2() throws InterruptedException {
896 final LinkedTransferQueue q = new LinkedTransferQueue();
897 assertFalse(q.tryTransfer(new Object()));
898 assertFalse(q.hasWaitingConsumer());
899 checkEmpty(q);
900 }
901
902 /**
903 * If there is a consumer waiting in timed poll, tryTransfer
904 * returns true while successfully transfering object.
905 */
906 public void testTryTransfer3() throws InterruptedException {
907 final Object hotPotato = new Object();
908 final LinkedTransferQueue q = new LinkedTransferQueue();
909
910 Thread t = newStartedThread(new CheckedRunnable() {
911 public void realRun() {
912 while (! q.hasWaitingConsumer())
913 Thread.yield();
914 assertTrue(q.hasWaitingConsumer());
915 checkEmpty(q);
916 assertTrue(q.tryTransfer(hotPotato));
917 }});
918
919 long startTime = System.nanoTime();
920 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
921 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
922 checkEmpty(q);
923 awaitTermination(t);
924 }
925
926 /**
927 * If there is a consumer waiting in take, tryTransfer returns
928 * true while successfully transfering object.
929 */
930 public void testTryTransfer4() throws InterruptedException {
931 final Object hotPotato = new Object();
932 final LinkedTransferQueue q = new LinkedTransferQueue();
933
934 Thread t = newStartedThread(new CheckedRunnable() {
935 public void realRun() {
936 while (! q.hasWaitingConsumer())
937 Thread.yield();
938 assertTrue(q.hasWaitingConsumer());
939 checkEmpty(q);
940 assertTrue(q.tryTransfer(hotPotato));
941 }});
942
943 assertSame(q.take(), hotPotato);
944 checkEmpty(q);
945 awaitTermination(t);
946 }
947
948 /**
949 * tryTransfer blocks interruptibly if no takers
950 */
951 public void testTryTransfer5() throws InterruptedException {
952 final LinkedTransferQueue q = new LinkedTransferQueue();
953 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
954 assertTrue(q.isEmpty());
955
956 Thread t = newStartedThread(new CheckedRunnable() {
957 public void realRun() throws InterruptedException {
958 long startTime = System.nanoTime();
959 Thread.currentThread().interrupt();
960 try {
961 q.tryTransfer(new Object(), randomTimeout(), randomTimeUnit());
962 shouldThrow();
963 } catch (InterruptedException success) {}
964 assertFalse(Thread.interrupted());
965
966 pleaseInterrupt.countDown();
967 try {
968 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
969 shouldThrow();
970 } catch (InterruptedException success) {}
971 assertFalse(Thread.interrupted());
972
973 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
974 }});
975
976 await(pleaseInterrupt);
977 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
978 t.interrupt();
979 awaitTermination(t);
980 checkEmpty(q);
981 }
982
983 /**
984 * tryTransfer gives up after the timeout and returns false
985 */
986 public void testTryTransfer6() throws InterruptedException {
987 final LinkedTransferQueue q = new LinkedTransferQueue();
988
989 Thread t = newStartedThread(new CheckedRunnable() {
990 public void realRun() throws InterruptedException {
991 long startTime = System.nanoTime();
992 assertFalse(q.tryTransfer(new Object(),
993 timeoutMillis(), MILLISECONDS));
994 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
995 checkEmpty(q);
996 }});
997
998 awaitTermination(t);
999 checkEmpty(q);
1000 }
1001
1002 /**
1003 * tryTransfer waits for any elements previously in to be removed
1004 * before transfering to a poll or take
1005 */
1006 public void testTryTransfer7() throws InterruptedException {
1007 final LinkedTransferQueue q = new LinkedTransferQueue();
1008 assertTrue(q.offer(four));
1009
1010 Thread t = newStartedThread(new CheckedRunnable() {
1011 public void realRun() throws InterruptedException {
1012 long startTime = System.nanoTime();
1013 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1014 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1015 checkEmpty(q);
1016 }});
1017
1018 while (q.size() != 2)
1019 Thread.yield();
1020 assertEquals(2, q.size());
1021 assertSame(four, q.poll());
1022 assertSame(five, q.poll());
1023 checkEmpty(q);
1024 awaitTermination(t);
1025 }
1026
1027 /**
1028 * tryTransfer attempts to enqueue into the queue and fails
1029 * returning false not enqueueing and the successive poll is null
1030 */
1031 public void testTryTransfer8() throws InterruptedException {
1032 final LinkedTransferQueue q = new LinkedTransferQueue();
1033 assertTrue(q.offer(four));
1034 assertEquals(1, q.size());
1035 long startTime = System.nanoTime();
1036 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1037 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1038 assertEquals(1, q.size());
1039 assertSame(four, q.poll());
1040 assertNull(q.poll());
1041 checkEmpty(q);
1042 }
1043
1044 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1045 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
1046 checkEmpty(q);
1047 for (int i = 0; i < n; i++) {
1048 assertEquals(i, q.size());
1049 assertTrue(q.offer(i));
1050 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1051 }
1052 assertFalse(q.isEmpty());
1053 return q;
1054 }
1055
1056 /**
1057 * remove(null), contains(null) always return false
1058 */
1059 public void testNeverContainsNull() {
1060 Collection<?>[] qs = {
1061 new LinkedTransferQueue<Object>(),
1062 populatedQueue(2),
1063 };
1064
1065 for (Collection<?> q : qs) {
1066 assertFalse(q.contains(null));
1067 assertFalse(q.remove(null));
1068 }
1069 }
1070 }