1 |
/* |
2 |
* Written by Doug Lea with assistance from members of JCP JSR-166 |
3 |
* Expert Group and released to the public domain, as explained at |
4 |
* http://creativecommons.org/publicdomain/zero/1.0/ |
5 |
*/ |
6 |
|
7 |
package jsr166e; |
8 |
import java.util.Arrays; |
9 |
import java.util.Random; |
10 |
import java.util.concurrent.atomic.AtomicInteger; |
11 |
import java.util.concurrent.atomic.AtomicLong; |
12 |
import java.io.IOException; |
13 |
import java.io.Serializable; |
14 |
import java.io.ObjectInputStream; |
15 |
import java.io.ObjectOutputStream; |
16 |
|
17 |
/**
 * A set of variables that together maintain a sum. When updates
 * (method {@link #add}) are contended across threads, the set of
 * adders may grow to reduce contention. Method {@link #sum} returns
 * the current combined total across these adders. This value is
 * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
 * the sum is being calculated), and so cannot be used alone for
 * fine-grained synchronization control.
 *
 * <p>This class may be applicable when many threads frequently
 * update a common sum that is used for purposes such as collecting
 * statistics. In this case, updates may be significantly faster
 * than using a shared {@link AtomicLong}, at the expense of using
 * significantly more space. On the other hand, if it is known that
 * only one thread can ever update the sum, performance may be
 * significantly slower than just updating a local variable.
 *
 * @author Doug Lea
 */
