ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.83
Committed: Thu Sep 5 21:11:13 2019 UTC (4 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.82: +1 -4 lines
Log Message:
testInterruptedTimedPoll: rely on awaitTermination together with LONGER_DELAY_MS

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Queue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.LinkedTransferQueue;
23
24 import junit.framework.Test;
25
26 @SuppressWarnings({"unchecked", "rawtypes"})
27 public class LinkedTransferQueueTest extends JSR166TestCase {
28 public static class Generic extends BlockingQueueTest {
29 protected BlockingQueue emptyCollection() {
30 return new LinkedTransferQueue();
31 }
32 }
33
34 public static void main(String[] args) {
35 main(suite(), args);
36 }
37
38 public static Test suite() {
39 class Implementation implements CollectionImplementation {
40 public Class<?> klazz() { return LinkedTransferQueue.class; }
41 public Collection emptyCollection() { return new LinkedTransferQueue(); }
42 public Object makeElement(int i) { return i; }
43 public boolean isConcurrent() { return true; }
44 public boolean permitsNulls() { return false; }
45 }
46 return newTestSuite(LinkedTransferQueueTest.class,
47 new Generic().testSuite(),
48 CollectionTest.testSuite(new Implementation()));
49 }
50
51 /**
52 * Constructor builds new queue with size being zero and empty
53 * being true
54 */
55 public void testConstructor1() {
56 assertEquals(0, new LinkedTransferQueue().size());
57 assertTrue(new LinkedTransferQueue().isEmpty());
58 }
59
60 /**
61 * Initializing constructor with null collection throws
62 * NullPointerException
63 */
64 public void testConstructor2() {
65 try {
66 new LinkedTransferQueue(null);
67 shouldThrow();
68 } catch (NullPointerException success) {}
69 }
70
71 /**
72 * Initializing from Collection of null elements throws
73 * NullPointerException
74 */
75 public void testConstructor3() {
76 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
77 try {
78 new LinkedTransferQueue(elements);
79 shouldThrow();
80 } catch (NullPointerException success) {}
81 }
82
83 /**
84 * Initializing constructor with a collection containing some null elements
85 * throws NullPointerException
86 */
87 public void testConstructor4() {
88 Integer[] ints = new Integer[SIZE];
89 for (int i = 0; i < SIZE - 1; ++i)
90 ints[i] = i;
91 Collection<Integer> elements = Arrays.asList(ints);
92 try {
93 new LinkedTransferQueue(elements);
94 shouldThrow();
95 } catch (NullPointerException success) {}
96 }
97
98 /**
99 * Queue contains all elements of the collection it is initialized by
100 */
101 public void testConstructor5() {
102 Integer[] ints = new Integer[SIZE];
103 for (int i = 0; i < SIZE; ++i) {
104 ints[i] = i;
105 }
106 List intList = Arrays.asList(ints);
107 LinkedTransferQueue q
108 = new LinkedTransferQueue(intList);
109 assertEquals(q.size(), intList.size());
110 assertEquals(q.toString(), intList.toString());
111 assertTrue(Arrays.equals(q.toArray(),
112 intList.toArray()));
113 assertTrue(Arrays.equals(q.toArray(new Object[0]),
114 intList.toArray(new Object[0])));
115 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
116 intList.toArray(new Object[SIZE])));
117 for (int i = 0; i < SIZE; ++i) {
118 assertEquals(ints[i], q.poll());
119 }
120 }
121
122 /**
123 * remainingCapacity() always returns Integer.MAX_VALUE
124 */
125 public void testRemainingCapacity() {
126 BlockingQueue q = populatedQueue(SIZE);
127 for (int i = 0; i < SIZE; ++i) {
128 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
129 assertEquals(SIZE - i, q.size());
130 assertEquals(i, q.remove());
131 }
132 for (int i = 0; i < SIZE; ++i) {
133 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
134 assertEquals(i, q.size());
135 assertTrue(q.add(i));
136 }
137 }
138
139 /**
140 * addAll(this) throws IllegalArgumentException
141 */
142 public void testAddAllSelf() {
143 LinkedTransferQueue q = populatedQueue(SIZE);
144 try {
145 q.addAll(q);
146 shouldThrow();
147 } catch (IllegalArgumentException success) {}
148 }
149
150 /**
151 * addAll of a collection with any null elements throws
152 * NullPointerException after possibly adding some elements
153 */
154 public void testAddAll3() {
155 LinkedTransferQueue q = new LinkedTransferQueue();
156 Integer[] ints = new Integer[SIZE];
157 for (int i = 0; i < SIZE - 1; ++i)
158 ints[i] = i;
159 try {
160 q.addAll(Arrays.asList(ints));
161 shouldThrow();
162 } catch (NullPointerException success) {}
163 }
164
165 /**
166 * Queue contains all elements, in traversal order, of successful addAll
167 */
168 public void testAddAll5() {
169 Integer[] empty = new Integer[0];
170 Integer[] ints = new Integer[SIZE];
171 for (int i = 0; i < SIZE; ++i) {
172 ints[i] = i;
173 }
174 LinkedTransferQueue q = new LinkedTransferQueue();
175 assertFalse(q.addAll(Arrays.asList(empty)));
176 assertTrue(q.addAll(Arrays.asList(ints)));
177 for (int i = 0; i < SIZE; ++i) {
178 assertEquals(ints[i], q.poll());
179 }
180 }
181
182 /**
183 * all elements successfully put are contained
184 */
185 public void testPut() {
186 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
187 for (int i = 0; i < SIZE; ++i) {
188 assertEquals(i, q.size());
189 q.put(i);
190 assertTrue(q.contains(i));
191 }
192 }
193
194 /**
195 * take retrieves elements in FIFO order
196 */
197 public void testTake() throws InterruptedException {
198 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
199 for (int i = 0; i < SIZE; ++i) {
200 assertEquals(i, (int) q.take());
201 }
202 }
203
204 /**
205 * take removes existing elements until empty, then blocks interruptibly
206 */
207 public void testBlockingTake() throws InterruptedException {
208 final BlockingQueue q = populatedQueue(SIZE);
209 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
210 Thread t = newStartedThread(new CheckedRunnable() {
211 public void realRun() throws InterruptedException {
212 for (int i = 0; i < SIZE; i++) assertEquals(i, q.take());
213
214 Thread.currentThread().interrupt();
215 try {
216 q.take();
217 shouldThrow();
218 } catch (InterruptedException success) {}
219 assertFalse(Thread.interrupted());
220
221 pleaseInterrupt.countDown();
222 try {
223 q.take();
224 shouldThrow();
225 } catch (InterruptedException success) {}
226 assertFalse(Thread.interrupted());
227 }});
228
229 await(pleaseInterrupt);
230 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
231 t.interrupt();
232 awaitTermination(t);
233 }
234
235 /**
236 * poll succeeds unless empty
237 */
238 public void testPoll() throws InterruptedException {
239 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
240 for (int i = 0; i < SIZE; ++i) {
241 assertEquals(i, (int) q.poll());
242 }
243 assertNull(q.poll());
244 checkEmpty(q);
245 }
246
247 /**
248 * timed poll with zero timeout succeeds when non-empty, else times out
249 */
250 public void testTimedPoll0() throws InterruptedException {
251 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252 for (int i = 0; i < SIZE; ++i) {
253 assertEquals(i, (int) q.poll(0, MILLISECONDS));
254 }
255 assertNull(q.poll(0, MILLISECONDS));
256 checkEmpty(q);
257 }
258
259 /**
260 * timed poll with nonzero timeout succeeds when non-empty, else times out
261 */
262 public void testTimedPoll() throws InterruptedException {
263 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264 long startTime = System.nanoTime();
265 for (int i = 0; i < SIZE; ++i)
266 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
267 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
268
269 startTime = System.nanoTime();
270 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
271 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
272 checkEmpty(q);
273 }
274
275 /**
276 * Interrupted timed poll throws InterruptedException instead of
277 * returning timeout status
278 */
279 public void testInterruptedTimedPoll() throws InterruptedException {
280 final BlockingQueue<Integer> q = populatedQueue(SIZE);
281 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
282 Thread t = newStartedThread(new CheckedRunnable() {
283 public void realRun() throws InterruptedException {
284 for (int i = 0; i < SIZE; i++)
285 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
286
287 Thread.currentThread().interrupt();
288 try {
289 q.poll(randomTimeout(), randomTimeUnit());
290 shouldThrow();
291 } catch (InterruptedException success) {}
292 assertFalse(Thread.interrupted());
293
294 pleaseInterrupt.countDown();
295 try {
296 q.poll(LONGER_DELAY_MS, MILLISECONDS);
297 shouldThrow();
298 } catch (InterruptedException success) {}
299 assertFalse(Thread.interrupted());
300 }});
301
302 await(pleaseInterrupt);
303 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
304 t.interrupt();
305 awaitTermination(t);
306 checkEmpty(q);
307 }
308
309 /**
310 * timed poll after thread interrupted throws InterruptedException
311 * instead of returning timeout status
312 */
313 public void testTimedPollAfterInterrupt() throws InterruptedException {
314 final BlockingQueue<Integer> q = populatedQueue(SIZE);
315 Thread t = newStartedThread(new CheckedRunnable() {
316 public void realRun() throws InterruptedException {
317 long startTime = System.nanoTime();
318 Thread.currentThread().interrupt();
319 for (int i = 0; i < SIZE; ++i)
320 assertEquals(i, (int) q.poll(randomTimeout(), randomTimeUnit()));
321 try {
322 q.poll(randomTimeout(), randomTimeUnit());
323 shouldThrow();
324 } catch (InterruptedException success) {}
325 assertFalse(Thread.interrupted());
326 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
327 }});
328
329 awaitTermination(t);
330 checkEmpty(q);
331 }
332
333 /**
334 * peek returns next element, or null if empty
335 */
336 public void testPeek() throws InterruptedException {
337 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
338 for (int i = 0; i < SIZE; ++i) {
339 assertEquals(i, (int) q.peek());
340 assertEquals(i, (int) q.poll());
341 assertTrue(q.peek() == null ||
342 i != (int) q.peek());
343 }
344 assertNull(q.peek());
345 checkEmpty(q);
346 }
347
348 /**
349 * element returns next element, or throws NoSuchElementException if empty
350 */
351 public void testElement() throws InterruptedException {
352 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
353 for (int i = 0; i < SIZE; ++i) {
354 assertEquals(i, (int) q.element());
355 assertEquals(i, (int) q.poll());
356 }
357 try {
358 q.element();
359 shouldThrow();
360 } catch (NoSuchElementException success) {}
361 checkEmpty(q);
362 }
363
364 /**
365 * remove removes next element, or throws NoSuchElementException if empty
366 */
367 public void testRemove() throws InterruptedException {
368 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
369 for (int i = 0; i < SIZE; ++i) {
370 assertEquals(i, (int) q.remove());
371 }
372 try {
373 q.remove();
374 shouldThrow();
375 } catch (NoSuchElementException success) {}
376 checkEmpty(q);
377 }
378
379 /**
380 * An add following remove(x) succeeds
381 */
382 public void testRemoveElementAndAdd() throws InterruptedException {
383 LinkedTransferQueue q = new LinkedTransferQueue();
384 assertTrue(q.add(one));
385 assertTrue(q.add(two));
386 assertTrue(q.remove(one));
387 assertTrue(q.remove(two));
388 assertTrue(q.add(three));
389 assertSame(q.take(), three);
390 }
391
392 /**
393 * contains(x) reports true when elements added but not yet removed
394 */
395 public void testContains() {
396 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
397 for (int i = 0; i < SIZE; ++i) {
398 assertTrue(q.contains(i));
399 assertEquals(i, (int) q.poll());
400 assertFalse(q.contains(i));
401 }
402 }
403
404 /**
405 * clear removes all elements
406 */
407 public void testClear() throws InterruptedException {
408 LinkedTransferQueue q = populatedQueue(SIZE);
409 q.clear();
410 checkEmpty(q);
411 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
412 q.add(one);
413 assertFalse(q.isEmpty());
414 assertEquals(1, q.size());
415 assertTrue(q.contains(one));
416 q.clear();
417 checkEmpty(q);
418 }
419
420 /**
421 * containsAll(c) is true when c contains a subset of elements
422 */
423 public void testContainsAll() {
424 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
425 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<>();
426 for (int i = 0; i < SIZE; ++i) {
427 assertTrue(q.containsAll(p));
428 assertFalse(p.containsAll(q));
429 p.add(i);
430 }
431 assertTrue(p.containsAll(q));
432 }
433
434 /**
435 * retainAll(c) retains only those elements of c and reports true
436 * if changed
437 */
438 public void testRetainAll() {
439 LinkedTransferQueue q = populatedQueue(SIZE);
440 LinkedTransferQueue p = populatedQueue(SIZE);
441 for (int i = 0; i < SIZE; ++i) {
442 boolean changed = q.retainAll(p);
443 if (i == 0) {
444 assertFalse(changed);
445 } else {
446 assertTrue(changed);
447 }
448 assertTrue(q.containsAll(p));
449 assertEquals(SIZE - i, q.size());
450 p.remove();
451 }
452 }
453
454 /**
455 * removeAll(c) removes only those elements of c and reports true
456 * if changed
457 */
458 public void testRemoveAll() {
459 for (int i = 1; i < SIZE; ++i) {
460 LinkedTransferQueue q = populatedQueue(SIZE);
461 LinkedTransferQueue p = populatedQueue(i);
462 assertTrue(q.removeAll(p));
463 assertEquals(SIZE - i, q.size());
464 for (int j = 0; j < i; ++j) {
465 assertFalse(q.contains(p.remove()));
466 }
467 }
468 }
469
470 /**
471 * toArray() contains all elements in FIFO order
472 */
473 public void testToArray() {
474 LinkedTransferQueue q = populatedQueue(SIZE);
475 Object[] a = q.toArray();
476 assertSame(Object[].class, a.getClass());
477 for (Object o : a)
478 assertSame(o, q.poll());
479 assertTrue(q.isEmpty());
480 }
481
482 /**
483 * toArray(a) contains all elements in FIFO order
484 */
485 public void testToArray2() {
486 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
487 Integer[] ints = new Integer[SIZE];
488 Integer[] array = q.toArray(ints);
489 assertSame(ints, array);
490 for (Integer o : ints)
491 assertSame(o, q.poll());
492 assertTrue(q.isEmpty());
493 }
494
495 /**
496 * toArray(incompatible array type) throws ArrayStoreException
497 */
498 public void testToArray1_BadArg() {
499 LinkedTransferQueue q = populatedQueue(SIZE);
500 try {
501 q.toArray(new String[10]);
502 shouldThrow();
503 } catch (ArrayStoreException success) {}
504 }
505
506 /**
507 * iterator iterates through all elements
508 */
509 public void testIterator() throws InterruptedException {
510 LinkedTransferQueue q = populatedQueue(SIZE);
511 Iterator it = q.iterator();
512 int i;
513 for (i = 0; it.hasNext(); i++)
514 assertTrue(q.contains(it.next()));
515 assertEquals(i, SIZE);
516 assertIteratorExhausted(it);
517
518 it = q.iterator();
519 for (i = 0; it.hasNext(); i++)
520 assertEquals(it.next(), q.take());
521 assertEquals(i, SIZE);
522 assertIteratorExhausted(it);
523 }
524
525 /**
526 * iterator of empty collection has no elements
527 */
528 public void testEmptyIterator() {
529 assertIteratorExhausted(new LinkedTransferQueue().iterator());
530 }
531
532 /**
533 * iterator.remove() removes current element
534 */
535 public void testIteratorRemove() {
536 final LinkedTransferQueue q = new LinkedTransferQueue();
537 q.add(two);
538 q.add(one);
539 q.add(three);
540
541 Iterator it = q.iterator();
542 it.next();
543 it.remove();
544
545 it = q.iterator();
546 assertSame(it.next(), one);
547 assertSame(it.next(), three);
548 assertFalse(it.hasNext());
549 }
550
551 /**
552 * iterator ordering is FIFO
553 */
554 public void testIteratorOrdering() {
555 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
556 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
557 q.add(one);
558 q.add(two);
559 q.add(three);
560 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
561 int k = 0;
562 for (Integer n : q) {
563 assertEquals(++k, (int) n);
564 }
565 assertEquals(3, k);
566 }
567
568 /**
569 * Modifications do not cause iterators to fail
570 */
571 public void testWeaklyConsistentIteration() {
572 final LinkedTransferQueue q = new LinkedTransferQueue();
573 q.add(one);
574 q.add(two);
575 q.add(three);
576 for (Iterator it = q.iterator(); it.hasNext();) {
577 q.remove();
578 it.next();
579 }
580 assertEquals(0, q.size());
581 }
582
583 /**
584 * toString contains toStrings of elements
585 */
586 public void testToString() {
587 LinkedTransferQueue q = populatedQueue(SIZE);
588 String s = q.toString();
589 for (int i = 0; i < SIZE; ++i) {
590 assertTrue(s.contains(String.valueOf(i)));
591 }
592 }
593
594 /**
595 * offer transfers elements across Executor tasks
596 */
597 public void testOfferInExecutor() {
598 final LinkedTransferQueue q = new LinkedTransferQueue();
599 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
600 final ExecutorService executor = Executors.newFixedThreadPool(2);
601 try (PoolCleaner cleaner = cleaner(executor)) {
602
603 executor.execute(new CheckedRunnable() {
604 public void realRun() throws InterruptedException {
605 threadsStarted.await();
606 long startTime = System.nanoTime();
607 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
608 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
609 }});
610
611 executor.execute(new CheckedRunnable() {
612 public void realRun() throws InterruptedException {
613 threadsStarted.await();
614 assertSame(one, q.take());
615 checkEmpty(q);
616 }});
617 }
618 }
619
620 /**
621 * timed poll retrieves elements across Executor threads
622 */
623 public void testPollInExecutor() {
624 final LinkedTransferQueue q = new LinkedTransferQueue();
625 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
626 final ExecutorService executor = Executors.newFixedThreadPool(2);
627 try (PoolCleaner cleaner = cleaner(executor)) {
628
629 executor.execute(new CheckedRunnable() {
630 public void realRun() throws InterruptedException {
631 assertNull(q.poll());
632 threadsStarted.await();
633 long startTime = System.nanoTime();
634 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
635 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
636 checkEmpty(q);
637 }});
638
639 executor.execute(new CheckedRunnable() {
640 public void realRun() throws InterruptedException {
641 threadsStarted.await();
642 q.put(one);
643 }});
644 }
645 }
646
647 /**
648 * A deserialized/reserialized queue has same elements in same order
649 */
650 public void testSerialization() throws Exception {
651 Queue x = populatedQueue(SIZE);
652 Queue y = serialClone(x);
653
654 assertNotSame(y, x);
655 assertEquals(x.size(), y.size());
656 assertEquals(x.toString(), y.toString());
657 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
658 while (!x.isEmpty()) {
659 assertFalse(y.isEmpty());
660 assertEquals(x.remove(), y.remove());
661 }
662 assertTrue(y.isEmpty());
663 }
664
665 /**
666 * drainTo(c) empties queue into another collection c
667 */
668 public void testDrainTo() {
669 LinkedTransferQueue q = populatedQueue(SIZE);
670 ArrayList l = new ArrayList();
671 q.drainTo(l);
672 assertEquals(0, q.size());
673 assertEquals(SIZE, l.size());
674 for (int i = 0; i < SIZE; ++i) {
675 assertEquals(i, l.get(i));
676 }
677 q.add(zero);
678 q.add(one);
679 assertFalse(q.isEmpty());
680 assertTrue(q.contains(zero));
681 assertTrue(q.contains(one));
682 l.clear();
683 q.drainTo(l);
684 assertEquals(0, q.size());
685 assertEquals(2, l.size());
686 for (int i = 0; i < 2; ++i) {
687 assertEquals(i, l.get(i));
688 }
689 }
690
691 /**
692 * drainTo(c) empties full queue, unblocking a waiting put.
693 */
694 public void testDrainToWithActivePut() throws InterruptedException {
695 final LinkedTransferQueue q = populatedQueue(SIZE);
696 Thread t = newStartedThread(new CheckedRunnable() {
697 public void realRun() {
698 q.put(SIZE + 1);
699 }});
700 ArrayList l = new ArrayList();
701 q.drainTo(l);
702 assertTrue(l.size() >= SIZE);
703 for (int i = 0; i < SIZE; ++i)
704 assertEquals(i, l.get(i));
705 awaitTermination(t);
706 assertTrue(q.size() + l.size() >= SIZE);
707 }
708
709 /**
710 * drainTo(c, n) empties first min(n, size) elements of queue into c
711 */
712 public void testDrainToN() {
713 LinkedTransferQueue q = new LinkedTransferQueue();
714 for (int i = 0; i < SIZE + 2; ++i) {
715 for (int j = 0; j < SIZE; j++) {
716 assertTrue(q.offer(j));
717 }
718 ArrayList l = new ArrayList();
719 q.drainTo(l, i);
720 int k = (i < SIZE) ? i : SIZE;
721 assertEquals(k, l.size());
722 assertEquals(SIZE - k, q.size());
723 for (int j = 0; j < k; ++j)
724 assertEquals(j, l.get(j));
725 do {} while (q.poll() != null);
726 }
727 }
728
729 /**
730 * timed poll() or take() increments the waiting consumer count;
731 * offer(e) decrements the waiting consumer count
732 */
733 public void testWaitingConsumer() throws InterruptedException {
734 final LinkedTransferQueue q = new LinkedTransferQueue();
735 assertEquals(0, q.getWaitingConsumerCount());
736 assertFalse(q.hasWaitingConsumer());
737 final CountDownLatch threadStarted = new CountDownLatch(1);
738
739 Thread t = newStartedThread(new CheckedRunnable() {
740 public void realRun() throws InterruptedException {
741 threadStarted.countDown();
742 long startTime = System.nanoTime();
743 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
744 assertEquals(0, q.getWaitingConsumerCount());
745 assertFalse(q.hasWaitingConsumer());
746 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
747 }});
748
749 threadStarted.await();
750 Callable<Boolean> oneConsumer
751 = new Callable<Boolean>() { public Boolean call() {
752 return q.hasWaitingConsumer()
753 && q.getWaitingConsumerCount() == 1; }};
754 waitForThreadToEnterWaitState(t, oneConsumer);
755
756 assertTrue(q.offer(one));
757 assertEquals(0, q.getWaitingConsumerCount());
758 assertFalse(q.hasWaitingConsumer());
759
760 awaitTermination(t);
761 }
762
763 /**
764 * transfer(null) throws NullPointerException
765 */
766 public void testTransfer1() throws InterruptedException {
767 try {
768 LinkedTransferQueue q = new LinkedTransferQueue();
769 q.transfer(null);
770 shouldThrow();
771 } catch (NullPointerException success) {}
772 }
773
774 /**
775 * transfer waits until a poll occurs. The transferred element
776 * is returned by the associated poll.
777 */
778 public void testTransfer2() throws InterruptedException {
779 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
780 final CountDownLatch threadStarted = new CountDownLatch(1);
781
782 Thread t = newStartedThread(new CheckedRunnable() {
783 public void realRun() throws InterruptedException {
784 threadStarted.countDown();
785 q.transfer(five);
786 checkEmpty(q);
787 }});
788
789 threadStarted.await();
790 Callable<Boolean> oneElement
791 = new Callable<Boolean>() { public Boolean call() {
792 return !q.isEmpty() && q.size() == 1; }};
793 waitForThreadToEnterWaitState(t, oneElement);
794
795 assertSame(five, q.poll());
796 checkEmpty(q);
797 awaitTermination(t);
798 }
799
800 /**
801 * transfer waits until a poll occurs, and then transfers in fifo order
802 */
803 public void testTransfer3() throws InterruptedException {
804 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
805
806 Thread first = newStartedThread(new CheckedRunnable() {
807 public void realRun() throws InterruptedException {
808 q.transfer(four);
809 assertFalse(q.contains(four));
810 assertEquals(1, q.size());
811 }});
812
813 Thread interruptedThread = newStartedThread(
814 new CheckedInterruptedRunnable() {
815 public void realRun() throws InterruptedException {
816 while (q.isEmpty())
817 Thread.yield();
818 q.transfer(five);
819 }});
820
821 while (q.size() < 2)
822 Thread.yield();
823 assertEquals(2, q.size());
824 assertSame(four, q.poll());
825 first.join();
826 assertEquals(1, q.size());
827 interruptedThread.interrupt();
828 interruptedThread.join();
829 checkEmpty(q);
830 }
831
832 /**
833 * transfer waits until a poll occurs, at which point the polling
834 * thread returns the element
835 */
836 public void testTransfer4() throws InterruptedException {
837 final LinkedTransferQueue q = new LinkedTransferQueue();
838
839 Thread t = newStartedThread(new CheckedRunnable() {
840 public void realRun() throws InterruptedException {
841 q.transfer(four);
842 assertFalse(q.contains(four));
843 assertSame(three, q.poll());
844 }});
845
846 while (q.isEmpty())
847 Thread.yield();
848 assertFalse(q.isEmpty());
849 assertEquals(1, q.size());
850 assertTrue(q.offer(three));
851 assertSame(four, q.poll());
852 awaitTermination(t);
853 }
854
855 /**
856 * transfer waits until a take occurs. The transferred element
857 * is returned by the associated take.
858 */
859 public void testTransfer5() throws InterruptedException {
860 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
861
862 Thread t = newStartedThread(new CheckedRunnable() {
863 public void realRun() throws InterruptedException {
864 q.transfer(four);
865 checkEmpty(q);
866 }});
867
868 while (q.isEmpty())
869 Thread.yield();
870 assertFalse(q.isEmpty());
871 assertEquals(1, q.size());
872 assertSame(four, q.take());
873 checkEmpty(q);
874 awaitTermination(t);
875 }
876
877 /**
878 * tryTransfer(null) throws NullPointerException
879 */
880 public void testTryTransfer1() {
881 final LinkedTransferQueue q = new LinkedTransferQueue();
882 try {
883 q.tryTransfer(null);
884 shouldThrow();
885 } catch (NullPointerException success) {}
886 }
887
888 /**
889 * tryTransfer returns false and does not enqueue if there are no
890 * consumers waiting to poll or take.
891 */
892 public void testTryTransfer2() throws InterruptedException {
893 final LinkedTransferQueue q = new LinkedTransferQueue();
894 assertFalse(q.tryTransfer(new Object()));
895 assertFalse(q.hasWaitingConsumer());
896 checkEmpty(q);
897 }
898
899 /**
900 * If there is a consumer waiting in timed poll, tryTransfer
901 * returns true while successfully transfering object.
902 */
903 public void testTryTransfer3() throws InterruptedException {
904 final Object hotPotato = new Object();
905 final LinkedTransferQueue q = new LinkedTransferQueue();
906
907 Thread t = newStartedThread(new CheckedRunnable() {
908 public void realRun() {
909 while (! q.hasWaitingConsumer())
910 Thread.yield();
911 assertTrue(q.hasWaitingConsumer());
912 checkEmpty(q);
913 assertTrue(q.tryTransfer(hotPotato));
914 }});
915
916 long startTime = System.nanoTime();
917 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
918 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
919 checkEmpty(q);
920 awaitTermination(t);
921 }
922
923 /**
924 * If there is a consumer waiting in take, tryTransfer returns
925 * true while successfully transfering object.
926 */
927 public void testTryTransfer4() throws InterruptedException {
928 final Object hotPotato = new Object();
929 final LinkedTransferQueue q = new LinkedTransferQueue();
930
931 Thread t = newStartedThread(new CheckedRunnable() {
932 public void realRun() {
933 while (! q.hasWaitingConsumer())
934 Thread.yield();
935 assertTrue(q.hasWaitingConsumer());
936 checkEmpty(q);
937 assertTrue(q.tryTransfer(hotPotato));
938 }});
939
940 assertSame(q.take(), hotPotato);
941 checkEmpty(q);
942 awaitTermination(t);
943 }
944
945 /**
946 * tryTransfer blocks interruptibly if no takers
947 */
948 public void testTryTransfer5() throws InterruptedException {
949 final LinkedTransferQueue q = new LinkedTransferQueue();
950 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
951 assertTrue(q.isEmpty());
952
953 Thread t = newStartedThread(new CheckedRunnable() {
954 public void realRun() throws InterruptedException {
955 long startTime = System.nanoTime();
956 Thread.currentThread().interrupt();
957 try {
958 q.tryTransfer(new Object(), randomTimeout(), randomTimeUnit());
959 shouldThrow();
960 } catch (InterruptedException success) {}
961 assertFalse(Thread.interrupted());
962
963 pleaseInterrupt.countDown();
964 try {
965 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
966 shouldThrow();
967 } catch (InterruptedException success) {}
968 assertFalse(Thread.interrupted());
969
970 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
971 }});
972
973 await(pleaseInterrupt);
974 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
975 t.interrupt();
976 awaitTermination(t);
977 checkEmpty(q);
978 }
979
980 /**
981 * tryTransfer gives up after the timeout and returns false
982 */
983 public void testTryTransfer6() throws InterruptedException {
984 final LinkedTransferQueue q = new LinkedTransferQueue();
985
986 Thread t = newStartedThread(new CheckedRunnable() {
987 public void realRun() throws InterruptedException {
988 long startTime = System.nanoTime();
989 assertFalse(q.tryTransfer(new Object(),
990 timeoutMillis(), MILLISECONDS));
991 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
992 checkEmpty(q);
993 }});
994
995 awaitTermination(t);
996 checkEmpty(q);
997 }
998
999 /**
1000 * tryTransfer waits for any elements previously in to be removed
1001 * before transfering to a poll or take
1002 */
1003 public void testTryTransfer7() throws InterruptedException {
1004 final LinkedTransferQueue q = new LinkedTransferQueue();
1005 assertTrue(q.offer(four));
1006
1007 Thread t = newStartedThread(new CheckedRunnable() {
1008 public void realRun() throws InterruptedException {
1009 long startTime = System.nanoTime();
1010 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1011 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1012 checkEmpty(q);
1013 }});
1014
1015 while (q.size() != 2)
1016 Thread.yield();
1017 assertEquals(2, q.size());
1018 assertSame(four, q.poll());
1019 assertSame(five, q.poll());
1020 checkEmpty(q);
1021 awaitTermination(t);
1022 }
1023
1024 /**
1025 * tryTransfer attempts to enqueue into the queue and fails
1026 * returning false not enqueueing and the successive poll is null
1027 */
1028 public void testTryTransfer8() throws InterruptedException {
1029 final LinkedTransferQueue q = new LinkedTransferQueue();
1030 assertTrue(q.offer(four));
1031 assertEquals(1, q.size());
1032 long startTime = System.nanoTime();
1033 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1034 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1035 assertEquals(1, q.size());
1036 assertSame(four, q.poll());
1037 assertNull(q.poll());
1038 checkEmpty(q);
1039 }
1040
1041 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1042 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
1043 checkEmpty(q);
1044 for (int i = 0; i < n; i++) {
1045 assertEquals(i, q.size());
1046 assertTrue(q.offer(i));
1047 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1048 }
1049 assertFalse(q.isEmpty());
1050 return q;
1051 }
1052
1053 /**
1054 * remove(null), contains(null) always return false
1055 */
1056 public void testNeverContainsNull() {
1057 Collection<?>[] qs = {
1058 new LinkedTransferQueue<Object>(),
1059 populatedQueue(2),
1060 };
1061
1062 for (Collection<?> q : qs) {
1063 assertFalse(q.contains(null));
1064 assertFalse(q.remove(null));
1065 }
1066 }
1067 }