ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java (file contents):
Revision 1.65 by jsr166, Mon Oct 11 18:27:39 2010 UTC vs.
Revision 1.66 by dl, Wed Oct 13 14:01:25 2010 UTC

# Line 70 | Line 70 | public class PriorityBlockingQueue<E> ex
70      private static final long serialVersionUID = 5595510919245408276L;
71  
72      /*
73 <     * This implementation is a variant of the one in
74 <     * java.util.PriorityQueue, with public operations protected with
75 <     * a single lock. However, allocation during resizing uses a
76 <     * simple spinlock (used only while not holding main lock) in
77 <     * order to allow takes to operate concurrently with allocation.
78 <     * This avoids repeated postponement of waiting consumers and
79 <     * consequent build-up. The need to back away from lock during
80 <     * allocation makes it impossible to simply wrap delegated
81 <     * java.util.PriorityQueue operations within a lock; hence code
82 <     * duplication.
73 >     * The implementation uses an array-based binary heap, with public
74 >     * operations protected with a single lock. However, allocation
75 >     * during resizing uses a simple spinlock (used only while not
76 >     * holding main lock) in order to allow takes to operate
77 >     * concurrently with allocation.  This avoids repeated
78 >     * postponement of waiting consumers and consequent element
79 >     * build-up. The need to back away from lock during allocation
80 >     * makes it impossible to simply wrap delegated
81 >     * java.util.PriorityQueue operations within a lock, as was done
82 >     * in a previous version of this class. To maintain
83 >     * interoperability, a plain PriorityQueue is still used during
84 >     * serialization, which maintains compatibility at the espense of
85 >     * transiently doubling overhead.
86       */
87  
88      /**
# Line 88 | Line 91 | public class PriorityBlockingQueue<E> ex
91      private static final int DEFAULT_INITIAL_CAPACITY = 11;
92  
93      /**
94 +     * The maximum size of array to allocate.
95 +     * Some VMs reserve some header words in an array.
96 +     * Attempts to allocate larger arrays may result in
97 +     * OutOfMemoryError: Requested array size exceeds VM limit
98 +     */
99 +    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
100 +
101 +    /**
102       * Priority queue represented as a balanced binary heap: the two
103       * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
104       * priority queue is ordered by comparator, or by the elements'
# Line 100 | Line 111 | public class PriorityBlockingQueue<E> ex
111      /**
112       * The number of elements in the priority queue.
113       */
114 <    private transient int size = 0;
114 >    private transient int size;
115  
116      /**
117       * The comparator, or null if priority queue uses elements'
# Line 109 | Line 120 | public class PriorityBlockingQueue<E> ex
120      private transient Comparator<? super E> comparator;
121  
122      /**
123 <     * A plain PriorityQueue used only for serialization,
113 <     * to maintain compatibility with previous versions
114 <     * of this class. Non-null only during serialization/deserialization.
123 >     * Lock used for all public operations
124       */
125 <    private PriorityQueue q;
125 >    private final ReentrantLock lock;
126  
127      /**
128 <     * Lock used for all public operations
128 >     * Condition for blocking when empty
129       */
130 <    final ReentrantLock lock = new ReentrantLock();
122 <    private final Condition notEmpty = lock.newCondition();
130 >    private final Condition notEmpty;
131  
132      /**
133       * Spinlock for allocation, acquired via CAS.
# Line 127 | Line 135 | public class PriorityBlockingQueue<E> ex
135      private transient volatile int allocationSpinLock;
136  
137      /**
138 +     * A plain PriorityQueue used only for serialization,
139 +     * to maintain compatibility with previous versions
140 +     * of this class. Non-null only during serialization/deserialization.
141 +     */
142 +    private PriorityQueue q;
143 +
144 +    /**
145       * Creates a {@code PriorityBlockingQueue} with the default
146       * initial capacity (11) that orders its elements according to
147       * their {@linkplain Comparable natural ordering}.
# Line 164 | Line 179 | public class PriorityBlockingQueue<E> ex
179                                   Comparator<? super E> comparator) {
180          if (initialCapacity < 1)
181              throw new IllegalArgumentException();
182 <        this.queue = new Object[initialCapacity];
182 >        this.lock = new ReentrantLock();
183 >        this.notEmpty = lock.newCondition();
184          this.comparator = comparator;
185 +        this.queue = new Object[initialCapacity];
186      }
187  
188      /**
# Line 185 | Line 202 | public class PriorityBlockingQueue<E> ex
202       *         of its elements are null
203       */
204      public PriorityBlockingQueue(Collection<? extends E> c) {
205 +        this.lock = new ReentrantLock();
206 +        this.notEmpty = lock.newCondition();
207 +        boolean heapify = true; // true if not known to be in heap order
208 +        boolean screen = true;  // true if must screen for nulls
209          if (c instanceof SortedSet<?>) {
210              SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
211              this.comparator = (Comparator<? super E>) ss.comparator();
212 <            initElementsFromCollection(ss);
212 >            heapify = false;
213          }
214          else if (c instanceof PriorityBlockingQueue<?>) {
215              PriorityBlockingQueue<? extends E> pq =
216                  (PriorityBlockingQueue<? extends E>) c;
217              this.comparator = (Comparator<? super E>) pq.comparator();
218 <            initFromPriorityBlockingQueue(pq);
219 <        }
220 <        else {
200 <            this.comparator = null;
201 <            initFromCollection(c);
202 <        }
203 <    }
204 <
205 <    private void initFromPriorityBlockingQueue(PriorityBlockingQueue<? extends E> c) {
206 <        if (c.getClass() == PriorityBlockingQueue.class) {
207 <            this.queue = c.toArray();
208 <            this.size = c.size();
209 <        } else {
210 <            initFromCollection(c);
218 >            screen = false;
219 >            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
220 >                heapify = false;
221          }
212    }
213
214    private void initElementsFromCollection(Collection<? extends E> c) {
222          Object[] a = c.toArray();
223 +        int n = a.length;
224          // If c.toArray incorrectly doesn't return Object[], copy it.
225          if (a.getClass() != Object[].class)
226 <            a = Arrays.copyOf(a, a.length, Object[].class);
227 <        int len = a.length;
228 <        if (len == 1 || this.comparator != null)
221 <            for (int i = 0; i < len; i++)
226 >            a = Arrays.copyOf(a, n, Object[].class);
227 >        if (screen && (n == 1 || this.comparator != null)) {
228 >            for (int i = 0; i < n; ++i)
229                  if (a[i] == null)
230                      throw new NullPointerException();
231 +        }
232          this.queue = a;
233 <        this.size = a.length;
234 <    }
235 <
228 <    /**
229 <     * Initializes queue array with elements from the given Collection.
230 <     *
231 <     * @param c the collection
232 <     */
233 <    private void initFromCollection(Collection<? extends E> c) {
234 <        initElementsFromCollection(c);
235 <        heapify();
233 >        this.size = n;
234 >        if (heapify)
235 >            heapify();
236      }
237  
238      /**
239 <     * Tries to grow array to at least minCap, giving up (allowing
240 <     * retry) on contention. Call only while holding lock.
239 >     * Tries to grow array to accommodate at least one more element
240 >     * (but normally expand by about 50%), giving up (allowing retry)
241 >     * on contention (which we expect to be rare). Call only while
242 >     * holding lock.
243 >     *
244 >     * @param array the heap array
245 >     * @param oldCap the length of the array
246       */
247 <    private void tryGrow(int minCap, Object[] array, int oldCap) {
247 >    private void tryGrow(Object[] array, int oldCap) {
248          lock.unlock(); // must release and then re-acquire main lock
249          Object[] newArray = null;
250          if (allocationSpinLock == 0 &&
# Line 247 | Line 252 | public class PriorityBlockingQueue<E> ex
252                                       0, 1)) {
253              try {
254                  int newCap = oldCap + ((oldCap < 64) ?
255 <                                       (oldCap + 2) :
255 >                                       (oldCap + 2) : // grow faster if small
256                                         (oldCap >> 1));
257 <                if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
257 >                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
258 >                    int minCap = oldCap + 1;
259                      if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
260                          throw new OutOfMemoryError();
261                      newCap = MAX_ARRAY_SIZE;
262                  }
263 <                if (queue == array && newCap > array.length)
263 >                if (newCap > oldCap && queue == array)
264                      newArray = new Object[newCap];
265              } finally {
266                  allocationSpinLock = 0;
267              }
268          }
269 <        else
269 >        if (newArray == null) // back off if another thread is allocating
270              Thread.yield();
271          lock.lock();
272          if (newArray != null && queue == array) {
267            System.arraycopy(array, 0, newArray, 0, minCap);
273              queue = newArray;
274 +            System.arraycopy(array, 0, newArray, 0, oldCap);
275          }
276      }
277  
278      /**
279       * Mechanics for poll().  Call only while holding lock.
280       */
281 <    private E internalPoll() {
282 <        int s = size - 1;
283 <        if (s >= 0) {
284 <            size = s;
285 <            E result = (E) queue[0];
286 <            E x = (E) queue[s];
287 <            queue[s] = null;
288 <            if (s != 0)
289 <                siftDown(0, x);
290 <            return result;
281 >    private E extract() {
282 >        E result;
283 >        int n = size - 1;
284 >        if (n < 0)
285 >            result = null;
286 >        else {
287 >            Object[] array = queue;
288 >            result = (E) array[0];
289 >            E x = (E) array[n];
290 >            array[n] = null;
291 >            Comparator<? super E> cmp = comparator;
292 >            if (cmp == null)
293 >                siftDownComparable(0, x, array, n);
294 >            else
295 >                siftDownUsingComparator(0, x, array, n, cmp);
296 >            size = n;
297          }
298 <        else
287 <            return null;
298 >        return result;
299      }
300  
301      /**
# Line 295 | Line 306 | public class PriorityBlockingQueue<E> ex
306       * To simplify and speed up coercions and comparisons. the
307       * Comparable and Comparator versions are separated into different
308       * methods that are otherwise identical. (Similarly for siftDown.)
309 +     * These methods are static, with heap state as arguments, to
310 +     * simplify use in light of possible comparator exceptions.
311       *
312       * @param k the position to fill
313       * @param x the item to insert
314 +     * @param array the heap array
315 +     * @param n heap size
316       */
317 <    private void siftUp(int k, E x) {
318 <        if (comparator != null)
304 <            siftUpUsingComparator(k, x);
305 <        else
306 <            siftUpComparable(k, x);
307 <    }
308 <
309 <    private void siftUpComparable(int k, E x) {
310 <        Comparable<? super E> key = (Comparable<? super E>) x;
317 >    private static <T> void siftUpComparable(int k, T x, Object[] array) {
318 >        Comparable<? super T> key = (Comparable<? super T>) x;
319          while (k > 0) {
320              int parent = (k - 1) >>> 1;
321 <            Object e = queue[parent];
322 <            if (key.compareTo((E) e) >= 0)
321 >            Object e = array[parent];
322 >            if (key.compareTo((T) e) >= 0)
323                  break;
324 <            queue[k] = e;
324 >            array[k] = e;
325              k = parent;
326          }
327 <        queue[k] = key;
327 >        array[k] = key;
328      }
329  
330 <    private void siftUpUsingComparator(int k, E x) {
330 >    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
331 >                                       Comparator<? super T> cmp) {
332          while (k > 0) {
333              int parent = (k - 1) >>> 1;
334 <            Object e = queue[parent];
335 <            if (comparator.compare(x, (E) e) >= 0)
334 >            Object e = array[parent];
335 >            if (cmp.compare(x, (T) e) >= 0)
336                  break;
337 <            queue[k] = e;
337 >            array[k] = e;
338              k = parent;
339          }
340 <        queue[k] = x;
340 >        array[k] = x;
341      }
342  
343      /**
# Line 338 | Line 347 | public class PriorityBlockingQueue<E> ex
347       *
348       * @param k the position to fill
349       * @param x the item to insert
350 +     * @param array the heap array
351 +     * @param n heap size
352       */
353 <    private void siftDown(int k, E x) {
354 <        if (comparator != null)
355 <            siftDownUsingComparator(k, x);
356 <        else
346 <            siftDownComparable(k, x);
347 <    }
348 <
349 <    private void siftDownComparable(int k, E x) {
350 <        Comparable<? super E> key = (Comparable<? super E>)x;
351 <        int half = size >>> 1;        // loop while a non-leaf
353 >    private static <T> void siftDownComparable(int k, T x, Object[] array,
354 >                                               int n) {
355 >        Comparable<? super T> key = (Comparable<? super T>)x;
356 >        int half = n >>> 1;           // loop while a non-leaf
357          while (k < half) {
358              int child = (k << 1) + 1; // assume left child is least
359 <            Object c = queue[child];
359 >            Object c = array[child];
360              int right = child + 1;
361 <            if (right < size &&
362 <                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
363 <                c = queue[child = right];
364 <            if (key.compareTo((E) c) <= 0)
361 >            if (right < n &&
362 >                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
363 >                c = array[child = right];
364 >            if (key.compareTo((T) c) <= 0)
365                  break;
366 <            queue[k] = c;
366 >            array[k] = c;
367              k = child;
368          }
369 <        queue[k] = key;
369 >        array[k] = key;
370      }
371  
372 <    private void siftDownUsingComparator(int k, E x) {
373 <        int half = size >>> 1;
372 >    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
373 >                                                    int n,
374 >                                                    Comparator<? super T> cmp) {
375 >        int half = n >>> 1;
376          while (k < half) {
377              int child = (k << 1) + 1;
378 <            Object c = queue[child];
378 >            Object c = array[child];
379              int right = child + 1;
380 <            if (right < size &&
381 <                comparator.compare((E) c, (E) queue[right]) > 0)
382 <                c = queue[child = right];
376 <            if (comparator.compare(x, (E) c) <= 0)
380 >            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
381 >                c = array[child = right];
382 >            if (cmp.compare(x, (T) c) <= 0)
383                  break;
384 <            queue[k] = c;
384 >            array[k] = c;
385              k = child;
386          }
387 <        queue[k] = x;
387 >        array[k] = x;
388      }
389  
390      /**
# Line 386 | Line 392 | public class PriorityBlockingQueue<E> ex
392       * assuming nothing about the order of the elements prior to the call.
393       */
394      private void heapify() {
395 <        for (int i = (size >>> 1) - 1; i >= 0; i--)
396 <            siftDown(i, (E) queue[i]);
395 >        Object[] array = queue;
396 >        int n = size;
397 >        int half = (n >>> 1) - 1;
398 >        Comparator<? super E> cmp = comparator;
399 >        if (cmp == null) {
400 >            for (int i = half; i >= 0; i--)
401 >                siftDownComparable(i, (E) array[i], array, n);
402 >        }
403 >        else {
404 >            for (int i = half; i >= 0; i--)
405 >                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
406 >        }
407      }
408  
409      /**
394     * The maximum size of array to allocate.
395     * Some VMs reserve some header words in an array.
396     * Attempts to allocate larger arrays may result in
397     * OutOfMemoryError: Requested array size exceeds VM limit
398     */
399    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
400
401    /**
410       * Inserts the specified element into this priority queue.
411       *
412       * @param e the element to add
# Line 428 | Line 436 | public class PriorityBlockingQueue<E> ex
436              throw new NullPointerException();
437          final ReentrantLock lock = this.lock;
438          lock.lock();
439 <        int len, cap;
439 >        int n, cap;
440          Object[] array;
441 <        while ((len = size) >= (cap = (array = queue).length))
442 <            tryGrow(len, array, cap);
441 >        while ((n = size) >= (cap = (array = queue).length))
442 >            tryGrow(array, cap);
443          try {
444 <            size = len + 1;
445 <            if (len == 0)
446 <                array[0] = e;
444 >            Comparator<? super E> cmp = comparator;
445 >            if (cmp == null)
446 >                siftUpComparable(n, e, array);
447              else
448 <                siftUp(len, e);
448 >                siftUpUsingComparator(n, e, array, cmp);
449 >            size = n + 1;
450              notEmpty.signal();
451          } finally {
452              lock.unlock();
# Line 481 | Line 490 | public class PriorityBlockingQueue<E> ex
490      public E poll() {
491          final ReentrantLock lock = this.lock;
492          lock.lock();
493 +        E result;
494          try {
495 <            return internalPoll();
495 >            result = extract();
496          } finally {
497              lock.unlock();
498          }
499 +        return result;
500      }
501  
502      public E take() throws InterruptedException {
492        E result = null;
503          final ReentrantLock lock = this.lock;
504          lock.lockInterruptibly();
505 +        E result;
506          try {
507 <            while ( (result = internalPoll()) == null)
507 >            while ( (result = extract()) == null)
508                  notEmpty.await();
509          } finally {
510              lock.unlock();
# Line 503 | Line 514 | public class PriorityBlockingQueue<E> ex
514  
515      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
516          long nanos = unit.toNanos(timeout);
506        E result = null;
517          final ReentrantLock lock = this.lock;
518          lock.lockInterruptibly();
519 +        E result;
520          try {
521 <            while ( (result = internalPoll()) == null && nanos > 0)
521 >            while ( (result = extract()) == null && nanos > 0)
522                  nanos = notEmpty.awaitNanos(nanos);
523          } finally {
524              lock.unlock();
# Line 516 | Line 527 | public class PriorityBlockingQueue<E> ex
527      }
528  
529      public E peek() {
519        E result = null;
530          final ReentrantLock lock = this.lock;
531          lock.lock();
532 +        E result;
533          try {
534 <            if (size >= 0)
524 <                result = (E) queue[0];
534 >            result = size > 0 ? (E) queue[0] : null;
535          } finally {
536              lock.unlock();
537          }
# Line 564 | Line 574 | public class PriorityBlockingQueue<E> ex
574  
575      private int indexOf(Object o) {
576          if (o != null) {
577 <            for (int i = 0; i < size; i++)
578 <                if (o.equals(queue[i]))
577 >            Object[] array = queue;
578 >            int n = size;
579 >            for (int i = 0; i < n; i++)
580 >                if (o.equals(array[i]))
581                      return i;
582          }
583          return -1;
# Line 575 | Line 587 | public class PriorityBlockingQueue<E> ex
587       * Removes the ith element from queue.
588       */
589      private void removeAt(int i) {
590 <        int s = --size;
591 <        if (s == i) // removed last element
592 <            queue[i] = null;
590 >        Object[] array = queue;
591 >        int n = size - 1;
592 >        if (n == i) // removed last element
593 >            array[i] = null;
594          else {
595 <            E moved = (E) queue[s];
596 <            queue[s] = null;
597 <            siftDown(i, moved);
598 <            if (queue[i] == moved)
599 <                siftUp(i, moved);
595 >            E moved = (E) array[n];
596 >            array[n] = null;
597 >            Comparator<? super E> cmp = comparator;
598 >            if (cmp == null)
599 >                siftDownComparable(i, moved, array, n);
600 >            else
601 >                siftDownUsingComparator(i, moved, array, n, cmp);
602 >            if (array[i] == moved) {
603 >                if (cmp == null)
604 >                    siftUpComparable(i, moved, array);
605 >                else
606 >                    siftUpUsingComparator(i, moved, array, cmp);
607 >            }
608          }
609 +        size = n;
610      }
611  
612      /**
# Line 622 | Line 644 | public class PriorityBlockingQueue<E> ex
644          final ReentrantLock lock = this.lock;
645          lock.lock();
646          try {
647 <            for (int i = 0; i < size; i++) {
648 <                if (o == queue[i]) {
647 >            Object[] array = queue;
648 >            int n = size;
649 >            for (int i = 0; i < n; i++) {
650 >                if (o == array[i]) {
651                      removeAt(i);
652                      break;
653                  }
# Line 714 | Line 738 | public class PriorityBlockingQueue<E> ex
738          try {
739              int n = 0;
740              E e;
741 <            while ( (e = internalPoll()) != null) {
741 >            while ( (e = extract()) != null) {
742                  c.add(e);
743                  ++n;
744              }
# Line 742 | Line 766 | public class PriorityBlockingQueue<E> ex
766          try {
767              int n = 0;
768              E e;
769 <            while (n < maxElements && (e = internalPoll()) != null) {
769 >            while (n < maxElements && (e = extract()) != null) {
770                  c.add(e);
771                  ++n;
772              }
# Line 760 | Line 784 | public class PriorityBlockingQueue<E> ex
784          final ReentrantLock lock = this.lock;
785          lock.lock();
786          try {
787 <            for (int i = 0; i < size; i++)
788 <                queue[i] = null;
787 >            Object[] array = queue;
788 >            int n = size;
789              size = 0;
790 +            for (int i = 0; i < n; i++)
791 +                array[i] = null;
792          } finally {
793              lock.unlock();
794          }
# Line 809 | Line 835 | public class PriorityBlockingQueue<E> ex
835          final ReentrantLock lock = this.lock;
836          lock.lock();
837          try {
838 <            if (a.length < size)
838 >            int n = size;
839 >            if (a.length < n)
840                  // Make a new array of a's runtime type, but my contents:
841                  return (T[]) Arrays.copyOf(queue, size, a.getClass());
842 <            System.arraycopy(queue, 0, a, 0, size);
843 <            if (a.length > size)
844 <                a[size] = null;
842 >            System.arraycopy(queue, 0, a, 0, n);
843 >            if (a.length > n)
844 >                a[n] = null;
845              return a;
846          } finally {
847              lock.unlock();
# Line 883 | Line 910 | public class PriorityBlockingQueue<E> ex
910              q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
911              q.addAll(this);
912              s.defaultWriteObject();
886            q = null;
913          } finally {
914 +            q = null;
915              lock.unlock();
916          }
917      }
# Line 897 | Line 924 | public class PriorityBlockingQueue<E> ex
924       */
925      private void readObject(java.io.ObjectInputStream s)
926          throws java.io.IOException, ClassNotFoundException {
927 <        s.defaultReadObject();
928 <        this.queue = new Object[q.size()];
929 <        comparator = q.comparator();
930 <        addAll(q);
931 <        q = null;
927 >        try {
928 >            s.defaultReadObject();
929 >            this.queue = new Object[q.size()];
930 >            comparator = q.comparator();
931 >            addAll(q);
932 >        } finally {
933 >            q = null;
934 >        }
935      }
936  
937      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines