Comparing jsr166/src/jsr166e/LongAdder.java (file contents):
Revision 1.4 by jsr166, Mon Aug 1 18:54:15 2011 UTC vs.
Revision 1.5 by dl, Tue Aug 2 18:04:12 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 import java.util.Random;
9 import java.util.concurrent.atomic.AtomicInteger;
8   import java.util.concurrent.atomic.AtomicLong;
9   import java.io.IOException;
10   import java.io.Serializable;
# Line 14 | Line 12 | import java.io.ObjectInputStream;
12   import java.io.ObjectOutputStream;
13  
14   /**
15 < * One or more variables that together maintain an initially zero sum.
16 < * When updates (method {@link #add}) are contended across threads,
17 < * the set of variables may grow dynamically to reduce contention.
15 > * One or more variables that together maintain an initially zero
16 > * {@code long} sum.  When updates (method {@link #add}) are contended
17 > * across threads, the set of variables may grow dynamically to reduce
18 > * contention. Method {@link #sum} (or, equivalently, {@link
19 > * #longValue}) returns the current total combined across the
20 > * variables maintaining the sum.
21   *
22   * <p> This class is usually preferable to {@link AtomicLong} when
23   * multiple threads update a common sum that is used for purposes such
# Line 26 | Line 27 | import java.io.ObjectOutputStream;
27   * this class is significantly higher, at the expense of higher space
28   * consumption.
29   *
30 < * <p> Method {@link #sum} returns the current combined total across
31 < * the variables maintaining the sum.  This value is <em>NOT</em> an
32 < * atomic snapshot: Invocation of {@code sum} in the absence of
33 < * concurrent updates returns an accurate result, but concurrent
33 < * updates that occur while the sum is being calculated might not be
34 < * incorporated.  The sum may also be {@code reset} to zero, as
35 < * an alternative to creating a new adder.  However, method {@link
36 < * #reset} is intrinsically racy, so should only be used when it is
37 < * known that no threads are concurrently updating the sum.
30 > * <p>This class extends {@link Number}, but does <em>not</em> define
31 > * methods such as {@code hashCode} and {@code compareTo} because
32 > * instances are expected to be mutated, and so are not useful as
33 > * collection keys.
34   *
35   * <p><em>jsr166e note: This class is targeted to be placed in
36   * java.util.concurrent.atomic</em>
37   *
38   * @author Doug Lea
39   */
40 < public class LongAdder implements Serializable {
40 > public class LongAdder extends Striped64 implements Serializable {
41      private static final long serialVersionUID = 7249069246863182397L;
42  
47    /*
48     * A LongAdder maintains a lazily-initialized table of atomically
49     * updated variables, plus an extra "base" field. The table size
50     * is a power of two. Indexing uses masked per-thread hash codes
51     *
52     * Table entries are of class Cell; a variant of AtomicLong padded
53     * to reduce cache contention on most processors. Padding is
54     * overkill for most Atomics because they are usually irregularly
55     * scattered in memory and thus don't interfere much with each
56     * other. But Atomic objects residing in arrays will tend to be
57     * placed adjacent to each other, and so will most often share
58     * cache lines (with a huge negative performance impact) without
59     * this precaution.
60     *
61     * In part because Cells are relatively large, we avoid creating
62     * them until they are needed.  When there is no contention, all
63     * updates are made to the base field.  Upon first contention (a
64     * failed CAS on base update), the table is initialized to size 2.
65     * The table size is doubled upon further contention until
66     * reaching the nearest power of two greater than or equal to the
67     * number of CPUS.
68     *
69     * Per-thread hash codes are initialized to random values.
70     * Contention and/or table collisions are indicated by failed
71     * CASes when performing an add operation (see method
72     * retryAdd). Upon a collision, if the table size is less than the
73     * capacity, it is doubled in size unless some other thread holds
74     * the lock. If a hashed slot is empty, and lock is available, a
75     * new Cell is created. Otherwise, if the slot exists, a CAS is
76     * tried.  Retries proceed by "double hashing", using a secondary
77     * hash (Marsaglia XorShift) to try to find a free slot.
78     *
79     * The table size is capped because, when there are more threads
80     * than CPUs, supposing that each thread were bound to a CPU,
81     * there would exist a perfect hash function mapping threads to
82     * slots that eliminates collisions. When we reach capacity, we
83     * search for this mapping by randomly varying the hash codes of
84     * colliding threads.  Because search is random, and collisions
85     * only become known via CAS failures, convergence can be slow,
86     * and because threads are typically not bound to CPUS forever,
87     * may not occur at all. However, despite these limitations,
88     * observed contention rates are typically low in these cases.
89     *
90     * A single spinlock is used for initializing and resizing the
91     * table, as well as populating slots with new Cells.  There is no
92     * need for a blocking lock: Upon lock contention, threads try
93     * other slots (or the base) rather than blocking.  During these
94     * retries, there is increased contention and reduced locality,
95     * which is still better than alternatives.
96     *
97     * It is possible for a Cell to become unused when threads that
98     * once hashed to it terminate, as well as in the case where
99     * doubling the table causes no thread to hash to it under
100     * expanded mask.  We do not try to detect or remove such cells,
101     * under the assumption that for long-running adders, observed
102     * contention levels will recur, so the cells will eventually be
103     * needed again; and for short-lived ones, it does not matter.
104     *
105     * JVM intrinsics note: It would be possible to use a release-only
106     * form of CAS here, if it were provided.
107     */
108
109    /**
110     * Padded variant of AtomicLong.  The value field is placed
111     * between pads, hoping that the JVM doesn't reorder them.
112     * Updates are via inlined CAS in methods add and retryAdd.
113     */
114    static final class Cell {
115        volatile long p0, p1, p2, p3, p4, p5, p6;
116        volatile long value;
117        volatile long q0, q1, q2, q3, q4, q5, q6;
118        Cell(long x) { value = x; }
119    }
120
121    /**
122     * Holder for the thread-local hash code. The code is initially
123     * random, but may be set to a different value upon collisions.
124     */
125    static final class HashCode {
126        static final Random rng = new Random();
127        int code;
128        HashCode() {
129            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
130            code = (h == 0) ? 1 : h;
131        }
132    }
133
134    /**
135     * The corresponding ThreadLocal class
136     */
137    static final class ThreadHashCode extends ThreadLocal<HashCode> {
138        public HashCode initialValue() { return new HashCode(); }
139    }
140
43      /**
44 <     * Static per-thread hash codes. Shared across all LongAdders
143 <     * to reduce ThreadLocal pollution and because adjustments due to
144 <     * collisions in one table are likely to be appropriate for
145 <     * others.
44 >     * Version of plus for use in retryUpdate
45       */
46 <    static final ThreadHashCode threadHashCode = new ThreadHashCode();
148 <
149 <    /** Number of CPUS, to place bound on table size */
150 <    private static final int NCPU = Runtime.getRuntime().availableProcessors();
151 <
152 <    /**
153 <     * Table of cells. When non-null, size is a power of 2.
154 <     */
155 <    private transient volatile Cell[] cells;
156 <
157 <    /**
158 <     * Base sum, used mainly when there is no contention, but also as
159 <     * a fallback during table initialization races. Updated via CAS.
160 <     */
161 <    private transient volatile long base;
162 <
163 <    /**
164 <     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
165 <     */
166 <    private transient volatile int busy;
46 >    final long fn(long v, long x) { return v + x; }
47  
48      /**
49       * Creates a new adder with initial sum of zero.
# Line 177 | Line 57 | public class LongAdder implements Serial
57       * @param x the value to add
58       */
59      public void add(long x) {
60 <        Cell[] as; long v; HashCode hc; Cell a; int n;
61 <        if ((as = cells) != null ||
182 <            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
60 >        Cell[] as; long b, v; HashCode hc; Cell a; int n;
61 >        if ((as = cells) != null || !casBase(b = base, b + x)) {
62              boolean uncontended = true;
63              int h = (hc = threadHashCode.get()).code;
64              if (as == null || (n = as.length) < 1 ||
65                  (a = as[(n - 1) & h]) == null ||
66 <                !(uncontended = UNSAFE.compareAndSwapLong(a, valueOffset,
67 <                                                          v = a.value, v + x)))
189 <                retryAdd(x, hc, uncontended);
190 <        }
191 <    }
192 <
193 <    /**
194 <     * Handle cases of add involving initialization, resizing,
195 <     * creating new Cells, and/or contention. See above for
196 <     * explanation. This method suffers the usual non-modularity
197 <     * problems of optimistic retry code, relying on rechecked sets of
198 <     * reads.
199 <     *
200 <     * @param x the value to add
201 <     * @param hc the hash code holder
202 <     * @param wasUncontended false if CAS failed before call
203 <     */
204 <    private void retryAdd(long x, HashCode hc, boolean wasUncontended) {
205 <        int h = hc.code;
206 <        boolean collide = false;                // True if last slot nonempty
207 <        for (;;) {
208 <            Cell[] as; Cell a; int n; long v;
209 <            if ((as = cells) != null && (n = as.length) > 0) {
210 <                if ((a = as[(n - 1) & h]) == null) {
211 <                    if (busy == 0) {            // Try to attach new Cell
212 <                        Cell r = new Cell(x);   // Optimistically create
213 <                        if (busy == 0 &&
214 <                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
215 <                            boolean created = false;
216 <                            try {               // Recheck under lock
217 <                                Cell[] rs; int m, j;
218 <                                if ((rs = cells) != null &&
219 <                                    (m = rs.length) > 0 &&
220 <                                    rs[j = (m - 1) & h] == null) {
221 <                                    rs[j] = r;
222 <                                    created = true;
223 <                                }
224 <                            } finally {
225 <                                busy = 0;
226 <                            }
227 <                            if (created)
228 <                                break;
229 <                            continue;           // Slot is now non-empty
230 <                        }
231 <                    }
232 <                    collide = false;
233 <                }
234 <                else if (!wasUncontended)       // CAS already known to fail
235 <                    wasUncontended = true;      // Continue after rehash
236 <                else if (UNSAFE.compareAndSwapLong(a, valueOffset,
237 <                                                   v = a.value, v + x))
238 <                    break;
239 <                else if (n >= NCPU || cells != as)
240 <                    collide = false;            // At max size or stale
241 <                else if (!collide)
242 <                    collide = true;
243 <                else if (busy == 0 &&           // Try to expand table
244 <                         UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
245 <                    try {
246 <                        if (cells == as) {
247 <                            Cell[] rs = new Cell[n << 1];
248 <                            for (int i = 0; i < n; ++i)
249 <                                rs[i] = as[i];
250 <                            cells = rs;
251 <                        }
252 <                    } finally {
253 <                        busy = 0;
254 <                    }
255 <                    collide = false;
256 <                    continue;                   // Retry with expanded table
257 <                }
258 <                h ^= h << 13;                   // Rehash
259 <                h ^= h >>> 17;
260 <                h ^= h << 5;
261 <            }
262 <            else if (busy == 0 && cells == as &&
263 <                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
264 <                boolean init = false;
265 <                try {                           // Initialize table
266 <                    if (cells == as) {
267 <                        Cell[] rs = new Cell[2];
268 <                        rs[h & 1] = new Cell(x);
269 <                        cells = rs;
270 <                        init = true;
271 <                    }
272 <                } finally {
273 <                    busy = 0;
274 <                }
275 <                if (init)
276 <                    break;
277 <            }
278 <            else if (UNSAFE.compareAndSwapLong(this, baseOffset,
279 <                                               v = base, v + x))
280 <                break;                          // Fall back on using base
66 >                !(uncontended = a.cas(v = a.value, v + x)))
67 >                retryUpdate(x, hc, uncontended);
68          }
282        hc.code = h;                            // Record index for next time
69      }
70  
71      /**
# Line 297 | Line 83 | public class LongAdder implements Serial
83      }
84  
85      /**
86 <     * Returns the current sum.  The result is only guaranteed to be
87 <     * accurate in the absence of concurrent updates. Otherwise, it
88 <     * may fail to reflect one or more updates occurring while
89 <     * calculating the result.
86 >     * Returns the current sum.  The returned value is <em>NOT</em> an
87 >     * atomic snapshot: Invocation in the absence of concurrent
88 >     * updates returns an accurate result, but concurrent updates that
89 >     * occur while the sum is being calculated might not be
90 >     * incorporated.
91       *
92       * @return the sum
93       */
# Line 319 | Line 106 | public class LongAdder implements Serial
106      }
107  
108      /**
109 <     * Resets variables maintaining the sum to zero.  This is
110 <     * effective in setting the sum to zero only if there are no
111 <     * concurrent updates.
109 >     * Resets variables maintaining the sum to zero.  This method may
110 >     * be a useful alternative to creating a new adder, but is only
111 >     * effective if there are no concurrent updates.  Because this
112 >     * method is intrinsically racy, it should only be used when it is
113 >     * known that no threads are concurrently updating.
114       */
115      public void reset() {
116 <        Cell[] as = cells;
328 <        base = 0L;
329 <        if (as != null) {
330 <            int n = as.length;
331 <            for (int i = 0; i < n; ++i) {
332 <                Cell a = as[i];
333 <                if (a != null)
334 <                    a.value = 0L;
335 <            }
336 <        }
116 >        internalReset(0L);
117      }
118  
119      /**
# Line 341 | Line 121 | public class LongAdder implements Serial
121       * #reset}. This method may apply for example during quiescent
122       * points between multithreaded computations.  If there are
123       * updates concurrent with this method, the returned value is
124 <     * <em>not</em> guaranteed to be the final sum occurring before
124 >     * <em>not</em> guaranteed to be the final value occurring before
125       * the reset.
126       *
127       * @return the sum
# Line 355 | Line 135 | public class LongAdder implements Serial
135              for (int i = 0; i < n; ++i) {
136                  Cell a = as[i];
137                  if (a != null) {
138 <                    sum += a.value;
138 >                    long v = a.value;
139                      a.value = 0L;
140 +                    sum += v;
141                  }
142              }
143          }
144          return sum;
145      }
146  
147 +    /**
148 +     * Returns the String representation of the {@link #sum}.
149 +     * @return the String representation of the {@link #sum}.
150 +     */
151 +    public String toString() {
152 +        return Long.toString(sum());
153 +    }
154 +
155 +    /**
156 +     * Equivalent to {@link #sum}.
157 +     *
158 +     * @return the sum
159 +     */
160 +    public long longValue() {
161 +        return sum();
162 +    }
163 +
164 +    /**
165 +     * Returns the {@link #sum} as an {@code int} after a narrowing
166 +     * primitive conversion.
167 +     */
168 +    public int intValue() {
169 +        return (int)sum();
170 +    }
171 +
172 +    /**
173 +     * Returns the {@link #sum} as a {@code float}
174 +     * after a widening primitive conversion.
175 +     */
176 +    public float floatValue() {
177 +        return (float)sum();
178 +    }
179 +
180 +    /**
181 +     * Returns the {@link #sum} as a {@code double} after a widening
182 +     * primitive conversion.
183 +     */
184 +    public double doubleValue() {
185 +        return (double)sum();
186 +    }
187 +
188      private void writeObject(java.io.ObjectOutputStream s)
189          throws java.io.IOException {
190          s.defaultWriteObject();
# Line 377 | Line 199 | public class LongAdder implements Serial
199          base = s.readLong();
200      }
201  
380    // Unsafe mechanics
381    private static final sun.misc.Unsafe UNSAFE;
382    private static final long baseOffset;
383    private static final long busyOffset;
384    private static final long valueOffset;
385    static {
386        try {
387            UNSAFE = getUnsafe();
388            Class<?> sk = LongAdder.class;
389            baseOffset = UNSAFE.objectFieldOffset
390                (sk.getDeclaredField("base"));
391            busyOffset = UNSAFE.objectFieldOffset
392                (sk.getDeclaredField("busy"));
393            Class<?> ak = Cell.class;
394            valueOffset = UNSAFE.objectFieldOffset
395                (ak.getDeclaredField("value"));
396        } catch (Exception e) {
397            throw new Error(e);
398        }
399    }
400
401    /**
402     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
403     * Replace with a simple call to Unsafe.getUnsafe when integrating
404     * into a jdk.
405     *
406     * @return a sun.misc.Unsafe
407     */
408    private static sun.misc.Unsafe getUnsafe() {
409        try {
410            return sun.misc.Unsafe.getUnsafe();
411        } catch (SecurityException se) {
412            try {
413                return java.security.AccessController.doPrivileged
414                    (new java.security
415                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
416                        public sun.misc.Unsafe run() throws Exception {
417                            java.lang.reflect.Field f = sun.misc
418                                .Unsafe.class.getDeclaredField("theUnsafe");
419                            f.setAccessible(true);
420                            return (sun.misc.Unsafe) f.get(null);
421                        }});
422            } catch (java.security.PrivilegedActionException e) {
423                throw new RuntimeException("Could not initialize intrinsics",
424                                           e.getCause());
425            }
426        }
427    }
428
202   }
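
Usage note on the revised Javadoc: the contract for sum(), sumThenReset() and the Number conversions can be exercised with a short sketch. It assumes the java.util.concurrent.atomic.LongAdder that shipped in JDK 8 (the package this class was targeted at) rather than the jsr166e copy diffed above. The class name LongAdderUsageSketch and the helper countWith are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.LongAdder;

// Illustrative only: uses the JDK 8+ LongAdder, not jsr166e.LongAdder.
final class LongAdderUsageSketch {

    // Hypothetical helper: starts `threads` workers that each call add(1)
    // `perThread` times, then reads the sum once all workers have finished.
    static long countWith(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    adder.add(1L);
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();   // after the joins, no updates are in flight
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // sum() is not an atomic snapshot, but it is accurate here because
        // it runs at a quiescent point with no concurrent updates.
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 100_000));

        LongAdder adder = new LongAdder();
        adder.add((1L << 32) + 2);
        System.out.println(adder.longValue());     // equivalent to sum()
        System.out.println(adder.intValue());      // narrowing keeps the low 32 bits
        System.out.println(adder);                 // toString() prints the sum
        System.out.println(adder.sumThenReset());  // returns the sum, then zeroes it
        System.out.println(adder.sum());           // zero after the reset
    }
}
```

Reading the sum only after the joins mirrors the advice in the revised reset() and sumThenReset() comments: both are meant for points where no thread is updating.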

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines
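
Design note on the comment block removed from LongAdder in this revision: the base-plus-cells scheme it describes can be approximated in a few lines. The following is a deliberately simplified sketch, not the Striped64 code. It assumes a fixed-size table, uses an unpadded AtomicLongArray in place of padded Cell objects, omits the spinlock, lazy Cell creation and table doubling, and always tries the base first. StripedSumSketch, demo and PROBE are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Simplified illustration of "base field plus hashed cells"; not Striped64.
final class StripedSumSketch {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells;   // fixed size, power of two (assumption)
    private final int mask;

    // Per-thread hash code, kept nonzero so the xorshift rehash never sticks at 0.
    private static final ThreadLocal<int[]> PROBE = ThreadLocal.withInitial(() -> {
        int h = ThreadLocalRandom.current().nextInt();
        return new int[] { h == 0 ? 1 : h };
    });

    StripedSumSketch(int slots) {
        if (slots <= 0 || (slots & (slots - 1)) != 0)
            throw new IllegalArgumentException("slots must be a power of two");
        cells = new AtomicLongArray(slots);
        mask = slots - 1;
    }

    // Marsaglia xorshift step with the same shifts as the removed retryAdd.
    static int xorShift(int h) {
        h ^= h << 13;
        h ^= h >>> 17;
        h ^= h << 5;
        return h;
    }

    void add(long x) {
        long b = base.get();
        if (base.compareAndSet(b, b + x))
            return;                        // uncontended: update the base
        int[] probe = PROBE.get();
        int h = probe[0];
        for (;;) {
            int i = h & mask;              // masked per-thread hash picks a slot
            long v = cells.get(i);
            if (cells.compareAndSet(i, v, v + x))
                break;
            h = xorShift(h);               // failed CAS: rehash and try another slot
        }
        probe[0] = h;                      // remember the slot for next time
    }

    // Not an atomic snapshot; accurate only without concurrent updates.
    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }

    // Hypothetical driver: adds 1 from several threads and returns the total.
    static long demo(int threads, int perThread) {
        StripedSumSketch s = new StripedSumSketch(8);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    s.add(1L);
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return s.sum();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 100_000));
    }
}
```

Because the array elements here are adjacent and unpadded, this sketch would likely suffer the cache-line sharing that the removed comment warns about; it shows only the control flow, not the performance characteristics.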