ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.1 by dl, Wed Jul 20 15:00:56 2011 UTC vs.
Revision 1.7 by dl, Tue Jul 26 18:30:35 2011 UTC

# Line 16 | Line 16 | import java.io.ObjectOutputStream;
16  
17   /**
18   * A set of variables that together maintain a sum.  When updates
19 < * (method {@link #add}) are contended across threads, the set of
20 < * adders may grow to reduce contention. Method {@link #sum} returns
21 < * the current combined total across these adders. This value is
22 < * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
23 < * the sum is being calculated), and so cannot be used alone for
24 < * fine-grained synchronization control.
25 < *
19 > * (method {@link #add}) are contended across threads, this set of
20 > * adder variables may grow dynamically to reduce contention. Method
21 > * {@link #sum} returns the current combined total across these
22 > * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23 > * updates may occur while the sum is being calculated), and so cannot
24 > * be used alone for fine-grained synchronization control.
25 > *
26   * <p> This class may be applicable when many threads frequently
27   * update a common sum that is used for purposes such as collecting
28   * statistics. In this case, performance may be significantly faster
29   * than using a shared {@link AtomicLong}, at the expense of using
30 < * significantly more space.  On the other hand, if it is known that
31 < * only one thread can ever update the sum, performance may be
32 < * significantly slower than just updating a local variable.
30 > * much more space.  On the other hand, if it is known that only one
31 > * thread can ever update the sum, performance may be significantly
32 > * slower than just updating a local variable.
33 > *
34 > * <p>A StripedAdder may optionally be constructed with a given
35 > * expected contention level; i.e., the number of threads that are
36 > * expected to concurrently update the sum. Supplying an accurate
37 > * value may improve performance by reducing the need for dynamic
38 > * adjustment.
39   *
40 < * @author Doug Lea
40 > * @author Doug Lea
41   */
42   public class StripedAdder implements Serializable {
43      private static final long serialVersionUID = 7249069246863182397L;
44  
45      /*
46 <     * Overview: We maintain a table of AtomicLongs (padded to reduce
47 <     * false sharing). The table is indexed by per-thread hash codes
48 <     * that are initialized as random values.  The table doubles in
49 <     * size upon contention (as indicated by failed CASes when
50 <     * performing add()), but is capped at the nearest power of two >=
51 <     * #cpus: At that point, contention should be infrequent if each
52 <     * thread has a unique index; so we instead adjust hash codes to
53 <     * new random values upon contention rather than expanding. A
54 <     * single spinlock is used for resizing the table as well as
46 >     * A StripedAdder maintains a table of Atomic long variables. The
47 >     * table is indexed by per-thread hash codes.
48 >     *
49 >     * By default, the table is lazily initialized, to minimize
50 >     * footprint until adders are used. On first use, the table is set
51 >     * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is
52 >     * bounded by the number of CPUs (if larger than the default
53 >     * size).
54 >     *
55 >     * Per-thread hash codes are initialized to random values.
56 >     * Collisions are indicated by failed CASes when performing an add
57 >     * operation (see method retryAdd). Upon a collision, if the table
58 >     * size is less than the capacity, it is doubled in size unless
59 >     * some other thread holds the lock. If a hashed slot is empty, and
60 >     * the lock is available, a new Adder is created. Otherwise, if the
61 >     * slot exists, a CAS is tried.  Retries proceed by "double
62 >     * hashing", using a secondary hash (Marsaglia XorShift) to try to
63 >     * find a free slot.
64 >     *
65 >     * The table size is capped because, when there are more threads
66 >     * than CPUs, supposing that each thread were bound to a CPU,
67 >     * there would exist a perfect hash function mapping threads to
68 >     * slots that eliminates collisions. When we reach capacity, we
69 >     * search for this mapping by randomly varying the hash codes of
70 >     * colliding threads.  Because search is random, and failures only
71 >     * become known via CAS failures, convergence will be slow, and
72 >     * because threads are typically not bound to CPUs forever, may
73 >     * not occur at all. However, despite these limitations, observed
74 >     * contention is typically low in these cases.
75 >     *
76 >     * Table entries are of class Adder; a form of AtomicLong padded
77 >     * to reduce cache contention on most processors. Padding is
78 >     * overkill for most Atomics because they are usually irregularly
79 >     * scattered in memory and thus don't interfere much with each
80 >     * other. But Atomic objects residing in arrays will tend to be
81 >     * placed adjacent to each other, and so will most often share
82 >     * cache lines without this precaution.  Adders are by default
83 >     * constructed upon first use, which further improves per-thread
84 >     * locality and helps reduce footprint.
85 >     *
86 >     * A single spinlock is used for resizing the table as well as
87       * populating slots with new Adders. Upon lock contention, threads
88 <     * just try other slots rather than blocking. We guarantee that at
88 >     * try other slots rather than blocking. After initialization, at
89       * least one slot exists, so retries will eventually find a
90 <     * candidate Adder.
91 <     */
92 <
55 <    /**
56 <     * Number of processors, to place a cap on table growth.
90 >     * candidate Adder.  During these retries, there is increased
91 >     * contention and reduced locality, which is still better than
92 >     * alternatives.
93       */
58    static final int NCPU = Runtime.getRuntime().availableProcessors();
94  
95      /**
96 <     * Version of AtomicLong padded to avoid sharing cache
62 <     * lines on most processors
96 >     * Padded version of AtomicLong
97       */
98      static final class Adder extends AtomicLong {
99 <        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
99 >        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
100          Adder(long x) { super(x); }
101      }
102  
103 <    /**
104 <     * Holder for the thread-local hash code.
103 >    private static final int NCPU = Runtime.getRuntime().availableProcessors();
104 >
105 >    /**
106 >     * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon
107 >     * first use under default constructor, and must be a power of
108 >     * two. There is not much point in making size a lot smaller than
109 >     * that of Adders though.  CAP is the maximum allowed table size.
110 >     */
111 >    private static final int DEFAULT_INITIAL_SIZE = 8;
112 >    private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE);
113 >
114 >    /**
115 >     * Holder for the thread-local hash code. The code is initially
116 >     * random, but may be set to a different value upon collisions.
117       */
118      static final class HashCode {
119 +        static final Random rng = new Random();
120          int code;
121 <        HashCode(int h) { code = h; }
121 >        HashCode() {
122 >            int h = rng.nextInt();
123 >            code = (h == 0) ? 1 : h; // ensure nonzero
124 >        }
125      }
126  
127      /**
128       * The corresponding ThreadLocal class
129       */
130      static final class ThreadHashCode extends ThreadLocal<HashCode> {
131 <        static final Random rng = new Random();
82 <        public HashCode initialValue() {
83 <            int h = rng.nextInt();
84 <            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
85 <        }
131 >        public HashCode initialValue() { return new HashCode(); }
132      }
133  
134      /**
135       * Static per-thread hash codes. Shared across all StripedAdders
136 <     * because adjustments due to collisions in one table are likely
137 <     * to be appropriate for others.
136 >     * to reduce ThreadLocal pollution and because adjustments due to
137 >     * collisions in one table are likely to be appropriate for
138 >     * others.
139       */
140      static final ThreadHashCode threadHashCode = new ThreadHashCode();
141  
142      /**
143 <     * Table of adders. Initially of size 2; grows to be at most NCPU.
143 >     * Common placeholder for empty arrays.
144 >     */
145 >    static final Adder[] EMPTY_ARRAY = new Adder[0];
146 >
147 >    /**
148 >     * Table of adders. Size is either zero or a power of two, grows
149 >     * to be at most CAP.
150       */
151      private transient volatile Adder[] adders;
152  
153      /**
154       * Serves as a lock when resizing and/or creating Adders.  There
155 <     * is no need for a blocking lock: When busy, other threads try
156 <     * other slots.
155 >     * is no need for a blocking lock: Except during initialization
156 >     * races, when busy, other threads try other slots. However,
157 >     * during (double-checked) initializations, we use the
158 >     * "synchronized" lock on this object.
159       */
160      private final AtomicInteger mutex;
161  
162      /**
163 <     * Marsaglia XorShift for rehashing on collisions
163 >     * Creates a new adder with zero sum.
164       */
165 <    private static int xorShift(int r) {
166 <        r ^= r << 13;
167 <        r ^= r >>> 17;
168 <        return r ^ (r << 5);
165 >    public StripedAdder() {
166 >        this.adders = EMPTY_ARRAY;
167 >        this.mutex = new AtomicInteger();
168 >        // remaining initialization on first call to add.
169      }
170  
171      /**
172 <     * Creates a new adder with initially zero sum.
172 >     * Creates a new adder with zero sum, and with stripes presized
173 >     * for the given expected contention level.
174 >     *
175 >     * @param expectedContention the expected number of threads that
176 >     * will concurrently update the sum.
177       */
178 <    public StripedAdder() {
179 <        Adder[] as = new Adder[2];
180 <        as[0] = new Adder(0); // ensure at least one available adder
181 <        this.adders = as;
178 >    public StripedAdder(int expectedContention) {
179 >        if (expectedContention > 0) {
180 >            int cap = (expectedContention < CAP) ? expectedContention : CAP;
181 >            int size = 1;
182 >            while (size < cap)
183 >                size <<= 1;
184 >            Adder[] as = new Adder[size];
185 >            for (int i = 0; i < size; ++i)
186 >                as[i] = new Adder(0);
187 >            this.adders = as;
188 >        }
189 >        else
190 >            this.adders = EMPTY_ARRAY;
191          this.mutex = new AtomicInteger();
192      }
193  
# Line 129 | Line 197 | public class StripedAdder implements Ser
197       * @param x the value to add
198       */
199      public void add(long x) {
200 +        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
201          HashCode hc = threadHashCode.get();
202 <        for (int h = hc.code;;) {
203 <            Adder[] as = adders;
204 <            int n = as.length;
205 <            Adder a = as[h & (n - 1)];
206 <            if (a != null) {
207 <                long v = a.get();
208 <                if (a.compareAndSet(v, v + x))
209 <                    break;
210 <                if (n >= NCPU) {                 // Collision when table at max
211 <                    h = hc.code = xorShift(h);   // change code
212 <                    continue;
202 >        int h = hc.code;
203 >        if ((as = adders) == null || (n = as.length) < 1 ||
204 >            (a = as[(n - 1) & h]) == null ||
205 >            !a.compareAndSet(v = a.get(), v + x))
206 >            retryAdd(x, hc);
207 >    }
208 >
209 >    /**
210 >     * Handles cases of add involving initialization, resizing,
211 >     * creating new Adders, and/or contention. See above for
212 >     * explanation.
213 >     */
214 >    private void retryAdd(long x, HashCode hc) {
215 >        int h = hc.code;
216 >        final AtomicInteger mutex = this.mutex;
217 >        int collisions = 1 - mutex.get(); // first guess: collides if not locked
218 >        for (;;) {
219 >            Adder[] as; Adder a; long v; int k, n;
220 >            while ((as = adders) == null || (n = as.length) < 1) {
221 >                synchronized(mutex) {                // Try to initialize
222 >                    if (adders == as) {
223 >                        Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE];
224 >                        rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0);
225 >                        adders = rs;
226 >                    }
227                  }
228 +                collisions = 0;
229              }
230 <            final AtomicInteger mutex = this.mutex;
231 <            if (mutex.get() != 0)
232 <                h = xorShift(h);                 // Try elsewhere
233 <            else if (mutex.compareAndSet(0, 1)) {
234 <                boolean created = false;
230 >
231 >            if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot
232 >                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
233 >                    try {
234 >                        if (adders == as && as[k] == null)
235 >                            a = as[k] = new Adder(x);
236 >                    } finally {
237 >                        mutex.set(0);
238 >                    }
239 >                    if (a != null)
240 >                        break;
241 >                }
242 >                collisions = 0;
243 >            }
244 >            else if (collisions != 0 && n < CAP &&   // Try to expand table
245 >                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
246                  try {
247 <                    Adder[] rs = adders;
248 <                    if (a != null && rs == as)   // Resize table
249 <                        rs = adders = Arrays.copyOf(as, as.length << 1);
250 <                    int j = h & (rs.length - 1);
251 <                    if (rs[j] == null) {         // Create adder
157 <                        rs[j] = new Adder(x);
158 <                        created = true;
247 >                    if (adders == as) {
248 >                        Adder[] rs = new Adder[n << 1];
249 >                        for (int i = 0; i < n; ++i)
250 >                            rs[i] = as[i];
251 >                        adders = rs;
252                      }
253                  } finally {
254                      mutex.set(0);
255                  }
256 <                if (created) {
164 <                    hc.code = h;                // Use this adder next time
165 <                    break;
166 <                }
256 >                collisions = 0;
257              }
258 +            else if (a.compareAndSet(v = a.get(), v + x))
259 +                break;
260 +            else
261 +                collisions = 1;
262 +            h ^= h << 13;                            // Rehash
263 +            h ^= h >>> 17;
264 +            h ^= h << 5;
265          }
266 +        hc.code = h;
267      }
268  
269      /**
270       * Returns an estimate of the current sum.  The result is
271       * calculated by summing multiple variables, so may not be
272       * accurate if updates occur concurrently with this method.
273 <     *
273 >     *
274       * @return the estimated sum
275       */
276      public long sum() {
277 <        long sum = 0;
277 >        long sum = 0L;
278          Adder[] as = adders;
279 <        int n = as.length;
280 <        for (int i = 0; i < n; ++i) {
281 <            Adder a = as[i];
282 <            if (a != null)
283 <                sum += a.get();
279 >        if (as != null) {
280 >            int n = as.length;
281 >            for (int i = 0; i < n; ++i) {
282 >                Adder a = as[i];
283 >                if (a != null)
284 >                    sum += a.get();
285 >            }
286          }
287          return sum;
288      }
# Line 194 | Line 294 | public class StripedAdder implements Ser
294       */
295      public void reset() {
296          Adder[] as = adders;
297 <        int n = as.length;
298 <        for (int i = 0; i < n; ++i) {
299 <            Adder a = as[i];
300 <            if (a != null)
301 <                a.set(0L);
297 >        if (as != null) {
298 >            int n = as.length;
299 >            for (int i = 0; i < n; ++i) {
300 >                Adder a = as[i];
301 >                if (a != null)
302 >                    a.set(0L);
303 >            }
304          }
305      }
306  
# Line 222 | Line 324 | public class StripedAdder implements Ser
324       * @return the estimated sum
325       */
326      public long sumAndReset() {
327 <        long sum = 0;
327 >        long sum = 0L;
328          Adder[] as = adders;
329 <        int n = as.length;
330 <        for (int i = 0; i < n; ++i) {
331 <            Adder a = as[i];
332 <            if (a != null) {
333 <                sum += a.get();
334 <                a.set(0L);
329 >        if (as != null) {
330 >            int n = as.length;
331 >            for (int i = 0; i < n; ++i) {
332 >                Adder a = as[i];
333 >                if (a != null) {
334 >                    sum += a.get();
335 >                    a.set(0L);
336 >                }
337              }
338          }
339          return sum;
# Line 244 | Line 348 | public class StripedAdder implements Ser
348      private void readObject(ObjectInputStream s)
349          throws IOException, ClassNotFoundException {
350          s.defaultReadObject();
247        long c = s.readLong();
248        Adder[] as = new Adder[2];
249        as[0] = new Adder(c);
250        this.adders = as;
351          mutex.set(0);
352 +        add(s.readLong());
353      }
354  
355   }
255
256

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines