--- jsr166/src/jsr166e/StripedAdder.java 2011/07/26 17:16:36 1.6 +++ jsr166/src/jsr166e/StripedAdder.java 2011/07/30 16:26:34 1.12 @@ -5,7 +5,6 @@ */ package jsr166e; -import java.util.Arrays; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -15,27 +14,33 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; /** - * A set of variables that together maintain a sum. When updates - * (method {@link #add}) are contended across threads, this set of - * adder variables may grow dynamically to reduce contention. Method - * {@link #sum} returns the current combined total across these - * adders. This value is NOT an atomic snapshot (concurrent - * updates may occur while the sum is being calculated), and so cannot - * be used alone for fine-grained synchronization control. + * One or more variables that together maintain an initially zero sum. + * When updates (method {@link #add}) are contended across threads, + * the set of variables may grow dynamically to reduce contention. * - *
This class may be applicable when many threads frequently - * update a common sum that is used for purposes such as collecting - * statistics. In this case, performance may be significantly faster - * than using a shared {@link AtomicLong}, at the expense of using - * much more space. On the other hand, if it is known that only one - * thread can ever update the sum, performance may be significantly - * slower than just updating a local variable. + *
This class is usually preferable to {@link AtomicLong} when + * multiple threads update a common sum that is used for purposes such + * as collecting statistics, not for fine-grained synchronization + * control. Under high update contention, throughput of this class is + * expected to be significantly higher, at the expense of higher space + * consumption. Under low contention, this class imposes very little + * time and space overhead compared to AtomicLong. On the other hand, + * in contexts where it is statically known that only one thread can + * ever update a sum, time and space overhead is noticeably greater + * than just updating a local variable. * - *
A StripedAdder may optionally be constructed with a given - * expected contention level; i.e., the number of threads that are - * expected to concurrently update the sum. Supplying an accurate - * value may improve performance by reducing the need for dynamic - * adjustment. + *
Method {@link #sum} returns the current combined total across
+ * the variables maintaining the sum. This value is NOT an
+ * atomic snapshot: Concurrent updates may occur while the sum is
+ * being calculated. However, updates cannot be "lost", so invocation
+ * of sum
in the absence of concurrent updates always
+ * returns an accurate result. The sum may also be reset
+ * to zero, as an alternative to creating a new adder. However,
+ * method {@link #reset} is intrinsically racy, so should only be used
+ * when it is known that no threads are concurrently updating the sum.
+ *
+ *
jsr166e note: This class is targeted to be placed in
+ * java.util.concurrent.atomic
*
* @author Doug Lea
*/
@@ -43,73 +48,79 @@ public class StripedAdder implements Ser
private static final long serialVersionUID = 7249069246863182397L;
/*
- * A StripedAdder maintains a table of Atomic long variables. The
- * table is indexed by per-thread hash codes.
+ * A StripedAdder maintains a lazily-initialized table of
+ * atomically updated variables, plus an extra "base" field. The
+ * table size is a power of two. Indexing uses masked per-thread
+ * hash codes
*
- * By default, the table is lazily initialized, to minimize
- * footprint until adders are used. On first use, the table is set
- * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is
- * bounded by the number of CPUS (if larger than the default
- * size).
+ * Table entries are of class Cell; a variant of AtomicLong padded
+ * to reduce cache contention on most processors. Padding is
+ * overkill for most Atomics because they are usually irregularly
+ * scattered in memory and thus don't interfere much with each
+ * other. But Atomic objects residing in arrays will tend to be
+ * placed adjacent to each other, and so will most often share
+ * cache lines (with a huge negative performance impact) without
+ * this precaution.
+ *
+ * In part because Cells are relatively large, we avoid creating
+ * them until they are needed. When there is no contention, all
+ * updates are made to the base field. Upon first contention (a
+ * failed CAS on base update), the table is initialized to size 2.
+ * The table size is doubled upon further contention until
+ * reaching the nearest power of two greater than or equal to the
+ * number of CPUS.
*
* Per-thread hash codes are initialized to random values.
- * Collisions are indicated by failed CASes when performing an add
- * operation (see method retryAdd). Upon a collision, if the table
- * size is less than the capacity, it is doubled in size unless
- * some other thread holds lock. If a hashed slot is empty, and
- * lock is available, a new Adder is created. Otherwise, if the
- * slot exists, a CAS is tried. Retries proceed by "double
- * hashing", using a secondary hash (Marsaglia XorShift) to try to
- * find a free slot.
+ * Contention and/or table collisions are indicated by failed
+ * CASes when performing an add operation (see method
+ * retryAdd). Upon a collision, if the table size is less than the
+ * capacity, it is doubled in size unless some other thread holds
+ * the lock. If a hashed slot is empty, and lock is available, a
+ * new Cell is created. Otherwise, if the slot exists, a CAS is
+ * tried. Retries proceed by "double hashing", using a secondary
+ * hash (Marsaglia XorShift) to try to find a free slot.
*
* The table size is capped because, when there are more threads
* than CPUs, supposing that each thread were bound to a CPU,
* there would exist a perfect hash function mapping threads to
* slots that eliminates collisions. When we reach capacity, we
* search for this mapping by randomly varying the hash codes of
- * colliding threads. Because search is random, and failures only
- * become known via CAS failures, convergence will be slow, and
- * because threads are typically not bound to CPUS forever, may
- * not occur at all. However, despite these limitations, observed
- * contention is typically low in these cases.
+ * colliding threads. Because search is random, and collisions
+ * only become known via CAS failures, convergence can be slow,
+ * and because threads are typically not bound to CPUS forever,
+ * may not occur at all. However, despite these limitations,
+ * observed contention rates are typically low in these cases.
*
- * Table entries are of class Adder; a form of AtomicLong padded
- * to reduce cache contention on most processors. Padding is
- * overkill for most Atomics because they are usually irregularly
- * scattered in memory and thus don't interfere much with each
- * other. But Atomic objects residing in arrays will tend to be
- * placed adjacent to each other, and so will most often share
- * cache lines without this precaution. Adders are by default
- * constructed upon first use, which further improves per-thread
- * locality and helps reduce footprint.
+ * A single spinlock is used for initializing and resizing the
+ * table, as well as populating slots with new Cells. There is no
+ * need for a blocking lock: Upon lock contention, threads try
+ * other slots (or the base) rather than blocking. During these
+ * retries, there is increased contention and reduced locality,
+ * which is still better than alternatives.
+ *
+ * It is possible for a Cell to become unused when threads that
+ * once hashed to it terminate, as well as in the case where
+ * doubling the table causes no thread to hash to it under
+ * expanded mask. We do not try to detect or remove such cells,
+ * under the assumption that for long-running adders, observed
+ * contention levels will recur, so the cells will eventually be
+ * needed again; and for short-lived ones, it does not matter.
*
- * A single spinlock is used for resizing the table as well as
- * populating slots with new Adders. Upon lock contention, threads
- * try other slots rather than blocking. After initialization, at
- * least one slot exists, so retries will eventually find a
- * candidate Adder. During these retries, there is increased
- * contention and reduced locality, which is still better than
- * alternatives.
- */
-
- /**
- * Padded version of AtomicLong
*/
- static final class Adder extends AtomicLong {
- long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
- Adder(long x) { super(x); }
- }
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
- * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon
- * first use under default constructor, and must be a power of
- * two. There is not much point in making size a lot smaller than
- * that of Adders though. CAP is the maximum allowed table size.
- */
- private static final int DEFAULT_INITIAL_SIZE = 8;
- private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE);
+ * Padded variant of AtomicLong. The value field is placed
+ * between pads, hoping that the JVM doesn't reorder them.
+ * Updates are via inlined CAS in methods add and retryAdd.
+ */
+ static final class Cell {
+ volatile long p0, p1, p2, p3, p4, p5, p6;
+ volatile long value;
+ volatile long q0, q1, q2, q3, q4, q5, q6;
+ Cell(long x) { value = x; }
+ }
/**
* Holder for the thread-local hash code. The code is initially
@@ -119,8 +130,8 @@ public class StripedAdder implements Ser
static final Random rng = new Random();
int code;
HashCode() {
- int h = rng.nextInt();
- code = (h == 0) ? 1 : h; // ensure nonzero
+ int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
+ code = (h == 0) ? 1 : h;
}
}
@@ -140,44 +151,25 @@ public class StripedAdder implements Ser
static final ThreadHashCode threadHashCode = new ThreadHashCode();
/**
- * Table of adders. Size is power of two, grows to be at most CAP.
+ * Table of cells. When non-null, size is a power of 2.
*/
- private transient volatile Adder[] adders;
+ private transient volatile Cell[] cells;
/**
- * Serves as a lock when resizing and/or creating Adders. There
- * is no need for a blocking lock: Except during initialization
- * races, when busy, other threads try other slots. However,
- * during (double-checked) initializations, we use the
- * "synchronized" lock on this object.
+ * Base sum, used mainly when there is no contention, but also as
+ * a fallback during table initializion races. Updated via CAS.
*/
- private final AtomicInteger mutex;
+ private transient volatile long base;
/**
- * Creates a new adder with zero sum.
+ * Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
- public StripedAdder() {
- this.mutex = new AtomicInteger();
- // remaining initialization on first call to add.
- }
+ private transient volatile int busy;
/**
- * Creates a new adder with zero sum, and with stripes presized
- * for the given expected contention level.
- *
- * @param expectedContention the expected number of threads that
- * will concurrently update the sum.
- */
- public StripedAdder(int expectedContention) {
- int cap = (expectedContention < CAP) ? expectedContention : CAP;
- int size = 1;
- while (size < cap)
- size <<= 1;
- Adder[] as = new Adder[size];
- for (int i = 0; i < size; ++i)
- as[i] = new Adder(0);
- this.adders = as;
- this.mutex = new AtomicInteger();
+ * Creates a new adder with initial sum of zero.
+ */
+ public StripedAdder() {
}
/**
@@ -186,142 +178,195 @@ public class StripedAdder implements Ser
* @param x the value to add
*/
public void add(long x) {
- Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
- HashCode hc = threadHashCode.get();
- int h = hc.code;
- if ((as = adders) == null || (n = as.length) < 1 ||
- (a = as[(n - 1) & h]) == null ||
- !a.compareAndSet(v = a.get(), v + x))
- retryAdd(x, hc);
+ Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended;
+ if ((as = cells) != null ||
+ !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
+ int h = (hc = threadHashCode.get()).code;
+ if (as != null && (n = as.length) > 0 &&
+ (a = as[(n - 1) & h]) != null) {
+ if (UNSAFE.compareAndSwapLong(a, valueOffset,
+ v = a.value, v + x))
+ return;
+ contended = true;
+ }
+ else
+ contended = false;
+ retryAdd(x, hc, contended);
+ }
}
/**
* Handle cases of add involving initialization, resizing,
- * creating new Adders, and/or contention. See above for
- * explanation.
+ * creating new Cells, and/or contention. See above for
+ * explanation. This method suffers the usual non-modularity
+ * problems of optimistic retry code, relying on rechecked sets of
+ * reads.
+ *
+ * @param x the value to add
+ * @param hc the hash code holder
+ * @param precontended true if CAS failed before call
*/
- private void retryAdd(long x, HashCode hc) {
+ private void retryAdd(long x, HashCode hc, boolean precontended) {
int h = hc.code;
- final AtomicInteger mutex = this.mutex;
- int collisions = 1 - mutex.get(); // first guess: collides if not locked
+ boolean collide = false; // true if last slot nonempty
for (;;) {
- Adder[] as; Adder a; long v; int k, n;
- while ((as = adders) == null || (n = as.length) < 1) {
- synchronized(mutex) { // Try to initialize
- if (adders == null) {
- Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE];
- rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0);
- adders = rs;
+ Cell[] as; Cell a; int n;
+ if ((as = cells) != null && (n = as.length) > 0) {
+ if ((a = as[(n - 1) & h]) == null) {
+ if (busy == 0) { // Try to attach new Cell
+ Cell r = new Cell(x); // Optimistically create
+ if (busy == 0 &&
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ boolean created = false;
+ try { // Recheck under lock
+ Cell[] rs; int m, j;
+ if ((rs = cells) != null &&
+ (m = rs.length) > 0 &&
+ rs[j = (m - 1) & h] == null) {
+ rs[j] = r;
+ created = true;
+ }
+ } finally {
+ busy = 0;
+ }
+ if (created)
+ break;
+ continue; // Slot is now non-empty
+ }
}
+ collide = false;
}
- collisions = 0;
- }
-
- if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot
- if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
- try {
- if (adders == as && as[k] == null)
- a = as[k] = new Adder(x);
- } finally {
- mutex.set(0);
- }
- if (a != null)
+ else if (precontended) // CAS already known to fail
+ precontended = false; // Continue after rehash
+ else {
+ long v = a.value;
+ if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
break;
+ if (!collide)
+ collide = true;
+ else if (n >= NCPU || cells != as)
+ collide = false; // Can't expand
+ else if (busy == 0 &&
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ collide = false;
+ try {
+ if (cells == as) { // Expand table
+ Cell[] rs = new Cell[n << 1];
+ for (int i = 0; i < n; ++i)
+ rs[i] = as[i];
+ cells = rs;
+ }
+ } finally {
+ busy = 0;
+ }
+ continue;
+ }
}
- collisions = 0;
+ h ^= h << 13; // Rehash
+ h ^= h >>> 17;
+ h ^= h << 5;
}
- else if (collisions != 0 && n < CAP && // Try to expand table
- mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
- try {
- if (adders == as) {
- Adder[] rs = new Adder[n << 1];
- for (int i = 0; i < n; ++i)
- rs[i] = as[i];
- adders = rs;
+ else if (busy == 0 && cells == as &&
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ boolean init = false;
+ try { // Initialize
+ if (cells == as) {
+ Cell r = new Cell(x);
+ Cell[] rs = new Cell[2];
+ rs[h & 1] = r;
+ cells = rs;
+ init = true;
}
} finally {
- mutex.set(0);
+ busy = 0;
}
- collisions = 0;
+ if (init)
+ break;
+ }
+ else { // Lost initialization race
+ long b = base; // Fall back on using base
+ if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x))
+ break;
}
- else if (a.compareAndSet(v = a.get(), v + x))
- break;
- else
- collisions = 1;
- h ^= h << 13; // Rehash
- h ^= h >>> 17;
- h ^= h << 5;
}
- hc.code = h;
+ hc.code = h; // Record index for next time
}
/**
- * Returns an estimate of the current sum. The result is
- * calculated by summing multiple variables, so may not be
- * accurate if updates occur concurrently with this method.
+ * Equivalent to {@code add(1)}.
+ */
+ public void increment() {
+ add(1L);
+ }
+
+ /**
+ * Equivalent to {@code add(-1)}.
+ */
+ public void decrement() {
+ add(-1L);
+ }
+
+ /**
+ * Returns the current sum. The result is only guaranteed to be
+ * accurate in the absence of concurrent updates. Otherwise, it
+ * may fail to reflect one or more updates occuring while
+ * calculating the result.
*
- * @return the estimated sum
+ * @return the sum
*/
public long sum() {
- long sum = 0L;
- Adder[] as = adders;
+ Cell[] as = cells;
+ long sum = base;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
- Adder a = as[i];
+ Cell a = as[i];
if (a != null)
- sum += a.get();
+ sum += a.value;
}
}
return sum;
}
/**
- * Resets each of the variables to zero. This is effective in
- * fully resetting the sum only if there are no concurrent
- * updates.
+ * Resets variables maintaining the sum to zero. This is
+ * effective in setting the sum to zero only if there are no
+ * concurrent updates.
*/
public void reset() {
- Adder[] as = adders;
+ Cell[] as = cells;
+ base = 0L;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
- Adder a = as[i];
+ Cell a = as[i];
if (a != null)
- a.set(0L);
+ a.value = 0L;
}
}
}
/**
- * Equivalent to {@code add(1)}.
- */
- public void increment() {
- add(1L);
- }
-
- /**
- * Equivalent to {@code add(-1)}.
- */
- public void decrement() {
- add(-1L);
- }
-
- /**
- * Equivalent to {@link #sum} followed by {@link #reset}.
+ * Equivalent in effect to {@link #sum} followed by {@link
+ * #reset}. This method may apply for example during quiescent
+ * points between multithreaded computations. If there are
+ * updates concurrent with this method, the returned value is
+ * not guaranteed to be the final sum occurring before
+ * the reset.
*
- * @return the estimated sum
+ * @return the sum
*/
- public long sumAndReset() {
- long sum = 0L;
- Adder[] as = adders;
+ public long sumThenReset() {
+ Cell[] as = cells;
+ long sum = base;
+ base = 0L;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
- Adder a = as[i];
+ Cell a = as[i];
if (a != null) {
- sum += a.get();
- a.set(0L);
+ sum += a.value;
+ a.value = 0L;
}
}
}
@@ -337,8 +382,58 @@ public class StripedAdder implements Ser
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
- mutex.set(0);
- add(s.readLong());
+ busy = 0;
+ cells = null;
+ base = s.readLong();
+ }
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long baseOffset;
+ private static final long busyOffset;
+ private static final long valueOffset;
+ static {
+ try {
+ UNSAFE = getUnsafe();
+ Class> sk = StripedAdder.class;
+ baseOffset = UNSAFE.objectFieldOffset
+ (sk.getDeclaredField("base"));
+ busyOffset = UNSAFE.objectFieldOffset
+ (sk.getDeclaredField("busy"));
+ Class> ak = Cell.class;
+ valueOffset = UNSAFE.objectFieldOffset
+ (ak.getDeclaredField("value"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ private static sun.misc.Unsafe getUnsafe() {
+ try {
+ return sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security
+ .PrivilegedExceptionAction