ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.11
Committed: Fri Jul 29 14:23:35 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.10: +19 -15 lines
Log Message:
Reduce contention

File Contents

# Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
6
package jsr166e;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
15
16 /**
17 * A set of variables that together maintain a sum. When updates
18 * (method {@link #add}) are contended across threads, this set of
19 * adder variables may grow dynamically to reduce contention. Method
20 * {@link #sum} returns the current combined total across these
21 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
22 * updates may occur while the sum is being calculated), and so cannot
23 * be used alone for fine-grained synchronization control.
24 *
25 * <p> This class may be applicable when many threads frequently
26 * update a common sum that is used for purposes such as collecting
27 * statistics. In this case, performance may be significantly faster
28 * than using a shared {@link AtomicLong}, at the expense of using
29 * more space. On the other hand, if it is known that only one thread
30 * can ever update the sum, performance may be significantly slower
31 * than just updating a local variable.
32 *
33 * <p>A StripedAdder may optionally be constructed with a given
34 * expected contention level; i.e., the number of threads that are
35 * expected to concurrently update the sum. Supplying an accurate
36 * value may improve performance by reducing the need for dynamic
37 * adjustment.
38 *
39 * @author Doug Lea
40 */
41 public class StripedAdder implements Serializable {
42 private static final long serialVersionUID = 7249069246863182397L;
43
44 /*
45 * A StripedAdder maintains a table of Atomic long variables. The
46 * table is indexed by per-thread hash codes.
47 *
48 * Table entries are of class Adder; a variant of AtomicLong
49 * padded to reduce cache contention on most processors. Padding
50 * is overkill for most Atomics because they are usually
51 * irregularly scattered in memory and thus don't interfere much
52 * with each other. But Atomic objects residing in arrays will
53 * tend to be placed adjacent to each other, and so will most
54 * often share cache lines (with a huge negative performance
55 * impact) without this precaution.
56 *
57 * Because Adders are relatively large, we avoid creating them
58 * until they are needed. On the other hand, we try to create them
59 * on any sign of contention.
60 *
61 * Per-thread hash codes are initialized to random values.
62 * Collisions are indicated by failed CASes when performing an add
63 * operation (see method retryAdd). Upon a collision, if the table
64 * size is less than the capacity, it is doubled in size unless
65 * some other thread holds lock. If a hashed slot is empty, and
66 * lock is available, a new Adder is created. Otherwise, if the
67 * slot exists, a CAS is tried. Retries proceed by "double
68 * hashing", using a secondary hash (Marsaglia XorShift) to try to
69 * find a free slot.
70 *
71 * By default, the table is lazily initialized. Upon first use,
72 * the table is set to size 1, and contains a single Adder. The
73 * maximum table size is bounded by nearest power of two >= the
74 * number of CPUS. The table size is capped because, when there
75 * are more threads than CPUs, supposing that each thread were
76 * bound to a CPU, there would exist a perfect hash function
77 * mapping threads to slots that eliminates collisions. When we
78 * reach capacity, we search for this mapping by randomly varying
79 * the hash codes of colliding threads. Because search is random,
80 * and failures only become known via CAS failures, convergence
81 * will be slow, and because threads are typically not bound to
82 * CPUS forever, may not occur at all. However, despite these
83 * limitations, observed contention is typically low in these
84 * cases.
85 *
86 * A single spinlock is used for resizing the table as well as
87 * populating slots with new Adders. After initialization, there
88 * is no need for a blocking lock: Upon lock contention, threads
89 * try other slots rather than blocking. After initialization, at
90 * least one slot exists, so retries will eventually find a
91 * candidate Adder. During these retries, there is increased
92 * contention and reduced locality, which is still better than
93 * alternatives.
94 */
95
96 private static final int NCPU = Runtime.getRuntime().availableProcessors();
97
98 /**
99 * Padded variant of AtomicLong. The value field is placed
100 * between pads, hoping that the JVM doesn't reorder them.
101 * Updates are via inlined CAS in methods add and retryAdd.
102 */
103 static final class Adder {
104 volatile long p0, p1, p2, p3, p4, p5, p6;
105 volatile long value;
106 volatile long q0, q1, q2, q3, q4, q5, q6;
107 Adder(long x) { value = x; }
108 }
109
110 /**
111 * Holder for the thread-local hash code. The code is initially
112 * random, but may be set to a different value upon collisions.
113 */
114 static final class HashCode {
115 static final Random rng = new Random();
116 int code;
117 HashCode() {
118 int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
119 code = (h == 0) ? 1 : h;
120 }
121 }
122
123 /**
124 * The corresponding ThreadLocal class
125 */
126 static final class ThreadHashCode extends ThreadLocal<HashCode> {
127 public HashCode initialValue() { return new HashCode(); }
128 }
129
130 /**
131 * Static per-thread hash codes. Shared across all StripedAdders
132 * to reduce ThreadLocal pollution and because adjustments due to
133 * collisions in one table are likely to be appropriate for
134 * others.
135 */
136 static final ThreadHashCode threadHashCode = new ThreadHashCode();
137
138 /**
139 * Table of adders. When non-null, size is a power of 2.
140 */
141 private transient volatile Adder[] adders;
142
143 /**
144 * Spinlock (locked via CAS) used when resizing and/or creating Adders.
145 */
146 private volatile int busy;
147
148 /**
149 * Creates a new adder with zero sum.
150 */
151 public StripedAdder() {
152 }
153
154 /**
155 * Creates a new adder with zero sum, and with stripes presized
156 * for the given expected contention level.
157 *
158 * @param expectedContention the expected number of threads that
159 * will concurrently update the sum.
160 */
161 public StripedAdder(int expectedContention) {
162 int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
163 int size = 1;
164 while (size < cap)
165 size <<= 1;
166 Adder[] as = new Adder[size];
167 for (int i = 0; i < size; ++i)
168 as[i] = new Adder(0);
169 this.adders = as;
170 }
171
172 /**
173 * Adds the given value.
174 *
175 * @param x the value to add
176 */
177 public void add(long x) {
178 Adder[] as; Adder a; int n; // locals to hold volatile reads
179 HashCode hc = threadHashCode.get();
180 int h = hc.code;
181 boolean contended;
182 if ((as = adders) != null && (n = as.length) > 0 &&
183 (a = as[(n - 1) & h]) != null) {
184 long v = a.value;
185 if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
186 return;
187 contended = true;
188 }
189 else
190 contended = false;
191 retryAdd(x, hc, contended);
192 }
193
194 /**
195 * Handle cases of add involving initialization, resizing,
196 * creating new Adders, and/or contention. See above for
197 * explanation. This method suffers the usual non-modularity
198 * problems of optimistic retry code, relying on rechecked sets of
199 * reads.
200 *
201 * @param x the value to add
202 * @param hc the hash code holder
203 * @param precontended true if CAS failed before call
204 */
205 private void retryAdd(long x, HashCode hc, boolean precontended) {
206 int h = hc.code;
207 boolean collide = false; // true if last slot nonempty
208 for (;;) {
209 Adder[] as; Adder a; int n;
210 if ((as = adders) != null && (n = as.length) > 0) {
211 if ((a = as[(n - 1) & h]) == null) {
212 if (busy == 0) { // Try to attach new Adder
213 Adder r = new Adder(x); // Optimistically create
214 if (busy == 0 &&
215 UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
216 boolean created = false;
217 try { // Recheck under lock
218 Adder[] rs; int m, j;
219 if ((rs = adders) != null &&
220 (m = rs.length) > 0 &&
221 rs[j = (m - 1) & h] == null) {
222 rs[j] = r;
223 created = true;
224 }
225 } finally {
226 busy = 0;
227 }
228 if (created)
229 break;
230 continue; // Slot is now non-empty
231 }
232 }
233 collide = false;
234 }
235 else if (precontended) // CAS already known to fail
236 precontended = false; // Continue after rehash
237 else {
238 long v = a.value;
239 if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
240 break;
241 if (!collide)
242 collide = true;
243 else if (n >= NCPU || adders != as)
244 collide = false; // Can't expand
245 else if (busy == 0 &&
246 UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
247 collide = false;
248 try {
249 if (adders == as) { // Expand table
250 Adder[] rs = new Adder[n << 1];
251 for (int i = 0; i < n; ++i)
252 rs[i] = as[i];
253 adders = rs;
254 }
255 } finally {
256 busy = 0;
257 }
258 continue;
259 }
260 }
261 h ^= h << 13; // Rehash
262 h ^= h >>> 17;
263 h ^= h << 5;
264 }
265 else if (adders == as) { // Try to default-initialize
266 Adder[] rs = new Adder[1];
267 rs[0] = new Adder(x);
268 boolean init = false;
269 while (adders == as) {
270 if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
271 try {
272 if (adders == as) {
273 adders = rs;
274 init = true;
275 }
276 } finally {
277 busy = 0;
278 }
279 break;
280 }
281 if (adders != as)
282 break;
283 Thread.yield(); // Back off
284 }
285 if (init)
286 break;
287 }
288 }
289 hc.code = h; // Record index for next time
290 }
291
292 /**
293 * Equivalent to {@code add(1)}.
294 */
295 public void increment() {
296 add(1L);
297 }
298
299 /**
300 * Equivalent to {@code add(-1)}.
301 */
302 public void decrement() {
303 add(-1L);
304 }
305
306 /**
307 * Returns an estimate of the current sum. The result is
308 * calculated by summing multiple variables, so may not be
309 * accurate if updates occur concurrently with this method.
310 *
311 * @return the estimated sum
312 */
313 public long sum() {
314 long sum = 0L;
315 Adder[] as = adders;
316 if (as != null) {
317 int n = as.length;
318 for (int i = 0; i < n; ++i) {
319 Adder a = as[i];
320 if (a != null)
321 sum += a.value;
322 }
323 }
324 return sum;
325 }
326
327 /**
328 * Resets each of the variables to zero, returning the estimated
329 * previous sum. This is effective in fully resetting the sum only
330 * if there are no concurrent updates.
331 *
332 * @return the estimated previous sum
333 */
334 public long reset() {
335 long sum = 0L;
336 Adder[] as = adders;
337 if (as != null) {
338 int n = as.length;
339 for (int i = 0; i < n; ++i) {
340 Adder a = as[i];
341 if (a != null) {
342 sum += a.value;
343 a.value = 0L;
344 }
345 }
346 }
347 return sum;
348 }
349
350 private void writeObject(java.io.ObjectOutputStream s)
351 throws java.io.IOException {
352 s.defaultWriteObject();
353 s.writeLong(sum());
354 }
355
356 private void readObject(ObjectInputStream s)
357 throws IOException, ClassNotFoundException {
358 s.defaultReadObject();
359 busy = 0;
360 add(s.readLong());
361 }
362
363 // Unsafe mechanics
364 private static final sun.misc.Unsafe UNSAFE;
365 private static final long busyOffset;
366 private static final long valueOffset;
367 static {
368 try {
369 UNSAFE = getUnsafe();
370 Class<?> sk = StripedAdder.class;
371 busyOffset = UNSAFE.objectFieldOffset
372 (sk.getDeclaredField("busy"));
373 Class<?> ak = Adder.class;
374 valueOffset = UNSAFE.objectFieldOffset
375 (ak.getDeclaredField("value"));
376 } catch (Exception e) {
377 throw new Error(e);
378 }
379 }
380
381 /**
382 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
383 * Replace with a simple call to Unsafe.getUnsafe when integrating
384 * into a jdk.
385 *
386 * @return a sun.misc.Unsafe
387 */
388 private static sun.misc.Unsafe getUnsafe() {
389 try {
390 return sun.misc.Unsafe.getUnsafe();
391 } catch (SecurityException se) {
392 try {
393 return java.security.AccessController.doPrivileged
394 (new java.security
395 .PrivilegedExceptionAction<sun.misc.Unsafe>() {
396 public sun.misc.Unsafe run() throws Exception {
397 java.lang.reflect.Field f = sun.misc
398 .Unsafe.class.getDeclaredField("theUnsafe");
399 f.setAccessible(true);
400 return (sun.misc.Unsafe) f.get(null);
401 }});
402 } catch (java.security.PrivilegedActionException e) {
403 throw new RuntimeException("Could not initialize intrinsics",
404 e.getCause());
405 }
406 }
407 }
408
409 }