
Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.6 by dl, Tue Jul 26 17:16:36 2011 UTC vs.
Revision 1.10 by dl, Fri Jul 29 13:50:54 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 import java.util.Arrays;
8   import java.util.Random;
9   import java.util.concurrent.atomic.AtomicInteger;
10   import java.util.concurrent.atomic.AtomicLong;
# Line 27 | Line 26 | import java.io.ObjectOutputStream;
26   * update a common sum that is used for purposes such as collecting
27   * statistics. In this case, performance may be significantly faster
28   * than using a shared {@link AtomicLong}, at the expense of using
29 < * much more space.  On the other hand, if it is known that only one
30 < * thread can ever update the sum, performance may be significantly
31 < * slower than just updating a local variable.
29 > * more space.  On the other hand, if it is known that only one thread
30 > * can ever update the sum, performance may be significantly slower
31 > * than just updating a local variable.
32   *
33   * <p>A StripedAdder may optionally be constructed with a given
34   * expected contention level; i.e., the number of threads that are
# Line 46 | Line 45 | public class StripedAdder implements Ser
45       * A StripedAdder maintains a table of Atomic long variables. The
46       * table is indexed by per-thread hash codes.
47       *
48 <     * By default, the table is lazily initialized, to minimize
49 <     * footprint until adders are used. On first use, the table is set
50 <     * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is
51 <     * bounded by the number of CPUS (if larger than the default
52 <     * size).
48 >     * Table entries are of class Adder; a variant of AtomicLong
49 >     * padded to reduce cache contention on most processors. Padding
50 >     * is overkill for most Atomics because they are usually
51 >     * irregularly scattered in memory and thus don't interfere much
52 >     * with each other. But Atomic objects residing in arrays will
53 >     * tend to be placed adjacent to each other, and so will most
54 >     * often share cache lines (with a huge negative performance
55 >     * impact) without this precaution.
56 >     *
57 >     * Because Adders are relatively large, we avoid creating them
58 >     * until they are needed. On the other hand, we try to create them
59 >     * on any sign of contention.
60       *
61       * Per-thread hash codes are initialized to random values.
62       * Collisions are indicated by failed CASes when performing an add
# Line 62 | Line 68 | public class StripedAdder implements Ser
68       * hashing", using a secondary hash (Marsaglia XorShift) to try to
69       * find a free slot.
70       *
71 <     * The table size is capped because, when there are more threads
72 <     * than CPUs, supposing that each thread were bound to a CPU,
73 <     * there would exist a perfect hash function mapping threads to
74 <     * slots that eliminates collisions. When we reach capacity, we
75 <     * search for this mapping by randomly varying the hash codes of
76 <     * colliding threads.  Because search is random, and failures only
77 <     * become known via CAS failures, convergence will be slow, and
78 <     * because threads are typically not bound to CPUS forever, may
79 <     * not occur at all. However, despite these limitations, observed
80 <     * contention is typically low in these cases.
81 <     *
82 <     * Table entries are of class Adder; a form of AtomicLong padded
83 <     * to reduce cache contention on most processors. Padding is
84 <     * overkill for most Atomics because they are usually irregularly
79 <     * scattered in memory and thus don't interfere much with each
80 <     * other. But Atomic objects residing in arrays will tend to be
81 <     * placed adjacent to each other, and so will most often share
82 <     * cache lines without this precaution.  Adders are by default
83 <     * constructed upon first use, which further improves per-thread
84 <     * locality and helps reduce footprint.
71 >     * By default, the table is lazily initialized.  Upon first use,
72 >     * the table is set to size 1, and contains a single Adder. The
73 >     * maximum table size is bounded by nearest power of two >= the
74 >     * number of CPUS.  The table size is capped because, when there
75 >     * are more threads than CPUs, supposing that each thread were
76 >     * bound to a CPU, there would exist a perfect hash function
77 >     * mapping threads to slots that eliminates collisions. When we
78 >     * reach capacity, we search for this mapping by randomly varying
79 >     * the hash codes of colliding threads.  Because search is random,
80 >     * and failures only become known via CAS failures, convergence
81 >     * will be slow, and because threads are typically not bound to
82 >     * CPUS forever, may not occur at all. However, despite these
83 >     * limitations, observed contention is typically low in these
84 >     * cases.
85       *
86       * A single spinlock is used for resizing the table as well as
87 <     * populating slots with new Adders. Upon lock contention, threads
87 >     * populating slots with new Adders. After initialization, there
88 >     * is no need for a blocking lock: Upon lock contention, threads
89       * try other slots rather than blocking. After initialization, at
90       * least one slot exists, so retries will eventually find a
91       * candidate Adder.  During these retries, there is increased
# Line 92 | Line 93 | public class StripedAdder implements Ser
93       * alternatives.
94       */
95  
95    /**
96     * Padded version of AtomicLong
97     */
98    static final class Adder extends AtomicLong {
99        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
100        Adder(long x) { super(x); }
101    }
102
96      private static final int NCPU = Runtime.getRuntime().availableProcessors();
97  
98      /**
99 <     * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon
100 <     * first use under default constructor, and must be a power of
101 <     * two. There is not much point in making size a lot smaller than
102 <     * that of Adders though.  CAP is the maximum allowed table size.
103 <     */
104 <    private static final int DEFAULT_INITIAL_SIZE = 8;
105 <    private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE);
99 >     * Padded variant of AtomicLong.  The value field is placed
100 >     * between pads, hoping that the JVM doesn't reorder them.
101 >     * Updates are via inlined CAS in methods add and retryAdd.
102 >     */
103 >    static final class Adder {
104 >        volatile long p0, p1, p2, p3, p4, p5, p6;
105 >        volatile long value;
106 >        volatile long q0, q1, q2, q3, q4, q5, q6;
107 >        Adder(long x) { value = x; }
108 >    }
109  
110      /**
111       * Holder for the thread-local hash code. The code is initially
# Line 119 | Line 115 | public class StripedAdder implements Ser
115          static final Random rng = new Random();
116          int code;
117          HashCode() {
118 <            int h = rng.nextInt();
119 <            code = (h == 0) ? 1 : h; // ensure nonzero
118 >            int h = rng.nextInt(); // Avoid zero, because of xorShift rehash
119 >            code = (h == 0) ? 1 : h;
120          }
121      }
122  
# Line 140 | Line 136 | public class StripedAdder implements Ser
136      static final ThreadHashCode threadHashCode = new ThreadHashCode();
137  
138      /**
139 <     * Table of adders. Size is power of two, grows to be at most CAP.
139 >     * Table of adders. When non-null, size is a power of 2.
140       */
141      private transient volatile Adder[] adders;
142  
143      /**
144 <     * Serves as a lock when resizing and/or creating Adders.  There
149 <     * is no need for a blocking lock: Except during initialization
150 <     * races, when busy, other threads try other slots. However,
151 <     * during (double-checked) initializations, we use the
152 <     * "synchronized" lock on this object.
144 >     * Spinlock (locked via CAS) used when resizing and/or creating Adders.
145       */
146 <    private final AtomicInteger mutex;
146 >    private volatile int busy;
147  
148      /**
149       * Creates a new adder with zero sum.
150       */
151      public StripedAdder() {
160        this.mutex = new AtomicInteger();
161        // remaining initialization on first call to add.
152      }
153  
154      /**
# Line 169 | Line 159 | public class StripedAdder implements Ser
159       * will concurrently update the sum.
160       */
161      public StripedAdder(int expectedContention) {
162 <        int cap = (expectedContention < CAP) ? expectedContention : CAP;
162 >        int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
163          int size = 1;
164          while (size < cap)
165              size <<= 1;
# Line 177 | Line 167 | public class StripedAdder implements Ser
167          for (int i = 0; i < size; ++i)
168              as[i] = new Adder(0);
169          this.adders = as;
180        this.mutex = new AtomicInteger();
170      }
171  
172      /**
# Line 186 | Line 175 | public class StripedAdder implements Ser
175       * @param x the value to add
176       */
177      public void add(long x) {
178 <        Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
178 >        Adder[] as; Adder a; int n;  // locals to hold volatile reads
179          HashCode hc = threadHashCode.get();
180          int h = hc.code;
181 <        if ((as = adders) == null || (n = as.length) < 1 ||
182 <            (a = as[(n - 1) & h]) == null ||
183 <            !a.compareAndSet(v = a.get(), v + x))
184 <            retryAdd(x, hc);
181 >        boolean contended;
182 >        if ((as = adders) != null && (n = as.length) > 0 &&
183 >            (a = as[(n - 1) & h]) != null) {
184 >            long v = a.value;
185 >            if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
186 >                return;
187 >            contended = true;
188 >        }
189 >        else
190 >            contended = false;
191 >        retryAdd(x, hc, contended);
192      }
193  
194      /**
195       * Handle cases of add involving initialization, resizing,
196       * creating new Adders, and/or contention. See above for
197 <     * explanation.
197 >     * explanation. This method suffers the usual non-modularity
198 >     * problems of optimistic retry code, relying on rechecked sets of
199 >     * reads.
200 >     *
201 >     * @param x the value to add
202 >     * @param hc the hash code holder
203 >     * @param precontended true if CAS failed before call
204       */
205 <    private void retryAdd(long x, HashCode hc) {
205 >    private void retryAdd(long x, HashCode hc, boolean precontended) {
206          int h = hc.code;
207 <        final AtomicInteger mutex = this.mutex;
206 <        int collisions = 1 - mutex.get(); // first guess: collides if not locked
207 >        boolean collide = false; // true if last slot nonempty
208          for (;;) {
209 <            Adder[] as; Adder a; long v; int k, n;
210 <            while ((as = adders) == null || (n = as.length) < 1) {
211 <                synchronized(mutex) {                // Try to initialize
212 <                    if (adders == null) {
213 <                        Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE];
214 <                        rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0);
215 <                        adders = rs;
209 >            Adder[] as; Adder a; int n;
210 >            if ((as = adders) != null && (n = as.length) > 0) {
211 >                if ((a = as[(n - 1) & h]) == null) {
212 >                    if (busy == 0 &&            // Try to attach new Adder
213 >                        UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
214 >                        boolean created = false;
215 >                        try {                   // Recheck under lock
216 >                            Adder[] rs; int m, j;
217 >                            if ((rs = adders) != null && (m = rs.length) > 0 &&
218 >                                rs[j = (m - 1) & h] == null) {
219 >                                rs[j] = new Adder(x);
220 >                                created = true;
221 >                            }
222 >                        } finally {
223 >                            busy = 0;
224 >                        }
225 >                        if (created)
226 >                            break;
227 >                        continue;               // Slot is now non-empty
228                      }
229 +                    collide = false;
230                  }
231 <                collisions = 0;
232 <            }
233 <
234 <            if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot
235 <                if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
222 <                    try {
223 <                        if (adders == as && as[k] == null)
224 <                            a = as[k] = new Adder(x);
225 <                    } finally {
226 <                        mutex.set(0);
227 <                    }
228 <                    if (a != null)
231 >                else if (precontended)          // CAS already known to fail
232 >                    precontended = false;       // Continue after rehash
233 >                else {
234 >                    long v = a.value;
235 >                    if (UNSAFE.compareAndSwapLong(a, valueOffset, v, v + x))
236                          break;
237 +                    if (!collide)
238 +                        collide = true;
239 +                    else if (n >= NCPU || adders != as)
240 +                        collide = false;        // Don't expand
241 +                    else if (busy == 0 &&
242 +                             UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
243 +                        collide = false;
244 +                        try {
245 +                            if (adders == as) { // Expand table
246 +                                Adder[] rs = new Adder[n << 1];
247 +                                for (int i = 0; i < n; ++i)
248 +                                    rs[i] = as[i];
249 +                                adders = rs;
250 +                            }
251 +                        } finally {
252 +                            busy = 0;
253 +                        }
254 +                        continue;
255 +                    }
256                  }
257 <                collisions = 0;
257 >                h ^= h << 13;                   // Rehash
258 >                h ^= h >>> 17;
259 >                h ^= h << 5;
260              }
261 <            else if (collisions != 0 && n < CAP &&   // Try to expand table
262 <                     mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
263 <                try {
264 <                    if (adders == as) {
265 <                        Adder[] rs = new Adder[n << 1];
266 <                        for (int i = 0; i < n; ++i)
267 <                            rs[i] = as[i];
268 <                        adders = rs;
261 >            else if (adders == as) {            // Try to default-initialize
262 >                Adder[] rs = new Adder[1];
263 >                rs[0] = new Adder(x);
264 >                boolean init = false;
265 >                while (adders == as) {
266 >                    if (UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
267 >                        try {
268 >                            if (adders == as) {
269 >                                adders = rs;
270 >                                init = true;
271 >                            }
272 >                        } finally {
273 >                            busy = 0;
274 >                        }
275 >                        break;
276                      }
277 <                } finally {
278 <                    mutex.set(0);
277 >                    if (adders != as)
278 >                        break;
279 >                    Thread.yield();              // Back off
280                  }
281 <                collisions = 0;
281 >                if (init)
282 >                    break;
283              }
247            else if (a.compareAndSet(v = a.get(), v + x))
248                break;
249            else
250                collisions = 1;
251            h ^= h << 13;                            // Rehash
252            h ^= h >>> 17;
253            h ^= h << 5;
284          }
285 <        hc.code = h;
285 >        hc.code = h;                            // Record index for next time
286 >    }
287 >
288 >    /**
289 >     * Equivalent to {@code add(1)}.
290 >     */
291 >    public void increment() {
292 >        add(1L);
293 >    }
294 >
295 >    /**
296 >     * Equivalent to {@code add(-1)}.
297 >     */
298 >    public void decrement() {
299 >        add(-1L);
300      }
301  
302      /**
# Line 270 | Line 314 | public class StripedAdder implements Ser
314              for (int i = 0; i < n; ++i) {
315                  Adder a = as[i];
316                  if (a != null)
317 <                    sum += a.get();
317 >                    sum += a.value;
318              }
319          }
320          return sum;
321      }
322  
323      /**
324 <     * Resets each of the variables to zero. This is effective in
325 <     * fully resetting the sum only if there are no concurrent
326 <     * updates.
283 <     */
284 <    public void reset() {
285 <        Adder[] as = adders;
286 <        if (as != null) {
287 <            int n = as.length;
288 <            for (int i = 0; i < n; ++i) {
289 <                Adder a = as[i];
290 <                if (a != null)
291 <                    a.set(0L);
292 <            }
293 <        }
294 <    }
295 <
296 <    /**
297 <     * Equivalent to {@code add(1)}.
298 <     */
299 <    public void increment() {
300 <        add(1L);
301 <    }
302 <
303 <    /**
304 <     * Equivalent to {@code add(-1)}.
305 <     */
306 <    public void decrement() {
307 <        add(-1L);
308 <    }
309 <
310 <    /**
311 <     * Equivalent to {@link #sum} followed by {@link #reset}.
324 >     * Resets each of the variables to zero, returning the estimated
325 >     * previous sum. This is effective in fully resetting the sum only
326 >     * if there are no concurrent updates.
327       *
328 <     * @return the estimated sum
328 >     * @return the estimated previous sum
329       */
330 <    public long sumAndReset() {
330 >    public long reset() {
331          long sum = 0L;
332          Adder[] as = adders;
333          if (as != null) {
# Line 320 | Line 335 | public class StripedAdder implements Ser
335              for (int i = 0; i < n; ++i) {
336                  Adder a = as[i];
337                  if (a != null) {
338 <                    sum += a.get();
339 <                    a.set(0L);
338 >                    sum += a.value;
339 >                    a.value = 0L;
340                  }
341              }
342          }
# Line 337 | Line 352 | public class StripedAdder implements Ser
352      private void readObject(ObjectInputStream s)
353          throws IOException, ClassNotFoundException {
354          s.defaultReadObject();
355 <        mutex.set(0);
355 >        busy = 0;
356          add(s.readLong());
357      }
358  
359 +    // Unsafe mechanics
360 +    private static final sun.misc.Unsafe UNSAFE;
361 +    private static final long busyOffset;
362 +    private static final long valueOffset;
363 +    static {
364 +        try {
365 +            UNSAFE = getUnsafe();
366 +            Class<?> sk = StripedAdder.class;
367 +            busyOffset = UNSAFE.objectFieldOffset
368 +                (sk.getDeclaredField("busy"));
369 +            Class<?> ak = Adder.class;
370 +            valueOffset = UNSAFE.objectFieldOffset
371 +                (ak.getDeclaredField("value"));
372 +        } catch (Exception e) {
373 +            throw new Error(e);
374 +        }
375 +    }
376 +
377 +    /**
378 +     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
379 +     * Replace with a simple call to Unsafe.getUnsafe when integrating
380 +     * into a jdk.
381 +     *
382 +     * @return a sun.misc.Unsafe
383 +     */
384 +    private static sun.misc.Unsafe getUnsafe() {
385 +        try {
386 +            return sun.misc.Unsafe.getUnsafe();
387 +        } catch (SecurityException se) {
388 +            try {
389 +                return java.security.AccessController.doPrivileged
390 +                    (new java.security
391 +                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
392 +                        public sun.misc.Unsafe run() throws Exception {
393 +                            java.lang.reflect.Field f = sun.misc
394 +                                .Unsafe.class.getDeclaredField("theUnsafe");
395 +                            f.setAccessible(true);
396 +                            return (sun.misc.Unsafe) f.get(null);
397 +                        }});
398 +            } catch (java.security.PrivilegedActionException e) {
399 +                throw new RuntimeException("Could not initialize intrinsics",
400 +                                           e.getCause());
401 +            }
402 +        }
403 +    }
404 +
405   }
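
The striping idea described in the class comment of this diff (a power-of-two table indexed by a per-thread hash code, CAS on the selected slot, XorShift rehash after a failed CAS) can be sketched in a few lines without sun.misc.Unsafe. This is a simplified illustration under stated assumptions, not the code in either revision: the class name MiniStripedCounter and the PROBE thread-local are hypothetical, the table is a fixed-size AtomicLongArray, and padding, lazy initialization, table expansion and the busy spinlock are all left out.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical illustration class; not part of jsr166e.
public class MiniStripedCounter {
    // Per-thread probe value, kept nonzero so the XorShift rehash never sticks at 0.
    private static final ThreadLocal<int[]> PROBE = ThreadLocal.withInitial(() -> {
        int h = ThreadLocalRandom.current().nextInt();
        return new int[] { (h == 0) ? 1 : h };
    });

    private final AtomicLongArray cells; // unpadded, unlike Adder in the diff
    private final int mask;              // table length - 1; length is a power of two

    public MiniStripedCounter(int expectedContention) {
        // Same sizing rule as the StripedAdder(int) constructor in revision 1.10.
        int ncpu = Runtime.getRuntime().availableProcessors();
        int cap = Math.min(expectedContention, ncpu);
        int size = 1;
        while (size < cap)
            size <<= 1;
        cells = new AtomicLongArray(size);
        mask = size - 1;
    }

    public void add(long x) {
        int[] probe = PROBE.get();
        int h = probe[0];
        for (;;) {
            int i = h & mask;
            long v = cells.get(i);
            if (cells.compareAndSet(i, v, v + x))
                break;
            h ^= h << 13;                // CAS failed: rehash and try another slot
            h ^= h >>> 17;
            h ^= h << 5;
        }
        probe[0] = h;                    // remember the slot that worked
    }

    public long sum() {
        long sum = 0L;
        for (int i = 0; i < cells.length(); ++i)
            sum += cells.get(i);
        return sum;
    }
}
```

As with sum() in the diff, the value returned here is only an estimate while other threads are still calling add. Because the cells are adjacent array elements with no padding, this sketch is likely to suffer the cache-line sharing that the Adder class is designed to avoid.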

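The rehash lines in retryAdd and the "Avoid zero, because of xorShift rehash" comment in HashCode can be checked in isolation. The sketch below copies the three shift-and-xor steps from the diff into a standalone helper; the class and method names (XorShiftProbe, rehash, slot) are hypothetical and exist only for this illustration.

```java
// Hypothetical illustration class; not part of jsr166e.
public class XorShiftProbe {
    // The three-step Marsaglia XorShift used as the secondary hash in retryAdd.
    static int rehash(int h) {
        h ^= h << 13;
        h ^= h >>> 17;
        h ^= h << 5;
        return h;
    }

    // Slot index for a table whose length n is a power of two.
    static int slot(int h, int n) {
        return (n - 1) & h;
    }

    public static void main(String[] args) {
        int h = 1;
        for (int i = 0; i < 4; ++i) {
            System.out.println("h=" + h + " slot=" + slot(h, 8));
            h = rehash(h);
        }
    }
}
```

Zero is a fixed point of this function (every shift of 0 is 0), so a thread whose code started at 0 would probe the same slot forever. That is why the HashCode constructor substitutes 1.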
Diff Legend

Removed lines (shown here without a marker)
+ Added lines
< Changed lines (text as in revision 1.6)
> Changed lines (text as in revision 1.10)
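
The busy field in revision 1.10 is a non-blocking try-lock: a thread reads the flag, attempts a CAS from 0 to 1, does its work, and clears the flag in a finally block. A thread that loses the CAS moves on instead of waiting. A minimal sketch of that pattern follows, assuming an AtomicInteger in place of the Unsafe-based CAS used in the diff; the BusyFlag class and its tryRun method are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of jsr166e.
public class BusyFlag {
    private final AtomicInteger busy = new AtomicInteger(); // 0 = free, 1 = held

    /**
     * Runs the action only if the flag can be acquired immediately.
     * Returns false, without blocking, when another holder is active.
     */
    public boolean tryRun(Runnable action) {
        // Plain read first, so a visibly held lock costs no CAS attempt.
        if (busy.get() == 0 && busy.compareAndSet(0, 1)) {
            try {
                action.run();
            } finally {
                busy.set(0);             // always release, even if action throws
            }
            return true;
        }
        return false;
    }
}
```

The flag is not reentrant: a nested tryRun from inside the action returns false. In retryAdd a failed acquisition is not an error. The thread falls through to the rehash and probes another slot on the next loop iteration.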