ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.6 by dl, Tue Jul 26 17:16:36 2011 UTC vs.
Revision 1.12 by dl, Sat Jul 30 16:26:34 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 import java.util.Arrays;
8   import java.util.Random;
9   import java.util.concurrent.atomic.AtomicInteger;
10   import java.util.concurrent.atomic.AtomicLong;
# Line 15 | Line 14 | import java.io.ObjectInputStream;
14   import java.io.ObjectOutputStream;
15  
16   /**
17 < * A set of variables that together maintain a sum.  When updates
18 < * (method {@link #add}) are contended across threads, this set of
19 < * adder variables may grow dynamically to reduce contention. Method
21 < * {@link #sum} returns the current combined total across these
22 < * adders. This value is <em>NOT</em> an atomic snapshot (concurrent
23 < * updates may occur while the sum is being calculated), and so cannot
24 < * be used alone for fine-grained synchronization control.
17 > * One or more variables that together maintain an initially zero sum.
18 > * When updates (method {@link #add}) are contended across threads,
19 > * the set of variables may grow dynamically to reduce contention.
20   *
21 < * <p> This class may be applicable when many threads frequently
22 < * update a common sum that is used for purposes such as collecting
23 < * statistics. In this case, performance may be significantly faster
24 < * than using a shared {@link AtomicLong}, at the expense of using
25 < * much more space.  On the other hand, if it is known that only one
26 < * thread can ever update the sum, performance may be significantly
27 < * slower than just updating a local variable.
21 > * <p> This class is usually preferable to {@link AtomicLong} when
22 > * multiple threads update a common sum that is used for purposes such
23 > * as collecting statistics, not for fine-grained synchronization
24 > * control.  Under high update contention, throughput of this class is
25 > * expected to be significantly higher, at the expense of higher space
26 > * consumption.  Under low contention, this class imposes very little
27 > * time and space overhead compared to AtomicLong.  On the other hand,
28 > * in contexts where it is statically known that only one thread can
29 > * ever update a sum, time and space overhead is noticeably greater
30 > * than just updating a local variable.
31   *
32 < * <p>A StripedAdder may optionally be constructed with a given
33 < * expected contention level; i.e., the number of threads that are
34 < * expected to concurrently update the sum. Supplying an accurate
35 < * value may improve performance by reducing the need for dynamic
36 < * adjustment.
32 > * <p> Method {@link #sum} returns the current combined total across
33 > * the variables maintaining the sum.  This value is <em>NOT</em> an
34 > * atomic snapshot: Concurrent updates may occur while the sum is
35 > * being calculated.  However, updates cannot be "lost", so invocation
36 > * of <code>sum</code> in the absence of concurrent updates always
37 > * returns an accurate result.  The sum may also be <code>reset</code>
38 > * to zero, as an alternative to creating a new adder.  However,
39 > * method {@link #reset} is intrinsically racy, so should only be used
40 > * when it is known that no threads are concurrently updating the sum.
41 > *
42 > * <p><em>jsr166e note: This class is targeted to be placed in
43 > * java.util.concurrent.atomic<em>
44   *
45   * @author Doug Lea
46   */
# Line 43 | Line 48 | public class StripedAdder implements Ser
48      private static final long serialVersionUID = 7249069246863182397L;
49  
50      /*
51 <     * A StripedAdder maintains a table of Atomic long variables. The
52 <     * table is indexed by per-thread hash codes.
51 >     * A StripedAdder maintains a lazily-initialized table of
52 >     * atomically updated variables, plus an extra "base" field. The
53 >     * table size is a power of two. Indexing uses masked per-thread
54 >     * hash codes
55       *
56 <     * By default, the table is lazily initialized, to minimize
57 <     * footprint until adders are used. On first use, the table is set
58 <     * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is
59 <     * bounded by the number of CPUS (if larger than the default
60 <     * size).
56 >     * Table entries are of class Cell; a variant of AtomicLong padded
57 >     * to reduce cache contention on most processors. Padding is
58 >     * overkill for most Atomics because they are usually irregularly
59 >     * scattered in memory and thus don't interfere much with each
60 >     * other. But Atomic objects residing in arrays will tend to be
61 >     * placed adjacent to each other, and so will most often share
62 >     * cache lines (with a huge negative performance impact) without
63 >     * this precaution.
64 >     *
65 >     * In part because Cells are relatively large, we avoid creating
66 >     * them until they are needed.  When there is no contention, all
67 >     * updates are made to the base field.  Upon first contention (a
68 >     * failed CAS on base update), the table is initialized to size 2.
69 >     * The table size is doubled upon further contention until
70 >     * reaching the nearest power of two greater than or equal to the
71 >     * number of CPUS.
72       *
73       * Per-thread hash codes are initialized to random values.
74 <     * Collisions are indicated by failed CASes when performing an add
75 <     * operation (see method retryAdd). Upon a collision, if the table
76 <     * size is less than the capacity, it is doubled in size unless
77 <     * some other thread holds lock. If a hashed slot is empty, and
78 <     * lock is available, a new Adder is created. Otherwise, if the
79 <     * slot exists, a CAS is tried.  Retries proceed by "double
80 <     * hashing", using a secondary hash (Marsaglia XorShift) to try to
81 <     * find a free slot.
74 >     * Contention and/or table collisions are indicated by failed
75 >     * CASes when performing an add operation (see method
76 >     * retryAdd). Upon a collision, if the table size is less than the
77 >     * capacity, it is doubled in size unless some other thread holds
78 >     * the lock. If a hashed slot is empty, and lock is available, a
79 >     * new Cell is created. Otherwise, if the slot exists, a CAS is
80 >     * tried.  Retries proceed by "double hashing", using a secondary
81 >     * hash (Marsaglia XorShift) to try to find a free slot.
82       *
83       * The table size is capped because, when there are more threads
84       * than CPUs, supposing that each thread were bound to a CPU,
85       * there would exist a perfect hash function mapping threads to
86       * slots that eliminates collisions. When we reach capacity, we
87       * search for this mapping by randomly varying the hash codes of
88 <     * colliding threads.  Because search is random, and failures only
89 <     * become known via CAS failures, convergence will be slow, and
90 <     * because threads are typically not bound to CPUS forever, may
91 <     * not occur at all. However, despite these limitations, observed
92 <     * contention is typically low in these cases.
88 >     * colliding threads.  Because search is random, and collisions
89 >     * only become known via CAS failures, convergence can be slow,
90 >     * and because threads are typically not bound to CPUS forever,
91 >     * may not occur at all. However, despite these limitations,
92 >     * observed contention rates are typically low in these cases.
93       *
94 <     * Table entries are of class Adder; a form of AtomicLong padded
95 <     * to reduce cache contention on most processors. Padding is
96 <     * overkill for most Atomics because they are usually irregularly
97 <     * scattered in memory and thus don't interfere much with each
98 <     * other. But Atomic objects residing in arrays will tend to be
99 <     * placed adjacent to each other, and so will most often share
100 <     * cache lines without this precaution.  Adders are by default
101 <     * constructed upon first use, which further improves per-thread
102 <     * locality and helps reduce footprint.
94 >     * A single spinlock is used for initializing and resizing the
95 >     * table, as well as populating slots with new Cells.  There is no
96 >     * need for a blocking lock: Upon lock contention, threads try
97 >     * other slots (or the base) rather than blocking.  During these
98 >     * retries, there is increased contention and reduced locality,
99 >     * which is still better than alternatives.
100 >     *
101 >     * It is possible for a Cell to become unused when threads that
102 >     * once hashed to it terminate, as well as in the case where
103 >     * doubling the table causes no thread to hash to it under
104 >     * expanded mask.  We do not try to detect or remove such cells,
105 >     * under the assumption that for long-running adders, observed
106 >     * contention levels will recur, so the cells will eventually be
107 >     * needed again; and for short-lived ones, it does not matter.
108       *
86     * A single spinlock is used for resizing the table as well as
87     * populating slots with new Adders. Upon lock contention, threads
88     * try other slots rather than blocking. After initialization, at
89     * least one slot exists, so retries will eventually find a
90     * candidate Adder.  During these retries, there is increased
91     * contention and reduced locality, which is still better than
92     * alternatives.
93     */
94
95    /**
96     * Padded version of AtomicLong
109       */
98    static final class Adder extends AtomicLong {
99        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
100        Adder(long x) { super(x); }
101    }
110  
111      private static final int NCPU = Runtime.getRuntime().availableProcessors();
112  
113      /**
114 <     * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon
115 <     * first use under default constructor, and must be a power of
116 <     * two. There is not much point in making size a lot smaller than
117 <     * that of Adders though.  CAP is the maximum allowed table size.
118 <     */
119 <    private static final int DEFAULT_INITIAL_SIZE = 8;
120 <    private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE);
114 >     * Padded variant of AtomicLong.  The value field is placed
115 >     * between pads, hoping that the JVM doesn't reorder them.
116 >     * Updates are via inlined CAS in methods add and retryAdd.
117 >     */
118 >    static final class Cell {
119 >        volatile long p0, p1, p2, p3, p4, p5, p6;
120 >        volatile long value;
121 >        volatile long q0, q1, q2, q3, q4, q5, q6;
122 >        Cell(long x) { value = x; }
123 >    }
124  
125      /**
126       * Holder for the thread-local hash code. The code is initially
# Line 119 | Line 130 | public class StripedAdder implements Ser
130          static final Random rng = new Random();
131          int code;
132          HashCode() {
133 <            int h = rng.nextInt();
134 <            code = (h == 0) ? 1 : h; // ensure nonzero
133 >            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
134 >            code = (h == 0) ? 1 : h;
135          }
136      }
137  
# Line 140 | Line 151 | public class StripedAdder implements Ser
151      static final ThreadHashCode threadHashCode = new ThreadHashCode();
152  
153      /**
154 <     * Table of adders. Size is power of two, grows to be at most CAP.
154 >     * Table of cells. When non-null, size is a power of 2.
155       */
156 <    private transient volatile Adder[] adders;
156 >    private transient volatile Cell[] cells;
157  
158      /**
159 <     * Serves as a lock when resizing and/or creating Adders.  There
160 <     * is no need for a blocking lock: Except during initialization
150 <     * races, when busy, other threads try other slots. However,
151 <     * during (double-checked) initializations, we use the
152 <     * "synchronized" lock on this object.
159 >     * Base sum, used mainly when there is no contention, but also as
160 >     * a fallback during table initializion races. Updated via CAS.
161       */
162 <    private final AtomicInteger mutex;
162 >    private transient volatile long base;
163  
164      /**
165 <     * Creates a new adder with zero sum.
165 >     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
166       */
167 <    public StripedAdder() {
160 <        this.mutex = new AtomicInteger();
161 <        // remaining initialization on first call to add.
162 <    }
167 >    private transient volatile int busy;
168  
169      /**
170 <     * Creates a new adder with zero sum, and with stripes presized
171 <     * for the given expected contention level.
172 <     *
168 <     * @param expectedContention the expected number of threads that
169 <     * will concurrently update the sum.
170 <     */
171 <    public StripedAdder(int expectedContention) {
172 <        int cap = (expectedContention < CAP) ? expectedContention : CAP;
173 <        int size = 1;
174 <        while (size < cap)
175 <            size <<= 1;
176 <        Adder[] as = new Adder[size];
177 <        for (int i = 0; i < size; ++i)
178 <            as[i] = new Adder(0);
179 <        this.adders = as;
180 <        this.mutex = new AtomicInteger();
170 >     * Creates a new adder with initial sum of zero.
171 >     */
172 >    public StripedAdder() {
173      }
174  
175      /**
# Line 186 | Line 178 | public class StripedAdder implements Ser
178       * @param x the value to add
179       */
180      public void add(long x) {
181 <        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
182 <        HashCode hc = threadHashCode.get();
183 <        int h = hc.code;
184 <        if ((as = adders) == null || (n = as.length) < 1 ||
185 <            (a = as[(n - 1) & h]) == null ||
186 <            !a.compareAndSet(v = a.get(), v + x))
187 <            retryAdd(x, hc);
181 >        Cell[] as; long v; HashCode hc; Cell a; int n; boolean contended;
182 >        if ((as = cells) != null ||
183 >            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
184 >            int h = (hc = threadHashCode.get()).code;
185 >            if (as != null && (n = as.length) > 0 &&
186 >                (a = as[(n - 1) & h]) != null) {
187 >                if (UNSAFE.compareAndSwapLong(a, valueOffset,
188 >                                              v = a.value, v + x))
189 >                    return;
190 >                contended = true;
191 >            }
192 >            else
193 >                contended = false;
194 >            retryAdd(x, hc, contended);
195 >        }
196      }
197  
198      /**
199       * Handle cases of add involving initialization, resizing,
200 <     * creating new Adders, and/or contention. See above for
201 <     * explanation.
200 >     * creating new Cells, and/or contention. See above for
201 >     * explanation. This method suffers the usual non-modularity
202 >     * problems of optimistic retry code, relying on rechecked sets of
203 >     * reads.
204 >     *
205 >     * @param x the value to add
206 >     * @param hc the hash code holder
207 >     * @param precontended true if CAS failed before call
208       */
209 <    private void retryAdd(long x, HashCode hc) {
209 >    private void retryAdd(long x, HashCode hc, boolean precontended) {
210          int h = hc.code;
211 <        final AtomicInteger mutex = this.mutex;
206 <        int collisions = 1 - mutex.get(); // first guess: collides if not locked
211 >        boolean collide = false; // true if last slot nonempty
212          for (;;) {
213 <            Adder[] as; Adder a; long v; int k, n;
214 <            while ((as = adders) == null || (n = as.length) < 1) {
215 <                synchronized(mutex) {                // Try to initialize
216 <                    if (adders == null) {
217 <                        Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE];
218 <                        rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0);
219 <                        adders = rs;
213 >            Cell[] as; Cell a; int n;
214 >            if ((as = cells) != null && (n = as.length) > 0) {
215 >                if ((a = as[(n - 1) & h]) == null) {
216 >                    if (busy == 0) {            // Try to attach new Cell
217 >                        Cell r = new Cell(x);   // Optimistically create
218 >                        if (busy == 0 &&
219 >                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
220 >                            boolean created = false;
221 >                            try {               // Recheck under lock
222 >                                Cell[] rs; int m, j;
223 >                                if ((rs = cells) != null &&
224 >                                    (m = rs.length) > 0 &&
225 >                                    rs[j = (m - 1) & h] == null) {
226 >                                    rs[j] = r;
227 >                                    created = true;
228 >                                }
229 >                            } finally {
230 >                                busy = 0;
231 >                            }
232 >                            if (created)
233 >                                break;
234 >                            continue;           // Slot is now non-empty
235 >                        }
236                      }
237 +                    collide = false;
238                  }
239 <                collisions = 0;
240 <            }
241 <
242 <            if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot
243 <                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
222 <                    try {
223 <                        if (adders == as && as[k] == null)
224 <                            a = as[k] = new Adder(x);
225 <                    } finally {
226 <                        mutex.set(0);
227 <                    }
228 <                    if (a != null)
239 >                else if (precontended)          // CAS already known to fail
240 >                    precontended = false;       // Continue after rehash
241 >                else {
242 >                    long v = a.value;
243 >                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
244                          break;
245 +                    if (!collide)
246 +                        collide = true;
247 +                    else if (n >= NCPU || cells != as)
248 +                        collide = false;        // Can't expand
249 +                    else if (busy == 0 &&
250 +                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
251 +                        collide = false;
252 +                        try {
253 +                            if (cells == as) { // Expand table
254 +                                Cell[] rs = new Cell[n << 1];
255 +                                for (int i = 0; i < n; ++i)
256 +                                    rs[i] = as[i];
257 +                                cells = rs;
258 +                            }
259 +                        } finally {
260 +                            busy = 0;
261 +                        }
262 +                        continue;
263 +                    }
264                  }
265 <                collisions = 0;
265 >                h ^= h << 13;                   // Rehash
266 >                h ^= h >>> 17;
267 >                h ^= h << 5;
268              }
269 <            else if (collisions != 0 && n < CAP &&   // Try to expand table
270 <                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
271 <                try {
272 <                    if (adders == as) {
273 <                        Adder[] rs = new Adder[n << 1];
274 <                        for (int i = 0; i < n; ++i)
275 <                            rs[i] = as[i];
276 <                        adders = rs;
269 >            else if (busy == 0 && cells == as &&
270 >                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
271 >                boolean init = false;
272 >                try {                           // Initialize
273 >                    if (cells == as) {
274 >                        Cell r = new Cell(x);
275 >                        Cell[] rs = new Cell[2];
276 >                        rs[h & 1] = r;
277 >                        cells = rs;
278 >                        init = true;
279                      }
280                  } finally {
281 <                    mutex.set(0);
281 >                    busy = 0;
282                  }
283 <                collisions = 0;
283 >                if (init)
284 >                    break;
285 >            }
286 >            else {                              // Lost initialization race
287 >                long b = base;                  // Fall back on using base
288 >                if (UNSAFE.compareAndSwapLong(this, baseOffset, b, b + x))
289 >                    break;
290              }
247            else if (a.compareAndSet(v = a.get(), v + x))
248                break;
249            else
250                collisions = 1;
251            h ^= h << 13;                            // Rehash
252            h ^= h >>> 17;
253            h ^= h << 5;
291          }
292 <        hc.code = h;
292 >        hc.code = h;                            // Record index for next time
293      }
294  
295      /**
296 <     * Returns an estimate of the current sum.  The result is
297 <     * calculated by summing multiple variables, so may not be
298 <     * accurate if updates occur concurrently with this method.
296 >     * Equivalent to {@code add(1)}.
297 >     */
298 >    public void increment() {
299 >        add(1L);
300 >    }
301 >
302 >    /**
303 >     * Equivalent to {@code add(-1)}.
304 >     */
305 >    public void decrement() {
306 >        add(-1L);
307 >    }
308 >
309 >    /**
310 >     * Returns the current sum.  The result is only guaranteed to be
311 >     * accurate in the absence of concurrent updates. Otherwise, it
312 >     * may fail to reflect one or more updates occuring while
313 >     * calculating the result.
314       *
315 <     * @return the estimated sum
315 >     * @return the sum
316       */
317      public long sum() {
318 <        long sum = 0L;
319 <        Adder[] as = adders;
318 >        Cell[] as = cells;
319 >        long sum = base;
320          if (as != null) {
321              int n = as.length;
322              for (int i = 0; i < n; ++i) {
323 <                Adder a = as[i];
323 >                Cell a = as[i];
324                  if (a != null)
325 <                    sum += a.get();
325 >                    sum += a.value;
326              }
327          }
328          return sum;
329      }
330  
331      /**
332 <     * Resets each of the variables to zero. This is effective in
333 <     * fully resetting the sum only if there are no concurrent
334 <     * updates.
332 >     * Resets variables maintaining the sum to zero.  This is
333 >     * effective in setting the sum to zero only if there are no
334 >     * concurrent updates.
335       */
336      public void reset() {
337 <        Adder[] as = adders;
337 >        Cell[] as = cells;
338 >        base = 0L;
339          if (as != null) {
340              int n = as.length;
341              for (int i = 0; i < n; ++i) {
342 <                Adder a = as[i];
342 >                Cell a = as[i];
343                  if (a != null)
344 <                    a.set(0L);
344 >                    a.value = 0L;
345              }
346          }
347      }
348  
349      /**
350 <     * Equivalent to {@code add(1)}.
351 <     */
352 <    public void increment() {
353 <        add(1L);
354 <    }
355 <
303 <    /**
304 <     * Equivalent to {@code add(-1)}.
305 <     */
306 <    public void decrement() {
307 <        add(-1L);
308 <    }
309 <
310 <    /**
311 <     * Equivalent to {@link #sum} followed by {@link #reset}.
350 >     * Equivalent in effect to {@link #sum} followed by {@link
351 >     * #reset}. This method may apply for example during quiescent
352 >     * points between multithreaded computations.  If there are
353 >     * updates concurrent with this method, the returned value is
354 >     * <em>not</em> guaranteed to be the final sum occurring before
355 >     * the reset.
356       *
357 <     * @return the estimated sum
357 >     * @return the sum
358       */
359 <    public long sumAndReset() {
360 <        long sum = 0L;
361 <        Adder[] as = adders;
359 >    public long sumThenReset() {
360 >        Cell[] as = cells;
361 >        long sum = base;
362 >        base = 0L;
363          if (as != null) {
364              int n = as.length;
365              for (int i = 0; i < n; ++i) {
366 <                Adder a = as[i];
366 >                Cell a = as[i];
367                  if (a != null) {
368 <                    sum += a.get();
369 <                    a.set(0L);
368 >                    sum += a.value;
369 >                    a.value = 0L;
370                  }
371              }
372          }
# Line 337 | Line 382 | public class StripedAdder implements Ser
382      private void readObject(ObjectInputStream s)
383          throws IOException, ClassNotFoundException {
384          s.defaultReadObject();
385 <        mutex.set(0);
386 <        add(s.readLong());
385 >        busy = 0;
386 >        cells = null;
387 >        base = s.readLong();
388 >    }
389 >
390 >    // Unsafe mechanics
391 >    private static final sun.misc.Unsafe UNSAFE;
392 >    private static final long baseOffset;
393 >    private static final long busyOffset;
394 >    private static final long valueOffset;
395 >    static {
396 >        try {
397 >            UNSAFE = getUnsafe();
398 >            Class<?> sk = StripedAdder.class;
399 >            baseOffset = UNSAFE.objectFieldOffset
400 >                (sk.getDeclaredField("base"));
401 >            busyOffset = UNSAFE.objectFieldOffset
402 >                (sk.getDeclaredField("busy"));
403 >            Class<?> ak = Cell.class;
404 >            valueOffset = UNSAFE.objectFieldOffset
405 >                (ak.getDeclaredField("value"));
406 >        } catch (Exception e) {
407 >            throw new Error(e);
408 >        }
409 >    }
410 >
411 >    /**
412 >     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
413 >     * Replace with a simple call to Unsafe.getUnsafe when integrating
414 >     * into a jdk.
415 >     *
416 >     * @return a sun.misc.Unsafe
417 >     */
418 >    private static sun.misc.Unsafe getUnsafe() {
419 >        try {
420 >            return sun.misc.Unsafe.getUnsafe();
421 >        } catch (SecurityException se) {
422 >            try {
423 >                return java.security.AccessController.doPrivileged
424 >                    (new java.security
425 >                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
426 >                        public sun.misc.Unsafe run() throws Exception {
427 >                            java.lang.reflect.Field f = sun.misc
428 >                                .Unsafe.class.getDeclaredField("theUnsafe");
429 >                            f.setAccessible(true);
430 >                            return (sun.misc.Unsafe) f.get(null);
431 >                        }});
432 >            } catch (java.security.PrivilegedActionException e) {
433 >                throw new RuntimeException("Could not initialize intrinsics",
434 >                                           e.getCause());
435 >            }
436 >        }
437      }
438  
439   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines