--- jsr166/src/jsr166e/StripedAdder.java 2011/07/22 13:25:12 1.3
+++ jsr166/src/jsr166e/StripedAdder.java 2011/07/29 13:50:54 1.10
@@ -5,7 +5,6 @@
*/
package jsr166e;
-import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,9 +26,9 @@ import java.io.ObjectOutputStream;
* update a common sum that is used for purposes such as collecting
* statistics. In this case, performance may be significantly faster
* than using a shared {@link AtomicLong}, at the expense of using
- * significantly more space. On the other hand, if it is known that
- * only one thread can ever update the sum, performance may be
- * significantly slower than just updating a local variable.
+ * more space. On the other hand, if it is known that only one thread
+ * can ever update the sum, performance may be significantly slower
+ * than just updating a local variable.
*
*
A StripedAdder may optionally be constructed with a given
* expected contention level; i.e., the number of threads that are
@@ -43,14 +42,37 @@ public class StripedAdder implements Ser
private static final long serialVersionUID = 7249069246863182397L;
/*
- * Overview: We maintain a table of Atomic long variables. The
- * table is indexed by per-thread hash codes that are initialized
- * to random values.
- *
- * The table doubles in size upon contention (as indicated by
- * failed CASes when performing add()), but is capped at the
- * nearest power of two >= #CPUS. This reflects the idea that,
- * when there are more threads than CPUs, then if each thread were
+ * A StripedAdder maintains a table of Atomic long variables. The
+ * table is indexed by per-thread hash codes.
+ *
+ * Table entries are of class Adder; a variant of AtomicLong
+ * padded to reduce cache contention on most processors. Padding
+ * is overkill for most Atomics because they are usually
+ * irregularly scattered in memory and thus don't interfere much
+ * with each other. But Atomic objects residing in arrays will
+ * tend to be placed adjacent to each other, and so will most
+ * often share cache lines (with a huge negative performance
+ * impact) without this precaution.
+ *
+ * Because Adders are relatively large, we avoid creating them
+ * until they are needed. On the other hand, we try to create them
+ * on any sign of contention.
+ *
+ * Per-thread hash codes are initialized to random values.
+ * Collisions are indicated by failed CASes when performing an add
+ * operation (see method retryAdd). Upon a collision, if the table
+ * size is less than the capacity, it is doubled in size unless
+ * some other thread holds lock. If a hashed slot is empty, and
+ * lock is available, a new Adder is created. Otherwise, if the
+ * slot exists, a CAS is tried. Retries proceed by "double
+ * hashing", using a secondary hash (Marsaglia XorShift) to try to
+ * find a free slot.
+ *
+ * By default, the table is lazily initialized. Upon first use,
+ * the table is set to size 1, and contains a single Adder. The
+ * maximum table size is bounded by nearest power of two >= the
+ * number of CPUS. The table size is capped because, when there
+ * are more threads than CPUs, supposing that each thread were
* bound to a CPU, there would exist a perfect hash function
* mapping threads to slots that eliminates collisions. When we
* reach capacity, we search for this mapping by randomly varying
@@ -58,97 +80,75 @@ public class StripedAdder implements Ser
* and failures only become known via CAS failures, convergence
* will be slow, and because threads are typically not bound to
* CPUS forever, may not occur at all. However, despite these
- * limitations, observed contention is typically very low in these
+ * limitations, observed contention is typically low in these
* cases.
*
- * Table entries are of class Adder; a form of AtomicLong padded
- * to reduce cache contention on most processors. Padding is
- * overkill for most Atomics because they are most often
- * irregularly scattered in memory and thus don't interfere much
- * with each other. But Atomic objects residing in arrays will
- * tend to be placed adjacent to each other, and so will most
- * often share cache lines without this precaution. Except for
- * slot adders[0], Adders are constructed upon first use, which
- * further improves per-thread locality and helps reduce (an
- * already large) footprint.
- *
* A single spinlock is used for resizing the table as well as
- * populating slots with new Adders. Upon lock contention, threads
- * try other slots rather than blocking. We guarantee that at
- * least one slot (0) exists, so retries will eventually find a
- * candidate Adder. During these retries, there is increased
+ * populating slots with new Adders. After initialization, there
+ * is no need for a blocking lock: Upon lock contention, threads
+ * try other slots rather than blocking. After initialization, at
+ * least one slot exists, so retries will eventually find a
+ * candidate Adder. During these retries, there is increased
* contention and reduced locality, which is still better than
* alternatives.
*/
- /**
- * Number of processors, to place a cap on table growth.
- */
- static final int NCPU = Runtime.getRuntime().availableProcessors();
+ private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
- * Padded version of AtomicLong
+ * Padded variant of AtomicLong. The value field is placed
+ * between pads, hoping that the JVM doesn't reorder them.
+ * Updates are via inlined CAS in methods add and retryAdd.
*/
- static final class Adder extends AtomicLong {
- long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
- Adder(long x) { super(x); }
+ static final class Adder {
+ volatile long p0, p1, p2, p3, p4, p5, p6;
+ volatile long value;
+ volatile long q0, q1, q2, q3, q4, q5, q6;
+ Adder(long x) { value = x; }
}
/**
- * Holder for the thread-local hash code. The code starts off with
- * a given random value, but may be set to a different
- * pseudo-random value (using a cheaper but adequate xorshift
- * generator) upon collisions.
+ * Holder for the thread-local hash code. The code is initially
+ * random, but may be set to a different value upon collisions.
*/
static final class HashCode {
+ static final Random rng = new Random();
int code;
- HashCode(int h) { code = h; }
+ HashCode() {
+ int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
+ code = (h == 0) ? 1 : h;
+ }
}
/**
* The corresponding ThreadLocal class
*/
static final class ThreadHashCode extends ThreadLocal {
- static final Random rng = new Random();
- public HashCode initialValue() {
- int h = rng.nextInt();
- return new HashCode((h == 0) ? 1 : h); // ensure nonzero
- }
+ public HashCode initialValue() { return new HashCode(); }
}
/**
* Static per-thread hash codes. Shared across all StripedAdders
- * because adjustments due to collisions in one table are likely
- * to be appropriate for others.
+ * to reduce ThreadLocal pollution and because adjustments due to
+ * collisions in one table are likely to be appropriate for
+ * others.
*/
static final ThreadHashCode threadHashCode = new ThreadHashCode();
/**
- * Table of adders. Minimum size 2. Size grows to be at most NCPU.
+ * Table of adders. When non-null, size is a power of 2.
*/
private transient volatile Adder[] adders;
/**
- * Serves as a lock when resizing and/or creating Adders. There
- * is no need for a blocking lock: When busy, other threads try
- * other slots.
- */
- private final AtomicInteger mutex;
-
- /**
- * Marsaglia XorShift random generator for rehashing on collisions
+ * Spinlock (locked via CAS) used when resizing and/or creating Adders.
*/
- private static int xorShift(int r) {
- r ^= r << 13;
- r ^= r >>> 17;
- return r ^ (r << 5);
- }
+ private volatile int busy;
/**
* Creates a new adder with zero sum.
*/
public StripedAdder() {
- this(2);
}
/**
@@ -160,13 +160,13 @@ public class StripedAdder implements Ser
*/
public StripedAdder(int expectedContention) {
int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
- int size = 2;
+ int size = 1;
while (size < cap)
size <<= 1;
Adder[] as = new Adder[size];
- as[0] = new Adder(0); // ensure at least one available adder
+ for (int i = 0; i < size; ++i)
+ as[i] = new Adder(0);
this.adders = as;
- this.mutex = new AtomicInteger();
}
/**
@@ -175,77 +175,114 @@ public class StripedAdder implements Ser
* @param x the value to add
*/
public void add(long x) {
+ Adder[] as; Adder a; int n; // locals to hold volatile reads
HashCode hc = threadHashCode.get();
- for (int h = hc.code;;) {
- Adder[] as = adders;
- int n = as.length;
- Adder a = as[h & (n - 1)];
- if (a != null) {
- long v = a.get();
- if (a.compareAndSet(v, v + x))
- break;
- if (n >= NCPU) { // Collision when table at max
- h = hc.code = xorShift(h); // change code
- continue;
- }
- }
- final AtomicInteger mutex = this.mutex;
- if (mutex.get() != 0)
- h = xorShift(h); // Try elsewhere
- else if (mutex.compareAndSet(0, 1)) {
- boolean created = false;
- try {
- Adder[] rs = adders;
- if (a != null && rs == as) // Resize table
- rs = adders = Arrays.copyOf(as, as.length << 1);
- int j = h & (rs.length - 1);
- if (rs[j] == null) { // Create adder
- rs[j] = new Adder(x);
- created = true;
- }
- } finally {
- mutex.set(0);
- }
- if (created) {
- hc.code = h; // Use this adder next time
- break;
- }
- }
+ int h = hc.code;
+ boolean contended;
+ if ((as = adders) != null && (n = as.length) > 0 &&
+ (a = as[(n - 1) & h]) != null) {
+ long v = a.value;
+ if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
+ return;
+ contended = true;
}
+ else
+ contended = false;
+ retryAdd(x, hc, contended);
}
/**
- * Returns an estimate of the current sum. The result is
- * calculated by summing multiple variables, so may not be
- * accurate if updates occur concurrently with this method.
+ * Handle cases of add involving initialization, resizing,
+ * creating new Adders, and/or contention. See above for
+ * explanation. This method suffers the usual non-modularity
+ * problems of optimistic retry code, relying on rechecked sets of
+ * reads.
*
- * @return the estimated sum
- */
- public long sum() {
- long sum = 0;
- Adder[] as = adders;
- int n = as.length;
- for (int i = 0; i < n; ++i) {
- Adder a = as[i];
- if (a != null)
- sum += a.get();
- }
- return sum;
- }
-
- /**
- * Resets each of the variables to zero. This is effective in
- * fully resetting the sum only if there are no concurrent
- * updates.
+ * @param x the value to add
+ * @param hc the hash code holder
+ * @param precontended true if CAS failed before call
*/
- public void reset() {
- Adder[] as = adders;
- int n = as.length;
- for (int i = 0; i < n; ++i) {
- Adder a = as[i];
- if (a != null)
- a.set(0L);
+ private void retryAdd(long x, HashCode hc, boolean precontended) {
+ int h = hc.code;
+ boolean collide = false; // true if last slot nonempty
+ for (;;) {
+ Adder[] as; Adder a; int n;
+ if ((as = adders) != null && (n = as.length) > 0) {
+ if ((a = as[(n - 1) & h]) == null) {
+ if (busy == 0 && // Try to attach new Adder
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ boolean created = false;
+ try { // Recheck under lock
+ Adder[] rs; int m, j;
+ if ((rs = adders) != null && (m = rs.length) > 0 &&
+ rs[j = (m - 1) & h] == null) {
+ rs[j] = new Adder(x);
+ created = true;
+ }
+ } finally {
+ busy = 0;
+ }
+ if (created)
+ break;
+ continue; // Slot is now non-empty
+ }
+ collide = false;
+ }
+ else if (precontended) // CAS already known to fail
+ precontended = false; // Continue after rehash
+ else {
+ long v = a.value;
+ if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
+ break;
+ if (!collide)
+ collide = true;
+ else if (n >= NCPU || adders != as)
+ collide = false; // Don't expand
+ else if (busy == 0 &&
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ collide = false;
+ try {
+ if (adders == as) { // Expand table
+ Adder[] rs = new Adder[n << 1];
+ for (int i = 0; i < n; ++i)
+ rs[i] = as[i];
+ adders = rs;
+ }
+ } finally {
+ busy = 0;
+ }
+ continue;
+ }
+ }
+ h ^= h << 13; // Rehash
+ h ^= h >>> 17;
+ h ^= h << 5;
+ }
+ else if (adders == as) { // Try to default-initialize
+ Adder[] rs = new Adder[1];
+ rs[0] = new Adder(x);
+ boolean init = false;
+ while (adders == as) {
+ if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ try {
+ if (adders == as) {
+ adders = rs;
+ init = true;
+ }
+ } finally {
+ busy = 0;
+ }
+ break;
+ }
+ if (adders != as)
+ break;
+ Thread.yield(); // Back off
+ }
+ if (init)
+ break;
+ }
}
+ hc.code = h; // Record index for next time
}
/**
@@ -263,19 +300,44 @@ public class StripedAdder implements Ser
}
/**
- * Equivalent to {@link #sum} followed by {@link #reset}.
+ * Returns an estimate of the current sum. The result is
+ * calculated by summing multiple variables, so may not be
+ * accurate if updates occur concurrently with this method.
*
* @return the estimated sum
*/
- public long sumAndReset() {
- long sum = 0;
+ public long sum() {
+ long sum = 0L;
+ Adder[] as = adders;
+ if (as != null) {
+ int n = as.length;
+ for (int i = 0; i < n; ++i) {
+ Adder a = as[i];
+ if (a != null)
+ sum += a.value;
+ }
+ }
+ return sum;
+ }
+
+ /**
+ * Resets each of the variables to zero, returning the estimated
+ * previous sum. This is effective in fully resetting the sum only
+ * if there are no concurrent updates.
+ *
+ * @return the estimated previous sum
+ */
+ public long reset() {
+ long sum = 0L;
Adder[] as = adders;
- int n = as.length;
- for (int i = 0; i < n; ++i) {
- Adder a = as[i];
- if (a != null) {
- sum += a.get();
- a.set(0L);
+ if (as != null) {
+ int n = as.length;
+ for (int i = 0; i < n; ++i) {
+ Adder a = as[i];
+ if (a != null) {
+ sum += a.value;
+ a.value = 0L;
+ }
}
}
return sum;
@@ -290,13 +352,54 @@ public class StripedAdder implements Ser
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
- long c = s.readLong();
- Adder[] as = new Adder[2];
- as[0] = new Adder(c);
- this.adders = as;
- mutex.set(0);
+ busy = 0;
+ add(s.readLong());
}
-}
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long busyOffset;
+ private static final long valueOffset;
+ static {
+ try {
+ UNSAFE = getUnsafe();
+ Class> sk = StripedAdder.class;
+ busyOffset = UNSAFE.objectFieldOffset
+ (sk.getDeclaredField("busy"));
+ Class> ak = Adder.class;
+ valueOffset = UNSAFE.objectFieldOffset
+ (ak.getDeclaredField("value"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ private static sun.misc.Unsafe getUnsafe() {
+ try {
+ return sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security
+ .PrivilegedExceptionAction() {
+ public sun.misc.Unsafe run() throws Exception {
+ java.lang.reflect.Field f = sun.misc
+ .Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (sun.misc.Unsafe) f.get(null);
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw new RuntimeException("Could not initialize intrinsics",
+ e.getCause());
+ }
+ }
+ }
+}