10 |
|
|
11 |
|
import java.util.AbstractQueue; |
12 |
|
import java.util.Collection; |
13 |
+ |
import java.util.ConcurrentModificationException; |
14 |
|
import java.util.Iterator; |
15 |
|
import java.util.NoSuchElementException; |
16 |
+ |
import java.util.Queue; |
17 |
|
import java.util.concurrent.locks.LockSupport; |
18 |
|
import java.util.concurrent.atomic.AtomicReference; |
17 |
– |
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
19 |
|
|
20 |
|
/** |
21 |
|
* An unbounded {@linkplain TransferQueue} based on linked nodes. |
116 |
|
this.isData = isData; |
117 |
|
} |
118 |
|
|
119 |
< |
@SuppressWarnings("rawtypes") |
120 |
< |
static final AtomicReferenceFieldUpdater<Node, Node> |
121 |
< |
nextUpdater = AtomicReferenceFieldUpdater.newUpdater |
122 |
< |
(Node.class, Node.class, "next"); |
119 |
> |
// Unsafe mechanics |
120 |
> |
|
121 |
> |
private static final sun.misc.Unsafe UNSAFE = getUnsafe(); |
122 |
> |
private static final long nextOffset = |
123 |
> |
objectFieldOffset(UNSAFE, "next", Node.class); |
124 |
|
|
125 |
|
final boolean casNext(Node<E> cmp, Node<E> val) { |
126 |
< |
return nextUpdater.compareAndSet(this, cmp, val); |
126 |
> |
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); |
127 |
|
} |
128 |
|
|
129 |
|
final void clearNext() { |
130 |
< |
nextUpdater.lazySet(this, this); |
130 |
> |
UNSAFE.putOrderedObject(this, nextOffset, this); |
131 |
> |
} |
132 |
> |
|
133 |
> |
/** |
134 |
> |
* Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. |
135 |
> |
* Replace with a simple call to Unsafe.getUnsafe when integrating |
136 |
> |
* into a jdk. |
137 |
> |
* |
138 |
> |
* @return a sun.misc.Unsafe |
139 |
> |
*/ |
140 |
> |
private static sun.misc.Unsafe getUnsafe() { |
141 |
> |
try { |
142 |
> |
return sun.misc.Unsafe.getUnsafe(); |
143 |
> |
} catch (SecurityException se) { |
144 |
> |
try { |
145 |
> |
return java.security.AccessController.doPrivileged |
146 |
> |
(new java.security |
147 |
> |
.PrivilegedExceptionAction<sun.misc.Unsafe>() { |
148 |
> |
public sun.misc.Unsafe run() throws Exception { |
149 |
> |
java.lang.reflect.Field f = sun.misc |
150 |
> |
.Unsafe.class.getDeclaredField("theUnsafe"); |
151 |
> |
f.setAccessible(true); |
152 |
> |
return (sun.misc.Unsafe) f.get(null); |
153 |
> |
}}); |
154 |
> |
} catch (java.security.PrivilegedActionException e) { |
155 |
> |
throw new RuntimeException("Could not initialize intrinsics", |
156 |
> |
e.getCause()); |
157 |
> |
} |
158 |
> |
} |
159 |
|
} |
160 |
|
|
161 |
|
private static final long serialVersionUID = -3375979862319811754L; |
294 |
|
* @param e the comparison value for checking match |
295 |
|
* @param mode mode |
296 |
|
* @param nanos timeout value |
297 |
< |
* @return matched item, or s if cancelled |
297 |
> |
* @return matched item, or null if cancelled |
298 |
|
*/ |
299 |
|
private E awaitFulfill(Node<E> pred, Node<E> s, E e, |
300 |
|
int mode, long nanos) { |
360 |
|
for (;;) { |
361 |
|
Node<E> h = head.get(); |
362 |
|
Node<E> first = h.next; |
363 |
< |
if (first != null && first.next == first) { // help advance |
363 |
> |
if (first != null && first.get() == first) { // help advance |
364 |
|
advanceHead(h, first); |
365 |
|
continue; |
366 |
|
} |
472 |
|
addAll(c); |
473 |
|
} |
474 |
|
|
475 |
< |
public void put(E e) throws InterruptedException { |
476 |
< |
if (e == null) throw new NullPointerException(); |
477 |
< |
if (Thread.interrupted()) throw new InterruptedException(); |
478 |
< |
xfer(e, NOWAIT, 0); |
475 |
> |
/** |
476 |
> |
* Inserts the specified element at the tail of this queue. |
477 |
> |
* As the queue is unbounded, this method will never block. |
478 |
> |
* |
479 |
> |
* @throws NullPointerException if the specified element is null |
480 |
> |
*/ |
481 |
> |
public void put(E e) { |
482 |
> |
offer(e); |
483 |
|
} |
484 |
|
|
485 |
< |
public boolean offer(E e, long timeout, TimeUnit unit) |
486 |
< |
throws InterruptedException { |
487 |
< |
if (e == null) throw new NullPointerException(); |
488 |
< |
if (Thread.interrupted()) throw new InterruptedException(); |
489 |
< |
xfer(e, NOWAIT, 0); |
490 |
< |
return true; |
485 |
> |
/** |
486 |
> |
* Inserts the specified element at the tail of this queue. |
487 |
> |
* As the queue is unbounded, this method will never block or |
488 |
> |
* return {@code false}. |
489 |
> |
* |
490 |
> |
* @return {@code true} (as specified by |
491 |
> |
* {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) |
492 |
> |
* @throws NullPointerException if the specified element is null |
493 |
> |
*/ |
494 |
> |
public boolean offer(E e, long timeout, TimeUnit unit) { |
495 |
> |
return offer(e); |
496 |
|
} |
497 |
|
|
498 |
+ |
/** |
499 |
+ |
* Inserts the specified element at the tail of this queue. |
500 |
+ |
* As the queue is unbounded, this method will never return {@code false}. |
501 |
+ |
* |
502 |
+ |
* @return {@code true} (as specified by |
503 |
+ |
* {@link BlockingQueue#offer(Object) BlockingQueue.offer}) |
504 |
+ |
* @throws NullPointerException if the specified element is null |
505 |
+ |
*/ |
506 |
|
public boolean offer(E e) { |
507 |
|
if (e == null) throw new NullPointerException(); |
508 |
|
xfer(e, NOWAIT, 0); |
509 |
|
return true; |
510 |
|
} |
511 |
|
|
512 |
+ |
/** |
513 |
+ |
* Inserts the specified element at the tail of this queue. |
514 |
+ |
* As the queue is unbounded, this method will never throw |
515 |
+ |
* {@link IllegalStateException} or return {@code false}. |
516 |
+ |
* |
517 |
+ |
* @return {@code true} (as specified by {@link Collection#add}) |
518 |
+ |
* @throws NullPointerException if the specified element is null |
519 |
+ |
*/ |
520 |
|
public boolean add(E e) { |
521 |
+ |
return offer(e); |
522 |
+ |
} |
523 |
+ |
|
524 |
+ |
/** |
525 |
+ |
* Transfers the specified element immediately if there exists a |
526 |
+ |
* consumer already waiting to receive it (in {@link #take} or |
527 |
+ |
* timed {@link #poll(long,TimeUnit) poll}), otherwise |
528 |
+ |
* returning {@code false} without enqueuing the element. |
529 |
+ |
* |
530 |
+ |
* @throws NullPointerException if the specified element is null |
531 |
+ |
*/ |
532 |
+ |
public boolean tryTransfer(E e) { |
533 |
|
if (e == null) throw new NullPointerException(); |
534 |
< |
xfer(e, NOWAIT, 0); |
468 |
< |
return true; |
534 |
> |
return fulfill(e) != null; |
535 |
|
} |
536 |
|
|
537 |
+ |
/** |
538 |
+ |
* Inserts the specified element at the tail of this queue, |
539 |
+ |
* waiting if necessary for the element to be received by a |
540 |
+ |
* consumer invoking {@code take} or {@code poll}. |
541 |
+ |
* |
542 |
+ |
* @throws NullPointerException if the specified element is null |
543 |
+ |
*/ |
544 |
|
public void transfer(E e) throws InterruptedException { |
545 |
|
if (e == null) throw new NullPointerException(); |
546 |
|
if (xfer(e, WAIT, 0) == null) { |
549 |
|
} |
550 |
|
} |
551 |
|
|
552 |
+ |
/** |
553 |
+ |
* Inserts the specified element at the tail of this queue, |
554 |
+ |
* waiting up to the specified wait time if necessary for the |
555 |
+ |
* element to be received by a consumer invoking {@code take} or |
556 |
+ |
* {@code poll}. |
557 |
+ |
* |
558 |
+ |
* @throws NullPointerException if the specified element is null |
559 |
+ |
*/ |
560 |
|
public boolean tryTransfer(E e, long timeout, TimeUnit unit) |
561 |
|
throws InterruptedException { |
562 |
|
if (e == null) throw new NullPointerException(); |
567 |
|
throw new InterruptedException(); |
568 |
|
} |
569 |
|
|
489 |
– |
public boolean tryTransfer(E e) { |
490 |
– |
if (e == null) throw new NullPointerException(); |
491 |
– |
return fulfill(e) != null; |
492 |
– |
} |
493 |
– |
|
570 |
|
public E take() throws InterruptedException { |
571 |
< |
Object e = xfer(null, WAIT, 0); |
571 |
> |
E e = xfer(null, WAIT, 0); |
572 |
|
if (e != null) |
573 |
< |
return (E) e; |
573 |
> |
return e; |
574 |
|
Thread.interrupted(); |
575 |
|
throw new InterruptedException(); |
576 |
|
} |
577 |
|
|
578 |
|
public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
579 |
< |
Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); |
579 |
> |
E e = xfer(null, TIMEOUT, unit.toNanos(timeout)); |
580 |
|
if (e != null || !Thread.interrupted()) |
581 |
< |
return (E) e; |
581 |
> |
return e; |
582 |
|
throw new InterruptedException(); |
583 |
|
} |
584 |
|
|
586 |
|
return fulfill(null); |
587 |
|
} |
588 |
|
|
589 |
+ |
/** |
590 |
+ |
* @throws NullPointerException {@inheritDoc} |
591 |
+ |
* @throws IllegalArgumentException {@inheritDoc} |
592 |
+ |
*/ |
593 |
|
public int drainTo(Collection<? super E> c) { |
594 |
|
if (c == null) |
595 |
|
throw new NullPointerException(); |
604 |
|
return n; |
605 |
|
} |
606 |
|
|
607 |
+ |
/** |
608 |
+ |
* @throws NullPointerException {@inheritDoc} |
609 |
+ |
* @throws IllegalArgumentException {@inheritDoc} |
610 |
+ |
*/ |
611 |
|
public int drainTo(Collection<? super E> c, int maxElements) { |
612 |
|
if (c == null) |
613 |
|
throw new NullPointerException(); |
652 |
|
} |
653 |
|
} |
654 |
|
|
655 |
< |
|
655 |
> |
/** |
656 |
> |
* Returns an iterator over the elements in this queue in proper |
657 |
> |
* sequence, from head to tail. |
658 |
> |
* |
659 |
> |
* <p>The returned iterator is a "weakly consistent" iterator that |
660 |
> |
* will never throw |
661 |
> |
* {@link ConcurrentModificationException ConcurrentModificationException}, |
662 |
> |
* and guarantees to traverse elements as they existed upon |
663 |
> |
* construction of the iterator, and may (but is not guaranteed |
664 |
> |
* to) reflect any modifications subsequent to construction. |
665 |
> |
* |
666 |
> |
* @return an iterator over the elements in this queue in proper sequence |
667 |
> |
*/ |
668 |
|
public Iterator<E> iterator() { |
669 |
|
return new Itr(); |
670 |
|
} |
679 |
|
class Itr implements Iterator<E> { |
680 |
|
Node<E> next; // node to return next |
681 |
|
Node<E> pnext; // predecessor of next |
586 |
– |
Node<E> snext; // successor of next |
682 |
|
Node<E> curr; // last returned node, for remove() |
683 |
|
Node<E> pcurr; // predecessor of curr, for remove() |
684 |
< |
E nextItem; // Cache of next item, once committed to in next |
684 |
> |
E nextItem; // Cache of next item, once committed to in next |
685 |
|
|
686 |
|
Itr() { |
687 |
< |
findNext(); |
687 |
> |
advance(); |
688 |
|
} |
689 |
|
|
690 |
|
/** |
691 |
< |
* Ensures next points to next valid node, or null if none. |
691 |
> |
* Moves to next valid node and returns item to return for |
692 |
> |
* next(), or null if no such. |
693 |
|
*/ |
694 |
< |
void findNext() { |
694 |
> |
private E advance() { |
695 |
> |
pcurr = pnext; |
696 |
> |
curr = next; |
697 |
> |
E item = nextItem; |
698 |
> |
|
699 |
|
for (;;) { |
700 |
< |
Node<E> pred = pnext; |
701 |
< |
Node<E> q = next; |
702 |
< |
if (pred == null || pred == q) { |
603 |
< |
pred = traversalHead(); |
604 |
< |
q = pred.next; |
605 |
< |
} |
606 |
< |
if (q == null || !q.isData) { |
700 |
> |
pnext = (next == null) ? traversalHead() : next; |
701 |
> |
next = pnext.next; |
702 |
> |
if (next == pnext) { |
703 |
|
next = null; |
704 |
< |
return; |
704 |
> |
continue; // restart |
705 |
|
} |
706 |
< |
Object x = q.get(); |
707 |
< |
Node<E> s = q.next; |
708 |
< |
if (x != null && q != x && q != s) { |
706 |
> |
if (next == null) |
707 |
> |
break; |
708 |
> |
Object x = next.get(); |
709 |
> |
if (x != null && x != next) { |
710 |
|
nextItem = (E) x; |
711 |
< |
snext = s; |
615 |
< |
pnext = pred; |
616 |
< |
next = q; |
617 |
< |
return; |
711 |
> |
break; |
712 |
|
} |
619 |
– |
pnext = q; |
620 |
– |
next = s; |
713 |
|
} |
714 |
+ |
return item; |
715 |
|
} |
716 |
|
|
717 |
|
public boolean hasNext() { |
719 |
|
} |
720 |
|
|
721 |
|
public E next() { |
722 |
< |
if (next == null) throw new NoSuchElementException(); |
723 |
< |
pcurr = pnext; |
724 |
< |
curr = next; |
632 |
< |
pnext = next; |
633 |
< |
next = snext; |
634 |
< |
E x = nextItem; |
635 |
< |
findNext(); |
636 |
< |
return x; |
722 |
> |
if (next == null) |
723 |
> |
throw new NoSuchElementException(); |
724 |
> |
return advance(); |
725 |
|
} |
726 |
|
|
727 |
|
public void remove() { |
791 |
|
* @return the number of elements in this queue |
792 |
|
*/ |
793 |
|
public int size() { |
794 |
< |
int count = 0; |
795 |
< |
Node<E> h = traversalHead(); |
796 |
< |
for (Node<E> p = h.next; p != null && p.isData; p = p.next) { |
797 |
< |
Object x = p.get(); |
798 |
< |
if (x != null && x != p) { |
799 |
< |
if (++count == Integer.MAX_VALUE) // saturated |
794 |
> |
for (;;) { |
795 |
> |
int count = 0; |
796 |
> |
Node<E> pred = traversalHead(); |
797 |
> |
for (;;) { |
798 |
> |
Node<E> q = pred.next; |
799 |
> |
if (q == pred) // restart |
800 |
|
break; |
801 |
+ |
if (q == null || !q.isData) |
802 |
+ |
return count; |
803 |
+ |
Object x = q.get(); |
804 |
+ |
if (x != null && x != q) { |
805 |
+ |
if (++count == Integer.MAX_VALUE) // saturated |
806 |
+ |
return count; |
807 |
+ |
} |
808 |
+ |
pred = q; |
809 |
|
} |
810 |
|
} |
715 |
– |
return count; |
811 |
|
} |
812 |
|
|
813 |
|
public int getWaitingConsumerCount() { |
814 |
< |
int count = 0; |
815 |
< |
Node<E> h = traversalHead(); |
816 |
< |
for (Node<E> p = h.next; p != null && !p.isData; p = p.next) { |
817 |
< |
if (p.get() == null) { |
818 |
< |
if (++count == Integer.MAX_VALUE) |
814 |
> |
// converse of size -- count valid non-data nodes |
815 |
> |
for (;;) { |
816 |
> |
int count = 0; |
817 |
> |
Node<E> pred = traversalHead(); |
818 |
> |
for (;;) { |
819 |
> |
Node<E> q = pred.next; |
820 |
> |
if (q == pred) // restart |
821 |
|
break; |
822 |
+ |
if (q == null || q.isData) |
823 |
+ |
return count; |
824 |
+ |
Object x = q.get(); |
825 |
+ |
if (x == null) { |
826 |
+ |
if (++count == Integer.MAX_VALUE) // saturated |
827 |
+ |
return count; |
828 |
+ |
} |
829 |
+ |
pred = q; |
830 |
|
} |
831 |
|
} |
727 |
– |
return count; |
728 |
– |
} |
729 |
– |
|
730 |
– |
public int remainingCapacity() { |
731 |
– |
return Integer.MAX_VALUE; |
832 |
|
} |
833 |
|
|
834 |
|
public boolean remove(Object o) { |
838 |
|
Node<E> pred = traversalHead(); |
839 |
|
for (;;) { |
840 |
|
Node<E> q = pred.next; |
741 |
– |
if (q == null || !q.isData) |
742 |
– |
return false; |
841 |
|
if (q == pred) // restart |
842 |
|
break; |
843 |
+ |
if (q == null || !q.isData) |
844 |
+ |
return false; |
845 |
|
Object x = q.get(); |
846 |
|
if (x != null && x != q && o.equals(x) && |
847 |
|
q.compareAndSet(x, q)) { |
854 |
|
} |
855 |
|
|
856 |
|
/** |
857 |
+ |
* Always returns {@code Integer.MAX_VALUE} because a |
858 |
+ |
* {@code LinkedTransferQueue} is not capacity constrained. |
859 |
+ |
* |
860 |
+ |
* @return {@code Integer.MAX_VALUE} (as specified by |
861 |
+ |
* {@link BlockingQueue#remainingCapacity()}) |
862 |
+ |
*/ |
863 |
+ |
public int remainingCapacity() { |
864 |
+ |
return Integer.MAX_VALUE; |
865 |
+ |
} |
866 |
+ |
|
867 |
+ |
/** |
868 |
|
* Save the state to a stream (that is, serialize it). |
869 |
|
* |
870 |
|
* @serialData All of the elements (each an {@code E}) in |
914 |
|
|
915 |
|
private static final sun.misc.Unsafe UNSAFE = getUnsafe(); |
916 |
|
private static final long headOffset = |
917 |
< |
objectFieldOffset("head", LinkedTransferQueue.class); |
917 |
> |
objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class); |
918 |
|
private static final long tailOffset = |
919 |
< |
objectFieldOffset("tail", LinkedTransferQueue.class); |
919 |
> |
objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class); |
920 |
|
private static final long cleanMeOffset = |
921 |
< |
objectFieldOffset("cleanMe", LinkedTransferQueue.class); |
921 |
> |
objectFieldOffset(UNSAFE, "cleanMe", LinkedTransferQueue.class); |
922 |
> |
|
923 |
|
|
924 |
< |
private static long objectFieldOffset(String field, Class<?> klazz) { |
924 |
> |
static long objectFieldOffset(sun.misc.Unsafe UNSAFE, |
925 |
> |
String field, Class<?> klazz) { |
926 |
|
try { |
927 |
|
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); |
928 |
|
} catch (NoSuchFieldException e) { |