ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/LongAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/LongAdder.java (file contents):
Revision 1.1 by dl, Sun Jul 31 14:20:01 2011 UTC vs.
Revision 1.14 by jsr166, Mon Jan 14 02:02:41 2013 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 import java.util.Random;
9 import java.util.concurrent.atomic.AtomicInteger;
8   import java.util.concurrent.atomic.AtomicLong;
11 import java.io.IOException;
9   import java.io.Serializable;
13 import java.io.ObjectInputStream;
14 import java.io.ObjectOutputStream;
10  
11   /**
12 < * One or more variables that together maintain an initially zero sum.
13 < * When updates (method {@link #add}) are contended across threads,
14 < * the set of variables may grow dynamically to reduce contention.
12 > * One or more variables that together maintain an initially zero
13 > * {@code long} sum.  When updates (method {@link #add}) are contended
14 > * across threads, the set of variables may grow dynamically to reduce
15 > * contention. Method {@link #sum} (or, equivalently, {@link
16 > * #longValue}) returns the current total combined across the
17 > * variables maintaining the sum.
18   *
19 < * <p> This class is usually preferable to {@link AtomicLong} when
19 > * <p>This class is usually preferable to {@link AtomicLong} when
20   * multiple threads update a common sum that is used for purposes such
21   * as collecting statistics, not for fine-grained synchronization
22   * control.  Under low update contention, the two classes have similar
# Line 26 | Line 24 | import java.io.ObjectOutputStream;
24   * this class is significantly higher, at the expense of higher space
25   * consumption.
26   *
27 < * <p> Method {@link #sum} returns the current combined total across
28 < * the variables maintaining the sum.  This value is <em>NOT</em> an
29 < * atomic snapshot: Invocation of <code>sum</code> in the absence of
30 < * concurrent updates returns an accurate result, but concurrent
33 < * updates that occur while the sum is being calculated might not be
34 < * incorporated.  The sum may also be <code>reset</code> to zero, as
35 < * an alternative to creating a new adder.  However, method {@link
36 < * #reset} is intrinsically racy, so should only be used when it is
37 < * known that no threads are concurrently updating the sum.
27 > * <p>This class extends {@link Number}, but does <em>not</em> define
28 > * methods such as {@code equals}, {@code hashCode} and {@code
29 > * compareTo} because instances are expected to be mutated, and so are
30 > * not useful as collection keys.
31   *
32   * <p><em>jsr166e note: This class is targeted to be placed in
33 < * java.util.concurrent.atomic<em>
33 > * java.util.concurrent.atomic.</em>
34   *
35 + * @since 1.8
36   * @author Doug Lea
37   */
38 < public class LongAdder implements Serializable {
38 > public class LongAdder extends Striped64 implements Serializable {
39      private static final long serialVersionUID = 7249069246863182397L;
40  
47    /*
48     * A LongAdder maintains a lazily-initialized table of atomically
49     * updated variables, plus an extra "base" field. The table size
50     * is a power of two. Indexing uses masked per-thread hash codes
51     *
52     * Table entries are of class Cell; a variant of AtomicLong padded
53     * to reduce cache contention on most processors. Padding is
54     * overkill for most Atomics because they are usually irregularly
55     * scattered in memory and thus don't interfere much with each
56     * other. But Atomic objects residing in arrays will tend to be
57     * placed adjacent to each other, and so will most often share
58     * cache lines (with a huge negative performance impact) without
59     * this precaution.
60     *
61     * In part because Cells are relatively large, we avoid creating
62     * them until they are needed.  When there is no contention, all
63     * updates are made to the base field.  Upon first contention (a
64     * failed CAS on base update), the table is initialized to size 2.
65     * The table size is doubled upon further contention until
66     * reaching the nearest power of two greater than or equal to the
67     * number of CPUS.
68     *
69     * Per-thread hash codes are initialized to random values.
70     * Contention and/or table collisions are indicated by failed
71     * CASes when performing an add operation (see method
72     * retryAdd). Upon a collision, if the table size is less than the
73     * capacity, it is doubled in size unless some other thread holds
74     * the lock. If a hashed slot is empty, and lock is available, a
75     * new Cell is created. Otherwise, if the slot exists, a CAS is
76     * tried.  Retries proceed by "double hashing", using a secondary
77     * hash (Marsaglia XorShift) to try to find a free slot.
78     *
79     * The table size is capped because, when there are more threads
80     * than CPUs, supposing that each thread were bound to a CPU,
81     * there would exist a perfect hash function mapping threads to
82     * slots that eliminates collisions. When we reach capacity, we
83     * search for this mapping by randomly varying the hash codes of
84     * colliding threads.  Because search is random, and collisions
85     * only become known via CAS failures, convergence can be slow,
86     * and because threads are typically not bound to CPUS forever,
87     * may not occur at all. However, despite these limitations,
88     * observed contention rates are typically low in these cases.
89     *
90     * A single spinlock is used for initializing and resizing the
91     * table, as well as populating slots with new Cells.  There is no
92     * need for a blocking lock: Upon lock contention, threads try
93     * other slots (or the base) rather than blocking.  During these
94     * retries, there is increased contention and reduced locality,
95     * which is still better than alternatives.
96     *
97     * It is possible for a Cell to become unused when threads that
98     * once hashed to it terminate, as well as in the case where
99     * doubling the table causes no thread to hash to it under
100     * expanded mask.  We do not try to detect or remove such cells,
101     * under the assumption that for long-running adders, observed
102     * contention levels will recur, so the cells will eventually be
103     * needed again; and for short-lived ones, it does not matter.
104     *
105     * JVM intrinsics note: It would be possible to use a release-only
106     * form of CAS here, if it were provided.
107     */
108
41      /**
42 <     * Padded variant of AtomicLong.  The value field is placed
111 <     * between pads, hoping that the JVM doesn't reorder them.
112 <     * Updates are via inlined CAS in methods add and retryAdd.
42 >     * Version of plus for use in retryUpdate
43       */
44 <    static final class Cell {
115 <        volatile long p0, p1, p2, p3, p4, p5, p6;
116 <        volatile long value;
117 <        volatile long q0, q1, q2, q3, q4, q5, q6;
118 <        Cell(long x) { value = x; }
119 <    }
120 <
121 <    /**
122 <     * Holder for the thread-local hash code. The code is initially
123 <     * random, but may be set to a different value upon collisions.
124 <     */
125 <    static final class HashCode {
126 <        static final Random rng = new Random();
127 <        int code;
128 <        HashCode() {
129 <            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
130 <            code = (h == 0) ? 1 : h;
131 <        }
132 <    }
133 <    
134 <    /**
135 <     * The corresponding ThreadLocal class
136 <     */
137 <    static final class ThreadHashCode extends ThreadLocal<HashCode> {
138 <        public HashCode initialValue() { return new HashCode(); }
139 <    }
140 <
141 <    /**
142 <     * Static per-thread hash codes. Shared across all LongAdders
143 <     * to reduce ThreadLocal pollution and because adjustments due to
144 <     * collisions in one table are likely to be appropriate for
145 <     * others.
146 <     */
147 <    static final ThreadHashCode threadHashCode = new ThreadHashCode();
148 <
149 <    /** Nomber of CPUS, to place bound on table size */
150 <    private static final int NCPU = Runtime.getRuntime().availableProcessors();
151 <
152 <    /**
153 <     * Table of cells. When non-null, size is a power of 2.
154 <     */
155 <    private transient volatile Cell[] cells;
156 <
157 <    /**
158 <     * Base sum, used mainly when there is no contention, but also as
159 <     * a fallback during table initializion races. Updated via CAS.
160 <     */
161 <    private transient volatile long base;
162 <
163 <    /**
164 <     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
165 <     */
166 <    private transient volatile int busy;
44 >    final long fn(long v, long x) { return v + x; }
45  
46      /**
47       * Creates a new adder with initial sum of zero.
# Line 177 | Line 55 | public class LongAdder implements Serial
55       * @param x the value to add
56       */
57      public void add(long x) {
58 <        Cell[] as; long v; HashCode hc; Cell a; int n;
59 <        if ((as = cells) != null ||
182 <            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
58 >        Cell[] as; long b, v; HashCode hc; Cell a; int n;
59 >        if ((as = cells) != null || !casBase(b = base, b + x)) {
60              boolean uncontended = true;
61              int h = (hc = threadHashCode.get()).code;
62              if (as == null || (n = as.length) < 1 ||
63                  (a = as[(n - 1) & h]) == null ||
64 <                !(uncontended = UNSAFE.compareAndSwapLong(a, valueOffset,
65 <                                                          v = a.value, v + x)))
189 <                retryAdd(x, hc, uncontended);
190 <        }
191 <    }
192 <
193 <    /**
194 <     * Handle cases of add involving initialization, resizing,
195 <     * creating new Cells, and/or contention. See above for
196 <     * explanation. This method suffers the usual non-modularity
197 <     * problems of optimistic retry code, relying on rechecked sets of
198 <     * reads.
199 <     *
200 <     * @param x the value to add
201 <     * @param hc the hash code holder
202 <     * @param wasUncontended false if CAS failed before call
203 <     */
204 <    private void retryAdd(long x, HashCode hc, boolean wasUncontended) {
205 <        int h = hc.code;
206 <        boolean collide = false;                // True if last slot nonempty
207 <        for (;;) {
208 <            Cell[] as; Cell a; int n; long v;
209 <            if ((as = cells) != null && (n = as.length) > 0) {
210 <                if ((a = as[(n - 1) & h]) == null) {
211 <                    if (busy == 0) {            // Try to attach new Cell
212 <                        Cell r = new Cell(x);   // Optimistically create
213 <                        if (busy == 0 &&
214 <                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
215 <                            boolean created = false;
216 <                            try {               // Recheck under lock
217 <                                Cell[] rs; int m, j;
218 <                                if ((rs = cells) != null &&
219 <                                    (m = rs.length) > 0 &&
220 <                                    rs[j = (m - 1) & h] == null) {
221 <                                    rs[j] = r;
222 <                                    created = true;
223 <                                }
224 <                            } finally {
225 <                                busy = 0;
226 <                            }
227 <                            if (created)
228 <                                break;
229 <                            continue;           // Slot is now non-empty
230 <                        }
231 <                    }
232 <                    collide = false;
233 <                }
234 <                else if (!wasUncontended)       // CAS already known to fail
235 <                    wasUncontended = true;      // Continue after rehash
236 <                else if (UNSAFE.compareAndSwapLong(a, valueOffset,
237 <                                                   v = a.value, v + x))
238 <                    break;
239 <                else if (n >= NCPU || cells != as)
240 <                    collide = false;            // At max size or stale
241 <                else if (!collide)
242 <                    collide = true;
243 <                else if (busy == 0 &&           // Try to expand table
244 <                         UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
245 <                    try {
246 <                        if (cells == as) {
247 <                            Cell[] rs = new Cell[n << 1];
248 <                            for (int i = 0; i < n; ++i)
249 <                                rs[i] = as[i];
250 <                            cells = rs;
251 <                        }
252 <                    } finally {
253 <                        busy = 0;
254 <                    }
255 <                    collide = false;
256 <                    continue;                   // Retry with expanded table
257 <                }
258 <                h ^= h << 13;                   // Rehash
259 <                h ^= h >>> 17;
260 <                h ^= h << 5;
261 <            }
262 <            else if (busy == 0 && cells == as &&
263 <                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
264 <                boolean init = false;
265 <                try {                           // Initialize table
266 <                    if (cells == as) {
267 <                        Cell[] rs = new Cell[2];
268 <                        rs[h & 1] = new Cell(x);
269 <                        cells = rs;
270 <                        init = true;
271 <                    }
272 <                } finally {
273 <                    busy = 0;
274 <                }
275 <                if (init)
276 <                    break;
277 <            }
278 <            else if (UNSAFE.compareAndSwapLong(this, baseOffset,
279 <                                               v = base, v + x))
280 <                break;                          // Fall back on using base
64 >                !(uncontended = a.cas(v = a.value, v + x)))
65 >                retryUpdate(x, hc, uncontended);
66          }
282        hc.code = h;                            // Record index for next time
67      }
68  
69      /**
# Line 297 | Line 81 | public class LongAdder implements Serial
81      }
82  
83      /**
84 <     * Returns the current sum.  The result is only guaranteed to be
85 <     * accurate in the absence of concurrent updates. Otherwise, it
86 <     * may fail to reflect one or more updates occuring while
87 <     * calculating the result.
84 >     * Returns the current sum.  The returned value is <em>NOT</em> an
85 >     * atomic snapshot; invocation in the absence of concurrent
86 >     * updates returns an accurate result, but concurrent updates that
87 >     * occur while the sum is being calculated might not be
88 >     * incorporated.
89       *
90       * @return the sum
91       */
92      public long sum() {
308        Cell[] as = cells;
93          long sum = base;
94 +        Cell[] as = cells;
95          if (as != null) {
96              int n = as.length;
97              for (int i = 0; i < n; ++i) {
# Line 319 | Line 104 | public class LongAdder implements Serial
104      }
105  
106      /**
107 <     * Resets variables maintaining the sum to zero.  This is
108 <     * effective in setting the sum to zero only if there are no
109 <     * concurrent updates.
107 >     * Resets variables maintaining the sum to zero.  This method may
108 >     * be a useful alternative to creating a new adder, but is only
109 >     * effective if there are no concurrent updates.  Because this
110 >     * method is intrinsically racy, it should only be used when it is
111 >     * known that no threads are concurrently updating.
112       */
113      public void reset() {
114 <        Cell[] as = cells;
328 <        base = 0L;
329 <        if (as != null) {
330 <            int n = as.length;
331 <            for (int i = 0; i < n; ++i) {
332 <                Cell a = as[i];
333 <                if (a != null)
334 <                    a.value = 0L;
335 <            }
336 <        }
114 >        internalReset(0L);
115      }
116  
117      /**
# Line 341 | Line 119 | public class LongAdder implements Serial
119       * #reset}. This method may apply for example during quiescent
120       * points between multithreaded computations.  If there are
121       * updates concurrent with this method, the returned value is
122 <     * <em>not</em> guaranteed to be the final sum occurring before
122 >     * <em>not</em> guaranteed to be the final value occurring before
123       * the reset.
124       *
125       * @return the sum
126       */
127      public long sumThenReset() {
350        Cell[] as = cells;
128          long sum = base;
129 +        Cell[] as = cells;
130          base = 0L;
131          if (as != null) {
132              int n = as.length;
# Line 363 | Line 141 | public class LongAdder implements Serial
141          return sum;
142      }
143  
144 +    /**
145 +     * Returns the String representation of the {@link #sum}.
146 +     * @return the String representation of the {@link #sum}
147 +     */
148 +    public String toString() {
149 +        return Long.toString(sum());
150 +    }
151 +
152 +    /**
153 +     * Equivalent to {@link #sum}.
154 +     *
155 +     * @return the sum
156 +     */
157 +    public long longValue() {
158 +        return sum();
159 +    }
160 +
161 +    /**
162 +     * Returns the {@link #sum} as an {@code int} after a narrowing
163 +     * primitive conversion.
164 +     */
165 +    public int intValue() {
166 +        return (int)sum();
167 +    }
168 +
169 +    /**
170 +     * Returns the {@link #sum} as a {@code float}
171 +     * after a widening primitive conversion.
172 +     */
173 +    public float floatValue() {
174 +        return (float)sum();
175 +    }
176 +
177 +    /**
178 +     * Returns the {@link #sum} as a {@code double} after a widening
179 +     * primitive conversion.
180 +     */
181 +    public double doubleValue() {
182 +        return (double)sum();
183 +    }
184 +
185      private void writeObject(java.io.ObjectOutputStream s)
186          throws java.io.IOException {
187          s.defaultWriteObject();
188          s.writeLong(sum());
189      }
190  
191 <    private void readObject(ObjectInputStream s)
192 <        throws IOException, ClassNotFoundException {
191 >    private void readObject(java.io.ObjectInputStream s)
192 >        throws java.io.IOException, ClassNotFoundException {
193          s.defaultReadObject();
194          busy = 0;
195          cells = null;
196          base = s.readLong();
197      }
198  
380    // Unsafe mechanics
381    private static final sun.misc.Unsafe UNSAFE;
382    private static final long baseOffset;
383    private static final long busyOffset;
384    private static final long valueOffset;
385    static {
386        try {
387            UNSAFE = getUnsafe();
388            Class<?> sk = LongAdder.class;
389            baseOffset = UNSAFE.objectFieldOffset
390                (sk.getDeclaredField("base"));
391            busyOffset = UNSAFE.objectFieldOffset
392                (sk.getDeclaredField("busy"));
393            Class<?> ak = Cell.class;
394            valueOffset = UNSAFE.objectFieldOffset
395                (ak.getDeclaredField("value"));
396        } catch (Exception e) {
397            throw new Error(e);
398        }
399    }
400
401    /**
402     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
403     * Replace with a simple call to Unsafe.getUnsafe when integrating
404     * into a jdk.
405     *
406     * @return a sun.misc.Unsafe
407     */
408    private static sun.misc.Unsafe getUnsafe() {
409        try {
410            return sun.misc.Unsafe.getUnsafe();
411        } catch (SecurityException se) {
412            try {
413                return java.security.AccessController.doPrivileged
414                    (new java.security
415                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
416                        public sun.misc.Unsafe run() throws Exception {
417                            java.lang.reflect.Field f = sun.misc
418                                .Unsafe.class.getDeclaredField("theUnsafe");
419                            f.setAccessible(true);
420                            return (sun.misc.Unsafe) f.get(null);
421                        }});
422            } catch (java.security.PrivilegedActionException e) {
423                throw new RuntimeException("Could not initialize intrinsics",
424                                           e.getCause());
425            }
426        }
427    }
428
199   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines