16 |
|
|
17 |
|
/** |
18 |
|
* A set of variables that together maintain a sum. When updates |
19 |
< |
* (method {@link #add}) are contended across threads, the set of |
20 |
< |
* adders may grow to reduce contention. Method {@link #sum} returns |
21 |
< |
* the current combined total across these adders. This value is |
22 |
< |
* <em>NOT</em> an atomic snapshot (concurrent updates may occur while |
23 |
< |
* the sum is being calculated), and so cannot be used alone for |
24 |
< |
* fine-grained synchronization control. |
19 |
> |
* (method {@link #add}) are contended across threads, this set of |
20 |
> |
* adder variables may grow dynamically to reduce contention. Method |
21 |
> |
* {@link #sum} returns the current combined total across these |
22 |
> |
* adders. This value is <em>NOT</em> an atomic snapshot (concurrent |
23 |
> |
* updates may occur while the sum is being calculated), and so cannot |
24 |
> |
* be used alone for fine-grained synchronization control. |
25 |
|
* |
26 |
|
* <p> This class may be applicable when many threads frequently |
27 |
|
* update a common sum that is used for purposes such as collecting |
31 |
|
* only one thread can ever update the sum, performance may be |
32 |
|
* significantly slower than just updating a local variable. |
33 |
|
* |
34 |
+ |
* <p>A StripedAdder may optionally be constructed with a given |
35 |
+ |
* expected contention level; i.e., the number of threads that are |
36 |
+ |
* expected to concurrently update the sum. Supplying an accurate |
37 |
+ |
* value may improve performance by reducing the need for dynamic |
38 |
+ |
* adjustment. |
39 |
+ |
* |
40 |
|
* @author Doug Lea |
41 |
|
*/ |
42 |
|
public class StripedAdder implements Serializable { |
43 |
|
private static final long serialVersionUID = 7249069246863182397L; |
44 |
|
|
45 |
|
/* |
46 |
< |
* Overview: We maintain a table of AtomicLongs (padded to reduce |
47 |
< |
* false sharing). The table is indexed by per-thread hash codes |
48 |
< |
* that are initialized as random values. The table doubles in |
49 |
< |
* size upon contention (as indicated by failed CASes when |
50 |
< |
* performing add()), but is capped at the nearest power of two >= |
51 |
< |
* #cpus: At that point, contention should be infrequent if each |
52 |
< |
* thread has a unique index; so we instead adjust hash codes to |
53 |
< |
* new random values upon contention rather than expanding. A |
54 |
< |
* single spinlock is used for resizing the table as well as |
46 |
> |
* Overview: We maintain a table of atomic long variables. The |
47 |
> |
* table is indexed by per-thread hash codes that are initialized |
48 |
> |
* to random values. |
49 |
> |
* |
50 |
> |
* The table doubles in size upon contention (as indicated by |
51 |
> |
* failed CASes when performing add()), but is capped at the |
52 |
> |
* nearest power of two >= #CPUS. This reflects the idea that, |
53 |
> |
* when there are more threads than CPUs, then if each thread were |
54 |
> |
* bound to a CPU, there would exist a perfect hash function |
55 |
> |
* mapping threads to slots that eliminates collisions. When we |
56 |
> |
* reach capacity, we search for this mapping by randomly varying |
57 |
> |
* the hash codes of colliding threads. Because search is random, |
58 |
> |
* and failures only become known via CAS failures, convergence |
59 |
> |
* will be slow, and because threads are typically not bound to |
60 |
> |
* CPUs forever, may not occur at all. However, despite these |
61 |
> |
* limitations, observed contention is typically very low in these |
62 |
> |
* cases. |
63 |
> |
* |
64 |
> |
* Table entries are of class Adder; a form of AtomicLong padded |
65 |
> |
* to reduce cache contention on most processors. Padding is |
66 |
> |
* overkill for most Atomics because they are most often |
67 |
> |
* irregularly scattered in memory and thus don't interfere much |
68 |
> |
* with each other. But Atomic objects residing in arrays will |
69 |
> |
* tend to be placed adjacent to each other, and so will most |
70 |
> |
* often share cache lines without this precaution. Except for |
71 |
> |
* slot adders[0], Adders are constructed upon first use, which |
72 |
> |
* further improves per-thread locality and helps reduce (an |
73 |
> |
* already large) footprint. |
74 |
> |
* |
75 |
> |
* A single spinlock is used for resizing the table as well as |
76 |
|
* populating slots with new Adders. Upon lock contention, threads |
77 |
< |
* just try other slots rather than blocking. We guarantee that at |
78 |
< |
* least one slot exists, so retries will eventually find a |
79 |
< |
* candidate Adder. |
77 |
> |
* try other slots rather than blocking. We guarantee that at |
78 |
> |
* least one slot (0) exists, so retries will eventually find a |
79 |
> |
* candidate Adder. During these retries, there is increased |
80 |
> |
* contention and reduced locality, which is still better than |
81 |
> |
* alternatives. |
82 |
|
*/ |
83 |
|
|
84 |
|
/** |
87 |
|
static final int NCPU = Runtime.getRuntime().availableProcessors(); |
88 |
|
|
89 |
|
/** |
90 |
< |
* Version of AtomicLong padded to avoid sharing cache |
62 |
< |
* lines on most processors |
90 |
> |
* Padded version of AtomicLong |
91 |
|
*/ |
92 |
|
static final class Adder extends AtomicLong { |
93 |
|
long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd; |
95 |
|
} |
96 |
|
|
97 |
|
/** |
98 |
< |
* Holder for the thread-local hash code. |
98 |
> |
* Holder for the thread-local hash code. The code starts off with |
99 |
> |
* a given random value, but may be set to a different |
100 |
> |
* pseudo-random value (using a cheaper but adequate xorshift |
101 |
> |
* generator) upon collisions. |
102 |
|
*/ |
103 |
|
static final class HashCode { |
104 |
|
int code; |
124 |
|
static final ThreadHashCode threadHashCode = new ThreadHashCode(); |
125 |
|
|
126 |
|
/** |
127 |
< |
* Table of adders. Initially of size 2; grows to be at most NCPU. |
127 |
> |
* Table of adders. Minimum size 2. Size grows to be at most NCPU. |
128 |
|
*/ |
129 |
|
private transient volatile Adder[] adders; |
130 |
|
|
136 |
|
private final AtomicInteger mutex; |
137 |
|
|
138 |
|
/** |
139 |
< |
* Marsaglia XorShift for rehashing on collisions |
139 |
> |
* Marsaglia XorShift random generator for rehashing on collisions |
140 |
|
*/ |
141 |
|
private static int xorShift(int r) { |
142 |
|
r ^= r << 13; |
145 |
|
} |
146 |
|
|
147 |
|
/** |
148 |
< |
* Creates a new adder with initially zero sum. |
148 |
> |
* Creates a new adder with zero sum. |
149 |
|
*/ |
150 |
|
public StripedAdder() { |
151 |
< |
Adder[] as = new Adder[2]; |
151 |
> |
this(2); |
152 |
> |
} |
153 |
> |
|
154 |
> |
/** |
155 |
> |
* Creates a new adder with zero sum, and with stripes presized |
156 |
> |
* for the given expected contention level. |
157 |
> |
* |
158 |
> |
* @param expectedContention the expected number of threads that |
159 |
> |
* will concurrently update the sum. |
160 |
> |
*/ |
161 |
> |
public StripedAdder(int expectedContention) { |
162 |
> |
int cap = (expectedContention < NCPU) ? expectedContention : NCPU; |
163 |
> |
int size = 2; |
164 |
> |
while (size < cap) |
165 |
> |
size <<= 1; |
166 |
> |
Adder[] as = new Adder[size]; |
167 |
|
as[0] = new Adder(0); // ensure at least one available adder |
168 |
|
this.adders = as; |
169 |
|
this.mutex = new AtomicInteger(); |
207 |
|
mutex.set(0); |
208 |
|
} |
209 |
|
if (created) { |
210 |
< |
hc.code = h; // Use this adder next time |
210 |
> |
hc.code = h; // Use this adder next time |
211 |
|
break; |
212 |
|
} |
213 |
|
} |