ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.10
Committed: Fri Jul 29 13:50:54 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.9: +74 -66 lines
Log Message:
Reduce expansion under low contention

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Random;
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.atomic.AtomicLong;
11     import java.io.IOException;
12     import java.io.Serializable;
13     import java.io.ObjectInputStream;
14     import java.io.ObjectOutputStream;
15    
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, this set of
 * adder variables may grow dynamically to reduce contention. Method
 * {@link #sum} returns the current combined total across these
 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
 * updates may occur while the sum is being calculated), and so cannot
 * be used alone for fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * more space. On the other hand, if it is known that only one thread
 * can ever update the sum, performance may be significantly slower
 * than just updating a local variable.
 *
 * <p>A StripedAdder may optionally be constructed with a given
 * expected contention level; i.e., the number of threads that are
 * expected to concurrently update the sum. Supplying an accurate
 * value may improve performance by reducing the need for dynamic
 * adjustment.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a table of atomically updated long
     * variables. The table is indexed by per-thread hash codes.
     *
     * Table entries are of class Adder, a variant of AtomicLong
     * padded to reduce cache contention on most processors. Padding
     * is overkill for most Atomics because they are usually
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines (with a huge negative performance
     * impact) without this precaution.
     *
     * Because Adders are relatively large, we avoid creating them
     * until they are needed. On the other hand, we try to create them
     * on any sign of contention.
     *
     * Per-thread hash codes are initialized to random values.
     * Collisions are indicated by failed CASes when performing an add
     * operation (see method retryAdd). Upon a repeated collision, if
     * the table size is less than the capacity, it is doubled in size
     * unless some other thread holds the lock. If a hashed slot is
     * empty, and the lock is available, a new Adder is created.
     * Otherwise, if the slot exists, a CAS is tried. Retries proceed
     * by "double hashing", using a secondary hash (Marsaglia XorShift)
     * to try to find a free slot.
     *
     * By default, the table is lazily initialized. Upon first use,
     * the table is set to size 1, and contains a single Adder. The
     * maximum table size is bounded by the nearest power of two >= the
     * number of CPUs. The table size is capped because, when there
     * are more threads than CPUs, supposing that each thread were
     * bound to a CPU, there would exist a perfect hash function
     * mapping threads to slots that eliminates collisions. When we
     * reach capacity, we search for this mapping by randomly varying
     * the hash codes of colliding threads. Because search is random,
     * and failures only become known via CAS failures, convergence
     * will be slow, and because threads are typically not bound to
     * CPUs forever, may not occur at all. However, despite these
     * limitations, observed contention is typically low in these
     * cases.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. After initialization, there
     * is no need for a blocking lock: Upon lock contention, threads
     * try other slots rather than blocking. After initialization, at
     * least one slot exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    /** Number of available processors; caps table expansion in retryAdd. */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded variant of AtomicLong. The value field is placed
     * between pads, hoping that the JVM doesn't reorder them.
     * Updates are via inlined CAS in methods add and retryAdd.
     */
    static final class Adder {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Adder(long x) { value = x; }
    }

    /**
     * Holder for the thread-local hash code. The code is initially
     * random, but may be set to a different value upon collisions.
     */
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero: xorShift maps 0 to 0 forever
            code = (h == 0) ? 1 : h;
        }
    }

    /**
     * The corresponding ThreadLocal class; lazily creates a fresh
     * random HashCode for each thread.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        @Override
        public HashCode initialValue() { return new HashCode(); }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * to reduce ThreadLocal pollution and because adjustments due to
     * collisions in one table are likely to be appropriate for
     * others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. When non-null, size is a power of 2.
     * Transient: rebuilt on deserialization from the serialized sum.
     */
    private transient volatile Adder[] adders;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Adders.
     * Transient: lock state is not part of the logical state of the sum
     * and must not leak into the serialized form.
     */
    private transient volatile int busy;

    /**
     * Creates a new adder with zero sum.
     */
    public StripedAdder() {
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum. The table is sized to the
     * nearest power of two at least {@code min(expectedContention,
     * number of CPUs)}; values less than 1 yield a single stripe.
     */
    public StripedAdder(int expectedContention) {
        int cap = Math.min(expectedContention, NCPU);
        int size = 1;
        while (size < cap)
            size <<= 1;
        Adder[] as = new Adder[size];
        for (int i = 0; i < size; ++i)
            as[i] = new Adder(0);
        this.adders = as;
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Adder[] as; Adder a; int n; // locals to hold volatile reads
        HashCode hc = threadHashCode.get();
        int h = hc.code;
        boolean contended;
        if ((as = adders) != null && (n = as.length) > 0 &&
            (a = as[(n - 1) & h]) != null) {
            // Fast path: a single CAS on this thread's slot.
            long v = a.value;
            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                return;
            contended = true;
        }
        else
            contended = false; // table or slot missing; not a CAS failure
        retryAdd(x, hc, contended);
    }

    /**
     * Handles cases of add involving initialization, resizing,
     * creating new Adders, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value to add
     * @param hc the hash code holder
     * @param precontended true if CAS failed before call
     */
    private void retryAdd(long x, HashCode hc, boolean precontended) {
        int h = hc.code;
        boolean collide = false;                // true if last slot nonempty
        for (;;) {
            Adder[] as; Adder a; int n;
            if ((as = adders) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (busy == 0 &&            // Try to attach new Adder
                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        boolean created = false;
                        try {                   // Recheck under lock
                            Adder[] rs; int m, j;
                            if ((rs = adders) != null && (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = new Adder(x);
                                created = true;
                            }
                        } finally {
                            busy = 0;
                        }
                        if (created)
                            break;
                        continue;               // Slot is now non-empty
                    }
                    collide = false;
                }
                else if (precontended)          // CAS already known to fail
                    precontended = false;       // Continue after rehash
                else {
                    long v = a.value;
                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                        break;
                    if (!collide)
                        collide = true;         // First failure: just rehash
                    else if (n >= NCPU || adders != as)
                        collide = false;        // Don't expand
                    else if (busy == 0 &&
                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        collide = false;
                        try {
                            if (adders == as) { // Expand table
                                Adder[] rs = new Adder[n << 1];
                                System.arraycopy(as, 0, rs, 0, n);
                                adders = rs;
                            }
                        } finally {
                            busy = 0;
                        }
                        continue;               // Retry with expanded table
                    }
                }
                h ^= h << 13;                   // Rehash (Marsaglia XorShift)
                h ^= h >>> 17;
                h ^= h << 5;
            }
            else if (adders == as) {            // Try to default-initialize
                Adder[] rs = new Adder[1];
                rs[0] = new Adder(x);
                boolean init = false;
                while (adders == as) {
                    if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        try {
                            if (adders == as) {
                                adders = rs;
                                init = true;
                            }
                        } finally {
                            busy = 0;
                        }
                        break;
                    }
                    if (adders != as)
                        break;
                    Thread.yield();             // Back off
                }
                if (init)
                    break;
            }
        }
        hc.code = h;                            // Record index for next time
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum (zero if no update has created the table yet)
     */
    public long sum() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            for (Adder a : as) {
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero, returning the estimated
     * previous sum. This is effective in fully resetting the sum only
     * if there are no concurrent updates.
     *
     * @return the estimated previous sum (zero if the table does not exist yet)
     */
    public long reset() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            for (Adder a : as) {
                if (a != null) {
                    sum += a.value;
                    a.value = 0L; // plain volatile write, not a CAS
                }
            }
        }
        return sum;
    }

    /**
     * Serializes the current estimated sum after the default fields.
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Restores the sum written by writeObject into a fresh table.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        busy = 0;
        add(s.readLong());
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long busyOffset;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> sk = StripedAdder.class;
            busyOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("busy"));
            Class<?> ak = Adder.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
     * Replace with a simple call to Unsafe.getUnsafe when integrating
     * into a jdk.
     *
     * @return a sun.misc.Unsafe
     */
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException se) {
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security
                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        public sun.misc.Unsafe run() throws Exception {
                            java.lang.reflect.Field f = sun.misc
                                .Unsafe.class.getDeclaredField("theUnsafe");
                            f.setAccessible(true);
                            return (sun.misc.Unsafe) f.get(null);
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                                           e.getCause());
            }
        }
    }

}