ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.76
Committed: Sun May 14 00:48:20 2017 UTC (7 years ago) by jsr166
Branch: MAIN
Changes since 1.75: +6 -3 lines
Log Message:
improve testInterruptedTimedPoll

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Queue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.LinkedTransferQueue;
23
24 import junit.framework.Test;
25
26 @SuppressWarnings({"unchecked", "rawtypes"})
27 public class LinkedTransferQueueTest extends JSR166TestCase {
28 public static class Generic extends BlockingQueueTest {
29 protected BlockingQueue emptyCollection() {
30 return new LinkedTransferQueue();
31 }
32 }
33
34 public static void main(String[] args) {
35 main(suite(), args);
36 }
37
38 public static Test suite() {
39 class Implementation implements CollectionImplementation {
40 public Class<?> klazz() { return LinkedTransferQueue.class; }
41 public Collection emptyCollection() { return new LinkedTransferQueue(); }
42 public Object makeElement(int i) { return i; }
43 public boolean isConcurrent() { return true; }
44 public boolean permitsNulls() { return false; }
45 }
46 return newTestSuite(LinkedTransferQueueTest.class,
47 new Generic().testSuite(),
48 CollectionTest.testSuite(new Implementation()));
49 }
50
51 /**
52 * Constructor builds new queue with size being zero and empty
53 * being true
54 */
55 public void testConstructor1() {
56 assertEquals(0, new LinkedTransferQueue().size());
57 assertTrue(new LinkedTransferQueue().isEmpty());
58 }
59
60 /**
61 * Initializing constructor with null collection throws
62 * NullPointerException
63 */
64 public void testConstructor2() {
65 try {
66 new LinkedTransferQueue(null);
67 shouldThrow();
68 } catch (NullPointerException success) {}
69 }
70
71 /**
72 * Initializing from Collection of null elements throws
73 * NullPointerException
74 */
75 public void testConstructor3() {
76 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
77 try {
78 new LinkedTransferQueue(elements);
79 shouldThrow();
80 } catch (NullPointerException success) {}
81 }
82
83 /**
84 * Initializing constructor with a collection containing some null elements
85 * throws NullPointerException
86 */
87 public void testConstructor4() {
88 Integer[] ints = new Integer[SIZE];
89 for (int i = 0; i < SIZE - 1; ++i)
90 ints[i] = i;
91 Collection<Integer> elements = Arrays.asList(ints);
92 try {
93 new LinkedTransferQueue(elements);
94 shouldThrow();
95 } catch (NullPointerException success) {}
96 }
97
98 /**
99 * Queue contains all elements of the collection it is initialized by
100 */
101 public void testConstructor5() {
102 Integer[] ints = new Integer[SIZE];
103 for (int i = 0; i < SIZE; ++i) {
104 ints[i] = i;
105 }
106 List intList = Arrays.asList(ints);
107 LinkedTransferQueue q
108 = new LinkedTransferQueue(intList);
109 assertEquals(q.size(), intList.size());
110 assertEquals(q.toString(), intList.toString());
111 assertTrue(Arrays.equals(q.toArray(),
112 intList.toArray()));
113 assertTrue(Arrays.equals(q.toArray(new Object[0]),
114 intList.toArray(new Object[0])));
115 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
116 intList.toArray(new Object[SIZE])));
117 for (int i = 0; i < SIZE; ++i) {
118 assertEquals(ints[i], q.poll());
119 }
120 }
121
122 /**
123 * remainingCapacity() always returns Integer.MAX_VALUE
124 */
125 public void testRemainingCapacity() {
126 BlockingQueue q = populatedQueue(SIZE);
127 for (int i = 0; i < SIZE; ++i) {
128 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
129 assertEquals(SIZE - i, q.size());
130 assertEquals(i, q.remove());
131 }
132 for (int i = 0; i < SIZE; ++i) {
133 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
134 assertEquals(i, q.size());
135 assertTrue(q.add(i));
136 }
137 }
138
139 /**
140 * addAll(this) throws IllegalArgumentException
141 */
142 public void testAddAllSelf() {
143 LinkedTransferQueue q = populatedQueue(SIZE);
144 try {
145 q.addAll(q);
146 shouldThrow();
147 } catch (IllegalArgumentException success) {}
148 }
149
150 /**
151 * addAll of a collection with any null elements throws
152 * NullPointerException after possibly adding some elements
153 */
154 public void testAddAll3() {
155 LinkedTransferQueue q = new LinkedTransferQueue();
156 Integer[] ints = new Integer[SIZE];
157 for (int i = 0; i < SIZE - 1; ++i)
158 ints[i] = i;
159 try {
160 q.addAll(Arrays.asList(ints));
161 shouldThrow();
162 } catch (NullPointerException success) {}
163 }
164
165 /**
166 * Queue contains all elements, in traversal order, of successful addAll
167 */
168 public void testAddAll5() {
169 Integer[] empty = new Integer[0];
170 Integer[] ints = new Integer[SIZE];
171 for (int i = 0; i < SIZE; ++i) {
172 ints[i] = i;
173 }
174 LinkedTransferQueue q = new LinkedTransferQueue();
175 assertFalse(q.addAll(Arrays.asList(empty)));
176 assertTrue(q.addAll(Arrays.asList(ints)));
177 for (int i = 0; i < SIZE; ++i) {
178 assertEquals(ints[i], q.poll());
179 }
180 }
181
182 /**
183 * all elements successfully put are contained
184 */
185 public void testPut() {
186 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
187 for (int i = 0; i < SIZE; ++i) {
188 assertEquals(i, q.size());
189 q.put(i);
190 assertTrue(q.contains(i));
191 }
192 }
193
194 /**
195 * take retrieves elements in FIFO order
196 */
197 public void testTake() throws InterruptedException {
198 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
199 for (int i = 0; i < SIZE; ++i) {
200 assertEquals(i, (int) q.take());
201 }
202 }
203
204 /**
205 * take removes existing elements until empty, then blocks interruptibly
206 */
207 public void testBlockingTake() throws InterruptedException {
    // A helper thread drains the populated queue in order and then calls
    // take() twice on the now-empty queue. Each call must end in
    // InterruptedException: first with the interrupt status already set,
    // then with the interrupt delivered by this (main) thread.
208 final BlockingQueue q = populatedQueue(SIZE);
209 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
210 Thread t = newStartedThread(new CheckedRunnable() {
211 public void realRun() throws InterruptedException {
    // populatedQueue offers 0..SIZE-1, so these are the expected heads.
212 for (int i = 0; i < SIZE; i++) assertEquals(i, q.take());
213
    // Case 1: the interrupt status is set before take() is entered.
214 Thread.currentThread().interrupt();
215 try {
216 q.take();
217 shouldThrow();
218 } catch (InterruptedException success) {}
    // Throwing InterruptedException must leave the interrupt status clear.
219 assertFalse(Thread.interrupted());
220
    // Case 2: tell the main thread we are about to call take() again, so
    // that it can interrupt us from outside.
221 pleaseInterrupt.countDown();
222 try {
223 q.take();
224 shouldThrow();
225 } catch (InterruptedException success) {}
226 assertFalse(Thread.interrupted());
227 }});
228
    // Wait for the helper's signal before interrupting it.
229 await(pleaseInterrupt);
    // NOTE(review): presumably checks that t reaches the given thread state
    // before we interrupt - confirm against JSR166TestCase.
230 assertThreadBlocks(t, Thread.State.WAITING);
231 t.interrupt();
232 awaitTermination(t);
233 }
234
235 /**
236 * poll succeeds unless empty
237 */
238 public void testPoll() throws InterruptedException {
239 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
240 for (int i = 0; i < SIZE; ++i) {
241 assertEquals(i, (int) q.poll());
242 }
243 assertNull(q.poll());
244 checkEmpty(q);
245 }
246
247 /**
248 * timed poll with zero timeout succeeds when non-empty, else times out
249 */
250 public void testTimedPoll0() throws InterruptedException {
251 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252 for (int i = 0; i < SIZE; ++i) {
253 assertEquals(i, (int) q.poll(0, MILLISECONDS));
254 }
255 assertNull(q.poll(0, MILLISECONDS));
256 checkEmpty(q);
257 }
258
259 /**
260 * timed poll with nonzero timeout succeeds when non-empty, else times out
261 */
262 public void testTimedPoll() throws InterruptedException {
263 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264 long startTime = System.nanoTime();
265 for (int i = 0; i < SIZE; ++i)
266 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
267 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
268
269 startTime = System.nanoTime();
270 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
271 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
272 checkEmpty(q);
273 }
274
275 /**
276 * Interrupted timed poll throws InterruptedException instead of
277 * returning timeout status
278 */
279 public void testInterruptedTimedPoll() throws InterruptedException {
    // A helper thread drains the populated queue with timed polls, then
    // calls timed poll on the empty queue. The main thread interrupts it,
    // and the poll must throw InterruptedException rather than return.
280 final BlockingQueue<Integer> q = populatedQueue(SIZE);
281 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
282 Thread t = newStartedThread(new CheckedRunnable() {
283 public void realRun() throws InterruptedException {
284 long startTime = System.nanoTime();
    // Elements are present, so each of these polls returns a value.
285 for (int i = 0; i < SIZE; ++i)
286 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
287
    // Tell the main thread that the next poll is on an empty queue.
288 pleaseInterrupt.countDown();
289 try {
290 q.poll(LONG_DELAY_MS, MILLISECONDS);
291 shouldThrow();
292 } catch (InterruptedException success) {}
    // Throwing InterruptedException must leave the interrupt status clear.
293 assertFalse(Thread.interrupted());
294
    // The whole run, including the interrupted poll, must finish before
    // the long timeout would have expired.
295 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
296 }});
297
298 await(pleaseInterrupt);
    // NOTE(review): presumably checks that t reaches TIMED_WAITING before
    // the interrupt is sent - confirm against JSR166TestCase.
299 assertThreadBlocks(t, Thread.State.TIMED_WAITING);
300 t.interrupt();
301 awaitTermination(t);
302 checkEmpty(q);
303 }
304
305 /**
306 * timed poll after thread interrupted throws InterruptedException
307 * instead of returning timeout status
308 */
309 public void testTimedPollAfterInterrupt() throws InterruptedException {
310 final BlockingQueue<Integer> q = populatedQueue(SIZE);
311 Thread t = newStartedThread(new CheckedRunnable() {
312 public void realRun() throws InterruptedException {
313 long startTime = System.nanoTime();
314 Thread.currentThread().interrupt();
315 for (int i = 0; i < SIZE; ++i)
316 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
317 try {
318 q.poll(LONG_DELAY_MS, MILLISECONDS);
319 shouldThrow();
320 } catch (InterruptedException success) {}
321 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
322 }});
323
324 awaitTermination(t);
325 checkEmpty(q);
326 }
327
328 /**
329 * peek returns next element, or null if empty
330 */
331 public void testPeek() throws InterruptedException {
332 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
333 for (int i = 0; i < SIZE; ++i) {
334 assertEquals(i, (int) q.peek());
335 assertEquals(i, (int) q.poll());
336 assertTrue(q.peek() == null ||
337 i != (int) q.peek());
338 }
339 assertNull(q.peek());
340 checkEmpty(q);
341 }
342
343 /**
344 * element returns next element, or throws NoSuchElementException if empty
345 */
346 public void testElement() throws InterruptedException {
347 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
348 for (int i = 0; i < SIZE; ++i) {
349 assertEquals(i, (int) q.element());
350 assertEquals(i, (int) q.poll());
351 }
352 try {
353 q.element();
354 shouldThrow();
355 } catch (NoSuchElementException success) {}
356 checkEmpty(q);
357 }
358
359 /**
360 * remove removes next element, or throws NoSuchElementException if empty
361 */
362 public void testRemove() throws InterruptedException {
363 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
364 for (int i = 0; i < SIZE; ++i) {
365 assertEquals(i, (int) q.remove());
366 }
367 try {
368 q.remove();
369 shouldThrow();
370 } catch (NoSuchElementException success) {}
371 checkEmpty(q);
372 }
373
374 /**
375 * An add following remove(x) succeeds
376 */
377 public void testRemoveElementAndAdd() throws InterruptedException {
378 LinkedTransferQueue q = new LinkedTransferQueue();
379 assertTrue(q.add(one));
380 assertTrue(q.add(two));
381 assertTrue(q.remove(one));
382 assertTrue(q.remove(two));
383 assertTrue(q.add(three));
384 assertSame(q.take(), three);
385 }
386
387 /**
388 * contains(x) reports true when elements added but not yet removed
389 */
390 public void testContains() {
391 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
392 for (int i = 0; i < SIZE; ++i) {
393 assertTrue(q.contains(i));
394 assertEquals(i, (int) q.poll());
395 assertFalse(q.contains(i));
396 }
397 }
398
399 /**
400 * clear removes all elements
401 */
402 public void testClear() throws InterruptedException {
403 LinkedTransferQueue q = populatedQueue(SIZE);
404 q.clear();
405 checkEmpty(q);
406 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
407 q.add(one);
408 assertFalse(q.isEmpty());
409 assertEquals(1, q.size());
410 assertTrue(q.contains(one));
411 q.clear();
412 checkEmpty(q);
413 }
414
415 /**
416 * containsAll(c) is true when c contains a subset of elements
417 */
418 public void testContainsAll() {
419 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
420 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<>();
421 for (int i = 0; i < SIZE; ++i) {
422 assertTrue(q.containsAll(p));
423 assertFalse(p.containsAll(q));
424 p.add(i);
425 }
426 assertTrue(p.containsAll(q));
427 }
428
429 /**
430 * retainAll(c) retains only those elements of c and reports true
431 * if changed
432 */
433 public void testRetainAll() {
434 LinkedTransferQueue q = populatedQueue(SIZE);
435 LinkedTransferQueue p = populatedQueue(SIZE);
436 for (int i = 0; i < SIZE; ++i) {
437 boolean changed = q.retainAll(p);
438 if (i == 0) {
439 assertFalse(changed);
440 } else {
441 assertTrue(changed);
442 }
443 assertTrue(q.containsAll(p));
444 assertEquals(SIZE - i, q.size());
445 p.remove();
446 }
447 }
448
449 /**
450 * removeAll(c) removes only those elements of c and reports true
451 * if changed
452 */
453 public void testRemoveAll() {
454 for (int i = 1; i < SIZE; ++i) {
455 LinkedTransferQueue q = populatedQueue(SIZE);
456 LinkedTransferQueue p = populatedQueue(i);
457 assertTrue(q.removeAll(p));
458 assertEquals(SIZE - i, q.size());
459 for (int j = 0; j < i; ++j) {
460 assertFalse(q.contains(p.remove()));
461 }
462 }
463 }
464
465 /**
466 * toArray() contains all elements in FIFO order
467 */
468 public void testToArray() {
469 LinkedTransferQueue q = populatedQueue(SIZE);
470 Object[] o = q.toArray();
471 for (int i = 0; i < o.length; i++) {
472 assertSame(o[i], q.poll());
473 }
474 }
475
476 /**
477 * toArray(a) contains all elements in FIFO order
478 */
479 public void testToArray2() {
480 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
481 Integer[] ints = new Integer[SIZE];
482 Integer[] array = q.toArray(ints);
483 assertSame(ints, array);
484 for (int i = 0; i < ints.length; i++) {
485 assertSame(ints[i], q.poll());
486 }
487 }
488
489 /**
490 * toArray(incompatible array type) throws ArrayStoreException
491 */
492 public void testToArray1_BadArg() {
493 LinkedTransferQueue q = populatedQueue(SIZE);
494 try {
495 q.toArray(new String[10]);
496 shouldThrow();
497 } catch (ArrayStoreException success) {}
498 }
499
500 /**
501 * iterator iterates through all elements
502 */
503 public void testIterator() throws InterruptedException {
504 LinkedTransferQueue q = populatedQueue(SIZE);
505 Iterator it = q.iterator();
506 int i;
507 for (i = 0; it.hasNext(); i++)
508 assertTrue(q.contains(it.next()));
509 assertEquals(i, SIZE);
510 assertIteratorExhausted(it);
511
512 it = q.iterator();
513 for (i = 0; it.hasNext(); i++)
514 assertEquals(it.next(), q.take());
515 assertEquals(i, SIZE);
516 assertIteratorExhausted(it);
517 }
518
519 /**
520 * iterator of empty collection has no elements
521 */
522 public void testEmptyIterator() {
523 assertIteratorExhausted(new LinkedTransferQueue().iterator());
524 }
525
526 /**
527 * iterator.remove() removes current element
528 */
529 public void testIteratorRemove() {
530 final LinkedTransferQueue q = new LinkedTransferQueue();
531 q.add(two);
532 q.add(one);
533 q.add(three);
534
535 Iterator it = q.iterator();
536 it.next();
537 it.remove();
538
539 it = q.iterator();
540 assertSame(it.next(), one);
541 assertSame(it.next(), three);
542 assertFalse(it.hasNext());
543 }
544
545 /**
546 * iterator ordering is FIFO
547 */
548 public void testIteratorOrdering() {
549 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
550 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
551 q.add(one);
552 q.add(two);
553 q.add(three);
554 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
555 int k = 0;
556 for (Integer n : q) {
557 assertEquals(++k, (int) n);
558 }
559 assertEquals(3, k);
560 }
561
562 /**
563 * Modifications do not cause iterators to fail
564 */
565 public void testWeaklyConsistentIteration() {
566 final LinkedTransferQueue q = new LinkedTransferQueue();
567 q.add(one);
568 q.add(two);
569 q.add(three);
570 for (Iterator it = q.iterator(); it.hasNext();) {
571 q.remove();
572 it.next();
573 }
574 assertEquals(0, q.size());
575 }
576
577 /**
578 * toString contains toStrings of elements
579 */
580 public void testToString() {
581 LinkedTransferQueue q = populatedQueue(SIZE);
582 String s = q.toString();
583 for (int i = 0; i < SIZE; ++i) {
584 assertTrue(s.contains(String.valueOf(i)));
585 }
586 }
587
588 /**
589 * offer transfers elements across Executor tasks
590 */
591 public void testOfferInExecutor() {
592 final LinkedTransferQueue q = new LinkedTransferQueue();
593 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
594 final ExecutorService executor = Executors.newFixedThreadPool(2);
595 try (PoolCleaner cleaner = cleaner(executor)) {
596
597 executor.execute(new CheckedRunnable() {
598 public void realRun() throws InterruptedException {
599 threadsStarted.await();
600 long startTime = System.nanoTime();
601 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
602 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
603 }});
604
605 executor.execute(new CheckedRunnable() {
606 public void realRun() throws InterruptedException {
607 threadsStarted.await();
608 assertSame(one, q.take());
609 checkEmpty(q);
610 }});
611 }
612 }
613
614 /**
615 * timed poll retrieves elements across Executor threads
616 */
617 public void testPollInExecutor() {
618 final LinkedTransferQueue q = new LinkedTransferQueue();
619 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
620 final ExecutorService executor = Executors.newFixedThreadPool(2);
621 try (PoolCleaner cleaner = cleaner(executor)) {
622
623 executor.execute(new CheckedRunnable() {
624 public void realRun() throws InterruptedException {
625 assertNull(q.poll());
626 threadsStarted.await();
627 long startTime = System.nanoTime();
628 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
629 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
630 checkEmpty(q);
631 }});
632
633 executor.execute(new CheckedRunnable() {
634 public void realRun() throws InterruptedException {
635 threadsStarted.await();
636 q.put(one);
637 }});
638 }
639 }
640
641 /**
642 * A deserialized serialized queue has same elements in same order
643 */
644 public void testSerialization() throws Exception {
645 Queue x = populatedQueue(SIZE);
646 Queue y = serialClone(x);
647
648 assertNotSame(y, x);
649 assertEquals(x.size(), y.size());
650 assertEquals(x.toString(), y.toString());
651 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
652 while (!x.isEmpty()) {
653 assertFalse(y.isEmpty());
654 assertEquals(x.remove(), y.remove());
655 }
656 assertTrue(y.isEmpty());
657 }
658
659 /**
660 * drainTo(c) empties queue into another collection c
661 */
662 public void testDrainTo() {
663 LinkedTransferQueue q = populatedQueue(SIZE);
664 ArrayList l = new ArrayList();
665 q.drainTo(l);
666 assertEquals(0, q.size());
667 assertEquals(SIZE, l.size());
668 for (int i = 0; i < SIZE; ++i) {
669 assertEquals(i, l.get(i));
670 }
671 q.add(zero);
672 q.add(one);
673 assertFalse(q.isEmpty());
674 assertTrue(q.contains(zero));
675 assertTrue(q.contains(one));
676 l.clear();
677 q.drainTo(l);
678 assertEquals(0, q.size());
679 assertEquals(2, l.size());
680 for (int i = 0; i < 2; ++i) {
681 assertEquals(i, l.get(i));
682 }
683 }
684
685 /**
686 * drainTo(c) drains the queue while another thread concurrently puts an element.
687 */
688 public void testDrainToWithActivePut() throws InterruptedException {
689 final LinkedTransferQueue q = populatedQueue(SIZE);
690 Thread t = newStartedThread(new CheckedRunnable() {
691 public void realRun() {
692 q.put(SIZE + 1);
693 }});
694 ArrayList l = new ArrayList();
695 q.drainTo(l);
696 assertTrue(l.size() >= SIZE);
697 for (int i = 0; i < SIZE; ++i)
698 assertEquals(i, l.get(i));
699 awaitTermination(t);
700 assertTrue(q.size() + l.size() >= SIZE);
701 }
702
703 /**
704 * drainTo(c, n) empties first min(n, size) elements of queue into c
705 */
706 public void testDrainToN() {
707 LinkedTransferQueue q = new LinkedTransferQueue();
708 for (int i = 0; i < SIZE + 2; ++i) {
709 for (int j = 0; j < SIZE; j++) {
710 assertTrue(q.offer(j));
711 }
712 ArrayList l = new ArrayList();
713 q.drainTo(l, i);
714 int k = (i < SIZE) ? i : SIZE;
715 assertEquals(k, l.size());
716 assertEquals(SIZE - k, q.size());
717 for (int j = 0; j < k; ++j)
718 assertEquals(j, l.get(j));
719 do {} while (q.poll() != null);
720 }
721 }
722
723 /**
724 * timed poll() or take() increments the waiting consumer count;
725 * offer(e) decrements the waiting consumer count
726 */
727 public void testWaitingConsumer() throws InterruptedException {
728 final LinkedTransferQueue q = new LinkedTransferQueue();
729 assertEquals(0, q.getWaitingConsumerCount());
730 assertFalse(q.hasWaitingConsumer());
731 final CountDownLatch threadStarted = new CountDownLatch(1);
732
733 Thread t = newStartedThread(new CheckedRunnable() {
734 public void realRun() throws InterruptedException {
735 threadStarted.countDown();
736 long startTime = System.nanoTime();
737 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
738 assertEquals(0, q.getWaitingConsumerCount());
739 assertFalse(q.hasWaitingConsumer());
740 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
741 }});
742
743 threadStarted.await();
744 Callable<Boolean> oneConsumer
745 = new Callable<Boolean>() { public Boolean call() {
746 return q.hasWaitingConsumer()
747 && q.getWaitingConsumerCount() == 1; }};
748 waitForThreadToEnterWaitState(t, oneConsumer);
749
750 assertTrue(q.offer(one));
751 assertEquals(0, q.getWaitingConsumerCount());
752 assertFalse(q.hasWaitingConsumer());
753
754 awaitTermination(t);
755 }
756
757 /**
758 * transfer(null) throws NullPointerException
759 */
760 public void testTransfer1() throws InterruptedException {
761 try {
762 LinkedTransferQueue q = new LinkedTransferQueue();
763 q.transfer(null);
764 shouldThrow();
765 } catch (NullPointerException success) {}
766 }
767
768 /**
769 * transfer waits until a poll occurs. The transferred element
770 * is returned by the associated poll.
771 */
772 public void testTransfer2() throws InterruptedException {
773 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
774 final CountDownLatch threadStarted = new CountDownLatch(1);
775
776 Thread t = newStartedThread(new CheckedRunnable() {
777 public void realRun() throws InterruptedException {
778 threadStarted.countDown();
779 q.transfer(five);
780 checkEmpty(q);
781 }});
782
783 threadStarted.await();
784 Callable<Boolean> oneElement
785 = new Callable<Boolean>() { public Boolean call() {
786 return !q.isEmpty() && q.size() == 1; }};
787 waitForThreadToEnterWaitState(t, oneElement);
788
789 assertSame(five, q.poll());
790 checkEmpty(q);
791 awaitTermination(t);
792 }
793
794 /**
795 * transfer waits until a poll occurs, and then transfers in fifo order
796 */
797 public void testTransfer3() throws InterruptedException {
798 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
799
800 Thread first = newStartedThread(new CheckedRunnable() {
801 public void realRun() throws InterruptedException {
802 q.transfer(four);
803 assertFalse(q.contains(four));
804 assertEquals(1, q.size());
805 }});
806
807 Thread interruptedThread = newStartedThread(
808 new CheckedInterruptedRunnable() {
809 public void realRun() throws InterruptedException {
810 while (q.isEmpty())
811 Thread.yield();
812 q.transfer(five);
813 }});
814
815 while (q.size() < 2)
816 Thread.yield();
817 assertEquals(2, q.size());
818 assertSame(four, q.poll());
819 first.join();
820 assertEquals(1, q.size());
821 interruptedThread.interrupt();
822 interruptedThread.join();
823 checkEmpty(q);
824 }
825
826 /**
827 * transfer waits until a poll occurs, at which point the polling
828 * thread returns the element
829 */
830 public void testTransfer4() throws InterruptedException {
831 final LinkedTransferQueue q = new LinkedTransferQueue();
832
833 Thread t = newStartedThread(new CheckedRunnable() {
834 public void realRun() throws InterruptedException {
835 q.transfer(four);
836 assertFalse(q.contains(four));
837 assertSame(three, q.poll());
838 }});
839
840 while (q.isEmpty())
841 Thread.yield();
842 assertFalse(q.isEmpty());
843 assertEquals(1, q.size());
844 assertTrue(q.offer(three));
845 assertSame(four, q.poll());
846 awaitTermination(t);
847 }
848
849 /**
850 * transfer waits until a take occurs. The transferred element
851 * is returned by the associated take.
852 */
853 public void testTransfer5() throws InterruptedException {
854 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
855
856 Thread t = newStartedThread(new CheckedRunnable() {
857 public void realRun() throws InterruptedException {
858 q.transfer(four);
859 checkEmpty(q);
860 }});
861
862 while (q.isEmpty())
863 Thread.yield();
864 assertFalse(q.isEmpty());
865 assertEquals(1, q.size());
866 assertSame(four, q.take());
867 checkEmpty(q);
868 awaitTermination(t);
869 }
870
871 /**
872 * tryTransfer(null) throws NullPointerException
873 */
874 public void testTryTransfer1() {
875 final LinkedTransferQueue q = new LinkedTransferQueue();
876 try {
877 q.tryTransfer(null);
878 shouldThrow();
879 } catch (NullPointerException success) {}
880 }
881
882 /**
883 * tryTransfer returns false and does not enqueue if there are no
884 * consumers waiting to poll or take.
885 */
886 public void testTryTransfer2() throws InterruptedException {
887 final LinkedTransferQueue q = new LinkedTransferQueue();
888 assertFalse(q.tryTransfer(new Object()));
889 assertFalse(q.hasWaitingConsumer());
890 checkEmpty(q);
891 }
892
893 /**
894 * If there is a consumer waiting in timed poll, tryTransfer
895 * returns true while successfully transferring the object.
896 */
897 public void testTryTransfer3() throws InterruptedException {
898 final Object hotPotato = new Object();
899 final LinkedTransferQueue q = new LinkedTransferQueue();
900
901 Thread t = newStartedThread(new CheckedRunnable() {
902 public void realRun() {
903 while (! q.hasWaitingConsumer())
904 Thread.yield();
905 assertTrue(q.hasWaitingConsumer());
906 checkEmpty(q);
907 assertTrue(q.tryTransfer(hotPotato));
908 }});
909
910 long startTime = System.nanoTime();
911 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
912 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
913 checkEmpty(q);
914 awaitTermination(t);
915 }
916
917 /**
918 * If there is a consumer waiting in take, tryTransfer returns
919 * true while successfully transferring the object.
920 */
921 public void testTryTransfer4() throws InterruptedException {
922 final Object hotPotato = new Object();
923 final LinkedTransferQueue q = new LinkedTransferQueue();
924
925 Thread t = newStartedThread(new CheckedRunnable() {
926 public void realRun() {
927 while (! q.hasWaitingConsumer())
928 Thread.yield();
929 assertTrue(q.hasWaitingConsumer());
930 checkEmpty(q);
931 assertTrue(q.tryTransfer(hotPotato));
932 }});
933
934 assertSame(q.take(), hotPotato);
935 checkEmpty(q);
936 awaitTermination(t);
937 }
938
939 /**
940 * tryTransfer blocks interruptibly if no takers
941 */
942 public void testTryTransfer5() throws InterruptedException {
    // Timed tryTransfer with no consumer must end in InterruptedException,
    // both when the interrupt status is already set on entry and when the
    // interrupt is delivered by the main thread during the call.
943 final LinkedTransferQueue q = new LinkedTransferQueue();
944 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
945 assertTrue(q.isEmpty());
946
947 Thread t = newStartedThread(new CheckedRunnable() {
948 public void realRun() throws InterruptedException {
949 long startTime = System.nanoTime();
    // Case 1: the interrupt status is set before tryTransfer is entered.
950 Thread.currentThread().interrupt();
951 try {
952 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
953 shouldThrow();
954 } catch (InterruptedException success) {}
    // Throwing InterruptedException must leave the interrupt status clear.
955 assertFalse(Thread.interrupted());
956
    // Case 2: tell the main thread that the next tryTransfer is starting,
    // so that it can interrupt us from outside.
957 pleaseInterrupt.countDown();
958 try {
959 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
960 shouldThrow();
961 } catch (InterruptedException success) {}
962 assertFalse(Thread.interrupted());
    // Both attempts together must finish before the long timeout expires.
963 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
964 }});
965
966 await(pleaseInterrupt);
    // NOTE(review): presumably checks that t reaches TIMED_WAITING before
    // the interrupt is sent - confirm against JSR166TestCase.
967 assertThreadBlocks(t, Thread.State.TIMED_WAITING);
968 t.interrupt();
969 awaitTermination(t);
    // Neither abandoned element may remain in the queue.
970 checkEmpty(q);
971 }
972
973 /**
974 * tryTransfer gives up after the timeout and returns false
975 */
976 public void testTryTransfer6() throws InterruptedException {
977 final LinkedTransferQueue q = new LinkedTransferQueue();
978
979 Thread t = newStartedThread(new CheckedRunnable() {
980 public void realRun() throws InterruptedException {
981 long startTime = System.nanoTime();
982 assertFalse(q.tryTransfer(new Object(),
983 timeoutMillis(), MILLISECONDS));
984 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
985 checkEmpty(q);
986 }});
987
988 awaitTermination(t);
989 checkEmpty(q);
990 }
991
992 /**
993 * tryTransfer waits for any elements already in the queue to be removed
994 * before transferring to a poll or take
995 */
996 public void testTryTransfer7() throws InterruptedException {
997 final LinkedTransferQueue q = new LinkedTransferQueue();
998 assertTrue(q.offer(four));
999
1000 Thread t = newStartedThread(new CheckedRunnable() {
1001 public void realRun() throws InterruptedException {
1002 long startTime = System.nanoTime();
1003 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1004 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1005 checkEmpty(q);
1006 }});
1007
1008 while (q.size() != 2)
1009 Thread.yield();
1010 assertEquals(2, q.size());
1011 assertSame(four, q.poll());
1012 assertSame(five, q.poll());
1013 checkEmpty(q);
1014 awaitTermination(t);
1015 }
1016
1017 /**
1018 * timed tryTransfer with no consumer times out and returns false without
1019 * enqueueing its element; after the earlier element is polled, poll returns null
1020 */
1021 public void testTryTransfer8() throws InterruptedException {
1022 final LinkedTransferQueue q = new LinkedTransferQueue();
1023 assertTrue(q.offer(four));
1024 assertEquals(1, q.size());
1025 long startTime = System.nanoTime();
1026 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1027 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1028 assertEquals(1, q.size());
1029 assertSame(four, q.poll());
1030 assertNull(q.poll());
1031 checkEmpty(q);
1032 }
1033
1034 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1035 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
1036 checkEmpty(q);
1037 for (int i = 0; i < n; i++) {
1038 assertEquals(i, q.size());
1039 assertTrue(q.offer(i));
1040 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1041 }
1042 assertFalse(q.isEmpty());
1043 return q;
1044 }
1045
1046 /**
1047 * remove(null), contains(null) always return false
1048 */
1049 public void testNeverContainsNull() {
1050 Collection<?>[] qs = {
1051 new LinkedTransferQueue<Object>(),
1052 populatedQueue(2),
1053 };
1054
1055 for (Collection<?> q : qs) {
1056 assertFalse(q.contains(null));
1057 assertFalse(q.remove(null));
1058 }
1059 }
1060 }