public class StripedAdder implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /*
     * Overview: We maintain a table of AtomicLongs (padded to reduce
     * false sharing). The table is indexed by per-thread hash codes
     * that are initialized as random values. The table doubles in
     * size upon contention (as indicated by failed CASes when
     * performing add()), but is capped at the nearest power of two >=
     * #cpus: At that point, contention should be infrequent if each
     * thread has a unique index; so we instead adjust hash codes to
     * new random values upon contention rather than expanding. A
     * single spinlock is used for resizing the table as well as
     * populating slots with new Adders. Upon lock contention, threads
     * just try other slots rather than blocking. We guarantee that at
     * least one slot exists, so retries will eventually find a
     * candidate Adder.
     */

    /**
     * Number of processors, to place a cap on table growth.
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Version of AtomicLong padded to avoid sharing cache
     * lines on most processors.
     */
    static final class Adder extends AtomicLong {
        // Never read or written: these fields exist only to enlarge the
        // object so neighbouring Adders are less likely to share a cache
        // line (field layout is ultimately up to the JVM).
        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
        Adder(long x) { super(x); }
    }

    /**
     * Holder for the thread-local hash code.
     */
    static final class HashCode {
        int code;
        HashCode(int h) { code = h; }
    }

    /**
     * The corresponding ThreadLocal class. Each thread starts with a
     * random, nonzero code.
     */
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        static final Random rng = new Random();
        @Override
        public HashCode initialValue() {
            int h = rng.nextInt();
            // Zero is a fixed point of xorShift, so it must be avoided.
            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
        }
    }

    /**
     * Static per-thread hash codes. Shared across all StripedAdders
     * because adjustments due to collisions in one table are likely
     * to be appropriate for others.
     */
    static final ThreadHashCode threadHashCode = new ThreadHashCode();

    /**
     * Table of adders; slots may be null until first used. The length
     * is always a power of two: it starts at 2 and doubles on
     * contention only while it is still less than NCPU. It is therefore
     * bounded by the smallest power of two that is >= max(2, NCPU), and
     * never grows at all when NCPU <= 2.
     */
    private transient volatile Adder[] adders;

    /**
     * Serves as a lock when resizing and/or creating Adders. There
     * is no need for a blocking lock: When busy, other threads try
     * other slots. Holds 0 when free and 1 when held.
     */
    private final AtomicInteger mutex;

    /**
     * Marsaglia XorShift for rehashing on collisions. Maps nonzero
     * inputs to nonzero outputs.
     */
    private static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Creates a new adder with initially zero sum.
     */
    public StripedAdder() {
        Adder[] as = new Adder[2];
        as[0] = new Adder(0); // ensure at least one available adder
        this.adders = as;
        this.mutex = new AtomicInteger();
    }

    /**
     * Adds the given value.
     *
     * <p>Tries a single CAS on the slot selected by the calling
     * thread's hash code. On failure it either rehashes (table at its
     * cap), grows the table, or creates an Adder in an empty slot,
     * retrying until the value has been applied exactly once.
     *
     * @param x the value to add
     */
    public void add(long x) {
        HashCode hc = threadHashCode.get();
        for (int h = hc.code;;) {
            Adder[] as = adders;
            int n = as.length;
            Adder a = as[h & (n - 1)];          // n is a power of two
            if (a != null) {
                long v = a.get();
                if (a.compareAndSet(v, v + x))
                    break;                      // uncontended fast path
                if (n >= NCPU) {                // Collision when table at max
                    h = hc.code = xorShift(h);  // change code
                    continue;
                }
            }
            // Reached when the slot is empty, or when the CAS failed and
            // the table may still grow; both cases need the mutex.
            final AtomicInteger mutex = this.mutex;
            if (mutex.get() != 0)
                h = xorShift(h);                // Busy: try elsewhere
            else if (mutex.compareAndSet(0, 1)) {
                boolean created = false;
                try {
                    Adder[] rs = adders;
                    // Grow only if no other thread replaced the table
                    // since we read it; copyOf keeps the same Adders, so
                    // no counts are lost or duplicated.
                    if (a != null && rs == as)  // Resize table
                        rs = adders = Arrays.copyOf(as, as.length << 1);
                    int j = h & (rs.length - 1);
                    if (rs[j] == null) {        // Create adder holding x
                        rs[j] = new Adder(x);
                        created = true;
                    }
                    // Otherwise the slot is occupied; loop and CAS on it.
                } finally {
                    mutex.set(0);
                }
                if (created) {
                    hc.code = h;                // Use this adder next time
                    break;
                }
            }
            // If the CAS on the mutex lost a race, simply retry.
        }
    }

    /**
     * Returns an estimate of the current sum. The result is
     * calculated by summing multiple variables, so may not be
     * accurate if updates occur concurrently with this method.
     *
     * @return the estimated sum
     */
    public long sum() {
        long sum = 0;
        for (Adder a : adders) {
            if (a != null)
                sum += a.get();
        }
        return sum;
    }

    /**
     * Resets each of the variables to zero. This is effective in
     * fully resetting the sum only if there are no concurrent
     * updates.
     */
    public void reset() {
        for (Adder a : adders) {
            if (a != null)
                a.set(0L);
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * Equivalent to {@link #sum} followed by {@link #reset}. Updates
     * that land on an adder between its read and its reset are lost.
     *
     * @return the estimated sum
     */
    public long sumAndReset() {
        long sum = 0;
        for (Adder a : adders) {
            if (a != null) {
                sum += a.get();
                a.set(0L);
            }
        }
        return sum;
    }

    /**
     * Saves the state to a stream.
     *
     * @serialData the default serializable fields, followed by the
     * value returned by {@link #sum} as a {@code long}
     */
    private void writeObject(ObjectOutputStream s)
        throws IOException {
        s.defaultWriteObject();
        s.writeLong(sum());
    }

    /**
     * Reconstitutes the instance from a stream. The restored adder
     * holds the serialized sum in a single slot of a fresh 2-slot
     * table.
     */
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        long c = s.readLong();
        Adder[] as = new Adder[2];
        as[0] = new Adder(c);
        this.adders = as;
        // The mutex is restored by defaultReadObject and may have been
        // held when the object was written, so release it.
        mutex.set(0);
    }

}
255 |
|
256 |
|