ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.8
Committed: Thu Jul 28 15:05:55 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.7: +177 -120 lines
Log Message:
Footprint and performance improvements

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Random;
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.atomic.AtomicLong;
11     import java.io.IOException;
12     import java.io.Serializable;
13     import java.io.ObjectInputStream;
14     import java.io.ObjectOutputStream;
15    
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, this set of
 * adder variables may grow dynamically to reduce contention. Method
 * {@link #sum} returns the current combined total across these
 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
 * updates may occur while the sum is being calculated), and so cannot
 * be used alone for fine-grained synchronization control.
 *
 * <p> This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * much more space. On the other hand, if it is known that only one
 * thread can ever update the sum, performance may be significantly
 * slower than just updating a local variable.
 *
 * <p>A StripedAdder may optionally be constructed with a given
 * expected contention level; i.e., the number of threads that are
 * expected to concurrently update the sum. Supplying an accurate
 * value may improve performance by reducing the need for dynamic
 * adjustment.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a table of atomic long variables. The
     * table is indexed by per-thread hash codes.
     *
     * Table entries are of class Adder, a variant of AtomicLong
     * padded to reduce cache contention on most processors. Padding
     * is overkill for most Atomics because they are usually
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines (with a huge negative performance
     * impact) without this precaution.
     *
     * Because Adders are relatively large, we avoid creating them
     * until they are needed. On the other hand, we try to create them
     * on any sign of contention.
     *
     * Per-thread hash codes are initialized to random values.
     * Collisions are indicated by failed CASes when performing an add
     * operation (see method retryAdd). Upon a collision, if the table
     * size is less than the capacity, it is doubled in size unless
     * some other thread holds the lock. If a hashed slot is empty, and
     * the lock is available, a new Adder is created. Otherwise, if the
     * slot exists, a CAS is tried. Retries proceed by "double
     * hashing", using a secondary hash (Marsaglia XorShift) to try to
     * find a free slot.
     *
     * By default, the table is lazily initialized. Upon first use,
     * the table is set to size 2 (the minimum non-empty size), but
     * containing only a single Adder. The maximum table size is
     * bounded by the nearest power of two >= the number of CPUs. The
     * table size is capped because, when there are more threads than
     * CPUs, supposing that each thread were bound to a CPU, there
     * would exist a perfect hash function mapping threads to slots
     * that eliminates collisions. When we reach capacity, we search
     * for this mapping by randomly varying the hash codes of
     * colliding threads. Because search is random, and failures only
     * become known via CAS failures, convergence will be slow, and
     * because threads are typically not bound to CPUs forever, may
     * not occur at all. However, despite these limitations, observed
     * contention is typically low in these cases.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. After initialization, there
     * is no need for a blocking lock: Upon lock contention, threads
     * try other slots rather than blocking. After initialization, at
     * least one slot exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    /** Number of available processors; caps the table size. */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded variant of AtomicLong. The value field is placed
     * between pads, hoping that the JVM doesn't reorder them.
     * Updates are via inlined CAS in methods add and retryAdd.
     */
    static final class Adder {
        // Field declaration order matters: pads surround value.
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Adder(long x) { value = x; }
    }

    /**
     * Holder for the thread-local hash code. The code is initially
     * random, but may be set to a different value upon collisions.
     */
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero, because xorShift maps 0 to 0
            code = (h == 0) ? 1 : h;
        }
    }

    /**
     * The corresponding ThreadLocal class.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        @Override
        public HashCode initialValue() { return new HashCode(); }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * to reduce ThreadLocal pollution and because adjustments due to
     * collisions in one table are likely to be appropriate for
     * others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. When non-null, size is a power of 2, at least 2.
     */
    private transient volatile Adder[] adders;

    /**
     * Spinlock (locked via CAS from 0 to 1) used when resizing and/or
     * creating Adders.
     */
    private volatile int busy;

    /**
     * Creates a new adder with zero sum.
     */
    public StripedAdder() {
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level. The table size is the
     * smallest power of two that is at least 2 and at least
     * {@code min(expectedContention, availableProcessors)}, so values
     * below 2 produce the minimum table of size 2.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum.
     */
    public StripedAdder(int expectedContention) {
        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
        int size = 2;
        while (size < cap)
            size <<= 1;
        Adder[] as = new Adder[size];
        for (int i = 0; i < size; ++i)
            as[i] = new Adder(0);
        this.adders = as;
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Adder[] as; Adder a; int n;  // locals to hold volatile reads
        HashCode hc = threadHashCode.get();
        int h = hc.code;
        boolean collide;
        if ((as = adders) != null && (n = as.length) > 0 &&
            (a = as[(n - 1) & h]) != null) {
            // Fast path: a single CAS on this thread's slot.
            long v = a.value;
            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                return;
            collide = true;          // CAS failed: another thread hit this slot
        }
        else
            collide = false;         // No table yet, or empty slot
        retryAdd(x, hc, collide);
    }

    /**
     * Handles cases of add involving initialization, resizing,
     * creating new Adders, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value to add
     * @param hc the calling thread's hash code holder; its code is
     * updated to the final probe value on exit
     * @param collide true if the caller's fast-path CAS failed
     */
    private void retryAdd(long x, HashCode hc, boolean collide) {
        int h = hc.code;
        for (;;) {
            Adder[] as; Adder a; int n;
            if ((as = adders) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) != null) {
                    boolean shared = true;      // Slot exists
                    if (collide && n < NCPU && busy == 0 &&
                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        try {
                            if (adders == as) { // Expand table
                                Adder[] rs = new Adder[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                adders = rs;
                                shared = false;
                            }
                        } finally {
                            busy = 0;
                        }
                        // If another thread replaced the table, or our slot
                        // index moved into the new upper half, start over.
                        // Otherwise 'a' is still this thread's slot.
                        if (shared || (h & n) != 0) {
                            collide = false;
                            continue;           // Array or index changed
                        }
                    }
                    long v = a.value;
                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                        break;
                    // Record the collision only if we did not just expand.
                    collide = shared;
                }
                else {                          // Try to attach new Adder
                    if (busy == 0 &&
                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        boolean created = false;
                        try {                   // Recheck under lock
                            Adder[] rs; int m, j;
                            if ((rs = adders) != null && (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = new Adder(x);
                                created = true;
                            }
                        } finally {
                            busy = 0;
                        }
                        if (created)
                            break;
                        continue;               // Slot is now non-empty
                    }
                    collide = false;
                }
                h ^= h << 13;                   // Rehash (Marsaglia XorShift)
                h ^= h >>> 17;
                h ^= h << 5;
            }
            else if (busy == 0) {               // Default-initialize
                Adder r = new Adder(x);
                Adder[] rs = new Adder[2];
                rs[h & 1] = r;
                if (adders == as && busy == 0 &&
                    UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                    boolean init = false;
                    try {
                        if (adders == as) {
                            adders = rs;
                            init = true;
                        }
                    } finally {
                        busy = 0;
                    }
                    if (init)
                        break;
                }
            }
            else if (adders == as)              // Lost initialization race
                Thread.yield();
        }
        hc.code = h;                            // Record index for next time
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    a.value = 0L;
            }
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent to {@link #sum} followed by {@link #reset}.
     * The read and reset of each variable are not performed
     * atomically together, so concurrent updates may be lost.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    /**
     * Writes the default fields followed by the current sum. The
     * table itself is transient and is not serialized.
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reads the default fields, clears the spinlock, and re-adds the
     * serialized sum, which lazily rebuilds the transient table.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        busy = 0;
        add(s.readLong());
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long busyOffset;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> sk = StripedAdder.class;
            busyOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("busy"));
            Class<?> ak = Adder.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
     * Replace with a simple call to Unsafe.getUnsafe when integrating
     * into a jdk.
     *
     * @return a sun.misc.Unsafe
     */
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException se) {
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security
                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        public sun.misc.Unsafe run() throws Exception {
                            java.lang.reflect.Field f = sun.misc
                                .Unsafe.class.getDeclaredField("theUnsafe");
                            f.setAccessible(true);
                            return (sun.misc.Unsafe) f.get(null);
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                                           e.getCause());
            }
        }
    }

}