ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.7
Committed: Tue Jul 26 18:30:35 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.6: +21 -10 lines
Log Message:
Use common empty placeholder

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.Arrays;
9 import java.util.Random;
10 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.io.IOException;
13 import java.io.Serializable;
14 import java.io.ObjectInputStream;
15 import java.io.ObjectOutputStream;
16
17 /**
18 * A set of variables that together maintain a sum. When updates
19 * (method {@link #add}) are contended across threads, this set of
20 * adder variables may grow dynamically to reduce contention. Method
21 * {@link #sum} returns the current combined total across these
22 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23 * updates may occur while the sum is being calculated), and so cannot
24 * be used alone for fine-grained synchronization control.
25 *
26 * <p> This class may be applicable when many threads frequently
27 * update a common sum that is used for purposes such as collecting
28 * statistics. In this case, performance may be significantly faster
29 * than using a shared {@link AtomicLong}, at the expense of using
30 * much more space. On the other hand, if it is known that only one
31 * thread can ever update the sum, performance may be significantly
32 * slower than just updating a local variable.
33 *
34 * <p>A StripedAdder may optionally be constructed with a given
35 * expected contention level; i.e., the number of threads that are
36 * expected to concurrently update the sum. Supplying an accurate
37 * value may improve performance by reducing the need for dynamic
38 * adjustment.
39 *
40 * @author Doug Lea
41 */
42 public class StripedAdder implements Serializable {
43 private static final long serialVersionUID = 7249069246863182397L;
44
45 /*
46 * A StripedAdder maintains a table of Atomic long variables. The
47 * table is indexed by per-thread hash codes.
48 *
49 * By default, the table is lazily initialized, to minimize
50 * footprint until adders are used. On first use, the table is set
51 * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is
52 * bounded by the number of CPUS (if larger than the default
53 * size).
54 *
55 * Per-thread hash codes are initialized to random values.
56 * Collisions are indicated by failed CASes when performing an add
57 * operation (see method retryAdd). Upon a collision, if the table
58 * size is less than the capacity, it is doubled in size unless
59 * some other thread holds lock. If a hashed slot is empty, and
60 * lock is available, a new Adder is created. Otherwise, if the
61 * slot exists, a CAS is tried. Retries proceed by "double
62 * hashing", using a secondary hash (Marsaglia XorShift) to try to
63 * find a free slot.
64 *
65 * The table size is capped because, when there are more threads
66 * than CPUs, supposing that each thread were bound to a CPU,
67 * there would exist a perfect hash function mapping threads to
68 * slots that eliminates collisions. When we reach capacity, we
69 * search for this mapping by randomly varying the hash codes of
70 * colliding threads. Because search is random, and failures only
71 * become known via CAS failures, convergence will be slow, and
72 * because threads are typically not bound to CPUS forever, may
73 * not occur at all. However, despite these limitations, observed
74 * contention is typically low in these cases.
75 *
76 * Table entries are of class Adder; a form of AtomicLong padded
77 * to reduce cache contention on most processors. Padding is
78 * overkill for most Atomics because they are usually irregularly
79 * scattered in memory and thus don't interfere much with each
80 * other. But Atomic objects residing in arrays will tend to be
81 * placed adjacent to each other, and so will most often share
82 * cache lines without this precaution. Adders are by default
83 * constructed upon first use, which further improves per-thread
84 * locality and helps reduce footprint.
85 *
86 * A single spinlock is used for resizing the table as well as
87 * populating slots with new Adders. Upon lock contention, threads
88 * try other slots rather than blocking. After initialization, at
89 * least one slot exists, so retries will eventually find a
90 * candidate Adder. During these retries, there is increased
91 * contention and reduced locality, which is still better than
92 * alternatives.
93 */
94
95 /**
96 * Padded version of AtomicLong
97 */
98 static final class Adder extends AtomicLong {
99 long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
100 Adder(long x) { super(x); }
101 }
102
103 private static final int NCPU = Runtime.getRuntime().availableProcessors();
104
105 /**
106 * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon
107 * first use under default constructor, and must be a power of
108 * two. There is not much point in making size a lot smaller than
109 * that of Adders though. CAP is the maximum allowed table size.
110 */
111 private static final int DEFAULT_INITIAL_SIZE = 8;
112 private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE);
113
114 /**
115 * Holder for the thread-local hash code. The code is initially
116 * random, but may be set to a different value upon collisions.
117 */
118 static final class HashCode {
119 static final Random rng = new Random();
120 int code;
121 HashCode() {
122 int h = rng.nextInt();
123 code = (h == 0) ? 1 : h; // ensure nonzero
124 }
125 }
126
127 /**
128 * The corresponding ThreadLocal class
129 */
130 static final class ThreadHashCode extends ThreadLocal<HashCode> {
131 public HashCode initialValue() { return new HashCode(); }
132 }
133
134 /**
135 * Static per-thread hash codes. Shared across all StripedAdders
136 * to reduce ThreadLocal pollution and because adjustments due to
137 * collisions in one table are likely to be appropriate for
138 * others.
139 */
140 static final ThreadHashCode threadHashCode = new ThreadHashCode();
141
142 /**
143 * Common placeholder for empty arrays.
144 */
145 static final Adder[] EMPTY_ARRAY = new Adder[0];
146
147 /**
148 * Table of adders. Size is either zero or a power of two, grows
149 * to be at most CAP.
150 */
151 private transient volatile Adder[] adders;
152
153 /**
154 * Serves as a lock when resizing and/or creating Adders. There
155 * is no need for a blocking lock: Except during initialization
156 * races, when busy, other threads try other slots. However,
157 * during (double-checked) initializations, we use the
158 * "synchronized" lock on this object.
159 */
160 private final AtomicInteger mutex;
161
162 /**
163 * Creates a new adder with zero sum.
164 */
165 public StripedAdder() {
166 this.adders = EMPTY_ARRAY;
167 this.mutex = new AtomicInteger();
168 // remaining initialization on first call to add.
169 }
170
171 /**
172 * Creates a new adder with zero sum, and with stripes presized
173 * for the given expected contention level.
174 *
175 * @param expectedContention the expected number of threads that
176 * will concurrently update the sum.
177 */
178 public StripedAdder(int expectedContention) {
179 if (expectedContention > 0) {
180 int cap = (expectedContention < CAP) ? expectedContention : CAP;
181 int size = 1;
182 while (size < cap)
183 size <<= 1;
184 Adder[] as = new Adder[size];
185 for (int i = 0; i < size; ++i)
186 as[i] = new Adder(0);
187 this.adders = as;
188 }
189 else
190 this.adders = EMPTY_ARRAY;
191 this.mutex = new AtomicInteger();
192 }
193
194 /**
195 * Adds the given value.
196 *
197 * @param x the value to add
198 */
199 public void add(long x) {
200 Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
201 HashCode hc = threadHashCode.get();
202 int h = hc.code;
203 if ((as = adders) == null || (n = as.length) < 1 ||
204 (a = as[(n - 1) & h]) == null ||
205 !a.compareAndSet(v = a.get(), v + x))
206 retryAdd(x, hc);
207 }
208
209 /**
210 * Handle cases of add involving initialization, resizing,
211 * creating new Adders, and/or contention. See above for
212 * explanation.
213 */
214 private void retryAdd(long x, HashCode hc) {
215 int h = hc.code;
216 final AtomicInteger mutex = this.mutex;
217 int collisions = 1 - mutex.get(); // first guess: collides if not locked
218 for (;;) {
219 Adder[] as; Adder a; long v; int k, n;
220 while ((as = adders) == null || (n = as.length) < 1) {
221 synchronized(mutex) { // Try to initialize
222 if (adders == as) {
223 Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE];
224 rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0);
225 adders = rs;
226 }
227 }
228 collisions = 0;
229 }
230
231 if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot
232 if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
233 try {
234 if (adders == as && as[k] == null)
235 a = as[k] = new Adder(x);
236 } finally {
237 mutex.set(0);
238 }
239 if (a != null)
240 break;
241 }
242 collisions = 0;
243 }
244 else if (collisions != 0 && n < CAP && // Try to expand table
245 mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
246 try {
247 if (adders == as) {
248 Adder[] rs = new Adder[n << 1];
249 for (int i = 0; i < n; ++i)
250 rs[i] = as[i];
251 adders = rs;
252 }
253 } finally {
254 mutex.set(0);
255 }
256 collisions = 0;
257 }
258 else if (a.compareAndSet(v = a.get(), v + x))
259 break;
260 else
261 collisions = 1;
262 h ^= h << 13; // Rehash
263 h ^= h >>> 17;
264 h ^= h << 5;
265 }
266 hc.code = h;
267 }
268
269 /**
270 * Returns an estimate of the current sum. The result is
271 * calculated by summing multiple variables, so may not be
272 * accurate if updates occur concurrently with this method.
273 *
274 * @return the estimated sum
275 */
276 public long sum() {
277 long sum = 0L;
278 Adder[] as = adders;
279 if (as != null) {
280 int n = as.length;
281 for (int i = 0; i < n; ++i) {
282 Adder a = as[i];
283 if (a != null)
284 sum += a.get();
285 }
286 }
287 return sum;
288 }
289
290 /**
291 * Resets each of the variables to zero. This is effective in
292 * fully resetting the sum only if there are no concurrent
293 * updates.
294 */
295 public void reset() {
296 Adder[] as = adders;
297 if (as != null) {
298 int n = as.length;
299 for (int i = 0; i < n; ++i) {
300 Adder a = as[i];
301 if (a != null)
302 a.set(0L);
303 }
304 }
305 }
306
307 /**
308 * Equivalent to {@code add(1)}.
309 */
310 public void increment() {
311 add(1L);
312 }
313
314 /**
315 * Equivalent to {@code add(-1)}.
316 */
317 public void decrement() {
318 add(-1L);
319 }
320
321 /**
322 * Equivalent to {@link #sum} followed by {@link #reset}.
323 *
324 * @return the estimated sum
325 */
326 public long sumAndReset() {
327 long sum = 0L;
328 Adder[] as = adders;
329 if (as != null) {
330 int n = as.length;
331 for (int i = 0; i < n; ++i) {
332 Adder a = as[i];
333 if (a != null) {
334 sum += a.get();
335 a.set(0L);
336 }
337 }
338 }
339 return sum;
340 }
341
342 private void writeObject(java.io.ObjectOutputStream s)
343 throws java.io.IOException {
344 s.defaultWriteObject();
345 s.writeLong(sum());
346 }
347
348 private void readObject(ObjectInputStream s)
349 throws IOException, ClassNotFoundException {
350 s.defaultReadObject();
351 mutex.set(0);
352 add(s.readLong());
353 }
354
355 }