ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java (file contents):
Revision 1.136 by jsr166, Mon Nov 7 00:37:53 2016 UTC vs.
Revision 1.137 by jsr166, Sun Nov 13 02:10:09 2016 UTC

# Line 141 | Line 141 | public class ArrayBlockingQueue<E> exten
141       * Call only when holding lock.
142       */
143      private void enqueue(E x) {
144 +        // assert lock.isHeldByCurrentThread();
145          // assert lock.getHoldCount() == 1;
146          // assert items[putIndex] == null;
147          final Object[] items = this.items;
# Line 156 | Line 157 | public class ArrayBlockingQueue<E> exten
157       * Call only when holding lock.
158       */
159      private E dequeue() {
160 +        // assert lock.isHeldByCurrentThread();
161          // assert lock.getHoldCount() == 1;
162          // assert items[takeIndex] != null;
163          final Object[] items = this.items;
# Line 177 | Line 179 | public class ArrayBlockingQueue<E> exten
179       * Call only when holding lock.
180       */
181      void removeAt(final int removeIndex) {
182 +        // assert lock.isHeldByCurrentThread();
183          // assert lock.getHoldCount() == 1;
184          // assert items[removeIndex] != null;
185          // assert removeIndex >= 0 && removeIndex < items.length;
# Line 820 | Line 823 | public class ArrayBlockingQueue<E> exten
823           * there is known to be at least one iterator to collect
824           */
825          void doSomeSweeping(boolean tryHarder) {
826 <            // assert lock.getHoldCount() == 1;
826 >            // assert lock.isHeldByCurrentThread();
827              // assert head != null;
828              int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
829              Node o, p;
# Line 876 | Line 879 | public class ArrayBlockingQueue<E> exten
879           * Adds a new iterator to the linked list of tracked iterators.
880           */
881          void register(Itr itr) {
882 <            // assert lock.getHoldCount() == 1;
882 >            // assert lock.isHeldByCurrentThread();
883              head = new Node(itr, head);
884          }
885  
# Line 886 | Line 889 | public class ArrayBlockingQueue<E> exten
889           * Notifies all iterators, and expunges any that are now stale.
890           */
891          void takeIndexWrapped() {
892 <            // assert lock.getHoldCount() == 1;
892 >            // assert lock.isHeldByCurrentThread();
893              cycles++;
894              for (Node o = null, p = head; p != null;) {
895                  final Itr it = p.get();
# Line 943 | Line 946 | public class ArrayBlockingQueue<E> exten
946           * clears all weak refs, and unlinks the itrs datastructure.
947           */
948          void queueIsEmpty() {
949 <            // assert lock.getHoldCount() == 1;
949 >            // assert lock.isHeldByCurrentThread();
950              for (Node p = head; p != null; p = p.next) {
951                  Itr it = p.get();
952                  if (it != null) {
# Line 959 | Line 962 | public class ArrayBlockingQueue<E> exten
962           * Called whenever an element has been dequeued (at takeIndex).
963           */
964          void elementDequeued() {
965 <            // assert lock.getHoldCount() == 1;
965 >            // assert lock.isHeldByCurrentThread();
966              if (count == 0)
967                  queueIsEmpty();
968              else if (takeIndex == 0)
# Line 1020 | Line 1023 | public class ArrayBlockingQueue<E> exten
1023          private static final int DETACHED = -3;
1024  
1025          Itr() {
1023            // assert lock.getHoldCount() == 0;
1026              lastRet = NONE;
1027              final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1028              lock.lock();
# Line 1053 | Line 1055 | public class ArrayBlockingQueue<E> exten
1055          }
1056  
1057          boolean isDetached() {
1058 <            // assert lock.getHoldCount() == 1;
1058 >            // assert lock.isHeldByCurrentThread();
1059              return prevTakeIndex < 0;
1060          }
1061  
1062          private int incCursor(int index) {
1063 <            // assert lock.getHoldCount() == 1;
1063 >            // assert lock.isHeldByCurrentThread();
1064              if (++index == items.length) index = 0;
1065              if (index == putIndex) index = NONE;
1066              return index;
# Line 1083 | Line 1085 | public class ArrayBlockingQueue<E> exten
1085           * operation on this iterator.  Call only from iterating thread.
1086           */
1087          private void incorporateDequeues() {
1088 <            // assert lock.getHoldCount() == 1;
1088 >            // assert lock.isHeldByCurrentThread();
1089              // assert itrs != null;
1090              // assert !isDetached();
1091              // assert count > 0;
# Line 1126 | Line 1128 | public class ArrayBlockingQueue<E> exten
1128           */
1129          private void detach() {
1130              // Switch to detached mode
1131 <            // assert lock.getHoldCount() == 1;
1131 >            // assert lock.isHeldByCurrentThread();
1132              // assert cursor == NONE;
1133              // assert nextIndex < 0;
1134              // assert lastRet < 0 || nextItem == null;
# Line 1146 | Line 1148 | public class ArrayBlockingQueue<E> exten
1148           * triggered by queue modifications.
1149           */
1150          public boolean hasNext() {
1149            // assert lock.getHoldCount() == 0;
1151              if (nextItem != null)
1152                  return true;
1153              noNext();
# Line 1176 | Line 1177 | public class ArrayBlockingQueue<E> exten
1177          }
1178  
1179          public E next() {
1179            // assert lock.getHoldCount() == 0;
1180              final E x = nextItem;
1181              if (x == null)
1182                  throw new NoSuchElementException();
# Line 1204 | Line 1204 | public class ArrayBlockingQueue<E> exten
1204          }
1205  
1206          public void forEachRemaining(Consumer<? super E> action) {
1207            // assert lock.getHoldCount() == 0;
1207              Objects.requireNonNull(action);
1208              final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1209              lock.lock();
# Line 1236 | Line 1235 | public class ArrayBlockingQueue<E> exten
1235          }
1236  
1237          public void remove() {
1239            // assert lock.getHoldCount() == 0;
1238              final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1239              lock.lock();
1240 +            // assert lock.getHoldCount() == 1;
1241              try {
1242                  if (!isDetached())
1243                      incorporateDequeues(); // might update lastRet or detach
# Line 1276 | Line 1275 | public class ArrayBlockingQueue<E> exten
1275           * from next(), as promised by returning true from hasNext().
1276           */
1277          void shutdown() {
1278 <            // assert lock.getHoldCount() == 1;
1278 >            // assert lock.isHeldByCurrentThread();
1279              cursor = NONE;
1280              if (nextIndex >= 0)
1281                  nextIndex = REMOVED;
# Line 1304 | Line 1303 | public class ArrayBlockingQueue<E> exten
1303           * @return true if this iterator should be unlinked from itrs
1304           */
1305          boolean removedAt(int removedIndex) {
1306 <            // assert lock.getHoldCount() == 1;
1306 >            // assert lock.isHeldByCurrentThread();
1307              if (isDetached())
1308                  return true;
1309  
# Line 1361 | Line 1360 | public class ArrayBlockingQueue<E> exten
1360           * @return true if this iterator should be unlinked from itrs
1361           */
1362          boolean takeIndexWrapped() {
1363 <            // assert lock.getHoldCount() == 1;
1363 >            // assert lock.isHeldByCurrentThread();
1364              if (isDetached())
1365                  return true;
1366              if (itrs.cycles - prevCycles > 1) {
# Line 1469 | Line 1468 | public class ArrayBlockingQueue<E> exten
1468                           ; i = 0, to = end) {
1469                          for (; i < to; i++)
1470                              if (filter.test(itemAt(items, i)))
1471 <                                return bulkRemoveModified(filter, i, to);
1471 >                                return bulkRemoveModified(filter, i);
1472                          if (to == end) break;
1473                      }
1474                  }
# Line 1484 | Line 1483 | public class ArrayBlockingQueue<E> exten
1483          return super.removeIf(filter);
1484      }
1485  
1486 +    // A tiny bit set implementation
1487 +
1488 +    private static long[] nBits(int n) {
1489 +        return new long[((n - 1) >> 6) + 1];
1490 +    }
1491 +    private static void setBit(long[] bits, int i) {
1492 +        bits[i >> 6] |= 1L << i;
1493 +    }
1494 +    private static boolean isClear(long[] bits, int i) {
1495 +        return (bits[i >> 6] & (1L << i)) == 0;
1496 +    }
1497 +
1498 +    /**
1499 +     * Returns circular distance from i to j, disambiguating i == j to
1500 +     * items.length; never returns 0.
1501 +     */
1502 +    private int distanceNonEmpty(int i, int j) {
1503 +        if ((j -= i) <= 0) j += items.length;
1504 +        return j;
1505 +    }
1506 +
1507      /**
1508       * Helper for bulkRemove, in case of at least one deletion.
1509 <     * @param i valid index of first element to be deleted
1509 >     * Tolerate predicates that reentrantly access the collection for
1510 >     * read (but not write), so traverse once to find elements to
1511 >     * delete, a second pass to physically expunge.
1512 >     *
1513 >     * @param beg valid index of first element to be deleted
1514       */
1515      private boolean bulkRemoveModified(
1516 <        Predicate<? super E> filter, int i, int to) {
1517 <        final Object[] items = this.items;
1516 >        Predicate<? super E> filter, final int beg) {
1517 >        final Object[] es = items;
1518          final int capacity = items.length;
1495        // a two-finger algorithm, with hare i reading, tortoise j writing
1496        int j = i++;
1519          final int end = putIndex;
1520 <        try {
1521 <            for (;; j = 0) {    // j rejoins i on second leg
1522 <                E e;
1523 <                // In this loop, i and j are on the same leg, with i > j
1524 <                for (; i < to; i++)
1525 <                    if (!filter.test(e = itemAt(items, i)))
1526 <                        items[j++] = e;
1527 <                if (to == end) break;
1528 <                // In this loop, j is on the first leg, i on the second
1529 <                for (i = 0, to = end; i < to && j < capacity; i++)
1530 <                    if (!filter.test(e = itemAt(items, i)))
1531 <                        items[j++] = e;
1532 <                if (i >= to) {
1533 <                    if (j == capacity) j = 0; // "corner" case
1534 <                    break;
1535 <                }
1520 >        final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1521 >        deathRow[0] = 1L;   // set bit 0
1522 >        for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1523 >             ; i = 0, to = end, k -= capacity) {
1524 >            for (; i < to; i++)
1525 >                if (filter.test(itemAt(es, i)))
1526 >                    setBit(deathRow, i - k);
1527 >            if (to == end) break;
1528 >        }
1529 >        // a two-finger traversal, with hare i reading, tortoise w writing
1530 >        int w = beg;
1531 >        for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1532 >             ; w = 0) { // w rejoins i on second leg
1533 >            // In this loop, i and w are on the same leg, with i > w
1534 >            for (; i < to; i++)
1535 >                if (isClear(deathRow, i - k))
1536 >                    es[w++] = es[i];
1537 >            if (to == end) break;
1538 >            // In this loop, w is on the first leg, i on the second
1539 >            for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1540 >                if (isClear(deathRow, i - k))
1541 >                    es[w++] = es[i];
1542 >            if (i >= to) {
1543 >                if (w == capacity) w = 0; // "corner" case
1544 >                break;
1545              }
1515            return true;
1516        } catch (Throwable ex) {
1517            // copy remaining elements
1518            for (; i != end; i = inc(i, capacity), j = inc(j, capacity))
1519                items[j] = items[i];
1520            throw ex;
1521        } finally {
1522            int deleted = putIndex - j;
1523            if (deleted <= 0) deleted += capacity;
1524            count -= deleted;
1525            circularClear(items, putIndex = j, end);
1546          }
1547 +        count -= distanceNonEmpty(w, end);
1548 +        circularClear(es, putIndex = w, end);
1549 +        // checkInvariants();
1550 +        return true;
1551      }
1552  
1553      /** debugging */
1554      void checkInvariants() {
1555          // meta-assertions
1556          // assert lock.isHeldByCurrentThread();
1533        // assert lock.getHoldCount() == 1;
1557          try {
1558              int capacity = items.length;
1559              // assert capacity > 0;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines