ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.5
Committed: Sun Jul 24 15:08:21 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.4: +57 -40 lines
Log Message:
Reduce collisions

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.Arrays;
9     import java.util.Random;
10     import java.util.concurrent.atomic.AtomicInteger;
11     import java.util.concurrent.atomic.AtomicLong;
12     import java.io.IOException;
13     import java.io.Serializable;
14     import java.io.ObjectInputStream;
15     import java.io.ObjectOutputStream;
16    
17     /**
18     * A set of variables that together maintain a sum. When updates
19 dl 1.3 * (method {@link #add}) are contended across threads, this set of
20     * adder variables may grow dynamically to reduce contention. Method
21     * {@link #sum} returns the current combined total across these
22     * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23     * updates may occur while the sum is being calculated), and so cannot
24     * be used alone for fine-grained synchronization control.
25 jsr166 1.2 *
26 dl 1.1 * <p> This class may be applicable when many threads frequently
27     * update a common sum that is used for purposes such as collecting
28     * statistics. In this case, performance may be significantly faster
29     * than using a shared {@link AtomicLong}, at the expense of using
30 dl 1.4 * much more space. On the other hand, if it is known that only one
31     * thread can ever update the sum, performance may be significantly
32     * slower than just updating a local variable.
33 dl 1.1 *
34 dl 1.3 * <p>A StripedAdder may optionally be constructed with a given
35     * expected contention level; i.e., the number of threads that are
36     * expected to concurrently update the sum. Supplying an accurate
37     * value may improve performance by reducing the need for dynamic
38     * adjustment.
39     *
40 jsr166 1.2 * @author Doug Lea
41 dl 1.1 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a table of Atomic long variables. The
     * table is indexed by per-thread hash codes that are initialized
     * to random values.
     *
     * The table doubles in size upon contention (as indicated by
     * failed CASes when performing add()), but is capped at the
     * nearest power of two >= #CPUS. This reflects the idea that,
     * when there are more threads than CPUs, then if each thread were
     * bound to a CPU, there would exist a perfect hash function
     * mapping threads to slots that eliminates collisions. When we
     * reach capacity, we search for this mapping by randomly varying
     * the hash codes of colliding threads. Because search is random,
     * and failures only become known via CAS failures, convergence
     * will be slow, and because threads are typically not bound to
     * CPUS forever, may not occur at all. However, despite these
     * limitations, observed contention is typically low in these
     * cases.
     *
     * Table entries are of class Adder; a form of AtomicLong padded
     * to reduce cache contention on most processors. Padding is
     * overkill for most Atomics because they are most often
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines without this precaution. Adders are
     * constructed upon first use, which further improves per-thread
     * locality and helps reduce (an already large) footprint.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. Upon lock contention, threads
     * try other slots rather than blocking. After initialization, at
     * least one slot exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The table size set upon first use when no usable table was
     * created by a constructor.
     */
    private static final int DEFAULT_ARRAY_SIZE = 8;

    /**
     * Padded version of AtomicLong.
     */
    static final class Adder extends AtomicLong {
        // Unused fields; present only to pad the object (see the
        // design notes above on cache contention).
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
        Adder(long x) { super(x); }
    }

    /**
     * Holder for the thread-local hash code. The code starts off with
     * a given random value, but may be set to a different value upon
     * collisions in retryAdd.
     */
    static final class HashCode {
        int code;
        HashCode(int h) { code = h; }
    }

    /**
     * The corresponding ThreadLocal class.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();

        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
        }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. When non-empty, the size is a power of two; the
     * table grows to at most the nearest power of two >= NCPU. May be
     * null (or empty) until the first call to add.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a lock when resizing and/or creating Adders. There
     * is no need for a blocking lock: Except during initialization
     * races, when busy, other threads try other slots.
     */
    private final AtomicInteger mutex;

    /**
     * Creates a new adder with zero sum.
     */
    public StripedAdder() {
        this.mutex = new AtomicInteger();
        // remaining initialization on first call to add.
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level. A non-positive value
     * requests no presizing; the table is then created with the
     * default size on the first call to {@link #add}.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum.
     */
    public StripedAdder(int expectedContention) {
        this.mutex = new AtomicInteger();
        if (expectedContention > 0) {
            int cap = Math.min(expectedContention, NCPU);
            int size = 1;
            while (size < cap)
                size <<= 1;
            Adder[] as = new Adder[size];
            for (int i = 0; i < size; ++i)
                as[i] = new Adder(0);
            this.adders = as;
        }
        // Otherwise leave the table unallocated. Installing a
        // zero-length table here would be pointless: it has no slot
        // to update and must be replaced on first use anyway.
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
        HashCode hc = threadHashCode.get();
        // Fast path: one CAS on this thread's slot. Anything else
        // (no table, empty slot, failed CAS) goes to retryAdd.
        if ((as = adders) == null || (n = as.length) < 1 ||
            (a = as[hc.code & (n - 1)]) == null ||
            !a.compareAndSet(v = a.get(), v + x))
            retryAdd(x, hc);
    }

    /**
     * Handle cases of add involving initialization, resizing,
     * creating new Adders, and/or contention.
     *
     * @param x the value to add
     * @param hc the calling thread's hash code holder; its code is
     * overwritten with the last hash tried before returning
     */
    private void retryAdd(long x, HashCode hc) {
        int h = hc.code;
        final AtomicInteger mutex = this.mutex;
        for (boolean retried = false; ; retried = true) {
            Adder[] as; Adder a; long v; int n, k; // Locals for volatiles
            if ((as = adders) == null || (n = as.length) < 1) {
                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
                    try {
                        // Default-initialize if the table is still the
                        // unusable (null or zero-length) one observed
                        // above. Testing only for null would never
                        // replace an empty table, and this loop would
                        // then spin forever.
                        if (adders == as)
                            adders = new Adder[DEFAULT_ARRAY_SIZE];
                    } finally {
                        mutex.set(0);
                    }
                }
                else
                    Thread.yield(); // initialization race
            }
            // The CAS is skipped on the first pass: add() has just
            // failed on this slot (or found it empty).
            else if ((a = as[k = h & (n - 1)]) != null &&
                     retried && a.compareAndSet(v = a.get(), v + x))
                break;
            else if ((a == null || n < NCPU) &&
                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
                boolean created = false;
                try {
                    if (adders == as) { // table unchanged since read
                        if (as[k] == null) {
                            // The new Adder starts at x, so creating
                            // it completes the add.
                            as[k] = new Adder(x);
                            created = true;
                        }
                        else { // Expand table
                            Adder[] rs = new Adder[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            adders = rs;
                        }
                    }
                } finally {
                    mutex.set(0);
                }
                if (created)
                    break;
            }
            else { // Try elsewhere
                h ^= h << 13;
                h ^= h >>> 17; // Marsaglia XorShift
                h ^= h << 5;
            }
        }
        hc.code = h; // remember the last hash tried for later adds
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    sum += a.get();
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    a.set(0L);
            }
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent to {@link #sum} followed by {@link #reset}, except
     * that each variable is read and zeroed in a single atomic step.
     * The result is still not an atomic snapshot across all
     * variables.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    // A separate get() and set(0) would discard any
                    // update to this variable made between the two.
                    sum += a.getAndSet(0L);
            }
        }
        return sum;
    }

    /**
     * Serializes the non-transient state followed by the current sum
     * as a single long; the adder table itself is transient.
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Restores the non-transient state, releases the lock, and
     * re-adds the saved sum, which creates a fresh table.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        mutex.set(0);
        add(s.readLong());
    }

}