ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.11
Committed: Fri Jul 29 14:23:35 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.10: +19 -15 lines
Log Message:
Reduce contention

File Contents

# User Rev Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
6    
7     package jsr166e;
8     import java.util.Random;
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.atomic.AtomicLong;
11     import java.io.IOException;
12     import java.io.Serializable;
13     import java.io.ObjectInputStream;
14     import java.io.ObjectOutputStream;
15    
/**
 * A set of variables that together maintain a sum.  When updates
 * (method {@link #add}) are contended across threads, this set of
 * adder variables may grow dynamically to reduce contention.  Method
 * {@link #sum} returns the current combined total across these
 * adders.  This value is <em>NOT</em> an atomic snapshot (concurrent
 * updates may occur while the sum is being calculated), and so cannot
 * be used alone for fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics.  In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * more space.  On the other hand, if it is known that only one thread
 * can ever update the sum, performance may be significantly slower
 * than just updating a local variable.
 *
 * <p>A StripedAdder may optionally be constructed with a given
 * expected contention level; i.e., the number of threads that are
 * expected to concurrently update the sum.  Supplying an accurate
 * value may improve performance by reducing the need for dynamic
 * adjustment.
 *
 * <p>Serialization writes the value of {@link #sum} at the time of
 * writing; a deserialized instance starts with that value as its
 * sum and an uninitialized table.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a table of Atomic long variables. The
     * table is indexed by per-thread hash codes.
     *
     * Table entries are of class Adder; a variant of AtomicLong
     * padded to reduce cache contention on most processors. Padding
     * is overkill for most Atomics because they are usually
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines (with a huge negative performance
     * impact) without this precaution.
     *
     * Because Adders are relatively large, we avoid creating them
     * until they are needed. On the other hand, we try to create them
     * on any sign of contention.
     *
     * Per-thread hash codes are initialized to random values.
     * Collisions are indicated by failed CASes when performing an add
     * operation (see method retryAdd). Upon a collision, if the table
     * size is less than the capacity, it is doubled in size unless
     * some other thread holds lock. If a hashed slot is empty, and
     * lock is available, a new Adder is created. Otherwise, if the
     * slot exists, a CAS is tried. Retries proceed by "double
     * hashing", using a secondary hash (Marsaglia XorShift) to try to
     * find a free slot.
     *
     * By default, the table is lazily initialized. Upon first use,
     * the table is set to size 1, and contains a single Adder. The
     * maximum table size is bounded by nearest power of two >= the
     * number of CPUS. The table size is capped because, when there
     * are more threads than CPUs, supposing that each thread were
     * bound to a CPU, there would exist a perfect hash function
     * mapping threads to slots that eliminates collisions. When we
     * reach capacity, we search for this mapping by randomly varying
     * the hash codes of colliding threads. Because search is random,
     * and failures only become known via CAS failures, convergence
     * will be slow, and because threads are typically not bound to
     * CPUS forever, may not occur at all. However, despite these
     * limitations, observed contention is typically low in these
     * cases.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. After initialization, there
     * is no need for a blocking lock: Upon lock contention, threads
     * try other slots rather than blocking. After initialization, at
     * least one slot exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    /** Number of processors reported by the runtime; bounds table growth. */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded variant of AtomicLong.  The value field is placed
     * between pads, hoping that the JVM doesn't reorder them.
     * Updates are via inlined CAS in methods add and retryAdd.
     */
    static final class Adder {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Adder(long x) { value = x; }
    }

    /**
     * Holder for the thread-local hash code.  The code is initially
     * random, but may be set to a different value upon collisions.
     */
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
            code = (h == 0) ? 1 : h;
        }
    }

    /**
     * The corresponding ThreadLocal class.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        @Override
        public HashCode initialValue() { return new HashCode(); }
    }

    /**
     * Static per-thread hash codes.  Shared across all StripedAdders
     * to reduce ThreadLocal pollution and because adjustments due to
     * collisions in one table are likely to be appropriate for
     * others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders.  When non-null, size is a power of 2.
     * Transient: only the combined sum is written on serialization.
     */
    private transient volatile Adder[] adders;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Adders.
     * Zero means unlocked, one means locked.
     */
    private volatile int busy;

    /**
     * Creates a new adder with zero sum.  The table is left
     * unallocated until the first call that needs it.
     */
    public StripedAdder() {
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level.  The table size is the
     * smallest power of two that is at least the lesser of
     * {@code expectedContention} and the number of available
     * processors; values below one yield a single stripe.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum.
     */
    public StripedAdder(int expectedContention) {
        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
        int size = 1;
        while (size < cap)
            size <<= 1;
        Adder[] as = new Adder[size];
        for (int i = 0; i < size; ++i)
            as[i] = new Adder(0);
        this.adders = as;
    }

    /**
     * Adds the given value.  The fast path is a single CAS on the
     * adder selected by the calling thread's hash code; anything else
     * (no table, empty slot, or failed CAS) is delegated to
     * {@link #retryAdd}.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Adder[] as; Adder a; int n;  // locals to hold volatile reads
        HashCode hc = threadHashCode.get();
        int h = hc.code;
        boolean contended;
        if ((as = adders) != null && (n = as.length) > 0 &&
            (a = as[(n - 1) & h]) != null) {
            long v = a.value;
            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                return;
            contended = true;        // slot exists but the CAS lost a race
        }
        else
            contended = false;       // table or slot missing; nothing tried yet
        retryAdd(x, hc, contended);
    }

    /**
     * Handle cases of add involving initialization, resizing,
     * creating new Adders, and/or contention.  See above for
     * explanation.  This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * <p>On exit the (possibly rehashed) hash code is stored back
     * into {@code hc} so the thread starts from it next time.
     *
     * @param x the value to add
     * @param hc the hash code holder
     * @param precontended true if CAS failed before call
     */
    private void retryAdd(long x, HashCode hc, boolean precontended) {
        int h = hc.code;
        boolean collide = false;                // true if last slot nonempty
        for (;;) {
            Adder[] as; Adder a; int n;
            if ((as = adders) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (busy == 0) {            // Try to attach new Adder
                        Adder r = new Adder(x); // Optimistically create
                        if (busy == 0 &&
                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Adder[] rs; int m, j;
                                if ((rs = adders) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                busy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (precontended)          // CAS already known to fail
                    precontended = false;       // Continue after rehash
                else {
                    long v = a.value;
                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                        break;
                    if (!collide)
                        collide = true;
                    else if (n >= NCPU || adders != as)
                        collide = false;        // Can't expand
                    else if (busy == 0 &&
                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        collide = false;
                        try {
                            if (adders == as) { // Expand table
                                Adder[] rs = new Adder[n << 1];
                                System.arraycopy(as, 0, rs, 0, n);
                                adders = rs;
                            }
                        } finally {
                            busy = 0;
                        }
                        continue;               // Retry same hash in new table
                    }
                }
                h ^= h << 13;                   // Rehash
                h ^= h >>> 17;
                h ^= h << 5;
            }
            else if (adders == as) {            // Try to default-initialize
                Adder[] rs = new Adder[1];
                rs[0] = new Adder(x);           // New table already holds x
                boolean init = false;
                while (adders == as) {
                    if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        try {
                            if (adders == as) {
                                adders = rs;
                                init = true;
                            }
                        } finally {
                            busy = 0;
                        }
                        break;
                    }
                    if (adders != as)
                        break;
                    Thread.yield();             // Back off
                }
                if (init)
                    break;
            }
        }
        hc.code = h;                            // Record index for next time
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Returns an estimate of the current sum.  The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     * Returns zero if the table has not been allocated yet.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero, returning the estimated
     * previous sum.  This is effective in fully resetting the sum only
     * if there are no concurrent updates: each adder is read and then
     * overwritten in two separate steps.
     *
     * @return the estimated previous sum
     */
    public long reset() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    /**
     * Writes the default serializable fields followed by the current
     * value of {@link #sum} as a single long.
     *
     * @param s the stream to write to
     * @throws IOException if the underlying stream fails
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reads the default serializable fields, clears the spinlock, and
     * re-adds the saved sum (which allocates the table on demand).
     *
     * @param s the stream to read from
     * @throws IOException if the underlying stream fails
     * @throws ClassNotFoundException if a serialized class cannot be found
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        busy = 0;
        add(s.readLong());
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long busyOffset;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> sk = StripedAdder.class;
            busyOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("busy"));
            Class<?> ak = Adder.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
     * Replace with a simple call to Unsafe.getUnsafe when integrating
     * into a jdk.
     *
     * <p>If the direct call is refused with a SecurityException, the
     * instance is read reflectively from the {@code theUnsafe} field
     * inside a privileged action.
     *
     * @return a sun.misc.Unsafe
     */
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException se) {
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security
                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        public sun.misc.Unsafe run() throws Exception {
                            java.lang.reflect.Field f = sun.misc
                                .Unsafe.class.getDeclaredField("theUnsafe");
                            f.setAccessible(true);
                            return (sun.misc.Unsafe) f.get(null);
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                                           e.getCause());
            }
        }
    }

}