ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.148
Committed: Fri Mar 18 16:01:42 2022 UTC (2 years, 2 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.147: +11 -5 lines
Log Message:
jdk17+ suppressWarnings, FJ updates

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4 jsr166 1.71 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.115 import java.lang.invoke.MethodHandles;
10     import java.lang.invoke.VarHandle;
11 dl 1.86 import java.util.AbstractQueue;
12     import java.util.Arrays;
13     import java.util.Collection;
14     import java.util.Comparator;
15     import java.util.Iterator;
16     import java.util.NoSuchElementException;
17 jsr166 1.124 import java.util.Objects;
18 dl 1.86 import java.util.PriorityQueue;
19     import java.util.Queue;
20     import java.util.SortedSet;
21     import java.util.Spliterator;
22 jsr166 1.105 import java.util.concurrent.locks.Condition;
23     import java.util.concurrent.locks.ReentrantLock;
24     import java.util.function.Consumer;
25 jsr166 1.139 import java.util.function.Predicate;
26 jsr166 1.141 // OPENJDK import jdk.internal.access.SharedSecrets;
27 jsr166 1.143 import jdk.internal.util.ArraysSupport;
28 tim 1.1
29     /**
30 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
31     * the same ordering rules as class {@link PriorityQueue} and supplies
32     * blocking retrieval operations. While this queue is logically
33 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
34 jsr166 1.63 * (causing {@code OutOfMemoryError}). This class does not permit
35     * {@code null} elements. A priority queue relying on {@linkplain
36 jsr166 1.42 * Comparable natural ordering} also does not permit insertion of
37     * non-comparable objects (doing so results in
38 jsr166 1.63 * {@code ClassCastException}).
39 dl 1.20 *
40 jsr166 1.126 * <p>This class and its iterator implement all of the <em>optional</em>
41     * methods of the {@link Collection} and {@link Iterator} interfaces.
42     * The Iterator provided in method {@link #iterator()} and the
43     * Spliterator provided in method {@link #spliterator()} are <em>not</em>
44     * guaranteed to traverse the elements of the PriorityBlockingQueue in
45     * any particular order. If you need ordered traversal, consider using
46     * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} can
47     * be used to <em>remove</em> some or all elements in priority order and
48     * place them in another collection.
49 dl 1.41 *
50     * <p>Operations on this class make no guarantees about the ordering
51     * of elements with equal priority. If you need to enforce an
52     * ordering, you can define custom classes or comparators that use a
53     * secondary key to break ties in primary priority values. For
54     * example, here is a class that applies first-in-first-out
55     * tie-breaking to comparable elements. To use it, you would insert a
56 jsr166 1.63 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
57 dl 1.41 *
58 jsr166 1.109 * <pre> {@code
59 jsr166 1.56 * class FIFOEntry<E extends Comparable<? super E>>
60     * implements Comparable<FIFOEntry<E>> {
61 dl 1.146 * static final AtomicLong seq = new AtomicLong();
62 dl 1.41 * final long seqNum;
63     * final E entry;
64     * public FIFOEntry(E entry) {
65     * seqNum = seq.getAndIncrement();
66     * this.entry = entry;
67     * }
68     * public E getEntry() { return entry; }
69 jsr166 1.56 * public int compareTo(FIFOEntry<E> other) {
70 dl 1.41 * int res = entry.compareTo(other.entry);
71 jsr166 1.56 * if (res == 0 && other.entry != this.entry)
72     * res = (seqNum < other.seqNum ? -1 : 1);
73 dl 1.41 * return res;
74     * }
75 jsr166 1.56 * }}</pre>
76 dl 1.20 *
77 dl 1.35 * <p>This class is a member of the
78 jsr166 1.140 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
79 dl 1.35 * Java Collections Framework</a>.
80     *
81 dl 1.6 * @since 1.5
82     * @author Doug Lea
83 jsr166 1.104 * @param <E> the type of elements held in this queue
84 dl 1.28 */
85 jsr166 1.82 @SuppressWarnings("unchecked")
86 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
87 dl 1.15 implements BlockingQueue<E>, java.io.Serializable {
88 dl 1.21 private static final long serialVersionUID = 5595510919245408276L;
89 tim 1.1
90 dl 1.59 /*
91 dl 1.66 * The implementation uses an array-based binary heap, with public
92     * operations protected with a single lock. However, allocation
93     * during resizing uses a simple spinlock (used only while not
94     * holding main lock) in order to allow takes to operate
95     * concurrently with allocation. This avoids repeated
96     * postponement of waiting consumers and consequent element
97     * build-up. The need to back away from lock during allocation
98     * makes it impossible to simply wrap delegated
99     * java.util.PriorityQueue operations within a lock, as was done
100     * in a previous version of this class. To maintain
101     * interoperability, a plain PriorityQueue is still used during
102 jsr166 1.77 * serialization, which maintains compatibility at the expense of
103 dl 1.66 * transiently doubling overhead.
104 dl 1.59 */
105    
106     /**
107     * Default array capacity.
108     */
109     private static final int DEFAULT_INITIAL_CAPACITY = 11;
110    
111     /**
112     * Priority queue represented as a balanced binary heap: the two
113     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
114     * priority queue is ordered by comparator, or by the elements'
115     * natural ordering, if comparator is null: For each node n in the
116     * heap and each descendant d of n, n <= d. The element with the
117     * lowest value is in queue[0], assuming the queue is nonempty.
118     */
119     private transient Object[] queue;
120    
121     /**
122     * The number of elements in the priority queue.
123     */
124 dl 1.66 private transient int size;
125 dl 1.59
126     /**
127     * The comparator, or null if priority queue uses elements'
128     * natural ordering.
129     */
130     private transient Comparator<? super E> comparator;
131    
132     /**
133 jsr166 1.112 * Lock used for all public operations.
134 dl 1.59 */
135 jsr166 1.135 private final ReentrantLock lock = new ReentrantLock();
136 dl 1.59
137     /**
138 jsr166 1.112 * Condition for blocking when empty.
139 dl 1.59 */
140 jsr166 1.142 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
141 jsr166 1.135 private final Condition notEmpty = lock.newCondition();
142 dl 1.5
143 dl 1.2 /**
144 dl 1.59 * Spinlock for allocation, acquired via CAS.
145     */
146     private transient volatile int allocationSpinLock;
147    
148     /**
149 dl 1.66 * A plain PriorityQueue used only for serialization,
150     * to maintain compatibility with previous versions
151     * of this class. Non-null only during serialization/deserialization.
152     */
153 jsr166 1.72 private PriorityQueue<E> q;
154 dl 1.66
/**
 * Creates a {@code PriorityBlockingQueue} whose elements are ordered
 * by their {@linkplain Comparable natural ordering}, starting with
 * the default initial capacity (11).
 */
public PriorityBlockingQueue() {
    // Delegate to the general constructor; a null comparator selects natural ordering.
    this(DEFAULT_INITIAL_CAPACITY, null);
}
163    
/**
 * Creates a {@code PriorityBlockingQueue} whose elements are ordered
 * by their {@linkplain Comparable natural ordering}, starting with
 * the given initial capacity.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    // Delegate to the general constructor; a null comparator selects natural ordering.
    this(initialCapacity, null);
}
176    
177     /**
178 jsr166 1.63 * Creates a {@code PriorityBlockingQueue} with the specified initial
179 jsr166 1.39 * capacity that orders its elements according to the specified
180     * comparator.
181 dl 1.2 *
182 jsr166 1.42 * @param initialCapacity the initial capacity for this priority queue
183 jsr166 1.52 * @param comparator the comparator that will be used to order this
184     * priority queue. If {@code null}, the {@linkplain Comparable
185     * natural ordering} of the elements will be used.
186 jsr166 1.63 * @throws IllegalArgumentException if {@code initialCapacity} is less
187 jsr166 1.52 * than 1
188 dl 1.2 */
189 tim 1.13 public PriorityBlockingQueue(int initialCapacity,
190 dholmes 1.14 Comparator<? super E> comparator) {
191 dl 1.59 if (initialCapacity < 1)
192     throw new IllegalArgumentException();
193 dl 1.66 this.comparator = comparator;
194 jsr166 1.135 this.queue = new Object[Math.max(1, initialCapacity)];
195 dl 1.2 }
196    
197     /**
198 jsr166 1.63 * Creates a {@code PriorityBlockingQueue} containing the elements
199 jsr166 1.52 * in the specified collection. If the specified collection is a
200 dl 1.147 * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
201 jsr166 1.52 * priority queue will be ordered according to the same ordering.
202     * Otherwise, this priority queue will be ordered according to the
203     * {@linkplain Comparable natural ordering} of its elements.
204 dl 1.2 *
205 jsr166 1.52 * @param c the collection whose elements are to be placed
206     * into this priority queue
207 dl 1.2 * @throws ClassCastException if elements of the specified collection
208     * cannot be compared to one another according to the priority
209 jsr166 1.52 * queue's ordering
210 jsr166 1.42 * @throws NullPointerException if the specified collection or any
211     * of its elements are null
212 dl 1.2 */
213 dholmes 1.14 public PriorityBlockingQueue(Collection<? extends E> c) {
214 dl 1.66 boolean heapify = true; // true if not known to be in heap order
215     boolean screen = true; // true if must screen for nulls
216 dl 1.59 if (c instanceof SortedSet<?>) {
217     SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
218     this.comparator = (Comparator<? super E>) ss.comparator();
219 dl 1.66 heapify = false;
220 dl 1.59 }
221     else if (c instanceof PriorityBlockingQueue<?>) {
222 jsr166 1.61 PriorityBlockingQueue<? extends E> pq =
223 dl 1.59 (PriorityBlockingQueue<? extends E>) c;
224     this.comparator = (Comparator<? super E>) pq.comparator();
225 jsr166 1.67 screen = false;
226 dl 1.66 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
227     heapify = false;
228 dl 1.59 }
229 jsr166 1.134 Object[] es = c.toArray();
230     int n = es.length;
231 jsr166 1.145 if (c.getClass() != java.util.ArrayList.class)
232 jsr166 1.134 es = Arrays.copyOf(es, n, Object[].class);
233 dl 1.66 if (screen && (n == 1 || this.comparator != null)) {
234 jsr166 1.134 for (Object e : es)
235     if (e == null)
236 dl 1.59 throw new NullPointerException();
237 dl 1.66 }
238 jsr166 1.135 this.queue = ensureNonEmpty(es);
239 dl 1.66 this.size = n;
240     if (heapify)
241     heapify();
242 dl 1.59 }
243    
244 jsr166 1.135 /** Ensures that queue[0] exists, helping peek() and poll(). */
245     private static Object[] ensureNonEmpty(Object[] es) {
246     return (es.length > 0) ? es : new Object[1];
247     }
248    
249 dl 1.59 /**
250 dl 1.66 * Tries to grow array to accommodate at least one more element
251     * (but normally expand by about 50%), giving up (allowing retry)
252     * on contention (which we expect to be rare). Call only while
253     * holding lock.
254 jsr166 1.67 *
255 dl 1.66 * @param array the heap array
256     * @param oldCap the length of the array
257 dl 1.59 */
258 dl 1.66 private void tryGrow(Object[] array, int oldCap) {
259 dl 1.59 lock.unlock(); // must release and then re-acquire main lock
260     Object[] newArray = null;
261     if (allocationSpinLock == 0 &&
262 dl 1.115 ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
263 dl 1.59 try {
264 dl 1.148 int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
265     int newCap = oldCap + ((oldCap < 64) ?
266     (oldCap + 2) : // grow faster if small
267     (oldCap >> 1));
268     if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
269     int minCap = oldCap + 1;
270     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
271     throw new OutOfMemoryError();
272     newCap = MAX_ARRAY_SIZE;
273     }
274     if (newCap > oldCap && queue == array)
275 dl 1.59 newArray = new Object[newCap];
276     } finally {
277     allocationSpinLock = 0;
278     }
279     }
280 dl 1.66 if (newArray == null) // back off if another thread is allocating
281 dl 1.59 Thread.yield();
282     lock.lock();
283     if (newArray != null && queue == array) {
284     queue = newArray;
285 dl 1.66 System.arraycopy(array, 0, newArray, 0, oldCap);
286 dl 1.59 }
287     }
288    
289     /**
290 jsr166 1.62 * Mechanics for poll(). Call only while holding lock.
291 dl 1.59 */
292 jsr166 1.79 private E dequeue() {
293 jsr166 1.134 // assert lock.isHeldByCurrentThread();
294 jsr166 1.135 final Object[] es;
295     final E result;
296    
297     if ((result = (E) ((es = queue)[0])) != null) {
298     final int n;
299     final E x = (E) es[(n = --size)];
300 jsr166 1.134 es[n] = null;
301     if (n > 0) {
302 jsr166 1.135 final Comparator<? super E> cmp;
303     if ((cmp = comparator) == null)
304 jsr166 1.134 siftDownComparable(0, x, es, n);
305     else
306     siftDownUsingComparator(0, x, es, n, cmp);
307     }
308 dl 1.59 }
309 jsr166 1.135 return result;
310 dl 1.59 }
311    
312     /**
313     * Inserts item x at position k, maintaining heap invariant by
314     * promoting x up the tree until it is greater than or equal to
315     * its parent, or is the root.
316     *
317 jsr166 1.121 * To simplify and speed up coercions and comparisons, the
318 dl 1.59 * Comparable and Comparator versions are separated into different
319     * methods that are otherwise identical. (Similarly for siftDown.)
320     *
321     * @param k the position to fill
322     * @param x the item to insert
323 jsr166 1.134 * @param es the heap array
324 dl 1.59 */
325 jsr166 1.134 private static <T> void siftUpComparable(int k, T x, Object[] es) {
326 dl 1.66 Comparable<? super T> key = (Comparable<? super T>) x;
327 dl 1.59 while (k > 0) {
328     int parent = (k - 1) >>> 1;
329 jsr166 1.134 Object e = es[parent];
330 dl 1.66 if (key.compareTo((T) e) >= 0)
331 dl 1.59 break;
332 jsr166 1.134 es[k] = e;
333 dl 1.59 k = parent;
334     }
335 jsr166 1.134 es[k] = key;
336 dl 1.59 }
337    
338 jsr166 1.134 private static <T> void siftUpUsingComparator(
339     int k, T x, Object[] es, Comparator<? super T> cmp) {
340 dl 1.59 while (k > 0) {
341     int parent = (k - 1) >>> 1;
342 jsr166 1.134 Object e = es[parent];
343 dl 1.66 if (cmp.compare(x, (T) e) >= 0)
344 dl 1.59 break;
345 jsr166 1.134 es[k] = e;
346 dl 1.59 k = parent;
347     }
348 jsr166 1.134 es[k] = x;
349 dl 1.59 }
350    
351     /**
352     * Inserts item x at position k, maintaining heap invariant by
353     * demoting x down the tree repeatedly until it is less than or
354     * equal to its children or is a leaf.
355     *
356     * @param k the position to fill
357     * @param x the item to insert
358 jsr166 1.134 * @param es the heap array
359 dl 1.66 * @param n heap size
360 dl 1.59 */
361 jsr166 1.134 private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
362     // assert n > 0;
363     Comparable<? super T> key = (Comparable<? super T>)x;
364     int half = n >>> 1; // loop while a non-leaf
365     while (k < half) {
366     int child = (k << 1) + 1; // assume left child is least
367     Object c = es[child];
368     int right = child + 1;
369     if (right < n &&
370     ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
371     c = es[child = right];
372     if (key.compareTo((T) c) <= 0)
373     break;
374     es[k] = c;
375     k = child;
376 dl 1.59 }
377 jsr166 1.134 es[k] = key;
378 dl 1.59 }
379    
380 jsr166 1.134 private static <T> void siftDownUsingComparator(
381     int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
382     // assert n > 0;
383     int half = n >>> 1;
384     while (k < half) {
385     int child = (k << 1) + 1;
386     Object c = es[child];
387     int right = child + 1;
388     if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
389     c = es[child = right];
390     if (cmp.compare(x, (T) c) <= 0)
391     break;
392     es[k] = c;
393     k = child;
394 dl 1.59 }
395 jsr166 1.134 es[k] = x;
396 dl 1.7 }
397    
398 dholmes 1.10 /**
399 dl 1.59 * Establishes the heap invariant (described above) in the entire tree,
400     * assuming nothing about the order of the elements prior to the call.
401 jsr166 1.118 * This classic algorithm due to Floyd (1964) is known to be O(size).
402 dl 1.59 */
403     private void heapify() {
404 jsr166 1.134 final Object[] es = queue;
405 jsr166 1.127 int n = size, i = (n >>> 1) - 1;
406 jsr166 1.135 final Comparator<? super E> cmp;
407     if ((cmp = comparator) == null)
408 jsr166 1.127 for (; i >= 0; i--)
409 jsr166 1.134 siftDownComparable(i, (E) es[i], es, n);
410     else
411 jsr166 1.127 for (; i >= 0; i--)
412 jsr166 1.134 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
413 dl 1.59 }
414    
415     /**
416 jsr166 1.42 * Inserts the specified element into this priority queue.
417     *
418 jsr166 1.40 * @param e the element to add
419 jsr166 1.63 * @return {@code true} (as specified by {@link Collection#add})
420 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
421 jsr166 1.42 * with elements currently in the priority queue according to the
422     * priority queue's ordering
423     * @throws NullPointerException if the specified element is null
424 dholmes 1.10 */
425 jsr166 1.40 public boolean add(E e) {
426 jsr166 1.42 return offer(e);
427 dl 1.5 }
428    
429 dholmes 1.16 /**
430 dl 1.24 * Inserts the specified element into this priority queue.
431 jsr166 1.64 * As the queue is unbounded, this method will never return {@code false}.
432 dholmes 1.16 *
433 jsr166 1.40 * @param e the element to add
434 jsr166 1.63 * @return {@code true} (as specified by {@link Queue#offer})
435 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
436 jsr166 1.42 * with elements currently in the priority queue according to the
437     * priority queue's ordering
438     * @throws NullPointerException if the specified element is null
439 dholmes 1.16 */
440 jsr166 1.40 public boolean offer(E e) {
441 dl 1.59 if (e == null)
442     throw new NullPointerException();
443 dl 1.31 final ReentrantLock lock = this.lock;
444 dl 1.5 lock.lock();
445 dl 1.66 int n, cap;
446 jsr166 1.138 Object[] es;
447     while ((n = size) >= (cap = (es = queue).length))
448     tryGrow(es, cap);
449 dl 1.59 try {
450 jsr166 1.135 final Comparator<? super E> cmp;
451     if ((cmp = comparator) == null)
452 jsr166 1.138 siftUpComparable(n, e, es);
453 dl 1.59 else
454 jsr166 1.138 siftUpUsingComparator(n, e, es, cmp);
455 dl 1.66 size = n + 1;
456 dl 1.5 notEmpty.signal();
457 tim 1.19 } finally {
458 tim 1.13 lock.unlock();
459 dl 1.5 }
460 dl 1.59 return true;
461 dl 1.5 }
462    
463 dholmes 1.16 /**
464 jsr166 1.64 * Inserts the specified element into this priority queue.
465     * As the queue is unbounded, this method will never block.
466 jsr166 1.42 *
467 jsr166 1.40 * @param e the element to add
468 jsr166 1.42 * @throws ClassCastException if the specified element cannot be compared
469     * with elements currently in the priority queue according to the
470     * priority queue's ordering
471     * @throws NullPointerException if the specified element is null
472 dholmes 1.16 */
473 jsr166 1.40 public void put(E e) {
474     offer(e); // never need to block
475 dl 1.5 }
476    
477 dholmes 1.16 /**
478 jsr166 1.64 * Inserts the specified element into this priority queue.
479     * As the queue is unbounded, this method will never block or
480     * return {@code false}.
481 jsr166 1.42 *
482 jsr166 1.40 * @param e the element to add
483 dholmes 1.16 * @param timeout This parameter is ignored as the method never blocks
484     * @param unit This parameter is ignored as the method never blocks
485 jsr166 1.65 * @return {@code true} (as specified by
486     * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
487 jsr166 1.42 * @throws ClassCastException if the specified element cannot be compared
488     * with elements currently in the priority queue according to the
489     * priority queue's ordering
490     * @throws NullPointerException if the specified element is null
491 dholmes 1.16 */
492 jsr166 1.40 public boolean offer(E e, long timeout, TimeUnit unit) {
493     return offer(e); // never need to block
494 dl 1.5 }
495    
496 jsr166 1.42 public E poll() {
497     final ReentrantLock lock = this.lock;
498     lock.lock();
499     try {
500 jsr166 1.79 return dequeue();
501 jsr166 1.42 } finally {
502     lock.unlock();
503     }
504     }
505    
506 dl 1.5 public E take() throws InterruptedException {
507 dl 1.31 final ReentrantLock lock = this.lock;
508 dl 1.5 lock.lockInterruptibly();
509 dl 1.66 E result;
510 dl 1.5 try {
511 jsr166 1.79 while ( (result = dequeue()) == null)
512 jsr166 1.55 notEmpty.await();
513 tim 1.19 } finally {
514 dl 1.5 lock.unlock();
515     }
516 dl 1.59 return result;
517 dl 1.5 }
518    
519     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
520 dholmes 1.10 long nanos = unit.toNanos(timeout);
521 dl 1.31 final ReentrantLock lock = this.lock;
522 dl 1.5 lock.lockInterruptibly();
523 dl 1.66 E result;
524 dl 1.5 try {
525 jsr166 1.79 while ( (result = dequeue()) == null && nanos > 0)
526 jsr166 1.55 nanos = notEmpty.awaitNanos(nanos);
527 tim 1.19 } finally {
528 dl 1.5 lock.unlock();
529     }
530 dl 1.59 return result;
531 dl 1.5 }
532    
533     public E peek() {
534 dl 1.31 final ReentrantLock lock = this.lock;
535 dl 1.5 lock.lock();
536     try {
537 jsr166 1.135 return (E) queue[0];
538 tim 1.19 } finally {
539 tim 1.13 lock.unlock();
540 dl 1.5 }
541     }
542 jsr166 1.61
543 jsr166 1.42 /**
544     * Returns the comparator used to order the elements in this queue,
545 jsr166 1.63 * or {@code null} if this queue uses the {@linkplain Comparable
546 jsr166 1.42 * natural ordering} of its elements.
547     *
548     * @return the comparator used to order the elements in this queue,
549 jsr166 1.63 * or {@code null} if this queue uses the natural
550 jsr166 1.52 * ordering of its elements
551 jsr166 1.42 */
552     public Comparator<? super E> comparator() {
553 dl 1.59 return comparator;
554 jsr166 1.42 }
555    
556 dl 1.5 public int size() {
557 dl 1.31 final ReentrantLock lock = this.lock;
558 dl 1.5 lock.lock();
559     try {
560 jsr166 1.68 return size;
561 tim 1.19 } finally {
562 dl 1.5 lock.unlock();
563     }
564     }
565    
566     /**
567 jsr166 1.63 * Always returns {@code Integer.MAX_VALUE} because
568     * a {@code PriorityBlockingQueue} is not capacity constrained.
569     * @return {@code Integer.MAX_VALUE} always
570 dl 1.5 */
571     public int remainingCapacity() {
572     return Integer.MAX_VALUE;
573     }
574    
575 dl 1.59 private int indexOf(Object o) {
576     if (o != null) {
577 jsr166 1.133 final Object[] es = queue;
578     for (int i = 0, n = size; i < n; i++)
579     if (o.equals(es[i]))
580 dl 1.59 return i;
581     }
582     return -1;
583     }
584    
585     /**
586     * Removes the ith element from queue.
587     */
588     private void removeAt(int i) {
589 jsr166 1.134 final Object[] es = queue;
590 jsr166 1.135 final int n = size - 1;
591 dl 1.66 if (n == i) // removed last element
592 jsr166 1.134 es[i] = null;
593 dl 1.59 else {
594 jsr166 1.134 E moved = (E) es[n];
595     es[n] = null;
596 jsr166 1.135 final Comparator<? super E> cmp;
597     if ((cmp = comparator) == null)
598 jsr166 1.134 siftDownComparable(i, moved, es, n);
599 dl 1.66 else
600 jsr166 1.134 siftDownUsingComparator(i, moved, es, n, cmp);
601     if (es[i] == moved) {
602 dl 1.66 if (cmp == null)
603 jsr166 1.134 siftUpComparable(i, moved, es);
604 dl 1.66 else
605 jsr166 1.134 siftUpUsingComparator(i, moved, es, cmp);
606 dl 1.66 }
607 dl 1.59 }
608 dl 1.66 size = n;
609 dl 1.59 }
610    
611 dl 1.37 /**
612 jsr166 1.42 * Removes a single instance of the specified element from this queue,
613 jsr166 1.52 * if it is present. More formally, removes an element {@code e} such
614     * that {@code o.equals(e)}, if this queue contains one or more such
615     * elements. Returns {@code true} if and only if this queue contained
616     * the specified element (or equivalently, if this queue changed as a
617     * result of the call).
618 jsr166 1.42 *
619     * @param o element to be removed from this queue, if present
620 jsr166 1.63 * @return {@code true} if this queue changed as a result of the call
621 dl 1.37 */
622 dholmes 1.14 public boolean remove(Object o) {
623 dl 1.31 final ReentrantLock lock = this.lock;
624 dl 1.5 lock.lock();
625     try {
626 dl 1.59 int i = indexOf(o);
627 jsr166 1.78 if (i == -1)
628     return false;
629     removeAt(i);
630     return true;
631 dl 1.59 } finally {
632     lock.unlock();
633     }
634     }
635    
636     /**
637 jsr166 1.112 * Identity-based version for use in Itr.remove.
638 jsr166 1.133 *
639     * @param o element to be removed from this queue, if present
640 dl 1.59 */
641 jsr166 1.133 void removeEq(Object o) {
642 dl 1.59 final ReentrantLock lock = this.lock;
643     lock.lock();
644     try {
645 jsr166 1.133 final Object[] es = queue;
646 jsr166 1.78 for (int i = 0, n = size; i < n; i++) {
647 jsr166 1.133 if (o == es[i]) {
648 dl 1.59 removeAt(i);
649     break;
650     }
651     }
652 tim 1.19 } finally {
653 dl 1.5 lock.unlock();
654     }
655     }
656    
657 jsr166 1.42 /**
658 jsr166 1.52 * Returns {@code true} if this queue contains the specified element.
659     * More formally, returns {@code true} if and only if this queue contains
660     * at least one element {@code e} such that {@code o.equals(e)}.
661 jsr166 1.42 *
662     * @param o object to be checked for containment in this queue
663 jsr166 1.63 * @return {@code true} if this queue contains the specified element
664 jsr166 1.42 */
665 dholmes 1.14 public boolean contains(Object o) {
666 dl 1.31 final ReentrantLock lock = this.lock;
667 dl 1.5 lock.lock();
668     try {
669 jsr166 1.78 return indexOf(o) != -1;
670 tim 1.19 } finally {
671 dl 1.5 lock.unlock();
672     }
673     }
674    
675     public String toString() {
676 jsr166 1.111 return Helpers.collectionToString(this);
677 dl 1.5 }
678    
679 jsr166 1.42 /**
680     * @throws UnsupportedOperationException {@inheritDoc}
681     * @throws ClassCastException {@inheritDoc}
682     * @throws NullPointerException {@inheritDoc}
683     * @throws IllegalArgumentException {@inheritDoc}
684     */
685 dl 1.26 public int drainTo(Collection<? super E> c) {
686 jsr166 1.76 return drainTo(c, Integer.MAX_VALUE);
687 dl 1.26 }
688    
689 jsr166 1.42 /**
690     * @throws UnsupportedOperationException {@inheritDoc}
691     * @throws ClassCastException {@inheritDoc}
692     * @throws NullPointerException {@inheritDoc}
693     * @throws IllegalArgumentException {@inheritDoc}
694     */
695 dl 1.26 public int drainTo(Collection<? super E> c, int maxElements) {
696 jsr166 1.124 Objects.requireNonNull(c);
697 dl 1.26 if (c == this)
698     throw new IllegalArgumentException();
699     if (maxElements <= 0)
700     return 0;
701 dl 1.31 final ReentrantLock lock = this.lock;
702 dl 1.26 lock.lock();
703     try {
704 jsr166 1.76 int n = Math.min(size, maxElements);
705     for (int i = 0; i < n; i++) {
706     c.add((E) queue[0]); // In this order, in case add() throws.
707 jsr166 1.79 dequeue();
708 dl 1.26 }
709     return n;
710     } finally {
711     lock.unlock();
712     }
713     }
714    
715 dl 1.17 /**
716 dl 1.37 * Atomically removes all of the elements from this queue.
717 dl 1.17 * The queue will be empty after this call returns.
718     */
719     public void clear() {
720 dl 1.31 final ReentrantLock lock = this.lock;
721 dl 1.17 lock.lock();
722     try {
723 jsr166 1.133 final Object[] es = queue;
724     for (int i = 0, n = size; i < n; i++)
725     es[i] = null;
726 dl 1.59 size = 0;
727 tim 1.19 } finally {
728 dl 1.17 lock.unlock();
729     }
730     }
731    
732 jsr166 1.42 /**
733 jsr166 1.110 * Returns an array containing all of the elements in this queue.
734     * The returned array elements are in no particular order.
735     *
736     * <p>The returned array will be "safe" in that no references to it are
737     * maintained by this queue. (In other words, this method must allocate
738     * a new array). The caller is thus free to modify the returned array.
739     *
740     * <p>This method acts as bridge between array-based and collection-based
741     * APIs.
742     *
743     * @return an array containing all of the elements in this queue
744     */
745     public Object[] toArray() {
746     final ReentrantLock lock = this.lock;
747     lock.lock();
748     try {
749     return Arrays.copyOf(queue, size);
750     } finally {
751     lock.unlock();
752     }
753     }
754    
755     /**
756 jsr166 1.42 * Returns an array containing all of the elements in this queue; the
757     * runtime type of the returned array is that of the specified array.
758     * The returned array elements are in no particular order.
759     * If the queue fits in the specified array, it is returned therein.
760     * Otherwise, a new array is allocated with the runtime type of the
761     * specified array and the size of this queue.
762     *
763     * <p>If this queue fits in the specified array with room to spare
764     * (i.e., the array has more elements than this queue), the element in
765     * the array immediately following the end of the queue is set to
766 jsr166 1.63 * {@code null}.
767 jsr166 1.42 *
768     * <p>Like the {@link #toArray()} method, this method acts as bridge between
769     * array-based and collection-based APIs. Further, this method allows
770     * precise control over the runtime type of the output array, and may,
771     * under certain circumstances, be used to save allocation costs.
772     *
773 jsr166 1.63 * <p>Suppose {@code x} is a queue known to contain only strings.
774 jsr166 1.42 * The following code can be used to dump the queue into a newly
775 jsr166 1.63 * allocated array of {@code String}:
776 jsr166 1.42 *
777 jsr166 1.109 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
778 jsr166 1.42 *
779 jsr166 1.63 * Note that {@code toArray(new Object[0])} is identical in function to
780     * {@code toArray()}.
781 jsr166 1.42 *
782     * @param a the array into which the elements of the queue are to
783     * be stored, if it is big enough; otherwise, a new array of the
784     * same runtime type is allocated for this purpose
785     * @return an array containing all of the elements in this queue
786     * @throws ArrayStoreException if the runtime type of the specified array
787     * is not a supertype of the runtime type of every element in
788     * this queue
789     * @throws NullPointerException if the specified array is null
790     */
791 dl 1.5 public <T> T[] toArray(T[] a) {
792 dl 1.31 final ReentrantLock lock = this.lock;
793 dl 1.5 lock.lock();
794     try {
795 dl 1.66 int n = size;
796     if (a.length < n)
797 dl 1.59 // Make a new array of a's runtime type, but my contents:
798     return (T[]) Arrays.copyOf(queue, size, a.getClass());
799 dl 1.66 System.arraycopy(queue, 0, a, 0, n);
800     if (a.length > n)
801     a[n] = null;
802 dl 1.59 return a;
803 tim 1.19 } finally {
804 dl 1.5 lock.unlock();
805     }
806     }
807    
808 dholmes 1.16 /**
809 dl 1.23 * Returns an iterator over the elements in this queue. The
810     * iterator does not return the elements in any particular order.
811 jsr166 1.69 *
812 jsr166 1.103 * <p>The returned iterator is
813     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
814 dholmes 1.16 *
815 jsr166 1.42 * @return an iterator over the elements in this queue
816 dholmes 1.16 */
817 dl 1.5 public Iterator<E> iterator() {
818 dl 1.51 return new Itr(toArray());
819 dl 1.5 }
820    
821 dl 1.49 /**
822     * Snapshot iterator that works off copy of underlying q array.
823     */
824 dl 1.59 final class Itr implements Iterator<E> {
825 dl 1.49 final Object[] array; // Array of all elements
826 jsr166 1.81 int cursor; // index of next element to return
827 jsr166 1.136 int lastRet = -1; // index of last element, or -1 if no such
828 jsr166 1.50
829 dl 1.49 Itr(Object[] array) {
830     this.array = array;
831 dl 1.5 }
832    
833 tim 1.13 public boolean hasNext() {
834 dl 1.49 return cursor < array.length;
835 tim 1.13 }
836    
837     public E next() {
838 dl 1.49 if (cursor >= array.length)
839     throw new NoSuchElementException();
840 jsr166 1.120 return (E)array[lastRet = cursor++];
841 tim 1.13 }
842    
843     public void remove() {
844 jsr166 1.50 if (lastRet < 0)
845 jsr166 1.54 throw new IllegalStateException();
846 jsr166 1.133 removeEq(array[lastRet]);
847 dl 1.49 lastRet = -1;
848 tim 1.13 }
849 jsr166 1.137
850     public void forEachRemaining(Consumer<? super E> action) {
851     Objects.requireNonNull(action);
852     final Object[] es = array;
853     int i;
854     if ((i = cursor) < es.length) {
855     lastRet = -1;
856     cursor = es.length;
857     for (; i < es.length; i++)
858     action.accept((E) es[i]);
859     lastRet = es.length - 1;
860     }
861     }
862 dl 1.5 }
863    
864     /**
865 jsr166 1.83 * Saves this queue to a stream (that is, serializes it).
866     *
867     * For compatibility with previous version of this class, elements
868     * are first copied to a java.util.PriorityQueue, which is then
869     * serialized.
870 jsr166 1.97 *
871     * @param s the stream
872 jsr166 1.98 * @throws java.io.IOException if an I/O error occurs
873 dl 1.5 */
874     private void writeObject(java.io.ObjectOutputStream s)
875     throws java.io.IOException {
876     lock.lock();
877     try {
878 jsr166 1.78 // avoid zero capacity argument
879     q = new PriorityQueue<E>(Math.max(size, 1), comparator);
880 dl 1.59 q.addAll(this);
881 dl 1.5 s.defaultWriteObject();
882 dl 1.66 } finally {
883 dl 1.59 q = null;
884 dl 1.5 lock.unlock();
885     }
886 tim 1.1 }
887    
888 dl 1.59 /**
889 jsr166 1.83 * Reconstitutes this queue from a stream (that is, deserializes it).
890 jsr166 1.97 * @param s the stream
891 jsr166 1.98 * @throws ClassNotFoundException if the class of a serialized object
892     * could not be found
893     * @throws java.io.IOException if an I/O error occurs
894 dl 1.59 */
895     private void readObject(java.io.ObjectInputStream s)
896     throws java.io.IOException, ClassNotFoundException {
897 jsr166 1.67 try {
898 dl 1.66 s.defaultReadObject();
899 jsr166 1.131 int sz = q.size();
900 jsr166 1.141 jsr166.Platform.checkArray(s, Object[].class, sz);
901 jsr166 1.135 this.queue = new Object[Math.max(1, sz)];
902 dl 1.66 comparator = q.comparator();
903     addAll(q);
904 jsr166 1.67 } finally {
905 dl 1.66 q = null;
906     }
907 dl 1.59 }
908    
909 jsr166 1.116 /**
910     * Immutable snapshot spliterator that binds to elements "late".
911     */
912 jsr166 1.119 final class PBQSpliterator implements Spliterator<E> {
913 jsr166 1.125 Object[] array; // null until late-bound-initialized
914 dl 1.93 int index;
915     int fence;
916    
917 jsr166 1.125 PBQSpliterator() {}
918    
919 jsr166 1.119 PBQSpliterator(Object[] array, int index, int fence) {
920 dl 1.93 this.array = array;
921     this.index = index;
922     this.fence = fence;
923     }
924    
925 jsr166 1.125 private int getFence() {
926     if (array == null)
927     fence = (array = toArray()).length;
928     return fence;
929 dl 1.93 }
930    
931 jsr166 1.119 public PBQSpliterator trySplit() {
932 dl 1.93 int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
933     return (lo >= mid) ? null :
934 jsr166 1.119 new PBQSpliterator(array, lo, index = mid);
935 dl 1.93 }
936    
937 dl 1.95 public void forEachRemaining(Consumer<? super E> action) {
938 jsr166 1.124 Objects.requireNonNull(action);
939 jsr166 1.125 final int hi = getFence(), lo = index;
940 jsr166 1.134 final Object[] es = array;
941 jsr166 1.125 index = hi; // ensure exhaustion
942     for (int i = lo; i < hi; i++)
943 jsr166 1.134 action.accept((E) es[i]);
944 dl 1.93 }
945    
946     public boolean tryAdvance(Consumer<? super E> action) {
947 jsr166 1.124 Objects.requireNonNull(action);
948 dl 1.93 if (getFence() > index && index >= 0) {
949 jsr166 1.125 action.accept((E) array[index++]);
950 dl 1.93 return true;
951     }
952     return false;
953     }
954    
955 jsr166 1.119 public long estimateSize() { return getFence() - index; }
956 dl 1.93
957     public int characteristics() {
958 jsr166 1.123 return (Spliterator.NONNULL |
959     Spliterator.SIZED |
960     Spliterator.SUBSIZED);
961 dl 1.93 }
962     }
963    
964 jsr166 1.102 /**
965     * Returns a {@link Spliterator} over the elements in this queue.
966 jsr166 1.117 * The spliterator does not traverse elements in any particular order
967     * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
968 jsr166 1.102 *
969 jsr166 1.103 * <p>The returned spliterator is
970     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
971     *
972 jsr166 1.102 * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
973     * {@link Spliterator#NONNULL}.
974     *
975     * @implNote
976     * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
977     *
978     * @return a {@code Spliterator} over the elements in this queue
979     * @since 1.8
980     */
981 dl 1.94 public Spliterator<E> spliterator() {
982 jsr166 1.125 return new PBQSpliterator();
983 dl 1.86 }
984    
985 jsr166 1.132 /**
986     * @throws NullPointerException {@inheritDoc}
987     */
988 jsr166 1.139 public boolean removeIf(Predicate<? super E> filter) {
989     Objects.requireNonNull(filter);
990     return bulkRemove(filter);
991     }
992    
993     /**
994     * @throws NullPointerException {@inheritDoc}
995     */
996     public boolean removeAll(Collection<?> c) {
997     Objects.requireNonNull(c);
998     return bulkRemove(e -> c.contains(e));
999     }
1000    
1001     /**
1002     * @throws NullPointerException {@inheritDoc}
1003     */
1004     public boolean retainAll(Collection<?> c) {
1005     Objects.requireNonNull(c);
1006     return bulkRemove(e -> !c.contains(e));
1007     }
1008    
1009     // A tiny bit set implementation
1010    
1011     private static long[] nBits(int n) {
1012     return new long[((n - 1) >> 6) + 1];
1013     }
1014     private static void setBit(long[] bits, int i) {
1015     bits[i >> 6] |= 1L << i;
1016     }
1017     private static boolean isClear(long[] bits, int i) {
1018     return (bits[i >> 6] & (1L << i)) == 0;
1019     }
1020    
1021     /** Implementation of bulk remove methods. */
1022     private boolean bulkRemove(Predicate<? super E> filter) {
1023     final ReentrantLock lock = this.lock;
1024     lock.lock();
1025     try {
1026     final Object[] es = queue;
1027     final int end = size;
1028     int i;
1029     // Optimize for initial run of survivors
1030     for (i = 0; i < end && !filter.test((E) es[i]); i++)
1031     ;
1032     if (i >= end)
1033     return false;
1034     // Tolerate predicates that reentrantly access the
1035     // collection for read, so traverse once to find elements
1036     // to delete, a second pass to physically expunge.
1037     final int beg = i;
1038     final long[] deathRow = nBits(end - beg);
1039     deathRow[0] = 1L; // set bit 0
1040     for (i = beg + 1; i < end; i++)
1041     if (filter.test((E) es[i]))
1042     setBit(deathRow, i - beg);
1043     int w = beg;
1044     for (i = beg; i < end; i++)
1045     if (isClear(deathRow, i - beg))
1046     es[w++] = es[i];
1047     for (i = size = w; i < end; i++)
1048     es[i] = null;
1049     heapify();
1050     return true;
1051     } finally {
1052     lock.unlock();
1053     }
1054     }
1055    
1056     /**
1057     * @throws NullPointerException {@inheritDoc}
1058     */
1059 jsr166 1.132 public void forEach(Consumer<? super E> action) {
1060     Objects.requireNonNull(action);
1061     final ReentrantLock lock = this.lock;
1062     lock.lock();
1063     try {
1064     final Object[] es = queue;
1065     for (int i = 0, n = size; i < n; i++)
1066     action.accept((E) es[i]);
1067     } finally {
1068     lock.unlock();
1069     }
1070     }
1071    
// VarHandle mechanics: handle on the int field "allocationSpinLock"
private static final VarHandle ALLOCATIONSPINLOCK;
static {
    try {
        ALLOCATIONSPINLOCK = MethodHandles.lookup().findVarHandle(
            PriorityBlockingQueue.class, "allocationSpinLock", int.class);
    } catch (ReflectiveOperationException e) {
        // Lookup failure makes the class unusable; fail class initialization.
        throw new ExceptionInInitializerError(e);
    }
}
1084 tim 1.1 }