--- jsr166/src/jsr166y/LinkedTransferQueue.java 2010/09/01 23:40:29 1.77 +++ jsr166/src/jsr166y/LinkedTransferQueue.java 2015/10/03 18:17:51 1.94 @@ -1,19 +1,17 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ package jsr166y; -import java.util.concurrent.*; - import java.util.AbstractQueue; import java.util.Collection; -import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; /** @@ -24,10 +22,17 @@ import java.util.concurrent.locks.LockSu * producer. The tail of the queue is that element that has * been on the queue the shortest time for some producer. * - *

Beware that, unlike in most collections, the {@code size} - * method is NOT a constant-time operation. Because of the + *

Beware that, unlike in most collections, the {@code size} method + * is NOT a constant-time operation. Because of the * asynchronous nature of these queues, determining the current number - * of elements requires a traversal of the elements. + * of elements requires a traversal of the elements, and so may report + * inaccurate results if this collection is modified during traversal. + * Additionally, the bulk operations {@code addAll}, + * {@code removeAll}, {@code retainAll}, {@code containsAll}, + * {@code equals}, and {@code toArray} are not guaranteed + * to be performed atomically. For example, an iterator operating + * concurrently with an {@code addAll} operation might view only some + * of the added elements. * *

This class and its iterator implement all of the * optional methods of the {@link Collection} and {@link @@ -71,7 +76,7 @@ public class LinkedTransferQueue exte * * A FIFO dual queue may be implemented using a variation of the * Michael & Scott (M&S) lock-free queue algorithm - * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf). + * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf). * It maintains two pointer fields, "head", pointing to a * (matched) node that in turn points to the first actual * (unmatched) queue node (or null if empty); and "tail" that @@ -296,8 +301,8 @@ public class LinkedTransferQueue exte * of less-contended queues. During spins threads check their * interrupt status and generate a thread-local random number * to decide to occasionally perform a Thread.yield. While - * yield has underdefined specs, we assume that might it help, - * and will not hurt in limiting impact of spinning on busy + * yield has underdefined specs, we assume that it might help, + * and will not hurt, in limiting impact of spinning on busy * systems. We also use smaller (1/2) spins for nodes that are * not known to be front but whose predecessors have not * blocked -- these "chained" spins avoid artifacts of @@ -423,7 +428,7 @@ public class LinkedTransferQueue exte } final boolean casItem(Object cmp, Object val) { - // assert cmp == null || cmp.getClass() != Node.class; + // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } @@ -489,7 +494,7 @@ public class LinkedTransferQueue exte * Tries to artificially match a data node -- used by remove. */ final boolean tryMatchData() { - // assert isData; + // assert isData; Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); @@ -498,16 +503,27 @@ public class LinkedTransferQueue exte return false; } - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE = getUnsafe(); - private static final long nextOffset = - objectFieldOffset(UNSAFE, "next", Node.class); - private static final long itemOffset = - objectFieldOffset(UNSAFE, "item", Node.class); - private static final long waiterOffset = - objectFieldOffset(UNSAFE, "waiter", Node.class); - private static final long serialVersionUID = -3375979862319811754L; + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long itemOffset; + private static final long nextOffset; + private static final long waiterOffset; + static { + try { + UNSAFE = getUnsafe(); + Class k = Node.class; + itemOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("item")); + nextOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("next")); + waiterOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("waiter")); + } catch (Exception e) { + throw new Error(e); + } + } } /** head of the queue; null until first enqueue */ @@ -542,7 +558,7 @@ public class LinkedTransferQueue exte @SuppressWarnings("unchecked") static E cast(Object item) { - // assert item == null || item.getClass() != Node.class; + // assert item == null || item.getClass() != Node.class; return (E) item; } @@ -561,7 +577,8 @@ public class LinkedTransferQueue exte throw new NullPointerException(); Node s = null; // the node to append, if needed - retry: for (;;) { // restart on append race + retry: + for (;;) { // restart on append race for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; @@ -572,7 +589,7 @@ public class LinkedTransferQueue exte if (p.casItem(item, e)) { // match for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton - if (head == h && casHead(h, n == null? q : n)) { + if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry @@ -581,7 +598,7 @@ public class LinkedTransferQueue exte break; // unless slack < 2 } LockSupport.unpark(p.waiter); - return this.cast(item); + return LinkedTransferQueue.cast(item); } } Node n = p.next; @@ -657,11 +674,11 @@ public class LinkedTransferQueue exte for (;;) { Object item = s.item; if (item != e) { // matched - // assert item != s; + // assert item != s; s.forgetContents(); // avoid garbage - return this.cast(item); + return LinkedTransferQueue.cast(item); } - if ((w.isInterrupted() || (timed && nanos <= 0)) && + if ((w.isInterrupted() || (timed && nanos <= 0L)) && s.casItem(e, s)) { // cancel unsplice(pred, s); return e; @@ -740,7 +757,7 @@ public class LinkedTransferQueue exte Object item = p.item; if (p.isData) { if (item != null && item != p) - return this.cast(item); + return LinkedTransferQueue.cast(item); } else if (item == null) return null; @@ -782,22 +799,61 @@ public class LinkedTransferQueue exte * Moves to next node after prev, or first node if prev null. */ private void advance(Node prev) { - lastPred = lastRet; - lastRet = prev; - for (Node p = (prev == null) ? head : succ(prev); - p != null; p = succ(p)) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p) { - nextItem = LinkedTransferQueue.this.cast(item); - nextNode = p; + /* + * To track and avoid buildup of deleted nodes in the face + * of calls to both Queue.remove and Itr.remove, we must + * include variants of unsplice and sweep upon each + * advance: Upon Itr.remove, we may need to catch up links + * from lastPred, and upon other removes, we might need to + * skip ahead from stale nodes and unsplice deleted ones + * found while advancing. + */ + + Node r, b; // reset lastPred upon possible deletion of lastRet + if ((r = lastRet) != null && !r.isMatched()) + lastPred = r; // next lastPred is old lastRet + else if ((b = lastPred) == null || b.isMatched()) + lastPred = null; // at start of list + else { + Node s, n; // help with removal of lastPred.next + while ((s = b.next) != null && + s != b && s.isMatched() && + (n = s.next) != null && n != s) + b.casNext(s, n); + } + + this.lastRet = prev; + + for (Node p = prev, s, n;;) { + s = (p == null) ? head : p.next; + if (s == null) + break; + else if (s == p) { + p = null; + continue; + } + Object item = s.item; + if (s.isData) { + if (item != null && item != s) { + nextItem = LinkedTransferQueue.cast(item); + nextNode = s; return; } } else if (item == null) break; + // assert s.isMatched(); + if (p == null) + p = s; + else if ((n = s.next) == null) + break; + else if (s == n) + p = null; + else + p.casNext(s, n); } nextNode = null; + nextItem = null; } Itr() { @@ -817,10 +873,12 @@ public class LinkedTransferQueue exte } public final void remove() { - Node p = lastRet; - if (p == null) throw new IllegalStateException(); - if (p.tryMatchData()) - unsplice(lastPred, p); + final Node lastRet = this.lastRet; + if (lastRet == null) + throw new IllegalStateException(); + this.lastRet = null; + if (lastRet.tryMatchData()) + unsplice(lastPred, lastRet); } } @@ -882,12 +940,14 @@ public class LinkedTransferQueue exte */ private void sweep() { for (Node p = head, s, n; p != null && (s = p.next) != null; ) { - if (p == s) // stale - p = head; - else if (!s.isMatched()) + if (!s.isMatched()) + // Unmatched nodes are never self-linked p = s; else if ((n = s.next) == null) // trailing node is pinned break; + else if (s == n) // stale + // No need to also check for p == s, since that implies s == n + p = head; else p.casNext(s, n); } @@ -956,7 +1016,8 @@ public class LinkedTransferQueue exte * return {@code false}. * * @return {@code true} (as specified by - * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) + * {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit) + * BlockingQueue.offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e, long timeout, TimeUnit unit) { @@ -968,8 +1029,7 @@ public class LinkedTransferQueue exte * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * - * @return {@code true} (as specified by - * {@link BlockingQueue#offer(Object) BlockingQueue.offer}) + * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { @@ -1074,8 +1134,7 @@ public class LinkedTransferQueue exte if (c == this) throw new IllegalArgumentException(); int n = 0; - E e; - while ( (e = poll()) != null) { + for (E e; (e = poll()) != null;) { c.add(e); ++n; } @@ -1092,8 +1151,7 @@ public class LinkedTransferQueue exte if (c == this) throw new IllegalArgumentException(); int n = 0; - E e; - while (n < maxElements && (e = poll()) != null) { + for (E e; n < maxElements && (e = poll()) != null;) { c.add(e); ++n; } @@ -1101,15 +1159,15 @@ public class LinkedTransferQueue exte } /** - * Returns an iterator over the elements in this queue in proper - * sequence, from head to tail. + * Returns an iterator over the elements in this queue in proper sequence. + * The elements will be returned in order from first (head) to last (tail). * *

The returned iterator is a "weakly consistent" iterator that - * will never throw - * {@link ConcurrentModificationException ConcurrentModificationException}, - * and guarantees to traverse elements as they existed upon - * construction of the iterator, and may (but is not guaranteed - * to) reflect any modifications subsequent to construction. + * will never throw {@link java.util.ConcurrentModificationException + * ConcurrentModificationException}, and guarantees to traverse + * elements as they existed upon construction of the iterator, and + * may (but is not guaranteed to) reflect any modifications + * subsequent to construction. * * @return an iterator over the elements in this queue in proper sequence */ @@ -1174,11 +1232,34 @@ public class LinkedTransferQueue exte } /** + * Returns {@code true} if this queue contains the specified element. + * More formally, returns {@code true} if and only if this queue contains + * at least one element {@code e} such that {@code o.equals(e)}. + * + * @param o object to be checked for containment in this queue + * @return {@code true} if this queue contains the specified element + */ + public boolean contains(Object o) { + if (o == null) return false; + for (Node p = head; p != null; p = succ(p)) { + Object item = p.item; + if (p.isData) { + if (item != null && item != p && o.equals(item)) + return true; + } + else if (item == null) + break; + } + return false; + } + + /** * Always returns {@code Integer.MAX_VALUE} because a * {@code LinkedTransferQueue} is not capacity constrained. * * @return {@code Integer.MAX_VALUE} (as specified by - * {@link BlockingQueue#remainingCapacity()}) + * {@link java.util.concurrent.BlockingQueue#remainingCapacity() + * BlockingQueue.remainingCapacity}) */ public int remainingCapacity() { return Integer.MAX_VALUE; @@ -1210,7 +1291,8 @@ public class LinkedTransferQueue exte throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); for (;;) { - @SuppressWarnings("unchecked") E item = (E) s.readObject(); + @SuppressWarnings("unchecked") + E item = (E) s.readObject(); if (item == null) break; else @@ -1220,23 +1302,22 @@ public class LinkedTransferQueue exte // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE = getUnsafe(); - private static final long headOffset = - objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class); - private static final long tailOffset = - objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class); - private static final long sweepVotesOffset = - objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class); - - static long objectFieldOffset(sun.misc.Unsafe UNSAFE, - String field, Class klazz) { + private static final sun.misc.Unsafe UNSAFE; + private static final long headOffset; + private static final long tailOffset; + private static final long sweepVotesOffset; + static { try { - return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); - } catch (NoSuchFieldException e) { - // Convert Exception to corresponding Error - NoSuchFieldError error = new NoSuchFieldError(field); - error.initCause(e); - throw error; + UNSAFE = getUnsafe(); + Class k = LinkedTransferQueue.class; + headOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("head")); + tailOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("tail")); + sweepVotesOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("sweepVotes")); + } catch (Exception e) { + throw new Error(e); } } @@ -1250,22 +1331,23 @@ public class LinkedTransferQueue exte static sun.misc.Unsafe getUnsafe() { try { return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } - }