ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.2 by jsr166, Wed Jul 20 16:06:19 2011 UTC vs.
Revision 1.5 by dl, Sun Jul 24 15:08:21 2011 UTC

# Line 16 | Line 16 | import java.io.ObjectOutputStream;
16  
17   /**
18   * A set of variables that together maintain a sum.  When updates
19 < * (method {@link #add}) are contended across threads, the set of
20 < * adders may grow to reduce contention. Method {@link #sum} returns
21 < * the current combined total across these adders. This value is
22 < * <em>NOT</em> an atomic snapshot (concurrent updates may occur while
23 < * the sum is being calculated), and so cannot be used alone for
24 < * fine-grained synchronization control.
19 > * (method {@link #add}) are contended across threads, this set of
20 > * adder variables may grow dynamically to reduce contention. Method
21 > * {@link #sum} returns the current combined total across these
22 > * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23 > * updates may occur while the sum is being calculated), and so cannot
24 > * be used alone for fine-grained synchronization control.
25   *
26   * <p> This class may be applicable when many threads frequently
27   * update a common sum that is used for purposes such as collecting
28   * statistics. In this case, performance may be significantly faster
29   * than using a shared {@link AtomicLong}, at the expense of using
30 < * significantly more space.  On the other hand, if it is known that
31 < * only one thread can ever update the sum, performance may be
32 < * significantly slower than just updating a local variable.
30 > * much more space.  On the other hand, if it is known that only one
31 > * thread can ever update the sum, performance may be significantly
32 > * slower than just updating a local variable.
33 > *
34 > * <p>A StripedAdder may optionally be constructed with a given
35 > * expected contention level; i.e., the number of threads that are
36 > * expected to concurrently update the sum. Supplying an accurate
37 > * value may improve performance by reducing the need for dynamic
38 > * adjustment.
39   *
40   * @author Doug Lea
41   */
# Line 37 | Line 43 | public class StripedAdder implements Ser
43      private static final long serialVersionUID = 7249069246863182397L;
44  
45      /*
46 <     * Overview: We maintain a table of AtomicLongs (padded to reduce
47 <     * false sharing). The table is indexed by per-thread hash codes
48 <     * that are initialized as random values.  The table doubles in
49 <     * size upon contention (as indicated by failed CASes when
50 <     * performing add()), but is capped at the nearest power of two >=
51 <     * #cpus: At that point, contention should be infrequent if each
52 <     * thread has a unique index; so we instead adjust hash codes to
53 <     * new random values upon contention rather than expanding. A
54 <     * single spinlock is used for resizing the table as well as
46 >     * A StripedAdder maintains a table of Atomic long variables. The
47 >     * table is indexed by per-thread hash codes that are initialized
48 >     * to random values.
49 >     *
50 >     * The table doubles in size upon contention (as indicated by
51 >     * failed CASes when performing add()), but is capped at the
52 >     * nearest power of two >= #CPUs. This reflects the idea that,
53 >     * when there are more threads than CPUs, then if each thread were
54 >     * bound to a CPU, there would exist a perfect hash function
55 >     * mapping threads to slots that eliminates collisions. When we
56 >     * reach capacity, we search for this mapping by randomly varying
57 >     * the hash codes of colliding threads.  Because search is random,
58 >     * and failures only become known via CAS failures, convergence
59 >     * will be slow, and because threads are typically not bound to
60 >     * CPUs forever, may not occur at all. However, despite these
61 >     * limitations, observed contention is typically low in these
62 >     * cases.
63 >     *
64 >     * Table entries are of class Adder; a form of AtomicLong padded
65 >     * to reduce cache contention on most processors. Padding is
66 >     * overkill for most Atomics because they are most often
67 >     * irregularly scattered in memory and thus don't interfere much
68 >     * with each other. But Atomic objects residing in arrays will
69 >     * tend to be placed adjacent to each other, and so will most
70 >     * often share cache lines without this precaution.  Adders are
71 >     * constructed upon first use, which further improves per-thread
72 >     * locality and helps reduce (an already large) footprint.
73 >     *
74 >     * A single spinlock is used for resizing the table as well as
75       * populating slots with new Adders. Upon lock contention, threads
76 <     * just try other slots rather than blocking. We guarantee that at
76 >     * try other slots rather than blocking. After initialization, at
77       * least one slot exists, so retries will eventually find a
78 <     * candidate Adder.
78 >     * candidate Adder. During these retries, there is increased
79 >     * contention and reduced locality, which is still better than
80 >     * alternatives.
81       */
82  
83      /**
# Line 58 | Line 86 | public class StripedAdder implements Ser
86      static final int NCPU = Runtime.getRuntime().availableProcessors();
87  
88      /**
89 <     * Version of AtomicLong padded to avoid sharing cache
90 <     * lines on most processors
89 >     * The table size set upon first use when default-constructed
90 >     */
91 >    private static final int DEFAULT_ARRAY_SIZE = 8;
92 >
93 >    /**
94 >     * Padded version of AtomicLong
95       */
96      static final class Adder extends AtomicLong {
97          long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
# Line 67 | Line 99 | public class StripedAdder implements Ser
99      }
100  
101      /**
102 <     * Holder for the thread-local hash code.
102 >     * Holder for the thread-local hash code. The code starts off with
103 >     * a given random value, but may be set to a different value upon
104 >     * collisions in retryAdd.
105       */
106      static final class HashCode {
107          int code;
# Line 93 | Line 127 | public class StripedAdder implements Ser
127      static final ThreadHashCode threadHashCode = new ThreadHashCode();
128  
129      /**
130 <     * Table of adders. Initially of size 2; grows to be at most NCPU.
130 >     * Table of adders. Size is power of two, grows to be at most NCPU.
131       */
132      private transient volatile Adder[] adders;
133  
134      /**
135       * Serves as a lock when resizing and/or creating Adders.  There
136 <     * is no need for a blocking lock: When busy, other threads try
137 <     * other slots.
136 >     * is no need for a blocking lock: Except during initialization
137 >     * races, when busy, other threads try other slots.
138       */
139      private final AtomicInteger mutex;
140  
141      /**
142 <     * Marsaglia XorShift for rehashing on collisions
142 >     * Creates a new adder with zero sum.
143       */
144 <    private static int xorShift(int r) {
145 <        r ^= r << 13;
146 <        r ^= r >>> 17;
113 <        return r ^ (r << 5);
144 >    public StripedAdder() {
145 >        this.mutex = new AtomicInteger();
146 >        // remaining initialization on first call to add.
147      }
148  
149      /**
150 <     * Creates a new adder with initially zero sum.
150 >     * Creates a new adder with zero sum, and with stripes presized
151 >     * for the given expected contention level.
152 >     *
153 >     * @param expectedContention the expected number of threads that
154 >     * will concurrently update the sum.
155       */
156 <    public StripedAdder() {
157 <        Adder[] as = new Adder[2];
158 <        as[0] = new Adder(0); // ensure at least one available adder
156 >    public StripedAdder(int expectedContention) {
157 >        int size;
158 >        if (expectedContention > 0) {
159 >            int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
160 >            size = 1;
161 >            while (size < cap)
162 >                size <<= 1;
163 >        }
164 >        else
165 >            size = 0;
166 >        Adder[] as = new Adder[size];
167 >        for (int i = 0; i < size; ++i)
168 >            as[i] = new Adder(0);
169          this.adders = as;
170          this.mutex = new AtomicInteger();
171      }
# Line 129 | Line 176 | public class StripedAdder implements Ser
176       * @param x the value to add
177       */
178      public void add(long x) {
179 +        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
180          HashCode hc = threadHashCode.get();
181 <        for (int h = hc.code;;) {
182 <            Adder[] as = adders;
183 <            int n = as.length;
184 <            Adder a = as[h & (n - 1)];
185 <            if (a != null) {
186 <                long v = a.get();
187 <                if (a.compareAndSet(v, v + x))
188 <                    break;
189 <                if (n >= NCPU) {                 // Collision when table at max
190 <                    h = hc.code = xorShift(h);   // change code
191 <                    continue;
181 >        if ((as = adders) == null || (n = as.length) < 1 ||
182 >            (a = as[hc.code & (n - 1)]) == null ||
183 >            !a.compareAndSet(v = a.get(), v + x))
184 >            retryAdd(x, hc);
185 >    }
186 >
187 >    /**
188 >     * Handle cases of add involving initialization, resizing,
189 >     * creating new Adders, and/or contention.
190 >     */
191 >    private void retryAdd(long x, HashCode hc) {
192 >        int h = hc.code;
193 >        final AtomicInteger mutex = this.mutex;
194 >        for (boolean retried = false; ; retried = true) {
195 >            Adder[] as; Adder a; long v; int n, k; // Locals for volatiles
196 >            if ((as = adders) == null || (n = as.length) < 1) {
197 >                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
198 >                    try {
199 >                        if (adders == null)        // Default-initialize
200 >                            adders = new Adder[DEFAULT_ARRAY_SIZE];
201 >                    } finally {
202 >                        mutex.set(0);
203 >                    }
204                  }
205 +                else
206 +                    Thread.yield();               // initialization race
207              }
208 <            final AtomicInteger mutex = this.mutex;
209 <            if (mutex.get() != 0)
210 <                h = xorShift(h);                 // Try elsewhere
211 <            else if (mutex.compareAndSet(0, 1)) {
208 >            else if ((a = as[k = h & (n - 1)]) != null &&
209 >                     retried && a.compareAndSet(v = a.get(), v + x))
210 >                break;
211 >            else if ((a == null || n < NCPU) &&
212 >                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
213                  boolean created = false;
214                  try {
215 <                    Adder[] rs = adders;
216 <                    if (a != null && rs == as)   // Resize table
217 <                        rs = adders = Arrays.copyOf(as, as.length << 1);
218 <                    int j = h & (rs.length - 1);
219 <                    if (rs[j] == null) {         // Create adder
220 <                        rs[j] = new Adder(x);
221 <                        created = true;
215 >                    if (adders == as) {
216 >                        if (as[k] == null) {
217 >                            as[k] = new Adder(x);
218 >                            created = true;
219 >                        }
220 >                        else {                   // Expand table
221 >                            Adder[] rs = new Adder[n << 1];
222 >                            for (int i = 0; i < n; ++i)
223 >                                rs[i] = as[i];
224 >                            adders = rs;
225 >                        }
226                      }
227                  } finally {
228                      mutex.set(0);
229                  }
230 <                if (created) {
164 <                    hc.code = h;                // Use this adder next time
230 >                if (created)
231                      break;
232 <                }
232 >            }
233 >            else {                                // Try elsewhere
234 >                h ^= h << 13;
235 >                h ^= h >>> 17;                    // Marsaglia XorShift
236 >                h ^= h << 5;
237              }
238          }
239 +        hc.code = h;
240      }
241  
242      /**
# Line 176 | Line 247 | public class StripedAdder implements Ser
247       * @return the estimated sum
248       */
249      public long sum() {
250 <        long sum = 0;
250 >        long sum = 0L;
251          Adder[] as = adders;
252 <        int n = as.length;
253 <        for (int i = 0; i < n; ++i) {
254 <            Adder a = as[i];
255 <            if (a != null)
256 <                sum += a.get();
252 >        if (as != null) {
253 >            int n = as.length;
254 >            for (int i = 0; i < n; ++i) {
255 >                Adder a = as[i];
256 >                if (a != null)
257 >                    sum += a.get();
258 >            }
259          }
260          return sum;
261      }
# Line 194 | Line 267 | public class StripedAdder implements Ser
267       */
268      public void reset() {
269          Adder[] as = adders;
270 <        int n = as.length;
271 <        for (int i = 0; i < n; ++i) {
272 <            Adder a = as[i];
273 <            if (a != null)
274 <                a.set(0L);
270 >        if (as != null) {
271 >            int n = as.length;
272 >            for (int i = 0; i < n; ++i) {
273 >                Adder a = as[i];
274 >                if (a != null)
275 >                    a.set(0L);
276 >            }
277          }
278      }
279  
# Line 222 | Line 297 | public class StripedAdder implements Ser
297       * @return the estimated sum
298       */
299      public long sumAndReset() {
300 <        long sum = 0;
300 >        long sum = 0L;
301          Adder[] as = adders;
302 <        int n = as.length;
303 <        for (int i = 0; i < n; ++i) {
304 <            Adder a = as[i];
305 <            if (a != null) {
306 <                sum += a.get();
307 <                a.set(0L);
302 >        if (as != null) {
303 >            int n = as.length;
304 >            for (int i = 0; i < n; ++i) {
305 >                Adder a = as[i];
306 >                if (a != null) {
307 >                    sum += a.get();
308 >                    a.set(0L);
309 >                }
310              }
311          }
312          return sum;
# Line 244 | Line 321 | public class StripedAdder implements Ser
321      private void readObject(ObjectInputStream s)
322          throws IOException, ClassNotFoundException {
323          s.defaultReadObject();
247        long c = s.readLong();
248        Adder[] as = new Adder[2];
249        as[0] = new Adder(c);
250        this.adders = as;
324          mutex.set(0);
325 +        add(s.readLong());
326      }
327  
328   }
255
256

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines