--- jsr166/src/jsr166e/StripedAdder.java 2011/07/20 16:06:19 1.2 +++ jsr166/src/jsr166e/StripedAdder.java 2011/07/22 13:25:12 1.3 @@ -16,12 +16,12 @@ import java.io.ObjectOutputStream; /** * A set of variables that together maintain a sum. When updates - * (method {@link #add}) are contended across threads, the set of - * adders may grow to reduce contention. Method {@link #sum} returns - * the current combined total across these adders. This value is - * NOT an atomic snapshot (concurrent updates may occur while - * the sum is being calculated), and so cannot be used alone for - * fine-grained synchronization control. + * (method {@link #add}) are contended across threads, this set of + * adder variables may grow dynamically to reduce contention. Method + * {@link #sum} returns the current combined total across these + * adders. This value is NOT an atomic snapshot (concurrent + * updates may occur while the sum is being calculated), and so cannot + * be used alone for fine-grained synchronization control. * *

This class may be applicable when many threads frequently * update a common sum that is used for purposes such as collecting @@ -31,25 +31,54 @@ import java.io.ObjectOutputStream; * only one thread can ever update the sum, performance may be * significantly slower than just updating a local variable. * + *

A StripedAdder may optionally be constructed with a given + * expected contention level; i.e., the number of threads that are + * expected to concurrently update the sum. Supplying an accurate + * value may improve performance by reducing the need for dynamic + * adjustment. + * * @author Doug Lea */ public class StripedAdder implements Serializable { private static final long serialVersionUID = 7249069246863182397L; /* - * Overview: We maintain a table of AtomicLongs (padded to reduce - * false sharing). The table is indexed by per-thread hash codes - * that are initialized as random values. The table doubles in - * size upon contention (as indicated by failed CASes when - * performing add()), but is capped at the nearest power of two >= - * #cpus: At that point, contention should be infrequent if each - * thread has a unique index; so we instead adjust hash codes to - * new random values upon contention rather than expanding. A - * single spinlock is used for resizing the table as well as + * Overview: We maintain a table of Atomic long variables. The + * table is indexed by per-thread hash codes that are initialized + * to random values. + * + * The table doubles in size upon contention (as indicated by + * failed CASes when performing add()), but is capped at the + * nearest power of two >= #CPUS. This reflects the idea that, + * when there are more threads than CPUs, then if each thread were + * bound to a CPU, there would exist a perfect hash function + * mapping threads to slots that eliminates collisions. When we + * reach capacity, we search for this mapping by randomly varying + * the hash codes of colliding threads. Because search is random, + * and failures only become known via CAS failures, convergence + * will be slow, and because threads are typically not bound to + * CPUS forever, may not occur at all. However, despite these + * limitations, observed contention is typically very low in these + * cases. + * + * Table entries are of class Adder; a form of AtomicLong padded + * to reduce cache contention on most processors. Padding is + * overkill for most Atomics because they are most often + * irregularly scattered in memory and thus don't interfere much + * with each other. But Atomic objects residing in arrays will + * tend to be placed adjacent to each other, and so will most + * often share cache lines without this precaution. Except for + * slot adders[0], Adders are constructed upon first use, which + * further improves per-thread locality and helps reduce (an + * already large) footprint. + * + * A single spinlock is used for resizing the table as well as * populating slots with new Adders. Upon lock contention, threads - * just try other slots rather than blocking. We guarantee that at - * least one slot exists, so retries will eventually find a - * candidate Adder. + * try other slots rather than blocking. We guarantee that at + * least one slot (0) exists, so retries will eventually find a + * candidate Adder. During these retries, there is increased + * contention and reduced locality, which is still better than + * alternatives. */ /** @@ -58,8 +87,7 @@ public class StripedAdder implements Ser static final int NCPU = Runtime.getRuntime().availableProcessors(); /** - * Version of AtomicLong padded to avoid sharing cache - * lines on most processors + * Padded version of AtomicLong */ static final class Adder extends AtomicLong { long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd; @@ -67,7 +95,10 @@ public class StripedAdder implements Ser } /** - * Holder for the thread-local hash code. + * Holder for the thread-local hash code. The code starts off with + * a given random value, but may be set to a different + * pseudo-random value (using a cheaper but adequate xorshift + * generator) upon collisions. */ static final class HashCode { int code; @@ -93,7 +124,7 @@ public class StripedAdder implements Ser static final ThreadHashCode threadHashCode = new ThreadHashCode(); /** - * Table of adders. Initially of size 2; grows to be at most NCPU. + * Table of adders. Minimum size 2. Size grows to be at most NCPU. */ private transient volatile Adder[] adders; @@ -105,7 +136,7 @@ public class StripedAdder implements Ser private final AtomicInteger mutex; /** - * Marsaglia XorShift for rehashing on collisions + * Marsaglia XorShift random generator for rehashing on collisions */ private static int xorShift(int r) { r ^= r << 13; @@ -114,10 +145,25 @@ public class StripedAdder implements Ser } /** - * Creates a new adder with initially zero sum. + * Creates a new adder with zero sum. */ public StripedAdder() { - Adder[] as = new Adder[2]; + this(2); + } + + /** + * Creates a new adder with zero sum, and with stripes presized + * for the given expected contention level. + * + * @param expectedContention the expected number of threads that + * will concurrently update the sum. + */ + public StripedAdder(int expectedContention) { + int cap = (expectedContention < NCPU) ? expectedContention : NCPU; + int size = 2; + while (size < cap) + size <<= 1; + Adder[] as = new Adder[size]; as[0] = new Adder(0); // ensure at least one available adder this.adders = as; this.mutex = new AtomicInteger(); @@ -161,7 +207,7 @@ public class StripedAdder implements Ser mutex.set(0); } if (created) { - hc.code = h; // Use this adder next time + hc.code = h; // Use this adder next time break; } }