ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java (file contents):
Revision 1.106 by jsr166, Wed Dec 28 04:52:39 2016 UTC vs.
Revision 1.107 by jsr166, Thu Dec 29 17:42:05 2016 UTC

# Line 17 | Line 17 | import java.util.concurrent.atomic.Atomi
17   import java.util.concurrent.locks.Condition;
18   import java.util.concurrent.locks.ReentrantLock;
19   import java.util.function.Consumer;
20 + import java.util.function.Predicate;
21  
22   /**
23   * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
# Line 985 | Line 986 | public class LinkedBlockingQueue<E> exte
986      }
987  
988      /**
989 +     * @throws NullPointerException {@inheritDoc}
990 +     */
991 +    public boolean removeIf(Predicate<? super E> filter) {
992 +        Objects.requireNonNull(filter);
993 +        return bulkRemove(filter);
994 +    }
995 +
996 +    /**
997 +     * @throws NullPointerException {@inheritDoc}
998 +     */
999 +    public boolean removeAll(Collection<?> c) {
1000 +        Objects.requireNonNull(c);
1001 +        return bulkRemove(e -> c.contains(e));
1002 +    }
1003 +
1004 +    /**
1005 +     * @throws NullPointerException {@inheritDoc}
1006 +     */
1007 +    public boolean retainAll(Collection<?> c) {
1008 +        Objects.requireNonNull(c);
1009 +        return bulkRemove(e -> !c.contains(e));
1010 +    }
1011 +
1012 +    /**
1013 +     * Returns the predecessor of live node p, given a node that was
1014 +     * once a live ancestor of p (or head); allows unlinking of p.
1015 +     */
1016 +    private Node<E> findPred(Node<E> p, Node<E> ancestor) {
1017 +        // assert p.item != null;
1018 +        if (ancestor.item == null)
1019 +            ancestor = head;
1020 +        // Fails with NPE if precondition not satisfied
1021 +        for (Node<E> q; (q = ancestor.next) != p; )
1022 +            ancestor = q;
1023 +        return ancestor;
1024 +    }
1025 +
1026 +    /** Implementation of bulk remove methods. */
1027 +    @SuppressWarnings("unchecked")
1028 +    private boolean bulkRemove(Predicate<? super E> filter) {
1029 +        boolean removed = false;
1030 +        Node<E> p = null, ancestor = head;
1031 +        Node<E>[] nodes = null;
1032 +        int n, len = 0;
1033 +        do {
1034 +            // 1. Extract batch of up to 64 elements while holding the lock.
1035 +            long deathRow = 0;          // "bitset" of size 64
1036 +            fullyLock();
1037 +            try {
1038 +                if (nodes == null) {
1039 +                    if (p == null) p = head.next;
1040 +                    for (Node<E> q = p; q != null; q = succ(q))
1041 +                        if (q.item != null && ++len == 64)
1042 +                            break;
1043 +                    nodes = (Node<E>[]) new Node<?>[len];
1044 +                }
1045 +                for (n = 0; p != null && n < len; p = succ(p))
1046 +                    nodes[n++] = p;
1047 +            } finally {
1048 +                fullyUnlock();
1049 +            }
1050 +
1051 +            // 2. Run the filter on the elements while lock is free.
1052 +            for (int i = 0; i < n; i++) {
1053 +                final E e;
1054 +                if ((e = nodes[i].item) != null && filter.test(e))
1055 +                    deathRow |= 1L << i;
1056 +            }
1057 +
1058 +            // 3. Remove any filtered elements while holding the lock.
1059 +            if (deathRow != 0) {
1060 +                fullyLock();
1061 +                try {
1062 +                    for (int i = 0; i < n; i++) {
1063 +                        final Node<E> q;
1064 +                        if ((deathRow & (1L << i)) != 0L
1065 +                            && (q = nodes[i]).item != null) {
1066 +                            ancestor = findPred(q, ancestor);
1067 +                            unlink(q, ancestor);
1068 +                            removed = true;
1069 +                        }
1070 +                    }
1071 +                } finally {
1072 +                    fullyUnlock();
1073 +                }
1074 +            }
1075 +        } while (n > 0 && p != null);
1076 +        return removed;
1077 +    }
1078 +
1079 +    /**
1080       * Saves this queue to a stream (that is, serializes it).
1081       *
1082       * @param s the stream

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines