ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.84
Committed: Thu Dec 22 23:22:59 2011 UTC (12 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.83: +2 -1 lines
Log Message:
fix imports

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.Condition;
10 import java.util.concurrent.locks.ReentrantLock;
11 import java.util.*;
12
13 /**
14 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
15 * the same ordering rules as class {@link PriorityQueue} and supplies
16 * blocking retrieval operations. While this queue is logically
17 * unbounded, attempted additions may fail due to resource exhaustion
18 * (causing {@code OutOfMemoryError}). This class does not permit
19 * {@code null} elements. A priority queue relying on {@linkplain
20 * Comparable natural ordering} also does not permit insertion of
21 * non-comparable objects (doing so results in
22 * {@code ClassCastException}).
23 *
24 * <p>This class and its iterator implement all of the
25 * <em>optional</em> methods of the {@link Collection} and {@link
26 * Iterator} interfaces. The Iterator provided in method {@link
27 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
28 * the PriorityBlockingQueue in any particular order. If you need
29 * ordered traversal, consider using
30 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
31 * can be used to <em>remove</em> some or all elements in priority
32 * order and place them in another collection.
33 *
34 * <p>Operations on this class make no guarantees about the ordering
35 * of elements with equal priority. If you need to enforce an
36 * ordering, you can define custom classes or comparators that use a
37 * secondary key to break ties in primary priority values. For
38 * example, here is a class that applies first-in-first-out
39 * tie-breaking to comparable elements. To use it, you would insert a
40 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
41 *
42 * <pre> {@code
43 * class FIFOEntry<E extends Comparable<? super E>>
44 * implements Comparable<FIFOEntry<E>> {
45 * static final AtomicLong seq = new AtomicLong(0);
46 * final long seqNum;
47 * final E entry;
48 * public FIFOEntry(E entry) {
49 * seqNum = seq.getAndIncrement();
50 * this.entry = entry;
51 * }
52 * public E getEntry() { return entry; }
53 * public int compareTo(FIFOEntry<E> other) {
54 * int res = entry.compareTo(other.entry);
55 * if (res == 0 && other.entry != this.entry)
56 * res = (seqNum < other.seqNum ? -1 : 1);
57 * return res;
58 * }
59 * }}</pre>
60 *
61 * <p>This class is a member of the
62 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
63 * Java Collections Framework</a>.
64 *
65 * @since 1.5
66 * @author Doug Lea
67 * @param <E> the type of elements held in this collection
68 */
69 @SuppressWarnings("unchecked")
70 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
71 implements BlockingQueue<E>, java.io.Serializable {
72 private static final long serialVersionUID = 5595510919245408276L;
73
74 /*
75 * The implementation uses an array-based binary heap, with public
76 * operations protected with a single lock. However, allocation
77 * during resizing uses a simple spinlock (used only while not
78 * holding main lock) in order to allow takes to operate
79 * concurrently with allocation. This avoids repeated
80 * postponement of waiting consumers and consequent element
81 * build-up. The need to back away from lock during allocation
82 * makes it impossible to simply wrap delegated
83 * java.util.PriorityQueue operations within a lock, as was done
84 * in a previous version of this class. To maintain
85 * interoperability, a plain PriorityQueue is still used during
86 * serialization, which maintains compatibility at the expense of
87 * transiently doubling overhead.
88 */
89
90 /**
91 * Default array capacity, used by the no-argument constructor.
92 */
93 private static final int DEFAULT_INITIAL_CAPACITY = 11;
94
95 /**
96 * The maximum size of array to allocate.
97 * Some VMs reserve some header words in an array.
98 * Attempts to allocate larger arrays may result in
99 * OutOfMemoryError: Requested array size exceeds VM limit
100 */
101 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
102
103 /**
104 * Priority queue represented as a balanced binary heap: the two
105 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
106 * priority queue is ordered by comparator, or by the elements'
107 * natural ordering, if comparator is null: For each node n in the
108 * heap and each descendant d of n, n <= d. The element with the
109 * lowest value is in queue[0], assuming the queue is nonempty.
110 */
111 private transient Object[] queue;
112
113 /**
114 * The number of elements in the priority queue.
115 */
116 private transient int size;
117
118 /**
119 * The comparator, or null if priority queue uses elements'
120 * natural ordering.
121 */
122 private transient Comparator<? super E> comparator;
123
124 /**
125 * Lock used for all public operations; tryGrow drops it while allocating.
126 */
127 private final ReentrantLock lock;
128
129 /**
130 * Condition on which take and timed poll wait while the queue is empty.
131 */
132 private final Condition notEmpty;
133
134 /**
135 * Spinlock for allocation, acquired via CAS (0 = free, 1 = held).
136 */
137 private transient volatile int allocationSpinLock;
138
139 /**
140 * A plain PriorityQueue used only for serialization,
141 * to maintain compatibility with previous versions
142 * of this class. Non-null only during serialization/deserialization.
143 */
144 private PriorityQueue<E> q;
145
/**
 * Constructs an empty {@code PriorityBlockingQueue} whose elements are
 * ordered by their {@linkplain Comparable natural ordering} and whose
 * backing array starts at the default initial capacity (11).
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
154
/**
 * Constructs an empty {@code PriorityBlockingQueue} with the given
 * initial capacity whose elements are ordered by their
 * {@linkplain Comparable natural ordering}.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
167
168 /**
169 * Creates a {@code PriorityBlockingQueue} with the specified initial
170 * capacity that orders its elements according to the specified
171 * comparator.
172 *
173 * @param initialCapacity the initial capacity for this priority queue
174 * @param comparator the comparator that will be used to order this
175 * priority queue. If {@code null}, the {@linkplain Comparable
176 * natural ordering} of the elements will be used.
177 * @throws IllegalArgumentException if {@code initialCapacity} is less
178 * than 1
179 */
180 public PriorityBlockingQueue(int initialCapacity,
181 Comparator<? super E> comparator) {
182 if (initialCapacity < 1)
183 throw new IllegalArgumentException();
184 this.lock = new ReentrantLock();
185 this.notEmpty = lock.newCondition();
186 this.comparator = comparator;
187 this.queue = new Object[initialCapacity];
188 }
189
190 /**
191 * Creates a {@code PriorityBlockingQueue} containing the elements
192 * in the specified collection. If the specified collection is a
193 * {@link SortedSet} or a {@link PriorityQueue}, this
194 * priority queue will be ordered according to the same ordering.
195 * Otherwise, this priority queue will be ordered according to the
196 * {@linkplain Comparable natural ordering} of its elements.
197 *
198 * @param c the collection whose elements are to be placed
199 * into this priority queue
200 * @throws ClassCastException if elements of the specified collection
201 * cannot be compared to one another according to the priority
202 * queue's ordering
203 * @throws NullPointerException if the specified collection or any
204 * of its elements are null
205 */
206 public PriorityBlockingQueue(Collection<? extends E> c) {
207 this.lock = new ReentrantLock();
208 this.notEmpty = lock.newCondition();
209 boolean heapify = true; // true if not known to be in heap order
210 boolean screen = true; // true if must screen for nulls
211 if (c instanceof SortedSet<?>) {
212 SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
213 this.comparator = (Comparator<? super E>) ss.comparator();
214 heapify = false;
215 }
216 else if (c instanceof PriorityBlockingQueue<?>) {
217 PriorityBlockingQueue<? extends E> pq =
218 (PriorityBlockingQueue<? extends E>) c;
219 this.comparator = (Comparator<? super E>) pq.comparator();
220 screen = false;
221 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
222 heapify = false;
223 }
224 Object[] a = c.toArray();
225 int n = a.length;
226 // If c.toArray incorrectly doesn't return Object[], copy it.
227 if (a.getClass() != Object[].class)
228 a = Arrays.copyOf(a, n, Object[].class);
229 if (screen && (n == 1 || this.comparator != null)) {
230 for (int i = 0; i < n; ++i)
231 if (a[i] == null)
232 throw new NullPointerException();
233 }
234 this.queue = a;
235 this.size = n;
236 if (heapify)
237 heapify();
238 }
239
240 /**
241 * Tries to grow array to accommodate at least one more element
242 * (but normally expand by about 50%), giving up (allowing retry)
243 * on contention (which we expect to be rare). Call only while
244 * holding lock.
245 *
246 * @param array the heap array
247 * @param oldCap the length of the array
248 */
249 private void tryGrow(Object[] array, int oldCap) {
250 lock.unlock(); // must release and then re-acquire main lock
251 Object[] newArray = null;
252 if (allocationSpinLock == 0 &&
253 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
254 0, 1)) {
255 try {
256 int newCap = oldCap + ((oldCap < 64) ?
257 (oldCap + 2) : // grow faster if small
258 (oldCap >> 1));
259 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
260 int minCap = oldCap + 1;
261 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
262 throw new OutOfMemoryError();
263 newCap = MAX_ARRAY_SIZE;
264 }
265 if (newCap > oldCap && queue == array)
266 newArray = new Object[newCap];
267 } finally {
268 allocationSpinLock = 0;
269 }
270 }
271 if (newArray == null) // back off if another thread is allocating
272 Thread.yield();
273 lock.lock();
274 if (newArray != null && queue == array) {
275 queue = newArray;
276 System.arraycopy(array, 0, newArray, 0, oldCap);
277 }
278 }
279
280 /**
281 * Mechanics for poll(). Call only while holding lock.
282 */
283 private E dequeue() {
284 int n = size - 1;
285 if (n < 0)
286 return null;
287 else {
288 Object[] array = queue;
289 E result = (E) array[0];
290 E x = (E) array[n];
291 array[n] = null;
292 Comparator<? super E> cmp = comparator;
293 if (cmp == null)
294 siftDownComparable(0, x, array, n);
295 else
296 siftDownUsingComparator(0, x, array, n, cmp);
297 size = n;
298 return result;
299 }
300 }
301
302 /**
303 * Inserts item x at position k, maintaining heap invariant by
304 * promoting x up the tree until it is greater than or equal to
305 * its parent, or is the root.
306 *
307 * To simplify and speed up coercions and comparisons. the
308 * Comparable and Comparator versions are separated into different
309 * methods that are otherwise identical. (Similarly for siftDown.)
310 * These methods are static, with heap state as arguments, to
311 * simplify use in light of possible comparator exceptions.
312 *
313 * @param k the position to fill
314 * @param x the item to insert
315 * @param array the heap array
316 * @param n heap size
317 */
318 private static <T> void siftUpComparable(int k, T x, Object[] array) {
319 Comparable<? super T> key = (Comparable<? super T>) x;
320 while (k > 0) {
321 int parent = (k - 1) >>> 1;
322 Object e = array[parent];
323 if (key.compareTo((T) e) >= 0)
324 break;
325 array[k] = e;
326 k = parent;
327 }
328 array[k] = key;
329 }
330
331 private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
332 Comparator<? super T> cmp) {
333 while (k > 0) {
334 int parent = (k - 1) >>> 1;
335 Object e = array[parent];
336 if (cmp.compare(x, (T) e) >= 0)
337 break;
338 array[k] = e;
339 k = parent;
340 }
341 array[k] = x;
342 }
343
344 /**
345 * Inserts item x at position k, maintaining heap invariant by
346 * demoting x down the tree repeatedly until it is less than or
347 * equal to its children or is a leaf.
348 *
349 * @param k the position to fill
350 * @param x the item to insert
351 * @param array the heap array
352 * @param n heap size
353 */
354 private static <T> void siftDownComparable(int k, T x, Object[] array,
355 int n) {
356 Comparable<? super T> key = (Comparable<? super T>)x;
357 int half = n >>> 1; // loop while a non-leaf
358 while (k < half) {
359 int child = (k << 1) + 1; // assume left child is least
360 Object c = array[child];
361 int right = child + 1;
362 if (right < n &&
363 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
364 c = array[child = right];
365 if (key.compareTo((T) c) <= 0)
366 break;
367 array[k] = c;
368 k = child;
369 }
370 array[k] = key;
371 }
372
373 private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
374 int n,
375 Comparator<? super T> cmp) {
376 int half = n >>> 1;
377 while (k < half) {
378 int child = (k << 1) + 1;
379 Object c = array[child];
380 int right = child + 1;
381 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
382 c = array[child = right];
383 if (cmp.compare(x, (T) c) <= 0)
384 break;
385 array[k] = c;
386 k = child;
387 }
388 array[k] = x;
389 }
390
391 /**
392 * Establishes the heap invariant (described above) in the entire tree,
393 * assuming nothing about the order of the elements prior to the call.
394 */
395 private void heapify() {
396 Object[] array = queue;
397 int n = size;
398 int half = (n >>> 1) - 1;
399 Comparator<? super E> cmp = comparator;
400 if (cmp == null) {
401 for (int i = half; i >= 0; i--)
402 siftDownComparable(i, (E) array[i], array, n);
403 }
404 else {
405 for (int i = half; i >= 0; i--)
406 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
407 }
408 }
409
410 /**
411 * Inserts the specified element into this priority queue.
412 *
413 * @param e the element to add
414 * @return {@code true} (as specified by {@link Collection#add})
415 * @throws ClassCastException if the specified element cannot be compared
416 * with elements currently in the priority queue according to the
417 * priority queue's ordering
418 * @throws NullPointerException if the specified element is null
419 */
420 public boolean add(E e) {
421 return offer(e);
422 }
423
424 /**
425 * Inserts the specified element into this priority queue.
426 * As the queue is unbounded, this method will never return {@code false}.
427 *
428 * @param e the element to add
429 * @return {@code true} (as specified by {@link Queue#offer})
430 * @throws ClassCastException if the specified element cannot be compared
431 * with elements currently in the priority queue according to the
432 * priority queue's ordering
433 * @throws NullPointerException if the specified element is null
434 */
435 public boolean offer(E e) {
436 if (e == null)
437 throw new NullPointerException();
438 final ReentrantLock lock = this.lock;
439 lock.lock();
440 int n, cap;
441 Object[] array;
442 while ((n = size) >= (cap = (array = queue).length))
443 tryGrow(array, cap);
444 try {
445 Comparator<? super E> cmp = comparator;
446 if (cmp == null)
447 siftUpComparable(n, e, array);
448 else
449 siftUpUsingComparator(n, e, array, cmp);
450 size = n + 1;
451 notEmpty.signal();
452 } finally {
453 lock.unlock();
454 }
455 return true;
456 }
457
458 /**
459 * Inserts the specified element into this priority queue.
460 * As the queue is unbounded, this method will never block.
461 *
462 * @param e the element to add
463 * @throws ClassCastException if the specified element cannot be compared
464 * with elements currently in the priority queue according to the
465 * priority queue's ordering
466 * @throws NullPointerException if the specified element is null
467 */
468 public void put(E e) {
469 offer(e); // never need to block
470 }
471
472 /**
473 * Inserts the specified element into this priority queue.
474 * As the queue is unbounded, this method will never block or
475 * return {@code false}.
476 *
477 * @param e the element to add
478 * @param timeout This parameter is ignored as the method never blocks
479 * @param unit This parameter is ignored as the method never blocks
480 * @return {@code true} (as specified by
481 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
482 * @throws ClassCastException if the specified element cannot be compared
483 * with elements currently in the priority queue according to the
484 * priority queue's ordering
485 * @throws NullPointerException if the specified element is null
486 */
487 public boolean offer(E e, long timeout, TimeUnit unit) {
488 return offer(e); // never need to block
489 }
490
491 public E poll() {
492 final ReentrantLock lock = this.lock;
493 lock.lock();
494 try {
495 return dequeue();
496 } finally {
497 lock.unlock();
498 }
499 }
500
501 public E take() throws InterruptedException {
502 final ReentrantLock lock = this.lock;
503 lock.lockInterruptibly();
504 E result;
505 try {
506 while ( (result = dequeue()) == null)
507 notEmpty.await();
508 } finally {
509 lock.unlock();
510 }
511 return result;
512 }
513
514 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
515 long nanos = unit.toNanos(timeout);
516 final ReentrantLock lock = this.lock;
517 lock.lockInterruptibly();
518 E result;
519 try {
520 while ( (result = dequeue()) == null && nanos > 0)
521 nanos = notEmpty.awaitNanos(nanos);
522 } finally {
523 lock.unlock();
524 }
525 return result;
526 }
527
528 public E peek() {
529 final ReentrantLock lock = this.lock;
530 lock.lock();
531 try {
532 return (size == 0) ? null : (E) queue[0];
533 } finally {
534 lock.unlock();
535 }
536 }
537
538 /**
539 * Returns the comparator used to order the elements in this queue,
540 * or {@code null} if this queue uses the {@linkplain Comparable
541 * natural ordering} of its elements.
542 *
543 * @return the comparator used to order the elements in this queue,
544 * or {@code null} if this queue uses the natural
545 * ordering of its elements
546 */
547 public Comparator<? super E> comparator() {
548 return comparator;
549 }
550
551 public int size() {
552 final ReentrantLock lock = this.lock;
553 lock.lock();
554 try {
555 return size;
556 } finally {
557 lock.unlock();
558 }
559 }
560
561 /**
562 * Always returns {@code Integer.MAX_VALUE} because
563 * a {@code PriorityBlockingQueue} is not capacity constrained.
564 * @return {@code Integer.MAX_VALUE} always
565 */
566 public int remainingCapacity() {
567 return Integer.MAX_VALUE;
568 }
569
570 private int indexOf(Object o) {
571 if (o != null) {
572 Object[] array = queue;
573 int n = size;
574 for (int i = 0; i < n; i++)
575 if (o.equals(array[i]))
576 return i;
577 }
578 return -1;
579 }
580
581 /**
582 * Removes the ith element from queue.
583 */
584 private void removeAt(int i) {
585 Object[] array = queue;
586 int n = size - 1;
587 if (n == i) // removed last element
588 array[i] = null;
589 else {
590 E moved = (E) array[n];
591 array[n] = null;
592 Comparator<? super E> cmp = comparator;
593 if (cmp == null)
594 siftDownComparable(i, moved, array, n);
595 else
596 siftDownUsingComparator(i, moved, array, n, cmp);
597 if (array[i] == moved) {
598 if (cmp == null)
599 siftUpComparable(i, moved, array);
600 else
601 siftUpUsingComparator(i, moved, array, cmp);
602 }
603 }
604 size = n;
605 }
606
607 /**
608 * Removes a single instance of the specified element from this queue,
609 * if it is present. More formally, removes an element {@code e} such
610 * that {@code o.equals(e)}, if this queue contains one or more such
611 * elements. Returns {@code true} if and only if this queue contained
612 * the specified element (or equivalently, if this queue changed as a
613 * result of the call).
614 *
615 * @param o element to be removed from this queue, if present
616 * @return {@code true} if this queue changed as a result of the call
617 */
618 public boolean remove(Object o) {
619 final ReentrantLock lock = this.lock;
620 lock.lock();
621 try {
622 int i = indexOf(o);
623 if (i == -1)
624 return false;
625 removeAt(i);
626 return true;
627 } finally {
628 lock.unlock();
629 }
630 }
631
632 /**
633 * Identity-based version for use in Itr.remove
634 */
635 void removeEQ(Object o) {
636 final ReentrantLock lock = this.lock;
637 lock.lock();
638 try {
639 Object[] array = queue;
640 for (int i = 0, n = size; i < n; i++) {
641 if (o == array[i]) {
642 removeAt(i);
643 break;
644 }
645 }
646 } finally {
647 lock.unlock();
648 }
649 }
650
651 /**
652 * Returns {@code true} if this queue contains the specified element.
653 * More formally, returns {@code true} if and only if this queue contains
654 * at least one element {@code e} such that {@code o.equals(e)}.
655 *
656 * @param o object to be checked for containment in this queue
657 * @return {@code true} if this queue contains the specified element
658 */
659 public boolean contains(Object o) {
660 final ReentrantLock lock = this.lock;
661 lock.lock();
662 try {
663 return indexOf(o) != -1;
664 } finally {
665 lock.unlock();
666 }
667 }
668
669 /**
670 * Returns an array containing all of the elements in this queue.
671 * The returned array elements are in no particular order.
672 *
673 * <p>The returned array will be "safe" in that no references to it are
674 * maintained by this queue. (In other words, this method must allocate
675 * a new array). The caller is thus free to modify the returned array.
676 *
677 * <p>This method acts as bridge between array-based and collection-based
678 * APIs.
679 *
680 * @return an array containing all of the elements in this queue
681 */
682 public Object[] toArray() {
683 final ReentrantLock lock = this.lock;
684 lock.lock();
685 try {
686 return Arrays.copyOf(queue, size);
687 } finally {
688 lock.unlock();
689 }
690 }
691
692 public String toString() {
693 final ReentrantLock lock = this.lock;
694 lock.lock();
695 try {
696 int n = size;
697 if (n == 0)
698 return "[]";
699 StringBuilder sb = new StringBuilder();
700 sb.append('[');
701 for (int i = 0; i < n; ++i) {
702 Object e = queue[i];
703 sb.append(e == this ? "(this Collection)" : e);
704 if (i != n - 1)
705 sb.append(',').append(' ');
706 }
707 return sb.append(']').toString();
708 } finally {
709 lock.unlock();
710 }
711 }
712
713 /**
714 * @throws UnsupportedOperationException {@inheritDoc}
715 * @throws ClassCastException {@inheritDoc}
716 * @throws NullPointerException {@inheritDoc}
717 * @throws IllegalArgumentException {@inheritDoc}
718 */
719 public int drainTo(Collection<? super E> c) {
720 return drainTo(c, Integer.MAX_VALUE);
721 }
722
723 /**
724 * @throws UnsupportedOperationException {@inheritDoc}
725 * @throws ClassCastException {@inheritDoc}
726 * @throws NullPointerException {@inheritDoc}
727 * @throws IllegalArgumentException {@inheritDoc}
728 */
729 public int drainTo(Collection<? super E> c, int maxElements) {
730 if (c == null)
731 throw new NullPointerException();
732 if (c == this)
733 throw new IllegalArgumentException();
734 if (maxElements <= 0)
735 return 0;
736 final ReentrantLock lock = this.lock;
737 lock.lock();
738 try {
739 int n = Math.min(size, maxElements);
740 for (int i = 0; i < n; i++) {
741 c.add((E) queue[0]); // In this order, in case add() throws.
742 dequeue();
743 }
744 return n;
745 } finally {
746 lock.unlock();
747 }
748 }
749
750 /**
751 * Atomically removes all of the elements from this queue.
752 * The queue will be empty after this call returns.
753 */
754 public void clear() {
755 final ReentrantLock lock = this.lock;
756 lock.lock();
757 try {
758 Object[] array = queue;
759 int n = size;
760 size = 0;
761 for (int i = 0; i < n; i++)
762 array[i] = null;
763 } finally {
764 lock.unlock();
765 }
766 }
767
768 /**
769 * Returns an array containing all of the elements in this queue; the
770 * runtime type of the returned array is that of the specified array.
771 * The returned array elements are in no particular order.
772 * If the queue fits in the specified array, it is returned therein.
773 * Otherwise, a new array is allocated with the runtime type of the
774 * specified array and the size of this queue.
775 *
776 * <p>If this queue fits in the specified array with room to spare
777 * (i.e., the array has more elements than this queue), the element in
778 * the array immediately following the end of the queue is set to
779 * {@code null}.
780 *
781 * <p>Like the {@link #toArray()} method, this method acts as bridge between
782 * array-based and collection-based APIs. Further, this method allows
783 * precise control over the runtime type of the output array, and may,
784 * under certain circumstances, be used to save allocation costs.
785 *
786 * <p>Suppose {@code x} is a queue known to contain only strings.
787 * The following code can be used to dump the queue into a newly
788 * allocated array of {@code String}:
789 *
790 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
791 *
792 * Note that {@code toArray(new Object[0])} is identical in function to
793 * {@code toArray()}.
794 *
795 * @param a the array into which the elements of the queue are to
796 * be stored, if it is big enough; otherwise, a new array of the
797 * same runtime type is allocated for this purpose
798 * @return an array containing all of the elements in this queue
799 * @throws ArrayStoreException if the runtime type of the specified array
800 * is not a supertype of the runtime type of every element in
801 * this queue
802 * @throws NullPointerException if the specified array is null
803 */
804 public <T> T[] toArray(T[] a) {
805 final ReentrantLock lock = this.lock;
806 lock.lock();
807 try {
808 int n = size;
809 if (a.length < n)
810 // Make a new array of a's runtime type, but my contents:
811 return (T[]) Arrays.copyOf(queue, size, a.getClass());
812 System.arraycopy(queue, 0, a, 0, n);
813 if (a.length > n)
814 a[n] = null;
815 return a;
816 } finally {
817 lock.unlock();
818 }
819 }
820
821 /**
822 * Returns an iterator over the elements in this queue. The
823 * iterator does not return the elements in any particular order.
824 *
825 * <p>The returned iterator is a "weakly consistent" iterator that
826 * will never throw {@link java.util.ConcurrentModificationException
827 * ConcurrentModificationException}, and guarantees to traverse
828 * elements as they existed upon construction of the iterator, and
829 * may (but is not guaranteed to) reflect any modifications
830 * subsequent to construction.
831 *
832 * @return an iterator over the elements in this queue
833 */
834 public Iterator<E> iterator() {
835 return new Itr(toArray());
836 }
837
838 /**
839 * Snapshot iterator that works off copy of underlying q array.
840 */
841 final class Itr implements Iterator<E> {
842 final Object[] array; // Array of all elements
843 int cursor; // index of next element to return
844 int lastRet; // index of last element, or -1 if no such
845
846 Itr(Object[] array) {
847 lastRet = -1;
848 this.array = array;
849 }
850
851 public boolean hasNext() {
852 return cursor < array.length;
853 }
854
855 public E next() {
856 if (cursor >= array.length)
857 throw new NoSuchElementException();
858 lastRet = cursor;
859 return (E)array[cursor++];
860 }
861
862 public void remove() {
863 if (lastRet < 0)
864 throw new IllegalStateException();
865 removeEQ(array[lastRet]);
866 lastRet = -1;
867 }
868 }
869
870 /**
871 * Saves this queue to a stream (that is, serializes it).
872 *
873 * For compatibility with previous version of this class, elements
874 * are first copied to a java.util.PriorityQueue, which is then
875 * serialized.
876 */
877 private void writeObject(java.io.ObjectOutputStream s)
878 throws java.io.IOException {
879 lock.lock();
880 try {
881 // avoid zero capacity argument
882 q = new PriorityQueue<E>(Math.max(size, 1), comparator);
883 q.addAll(this);
884 s.defaultWriteObject();
885 } finally {
886 q = null;
887 lock.unlock();
888 }
889 }
890
891 /**
892 * Reconstitutes this queue from a stream (that is, deserializes it).
893 */
894 private void readObject(java.io.ObjectInputStream s)
895 throws java.io.IOException, ClassNotFoundException {
896 try {
897 s.defaultReadObject();
898 this.queue = new Object[q.size()];
899 comparator = q.comparator();
900 addAll(q);
901 } finally {
902 q = null;
903 }
904 }
905
// Unsafe mechanics: handle and field offset used by tryGrow to CAS
// the allocationSpinLock field.
private static final sun.misc.Unsafe UNSAFE;
private static final long allocationSpinLockOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        allocationSpinLockOffset = UNSAFE.objectFieldOffset(
            PriorityBlockingQueue.class.getDeclaredField("allocationSpinLock"));
    } catch (Exception e) {
        throw new Error(e);
    }
}
919 }