ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/StripedAdder.java
(Generate patch)

Comparing jsr166/src/jsr166e/StripedAdder.java (file contents):
Revision 1.5 by dl, Sun Jul 24 15:08:21 2011 UTC vs.
Revision 1.6 by dl, Tue Jul 26 17:16:36 2011 UTC

# Line 44 | Line 44 | public class StripedAdder implements Ser
44  
45      /*
46       * A StripedAdder maintains a table of Atomic long variables. The
47 <     * table is indexed by per-thread hash codes that are initialized
48 <     * to random values.
47 >     * table is indexed by per-thread hash codes.
48       *
49 <     * The table doubles in size upon contention (as indicated by
50 <     * failed CASes when performing add()), but is capped at the
51 <     * nearest power of two >= #CPUS. This reflects the idea that,
52 <     * when there are more threads than CPUs, then if each thread were
53 <     * bound to a CPU, there would exist a perfect hash function
54 <     * mapping threads to slots that eliminates collisions. When we
55 <     * reach capacity, we search for this mapping by randomly varying
56 <     * the hash codes of colliding threads.  Because search is random,
57 <     * and failures only become known via CAS failures, convergence
58 <     * will be slow, and because threads are typically not bound to
59 <     * CPUS forever, may not occur at all. However, despite these
60 <     * limitations, observed contention is typically low in these
61 <     * cases.
49 >     * By default, the table is lazily initialized, to minimize
50 >     * footprint until adders are used. On first use, the table is set
51 >     * to size DEFAULT_INITIAL_SIZE (currently 8). Table size is
52 >     * bounded by the number of CPUS (if larger than the default
53 >     * size).
54 >     *
55 >     * Per-thread hash codes are initialized to random values.
56 >     * Collisions are indicated by failed CASes when performing an add
57 >     * operation (see method retryAdd). Upon a collision, if the table
58 >     * size is less than the capacity, it is doubled in size unless
59 >     * some other thread holds lock. If a hashed slot is empty, and
60 >     * lock is available, a new Adder is created. Otherwise, if the
61 >     * slot exists, a CAS is tried.  Retries proceed by "double
62 >     * hashing", using a secondary hash (Marsaglia XorShift) to try to
63 >     * find a free slot.
64 >     *
65 >     * The table size is capped because, when there are more threads
66 >     * than CPUs, supposing that each thread were bound to a CPU,
67 >     * there would exist a perfect hash function mapping threads to
68 >     * slots that eliminates collisions. When we reach capacity, we
69 >     * search for this mapping by randomly varying the hash codes of
70 >     * colliding threads.  Because search is random, and failures only
71 >     * become known via CAS failures, convergence will be slow, and
72 >     * because threads are typically not bound to CPUS forever, may
73 >     * not occur at all. However, despite these limitations, observed
74 >     * contention is typically low in these cases.
75       *
76       * Table entries are of class Adder; a form of AtomicLong padded
77       * to reduce cache contention on most processors. Padding is
78 <     * overkill for most Atomics because they are most often
79 <     * irregularly scattered in memory and thus don't interfere much
80 <     * with each other. But Atomic objects residing in arrays will
81 <     * tend to be placed adjacent to each other, and so will most
82 <     * often share cache lines without this precaution.  Adders are
78 >     * overkill for most Atomics because they are usually irregularly
79 >     * scattered in memory and thus don't interfere much with each
80 >     * other. But Atomic objects residing in arrays will tend to be
81 >     * placed adjacent to each other, and so will most often share
82 >     * cache lines without this precaution.  Adders are by default
83       * constructed upon first use, which further improves per-thread
84 <     * locality and helps reduce (an already large) footprint.
84 >     * locality and helps reduce footprint.
85       *
86       * A single spinlock is used for resizing the table as well as
87       * populating slots with new Adders. Upon lock contention, threads
88       * try other slots rather than blocking. After initialization, at
89       * least one slot exists, so retries will eventually find a
90 <     * candidate Adder. During these retries, there is increased
90 >     * candidate Adder.  During these retries, there is increased
91       * contention and reduced locality, which is still better than
92       * alternatives.
93       */
94  
95      /**
84     * Number of processors, to place a cap on table growth.
85     */
86    static final int NCPU = Runtime.getRuntime().availableProcessors();
87
88    /**
89     * The table size set upon first use when default-constructed
90     */
91    private static final int DEFAULT_ARRAY_SIZE = 8;
92
93    /**
96       * Padded version of AtomicLong
97       */
98      static final class Adder extends AtomicLong {
99 <        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd;
99 >        long p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
100          Adder(long x) { super(x); }
101      }
102  
103 +    private static final int NCPU = Runtime.getRuntime().availableProcessors();
104 +
105      /**
106 <     * Holder for the thread-local hash code. The code starts off with
107 <     * a given random value, but may be set to a different value upon
108 <     * collisions in retryAdd.
106 >     * Table bounds. DEFAULT_INITIAL_SIZE is the table size set upon
107 >     * first use under default constructor, and must be a power of
108 >     * two. There is not much point in making size a lot smaller than
109 >     * that of Adders though.  CAP is the maximum allowed table size.
110 >     */
111 >    private static final int DEFAULT_INITIAL_SIZE = 8;
112 >    private static final int CAP = Math.max(NCPU, DEFAULT_INITIAL_SIZE);
113 >
114 >    /**
115 >     * Holder for the thread-local hash code. The code is initially
116 >     * random, but may be set to a different value upon collisions.
117       */
118      static final class HashCode {
119 +        static final Random rng = new Random();
120          int code;
121 <        HashCode(int h) { code = h; }
121 >        HashCode() {
122 >            int h = rng.nextInt();
123 >            code = (h == 0) ? 1 : h; // ensure nonzero
124 >        }
125      }
126  
127      /**
128       * The corresponding ThreadLocal class
129       */
130      static final class ThreadHashCode extends ThreadLocal<HashCode> {
131 <        static final Random rng = new Random();
116 <        public HashCode initialValue() {
117 <            int h = rng.nextInt();
118 <            return new HashCode((h == 0) ? 1 : h); // ensure nonzero
119 <        }
131 >        public HashCode initialValue() { return new HashCode(); }
132      }
133  
134      /**
135       * Static per-thread hash codes. Shared across all StripedAdders
136 <     * because adjustments due to collisions in one table are likely
137 <     * to be appropriate for others.
136 >     * to reduce ThreadLocal pollution and because adjustments due to
137 >     * collisions in one table are likely to be appropriate for
138 >     * others.
139       */
140      static final ThreadHashCode threadHashCode = new ThreadHashCode();
141  
142      /**
143 <     * Table of adders. Size is power of two, grows to be at most NCPU.
143 >     * Table of adders. Size is power of two, grows to be at most CAP.
144       */
145      private transient volatile Adder[] adders;
146  
147      /**
148       * Serves as a lock when resizing and/or creating Adders.  There
149       * is no need for a blocking lock: Except during initialization
150 <     * races, when busy, other threads try other slots.
150 >     * races, when busy, other threads try other slots. However,
151 >     * during (double-checked) initializations, we use the
152 >     * "synchronized" lock on this object.
153       */
154      private final AtomicInteger mutex;
155  
# Line 154 | Line 169 | public class StripedAdder implements Ser
169       * will concurrently update the sum.
170       */
171      public StripedAdder(int expectedContention) {
172 <        int size;
173 <        if (expectedContention > 0) {
174 <            int cap = (expectedContention < NCPU) ? expectedContention : NCPU;
175 <            size = 1;
161 <            while (size < cap)
162 <                size <<= 1;
163 <        }
164 <        else
165 <            size = 0;
172 >        int cap = (expectedContention < CAP) ? expectedContention : CAP;
173 >        int size = 1;
174 >        while (size < cap)
175 >            size <<= 1;
176          Adder[] as = new Adder[size];
177          for (int i = 0; i < size; ++i)
178              as[i] = new Adder(0);
# Line 178 | Line 188 | public class StripedAdder implements Ser
188      public void add(long x) {
189          Adder[] as; Adder a; int n; long v; // locals to hold volatile reads
190          HashCode hc = threadHashCode.get();
191 +        int h = hc.code;
192          if ((as = adders) == null || (n = as.length) < 1 ||
193 <            (a = as[hc.code & (n - 1)]) == null ||
193 >            (a = as[(n - 1) & h]) == null ||
194              !a.compareAndSet(v = a.get(), v + x))
195              retryAdd(x, hc);
196      }
197  
198      /**
199       * Handle cases of add involving initialization, resizing,
200 <     * creating new Adders, and/or contention.
200 >     * creating new Adders, and/or contention. See above for
201 >     * explanation.
202       */
203      private void retryAdd(long x, HashCode hc) {
204          int h = hc.code;
205          final AtomicInteger mutex = this.mutex;
206 <        for (boolean retried = false; ; retried = true) {
207 <            Adder[] as; Adder a; long v; int n, k; // Locals for volatiles
208 <            if ((as = adders) == null || (n = as.length) < 1) {
206 >        int collisions = 1 - mutex.get(); // first guess: collides if not locked
207 >        for (;;) {
208 >            Adder[] as; Adder a; long v; int k, n;
209 >            while ((as = adders) == null || (n = as.length) < 1) {
210 >                synchronized(mutex) {                // Try to initialize
211 >                    if (adders == null) {
212 >                        Adder[] rs = new Adder[DEFAULT_INITIAL_SIZE];
213 >                        rs[h & (DEFAULT_INITIAL_SIZE - 1)] = new Adder(0);
214 >                        adders = rs;
215 >                    }
216 >                }
217 >                collisions = 0;
218 >            }
219 >
220 >            if ((a = as[k = (n - 1) & h]) == null) { // Try to add slot
221                  if (mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
222                      try {
223 <                        if (adders == null)        // Default-initialize
224 <                            adders = new Adder[DEFAULT_ARRAY_SIZE];
223 >                        if (adders == as && as[k] == null)
224 >                            a = as[k] = new Adder(x);
225                      } finally {
226                          mutex.set(0);
227                      }
228 +                    if (a != null)
229 +                        break;
230                  }
231 <                else
206 <                    Thread.yield();               // initialization race
231 >                collisions = 0;
232              }
233 <            else if ((a = as[k = h & (n - 1)]) != null &&
209 <                     retried && a.compareAndSet(v = a.get(), v + x))
210 <                break;
211 <            else if ((a == null || n < NCPU) &&
233 >            else if (collisions != 0 && n < CAP &&   // Try to expand table
234                       mutex.get() == 0 && mutex.compareAndSet(0, 1)) {
213                boolean created = false;
235                  try {
236                      if (adders == as) {
237 <                        if (as[k] == null) {
238 <                            as[k] = new Adder(x);
239 <                            created = true;
240 <                        }
220 <                        else {                   // Expand table
221 <                            Adder[] rs = new Adder[n << 1];
222 <                            for (int i = 0; i < n; ++i)
223 <                                rs[i] = as[i];
224 <                            adders = rs;
225 <                        }
237 >                        Adder[] rs = new Adder[n << 1];
238 >                        for (int i = 0; i < n; ++i)
239 >                            rs[i] = as[i];
240 >                        adders = rs;
241                      }
242                  } finally {
243                      mutex.set(0);
244                  }
245 <                if (created)
231 <                    break;
232 <            }
233 <            else {                                // Try elsewhere
234 <                h ^= h << 13;
235 <                h ^= h >>> 17;                    // Marsaglia XorShift
236 <                h ^= h << 5;
245 >                collisions = 0;
246              }
247 +            else if (a.compareAndSet(v = a.get(), v + x))
248 +                break;
249 +            else
250 +                collisions = 1;
251 +            h ^= h << 13;                            // Rehash
252 +            h ^= h >>> 17;
253 +            h ^= h << 5;
254          }
255          hc.code = h;
256      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines