10 |
|
|
11 |
|
import java.util.AbstractQueue; |
12 |
|
import java.util.Collection; |
13 |
+ |
import java.util.ConcurrentModificationException; |
14 |
|
import java.util.Iterator; |
15 |
|
import java.util.NoSuchElementException; |
16 |
+ |
import java.util.Queue; |
17 |
|
import java.util.concurrent.locks.LockSupport; |
18 |
|
import java.util.concurrent.atomic.AtomicReference; |
19 |
|
|
20 |
|
/** |
21 |
< |
* An unbounded {@linkplain TransferQueue} based on linked nodes. |
21 |
> |
* An unbounded {@link TransferQueue} based on linked nodes. |
22 |
|
* This queue orders elements FIFO (first-in-first-out) with respect |
23 |
|
* to any given producer. The <em>head</em> of the queue is that |
24 |
|
* element that has been on the queue the longest time for some |
219 |
|
Node<E> t = tail.get(); |
220 |
|
Node<E> h = head.get(); |
221 |
|
|
222 |
< |
if (t != null && (t == h || t.isData == isData)) { |
222 |
> |
if (t == h || t.isData == isData) { |
223 |
|
if (s == null) |
224 |
|
s = new Node<E>(e, isData); |
225 |
|
Node<E> last = t.next; |
231 |
|
tail.compareAndSet(t, s); |
232 |
|
return awaitFulfill(t, s, e, mode, nanos); |
233 |
|
} |
234 |
< |
} |
233 |
< |
|
234 |
< |
else if (h != null) { |
234 |
> |
} else { |
235 |
|
Node<E> first = h.next; |
236 |
|
if (t == tail.get() && first != null && |
237 |
|
advanceHead(h, first)) { |
259 |
|
Node<E> t = tail.get(); |
260 |
|
Node<E> h = head.get(); |
261 |
|
|
262 |
< |
if (t != null && (t == h || t.isData == isData)) { |
262 |
> |
if (t == h || t.isData == isData) { |
263 |
|
Node<E> last = t.next; |
264 |
|
if (t == tail.get()) { |
265 |
|
if (last != null) |
267 |
|
else |
268 |
|
return null; |
269 |
|
} |
270 |
< |
} |
271 |
< |
else if (h != null) { |
270 |
> |
} else { |
271 |
|
Node<E> first = h.next; |
272 |
|
if (t == tail.get() && |
273 |
|
first != null && |
291 |
|
* @param e the comparison value for checking match |
292 |
|
* @param mode mode |
293 |
|
* @param nanos timeout value |
294 |
< |
* @return matched item, or s if cancelled |
294 |
> |
* @return matched item, or null if cancelled |
295 |
|
*/ |
296 |
|
private E awaitFulfill(Node<E> pred, Node<E> s, E e, |
297 |
|
int mode, long nanos) { |
329 |
|
} |
330 |
|
if (spins < 0) { |
331 |
|
Node<E> h = head.get(); // only spin if at head |
332 |
< |
spins = ((h != null && h.next == s) ? |
333 |
< |
((mode == TIMEOUT) ? |
334 |
< |
maxTimedSpins : maxUntimedSpins) : 0); |
332 |
> |
spins = ((h.next != s) ? 0 : |
333 |
> |
(mode == TIMEOUT) ? maxTimedSpins : |
334 |
> |
maxUntimedSpins); |
335 |
|
} |
336 |
|
if (spins > 0) |
337 |
|
--spins; |
357 |
|
for (;;) { |
358 |
|
Node<E> h = head.get(); |
359 |
|
Node<E> first = h.next; |
360 |
< |
if (first != null && first.next == first) { // help advance |
360 |
> |
if (first != null && first.get() == first) { // help advance |
361 |
|
advanceHead(h, first); |
362 |
|
continue; |
363 |
|
} |
470 |
|
} |
471 |
|
|
472 |
|
/** |
473 |
< |
* @throws InterruptedException {@inheritDoc} |
474 |
< |
* @throws NullPointerException {@inheritDoc} |
473 |
> |
* Inserts the specified element at the tail of this queue. |
474 |
> |
* As the queue is unbounded, this method will never block. |
475 |
> |
* |
476 |
> |
* @throws NullPointerException if the specified element is null |
477 |
|
*/ |
478 |
< |
public void put(E e) throws InterruptedException { |
479 |
< |
if (e == null) throw new NullPointerException(); |
479 |
< |
if (Thread.interrupted()) throw new InterruptedException(); |
480 |
< |
xfer(e, NOWAIT, 0); |
478 |
> |
public void put(E e) { |
479 |
> |
offer(e); |
480 |
|
} |
481 |
|
|
482 |
|
/** |
483 |
< |
* @throws InterruptedException {@inheritDoc} |
484 |
< |
* @throws NullPointerException {@inheritDoc} |
483 |
> |
* Inserts the specified element at the tail of this queue. |
484 |
> |
* As the queue is unbounded, this method will never block or |
485 |
> |
* return {@code false}. |
486 |
> |
* |
487 |
> |
* @return {@code true} (as specified by |
488 |
> |
* {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) |
489 |
> |
* @throws NullPointerException if the specified element is null |
490 |
|
*/ |
491 |
< |
public boolean offer(E e, long timeout, TimeUnit unit) |
492 |
< |
throws InterruptedException { |
489 |
< |
if (e == null) throw new NullPointerException(); |
490 |
< |
if (Thread.interrupted()) throw new InterruptedException(); |
491 |
< |
xfer(e, NOWAIT, 0); |
492 |
< |
return true; |
491 |
> |
public boolean offer(E e, long timeout, TimeUnit unit) { |
492 |
> |
return offer(e); |
493 |
|
} |
494 |
|
|
495 |
|
/** |
496 |
< |
* @throws NullPointerException {@inheritDoc} |
496 |
> |
* Inserts the specified element at the tail of this queue. |
497 |
> |
* As the queue is unbounded, this method will never return {@code false}. |
498 |
> |
* |
499 |
> |
* @return {@code true} (as specified by |
500 |
> |
* {@link BlockingQueue#offer(Object) BlockingQueue.offer}) |
501 |
> |
* @throws NullPointerException if the specified element is null |
502 |
|
*/ |
503 |
|
public boolean offer(E e) { |
504 |
|
if (e == null) throw new NullPointerException(); |
507 |
|
} |
508 |
|
|
509 |
|
/** |
510 |
< |
* @throws NullPointerException {@inheritDoc} |
510 |
> |
* Inserts the specified element at the tail of this queue. |
511 |
> |
* As the queue is unbounded, this method will never throw |
512 |
> |
* {@link IllegalStateException} or return {@code false}. |
513 |
> |
* |
514 |
> |
* @return {@code true} (as specified by {@link Collection#add}) |
515 |
> |
* @throws NullPointerException if the specified element is null |
516 |
|
*/ |
517 |
|
public boolean add(E e) { |
518 |
+ |
return offer(e); |
519 |
+ |
} |
520 |
+ |
|
521 |
+ |
/** |
522 |
+ |
* Transfers the element to a waiting consumer immediately, if possible. |
523 |
+ |
* |
524 |
+ |
* <p>More precisely, transfers the specified element immediately |
525 |
+ |
* if there exists a consumer already waiting to receive it (in |
526 |
+ |
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}), |
527 |
+ |
* otherwise returning {@code false} without enqueuing the element. |
528 |
+ |
* |
529 |
+ |
* @throws NullPointerException if the specified element is null |
530 |
+ |
*/ |
531 |
+ |
public boolean tryTransfer(E e) { |
532 |
|
if (e == null) throw new NullPointerException(); |
533 |
< |
xfer(e, NOWAIT, 0); |
510 |
< |
return true; |
533 |
> |
return fulfill(e) != null; |
534 |
|
} |
535 |
|
|
536 |
|
/** |
537 |
< |
* @throws InterruptedException {@inheritDoc} |
538 |
< |
* @throws NullPointerException {@inheritDoc} |
537 |
> |
* Transfers the element to a consumer, waiting if necessary to do so. |
538 |
> |
* |
539 |
> |
* <p>More precisely, transfers the specified element immediately |
540 |
> |
* if there exists a consumer already waiting to receive it (in |
541 |
> |
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}), |
542 |
> |
* else inserts the specified element at the tail of this queue |
543 |
> |
* and waits until the element is received by a consumer. |
544 |
> |
* |
545 |
> |
* @throws NullPointerException if the specified element is null |
546 |
|
*/ |
547 |
|
public void transfer(E e) throws InterruptedException { |
548 |
|
if (e == null) throw new NullPointerException(); |
553 |
|
} |
554 |
|
|
555 |
|
/** |
556 |
< |
* @throws InterruptedException {@inheritDoc} |
557 |
< |
* @throws NullPointerException {@inheritDoc} |
556 |
> |
* Transfers the element to a consumer if it is possible to do so |
557 |
> |
* before the timeout elapses. |
558 |
> |
* |
559 |
> |
* <p>More precisely, transfers the specified element immediately |
560 |
> |
* if there exists a consumer already waiting to receive it (in |
561 |
> |
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}), |
562 |
> |
* else inserts the specified element at the tail of this queue |
563 |
> |
* and waits until the element is received by a consumer, |
564 |
> |
* returning {@code false} if the specified wait time elapses |
565 |
> |
* before the element can be transferred. |
566 |
> |
* |
567 |
> |
* @throws NullPointerException if the specified element is null |
568 |
|
*/ |
569 |
|
public boolean tryTransfer(E e, long timeout, TimeUnit unit) |
570 |
|
throws InterruptedException { |
576 |
|
throw new InterruptedException(); |
577 |
|
} |
578 |
|
|
539 |
– |
/** |
540 |
– |
* @throws NullPointerException {@inheritDoc} |
541 |
– |
*/ |
542 |
– |
public boolean tryTransfer(E e) { |
543 |
– |
if (e == null) throw new NullPointerException(); |
544 |
– |
return fulfill(e) != null; |
545 |
– |
} |
546 |
– |
|
547 |
– |
/** |
548 |
– |
* @throws InterruptedException {@inheritDoc} |
549 |
– |
*/ |
579 |
|
public E take() throws InterruptedException { |
580 |
|
E e = xfer(null, WAIT, 0); |
581 |
|
if (e != null) |
584 |
|
throw new InterruptedException(); |
585 |
|
} |
586 |
|
|
558 |
– |
/** |
559 |
– |
* @throws InterruptedException {@inheritDoc} |
560 |
– |
*/ |
587 |
|
public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
588 |
|
E e = xfer(null, TIMEOUT, unit.toNanos(timeout)); |
589 |
|
if (e != null || !Thread.interrupted()) |
640 |
|
for (;;) { |
641 |
|
Node<E> t = tail.get(); |
642 |
|
Node<E> h = head.get(); |
643 |
< |
if (h != null && t != null) { |
644 |
< |
Node<E> last = t.next; |
645 |
< |
Node<E> first = h.next; |
646 |
< |
if (t == tail.get()) { |
647 |
< |
if (last != null) |
648 |
< |
tail.compareAndSet(t, last); |
649 |
< |
else if (first != null) { |
650 |
< |
Object x = first.get(); |
651 |
< |
if (x == first) |
626 |
< |
advanceHead(h, first); |
627 |
< |
else |
628 |
< |
return h; |
629 |
< |
} |
643 |
> |
Node<E> last = t.next; |
644 |
> |
Node<E> first = h.next; |
645 |
> |
if (t == tail.get()) { |
646 |
> |
if (last != null) |
647 |
> |
tail.compareAndSet(t, last); |
648 |
> |
else if (first != null) { |
649 |
> |
Object x = first.get(); |
650 |
> |
if (x == first) |
651 |
> |
advanceHead(h, first); |
652 |
|
else |
653 |
|
return h; |
654 |
|
} |
655 |
+ |
else |
656 |
+ |
return h; |
657 |
|
} |
658 |
|
reclean(); |
659 |
|
} |
660 |
|
} |
661 |
|
|
662 |
+ |
/** |
663 |
+ |
* Returns an iterator over the elements in this queue in proper |
664 |
+ |
* sequence, from head to tail. |
665 |
+ |
* |
666 |
+ |
* <p>The returned iterator is a "weakly consistent" iterator that |
667 |
+ |
* will never throw |
668 |
+ |
* {@link ConcurrentModificationException ConcurrentModificationException}, |
669 |
+ |
* and guarantees to traverse elements as they existed upon |
670 |
+ |
* construction of the iterator, and may (but is not guaranteed |
671 |
+ |
* to) reflect any modifications subsequent to construction. |
672 |
+ |
* |
673 |
+ |
* @return an iterator over the elements in this queue in proper sequence |
674 |
+ |
*/ |
675 |
|
public Iterator<E> iterator() { |
676 |
|
return new Itr(); |
677 |
|
} |
726 |
|
} |
727 |
|
|
728 |
|
public E next() { |
729 |
< |
if (next == null) |
729 |
> |
if (next == null) |
730 |
|
throw new NoSuchElementException(); |
731 |
|
return advance(); |
732 |
|
} |
757 |
|
} |
758 |
|
} |
759 |
|
|
760 |
+ |
/** |
761 |
+ |
* Returns {@code true} if this queue contains no elements. |
762 |
+ |
* |
763 |
+ |
* @return {@code true} if this queue contains no elements |
764 |
+ |
*/ |
765 |
|
public boolean isEmpty() { |
766 |
|
for (;;) { |
767 |
|
Node<E> h = traversalHead(); |
843 |
|
} |
844 |
|
} |
845 |
|
|
846 |
+ |
/** |
847 |
+ |
* Removes a single instance of the specified element from this queue, |
848 |
+ |
* if it is present. More formally, removes an element {@code e} such |
849 |
+ |
* that {@code o.equals(e)}, if this queue contains one or more such |
850 |
+ |
* elements. |
851 |
+ |
* Returns {@code true} if this queue contained the specified element |
852 |
+ |
* (or equivalently, if this queue changed as a result of the call). |
853 |
+ |
* |
854 |
+ |
* @param o element to be removed from this queue, if present |
855 |
+ |
* @return {@code true} if this queue changed as a result of the call |
856 |
+ |
*/ |
857 |
|
public boolean remove(Object o) { |
858 |
|
if (o == null) |
859 |
|
return false; |
876 |
|
} |
877 |
|
} |
878 |
|
|
879 |
+ |
/** |
880 |
+ |
* Always returns {@code Integer.MAX_VALUE} because a |
881 |
+ |
* {@code LinkedTransferQueue} is not capacity constrained. |
882 |
+ |
* |
883 |
+ |
* @return {@code Integer.MAX_VALUE} (as specified by |
884 |
+ |
* {@link BlockingQueue#remainingCapacity()}) |
885 |
+ |
*/ |
886 |
|
public int remainingCapacity() { |
887 |
|
return Integer.MAX_VALUE; |
888 |
|
} |