Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.5 by dl, Sun Jul 24 15:08:21 2011 UTC vs.
Revision 1.12 by dl, Sat Jul 30 16:26:34 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 import java.util.Arrays;
8   import java.util.Random;
9   import java.util.concurrent.atomic.AtomicInteger;
10   import java.util.concurrent.atomic.AtomicLong;
# Line 15 | Line 14 | import java.io.ObjectInputStream;
14   import java.io.ObjectOutputStream;
15  
16   /**
17 < * A set of variables that together maintain a sum.  When updates
18 < * (method {@link #add}) are contended across threads, this set of
19 < * adder variables may grow dynamically to reduce contention. Method
21 < * {@link #sum} returns the current combined total across these
22 < * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23 < * updates may occur while the sum is being calculated), and so cannot
24 < * be used alone for fine-grained synchronization control.
17 > * One or more variables that together maintain an initially zero sum.
18 > * When updates (method {@link #add}) are contended across threads,
19 > * the set of variables may grow dynamically to reduce contention.
20   *
21 < * <p> This class may be applicable when many threads frequently
22 < * update a common sum that is used for purposes such as collecting
23 < * statistics. In this case, performance may be significantly faster
24 < * than using a shared {@link AtomicLong}, at the expense of using
25 < * much more space.  On the other hand, if it is known that only one
26 < * thread can ever update the sum, performance may be significantly
27 < * slower than just updating a local variable.
21 > * <p> This class is usually preferable to {@link AtomicLong} when
22 > * multiple threads update a common sum that is used for purposes such
23 > * as collecting statistics, not for fine-grained synchronization
24 > * control.  Under high update contention, throughput of this class is
25 > * expected to be significantly higher, at the expense of higher space
26 > * consumption.  Under low contention, this class imposes very little
27 > * time and space overhead compared to AtomicLong.  On the other hand,
28 > * in contexts where it is statically known that only one thread can
29 > * ever update a sum, time and space overhead is noticeably greater
30 > * than just updating a local variable.
31   *
32 < * <p>A StripedAdder may optionally be constructed with a given
33 < * expected contention level; i.e., the number of threads that are
34 < * expected to concurrently update the sum. Supplying an accurate
35 < * value may improve performance by reducing the need for dynamic
36 < * adjustment.
32 > * <p> Method {@link #sum} returns the current combined total across
33 > * the variables maintaining the sum.  This value is <em>NOT</em> an
34 > * atomic snapshot: Concurrent updates may occur while the sum is
35 > * being calculated.  However, updates cannot be "lost", so invocation
36 > * of <code>sum</code> in the absence of concurrent updates always
37 > * returns an accurate result.  The sum may also be <code>reset</code>
38 > * to zero, as an alternative to creating a new adder.  However,
39 > * method {@link #reset} is intrinsically racy, so should only be used
40 > * when it is known that no threads are concurrently updating the sum.
41 > *
42 > * <p><em>jsr166e note: This class is targeted to be placed in
43 > * java.util.concurrent.atomic</em>
44   *
45   * @author Doug Lea
46   */
# Line 43 | Line 48 | public class StripedAdder implements Ser
48      private static final long serialVersionUID = 7249069246863182397L;
49  
50      /*
51 <     * A StripedAdder maintains a table of Atomic long variables. The
52 <     * table is indexed by per-thread hash codes that are initialized
53 <     * to random values.
54 <     *
50 <     * The table doubles in size upon contention (as indicated by
51 <     * failed CASes when performing add()), but is capped at the
52 <     * nearest power of two >= #CPUS. This reflects the idea that,
53 <     * when there are more threads than CPUs, then if each thread were
54 <     * bound to a CPU, there would exist a perfect hash function
55 <     * mapping threads to slots that eliminates collisions. When we
56 <     * reach capacity, we search for this mapping by randomly varying
57 <     * the hash codes of colliding threads.  Because search is random,
58 <     * and failures only become known via CAS failures, convergence
59 <     * will be slow, and because threads are typically not bound to
60 <     * CPUS forever, may not occur at all. However, despite these
61 <     * limitations, observed contention is typically low in these
62 <     * cases.
51 >     * A StripedAdder maintains a lazily-initialized table of
52 >     * atomically updated variables, plus an extra "base" field. The
53 >     * table size is a power of two. Indexing uses masked per-thread
54 >     * hash codes.
55       *
56 <     * Table entries are of class Adder; a form of AtomicLong padded
56 >     * Table entries are of class Cell; a variant of AtomicLong padded
57       * to reduce cache contention on most processors. Padding is
58 <     * overkill for most Atomics because they are most often
59 <     * irregularly scattered in memory and thus don't interfere much
60 <     * with each other. But Atomic objects residing in arrays will
61 <     * tend to be placed adjacent to each other, and so will most
62 <     * often share cache lines without this precaution.  Adders are
63 <     * constructed upon first use, which further improves per-thread
64 <     * locality and helps reduce (an already large) footprint.
58 >     * overkill for most Atomics because they are usually irregularly
59 >     * scattered in memory and thus don't interfere much with each
60 >     * other. But Atomic objects residing in arrays will tend to be
61 >     * placed adjacent to each other, and so will most often share
62 >     * cache lines (with a huge negative performance impact) without
63 >     * this precaution.
64 >     *
65 >     * In part because Cells are relatively large, we avoid creating
66 >     * them until they are needed.  When there is no contention, all
67 >     * updates are made to the base field.  Upon first contention (a
68 >     * failed CAS on base update), the table is initialized to size 2.
69 >     * The table size is doubled upon further contention until
70 >     * reaching the nearest power of two greater than or equal to the
71 >     * number of CPUS.
72 >     *
73 >     * Per-thread hash codes are initialized to random values.
74 >     * Contention and/or table collisions are indicated by failed
75 >     * CASes when performing an add operation (see method
76 >     * retryAdd). Upon a collision, if the table size is less than the
77 >     * capacity, it is doubled in size unless some other thread holds
78 >     * the lock. If a hashed slot is empty, and lock is available, a
79 >     * new Cell is created. Otherwise, if the slot exists, a CAS is
80 >     * tried.  Retries proceed by "double hashing", using a secondary
81 >     * hash (Marsaglia XorShift) to try to find a free slot.
82 >     *
83 >     * The table size is capped because, when there are more threads
84 >     * than CPUs, supposing that each thread were bound to a CPU,
85 >     * there would exist a perfect hash function mapping threads to
86 >     * slots that eliminates collisions. When we reach capacity, we
87 >     * search for this mapping by randomly varying the hash codes of
88 >     * colliding threads.  Because search is random, and collisions
89 >     * only become known via CAS failures, convergence can be slow,
90 >     * and because threads are typically not bound to CPUS forever,
91 >     * may not occur at all. However, despite these limitations,
92 >     * observed contention rates are typically low in these cases.
93 >     *
94 >     * A single spinlock is used for initializing and resizing the
95 >     * table, as well as populating slots with new Cells.  There is no
96 >     * need for a blocking lock: Upon lock contention, threads try
97 >     * other slots (or the base) rather than blocking.  During these
98 >     * retries, there is increased contention and reduced locality,
99 >     * which is still better than alternatives.
100 >     *
101 >     * It is possible for a Cell to become unused when threads that
102 >     * once hashed to it terminate, as well as in the case where
103 >     * doubling the table causes no thread to hash to it under
104 >     * expanded mask.  We do not try to detect or remove such cells,
105 >     * under the assumption that for long-running adders, observed
106 >     * contention levels will recur, so the cells will eventually be
107 >     * needed again; and for short-lived ones, it does not matter.
108       *
74     * A single spinlock is used for resizing the table as well as
75     * populating slots with new Adders. Upon lock contention, threads
76     * try other slots rather than blocking. After initialization, at
77     * least one slot exists, so retries will eventually find a
78     * candidate Adder. During these retries, there is increased
79     * contention and reduced locality, which is still better than
80     * alternatives.
81     */
82
83    /**
84     * Number of processors, to place a cap on table growth.
109       */
86    static final int NCPU = Runtime.getRuntime().availableProcessors();
110  
111 <    /**
89 <     * The table size set upon first use when default-constructed
90 <     */
91 <    private static final int DEFAULT_ARRAY_SIZE = 8;
111 >    private static final int NCPU = Runtime.getRuntime().availableProcessors();
112  
113      /**
114 <     * Padded version of AtomicLong
114 >     * Padded variant of AtomicLong.  The value field is placed
115 >     * between pads, hoping that the JVM doesn't reorder them.
116 >     * Updates are via inlined CAS in methods add and retryAdd.
117       */
118 <    static final class Adder extends AtomicLong {
119 <        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
120 <        Adder(long x) { super(x); }
118 >    static final class Cell {
119 >        volatile long p0, p1, p2, p3, p4, p5, p6;
120 >        volatile long value;
121 >        volatile long q0, q1, q2, q3, q4, q5, q6;
122 >        Cell(long x) { value = x; }
123      }
124  
125      /**
126 <     * Holder for the thread-local hash code. The code starts off with
127 <     * a given random value, but may be set to a different value upon
104 <     * collisions in retryAdd.
126 >     * Holder for the thread-local hash code. The code is initially
127 >     * random, but may be set to a different value upon collisions.
128       */
129      static final class HashCode {
130 +        static final Random rng = new Random();
131          int code;
132 <        HashCode(int h) { code = h; }
132 >        HashCode() {
133 >            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
134 >            code = (h == 0) ? 1 : h;
135 >        }
136      }
137  
138      /**
139       * The corresponding ThreadLocal class
140       */
141      static final class ThreadHashCode extends ThreadLocal<HashCode> {
142 <        static final Random rng = new Random();
116 <        public HashCode initialValue() {
117 <            int h = rng.nextInt();
118 <            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
119 <        }
142 >        public HashCode initialValue() { return new HashCode(); }
143      }
144  
145      /**
146       * Static per-thread hash codes. Shared across all StripedAdders
147 <     * because adjustments due to collisions in one table are likely
148 <     * to be appropriate for others.
147 >     * to reduce ThreadLocal pollution and because adjustments due to
148 >     * collisions in one table are likely to be appropriate for
149 >     * others.
150       */
151      static final ThreadHashCode threadHashCode = new ThreadHashCode();
152  
153      /**
154 <     * Table of adders. Size is power of two, grows to be at most NCPU.
154 >     * Table of cells. When non-null, size is a power of 2.
155       */
156 <    private transient volatile Adder[] adders;
156 >    private transient volatile Cell[] cells;
157  
158      /**
159 <     * Serves as a lock when resizing and/or creating Adders.  There
160 <     * is no need for a blocking lock: Except during initialization
137 <     * races, when busy, other threads try other slots.
159 >     * Base sum, used mainly when there is no contention, but also as
160 >     * a fallback during table initialization races. Updated via CAS.
161       */
162 <    private final AtomicInteger mutex;
162 >    private transient volatile long base;
163  
164      /**
165 <     * Creates a new adder with zero sum.
165 >     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
166       */
167 <    public StripedAdder() {
145 <        this.mutex = new AtomicInteger();
146 <        // remaining initialization on first call to add.
147 <    }
167 >    private transient volatile int busy;
168  
169      /**
170 <     * Creates a new adder with zero sum, and with stripes presized
151 <     * for the given expected contention level.
152 <     *
153 <     * @param expectedContention the expected number of threads that
154 <     * will concurrently update the sum.
170 >     * Creates a new adder with initial sum of zero.
171       */
172 <    public StripedAdder(int expectedContention) {
157 <        int size;
158 <        if (expectedContention > 0) {
159 <            int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
160 <            size = 1;
161 <            while (size < cap)
162 <                size <<= 1;
163 <        }
164 <        else
165 <            size = 0;
166 <        Adder[] as = new Adder[size];
167 <        for (int i = 0; i < size; ++i)
168 <            as[i] = new Adder(0);
169 <        this.adders = as;
170 <        this.mutex = new AtomicInteger();
172 >    public StripedAdder() {
173      }
174  
175      /**
# Line 176 | Line 178 | public class StripedAdder implements Ser
178       * @param x the value to add
179       */
180      public void add(long x) {
181 <        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
182 <        HashCode hc = threadHashCode.get();
183 <        if ((as = adders) == null || (n = as.length) < 1 ||
184 <            (a = as[hc.code & (n - 1)]) == null ||
185 <            !a.compareAndSet(v = a.get(), v + x))
186 <            retryAdd(x, hc);
181 >        Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended;
182 >        if ((as = cells) != null ||
183 >            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
184 >            int h = (hc = threadHashCode.get()).code;
185 >            if (as != null && (n = as.length) > 0 &&
186 >                (a = as[(n - 1) & h]) != null) {
187 >                if (UNSAFE.compareAndSwapLong(a, valueOffset,
188 >                                              v = a.value, v + x))
189 >                    return;
190 >                contended = true;
191 >            }
192 >            else
193 >                contended = false;
194 >            retryAdd(x, hc, contended);
195 >        }
196      }
197  
198      /**
199       * Handle cases of add involving initialization, resizing,
200 <     * creating new Adders, and/or contention.
200 >     * creating new Cells, and/or contention. See above for
201 >     * explanation. This method suffers the usual non-modularity
202 >     * problems of optimistic retry code, relying on rechecked sets of
203 >     * reads.
204 >     *
205 >     * @param x the value to add
206 >     * @param hc the hash code holder
207 >     * @param precontended true if CAS failed before call
208       */
209 <    private void retryAdd(long x, HashCode hc) {
209 >    private void retryAdd(long x, HashCode hc, boolean precontended) {
210          int h = hc.code;
211 <        final AtomicInteger mutex = this.mutex;
212 <        for (boolean retried = false; ; retried = true) {
213 <            Adder[] as; Adder a; long v; int n, k; // Locals for volatiles
214 <            if ((as = adders) == null || (n = as.length) < 1) {
215 <                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
216 <                    try {
217 <                        if (adders == null)        // Default-initialize
218 <                            adders = new Adder[DEFAULT_ARRAY_SIZE];
219 <                    } finally {
220 <                        mutex.set(0);
211 >        boolean collide = false; // true if last slot nonempty
212 >        for (;;) {
213 >            Cell[] as; Cell a; int n;
214 >            if ((as = cells) != null && (n = as.length) > 0) {
215 >                if ((a = as[(n - 1) & h]) == null) {
216 >                    if (busy == 0) {            // Try to attach new Cell
217 >                        Cell r = new Cell(x);   // Optimistically create
218 >                        if (busy == 0 &&
219 >                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
220 >                            boolean created = false;
221 >                            try {               // Recheck under lock
222 >                                Cell[] rs; int m, j;
223 >                                if ((rs = cells) != null &&
224 >                                    (m = rs.length) > 0 &&
225 >                                    rs[j = (m - 1) & h] == null) {
226 >                                    rs[j] = r;
227 >                                    created = true;
228 >                                }
229 >                            } finally {
230 >                                busy = 0;
231 >                            }
232 >                            if (created)
233 >                                break;
234 >                            continue;           // Slot is now non-empty
235 >                        }
236                      }
237 +                    collide = false;
238                  }
239 <                else
240 <                    Thread.yield();               // initialization race
241 <            }
242 <            else if ((a = as[k = h & (n - 1)]) != null &&
243 <                     retried && a.compareAndSet(v = a.get(), v + x))
244 <                break;
245 <            else if ((a == null || n < NCPU) &&
246 <                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
247 <                boolean created = false;
248 <                try {
249 <                    if (adders == as) {
250 <                        if (as[k] == null) {
251 <                            as[k] = new Adder(x);
252 <                            created = true;
253 <                        }
254 <                        else {                   // Expand table
255 <                            Adder[] rs = new Adder[n << 1];
256 <                            for (int i = 0; i < n; ++i)
257 <                                rs[i] = as[i];
258 <                            adders = rs;
239 >                else if (precontended)          // CAS already known to fail
240 >                    precontended = false;       // Continue after rehash
241 >                else {
242 >                    long v = a.value;
243 >                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
244 >                        break;
245 >                    if (!collide)
246 >                        collide = true;
247 >                    else if (n >= NCPU || cells != as)
248 >                        collide = false;        // Can't expand
249 >                    else if (busy == 0 &&
250 >                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
251 >                        collide = false;
252 >                        try {
253 >                            if (cells == as) { // Expand table
254 >                                Cell[] rs = new Cell[n << 1];
255 >                                for (int i = 0; i < n; ++i)
256 >                                    rs[i] = as[i];
257 >                                cells = rs;
258 >                            }
259 >                        } finally {
260 >                            busy = 0;
261                          }
262 +                        continue;
263 +                    }
264 +                }
265 +                h ^= h << 13;                   // Rehash
266 +                h ^= h >>> 17;
267 +                h ^= h << 5;
268 +            }
269 +            else if (busy == 0 && cells == as &&
270 +                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
271 +                boolean init = false;
272 +                try {                           // Initialize
273 +                    if (cells == as) {
274 +                        Cell r = new Cell(x);
275 +                        Cell[] rs = new Cell[2];
276 +                        rs[h & 1] = r;
277 +                        cells = rs;
278 +                        init = true;
279                      }
280                  } finally {
281 <                    mutex.set(0);
281 >                    busy = 0;
282                  }
283 <                if (created)
283 >                if (init)
284                      break;
285              }
286 <            else {                                // Try elsewhere
287 <                h ^= h << 13;
288 <                h ^= h >>> 17;                    // Marsaglia XorShift
289 <                h ^= h << 5;
286 >            else {                              // Lost initialization race
287 >                long b = base;                  // Fall back on using base
288 >                if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x))
289 >                    break;
290              }
291          }
292 <        hc.code = h;
292 >        hc.code = h;                            // Record index for next time
293      }
294  
295      /**
296 <     * Returns an estimate of the current sum.  The result is
297 <     * calculated by summing multiple variables, so may not be
298 <     * accurate if updates occur concurrently with this method.
296 >     * Equivalent to {@code add(1)}.
297 >     */
298 >    public void increment() {
299 >        add(1L);
300 >    }
301 >
302 >    /**
303 >     * Equivalent to {@code add(-1)}.
304 >     */
305 >    public void decrement() {
306 >        add(-1L);
307 >    }
308 >
309 >    /**
310 >     * Returns the current sum.  The result is only guaranteed to be
311 >     * accurate in the absence of concurrent updates. Otherwise, it
312 >     * may fail to reflect one or more updates occurring while
313 >     * calculating the result.
314       *
315 <     * @return the estimated sum
315 >     * @return the sum
316       */
317      public long sum() {
318 <        long sum = 0L;
319 <        Adder[] as = adders;
318 >        Cell[] as = cells;
319 >        long sum = base;
320          if (as != null) {
321              int n = as.length;
322              for (int i = 0; i < n; ++i) {
323 <                Adder a = as[i];
323 >                Cell a = as[i];
324                  if (a != null)
325 <                    sum += a.get();
325 >                    sum += a.value;
326              }
327          }
328          return sum;
329      }
330  
331      /**
332 <     * Resets each of the variables to zero. This is effective in
333 <     * fully resetting the sum only if there are no concurrent
334 <     * updates.
332 >     * Resets variables maintaining the sum to zero.  This is
333 >     * effective in setting the sum to zero only if there are no
334 >     * concurrent updates.
335       */
336      public void reset() {
337 <        Adder[] as = adders;
337 >        Cell[] as = cells;
338 >        base = 0L;
339          if (as != null) {
340              int n = as.length;
341              for (int i = 0; i < n; ++i) {
342 <                Adder a = as[i];
342 >                Cell a = as[i];
343                  if (a != null)
344 <                    a.set(0L);
344 >                    a.value = 0L;
345              }
346          }
347      }
348  
349      /**
350 <     * Equivalent to {@code add(1)}.
351 <     */
352 <    public void increment() {
353 <        add(1L);
354 <    }
355 <
287 <    /**
288 <     * Equivalent to {@code add(-1)}.
289 <     */
290 <    public void decrement() {
291 <        add(-1L);
292 <    }
293 <
294 <    /**
295 <     * Equivalent to {@link #sum} followed by {@link #reset}.
350 >     * Equivalent in effect to {@link #sum} followed by {@link
351 >     * #reset}. This method may apply for example during quiescent
352 >     * points between multithreaded computations.  If there are
353 >     * updates concurrent with this method, the returned value is
354 >     * <em>not</em> guaranteed to be the final sum occurring before
355 >     * the reset.
356       *
357 <     * @return the estimated sum
357 >     * @return the sum
358       */
359 <    public long sumAndReset() {
360 <        long sum = 0L;
361 <        Adder[] as = adders;
359 >    public long sumThenReset() {
360 >        Cell[] as = cells;
361 >        long sum = base;
362 >        base = 0L;
363          if (as != null) {
364              int n = as.length;
365              for (int i = 0; i < n; ++i) {
366 <                Adder a = as[i];
366 >                Cell a = as[i];
367                  if (a != null) {
368 <                    sum += a.get();
369 <                    a.set(0L);
368 >                    sum += a.value;
369 >                    a.value = 0L;
370                  }
371              }
372          }
# Line 321 | Line 382 | public class StripedAdder implements Ser
382      private void readObject(ObjectInputStream s)
383          throws IOException, ClassNotFoundException {
384          s.defaultReadObject();
385 <        mutex.set(0);
386 <        add(s.readLong());
385 >        busy = 0;
386 >        cells = null;
387 >        base = s.readLong();
388 >    }
389 >
390 >    // Unsafe mechanics
391 >    private static final sun.misc.Unsafe UNSAFE;
392 >    private static final long baseOffset;
393 >    private static final long busyOffset;
394 >    private static final long valueOffset;
395 >    static {
396 >        try {
397 >            UNSAFE = getUnsafe();
398 >            Class<?> sk = StripedAdder.class;
399 >            baseOffset = UNSAFE.objectFieldOffset
400 >                (sk.getDeclaredField("base"));
401 >            busyOffset = UNSAFE.objectFieldOffset
402 >                (sk.getDeclaredField("busy"));
403 >            Class<?> ak = Cell.class;
404 >            valueOffset = UNSAFE.objectFieldOffset
405 >                (ak.getDeclaredField("value"));
406 >        } catch (Exception e) {
407 >            throw new Error(e);
408 >        }
409 >    }
410 >
411 >    /**
412 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
413 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
414 >     * into a jdk.
415 >     *
416 >     * @return a sun.misc.Unsafe
417 >     */
418 >    private static sun.misc.Unsafe getUnsafe() {
419 >        try {
420 >            return sun.misc.Unsafe.getUnsafe();
421 >        } catch (SecurityException se) {
422 >            try {
423 >                return java.security.AccessController.doPrivileged
424 >                    (new java.security
425 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
426 >                        public sun.misc.Unsafe run() throws Exception {
427 >                            java.lang.reflect.Field f = sun.misc
428 >                                .Unsafe.class.getDeclaredField("theUnsafe");
429 >                            f.setAccessible(true);
430 >                            return (sun.misc.Unsafe) f.get(null);
431 >                        }});
432 >            } catch (java.security.PrivilegedActionException e) {
433 >                throw new RuntimeException("Could not initialize intrinsics",
434 >                                           e.getCause());
435 >            }
436 >        }
437      }
438  
439   }
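
The usage pattern described in the class Javadoc (many threads updating a common sum, with the total read at a quiescent point) can be sketched as follows. StripedAdder is not part of the JDK, so this sketch assumes java.util.concurrent.atomic.LongAdder (Java 8 and later) as a stand-in, since it offers the same add, increment, sum and sumThenReset method names. The class name AdderUsageSketch and the method countWith are hypothetical and exist only for illustration.

```java
import java.util.concurrent.atomic.LongAdder;

public class AdderUsageSketch {

    /**
     * Starts nThreads threads that each call increment() perThread times,
     * waits for all of them to finish, and then reads the sum. Because
     * sum() is called only after every updater has been joined, there are
     * no concurrent updates and the result is exact.
     */
    static long countWith(int nThreads, int perThread) {
        // Assumption: LongAdder stands in for StripedAdder here.
        LongAdder adder = new LongAdder();
        Thread[] threads = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    adder.increment();
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // quiescent point: no updates after this loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 100_000));
    }
}
```

Calling sum() while the updater threads are still running would still return a value, but, as the Javadoc in the diff notes, that value is not an atomic snapshot.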

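The rehash step in retryAdd and the masked table index can also be tried in isolation. The sketch below copies the three shift-and-xor lines from the diff into a hypothetical helper class (RehashSketch, xorShift and index are illustrative names, not part of StripedAdder). It shows why the HashCode constructor avoids zero: zero is a fixed point of the xorshift step, so a zero hash code would never move to a different slot.

```java
public class RehashSketch {

    /** One Marsaglia XorShift step, using the same shifts as retryAdd. */
    static int xorShift(int h) {
        h ^= h << 13;
        h ^= h >>> 17;
        h ^= h << 5;
        return h;
    }

    /** Slot index for hash h in a table whose size is a power of two. */
    static int index(int h, int tableSize) {
        return (tableSize - 1) & h;
    }

    public static void main(String[] args) {
        int h = 1;     // any nonzero starting value
        int size = 8;  // power-of-two table size, as in the diff
        for (int i = 0; i < 5; i++) {
            System.out.println("hash=" + h + " slot=" + index(h, size));
            h = xorShift(h);
        }
        // Zero never changes under xorShift, hence the nonzero requirement.
        System.out.println("xorShift(0)=" + xorShift(0));
    }
}
```

Masking with (tableSize - 1) only yields a valid index for every hash when the table size is a power of two, which is the invariant the revised comments state for the cells array.
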
Diff Legend

(no marker) Removed lines
+ Added lines
< Changed lines
> Changed lines