ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/LongAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/LongAdder.java (file contents):
Revision 1.4 by jsr166, Mon Aug 1 18:54:15 2011 UTC vs.
Revision 1.17 by jsr166, Mon May 5 20:20:15 2014 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 < import java.util.Random;
9 < import java.util.concurrent.atomic.AtomicInteger;
10 < import java.util.concurrent.atomic.AtomicLong;
8 >
9   import java.io.IOException;
12 import java.io.Serializable;
10   import java.io.ObjectInputStream;
11   import java.io.ObjectOutputStream;
12 + import java.io.Serializable;
13 + import java.util.concurrent.atomic.AtomicLong;
14  
15   /**
16 < * One or more variables that together maintain an initially zero sum.
17 < * When updates (method {@link #add}) are contended across threads,
18 < * the set of variables may grow dynamically to reduce contention.
16 > * One or more variables that together maintain an initially zero
17 > * {@code long} sum.  When updates (method {@link #add}) are contended
18 > * across threads, the set of variables may grow dynamically to reduce
19 > * contention. Method {@link #sum} (or, equivalently, {@link
20 > * #longValue}) returns the current total combined across the
21 > * variables maintaining the sum.
22   *
23 < * <p> This class is usually preferable to {@link AtomicLong} when
23 > * <p>This class is usually preferable to {@link AtomicLong} when
24   * multiple threads update a common sum that is used for purposes such
25   * as collecting statistics, not for fine-grained synchronization
26   * control.  Under low update contention, the two classes have similar
# Line 26 | Line 28 | import java.io.ObjectOutputStream;
28   * this class is significantly higher, at the expense of higher space
29   * consumption.
30   *
31 < * <p> Method {@link #sum} returns the current combined total across
32 < * the variables maintaining the sum.  This value is <em>NOT</em> an
33 < * atomic snapshot: Invocation of {@code sum} in the absence of
34 < * concurrent updates returns an accurate result, but concurrent
33 < * updates that occur while the sum is being calculated might not be
34 < * incorporated.  The sum may also be {@code reset} to zero, as
35 < * an alternative to creating a new adder.  However, method {@link
36 < * #reset} is intrinsically racy, so should only be used when it is
37 < * known that no threads are concurrently updating the sum.
31 > * <p>This class extends {@link Number}, but does <em>not</em> define
32 > * methods such as {@code equals}, {@code hashCode} and {@code
33 > * compareTo} because instances are expected to be mutated, and so are
34 > * not useful as collection keys.
35   *
36   * <p><em>jsr166e note: This class is targeted to be placed in
37 < * java.util.concurrent.atomic<em>
37 > * java.util.concurrent.atomic.</em>
38   *
39 + * @since 1.8
40   * @author Doug Lea
41   */
42 < public class LongAdder implements Serializable {
42 > public class LongAdder extends Striped64 implements Serializable {
43      private static final long serialVersionUID = 7249069246863182397L;
44  
47    /*
48     * A LongAdder maintains a lazily-initialized table of atomically
49     * updated variables, plus an extra "base" field. The table size
50     * is a power of two. Indexing uses masked per-thread hash codes
51     *
52     * Table entries are of class Cell; a variant of AtomicLong padded
53     * to reduce cache contention on most processors. Padding is
54     * overkill for most Atomics because they are usually irregularly
55     * scattered in memory and thus don't interfere much with each
56     * other. But Atomic objects residing in arrays will tend to be
57     * placed adjacent to each other, and so will most often share
58     * cache lines (with a huge negative performance impact) without
59     * this precaution.
60     *
61     * In part because Cells are relatively large, we avoid creating
62     * them until they are needed.  When there is no contention, all
63     * updates are made to the base field.  Upon first contention (a
64     * failed CAS on base update), the table is initialized to size 2.
65     * The table size is doubled upon further contention until
66     * reaching the nearest power of two greater than or equal to the
67     * number of CPUS.
68     *
69     * Per-thread hash codes are initialized to random values.
70     * Contention and/or table collisions are indicated by failed
71     * CASes when performing an add operation (see method
72     * retryAdd). Upon a collision, if the table size is less than the
73     * capacity, it is doubled in size unless some other thread holds
74     * the lock. If a hashed slot is empty, and lock is available, a
75     * new Cell is created. Otherwise, if the slot exists, a CAS is
76     * tried.  Retries proceed by "double hashing", using a secondary
77     * hash (Marsaglia XorShift) to try to find a free slot.
78     *
79     * The table size is capped because, when there are more threads
80     * than CPUs, supposing that each thread were bound to a CPU,
81     * there would exist a perfect hash function mapping threads to
82     * slots that eliminates collisions. When we reach capacity, we
83     * search for this mapping by randomly varying the hash codes of
84     * colliding threads.  Because search is random, and collisions
85     * only become known via CAS failures, convergence can be slow,
86     * and because threads are typically not bound to CPUS forever,
87     * may not occur at all. However, despite these limitations,
88     * observed contention rates are typically low in these cases.
89     *
90     * A single spinlock is used for initializing and resizing the
91     * table, as well as populating slots with new Cells.  There is no
92     * need for a blocking lock: Upon lock contention, threads try
93     * other slots (or the base) rather than blocking.  During these
94     * retries, there is increased contention and reduced locality,
95     * which is still better than alternatives.
96     *
97     * It is possible for a Cell to become unused when threads that
98     * once hashed to it terminate, as well as in the case where
99     * doubling the table causes no thread to hash to it under
100     * expanded mask.  We do not try to detect or remove such cells,
101     * under the assumption that for long-running adders, observed
102     * contention levels will recur, so the cells will eventually be
103     * needed again; and for short-lived ones, it does not matter.
104     *
105     * JVM intrinsics note: It would be possible to use a release-only
106     * form of CAS here, if it were provided.
107     */
108
109    /**
110     * Padded variant of AtomicLong.  The value field is placed
111     * between pads, hoping that the JVM doesn't reorder them.
112     * Updates are via inlined CAS in methods add and retryAdd.
113     */
114    static final class Cell {
115        volatile long p0, p1, p2, p3, p4, p5, p6;
116        volatile long value;
117        volatile long q0, q1, q2, q3, q4, q5, q6;
118        Cell(long x) { value = x; }
119    }
120
121    /**
122     * Holder for the thread-local hash code. The code is initially
123     * random, but may be set to a different value upon collisions.
124     */
125    static final class HashCode {
126        static final Random rng = new Random();
127        int code;
128        HashCode() {
129            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
130            code = (h == 0) ? 1 : h;
131        }
132    }
133
134    /**
135     * The corresponding ThreadLocal class
136     */
137    static final class ThreadHashCode extends ThreadLocal<HashCode> {
138        public HashCode initialValue() { return new HashCode(); }
139    }
140
141    /**
142     * Static per-thread hash codes. Shared across all LongAdders
143     * to reduce ThreadLocal pollution and because adjustments due to
144     * collisions in one table are likely to be appropriate for
145     * others.
146     */
147    static final ThreadHashCode threadHashCode = new ThreadHashCode();
148
149    /** Number of CPUS, to place bound on table size */
150    private static final int NCPU = Runtime.getRuntime().availableProcessors();
151
152    /**
153     * Table of cells. When non-null, size is a power of 2.
154     */
155    private transient volatile Cell[] cells;
156
157    /**
158     * Base sum, used mainly when there is no contention, but also as
159     * a fallback during table initialization races. Updated via CAS.
160     */
161    private transient volatile long base;
162
45      /**
46 <     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
46 >     * Version of plus for use in retryUpdate
47       */
48 <    private transient volatile int busy;
48 >    final long fn(long v, long x) { return v + x; }
49  
50      /**
51       * Creates a new adder with initial sum of zero.
# Line 177 | Line 59 | public class LongAdder implements Serial
59       * @param x the value to add
60       */
61      public void add(long x) {
62 <        Cell[] as; long v; HashCode hc; Cell a; int n;
63 <        if ((as = cells) != null ||
182 <            !UNSAFE.compareAndSwapLong(this, baseOffset, v = base, v + x)) {
62 >        Cell[] as; long b, v; int[] hc; Cell a; int n;
63 >        if ((as = cells) != null || !casBase(b = base, b + x)) {
64              boolean uncontended = true;
65 <            int h = (hc = threadHashCode.get()).code;
66 <            if (as == null || (n = as.length) < 1 ||
67 <                (a = as[(n - 1) & h]) == null ||
68 <                !(uncontended = UNSAFE.compareAndSwapLong(a, valueOffset,
69 <                                                          v = a.value, v + x)))
189 <                retryAdd(x, hc, uncontended);
65 >            if ((hc = threadHashCode.get()) == null ||
66 >                as == null || (n = as.length) < 1 ||
67 >                (a = as[(n - 1) & hc[0]]) == null ||
68 >                !(uncontended = a.cas(v = a.value, v + x)))
69 >                retryUpdate(x, hc, uncontended);
70          }
71      }
72  
73      /**
194     * Handle cases of add involving initialization, resizing,
195     * creating new Cells, and/or contention. See above for
196     * explanation. This method suffers the usual non-modularity
197     * problems of optimistic retry code, relying on rechecked sets of
198     * reads.
199     *
200     * @param x the value to add
201     * @param hc the hash code holder
202     * @param wasUncontended false if CAS failed before call
203     */
204    private void retryAdd(long x, HashCode hc, boolean wasUncontended) {
205        int h = hc.code;
206        boolean collide = false;                // True if last slot nonempty
207        for (;;) {
208            Cell[] as; Cell a; int n; long v;
209            if ((as = cells) != null && (n = as.length) > 0) {
210                if ((a = as[(n - 1) & h]) == null) {
211                    if (busy == 0) {            // Try to attach new Cell
212                        Cell r = new Cell(x);   // Optimistically create
213                        if (busy == 0 &&
214                            UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
215                            boolean created = false;
216                            try {               // Recheck under lock
217                                Cell[] rs; int m, j;
218                                if ((rs = cells) != null &&
219                                    (m = rs.length) > 0 &&
220                                    rs[j = (m - 1) & h] == null) {
221                                    rs[j] = r;
222                                    created = true;
223                                }
224                            } finally {
225                                busy = 0;
226                            }
227                            if (created)
228                                break;
229                            continue;           // Slot is now non-empty
230                        }
231                    }
232                    collide = false;
233                }
234                else if (!wasUncontended)       // CAS already known to fail
235                    wasUncontended = true;      // Continue after rehash
236                else if (UNSAFE.compareAndSwapLong(a, valueOffset,
237                                                   v = a.value, v + x))
238                    break;
239                else if (n >= NCPU || cells != as)
240                    collide = false;            // At max size or stale
241                else if (!collide)
242                    collide = true;
243                else if (busy == 0 &&           // Try to expand table
244                         UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
245                    try {
246                        if (cells == as) {
247                            Cell[] rs = new Cell[n << 1];
248                            for (int i = 0; i < n; ++i)
249                                rs[i] = as[i];
250                            cells = rs;
251                        }
252                    } finally {
253                        busy = 0;
254                    }
255                    collide = false;
256                    continue;                   // Retry with expanded table
257                }
258                h ^= h << 13;                   // Rehash
259                h ^= h >>> 17;
260                h ^= h << 5;
261            }
262            else if (busy == 0 && cells == as &&
263                     UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1)) {
264                boolean init = false;
265                try {                           // Initialize table
266                    if (cells == as) {
267                        Cell[] rs = new Cell[2];
268                        rs[h & 1] = new Cell(x);
269                        cells = rs;
270                        init = true;
271                    }
272                } finally {
273                    busy = 0;
274                }
275                if (init)
276                    break;
277            }
278            else if (UNSAFE.compareAndSwapLong(this, baseOffset,
279                                               v = base, v + x))
280                break;                          // Fall back on using base
281        }
282        hc.code = h;                            // Record index for next time
283    }
284
285    /**
74       * Equivalent to {@code add(1)}.
75       */
76      public void increment() {
# Line 297 | Line 85 | public class LongAdder implements Serial
85      }
86  
87      /**
88 <     * Returns the current sum.  The result is only guaranteed to be
89 <     * accurate in the absence of concurrent updates. Otherwise, it
90 <     * may fail to reflect one or more updates occuring while
91 <     * calculating the result.
88 >     * Returns the current sum.  The returned value is <em>NOT</em> an
89 >     * atomic snapshot; invocation in the absence of concurrent
90 >     * updates returns an accurate result, but concurrent updates that
91 >     * occur while the sum is being calculated might not be
92 >     * incorporated.
93       *
94       * @return the sum
95       */
96      public long sum() {
308        Cell[] as = cells;
97          long sum = base;
98 +        Cell[] as = cells;
99          if (as != null) {
100              int n = as.length;
101              for (int i = 0; i < n; ++i) {
# Line 319 | Line 108 | public class LongAdder implements Serial
108      }
109  
110      /**
111 <     * Resets variables maintaining the sum to zero.  This is
112 <     * effective in setting the sum to zero only if there are no
113 <     * concurrent updates.
111 >     * Resets variables maintaining the sum to zero.  This method may
112 >     * be a useful alternative to creating a new adder, but is only
113 >     * effective if there are no concurrent updates.  Because this
114 >     * method is intrinsically racy, it should only be used when it is
115 >     * known that no threads are concurrently updating.
116       */
117      public void reset() {
118 <        Cell[] as = cells;
328 <        base = 0L;
329 <        if (as != null) {
330 <            int n = as.length;
331 <            for (int i = 0; i < n; ++i) {
332 <                Cell a = as[i];
333 <                if (a != null)
334 <                    a.value = 0L;
335 <            }
336 <        }
118 >        internalReset(0L);
119      }
120  
121      /**
# Line 341 | Line 123 | public class LongAdder implements Serial
123       * #reset}. This method may apply for example during quiescent
124       * points between multithreaded computations.  If there are
125       * updates concurrent with this method, the returned value is
126 <     * <em>not</em> guaranteed to be the final sum occurring before
126 >     * <em>not</em> guaranteed to be the final value occurring before
127       * the reset.
128       *
129       * @return the sum
130       */
131      public long sumThenReset() {
350        Cell[] as = cells;
132          long sum = base;
133 +        Cell[] as = cells;
134          base = 0L;
135          if (as != null) {
136              int n = as.length;
# Line 363 | Line 145 | public class LongAdder implements Serial
145          return sum;
146      }
147  
148 <    private void writeObject(java.io.ObjectOutputStream s)
149 <        throws java.io.IOException {
148 >    /**
149 >     * Returns the String representation of the {@link #sum}.
150 >     * @return the String representation of the {@link #sum}
151 >     */
152 >    public String toString() {
153 >        return Long.toString(sum());
154 >    }
155 >
156 >    /**
157 >     * Equivalent to {@link #sum}.
158 >     *
159 >     * @return the sum
160 >     */
161 >    public long longValue() {
162 >        return sum();
163 >    }
164 >
165 >    /**
166 >     * Returns the {@link #sum} as an {@code int} after a narrowing
167 >     * primitive conversion.
168 >     */
169 >    public int intValue() {
170 >        return (int)sum();
171 >    }
172 >
173 >    /**
174 >     * Returns the {@link #sum} as a {@code float}
175 >     * after a widening primitive conversion.
176 >     */
177 >    public float floatValue() {
178 >        return (float)sum();
179 >    }
180 >
181 >    /**
182 >     * Returns the {@link #sum} as a {@code double} after a widening
183 >     * primitive conversion.
184 >     */
185 >    public double doubleValue() {
186 >        return (double)sum();
187 >    }
188 >
189 >    private void writeObject(ObjectOutputStream s) throws IOException {
190          s.defaultWriteObject();
191          s.writeLong(sum());
192      }
193  
194      private void readObject(ObjectInputStream s)
195 <        throws IOException, ClassNotFoundException {
195 >            throws IOException, ClassNotFoundException {
196          s.defaultReadObject();
197          busy = 0;
198          cells = null;
199          base = s.readLong();
200      }
201  
380    // Unsafe mechanics
381    private static final sun.misc.Unsafe UNSAFE;
382    private static final long baseOffset;
383    private static final long busyOffset;
384    private static final long valueOffset;
385    static {
386        try {
387            UNSAFE = getUnsafe();
388            Class<?> sk = LongAdder.class;
389            baseOffset = UNSAFE.objectFieldOffset
390                (sk.getDeclaredField("base"));
391            busyOffset = UNSAFE.objectFieldOffset
392                (sk.getDeclaredField("busy"));
393            Class<?> ak = Cell.class;
394            valueOffset = UNSAFE.objectFieldOffset
395                (ak.getDeclaredField("value"));
396        } catch (Exception e) {
397            throw new Error(e);
398        }
399    }
400
401    /**
402     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
403     * Replace with a simple call to Unsafe.getUnsafe when integrating
404     * into a jdk.
405     *
406     * @return a sun.misc.Unsafe
407     */
408    private static sun.misc.Unsafe getUnsafe() {
409        try {
410            return sun.misc.Unsafe.getUnsafe();
411        } catch (SecurityException se) {
412            try {
413                return java.security.AccessController.doPrivileged
414                    (new java.security
415                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
416                        public sun.misc.Unsafe run() throws Exception {
417                            java.lang.reflect.Field f = sun.misc
418                                .Unsafe.class.getDeclaredField("theUnsafe");
419                            f.setAccessible(true);
420                            return (sun.misc.Unsafe) f.get(null);
421                        }});
422            } catch (java.security.PrivilegedActionException e) {
423                throw new RuntimeException("Could not initialize intrinsics",
424                                           e.getCause());
425            }
426        }
427    }
428
202   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines