ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.143
Committed: Thu Jun 11 17:04:16 2020 UTC (3 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.142: +4 -18 lines
Log Message:
8230744: Several classes throw OutOfMemoryError without message

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4 jsr166 1.71 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.115 import java.lang.invoke.MethodHandles;
10     import java.lang.invoke.VarHandle;
11 dl 1.86 import java.util.AbstractQueue;
12     import java.util.Arrays;
13     import java.util.Collection;
14     import java.util.Comparator;
15     import java.util.Iterator;
16     import java.util.NoSuchElementException;
17 jsr166 1.124 import java.util.Objects;
18 dl 1.86 import java.util.PriorityQueue;
19     import java.util.Queue;
20     import java.util.SortedSet;
21     import java.util.Spliterator;
22 jsr166 1.105 import java.util.concurrent.locks.Condition;
23     import java.util.concurrent.locks.ReentrantLock;
24     import java.util.function.Consumer;
25 jsr166 1.139 import java.util.function.Predicate;
26 jsr166 1.141 // OPENJDK import jdk.internal.access.SharedSecrets;
27 jsr166 1.143 import jdk.internal.util.ArraysSupport;
28 tim 1.1
29     /**
30 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
31     * the same ordering rules as class {@link PriorityQueue} and supplies
32     * blocking retrieval operations. While this queue is logically
33 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
34 jsr166 1.63 * (causing {@code OutOfMemoryError}). This class does not permit
35     * {@code null} elements. A priority queue relying on {@linkplain
36 jsr166 1.42 * Comparable natural ordering} also does not permit insertion of
37     * non-comparable objects (doing so results in
38 jsr166 1.63 * {@code ClassCastException}).
39 dl 1.20 *
40 jsr166 1.126 * <p>This class and its iterator implement all of the <em>optional</em>
41     * methods of the {@link Collection} and {@link Iterator} interfaces.
42     * The Iterator provided in method {@link #iterator()} and the
43     * Spliterator provided in method {@link #spliterator()} are <em>not</em>
44     * guaranteed to traverse the elements of the PriorityBlockingQueue in
45     * any particular order. If you need ordered traversal, consider using
46     * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} can
47     * be used to <em>remove</em> some or all elements in priority order and
48     * place them in another collection.
49 dl 1.41 *
50     * <p>Operations on this class make no guarantees about the ordering
51     * of elements with equal priority. If you need to enforce an
52     * ordering, you can define custom classes or comparators that use a
53     * secondary key to break ties in primary priority values. For
54     * example, here is a class that applies first-in-first-out
55     * tie-breaking to comparable elements. To use it, you would insert a
56 jsr166 1.63 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
57 dl 1.41 *
58 jsr166 1.109 * <pre> {@code
59 jsr166 1.56 * class FIFOEntry<E extends Comparable<? super E>>
60     * implements Comparable<FIFOEntry<E>> {
61 jsr166 1.58 * static final AtomicLong seq = new AtomicLong(0);
62 dl 1.41 * final long seqNum;
63     * final E entry;
64     * public FIFOEntry(E entry) {
65     * seqNum = seq.getAndIncrement();
66     * this.entry = entry;
67     * }
68     * public E getEntry() { return entry; }
69 jsr166 1.56 * public int compareTo(FIFOEntry<E> other) {
70 dl 1.41 * int res = entry.compareTo(other.entry);
71 jsr166 1.56 * if (res == 0 && other.entry != this.entry)
72     * res = (seqNum < other.seqNum ? -1 : 1);
73 dl 1.41 * return res;
74     * }
75 jsr166 1.56 * }}</pre>
76 dl 1.20 *
77 dl 1.35 * <p>This class is a member of the
78 jsr166 1.140 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
79 dl 1.35 * Java Collections Framework</a>.
80     *
81 dl 1.6 * @since 1.5
82     * @author Doug Lea
83 jsr166 1.104 * @param <E> the type of elements held in this queue
84 dl 1.28 */
85 jsr166 1.82 @SuppressWarnings("unchecked")
86 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
87 dl 1.15 implements BlockingQueue<E>, java.io.Serializable {
88 dl 1.21 private static final long serialVersionUID = 5595510919245408276L;
89 tim 1.1
90 dl 1.59 /*
91 dl 1.66 * The implementation uses an array-based binary heap, with public
92     * operations protected with a single lock. However, allocation
93     * during resizing uses a simple spinlock (used only while not
94     * holding main lock) in order to allow takes to operate
95     * concurrently with allocation. This avoids repeated
96     * postponement of waiting consumers and consequent element
97     * build-up. The need to back away from lock during allocation
98     * makes it impossible to simply wrap delegated
99     * java.util.PriorityQueue operations within a lock, as was done
100     * in a previous version of this class. To maintain
101     * interoperability, a plain PriorityQueue is still used during
102 jsr166 1.77 * serialization, which maintains compatibility at the expense of
103 dl 1.66 * transiently doubling overhead.
104 dl 1.59 */
105    
106     /**
107     * Default array capacity.
108     */
109     private static final int DEFAULT_INITIAL_CAPACITY = 11;
110    
111     /**
112     * Priority queue represented as a balanced binary heap: the two
113     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
114     * priority queue is ordered by comparator, or by the elements'
115     * natural ordering, if comparator is null: For each node n in the
116     * heap and each descendant d of n, n <= d. The element with the
117     * lowest value is in queue[0], assuming the queue is nonempty.
118     */
119     private transient Object[] queue;
120    
121     /**
122     * The number of elements in the priority queue.
123     */
124 dl 1.66 private transient int size;
125 dl 1.59
126     /**
127     * The comparator, or null if priority queue uses elements'
128     * natural ordering.
129     */
130     private transient Comparator<? super E> comparator;
131    
132     /**
133 jsr166 1.112 * Lock used for all public operations.
134 dl 1.59 */
135 jsr166 1.135 private final ReentrantLock lock = new ReentrantLock();
136 dl 1.59
137     /**
138 jsr166 1.112 * Condition for blocking when empty.
139 dl 1.59 */
140 jsr166 1.142 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
141 jsr166 1.135 private final Condition notEmpty = lock.newCondition();
142 dl 1.5
143 dl 1.2 /**
144 dl 1.59 * Spinlock for allocation, acquired via CAS.
145     */
146     private transient volatile int allocationSpinLock;
147    
148     /**
149 dl 1.66 * A plain PriorityQueue used only for serialization,
150     * to maintain compatibility with previous versions
151     * of this class. Non-null only during serialization/deserialization.
152     */
153 jsr166 1.72 private PriorityQueue<E> q;
154 dl 1.66
/**
 * Creates a {@code PriorityBlockingQueue} whose elements are ordered
 * by their {@linkplain Comparable natural ordering} and whose initial
 * capacity is the default (11).
 */
public PriorityBlockingQueue() {
    // Delegate to the general constructor; a null comparator selects natural ordering.
    this(DEFAULT_INITIAL_CAPACITY, null);
}
163    
/**
 * Creates a {@code PriorityBlockingQueue} whose elements are ordered
 * by their {@linkplain Comparable natural ordering}, starting with
 * the given capacity.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    // Delegate to the general constructor; a null comparator selects natural ordering.
    this(initialCapacity, null);
}
176    
177     /**
178 jsr166 1.63 * Creates a {@code PriorityBlockingQueue} with the specified initial
179 jsr166 1.39 * capacity that orders its elements according to the specified
180     * comparator.
181 dl 1.2 *
182 jsr166 1.42 * @param initialCapacity the initial capacity for this priority queue
183 jsr166 1.52 * @param comparator the comparator that will be used to order this
184     * priority queue. If {@code null}, the {@linkplain Comparable
185     * natural ordering} of the elements will be used.
186 jsr166 1.63 * @throws IllegalArgumentException if {@code initialCapacity} is less
187 jsr166 1.52 * than 1
188 dl 1.2 */
189 tim 1.13 public PriorityBlockingQueue(int initialCapacity,
190 dholmes 1.14 Comparator<? super E> comparator) {
191 dl 1.59 if (initialCapacity < 1)
192     throw new IllegalArgumentException();
193 dl 1.66 this.comparator = comparator;
194 jsr166 1.135 this.queue = new Object[Math.max(1, initialCapacity)];
195 dl 1.2 }
196    
197     /**
198 jsr166 1.63 * Creates a {@code PriorityBlockingQueue} containing the elements
199 jsr166 1.52 * in the specified collection. If the specified collection is a
200 jsr166 1.99 * {@link SortedSet} or a {@link PriorityQueue}, this
201 jsr166 1.52 * priority queue will be ordered according to the same ordering.
202     * Otherwise, this priority queue will be ordered according to the
203     * {@linkplain Comparable natural ordering} of its elements.
204 dl 1.2 *
205 jsr166 1.52 * @param c the collection whose elements are to be placed
206     * into this priority queue
207 dl 1.2 * @throws ClassCastException if elements of the specified collection
208     * cannot be compared to one another according to the priority
209 jsr166 1.52 * queue's ordering
210 jsr166 1.42 * @throws NullPointerException if the specified collection or any
211     * of its elements are null
212 dl 1.2 */
213 dholmes 1.14 public PriorityBlockingQueue(Collection<? extends E> c) {
214 dl 1.66 boolean heapify = true; // true if not known to be in heap order
215     boolean screen = true; // true if must screen for nulls
216 dl 1.59 if (c instanceof SortedSet<?>) {
217     SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
218     this.comparator = (Comparator<? super E>) ss.comparator();
219 dl 1.66 heapify = false;
220 dl 1.59 }
221     else if (c instanceof PriorityBlockingQueue<?>) {
222 jsr166 1.61 PriorityBlockingQueue<? extends E> pq =
223 dl 1.59 (PriorityBlockingQueue<? extends E>) c;
224     this.comparator = (Comparator<? super E>) pq.comparator();
225 jsr166 1.67 screen = false;
226 dl 1.66 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
227     heapify = false;
228 dl 1.59 }
229 jsr166 1.134 Object[] es = c.toArray();
230     int n = es.length;
231 dl 1.59 // If c.toArray incorrectly doesn't return Object[], copy it.
232 jsr166 1.134 if (es.getClass() != Object[].class)
233     es = Arrays.copyOf(es, n, Object[].class);
234 dl 1.66 if (screen && (n == 1 || this.comparator != null)) {
235 jsr166 1.134 for (Object e : es)
236     if (e == null)
237 dl 1.59 throw new NullPointerException();
238 dl 1.66 }
239 jsr166 1.135 this.queue = ensureNonEmpty(es);
240 dl 1.66 this.size = n;
241     if (heapify)
242     heapify();
243 dl 1.59 }
244    
245 jsr166 1.135 /** Ensures that queue[0] exists, helping peek() and poll(). */
246     private static Object[] ensureNonEmpty(Object[] es) {
247     return (es.length > 0) ? es : new Object[1];
248     }
249    
250 dl 1.59 /**
251 dl 1.66 * Tries to grow array to accommodate at least one more element
252     * (but normally expand by about 50%), giving up (allowing retry)
253     * on contention (which we expect to be rare). Call only while
254     * holding lock.
255 jsr166 1.67 *
256 dl 1.66 * @param array the heap array
257     * @param oldCap the length of the array
258 dl 1.59 */
259 dl 1.66 private void tryGrow(Object[] array, int oldCap) {
260 dl 1.59 lock.unlock(); // must release and then re-acquire main lock
261     Object[] newArray = null;
262     if (allocationSpinLock == 0 &&
263 dl 1.115 ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
264 dl 1.59 try {
265 jsr166 1.143 int growth = oldCap < 64 ? oldCap + 2 : oldCap >> 1;
266     int newCap = ArraysSupport.newLength(oldCap, 1, growth);
267     if (queue == array)
268 dl 1.59 newArray = new Object[newCap];
269     } finally {
270     allocationSpinLock = 0;
271     }
272     }
273 dl 1.66 if (newArray == null) // back off if another thread is allocating
274 dl 1.59 Thread.yield();
275     lock.lock();
276     if (newArray != null && queue == array) {
277     queue = newArray;
278 dl 1.66 System.arraycopy(array, 0, newArray, 0, oldCap);
279 dl 1.59 }
280     }
281    
282     /**
283 jsr166 1.62 * Mechanics for poll(). Call only while holding lock.
284 dl 1.59 */
285 jsr166 1.79 private E dequeue() {
286 jsr166 1.134 // assert lock.isHeldByCurrentThread();
287 jsr166 1.135 final Object[] es;
288     final E result;
289    
290     if ((result = (E) ((es = queue)[0])) != null) {
291     final int n;
292     final E x = (E) es[(n = --size)];
293 jsr166 1.134 es[n] = null;
294     if (n > 0) {
295 jsr166 1.135 final Comparator<? super E> cmp;
296     if ((cmp = comparator) == null)
297 jsr166 1.134 siftDownComparable(0, x, es, n);
298     else
299     siftDownUsingComparator(0, x, es, n, cmp);
300     }
301 dl 1.59 }
302 jsr166 1.135 return result;
303 dl 1.59 }
304    
305     /**
306     * Inserts item x at position k, maintaining heap invariant by
307     * promoting x up the tree until it is greater than or equal to
308     * its parent, or is the root.
309     *
310 jsr166 1.121 * To simplify and speed up coercions and comparisons, the
311 dl 1.59 * Comparable and Comparator versions are separated into different
312     * methods that are otherwise identical. (Similarly for siftDown.)
313     *
314     * @param k the position to fill
315     * @param x the item to insert
316 jsr166 1.134 * @param es the heap array
317 dl 1.59 */
318 jsr166 1.134 private static <T> void siftUpComparable(int k, T x, Object[] es) {
319 dl 1.66 Comparable<? super T> key = (Comparable<? super T>) x;
320 dl 1.59 while (k > 0) {
321     int parent = (k - 1) >>> 1;
322 jsr166 1.134 Object e = es[parent];
323 dl 1.66 if (key.compareTo((T) e) >= 0)
324 dl 1.59 break;
325 jsr166 1.134 es[k] = e;
326 dl 1.59 k = parent;
327     }
328 jsr166 1.134 es[k] = key;
329 dl 1.59 }
330    
331 jsr166 1.134 private static <T> void siftUpUsingComparator(
332     int k, T x, Object[] es, Comparator<? super T> cmp) {
333 dl 1.59 while (k > 0) {
334     int parent = (k - 1) >>> 1;
335 jsr166 1.134 Object e = es[parent];
336 dl 1.66 if (cmp.compare(x, (T) e) >= 0)
337 dl 1.59 break;
338 jsr166 1.134 es[k] = e;
339 dl 1.59 k = parent;
340     }
341 jsr166 1.134 es[k] = x;
342 dl 1.59 }
343    
344     /**
345     * Inserts item x at position k, maintaining heap invariant by
346     * demoting x down the tree repeatedly until it is less than or
347     * equal to its children or is a leaf.
348     *
349     * @param k the position to fill
350     * @param x the item to insert
351 jsr166 1.134 * @param es the heap array
352 dl 1.66 * @param n heap size
353 dl 1.59 */
354 jsr166 1.134 private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
355     // assert n > 0;
356     Comparable<? super T> key = (Comparable<? super T>)x;
357     int half = n >>> 1; // loop while a non-leaf
358     while (k < half) {
359     int child = (k << 1) + 1; // assume left child is least
360     Object c = es[child];
361     int right = child + 1;
362     if (right < n &&
363     ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
364     c = es[child = right];
365     if (key.compareTo((T) c) <= 0)
366     break;
367     es[k] = c;
368     k = child;
369 dl 1.59 }
370 jsr166 1.134 es[k] = key;
371 dl 1.59 }
372    
373 jsr166 1.134 private static <T> void siftDownUsingComparator(
374     int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
375     // assert n > 0;
376     int half = n >>> 1;
377     while (k < half) {
378     int child = (k << 1) + 1;
379     Object c = es[child];
380     int right = child + 1;
381     if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
382     c = es[child = right];
383     if (cmp.compare(x, (T) c) <= 0)
384     break;
385     es[k] = c;
386     k = child;
387 dl 1.59 }
388 jsr166 1.134 es[k] = x;
389 dl 1.7 }
390    
391 dholmes 1.10 /**
392 dl 1.59 * Establishes the heap invariant (described above) in the entire tree,
393     * assuming nothing about the order of the elements prior to the call.
394 jsr166 1.118 * This classic algorithm due to Floyd (1964) is known to be O(size).
395 dl 1.59 */
396     private void heapify() {
397 jsr166 1.134 final Object[] es = queue;
398 jsr166 1.127 int n = size, i = (n >>> 1) - 1;
399 jsr166 1.135 final Comparator<? super E> cmp;
400     if ((cmp = comparator) == null)
401 jsr166 1.127 for (; i >= 0; i--)
402 jsr166 1.134 siftDownComparable(i, (E) es[i], es, n);
403     else
404 jsr166 1.127 for (; i >= 0; i--)
405 jsr166 1.134 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
406 dl 1.59 }
407    
408     /**
409 jsr166 1.42 * Inserts the specified element into this priority queue.
410     *
411 jsr166 1.40 * @param e the element to add
412 jsr166 1.63 * @return {@code true} (as specified by {@link Collection#add})
413 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
414 jsr166 1.42 * with elements currently in the priority queue according to the
415     * priority queue's ordering
416     * @throws NullPointerException if the specified element is null
417 dholmes 1.10 */
418 jsr166 1.40 public boolean add(E e) {
419 jsr166 1.42 return offer(e);
420 dl 1.5 }
421    
422 dholmes 1.16 /**
423 dl 1.24 * Inserts the specified element into this priority queue.
424 jsr166 1.64 * As the queue is unbounded, this method will never return {@code false}.
425 dholmes 1.16 *
426 jsr166 1.40 * @param e the element to add
427 jsr166 1.63 * @return {@code true} (as specified by {@link Queue#offer})
428 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
429 jsr166 1.42 * with elements currently in the priority queue according to the
430     * priority queue's ordering
431     * @throws NullPointerException if the specified element is null
432 dholmes 1.16 */
433 jsr166 1.40 public boolean offer(E e) {
434 dl 1.59 if (e == null)
435     throw new NullPointerException();
436 dl 1.31 final ReentrantLock lock = this.lock;
437 dl 1.5 lock.lock();
438 dl 1.66 int n, cap;
439 jsr166 1.138 Object[] es;
440     while ((n = size) >= (cap = (es = queue).length))
441     tryGrow(es, cap);
442 dl 1.59 try {
443 jsr166 1.135 final Comparator<? super E> cmp;
444     if ((cmp = comparator) == null)
445 jsr166 1.138 siftUpComparable(n, e, es);
446 dl 1.59 else
447 jsr166 1.138 siftUpUsingComparator(n, e, es, cmp);
448 dl 1.66 size = n + 1;
449 dl 1.5 notEmpty.signal();
450 tim 1.19 } finally {
451 tim 1.13 lock.unlock();
452 dl 1.5 }
453 dl 1.59 return true;
454 dl 1.5 }
455    
456 dholmes 1.16 /**
457 jsr166 1.64 * Inserts the specified element into this priority queue.
458     * As the queue is unbounded, this method will never block.
459 jsr166 1.42 *
460 jsr166 1.40 * @param e the element to add
461 jsr166 1.42 * @throws ClassCastException if the specified element cannot be compared
462     * with elements currently in the priority queue according to the
463     * priority queue's ordering
464     * @throws NullPointerException if the specified element is null
465 dholmes 1.16 */
466 jsr166 1.40 public void put(E e) {
467     offer(e); // never need to block
468 dl 1.5 }
469    
470 dholmes 1.16 /**
471 jsr166 1.64 * Inserts the specified element into this priority queue.
472     * As the queue is unbounded, this method will never block or
473     * return {@code false}.
474 jsr166 1.42 *
475 jsr166 1.40 * @param e the element to add
476 dholmes 1.16 * @param timeout This parameter is ignored as the method never blocks
477     * @param unit This parameter is ignored as the method never blocks
478 jsr166 1.65 * @return {@code true} (as specified by
479     * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
480 jsr166 1.42 * @throws ClassCastException if the specified element cannot be compared
481     * with elements currently in the priority queue according to the
482     * priority queue's ordering
483     * @throws NullPointerException if the specified element is null
484 dholmes 1.16 */
485 jsr166 1.40 public boolean offer(E e, long timeout, TimeUnit unit) {
486     return offer(e); // never need to block
487 dl 1.5 }
488    
489 jsr166 1.42 public E poll() {
490     final ReentrantLock lock = this.lock;
491     lock.lock();
492     try {
493 jsr166 1.79 return dequeue();
494 jsr166 1.42 } finally {
495     lock.unlock();
496     }
497     }
498    
499 dl 1.5 public E take() throws InterruptedException {
500 dl 1.31 final ReentrantLock lock = this.lock;
501 dl 1.5 lock.lockInterruptibly();
502 dl 1.66 E result;
503 dl 1.5 try {
504 jsr166 1.79 while ( (result = dequeue()) == null)
505 jsr166 1.55 notEmpty.await();
506 tim 1.19 } finally {
507 dl 1.5 lock.unlock();
508     }
509 dl 1.59 return result;
510 dl 1.5 }
511    
512     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
513 dholmes 1.10 long nanos = unit.toNanos(timeout);
514 dl 1.31 final ReentrantLock lock = this.lock;
515 dl 1.5 lock.lockInterruptibly();
516 dl 1.66 E result;
517 dl 1.5 try {
518 jsr166 1.79 while ( (result = dequeue()) == null && nanos > 0)
519 jsr166 1.55 nanos = notEmpty.awaitNanos(nanos);
520 tim 1.19 } finally {
521 dl 1.5 lock.unlock();
522     }
523 dl 1.59 return result;
524 dl 1.5 }
525    
526     public E peek() {
527 dl 1.31 final ReentrantLock lock = this.lock;
528 dl 1.5 lock.lock();
529     try {
530 jsr166 1.135 return (E) queue[0];
531 tim 1.19 } finally {
532 tim 1.13 lock.unlock();
533 dl 1.5 }
534     }
535 jsr166 1.61
536 jsr166 1.42 /**
537     * Returns the comparator used to order the elements in this queue,
538 jsr166 1.63 * or {@code null} if this queue uses the {@linkplain Comparable
539 jsr166 1.42 * natural ordering} of its elements.
540     *
541     * @return the comparator used to order the elements in this queue,
542 jsr166 1.63 * or {@code null} if this queue uses the natural
543 jsr166 1.52 * ordering of its elements
544 jsr166 1.42 */
545     public Comparator<? super E> comparator() {
546 dl 1.59 return comparator;
547 jsr166 1.42 }
548    
549 dl 1.5 public int size() {
550 dl 1.31 final ReentrantLock lock = this.lock;
551 dl 1.5 lock.lock();
552     try {
553 jsr166 1.68 return size;
554 tim 1.19 } finally {
555 dl 1.5 lock.unlock();
556     }
557     }
558    
559     /**
560 jsr166 1.63 * Always returns {@code Integer.MAX_VALUE} because
561     * a {@code PriorityBlockingQueue} is not capacity constrained.
562     * @return {@code Integer.MAX_VALUE} always
563 dl 1.5 */
564     public int remainingCapacity() {
565     return Integer.MAX_VALUE;
566     }
567    
568 dl 1.59 private int indexOf(Object o) {
569     if (o != null) {
570 jsr166 1.133 final Object[] es = queue;
571     for (int i = 0, n = size; i < n; i++)
572     if (o.equals(es[i]))
573 dl 1.59 return i;
574     }
575     return -1;
576     }
577    
578     /**
579     * Removes the ith element from queue.
580     */
581     private void removeAt(int i) {
582 jsr166 1.134 final Object[] es = queue;
583 jsr166 1.135 final int n = size - 1;
584 dl 1.66 if (n == i) // removed last element
585 jsr166 1.134 es[i] = null;
586 dl 1.59 else {
587 jsr166 1.134 E moved = (E) es[n];
588     es[n] = null;
589 jsr166 1.135 final Comparator<? super E> cmp;
590     if ((cmp = comparator) == null)
591 jsr166 1.134 siftDownComparable(i, moved, es, n);
592 dl 1.66 else
593 jsr166 1.134 siftDownUsingComparator(i, moved, es, n, cmp);
594     if (es[i] == moved) {
595 dl 1.66 if (cmp == null)
596 jsr166 1.134 siftUpComparable(i, moved, es);
597 dl 1.66 else
598 jsr166 1.134 siftUpUsingComparator(i, moved, es, cmp);
599 dl 1.66 }
600 dl 1.59 }
601 dl 1.66 size = n;
602 dl 1.59 }
603    
604 dl 1.37 /**
605 jsr166 1.42 * Removes a single instance of the specified element from this queue,
606 jsr166 1.52 * if it is present. More formally, removes an element {@code e} such
607     * that {@code o.equals(e)}, if this queue contains one or more such
608     * elements. Returns {@code true} if and only if this queue contained
609     * the specified element (or equivalently, if this queue changed as a
610     * result of the call).
611 jsr166 1.42 *
612     * @param o element to be removed from this queue, if present
613 jsr166 1.63 * @return {@code true} if this queue changed as a result of the call
614 dl 1.37 */
615 dholmes 1.14 public boolean remove(Object o) {
616 dl 1.31 final ReentrantLock lock = this.lock;
617 dl 1.5 lock.lock();
618     try {
619 dl 1.59 int i = indexOf(o);
620 jsr166 1.78 if (i == -1)
621     return false;
622     removeAt(i);
623     return true;
624 dl 1.59 } finally {
625     lock.unlock();
626     }
627     }
628    
629     /**
630 jsr166 1.112 * Identity-based version for use in Itr.remove.
631 jsr166 1.133 *
632     * @param o element to be removed from this queue, if present
633 dl 1.59 */
634 jsr166 1.133 void removeEq(Object o) {
635 dl 1.59 final ReentrantLock lock = this.lock;
636     lock.lock();
637     try {
638 jsr166 1.133 final Object[] es = queue;
639 jsr166 1.78 for (int i = 0, n = size; i < n; i++) {
640 jsr166 1.133 if (o == es[i]) {
641 dl 1.59 removeAt(i);
642     break;
643     }
644     }
645 tim 1.19 } finally {
646 dl 1.5 lock.unlock();
647     }
648     }
649    
650 jsr166 1.42 /**
651 jsr166 1.52 * Returns {@code true} if this queue contains the specified element.
652     * More formally, returns {@code true} if and only if this queue contains
653     * at least one element {@code e} such that {@code o.equals(e)}.
654 jsr166 1.42 *
655     * @param o object to be checked for containment in this queue
656 jsr166 1.63 * @return {@code true} if this queue contains the specified element
657 jsr166 1.42 */
658 dholmes 1.14 public boolean contains(Object o) {
659 dl 1.31 final ReentrantLock lock = this.lock;
660 dl 1.5 lock.lock();
661     try {
662 jsr166 1.78 return indexOf(o) != -1;
663 tim 1.19 } finally {
664 dl 1.5 lock.unlock();
665     }
666     }
667    
668     public String toString() {
669 jsr166 1.111 return Helpers.collectionToString(this);
670 dl 1.5 }
671    
672 jsr166 1.42 /**
673     * @throws UnsupportedOperationException {@inheritDoc}
674     * @throws ClassCastException {@inheritDoc}
675     * @throws NullPointerException {@inheritDoc}
676     * @throws IllegalArgumentException {@inheritDoc}
677     */
678 dl 1.26 public int drainTo(Collection<? super E> c) {
679 jsr166 1.76 return drainTo(c, Integer.MAX_VALUE);
680 dl 1.26 }
681    
682 jsr166 1.42 /**
683     * @throws UnsupportedOperationException {@inheritDoc}
684     * @throws ClassCastException {@inheritDoc}
685     * @throws NullPointerException {@inheritDoc}
686     * @throws IllegalArgumentException {@inheritDoc}
687     */
688 dl 1.26 public int drainTo(Collection<? super E> c, int maxElements) {
689 jsr166 1.124 Objects.requireNonNull(c);
690 dl 1.26 if (c == this)
691     throw new IllegalArgumentException();
692     if (maxElements <= 0)
693     return 0;
694 dl 1.31 final ReentrantLock lock = this.lock;
695 dl 1.26 lock.lock();
696     try {
697 jsr166 1.76 int n = Math.min(size, maxElements);
698     for (int i = 0; i < n; i++) {
699     c.add((E) queue[0]); // In this order, in case add() throws.
700 jsr166 1.79 dequeue();
701 dl 1.26 }
702     return n;
703     } finally {
704     lock.unlock();
705     }
706     }
707    
708 dl 1.17 /**
709 dl 1.37 * Atomically removes all of the elements from this queue.
710 dl 1.17 * The queue will be empty after this call returns.
711     */
712     public void clear() {
713 dl 1.31 final ReentrantLock lock = this.lock;
714 dl 1.17 lock.lock();
715     try {
716 jsr166 1.133 final Object[] es = queue;
717     for (int i = 0, n = size; i < n; i++)
718     es[i] = null;
719 dl 1.59 size = 0;
720 tim 1.19 } finally {
721 dl 1.17 lock.unlock();
722     }
723     }
724    
725 jsr166 1.42 /**
726 jsr166 1.110 * Returns an array containing all of the elements in this queue.
727     * The returned array elements are in no particular order.
728     *
729     * <p>The returned array will be "safe" in that no references to it are
730     * maintained by this queue. (In other words, this method must allocate
731     * a new array). The caller is thus free to modify the returned array.
732     *
733     * <p>This method acts as bridge between array-based and collection-based
734     * APIs.
735     *
736     * @return an array containing all of the elements in this queue
737     */
738     public Object[] toArray() {
739     final ReentrantLock lock = this.lock;
740     lock.lock();
741     try {
742     return Arrays.copyOf(queue, size);
743     } finally {
744     lock.unlock();
745     }
746     }
747    
748     /**
749 jsr166 1.42 * Returns an array containing all of the elements in this queue; the
750     * runtime type of the returned array is that of the specified array.
751     * The returned array elements are in no particular order.
752     * If the queue fits in the specified array, it is returned therein.
753     * Otherwise, a new array is allocated with the runtime type of the
754     * specified array and the size of this queue.
755     *
756     * <p>If this queue fits in the specified array with room to spare
757     * (i.e., the array has more elements than this queue), the element in
758     * the array immediately following the end of the queue is set to
759 jsr166 1.63 * {@code null}.
760 jsr166 1.42 *
761     * <p>Like the {@link #toArray()} method, this method acts as bridge between
762     * array-based and collection-based APIs. Further, this method allows
763     * precise control over the runtime type of the output array, and may,
764     * under certain circumstances, be used to save allocation costs.
765     *
766 jsr166 1.63 * <p>Suppose {@code x} is a queue known to contain only strings.
767 jsr166 1.42 * The following code can be used to dump the queue into a newly
768 jsr166 1.63 * allocated array of {@code String}:
769 jsr166 1.42 *
770 jsr166 1.109 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
771 jsr166 1.42 *
772 jsr166 1.63 * Note that {@code toArray(new Object[0])} is identical in function to
773     * {@code toArray()}.
774 jsr166 1.42 *
775     * @param a the array into which the elements of the queue are to
776     * be stored, if it is big enough; otherwise, a new array of the
777     * same runtime type is allocated for this purpose
778     * @return an array containing all of the elements in this queue
779     * @throws ArrayStoreException if the runtime type of the specified array
780     * is not a supertype of the runtime type of every element in
781     * this queue
782     * @throws NullPointerException if the specified array is null
783     */
784 dl 1.5 public <T> T[] toArray(T[] a) {
785 dl 1.31 final ReentrantLock lock = this.lock;
786 dl 1.5 lock.lock();
787     try {
788 dl 1.66 int n = size;
789     if (a.length < n)
790 dl 1.59 // Make a new array of a's runtime type, but my contents:
791     return (T[]) Arrays.copyOf(queue, size, a.getClass());
792 dl 1.66 System.arraycopy(queue, 0, a, 0, n);
793     if (a.length > n)
794     a[n] = null;
795 dl 1.59 return a;
796 tim 1.19 } finally {
797 dl 1.5 lock.unlock();
798     }
799     }
800    
801 dholmes 1.16 /**
802 dl 1.23 * Returns an iterator over the elements in this queue. The
803     * iterator does not return the elements in any particular order.
804 jsr166 1.69 *
805 jsr166 1.103 * <p>The returned iterator is
806     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
807 dholmes 1.16 *
808 jsr166 1.42 * @return an iterator over the elements in this queue
809 dholmes 1.16 */
810 dl 1.5 public Iterator<E> iterator() {
811 dl 1.51 return new Itr(toArray());
812 dl 1.5 }
813    
814 dl 1.49 /**
815     * Snapshot iterator that works off copy of underlying q array.
816     */
817 dl 1.59 final class Itr implements Iterator<E> {
818 dl 1.49 final Object[] array; // Array of all elements
819 jsr166 1.81 int cursor; // index of next element to return
820 jsr166 1.136 int lastRet = -1; // index of last element, or -1 if no such
821 jsr166 1.50
822 dl 1.49 Itr(Object[] array) {
823     this.array = array;
824 dl 1.5 }
825    
826 tim 1.13 public boolean hasNext() {
827 dl 1.49 return cursor < array.length;
828 tim 1.13 }
829    
830     public E next() {
831 dl 1.49 if (cursor >= array.length)
832     throw new NoSuchElementException();
833 jsr166 1.120 return (E)array[lastRet = cursor++];
834 tim 1.13 }
835    
836     public void remove() {
837 jsr166 1.50 if (lastRet < 0)
838 jsr166 1.54 throw new IllegalStateException();
839 jsr166 1.133 removeEq(array[lastRet]);
840 dl 1.49 lastRet = -1;
841 tim 1.13 }
842 jsr166 1.137
843     public void forEachRemaining(Consumer<? super E> action) {
844     Objects.requireNonNull(action);
845     final Object[] es = array;
846     int i;
847     if ((i = cursor) < es.length) {
848     lastRet = -1;
849     cursor = es.length;
850     for (; i < es.length; i++)
851     action.accept((E) es[i]);
852     lastRet = es.length - 1;
853     }
854     }
855 dl 1.5 }
856    
857     /**
858 jsr166 1.83 * Saves this queue to a stream (that is, serializes it).
859     *
860     * For compatibility with previous version of this class, elements
861     * are first copied to a java.util.PriorityQueue, which is then
862     * serialized.
863 jsr166 1.97 *
864     * @param s the stream
865 jsr166 1.98 * @throws java.io.IOException if an I/O error occurs
866 dl 1.5 */
867     private void writeObject(java.io.ObjectOutputStream s)
868     throws java.io.IOException {
869     lock.lock();
870     try {
871 jsr166 1.78 // avoid zero capacity argument
872     q = new PriorityQueue<E>(Math.max(size, 1), comparator);
873 dl 1.59 q.addAll(this);
874 dl 1.5 s.defaultWriteObject();
875 dl 1.66 } finally {
876 dl 1.59 q = null;
877 dl 1.5 lock.unlock();
878     }
879 tim 1.1 }
880    
881 dl 1.59 /**
882 jsr166 1.83 * Reconstitutes this queue from a stream (that is, deserializes it).
883 jsr166 1.97 * @param s the stream
884 jsr166 1.98 * @throws ClassNotFoundException if the class of a serialized object
885     * could not be found
886     * @throws java.io.IOException if an I/O error occurs
887 dl 1.59 */
888     private void readObject(java.io.ObjectInputStream s)
889     throws java.io.IOException, ClassNotFoundException {
890 jsr166 1.67 try {
891 dl 1.66 s.defaultReadObject();
892 jsr166 1.131 int sz = q.size();
893 jsr166 1.141 jsr166.Platform.checkArray(s, Object[].class, sz);
894 jsr166 1.135 this.queue = new Object[Math.max(1, sz)];
895 dl 1.66 comparator = q.comparator();
896     addAll(q);
897 jsr166 1.67 } finally {
898 dl 1.66 q = null;
899     }
900 dl 1.59 }
901    
902 jsr166 1.116 /**
903     * Immutable snapshot spliterator that binds to elements "late".
904     */
905 jsr166 1.119 final class PBQSpliterator implements Spliterator<E> {
906 jsr166 1.125 Object[] array; // null until late-bound-initialized
907 dl 1.93 int index;
908     int fence;
909    
910 jsr166 1.125 PBQSpliterator() {}
911    
912 jsr166 1.119 PBQSpliterator(Object[] array, int index, int fence) {
913 dl 1.93 this.array = array;
914     this.index = index;
915     this.fence = fence;
916     }
917    
918 jsr166 1.125 private int getFence() {
919     if (array == null)
920     fence = (array = toArray()).length;
921     return fence;
922 dl 1.93 }
923    
924 jsr166 1.119 public PBQSpliterator trySplit() {
925 dl 1.93 int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
926     return (lo >= mid) ? null :
927 jsr166 1.119 new PBQSpliterator(array, lo, index = mid);
928 dl 1.93 }
929    
930 dl 1.95 public void forEachRemaining(Consumer<? super E> action) {
931 jsr166 1.124 Objects.requireNonNull(action);
932 jsr166 1.125 final int hi = getFence(), lo = index;
933 jsr166 1.134 final Object[] es = array;
934 jsr166 1.125 index = hi; // ensure exhaustion
935     for (int i = lo; i < hi; i++)
936 jsr166 1.134 action.accept((E) es[i]);
937 dl 1.93 }
938    
939     public boolean tryAdvance(Consumer<? super E> action) {
940 jsr166 1.124 Objects.requireNonNull(action);
941 dl 1.93 if (getFence() > index && index >= 0) {
942 jsr166 1.125 action.accept((E) array[index++]);
943 dl 1.93 return true;
944     }
945     return false;
946     }
947    
948 jsr166 1.119 public long estimateSize() { return getFence() - index; }
949 dl 1.93
950     public int characteristics() {
951 jsr166 1.123 return (Spliterator.NONNULL |
952     Spliterator.SIZED |
953     Spliterator.SUBSIZED);
954 dl 1.93 }
955     }
956    
957 jsr166 1.102 /**
958     * Returns a {@link Spliterator} over the elements in this queue.
959 jsr166 1.117 * The spliterator does not traverse elements in any particular order
960     * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
961 jsr166 1.102 *
962 jsr166 1.103 * <p>The returned spliterator is
963     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
964     *
965 jsr166 1.102 * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
966     * {@link Spliterator#NONNULL}.
967     *
968     * @implNote
969     * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
970     *
971     * @return a {@code Spliterator} over the elements in this queue
972     * @since 1.8
973     */
974 dl 1.94 public Spliterator<E> spliterator() {
975 jsr166 1.125 return new PBQSpliterator();
976 dl 1.86 }
977    
978 jsr166 1.132 /**
979     * @throws NullPointerException {@inheritDoc}
980     */
981 jsr166 1.139 public boolean removeIf(Predicate<? super E> filter) {
982     Objects.requireNonNull(filter);
983     return bulkRemove(filter);
984     }
985    
986     /**
987     * @throws NullPointerException {@inheritDoc}
988     */
989     public boolean removeAll(Collection<?> c) {
990     Objects.requireNonNull(c);
991     return bulkRemove(e -> c.contains(e));
992     }
993    
994     /**
995     * @throws NullPointerException {@inheritDoc}
996     */
997     public boolean retainAll(Collection<?> c) {
998     Objects.requireNonNull(c);
999     return bulkRemove(e -> !c.contains(e));
1000     }
1001    
1002     // A tiny bit set implementation
1003    
1004     private static long[] nBits(int n) {
1005     return new long[((n - 1) >> 6) + 1];
1006     }
1007     private static void setBit(long[] bits, int i) {
1008     bits[i >> 6] |= 1L << i;
1009     }
1010     private static boolean isClear(long[] bits, int i) {
1011     return (bits[i >> 6] & (1L << i)) == 0;
1012     }
1013    
1014     /** Implementation of bulk remove methods. */
1015     private boolean bulkRemove(Predicate<? super E> filter) {
1016     final ReentrantLock lock = this.lock;
1017     lock.lock();
1018     try {
1019     final Object[] es = queue;
1020     final int end = size;
1021     int i;
1022     // Optimize for initial run of survivors
1023     for (i = 0; i < end && !filter.test((E) es[i]); i++)
1024     ;
1025     if (i >= end)
1026     return false;
1027     // Tolerate predicates that reentrantly access the
1028     // collection for read, so traverse once to find elements
1029     // to delete, a second pass to physically expunge.
1030     final int beg = i;
1031     final long[] deathRow = nBits(end - beg);
1032     deathRow[0] = 1L; // set bit 0
1033     for (i = beg + 1; i < end; i++)
1034     if (filter.test((E) es[i]))
1035     setBit(deathRow, i - beg);
1036     int w = beg;
1037     for (i = beg; i < end; i++)
1038     if (isClear(deathRow, i - beg))
1039     es[w++] = es[i];
1040     for (i = size = w; i < end; i++)
1041     es[i] = null;
1042     heapify();
1043     return true;
1044     } finally {
1045     lock.unlock();
1046     }
1047     }
1048    
1049     /**
1050     * @throws NullPointerException {@inheritDoc}
1051     */
1052 jsr166 1.132 public void forEach(Consumer<? super E> action) {
1053     Objects.requireNonNull(action);
1054     final ReentrantLock lock = this.lock;
1055     lock.lock();
1056     try {
1057     final Object[] es = queue;
1058     for (int i = 0, n = size; i < n; i++)
1059     action.accept((E) es[i]);
1060     } finally {
1061     lock.unlock();
1062     }
1063     }
1064    
// VarHandle mechanics
// Handle for the int field named "allocationSpinLock" of this class.
private static final VarHandle ALLOCATIONSPINLOCK;
static {
    try {
        ALLOCATIONSPINLOCK = MethodHandles.lookup().findVarHandle(
            PriorityBlockingQueue.class, "allocationSpinLock", int.class);
    } catch (ReflectiveOperationException e) {
        // A failed lookup makes the class unusable; surface it as an
        // initializer error with the cause attached.
        throw new ExceptionInInitializerError(e);
    }
}
1077 tim 1.1 }