ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
Revision: 1.4
Committed: Sat Jul 23 16:32:53 2011 UTC (12 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.3: +87 -78 lines
Log Message:
Lazier initialization

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.Arrays;
9 import java.util.Random;
10 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.io.IOException;
13 import java.io.Serializable;
14 import java.io.ObjectInputStream;
15 import java.io.ObjectOutputStream;
16
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, this set of
 * adder variables may grow dynamically to reduce contention. Method
 * {@link #sum} returns the current combined total across these
 * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
 * updates may occur while the sum is being calculated), and so cannot
 * be used alone for fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, performance may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * much more space. On the other hand, if it is known that only one
 * thread can ever update the sum, performance may be significantly
 * slower than just updating a local variable.
 *
 * <p>A StripedAdder may optionally be constructed with a given
 * expected contention level; i.e., the number of threads that are
 * expected to concurrently update the sum. Supplying an accurate
 * value may improve performance by reducing the need for dynamic
 * adjustment.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * A StripedAdder maintains a table of AtomicLong variables. The
     * table is indexed by per-thread hash codes that are initialized
     * to random values.
     *
     * The table doubles in size upon contention (as indicated by
     * failed CASes when performing add()), but is capped at the
     * nearest power of two >= #CPUs. This reflects the idea that,
     * when there are more threads than CPUs, then if each thread were
     * bound to a CPU, there would exist a perfect hash function
     * mapping threads to slots that eliminates collisions. When we
     * reach capacity, we search for this mapping by randomly varying
     * the hash codes of colliding threads. Because search is random,
     * and failures only become known via CAS failures, convergence
     * will be slow, and because threads are typically not bound to
     * CPUs forever, may not occur at all. However, despite these
     * limitations, observed contention is typically low in these
     * cases.
     *
     * Table entries are of class Adder; a form of AtomicLong padded
     * to reduce cache contention on most processors. Padding is
     * overkill for most Atomics because they are most often
     * irregularly scattered in memory and thus don't interfere much
     * with each other. But Atomic objects residing in arrays will
     * tend to be placed adjacent to each other, and so will most
     * often share cache lines without this precaution. Adders are
     * constructed upon first use, which further improves per-thread
     * locality and helps reduce (an already large) footprint.
     *
     * A single spinlock is used for resizing the table as well as
     * populating slots with new Adders. Upon lock contention, threads
     * try other slots rather than blocking. After initialization, at
     * least one slot exists, so retries will eventually find a
     * candidate Adder. During these retries, there is increased
     * contention and reduced locality, which is still better than
     * alternatives.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Padded version of AtomicLong. The unused long fields exist only
     * to enlarge each instance so that Adders allocated next to each
     * other are less likely to share a cache line.
     */
    static final class Adder extends AtomicLong {
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
        Adder(long x) { super(x); }
    }

    /**
     * Holder for the thread-local hash code. The code starts off with
     * a given random value, but may be set to a different value upon
     * collisions in retryAdd.
     */
    static final class HashCode {
        int code;
        HashCode(int h) { code = h; }
    }

    /**
     * The corresponding ThreadLocal class. Each thread's initial code
     * is drawn from a shared Random and is never zero.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();
        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
        }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders. It is null until the first call to add (unless
     * presized by the constructor), and is not serialized. Once
     * created, its length is always a power of two and at least 2.
     * It is doubled only while its length is less than NCPU, so it
     * may reach (but not exceed) the smallest power of two that is
     * >= NCPU. Slots may be null until an Adder is installed.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a spinlock (0 = free, 1 = held) when creating the
     * table, resizing it, and/or installing new Adders. There is no
     * need for a blocking lock: When busy, other threads try other
     * slots.
     */
    private final AtomicInteger mutex;

    /**
     * Creates a new adder with zero sum. The table is created lazily
     * on the first call to {@link #add}.
     */
    public StripedAdder() {
        this.mutex = new AtomicInteger();
        // remaining initialization on first call to add.
    }

    /**
     * Creates a new adder with zero sum, and with stripes presized
     * for the given expected contention level. The table length is
     * the smallest power of two (at least 2) that is >= the smaller
     * of {@code expectedContention} and the number of processors,
     * and every slot is pre-populated with an Adder.
     *
     * @param expectedContention the expected number of threads that
     * will concurrently update the sum.
     */
    public StripedAdder(int expectedContention) {
        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
        int size = 2;
        while (size < cap)
            size <<= 1;
        Adder[] as = new Adder[size];
        for (int i = 0; i < size; ++i)
            as[i] = new Adder(0);
        this.adders = as;
        this.mutex = new AtomicInteger();
    }

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
        HashCode hc = threadHashCode.get();
        // Fast path: one CAS on this thread's slot. Anything else
        // (no table yet, empty slot, or a failed CAS) goes to retryAdd.
        if ((as = adders) == null || (n = as.length) < 1 ||
            (a = as[hc.code & (n - 1)]) == null ||
            !a.compareAndSet(v = a.get(), v + x))
            retryAdd(x, hc);
    }

    /**
     * Handles cases of add involving initialization, resizing,
     * creating new Adders, and/or contention. Loops until x has been
     * added, either by a successful CAS on an existing Adder or by
     * installing a new Adder initialized to x.
     *
     * <p>Structural changes are made only while holding the mutex.
     * These are: creating the table, installing an Adder, and doubling
     * the table. While not holding the mutex, a thread that either fails
     * to acquire it or does not need it (the table is already at its
     * cap) rehashes and tries another slot instead of blocking. The
     * final hash is written back to {@code hc}, so the thread's next
     * add starts from the slot used here.
     *
     * @param x the value to add
     * @param hc the calling thread's hash code holder
     */
    private void retryAdd(long x, HashCode hc) {
        int h = hc.code;
        final AtomicInteger mutex = this.mutex;
        AtomicInteger lock = null; // nonnull when held
        try {
            for (;;) {
                Adder[] as; Adder a; long v; int n, k; // locals for volatiles
                boolean needLock = true;
                if ((as = adders) == null || (n = as.length) < 1) {
                    if (lock != null)                   // default-initialize
                        adders = new Adder[2];
                }
                else if ((a = as[k = h & (n - 1)]) == null) {
                    if (lock != null) {                 // attach new adder
                        as[k] = new Adder(x);
                        break;
                    }
                }
                else if (a.compareAndSet(v = a.get(), v + x))
                    break;
                else if (n >= NCPU)                     // cannot expand
                    needLock = false;
                else if (lock != null)                  // expand table
                    adders = Arrays.copyOf(as, n << 1);

                if (lock == null) {
                    // Check before CAS to avoid needless CAS traffic
                    // when the lock is visibly busy.
                    if (needLock && mutex.get() == 0 &&
                        mutex.compareAndSet(0, 1))
                        lock = mutex;
                    else {                              // try elsewhere
                        h ^= h << 13;                   // Marsaglia XorShift
                        h ^= h >>> 17;
                        h ^= h << 5;
                    }
                }
            }
        } finally {
            if (lock != null)
                lock.set(0);
        }
        if (hc.code != h)                               // avoid unneeded writes
            hc.code = h;
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    sum += a.get();
            }
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    a.set(0L);
            }
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent to {@link #sum} followed by {@link #reset}. Each
     * adder is read and cleared with a single atomic
     * {@code getAndSet(0L)}. A concurrent update therefore cannot
     * slip in between reading and clearing an adder and be lost; it
     * is either included in this result or retained for a later sum.
     * As with {@link #sum}, the result is not an atomic snapshot
     * when updates occur concurrently.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0L;
        Adder[] as = adders;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Adder a = as[i];
                if (a != null)
                    sum += a.getAndSet(0L);
            }
        }
        return sum;
    }

    /**
     * Writes the default (non-transient) fields, followed by the
     * current estimated sum as a long. The adder table itself is
     * transient and is not written.
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Restores the default fields, clears the lock (it may have been
     * held when the instance was written), and re-adds the saved sum.
     * The transient table is rebuilt lazily by that add.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        mutex.set(0);
        add(s.readLong());
    }

}