ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.13
Committed: Sun Jul 31 14:20:01 2011 UTC (12 years, 9 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.12: +0 -0 lines
State: FILE REMOVED
Log Message:
Rename classes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.Random;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.util.concurrent.atomic.AtomicLong;
11 import java.io.IOException;
12 import java.io.Serializable;
13 import java.io.ObjectInputStream;
14 import java.io.ObjectOutputStream;
15
/**
 * One or more variables that together maintain an initially zero sum.
 * When updates (method {@link #add}) are contended across threads,
 * the set of variables may grow dynamically to reduce contention.
 *
 * <p> This class is usually preferable to {@link AtomicLong} when
 * multiple threads update a common sum that is used for purposes such
 * as collecting statistics, not for fine-grained synchronization
 * control.  Under high update contention, throughput of this class is
 * expected to be significantly higher, at the expense of higher space
 * consumption.  Under low contention, this class imposes very little
 * time and space overhead compared to AtomicLong.  On the other hand,
 * in contexts where it is statically known that only one thread can
 * ever update a sum, time and space overhead is noticeably greater
 * than just updating a local variable.
 *
 * <p> Method {@link #sum} returns the current combined total across
 * the variables maintaining the sum.  This value is <em>NOT</em> an
 * atomic snapshot: Concurrent updates may occur while the sum is
 * being calculated.  However, updates cannot be "lost", so invocation
 * of <code>sum</code> in the absence of concurrent updates always
 * returns an accurate result.  The sum may also be <code>reset</code>
 * to zero, as an alternative to creating a new adder.  However,
 * method {@link #reset} is intrinsically racy, so should only be used
 * when it is known that no threads are concurrently updating the sum.
 *
 * <p><em>jsr166e note: This class is targeted to be placed in
 * java.util.concurrent.atomic</em>
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a lazily-initialized table of
     * atomically updated variables, plus an extra "base" field. The
     * table size is a power of two. Indexing uses masked per-thread
     * hash codes.
     *
     * Table entries are of class Cell; a variant of AtomicLong padded
     * to reduce cache contention on most processors. Padding is
     * overkill for most Atomics because they are usually irregularly
     * scattered in memory and thus don't interfere much with each
     * other. But Atomic objects residing in arrays will tend to be
     * placed adjacent to each other, and so will most often share
     * cache lines (with a huge negative performance impact) without
     * this precaution.
     *
     * In part because Cells are relatively large, we avoid creating
     * them until they are needed.  When there is no contention, all
     * updates are made to the base field.  Upon first contention (a
     * failed CAS on base update), the table is initialized to size 2.
     * The table size is doubled upon further contention until
     * reaching the nearest power of two greater than or equal to the
     * number of CPUs.
     *
     * Per-thread hash codes are initialized to random values.
     * Contention and/or table collisions are indicated by failed
     * CASes when performing an add operation (see method
     * retryAdd). Upon a collision, if the table size is less than the
     * capacity, it is doubled in size unless some other thread holds
     * the lock. If a hashed slot is empty, and lock is available, a
     * new Cell is created. Otherwise, if the slot exists, a CAS is
     * tried.  Retries proceed by "double hashing", using a secondary
     * hash (Marsaglia XorShift) to try to find a free slot.
     *
     * The table size is capped because, when there are more threads
     * than CPUs, supposing that each thread were bound to a CPU,
     * there would exist a perfect hash function mapping threads to
     * slots that eliminates collisions. When we reach capacity, we
     * search for this mapping by randomly varying the hash codes of
     * colliding threads.  Because search is random, and collisions
     * only become known via CAS failures, convergence can be slow,
     * and because threads are typically not bound to CPUs forever,
     * may not occur at all. However, despite these limitations,
     * observed contention rates are typically low in these cases.
     *
     * A single spinlock is used for initializing and resizing the
     * table, as well as populating slots with new Cells.  There is no
     * need for a blocking lock: Upon lock contention, threads try
     * other slots (or the base) rather than blocking.  During these
     * retries, there is increased contention and reduced locality,
     * which is still better than alternatives.
     *
     * It is possible for a Cell to become unused when threads that
     * once hashed to it terminate, as well as in the case where
     * doubling the table causes no thread to hash to it under
     * expanded mask.  We do not try to detect or remove such cells,
     * under the assumption that for long-running adders, observed
     * contention levels will recur, so the cells will eventually be
     * needed again; and for short-lived ones, it does not matter.
     */

    /** Number of available processors; bounds the table size (see retryAdd). */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded variant of AtomicLong.  The value field is placed
     * between pads, hoping that the JVM doesn't reorder them.
     * Updates are via inlined CAS in methods add and retryAdd.
     */
    static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }
    }

    /**
     * Holder for the thread-local hash code. The code is initially
     * random, but may be set to a different value upon collisions.
     */
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
            code = (h == 0) ? 1 : h;
        }
    }

    /**
     * The corresponding ThreadLocal class
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        @Override
        public HashCode initialValue() { return new HashCode(); }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * to reduce ThreadLocal pollution and because adjustments due to
     * collisions in one table are likely to be appropriate for
     * others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    private transient volatile Cell[] cells;

    /**
     * Base sum, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    private transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     */
    private transient volatile int busy;

    /**
     * Creates a new adder with initial sum of zero.
     */
    public StripedAdder() {
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended;
        // Fast path: no table yet and the CAS on base succeeds.
        if ((as = cells) != null ||
            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
            int h = (hc = threadHashCode.get()).code;
            if (as != null && (n = as.length) > 0 &&
                (a = as[(n - 1) & h]) != null) {
                if (UNSAFE.compareAndSwapLong(a, valueOffset,
                                              v = a.value, v + x))
                    return;
                contended = true;       // Cell CAS failed
            }
            else
                contended = false;      // No table or empty slot
            retryAdd(x, hc, contended);
        }
    }

    /**
     * Handle cases of add involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value to add
     * @param hc the hash code holder
     * @param precontended true if CAS failed before call
     */
    private void retryAdd(long x, HashCode hc, boolean precontended) {
        int h = hc.code;
        boolean collide = false;                // true if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (busy == 0) {            // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (busy == 0 &&
                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                busy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (precontended)          // CAS already known to fail
                    precontended = false;       // Continue after rehash
                else {
                    long v = a.value;
                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
                        break;
                    if (!collide)
                        collide = true;
                    else if (n >= NCPU || cells != as)
                        collide = false;        // Can't expand
                    else if (busy == 0 &&
                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                        collide = false;
                        try {
                            if (cells == as) {  // Expand table
                                Cell[] rs = new Cell[n << 1];
                                System.arraycopy(as, 0, rs, 0, n);
                                cells = rs;
                            }
                        } finally {
                            busy = 0;
                        }
                        continue;
                    }
                }
                h ^= h << 13;                   // Rehash
                h ^= h >>> 17;
                h ^= h << 5;
            }
            else if (busy == 0 && cells == as &&
                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
                boolean init = false;
                try {                           // Initialize
                    if (cells == as) {
                        Cell r = new Cell(x);
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = r;
                        cells = rs;
                        init = true;
                    }
                } finally {
                    busy = 0;
                }
                if (init)
                    break;
            }
            else {                              // Lost initialization race
                long b = base;                  // Fall back on using base
                if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x))
                    break;
            }
        }
        hc.code = h;                            // Record index for next time
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Returns the current sum.  The result is only guaranteed to be
     * accurate in the absence of concurrent updates. Otherwise, it
     * may fail to reflect one or more updates occurring while
     * calculating the result.
     *
     * @return the sum
     */
    public long sum() {
        Cell[] as = cells;
        long sum = base;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Resets variables maintaining the sum to zero.  This is
     * effective in setting the sum to zero only if there are no
     * concurrent updates.
     */
    public void reset() {
        Cell[] as = cells;
        base = 0L;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null)
                    a.value = 0L;
            }
        }
    }

    /**
     * Equivalent in effect to {@link #sum} followed by {@link
     * #reset}. This method may apply for example during quiescent
     * points between multithreaded computations.  If there are
     * updates concurrent with this method, the returned value is
     * <em>not</em> guaranteed to be the final sum occurring before
     * the reset.
     *
     * @return the sum
     */
    public long sumThenReset() {
        Cell[] as = cells;
        long sum = base;
        base = 0L;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    /**
     * Serializes this adder. All instance fields are transient, so
     * after the default (empty) field data only the current
     * {@link #sum} is written, as a single long.
     *
     * @param s the stream to write to
     * @throws IOException if the underlying stream fails
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reconstitutes this adder from a stream: the lock is cleared,
     * the table is left unallocated, and the long written by
     * writeObject becomes the base value.
     *
     * @param s the stream to read from
     * @throws IOException if the underlying stream fails
     * @throws ClassNotFoundException if a serialized class cannot be found
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        busy = 0;
        cells = null;
        base = s.readLong();
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long baseOffset;
    private static final long busyOffset;
    private static final long valueOffset;
    static {
        // Resolve the field offsets used by the inlined CAS operations.
        try {
            UNSAFE = getUnsafe();
            Class<?> sk = StripedAdder.class;
            baseOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            busyOffset = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("busy"));
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
     * Replace with a simple call to Unsafe.getUnsafe when integrating
     * into a jdk.
     *
     * @return a sun.misc.Unsafe
     */
    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe();
        } catch (SecurityException se) {
            // Direct access refused: fall back to reading the
            // "theUnsafe" static field reflectively.
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security
                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
                        public sun.misc.Unsafe run() throws Exception {
                            java.lang.reflect.Field f = sun.misc
                                .Unsafe.class.getDeclaredField("theUnsafe");
                            f.setAccessible(true);
                            return (sun.misc.Unsafe) f.get(null);
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                                           e.getCause());
            }
        }
    }

}