ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.8
Committed: Thu Jul 28 15:05:55 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.7: +177 -120 lines
Log Message:
Footprint and performance improvements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.Random;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.util.concurrent.atomic.AtomicLong;
11 import java.io.IOException;
12 import java.io.Serializable;
13 import java.io.ObjectInputStream;
14 import java.io.ObjectOutputStream;
15
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, this set of
 * adder variables may grow dynamically to reduce contention. Method
 * {@link #sum} returns the current combined total across these
 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
 * updates may occur while the sum is being calculated), and so cannot
 * be used alone for fine-grained synchronization control.
 *
 * <p> This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * much more space. On the other hand, if it is known that only one
 * thread can ever update the sum, performance may be significantly
 * slower than just updating a local variable.
 *
 * <p>A StripedAdder may optionally be constructed with a given
 * expected contention level; i.e., the number of threads that are
 * expected to concurrently update the sum. Supplying an accurate
 * value may improve performance by reducing the need for dynamic
 * adjustment.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a table of atomically updated long
     * variables. The table is indexed by per-thread hash codes.
     *
     * Table entries are of class Adder, a variant of AtomicLong
     * padded to reduce cache contention on most processors. Padding
     * is overkill for most Atomics because they are usually
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines (with a huge negative performance
     * impact) without this precaution.
     *
     * Because Adders are relatively large, we avoid creating them
     * until they are needed. On the other hand, we try to create them
     * on any sign of contention.
     *
     * Per-thread hash codes are initialized to random values.
     * Collisions are indicated by failed CASes when performing an add
     * operation (see method retryAdd). Upon a collision, if the table
     * size is less than the capacity, it is doubled in size unless
     * some other thread holds the lock. If a hashed slot is empty and
     * the lock is available, a new Adder is created. Otherwise, if the
     * slot exists, a CAS is tried. Retries proceed by "double
     * hashing", using a secondary hash (Marsaglia XorShift) to try to
     * find a free slot.
     *
     * By default, the table is lazily initialized. Upon first use,
     * the table is set to size 2 (the minimum non-empty size), but
     * containing only a single Adder. The maximum table size is
     * bounded by the nearest power of two >= the number of CPUs. The
     * table size is capped because, when there are more threads than
     * CPUs, supposing that each thread were bound to a CPU, there
     * would exist a perfect hash function mapping threads to slots
     * that eliminates collisions. When we reach capacity, we search
     * for this mapping by randomly varying the hash codes of
     * colliding threads. Because search is random, and failures only
     * become known via CAS failures, convergence will be slow, and
     * because threads are typically not bound to CPUs forever, may
     * not occur at all. However, despite these limitations, observed
     * contention is typically low in these cases.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. After initialization, there
     * is no need for a blocking lock: upon lock contention, threads
     * try other slots rather than blocking. After initialization, at
     * least one slot exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     *
     * Adders are never removed from the table (expansion copies the
     * existing references), so sumAndReset can drain each Adder with
     * an atomic exchange without losing concurrent additions.
     */

    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded variant of AtomicLong. The value field is placed
     * between pads, hoping that the JVM doesn't reorder them.
     * Updates are via inlined CAS in methods add, retryAdd and
     * sumAndReset.
     */
    static final class Adder {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Adder(long x) { value = x; }
    }

    /**
     * Holder for the thread-local hash code. The code is initially
     * random, but may be set to a different value upon collisions.
     */
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero: xorShift maps 0 to 0 forever
            code = (h == 0) ? 1 : h;
        }
    }

    /**
     * The ThreadLocal subclass that lazily creates each thread's HashCode.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        public HashCode initialValue() { return new HashCode(); }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * to reduce ThreadLocal pollution and because adjustments due to
     * collisions in one table are likely to be appropriate for
     * others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. When non-null, size is a power of 2, at least 2.
     */
    private transient volatile Adder[] adders;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Adders.
     */
    private volatile int busy;

    /**
     * Creates a new adder with zero sum.
     */
    public StripedAdder() {
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level. The table size is the
     * smallest power of two that is at least 2 and at least
     * {@code min(expectedContention, availableProcessors)}, and every
     * slot is filled eagerly. Non-positive values yield a table of size 2.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum.
     */
    public StripedAdder(int expectedContention) {
        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
        int size = 2;
        while (size < cap)
            size <<= 1;
        Adder[] as = new Adder[size];
        for (int i = 0; i < size; ++i)
            as[i] = new Adder(0);
        this.adders = as;
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Adder[] as; Adder a; int n; // locals to hold volatile reads
        HashCode hc = threadHashCode.get();
        int h = hc.code;
        boolean collide;
        if ((as = adders) != null && (n = as.length) > 0 &&
            (a = as[(n - 1) & h]) != null) {
            long v = a.value;
            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                return;             // Fast path: uncontended CAS succeeded
            collide = true;         // Slot exists but CAS lost a race
        }
        else
            collide = false;        // No table yet, or empty slot
        retryAdd(x, hc, collide);
    }

    /**
     * Handles cases of add involving initialization, resizing,
     * creating new Adders, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value to add
     * @param hc the calling thread's hash holder; on exit its code is
     *        set to the probe hash that succeeded, for reuse next time
     * @param collide true if the caller's CAS on an existing Adder
     *        failed, which makes table expansion eligible on the
     *        first iteration
     */
    private void retryAdd(long x, HashCode hc, boolean collide) {
        int h = hc.code;
        for (;;) {
            Adder[] as; Adder a; int n;
            if ((as = adders) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) != null) {
                    boolean shared = true;          // Slot exists
                    if (collide && n < NCPU && busy == 0 &&
                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        try {
                            if (adders == as) {     // Expand table
                                Adder[] rs = new Adder[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                adders = rs;
                                shared = false;
                            }
                        } finally {
                            busy = 0;
                        }
                        // If another thread replaced the table, or our index
                        // moved into the (empty) upper half, re-probe.
                        if (shared || (h & n) != 0) {
                            collide = false;
                            continue;               // Array or index changed
                        }
                    }
                    long v = a.value;
                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                        break;
                    collide = shared;
                }
                else {                              // Try to attach new Adder
                    if (busy == 0 &&
                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        boolean created = false;
                        try {                       // Recheck under lock
                            Adder[] rs; int m, j;
                            if ((rs = adders) != null && (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = new Adder(x);
                                created = true;
                            }
                        } finally {
                            busy = 0;
                        }
                        if (created)
                            break;
                        continue;                   // Slot is now non-empty
                    }
                    collide = false;
                }
                h ^= h << 13;                       // Rehash (XorShift)
                h ^= h >>> 17;
                h ^= h << 5;
            }
            else if (busy == 0) {                   // Default-initialize
                Adder r = new Adder(x);
                Adder[] rs = new Adder[2];
                rs[h & 1] = r;
                if (adders == as && busy == 0 &&
                    UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                    boolean init = false;
                    try {
                        if (adders == as) {
                            adders = rs;
                            init = true;
                        }
                    } finally {
                        busy = 0;
                    }
                    if (init)
                        break;
                }
            }
            else if (adders == as)                  // Lost initialization race
                Thread.yield();
        }
        hc.code = h;                                // Record index for next time
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    a.value = 0L;
            }
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Returns the current sum and resets it to zero. Each adder
     * variable is read and zeroed by a single atomic compare-and-swap,
     * so a concurrent {@link #add} is never lost. It is reflected
     * either in the returned value or in the value left for later
     * calls. As with {@link #sum}, the result is not an atomic
     * snapshot of the whole set of adders.
     *
     * @return the sum accumulated before the reset
     */
    public long sumAndReset() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null) {
                    // Atomic exchange with 0: a plain read followed by a
                    // plain write would drop any add landing in between.
                    long v;
                    do {
                        v = a.value;
                    } while (!UNSAFE.compareAndSwapLong(a, valueOffset, v, 0L));
                    sum += v;
                }
            }
        }
        return sum;
    }

    /**
     * Saves the state of this adder to a stream.
     *
     * @serialData the default fields, followed by the current
     * {@link #sum} as a {@code long}
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reconstitutes this adder from a stream. The serialized
     * {@code busy} field is not transient, so it is cleared here in
     * case the instance was serialized while the spinlock was held.
     * The table itself is transient and is rebuilt lazily by
     * {@link #add}.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        busy = 0;
        add(s.readLong());
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long busyOffset;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> sk = StripedAdder.class;
            busyOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("busy"));
            Class<?> ak = Adder.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
     * Replace with a simple call to Unsafe.getUnsafe when integrating
     * into a jdk.
     *
     * @return a sun.misc.Unsafe
     */
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException se) {
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security
                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        public sun.misc.Unsafe run() throws Exception {
                            java.lang.reflect.Field f = sun.misc
                                .Unsafe.class.getDeclaredField("theUnsafe");
                            f.setAccessible(true);
                            return (sun.misc.Unsafe) f.get(null);
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                                           e.getCause());
            }
        }
    }

}
412 }