--- jsr166/src/jsr166e/StripedAdder.java 2011/07/23 16:32:53 1.4
+++ jsr166/src/jsr166e/StripedAdder.java 2011/07/29 13:50:54 1.10
@@ -5,7 +5,6 @@
*/
package jsr166e;
-import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,9 +26,9 @@ import java.io.ObjectOutputStream;
* update a common sum that is used for purposes such as collecting
* statistics. In this case, performance may be significantly faster
* than using a shared {@link AtomicLong}, at the expense of using
- * much more space. On the other hand, if it is known that only one
- * thread can ever update the sum, performance may be significantly
- * slower than just updating a local variable.
+ * more space. On the other hand, if it is known that only one thread
+ * can ever update the sum, performance may be significantly slower
+ * than just updating a local variable.
*
* <p>A StripedAdder may optionally be constructed with a given
* expected contention level; i.e., the number of threads that are
@@ -44,13 +43,36 @@ public class StripedAdder implements Ser
/*
* A StripedAdder maintains a table of Atomic long variables. The
- * table is indexed by per-thread hash codes that are initialized
- * to random values.
+ * table is indexed by per-thread hash codes.
*
- * The table doubles in size upon contention (as indicated by
- * failed CASes when performing add()), but is capped at the
- * nearest power of two >= #CPUS. This reflects the idea that,
- * when there are more threads than CPUs, then if each thread were
+ * Table entries are of class Adder; a variant of AtomicLong
+ * padded to reduce cache contention on most processors. Padding
+ * is overkill for most Atomics because they are usually
+ * irregularly scattered in memory and thus don't interfere much
+ * with each other. But Atomic objects residing in arrays will
+ * tend to be placed adjacent to each other, and so will most
+ * often share cache lines (with a huge negative performance
+ * impact) without this precaution.
+ *
+ * Because Adders are relatively large, we avoid creating them
+ * until they are needed. On the other hand, we try to create them
+ * on any sign of contention.
+ *
+ * Per-thread hash codes are initialized to random values.
+ * Collisions are indicated by failed CASes when performing an add
+ * operation (see method retryAdd). Upon a collision, if the table
+ * size is less than the capacity, it is doubled in size unless
+ * some other thread holds the lock. If a hashed slot is empty, and
+ * the lock is available, a new Adder is created. Otherwise, if the
+ * slot exists, a CAS is tried. Retries proceed by "double
+ * hashing", using a secondary hash (Marsaglia XorShift) to try to
+ * find a free slot.
+ *
+ * By default, the table is lazily initialized. Upon first use,
+ * the table is set to size 1, and contains a single Adder. The
+ * maximum table size is bounded by the nearest power of two >= the
+ * number of CPUs. The table size is capped because, when there
+ * are more threads than CPUs, supposing that each thread were
* bound to a CPU, there would exist a perfect hash function
* mapping threads to slots that eliminates collisions. When we
* reach capacity, we search for this mapping by randomly varying
@@ -61,84 +83,72 @@ public class StripedAdder implements Ser
* limitations, observed contention is typically low in these
* cases.
*
- * Table entries are of class Adder; a form of AtomicLong padded
- * to reduce cache contention on most processors. Padding is
- * overkill for most Atomics because they are most often
- * irregularly scattered in memory and thus don't interfere much
- * with each other. But Atomic objects residing in arrays will
- * tend to be placed adjacent to each other, and so will most
- * often share cache lines without this precaution. Adders are
- * constructed upon first use, which further improves per-thread
- * locality and helps reduce (an already large) footprint.
- *
* A single spinlock is used for resizing the table as well as
- * populating slots with new Adders. Upon lock contention, threads
+ * populating slots with new Adders. After initialization, there
+ * is no need for a blocking lock: Upon lock contention, threads
* try other slots rather than blocking. After initialization, at
* least one slot exists, so retries will eventually find a
- * candidate Adder. During these retries, there is increased
+ * candidate Adder. During these retries, there is increased
* contention and reduced locality, which is still better than
* alternatives.
*/
- /**
- * Number of processors, to place a cap on table growth.
- */
- static final int NCPU = Runtime.getRuntime().availableProcessors();
+ private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
- * Padded version of AtomicLong
+ * Padded variant of AtomicLong. The value field is placed
+ * between pads, hoping that the JVM doesn't reorder them.
+ * Updates are via inlined CAS in methods add and retryAdd.
*/
- static final class Adder extends AtomicLong {
- long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
- Adder(long x) { super(x); }
+ static final class Adder {
+ volatile long p0, p1, p2, p3, p4, p5, p6;
+ volatile long value;
+ volatile long q0, q1, q2, q3, q4, q5, q6;
+ Adder(long x) { value = x; }
}
/**
- * Holder for the thread-local hash code. The code starts off with
- * a given random value, but may be set to a different value upon
- * collisions in retryAdd.
+ * Holder for the thread-local hash code. The code is initially
+ * random, but may be set to a different value upon collisions.
*/
static final class HashCode {
+ static final Random rng = new Random();
int code;
- HashCode(int h) { code = h; }
+ HashCode() {
+ int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
+ code = (h == 0) ? 1 : h;
+ }
}
/**
* The corresponding ThreadLocal class
*/
static final class ThreadHashCode extends ThreadLocal<HashCode> {
- static final Random rng = new Random();
- public HashCode initialValue() {
- int h = rng.nextInt();
- return new HashCode((h == 0) ? 1 : h); // ensure nonzero
- }
+ public HashCode initialValue() { return new HashCode(); }
}
/**
* Static per-thread hash codes. Shared across all StripedAdders
- * because adjustments due to collisions in one table are likely
- * to be appropriate for others.
+ * to reduce ThreadLocal pollution and because adjustments due to
+ * collisions in one table are likely to be appropriate for
+ * others.
*/
static final ThreadHashCode threadHashCode = new ThreadHashCode();
/**
- * Table of adders. Minimum size 2. Size grows to be at most NCPU.
+ * Table of adders. When non-null, size is a power of 2.
*/
private transient volatile Adder[] adders;
/**
- * Serves as a lock when resizing and/or creating Adders. There
- * is no need for a blocking lock: When busy, other threads try
- * other slots.
+ * Spinlock (locked via CAS) used when resizing and/or creating Adders.
*/
- private final AtomicInteger mutex;
+ private volatile int busy;
/**
* Creates a new adder with zero sum.
*/
public StripedAdder() {
- this.mutex = new AtomicInteger();
- // remaining initialization on first call to add.
}
/**
@@ -150,14 +160,13 @@ public class StripedAdder implements Ser
*/
public StripedAdder(int expectedContention) {
int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
- int size = 2;
+ int size = 1;
while (size < cap)
size <<= 1;
Adder[] as = new Adder[size];
for (int i = 0; i < size; ++i)
as[i] = new Adder(0);
this.adders = as;
- this.mutex = new AtomicInteger();
}
/**
@@ -166,60 +175,128 @@ public class StripedAdder implements Ser
* @param x the value to add
*/
public void add(long x) {
- Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
+ Adder[] as; Adder a; int n; // locals to hold volatile reads
HashCode hc = threadHashCode.get();
- if ((as = adders) == null || (n = as.length) < 1 ||
- (a = as[hc.code & (n - 1)]) == null ||
- !a.compareAndSet(v = a.get(), v + x))
- retryAdd(x, hc);
+ int h = hc.code;
+ boolean contended;
+ if ((as = adders) != null && (n = as.length) > 0 &&
+ (a = as[(n - 1) & h]) != null) {
+ long v = a.value;
+ if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
+ return;
+ contended = true;
+ }
+ else
+ contended = false;
+ retryAdd(x, hc, contended);
}
/**
* Handle cases of add involving initialization, resizing,
- * creating new Adders, and/or contention.
+ * creating new Adders, and/or contention. See above for
+ * explanation. This method suffers the usual non-modularity
+ * problems of optimistic retry code, relying on rechecked sets of
+ * reads.
+ *
+ * @param x the value to add
+ * @param hc the hash code holder
+ * @param precontended true if CAS failed before call
*/
- private void retryAdd(long x, HashCode hc) {
+ private void retryAdd(long x, HashCode hc, boolean precontended) {
int h = hc.code;
- final AtomicInteger mutex = this.mutex;
- AtomicInteger lock = null; // nonnull when held
- try {
- for (;;) {
- Adder[] as; Adder a; long v; int n, k; // locals for volatiles
- boolean needLock = true;
- if ((as = adders) == null || (n = as.length) < 1) {
- if (lock != null) // default-initialize
- adders = new Adder[2];
+ boolean collide = false; // true if last slot nonempty
+ for (;;) {
+ Adder[] as; Adder a; int n;
+ if ((as = adders) != null && (n = as.length) > 0) {
+ if ((a = as[(n - 1) & h]) == null) {
+ if (busy == 0 && // Try to attach new Adder
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ boolean created = false;
+ try { // Recheck under lock
+ Adder[] rs; int m, j;
+ if ((rs = adders) != null && (m = rs.length) > 0 &&
+ rs[j = (m - 1) & h] == null) {
+ rs[j] = new Adder(x);
+ created = true;
+ }
+ } finally {
+ busy = 0;
+ }
+ if (created)
+ break;
+ continue; // Slot is now non-empty
+ }
+ collide = false;
}
- else if ((a = as[k = h & (n - 1)]) == null) {
- if (lock != null) { // attach new adder
- as[k] = new Adder(x);
+ else if (precontended) // CAS already known to fail
+ precontended = false; // Continue after rehash
+ else {
+ long v = a.value;
+ if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
break;
+ if (!collide)
+ collide = true;
+ else if (n >= NCPU || adders != as)
+ collide = false; // Don't expand
+ else if (busy == 0 &&
+ UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ collide = false;
+ try {
+ if (adders == as) { // Expand table
+ Adder[] rs = new Adder[n << 1];
+ for (int i = 0; i < n; ++i)
+ rs[i] = as[i];
+ adders = rs;
+ }
+ } finally {
+ busy = 0;
+ }
+ continue;
}
}
- else if (a.compareAndSet(v = a.get(), v + x))
- break;
- else if (n >= NCPU) // cannot expand
- needLock = false;
- else if (lock != null) // expand table
- adders = Arrays.copyOf(as, n << 1);
-
- if (lock == null) {
- if (needLock && mutex.get() == 0 &&
- mutex.compareAndSet(0, 1))
- lock = mutex;
- else { // try elsewhere
- h ^= h << 13; // Marsaglia XorShift
- h ^= h >>> 17;
- h ^= h << 5;
+ h ^= h << 13; // Rehash
+ h ^= h >>> 17;
+ h ^= h << 5;
+ }
+ else if (adders == as) { // Try to default-initialize
+ Adder[] rs = new Adder[1];
+ rs[0] = new Adder(x);
+ boolean init = false;
+ while (adders == as) {
+ if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
+ try {
+ if (adders == as) {
+ adders = rs;
+ init = true;
+ }
+ } finally {
+ busy = 0;
+ }
+ break;
}
+ if (adders != as)
+ break;
+ Thread.yield(); // Back off
}
+ if (init)
+ break;
}
- } finally {
- if (lock != null)
- lock.set(0);
}
- if (hc.code != h) // avoid unneeded writes
- hc.code = h;
+ hc.code = h; // Record index for next time
+ }
+
+ /**
+ * Equivalent to {@code add(1)}.
+ */
+ public void increment() {
+ add(1L);
+ }
+
+ /**
+ * Equivalent to {@code add(-1)}.
+ */
+ public void decrement() {
+ add(-1L);
}
/**
@@ -237,49 +314,20 @@ public class StripedAdder implements Ser
for (int i = 0; i < n; ++i) {
Adder a = as[i];
if (a != null)
- sum += a.get();
+ sum += a.value;
}
}
return sum;
}
/**
- * Resets each of the variables to zero. This is effective in
- * fully resetting the sum only if there are no concurrent
- * updates.
- */
- public void reset() {
- Adder[] as = adders;
- if (as != null) {
- int n = as.length;
- for (int i = 0; i < n; ++i) {
- Adder a = as[i];
- if (a != null)
- a.set(0L);
- }
- }
- }
-
- /**
- * Equivalent to {@code add(1)}.
- */
- public void increment() {
- add(1L);
- }
-
- /**
- * Equivalent to {@code add(-1)}.
- */
- public void decrement() {
- add(-1L);
- }
-
- /**
- * Equivalent to {@link #sum} followed by {@link #reset}.
+ * Resets each of the variables to zero, returning the estimated
+ * previous sum. This is effective in fully resetting the sum only
+ * if there are no concurrent updates.
*
- * @return the estimated sum
+ * @return the estimated previous sum
*/
- public long sumAndReset() {
+ public long reset() {
long sum = 0L;
Adder[] as = adders;
if (as != null) {
@@ -287,8 +335,8 @@ public class StripedAdder implements Ser
for (int i = 0; i < n; ++i) {
Adder a = as[i];
if (a != null) {
- sum += a.get();
- a.set(0L);
+ sum += a.value;
+ a.value = 0L;
}
}
}
@@ -304,8 +352,54 @@ public class StripedAdder implements Ser
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
- mutex.set(0);
+ busy = 0;
add(s.readLong());
}
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long busyOffset;
+ private static final long valueOffset;
+ static {
+ try {
+ UNSAFE = getUnsafe();
+ Class<?> sk = StripedAdder.class;
+ busyOffset = UNSAFE.objectFieldOffset
+ (sk.getDeclaredField("busy"));
+ Class<?> ak = Adder.class;
+ valueOffset = UNSAFE.objectFieldOffset
+ (ak.getDeclaredField("value"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ private static sun.misc.Unsafe getUnsafe() {
+ try {
+ return sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security
+ .PrivilegedExceptionAction<sun.misc.Unsafe>() {
+ public sun.misc.Unsafe run() throws Exception {
+ java.lang.reflect.Field f = sun.misc
+ .Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (sun.misc.Unsafe) f.get(null);
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw new RuntimeException("Could not initialize intrinsics",
+ e.getCause());
+ }
+ }
+ }
+
}