--- jsr166/src/jsr166e/StripedAdder.java 2011/07/20 16:06:19 1.2
+++ jsr166/src/jsr166e/StripedAdder.java 2011/07/28 19:27:07 1.9
@@ -5,7 +5,6 @@
*/
package jsr166e;
-import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -16,20 +15,26 @@ import java.io.ObjectOutputStream;
/**
* A set of variables that together maintain a sum. When updates
- * (method {@link #add}) are contended across threads, the set of
- * adders may grow to reduce contention. Method {@link #sum} returns
- * the current combined total across these adders. This value is
- * NOT an atomic snapshot (concurrent updates may occur while
- * the sum is being calculated), and so cannot be used alone for
- * fine-grained synchronization control.
+ * (method {@link #add}) are contended across threads, this set of
+ * adder variables may grow dynamically to reduce contention. Method
+ * {@link #sum} returns the current combined total across these
+ * adders. This value is NOT an atomic snapshot (concurrent
+ * updates may occur while the sum is being calculated), and so cannot
+ * be used alone for fine-grained synchronization control.
*
*
This class may be applicable when many threads frequently
* update a common sum that is used for purposes such as collecting
* statistics. In this case, performance may be significantly faster
* than using a shared {@link AtomicLong}, at the expense of using
- * significantly more space. On the other hand, if it is known that
- * only one thread can ever update the sum, performance may be
- * significantly slower than just updating a local variable.
+ * much more space. On the other hand, if it is known that only one
+ * thread can ever update the sum, performance may be significantly
+ * slower than just updating a local variable.
+ *
+ *
A StripedAdder may optionally be constructed with a given
+ * expected contention level; i.e., the number of threads that are
+ * expected to concurrently update the sum. Supplying an accurate
+ * value may improve performance by reducing the need for dynamic
+ * adjustment.
*
* @author Doug Lea
*/
@@ -37,90 +42,131 @@ public class StripedAdder implements Ser
private static final long serialVersionUID = 7249069246863182397L;
/*
- * Overview: We maintain a table of AtomicLongs (padded to reduce
- * false sharing). The table is indexed by per-thread hash codes
- * that are initialized as random values. The table doubles in
- * size upon contention (as indicated by failed CASes when
- * performing add()), but is capped at the nearest power of two >=
- * #cpus: At that point, contention should be infrequent if each
- * thread has a unique index; so we instead adjust hash codes to
- * new random values upon contention rather than expanding. A
- * single spinlock is used for resizing the table as well as
- * populating slots with new Adders. Upon lock contention, threads
- * just try other slots rather than blocking. We guarantee that at
+ * A StripedAdder maintains a table of Atomic long variables. The
+ * table is indexed by per-thread hash codes.
+ *
+ * Table entries are of class Adder; a variant of AtomicLong
+ * padded to reduce cache contention on most processors. Padding
+ * is overkill for most Atomics because they are usually
+ * irregularly scattered in memory and thus don't interfere much
+ * with each other. But Atomic objects residing in arrays will
+ * tend to be placed adjacent to each other, and so will most
+ * often share cache lines (with a huge negative performance
+ * impact) without this precaution.
+ *
+ * Because Adders are relatively large, we avoid creating them
+ * until they are needed. On the other hand, we try to create them
+ * on any sign of contention.
+ *
+ * Per-thread hash codes are initialized to random values.
+ * Collisions are indicated by failed CASes when performing an add
+ * operation (see method retryAdd). Upon a collision, if the table
+ * size is less than the capacity, it is doubled in size unless
+ * some other thread holds lock. If a hashed slot is empty, and
+ * lock is available, a new Adder is created. Otherwise, if the
+ * slot exists, a CAS is tried. Retries proceed by "double
+ * hashing", using a secondary hash (Marsaglia XorShift) to try to
+ * find a free slot.
+ *
+ * By default, the table is lazily initialized. Upon first use,
+ * the table is set to size 2 (the minimum non-empty size), but
+ * containing only a single Adder. The maximum table size is
+ * bounded by nearest power of two >= the number of CPUS. The
+ * table size is capped because, when there are more threads than
+ * CPUs, supposing that each thread were bound to a CPU, there
+ * would exist a perfect hash function mapping threads to slots
+ * that eliminates collisions. When we reach capacity, we search
+ * for this mapping by randomly varying the hash codes of
+ * colliding threads. Because search is random, and failures only
+ * become known via CAS failures, convergence will be slow, and
+ * because threads are typically not bound to CPUS forever, may
+ * not occur at all. However, despite these limitations, observed
+ * contention is typically low in these cases.
+ *
+ * A single spinlock is used for resizing the table as well as
+ * populating slots with new Adders. After initialization, there
+ * is no need for a blocking lock: Upon lock contention, threads
+ * try other slots rather than blocking. After initialization, at
* least one slot exists, so retries will eventually find a
- * candidate Adder.
+ * candidate Adder. During these retries, there is increased
+ * contention and reduced locality, which is still better than
+ * alternatives.
*/
- /**
- * Number of processors, to place a cap on table growth.
- */
- static final int NCPU = Runtime.getRuntime().availableProcessors();
+ private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
- * Version of AtomicLong padded to avoid sharing cache
- * lines on most processors
+ * Padded variant of AtomicLong. The value field is placed
+ * between pads, hoping that the JVM doesn't reorder them.
+ * Updates are via inlined CAS in methods add and retryAdd.
*/
- static final class Adder extends AtomicLong {
- long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
- Adder(long x) { super(x); }
+ static final class Adder {
+ volatile long p0, p1, p2, p3, p4, p5, p6;
+ volatile long value;
+ volatile long q0, q1, q2, q3, q4, q5, q6;
+ Adder(long x) { value = x; }
}
/**
- * Holder for the thread-local hash code.
+ * Holder for the thread-local hash code. The code is initially
+ * random, but may be set to a different value upon collisions.
*/
static final class HashCode {
+ static final Random rng = new Random();
int code;
- HashCode(int h) { code = h; }
+ HashCode() {
+ int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
+ code = (h == 0) ? 1 : h;
+ }
}
/**
* The corresponding ThreadLocal class
*/
static final class ThreadHashCode extends ThreadLocal {
- static final Random rng = new Random();
- public HashCode initialValue() {
- int h = rng.nextInt();
- return new HashCode((h == 0) ? 1 : h); // ensure nonzero
- }
+ public HashCode initialValue() { return new HashCode(); }
}
/**
* Static per-thread hash codes. Shared across all StripedAdders
- * because adjustments due to collisions in one table are likely
- * to be appropriate for others.
+ * to reduce ThreadLocal pollution and because adjustments due to
+ * collisions in one table are likely to be appropriate for
+ * others.
*/
static final ThreadHashCode threadHashCode = new ThreadHashCode();
/**
- * Table of adders. Initially of size 2; grows to be at most NCPU.
+ * Table of adders. When non-null, size is a power of 2, at least 2.
*/
private transient volatile Adder[] adders;
/**
- * Serves as a lock when resizing and/or creating Adders. There
- * is no need for a blocking lock: When busy, other threads try
- * other slots.
+ * Spinlock (locked via CAS) used when resizing and/or creating Adders.
*/
- private final AtomicInteger mutex;
+ private volatile int busy;
/**
- * Marsaglia XorShift for rehashing on collisions
+ * Creates a new adder with zero sum.
*/
- private static int xorShift(int r) {
- r ^= r << 13;
- r ^= r >>> 17;
- return r ^ (r << 5);
+ public StripedAdder() {
}
/**
- * Creates a new adder with initially zero sum.
+ * Creates a new adder with zero sum, and with stripes presized
+ * for the given expected contention level.
+ *
+ * @param expectedContention the expected number of threads that
+ * will concurrently update the sum.
*/
- public StripedAdder() {
- Adder[] as = new Adder[2];
- as[0] = new Adder(0); // ensure at least one available adder
+ public StripedAdder(int expectedContention) {
+ int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
+ int size = 2;
+ while (size < cap)
+ size <<= 1;
+ Adder[] as = new Adder[size];
+ for (int i = 0; i < size; ++i)
+ as[i] = new Adder(0);
this.adders = as;
- this.mutex = new AtomicInteger();
}
/**
@@ -129,77 +175,106 @@ public class StripedAdder implements Ser
* @param x the value to add
*/
public void add(long x) {
+ Adder[] as; Adder a; int n; // locals to hold volatile reads
HashCode hc = threadHashCode.get();
- for (int h = hc.code;;) {
- Adder[] as = adders;
- int n = as.length;
- Adder a = as[h & (n - 1)];
- if (a != null) {
- long v = a.get();
- if (a.compareAndSet(v, v + x))
- break;
- if (n >= NCPU) { // Collision when table at max
- h = hc.code = xorShift(h); // change code
- continue;
+ int h = hc.code;
+ boolean collide;
+ if ((as = adders) != null && (n = as.length) > 0 &&
+ (a = as[(n - 1) & h]) != null) {
+ long v = a.value;
+ if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
+ return;
+ collide = true;
+ }
+ else
+ collide = false;
+ retryAdd(x, hc, collide);
+ }
+
+ /**
+ * Handle cases of add involving initialization, resizing,
+ * creating new Adders, and/or contention. See above for
+ * explanation. This method suffers the usual non-modularity
+ * problems of optimistic retry code, relying on rechecked sets of
+ * reads.
+ */
+ private void retryAdd(long x, HashCode hc, boolean collide) {
+ int h = hc.code;
+ for (;;) {
+ Adder[] as; Adder a; int n;
+ if ((as = adders) != null && (n = as.length) > 0) {
+ if ((a = as[(n - 1) & h]) != null) {
+ boolean shared = true; // Slot exists
+ if (collide && n < NCPU && busy == 0 &&
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ try {
+ if (adders == as) { // Expand table
+ Adder[] rs = new Adder[n << 1];
+ for (int i = 0; i < n; ++i)
+ rs[i] = as[i];
+ adders = rs;
+ shared = false;
+ }
+ } finally {
+ busy = 0;
+ }
+ if (shared || (h & n) != 0) {
+ collide = false;
+ continue; // Array or index changed
+ }
+ }
+ long v = a.value;
+ if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
+ break;
+ collide = shared;
}
- }
- final AtomicInteger mutex = this.mutex;
- if (mutex.get() != 0)
- h = xorShift(h); // Try elsewhere
- else if (mutex.compareAndSet(0, 1)) {
- boolean created = false;
- try {
- Adder[] rs = adders;
- if (a != null && rs == as) // Resize table
- rs = adders = Arrays.copyOf(as, as.length << 1);
- int j = h & (rs.length - 1);
- if (rs[j] == null) { // Create adder
- rs[j] = new Adder(x);
- created = true;
+ else { // Try to attach new Adder
+ if (busy == 0 &&
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ boolean created = false;
+ try { // Recheck under lock
+ Adder[] rs; int m, j;
+ if ((rs = adders) != null && (m = rs.length) > 0 &&
+ rs[j = (m - 1) & h] == null) {
+ rs[j] = new Adder(x);
+ created = true;
+ }
+ } finally {
+ busy = 0;
+ }
+ if (created)
+ break;
+ continue; // Slot is now non-empty
}
- } finally {
- mutex.set(0);
+ collide = false;
}
- if (created) {
- hc.code = h; // Use this adder next time
- break;
+ h ^= h << 13; // Rehash
+ h ^= h >>> 17;
+ h ^= h << 5;
+ }
+ else if (busy == 0) { // Default-initialize
+ Adder r = new Adder(x);
+ Adder[] rs = new Adder[2];
+ rs[h & 1] = r;
+ if (adders == as && busy == 0 &&
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ boolean init = false;
+ try {
+ if (adders == as) {
+ adders = rs;
+ init = true;
+ }
+ } finally {
+ busy = 0;
+ }
+ if (init)
+ break;
}
}
+ else if (adders == as) // Lost initialization race
+ Thread.yield();
}
- }
-
- /**
- * Returns an estimate of the current sum. The result is
- * calculated by summing multiple variables, so may not be
- * accurate if updates occur concurrently with this method.
- *
- * @return the estimated sum
- */
- public long sum() {
- long sum = 0;
- Adder[] as = adders;
- int n = as.length;
- for (int i = 0; i < n; ++i) {
- Adder a = as[i];
- if (a != null)
- sum += a.get();
- }
- return sum;
- }
-
- /**
- * Resets each of the variables to zero. This is effective in
- * fully resetting the sum only if there are no concurrent
- * updates.
- */
- public void reset() {
- Adder[] as = adders;
- int n = as.length;
- for (int i = 0; i < n; ++i) {
- Adder a = as[i];
- if (a != null)
- a.set(0L);
- }
+ hc.code = h; // Record index for next time
}
/**
@@ -217,19 +292,44 @@ public class StripedAdder implements Ser
}
/**
- * Equivalent to {@link #sum} followed by {@link #reset}.
+ * Returns an estimate of the current sum. The result is
+ * calculated by summing multiple variables, so may not be
+ * accurate if updates occur concurrently with this method.
*
* @return the estimated sum
*/
- public long sumAndReset() {
- long sum = 0;
+ public long sum() {
+ long sum = 0L;
Adder[] as = adders;
- int n = as.length;
- for (int i = 0; i < n; ++i) {
- Adder a = as[i];
- if (a != null) {
- sum += a.get();
- a.set(0L);
+ if (as != null) {
+ int n = as.length;
+ for (int i = 0; i < n; ++i) {
+ Adder a = as[i];
+ if (a != null)
+ sum += a.value;
+ }
+ }
+ return sum;
+ }
+
+ /**
+ * Resets each of the variables to zero, returning the estimated
+ * previous sum. This is effective in fully resetting the sum only
+ * if there are no concurrent updates.
+ *
+ * @return the estimated previous sum
+ */
+ public long reset() {
+ long sum = 0L;
+ Adder[] as = adders;
+ if (as != null) {
+ int n = as.length;
+ for (int i = 0; i < n; ++i) {
+ Adder a = as[i];
+ if (a != null) {
+ sum += a.value;
+ a.value = 0L;
+ }
}
}
return sum;
@@ -244,13 +344,54 @@ public class StripedAdder implements Ser
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
- long c = s.readLong();
- Adder[] as = new Adder[2];
- as[0] = new Adder(c);
- this.adders = as;
- mutex.set(0);
+ busy = 0;
+ add(s.readLong());
}
-}
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long busyOffset;
+ private static final long valueOffset;
+ static {
+ try {
+ UNSAFE = getUnsafe();
+ Class> sk = StripedAdder.class;
+ busyOffset = UNSAFE.objectFieldOffset
+ (sk.getDeclaredField("busy"));
+ Class> ak = Adder.class;
+ valueOffset = UNSAFE.objectFieldOffset
+ (ak.getDeclaredField("value"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ private static sun.misc.Unsafe getUnsafe() {
+ try {
+ return sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security
+ .PrivilegedExceptionAction() {
+ public sun.misc.Unsafe run() throws Exception {
+ java.lang.reflect.Field f = sun.misc
+ .Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (sun.misc.Unsafe) f.get(null);
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw new RuntimeException("Could not initialize intrinsics",
+ e.getCause());
+ }
+ }
+ }
+}