
Comparing jsr166/src/main/java/util/concurrent/Exchanger.java (file contents):
Revision 1.54 by jsr166, Thu Dec 22 23:31:45 2011 UTC vs.
Revision 1.55 by dl, Fri Dec 23 19:33:45 2011 UTC

# Line 74 | Line 74 | import java.util.concurrent.locks.LockSupport;
74   * @param <V> The type of objects that may be exchanged
75   */
76   public class Exchanger<V> {
77 +
78      /*
79 <     * Algorithm Description:
79 >     * Overview: The core algorithm is, for an exchange "slot",
80 >     * and a participant (caller) with an item (a standalone Java
80 >     * sketch of this loop follows this comment):
81       *
82 <     * The basic idea is to maintain a "slot", which is a reference to
83 <     * a Node containing both an Item to offer and a "hole" waiting to
84 <     * get filled in.  If an incoming "occupying" thread sees that the
85 <     * slot is null, it CAS'es (compareAndSets) a Node there and waits
86 <     * for another to invoke exchange.  That second "fulfilling" thread
87 <     * sees that the slot is non-null, and so CASes it back to null,
88 <     * also exchanging items by CASing the hole, plus waking up the
89 <     * occupying thread if it is blocked.  In each case CAS'es may
90 <     * fail because a slot at first appears non-null but is null upon
91 <     * CAS, or vice-versa.  So threads may need to retry these
92 <     * actions.
93 <     *
94 <     * This simple approach works great when there are only a few
95 <     * threads using an Exchanger, but performance rapidly
96 <     * deteriorates due to CAS contention on the single slot when
97 <     * there are lots of threads using an exchanger.  So instead we use
98 <     * an "arena"; basically a kind of hash table with a dynamically
99 <     * varying number of slots, any one of which can be used by
100 <     * threads performing an exchange.  Incoming threads pick slots
101 <     * based on a hash of their Thread ids.  If an incoming thread
102 <     * fails to CAS in its chosen slot, it picks an alternative slot
103 <     * instead.  And similarly from there.  If a thread successfully
104 <     * CASes into a slot but no other thread arrives, it tries
105 <     * another, heading toward the zero slot, which always exists even
106 <     * if the table shrinks.  The particular mechanics controlling this
107 <     * are as follows:
108 <     *
109 <     * Waiting: Slot zero is special in that it is the only slot that
110 <     * exists when there is no contention.  A thread occupying slot
111 <     * zero will block if no thread fulfills it after a short spin.
112 <     * In other cases, occupying threads eventually give up and try
113 <     * another slot.  Waiting threads spin for a while (a period that
114 <     * should be a little less than a typical context-switch time)
115 <     * before either blocking (if slot zero) or giving up (if other
116 <     * slots) and restarting.  There is no reason for threads to block
117 <     * unless there are unlikely to be any other threads present.
118 <     * Occupants are mainly avoiding memory contention so sit there
119 <     * quietly polling for a shorter period than it would take to
120 <     * block and then unblock them.  Non-slot-zero waits that elapse
121 <     * because of lack of other threads waste around one extra
122 <     * context-switch time per try, which is still on average much
123 <     * faster than alternative approaches.
124 <     *
125 <     * Sizing: Usually, using only a few slots suffices to reduce
126 <     * contention.  Especially with small numbers of threads, using
127 <     * too many slots can lead to just as poor performance as using
128 <     * too few of them, and there's not much room for error.  The
129 <     * variable "max" maintains the number of slots actually in
130 <     * use.  It is increased when a thread sees too many CAS
131 <     * failures.  (This is analogous to resizing a regular hash table
132 <     * based on a target load factor, except here, growth steps are
133 <     * just one-by-one rather than proportional.)  Growth requires
134 <     * contention failures in each of three tried slots.  Requiring
135 <     * multiple failures for expansion copes with the fact that some
136 <     * failed CASes are not due to contention but instead to simple
137 <     * races between two threads or thread pre-emptions occurring
138 <     * between reading and CASing.  Also, very transient peak
139 <     * contention can be much higher than the average sustainable
140 <     * levels.  An attempt to decrease the max limit is usually made
141 <     * when a non-slot-zero wait elapses without being fulfilled.
142 <     * Threads experiencing elapsed waits move closer to zero, so
143 <     * eventually find existing (or future) threads even if the table
144 <     * has been shrunk due to inactivity.  The chosen mechanics and
145 <     * thresholds for growing and shrinking are intrinsically
146 <     * entangled with indexing and hashing inside the exchange code,
147 <     * and can't be nicely abstracted out.
148 <     *
149 <     * Hashing: Each thread picks its initial slot to use in accord
150 <     * with a simple hashcode.  The sequence is the same on each
151 <     * encounter by any given thread, but effectively random across
152 <     * threads.  Using arenas encounters the classic cost vs quality
153 <     * tradeoffs of all hash tables.  Here, we use a one-step FNV-1a
154 <     * hash code based on the current thread's Thread.getId(), along
155 <     * with a cheap approximation to a mod operation to select an
156 <     * index.  The downside of optimizing index selection in this way
157 <     * is that the code is hardwired to use a maximum table size of
158 <     * 32.  But this value more than suffices for known platforms and
159 <     * applications.
160 <     *
161 <     * Probing: On sensed contention of a selected slot, we probe
162 <     * sequentially through the table, analogously to linear probing
163 <     * after collision in a hash table.  (We move circularly, in
164 <     * reverse order, to mesh best with table growth and shrinkage
165 <     * rules.)  Except that to minimize the effects of false-alarms
166 <     * and cache thrashing, we try the first selected slot twice
167 <     * before moving.
168 <     *
169 <     * Padding: Even with contention management, slots are heavily
170 <     * contended, so use cache-padding to avoid poor memory
171 <     * performance.  Because of this, slots are lazily constructed
172 <     * only when used, to avoid wasting this space unnecessarily.
173 <     * While isolation of locations is not much of an issue at first
174 <     * in an application, as time goes on and garbage-collectors
175 <     * perform compaction, slots are very likely to be moved adjacent
176 <     * to each other, which can cause much thrashing of cache lines on
177 <     * MPs unless padding is employed.
178 <     *
179 <     * This is an improvement of the algorithm described in the paper
180 <     * "A Scalable Elimination-based Exchange Channel" by William
181 <     * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05
182 <     * workshop.  Available at: http://hdl.handle.net/1802/2104
82 >     *   for (;;) {
83 >     *       if (slot is empty) {                       // offer
84 >     *           place item in a Node;
85 >     *           if (can CAS slot from empty to node) {
86 >     *               wait for release;
87 >     *               return matching item in node;
88 >     *           }
89 >     *       }
90 >     *       else if (can CAS slot from node to empty) { // release
91 >     *           get the item in node;
92 >     *           set matching item in node;
93 >     *           release waiting thread;
94 >     *       }
95 >     *       // else retry on CAS failure
96 >     *   }
97 >     *
98 >     * This is among the simplest forms of a "dual data structure" --
99 >     * see Scott and Scherer's DISC 04 paper and
100 >     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
101 >     *
102 >     * This works great in principle. But in practice, like many
103 >     * algorithms centered on atomic updates to a single location, it
104 >     * scales horribly when there are more than a few participants
105 >     * using the same Exchanger. So the implementation instead uses a
106 >     * form of elimination arena, that spreads out this contention by
107 >     * arranging that some threads typically use different slots,
108 >     * while still ensuring that eventually, any two parties will be
109 >     * able to exchange items. That is, we cannot completely partition
110 >     * across threads, but instead give threads arena indices that
111 >     * will on average grow under contention and shrink under lack of
112 >     * contention. We approach this by defining the Nodes that we need
113 >     * anyway as ThreadLocals, and including in them per-thread index
114 >     * and related bookkeeping state. (We can safely reuse per-thread
115 >     * nodes rather than creating them fresh each time because slots
116 >     * alternate between pointing to a node vs null, so cannot
117 >     * encounter ABA problems. However, we do need some care in
118 >     * resetting them between uses.)
119 >     *
120 >     * Implementing an effective arena requires allocating a bunch of
121 >     * space, so we only do so upon detecting contention (except on
122 >     * uniprocessors, where they wouldn't help, so aren't used).
123 >     * Otherwise, exchanges use the single-slot slotExchange method.
124 >     * On contention, not only must the slots be in different
125 >     * locations, but the locations must not encounter memory
126 >     * contention due to being on the same cache line (or more
127 >     * generally, the same coherence unit).  Because, as of this
128 >     * writing, there is no way to determine cacheline size, we define
129 >     * a value that is enough for common platforms.  Additionally,
130 >     * extra care elsewhere is taken to avoid other false/unintended
131 >     * sharing and to enhance locality, including adding padding to
132 >     * Nodes, embedding "bound" as an Exchanger field, and reworking
133 >     * some park/unpark mechanics compared to LockSupport versions.
134 >     *
135 >     * The arena starts out with only one used slot. We expand the
136 >     * effective arena size by tracking collisions; i.e., failed CASes
137 >     * while trying to exchange. By nature of the above algorithm, the
138 >     * only kinds of collision that reliably indicate contention are
139 >     * when two attempted releases collide -- one of two attempted
140 >     * offers can legitimately fail to CAS without indicating
141 >     * contention by more than one other thread. (Note: it is possible
142 >     * but not worthwhile to more precisely detect contention by
143 >     * reading slot values after CAS failures.)  When a thread has
144 >     * collided at each slot within the current arena bound, it tries
145 >     * to expand the arena size by one. We track collisions within
146 >     * bounds by using a version (sequence) number on the "bound"
147 >     * field, and conservatively reset collision counts when a
148 >     * participant notices that bound has been updated (in either
149 >     * direction).
150 >     *
151 >     * The effective arena size is reduced (when there is more than
152 >     * one slot) by giving up on waiting after a while and trying to
153 >     * decrement the arena size on expiration. The value of "a while"
154 >     * is an empirical matter.  We implement by piggybacking on the
155 >     * use of spin->yield->block that is essential for reasonable
156 >     * waiting performance anyway -- in a busy exchanger, offers are
157 >     * usually almost immediately released, in which case context
158 >     * switching on multiprocessors is extremely slow/wasteful.  Arena
159 >     * waits just omit the blocking part, and instead cancel. The spin
160 >     * count is empirically chosen to be a value that avoids blocking
161 >     * 99% of the time under maximum sustained exchange rates on a
162 >     * range of test machines. Spins and yields entail some limited
163 >     * randomness (using a cheap xorshift) to avoid regular patterns
164 >     * that can induce unproductive grow/shrink cycles. (Using
165 >     * pseudorandom values also helps regularize spin cycle duration by
166 >     * making branches unpredictable.)  Also, during an offer, a
167 >     * waiter can "know" that it will be released when its slot has
168 >     * changed, but cannot yet proceed until match is set.  In the
169 >     * meantime it cannot cancel the offer, so instead spins/yields.
170 >     * Note: It is possible to avoid this secondary check by changing
171 >     * the linearization point to be a CAS of the match field (as done
172 >     * in one case in the Scott & Scherer DISC paper), which also
173 >     * increases asynchrony a bit, at the expense of poorer collision
174 >     * detection and inability to always reuse per-thread nodes. So
175 >     * the current scheme is typically a better tradeoff.
176 >     *
177 >     * On collisions, indices traverse the arena cyclically in reverse
178 >     * order, restarting at the maximum index (which will tend to be
179 >     * sparsest) when bounds change. (On expirations, indices instead
180 >     * are halved until reaching 0.) It is possible (and has been
181 >     * tried) to use randomized, prime-value-stepped, or double-hash
182 >     * style traversal instead of simple cyclic traversal to reduce
183 >     * bunching.  But empirically, whatever benefits these may have
184 >     * don't overcome their added overhead: We are managing operations
185 >     * that occur very quickly unless there is sustained contention,
186 >     * so simpler/faster control policies work better than more
187 >     * accurate but slower ones.
188 >     *
189 >     * Because we use expiration for arena size control, we cannot
190 >     * throw TimeoutExceptions in the timed version of the public
191 >     * exchange method until the arena size has shrunken to zero (or
192 >     * the arena isn't enabled). This may delay response to timeout
193 >     * but is still within spec.
194 >     *
195 >     * Essentially all of the implementation is in methods
196 >     * slotExchange and arenaExchange. These have similar overall
197 >     * structure, but differ in too many details to combine. The
198 >     * slotExchange method uses the single Exchanger field "slot"
199 >     * rather than arena array elements. However, it still needs
200 >     * minimal collision detection to trigger arena construction.
201 >     * (The messiest part is making sure interrupt status and
202 >     * InterruptedExceptions come out right during transitions when
203 >     * both methods may be called. This is done by using null return
204 >     * as a sentinel to recheck interrupt status.)
205 >     *
206 >     * As is too common in this sort of code, methods are monolithic
207 >     * because most of the logic relies on reads of fields that are
208 >     * maintained as local variables so can't be nicely factored
209 >     * (mainly, here, bulky spin->yield->block/cancel code), and
210 >     * heavily dependent on intrinsics (Unsafe) to use inlined
211 >     * embedded CAS and related memory access operations (that tend
212 >     * not to be as readily inlined by dynamic compilers when they are
213 >     * hidden behind other methods that would more nicely name and
214 >     * encapsulate the intended effects). This includes the use of
215 >     * putOrderedX to clear fields of the per-thread Nodes between
216 >     * uses. Note that field Node.item is not declared as volatile
217 >     * even though it is read by releasing threads, because they only
218 >     * do so after CAS operations that must precede access, and all
219 >     * uses by the owning thread are otherwise acceptably ordered by
220 >     * other operations. (Because the actual points of atomicity are
221 >     * slot CASes, it would also be legal for the write to Node.match
222 >     * in a release to be weaker than a full volatile write. However,
223 >     * this is not done because it could allow further postponement of
224 >     * the write, delaying progress.)
225 >     */
226 >
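
To make the overview concrete, here is a standalone rendering of the
single-slot loop in plain Java. It is a sketch, not the implementation
below: the class and member names are invented, it uses AtomicReference
and LockSupport in place of the Unsafe-based mechanics, and it omits
the arena, per-thread node reuse, timeouts, and interrupt handling.

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;

    // Illustrative single-slot exchanger: offer via CAS(empty -> node),
    // release via CAS(node -> empty), as in the overview pseudocode.
    final class SingleSlotExchanger<V> {
        static final class Node {
            final Object item;         // this thread's item
            volatile Object match;     // item provided by releasing thread
            volatile Thread parked;    // waiting thread, if blocked
            Node(Object item) { this.item = item; }
        }

        private final AtomicReference<Node> slot = new AtomicReference<Node>();

        @SuppressWarnings("unchecked")
        public V exchange(V item) {
            Node p = new Node(item);
            for (;;) {
                Node q = slot.get();
                if (q != null) {                        // release
                    if (slot.compareAndSet(q, null)) {
                        Object v = q.item;              // take its item
                        q.match = item;                 // hand over ours
                        Thread w = q.parked;
                        if (w != null)
                            LockSupport.unpark(w);
                        return (V) v;
                    }                                   // lost race; retry
                }
                else if (slot.compareAndSet(null, p)) { // offer
                    Object v;
                    while ((v = p.match) == null) {     // await release
                        p.parked = Thread.currentThread();
                        if (slot.get() == p)            // recheck, then park
                            LockSupport.park(this);
                        p.parked = null;
                    }
                    return (V) v;
                }
            }
        }
    }

Note the recheck of the slot between setting parked and parking: a
releaser clears the slot before setting match and unparking, so a
waiter blocks only after seeing its own node still in place; otherwise
match is imminent and it just re-loops.
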
227 >    /**
228 >     * The byte distance (as a shift value) between any two used slots
229 >     * in the arena.  1 << ASHIFT should be at least cacheline size.
230 >     */
231 >    private static final int ASHIFT = 7;
232 >
233 >    /**
234 >     * The maximum supported arena index. The maximum allocatable
235 >     * arena size is MMASK + 1. Must be a power of two minus one, less
236 >     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
237 >     * for the expected scaling limits of the main algorithms.
238       */
239 <
183 <    /** The number of CPUs, for sizing and spin control */
184 <    private static final int NCPU = Runtime.getRuntime().availableProcessors();
239 >    private static final int MMASK  = 0xff;
240  
241      /**
242 <     * The capacity of the arena.  Set to a value that provides more
243 <     * than enough space to handle contention.  On small machines
189 <     * most slots won't be used, but it is still not wasted because
190 <     * the extra space provides some machine-level address padding
191 <     * to minimize interference with heavily CAS'ed Slot locations.
192 <     * And on very large machines, performance eventually becomes
193 <     * bounded by memory bandwidth, not numbers of threads/CPUs.
194 <     * This constant cannot be changed without also modifying
195 <     * indexing and hashing algorithms.
242 >     * Unit for sequence/version bits of bound field. Each successful
243 >     * change to the bound also adds SEQ.
244       */
245 <    private static final int CAPACITY = 32;
245 >    private static final int SEQ    = MMASK + 1;
246  
247 <    /**
248 <     * The value of "max" that will hold all threads without
201 <     * contention.  When this value is less than CAPACITY, some
202 <     * otherwise wasted expansion can be avoided.
203 <     */
204 <    private static final int FULL =
205 <        Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
206 <
207 <    /**
208 <     * The number of times to spin (doing nothing except polling a
209 <     * memory location) before blocking or giving up while waiting to
210 <     * be fulfilled.  Should be zero on uniprocessors.  On
211 <     * multiprocessors, this value should be large enough so that two
212 <     * threads exchanging items as fast as possible block only when
213 <     * one of them is stalled (due to GC or preemption), but not much
214 <     * longer, to avoid wasting CPU resources.  Seen differently, this
215 <     * value is a little over half the number of cycles of an average
216 <     * context switch time on most systems.  The value here is
217 <     * approximately the average of those across a range of tested
218 <     * systems.
219 <     */
220 <    private static final int SPINS = (NCPU == 1) ? 0 : 2000;
247 >    /** The number of CPUs, for sizing and spin control */
248 >    private static final int NCPU = Runtime.getRuntime().availableProcessors();
249  
250      /**
251 <     * The number of times to spin before blocking in timed waits.
252 <     * Timed waits spin more slowly because checking the time takes
253 <     * time.  The best value relies mainly on the relative rate of
226 <     * System.nanoTime vs memory accesses.  The value is empirically
227 <     * derived to work well across a variety of systems.
251 >     * The maximum slot index of the arena: the number of slots that
252 >     * can in principle hold all threads without contention (e.g., 8
253 >     * when NCPU is 16), or at most the maximum indexable value.
254       */
255 <    private static final int TIMED_SPINS = SPINS / 20;
255 >    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
256  
257      /**
258 <     * Sentinel item representing cancellation of a wait due to
259 <     * interruption, timeout, or elapsed spin-waits.  This value is
260 <     * placed in holes on cancellation, and used as a return value
235 <     * from waiting methods to indicate failure to set or get hole.
258 >     * The bound for spins while waiting for a match. The actual
259 >     * number of iterations will on average be about twice this value
260 >     * due to randomization. Note: Spinning is disabled when NCPU==1.
261       */
262 <    private static final Object CANCEL = new Object();
262 >    private static final int SPINS = 1 << 10;
263  
264      /**
265       * Value representing null arguments/returns from public
266 <     * methods.  This disambiguates from internal requirement that
267 <     * holes start out as null to mean they are not yet set.
266 >     * methods. Needed because the API originally didn't disallow null
267 >     * arguments, which it should have.
268       */
269      private static final Object NULL_ITEM = new Object();
270  
271      /**
272 <     * Nodes hold partially exchanged data.  This class
273 <     * opportunistically subclasses AtomicReference to represent the
274 <     * hole.  So get() returns hole, and compareAndSet CAS'es value
250 <     * into hole.  This class cannot be parameterized as "V" because
251 <     * of the use of non-V CANCEL sentinels.
252 <     */
253 <    @SuppressWarnings("serial")
254 <    private static final class Node extends AtomicReference<Object> {
255 <        /** The element offered by the Thread creating this node. */
256 <        public final Object item;
257 <
258 <        /** The Thread waiting to be signalled; null until waiting. */
259 <        public volatile Thread waiter;
260 <
261 <        /**
262 <         * Creates node with given item and empty hole.
263 <         * @param item the item
264 <         */
265 <        public Node(Object item) {
266 <            this.item = item;
267 <        }
268 <    }
269 <
270 <    /**
271 <     * A Slot is an AtomicReference with heuristic padding to lessen
272 <     * cache effects of this heavily CAS'ed location.  While the
273 <     * padding adds noticeable space, all slots are created only on
274 <     * demand, and there will be more than one of them only when it
275 <     * would improve throughput more than enough to outweigh using
276 <     * extra space.
277 <     */
278 <    @SuppressWarnings("serial")
279 <    private static final class Slot extends AtomicReference<Object> {
280 <        // Improve likelihood of isolation on <= 128 byte cache lines.
281 <        // We used to target 64 byte cache lines, but some x86s (including
282 <        // i7 under some BIOSes) actually use 128 byte cache lines.
283 <        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
284 <    }
285 <
286 <    /**
287 <     * Slot array.  Elements are lazily initialized when needed.
288 <     * Declared volatile to enable double-checked lazy construction.
272 >     * Sentinel value returned by internal exchange methods upon
273 >     * timeout, to avoid need for separate timed versions of these
274 >     * methods.
275       */
276 <    private volatile Slot[] arena = new Slot[CAPACITY];
276 >    private static final Object TIMED_OUT = new Object();
277  
278      /**
279 <     * The maximum slot index being used.  The value sometimes
280 <     * increases when a thread experiences too many CAS contentions,
295 <     * and sometimes decreases when a spin-wait elapses.  Changes
296 <     * are performed only via compareAndSet, to avoid stale values
297 <     * when a thread happens to stall right before setting.
279 >     * Nodes hold partially exchanged data, plus other per-thread
280 >     * bookkeeping.
281       */
282 <    private final AtomicInteger max = new AtomicInteger();
282 >    static final class Node {
283 >        int index;              // Arena index
284 >        int bound;              // Last recorded value of Exchanger.bound
285 >        int collides;           // Number of CAS failures at current bound
286 >        int hash;               // Pseudo-random for spins
287 >        Object item;            // This thread's current item
288 >        volatile Object match;  // Item provided by releasing thread
289 >        volatile Thread parked; // Set to this thread when parked, else null
290  
291 <    /**
292 <     * Main exchange function, handling the different policy variants.
293 <     * Uses Object, not "V" as argument and return value to simplify
304 <     * handling of sentinel values.  Callers from public methods decode
305 <     * and cast accordingly.
306 <     *
307 <     * @param item the (non-null) item to exchange
308 <     * @param timed true if the wait is timed
309 <     * @param nanos if timed, the maximum wait time
310 <     * @return the other thread's item, or CANCEL if interrupted or timed out
311 <     */
312 <    private Object doExchange(Object item, boolean timed, long nanos) {
313 <        Node me = new Node(item);                 // Create in case occupying
314 <        int index = hashIndex();                  // Index of current slot
315 <        int fails = 0;                            // Number of CAS failures
316 <
317 <        for (;;) {
318 <            Object y;                             // Contents of current slot
319 <            Slot slot = arena[index];
320 <            if (slot == null)                     // Lazily initialize slots
321 <                createSlot(index);                // Continue loop to reread
322 <            else if ((y = slot.get()) != null &&  // Try to fulfill
323 <                     slot.compareAndSet(y, null)) {
324 <                Node you = (Node)y;               // Transfer item
325 <                if (you.compareAndSet(null, item)) {
326 <                    LockSupport.unpark(you.waiter);
327 <                    return you.item;
328 <                }                                 // Else cancelled; continue
329 <            }
330 <            else if (y == null &&                 // Try to occupy
331 <                     slot.compareAndSet(null, me)) {
332 <                if (index == 0)                   // Blocking wait for slot 0
333 <                    return timed ?
334 <                        awaitNanos(me, slot, nanos) :
335 <                        await(me, slot);
336 <                Object v = spinWait(me, slot);    // Spin wait for non-0
337 <                if (v != CANCEL)
338 <                    return v;
339 <                me = new Node(item);              // Throw away cancelled node
340 <                int m = max.get();
341 <                if (m > (index >>>= 1))           // Decrease index
342 <                    max.compareAndSet(m, m - 1);  // Maybe shrink table
343 <            }
344 <            else if (++fails > 1) {               // Allow 2 fails on 1st slot
345 <                int m = max.get();
346 <                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
347 <                    index = m + 1;                // Grow on 3rd failed slot
348 <                else if (--index < 0)
349 <                    index = m;                    // Circularly traverse
350 <            }
351 <        }
291 >        // Padding to ameliorate unfortunate memory placements
292 >        Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe, pf;
293 >        Object q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe, qf;
294      }
295  
296 <    /**
297 <     * Returns a hash index for the current thread.  Uses a one-step
298 <     * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
357 <     * based on the current thread's Thread.getId().  These hash codes
358 <     * have more uniform distribution properties with respect to small
359 <     * moduli (here 1-31) than do other simple hashing functions.
360 <     *
361 <     * <p>To return an index between 0 and max, we use a cheap
362 <     * approximation to a mod operation, that also corrects for bias
363 <     * due to non-power-of-2 remaindering (see {@link
364 <     * java.util.Random#nextInt}).  Bits of the hashcode are masked
365 <     * with "nbits", the ceiling power of two of table size (looked up
366 <     * in a table packed into three ints).  If too large, this is
367 <     * retried after rotating the hash by nbits bits, while forcing new
368 <     * top bit to 0, which guarantees eventual termination (although
369 <     * with a non-random-bias).  This requires an average of less than
370 <     * 2 tries for all table sizes, and has a maximum 2% difference
371 <     * from perfectly uniform slot probabilities when applied to all
372 <     * possible hash codes for sizes less than 32.
373 <     *
374 <     * @return a per-thread-random index, 0 <= index < max
375 <     */
376 <    private final int hashIndex() {
377 <        long id = Thread.currentThread().getId();
378 <        int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
379 <
380 <        int m = max.get();
381 <        int nbits = (((0xfffffc00  >> m) & 4) | // Compute ceil(log2(m+1))
382 <                     ((0x000001f8 >>> m) & 2) | // The constants hold
383 <                     ((0xffff00f2 >>> m) & 1)); // a lookup table
384 <        int index;
385 <        while ((index = hash & ((1 << nbits) - 1)) > m)       // May retry on
386 <            hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
387 <        return index;
296 >    /** The corresponding thread local class */
297 >    static final class Participant extends ThreadLocal<Node> {
298 >        public Node initialValue() { return new Node(); }
299      }
300  
301      /**
302 <     * Creates a new slot at given index.  Called only when the slot
303 <     * appears to be null.  Relies on double-check using builtin
304 <     * locks, since they rarely contend.  This in turn relies on the
394 <     * arena array being declared volatile.
395 <     *
396 <     * @param index the index to add slot at
397 <     */
398 <    private void createSlot(int index) {
399 <        // Create slot outside of lock to narrow sync region
400 <        Slot newSlot = new Slot();
401 <        Slot[] a = arena;
402 <        synchronized (a) {
403 <            if (a[index] == null)
404 <                a[index] = newSlot;
405 <        }
406 <    }
302 >     * Per-thread state
303 >     */
304 >    private final Participant participant;
305  
306      /**
307 <     * Tries to cancel a wait for the given node waiting in the given
308 <     * slot, if so, helping clear the node from its slot to avoid
309 <     * garbage retention.
310 <     *
413 <     * @param node the waiting node
414 <     * @param the slot it is waiting in
415 <     * @return true if successfully cancelled
416 <     */
417 <    private static boolean tryCancel(Node node, Slot slot) {
418 <        if (!node.compareAndSet(null, CANCEL))
419 <            return false;
420 <        if (slot.get() == node) // pre-check to minimize contention
421 <            slot.compareAndSet(node, null);
422 <        return true;
423 <    }
424 <
425 <    // Three forms of waiting. Each just different enough not to merge
426 <    // code with others.
307 >     * Elimination array; null until enabled (within slotExchange).
308 >     * Element accesses use emulation of volatile gets and CAS.
309 >     */
310 >    private volatile Node[] arena;
311  
312      /**
313 <     * Spin-waits for hole for a non-0 slot.  Fails if spin elapses
314 <     * before hole filled.  Does not check interrupt, relying on check
315 <     * in public exchange method to abort if interrupted on entry.
432 <     *
433 <     * @param node the waiting node
434 <     * @return on success, the hole; on failure, CANCEL
435 <     */
436 <    private static Object spinWait(Node node, Slot slot) {
437 <        int spins = SPINS;
438 <        for (;;) {
439 <            Object v = node.get();
440 <            if (v != null)
441 <                return v;
442 <            else if (spins > 0)
443 <                --spins;
444 <            else
445 <                tryCancel(node, slot);
446 <        }
447 <    }
313 >     * Slot used until contention detected.
314 >     */
315 >    private volatile Node slot;
316  
317      /**
318 <     * Waits for (by spinning and/or blocking) and gets the hole
319 <     * filled in by another thread.  Fails if interrupted before
320 <     * hole filled.
321 <     *
322 <     * When a node/thread is about to block, it sets its waiter field
323 <     * and then rechecks state at least one more time before actually
456 <     * parking, thus covering race vs fulfiller noticing that waiter
457 <     * is non-null so should be woken.
458 <     *
459 <     * Thread interruption status is checked only surrounding calls to
460 <     * park.  The caller is assumed to have checked interrupt status
461 <     * on entry.
462 <     *
463 <     * @param node the waiting node
464 <     * @return on success, the hole; on failure, CANCEL
465 <     */
466 <    private static Object await(Node node, Slot slot) {
467 <        Thread w = Thread.currentThread();
468 <        int spins = SPINS;
469 <        for (;;) {
470 <            Object v = node.get();
471 <            if (v != null)
472 <                return v;
473 <            else if (spins > 0)                 // Spin-wait phase
474 <                --spins;
475 <            else if (node.waiter == null)       // Set up to block next
476 <                node.waiter = w;
477 <            else if (w.isInterrupted())         // Abort on interrupt
478 <                tryCancel(node, slot);
479 <            else                                // Block
480 <                LockSupport.park(node);
481 <        }
482 <    }
318 >     * The index of the largest valid arena position, OR'ed with SEQ
319 >     * number in high bits, incremented on each update.  The initial
320 >     * update from 0 to SEQ is used to ensure that the arena array is
321 >     * constructed only once.
322 >     */
323 >    private volatile int bound;
324  
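A worked example of this encoding may help (a hypothetical snippet
using the real constants above). The low eight bits of bound hold the
current maximum index m; each successful change also adds SEQ, so the
high bits act as a version stamp that lets participants detect any
update. The exact update expressions match those used in the exchange
code below:

    // Hypothetical demo of the bound field encoding.
    static void boundDemo() {
        final int MMASK = 0xff, SEQ = MMASK + 1;
        int bound = 0 + SEQ;         // initial CAS(0, SEQ): version 1, m = 0
        bound = bound + SEQ + 1;     // grow by one slot:    version 2, m = 1
        System.out.println((bound & MMASK) + " " + (bound >>> 8)); // 1 2
        bound = bound + SEQ - 1;     // shrink by one slot:  version 3, m = 0
        System.out.println((bound & MMASK) + " " + (bound >>> 8)); // 0 3
    }
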
325      /**
326 <     * Waits for (at index 0) and gets the hole filled in by another
327 <     * thread.  Fails if timed out or interrupted before hole filled.
328 <     * Same basic logic as untimed version, but a bit messier.
329 <     *
330 <     * @param node the waiting node
331 <     * @param nanos the wait time
332 <     * @return on success, the hole; on failure, CANCEL
333 <     */
334 <    private Object awaitNanos(Node node, Slot slot, long nanos) {
335 <        int spins = TIMED_SPINS;
336 <        long deadline = 0L;
337 <        Thread w = null;
338 <        for (;;) {
339 <            Object v = node.get();
340 <            if (v != null)
326 >     * Exchange function when arenas enabled. See above for explanation.
327 >     *
328 >     * @param item the (nonnull) item to exchange
329 >     * @param timed true if the wait is timed
330 >     * @param ns if timed, the maximum wait time else 0L
331 >     * @return the other thread's item; or null if interrupted; or
332 >     * TIMED_OUT if timed and timed out
333 >     */
334 >    private final Object arenaExchange(Object item, boolean timed, long ns) {
335 >        Node[] a = arena;
336 >        Node p = participant.get();
337 >        for (int i = p.index;;) {                      // access slot at i
338 >            int b, m, c; long j;                       // j is raw array offset
339 >            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
340 >            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
341 >                Object v = q.item;                     // release
342 >                q.match = item;
343 >                Thread w = q.parked;
344 >                if (w != null)
345 >                    U.unpark(w);
346                  return v;
501 <            long now = System.nanoTime();
502 <            if (w == null) {
503 <                deadline = now + nanos;
504 <                w = Thread.currentThread();
347              }
348 <            else
349 <                nanos = deadline - now;
350 <            if (nanos > 0L) {
351 <                if (spins > 0)
352 <                    --spins;
353 <                else if (node.waiter == null)
354 <                    node.waiter = w;
355 <                else if (w.isInterrupted())
356 <                    tryCancel(node, slot);
348 >            else if (i <= (m = (b = bound) & MMASK) && q == null) {
349 >                p.item = item;                         // offer
350 >                if (U.compareAndSwapObject(a, j, null, p)) {
351 >                    long end = (timed && m == 0)? System.nanoTime() + ns : 0L;
352 >                    Thread t = Thread.currentThread(); // wait
353 >                    for (int h = p.hash, spins = SPINS;;) {
354 >                        Object v = p.match;
355 >                        if (v != null) {
356 >                            U.putOrderedObject(p, MATCH, null);
357 >                            p.item = null;             // clear for next use
358 >                            p.hash = h;
359 >                            return v;
360 >                        }
361 >                        else if (spins > 0) {
362 >                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
363 >                            if (h == 0)                // initialize hash
364 >                                h = SPINS | (int)t.getId();
365 >                            else if (h < 0 &&          // approx 50% true
366 >                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
367 >                                Thread.yield();        // two yields per wait
368 >                        }
369 >                        else if (U.getObjectVolatile(a, j) != p)
370 >                            spins = SPINS;       // releaser hasn't set match yet
371 >                        else if (!t.isInterrupted() && m == 0 &&
372 >                                 (!timed ||
373 >                                  (ns = end - System.nanoTime()) > 0L)) {
374 >                            U.putObject(t, BLOCKER, this); // emulate LockSupport
375 >                            p.parked = t;              // minimize window
376 >                            if (U.getObjectVolatile(a, j) == p)
377 >                                U.park(false, ns);
378 >                            p.parked = null;
379 >                            U.putObject(t, BLOCKER, null);
380 >                        }
381 >                        else if (U.getObjectVolatile(a, j) == p &&
382 >                                 U.compareAndSwapObject(a, j, p, null)) {
383 >                            if (m != 0)                // try to shrink
384 >                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
385 >                            p.item = null;
386 >                            p.hash = h;
387 >                            i = p.index >>>= 1;        // descend
388 >                            if (Thread.interrupted())
389 >                                return null;
390 >                            if (timed && m == 0 && ns <= 0L)
391 >                                return TIMED_OUT;
392 >                            break;                     // expired; restart
393 >                        }
394 >                    }
395 >                }
396 >                else
397 >                    p.item = null;                     // clear offer
398 >            }
399 >            else {
400 >                if (p.bound != b) {                    // stale; reset
401 >                    p.bound = b;
402 >                    p.collides = 0;
403 >                    i = (i != m || m == 0) ? m : m - 1;
404 >                }
405 >                else if ((c = p.collides) < m || m == FULL ||
406 >                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
407 >                    p.collides = c + 1;
408 >                    i = (i == 0) ? m : i - 1;          // cyclically traverse
409 >                }
410                  else
411 <                    LockSupport.parkNanos(node, nanos);
411 >                    i = m + 1;                         // grow
412 >                p.index = i;
413              }
518 <            else if (tryCancel(node, slot) && !w.isInterrupted())
519 <                return scanOnTimeout(node);
414          }
415      }
416  
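Both arenaExchange above and slotExchange below inline the same
randomized spin/yield step. Extracted here for clarity as a sketch (the
helper name and its BooleanSupplier parameter are inventions of this
sketch; the real code keeps the logic inline so that it compiles to
straight-line code):

    import java.util.function.BooleanSupplier;

    // Sketch of the shared spin: xorshift-randomized, with two yields
    // per full wait; returns the updated hash for the caller to keep.
    final class SpinSketch {
        static final int SPINS = 1 << 10;

        static int spinWhile(BooleanSupplier stillWaiting, int h) {
            int spins = SPINS;
            while (stillWaiting.getAsBoolean() && spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;  // xorshift step
                if (h == 0)                               // lazily seed hash
                    h = SPINS | (int) Thread.currentThread().getId();
                else if (h < 0 &&                         // ~half the steps
                         (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();                       // at spins 512 and 0
            }
            return h;
        }
    }

Because spins only decrements when h is negative, a full wait runs
about 2 * SPINS iterations, matching the SPINS javadoc above; the two
yields land where the low nine bits of the decremented count hit zero.
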
417      /**
418 <     * Sweeps through arena checking for any waiting threads.  Called
419 <     * only upon return from timeout while waiting in slot 0.  When a
420 <     * thread gives up on a timed wait, it is possible that a
421 <     * previously-entered thread is still waiting in some other
422 <     * slot.  So we scan to check for any.  This is almost always
423 <     * overkill, but decreases the likelihood of timeouts when there
424 <     * are other threads present to far less than that in lock-based
425 <     * exchangers in which earlier-arriving threads may still be
426 <     * waiting on entry locks.
427 <     *
428 <     * @param node the waiting node
429 <     * @return another thread's item, or CANCEL
430 <     */
431 <    private Object scanOnTimeout(Node node) {
432 <        Object y;
433 <        for (int j = arena.length - 1; j >= 0; --j) {
434 <            Slot slot = arena[j];
435 <            if (slot != null) {
436 <                while ((y = slot.get()) != null) {
437 <                    if (slot.compareAndSet(y, null)) {
438 <                        Node you = (Node)y;
439 <                        if (you.compareAndSet(null, node.item)) {
440 <                            LockSupport.unpark(you.waiter);
441 <                            return you.item;
548 <                        }
549 <                    }
418 >     * Exchange function used until arenas enabled. See above for explanation.
419 >     *
420 >     * @param item the item to exchange
421 >     * @param timed true if the wait is timed
422 >     * @param ns if timed, the maximum wait time else 0L
423 >     * @return the other thread's item; or null if either the arena
424 >     * was enabled or the thread was interrupted before completion; or
425 >     * TIMED_OUT if timed and timed out
426 >     */
427 >    private final Object slotExchange(Object item, boolean timed, long ns) {
428 >        Node p = participant.get();
429 >        Thread t = Thread.currentThread();
430 >        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
431 >            return null;
432 >
433 >        for (Node q;;) {
434 >            if ((q = slot) != null) {
435 >                if (U.compareAndSwapObject(this, SLOT, q, null)) {
436 >                    Object v = q.item;
437 >                    q.match = item;
438 >                    Thread w = q.parked;
439 >                    if (w != null)
440 >                        U.unpark(w);
441 >                    return v;
442                  }
443 +                // create arena on contention, but continue until slot null
444 +                if (NCPU > 1 && bound == 0 &&
445 +                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
446 +                    arena = new Node[(FULL + 2) << ASHIFT];
447 +            }
448 +            else if (arena != null)
449 +                return null; // caller must reroute to arenaExchange
450 +            else {
451 +                p.item = item;
452 +                if (U.compareAndSwapObject(this, SLOT, null, p))
453 +                    break;
454 +                p.item = null;
455              }
456          }
457 <        return CANCEL;
457 >
458 >        // await release
459 >        int h = p.hash;
460 >        long end = timed? System.nanoTime() + ns : 0L;
461 >        int spins = (NCPU > 1) ? SPINS : 1;
462 >        Object v;
463 >        while ((v = p.match) == null) {
464 >            if (spins > 0) {
465 >                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
466 >                if (h == 0)
467 >                    h = SPINS | (int)t.getId();
468 >                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
469 >                    Thread.yield();
470 >            }
471 >            else if (slot != p)
472 >                spins = SPINS;
473 >            else if (!t.isInterrupted() && arena == null &&
474 >                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
475 >                U.putObject(t, BLOCKER, this);
476 >                p.parked = t;
477 >                if (slot == p)
478 >                    U.park(false, ns);
479 >                p.parked = null;
480 >                U.putObject(t, BLOCKER, null);
481 >            }
482 >            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
483 >                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
484 >                break;
485 >            }
486 >        }
487 >        U.putOrderedObject(p, MATCH, null);
488 >        p.item = null;
489 >        p.hash = h;
490 >        return v;
491      }
492  
493      /**
494       * Creates a new Exchanger.
495       */
496      public Exchanger() {
497 +        participant = new Participant();
498      }
499  
500      /**
# Line 594 | Line 532 | public class Exchanger<V> {
532       */
533      @SuppressWarnings("unchecked")
534      public V exchange(V x) throws InterruptedException {
535 <        if (!Thread.interrupted()) {
536 <            Object o = doExchange((x == null) ? NULL_ITEM : x, false, 0);
537 <            if (o == NULL_ITEM)
538 <                return null;
539 <            if (o != CANCEL)
540 <                return (V)o;
541 <            Thread.interrupted(); // Clear interrupt status on IE throw
542 <        }
605 <        throw new InterruptedException();
535 >        Object v;
536 >        Object item = (x == null)? NULL_ITEM : x; // translate null args
537 >        if ((arena != null ||
538 >             (v = slotExchange(item, false, 0L)) == null) &&
539 >            ((Thread.interrupted() || // disambiguates null return
540 >              (v = arenaExchange(item, false, 0L)) == null)))
541 >            throw new InterruptedException();
542 >        return (v == NULL_ITEM)? null : (V)v;
543      }
544  
545      /**
# Line 650 | Line 587 | public class Exchanger<V> {
587      @SuppressWarnings("unchecked")
588      public V exchange(V x, long timeout, TimeUnit unit)
589          throws InterruptedException, TimeoutException {
590 <        if (!Thread.interrupted()) {
591 <            Object o = doExchange((x == null) ? NULL_ITEM : x,
592 <                                  true, unit.toNanos(timeout));
593 <            if (o == NULL_ITEM)
594 <                return null;
595 <            if (o != CANCEL)
596 <                return (V)o;
597 <            if (!Thread.interrupted())
598 <                throw new TimeoutException();
590 >        Object v;
591 >        Object item = (x == null)? NULL_ITEM : x;
592 >        long ns = unit.toNanos(timeout);
593 >        if ((arena != null ||
594 >             (v = slotExchange(item, true, ns)) == null) &&
595 >            ((Thread.interrupted() ||
596 >              (v = arenaExchange(item, true, ns)) == null)))
597 >            throw new InterruptedException();
598 >        if (v == TIMED_OUT)
599 >            throw new TimeoutException();
600 >        return (v == NULL_ITEM)? null : (V)v;
601 >    }
602 >
603 >    // Unsafe mechanics
604 >    private static final sun.misc.Unsafe U;
605 >    private static final long BOUND;
606 >    private static final long SLOT;
607 >    private static final long MATCH;
608 >    private static final long BLOCKER;
609 >    private static final int ABASE;
610 >    static {
611 >        int s;
612 >        try {
613 >            U = sun.misc.Unsafe.getUnsafe();
614 >            Class<?> ek = Exchanger.class;
615 >            Class<?> nk = Node.class;
616 >            Class<?> ak = Node[].class;
617 >            Class<?> tk = Thread.class;
618 >            BOUND = U.objectFieldOffset
619 >                (ek.getDeclaredField("bound"));
620 >            SLOT = U.objectFieldOffset
621 >                (ek.getDeclaredField("slot"));
622 >            MATCH = U.objectFieldOffset
623 >                (nk.getDeclaredField("match"));
624 >            BLOCKER = U.objectFieldOffset
625 >                (tk.getDeclaredField("parkBlocker"));
626 >            s = U.arrayIndexScale(ak);
627 >            // ABASE absorbs padding in front of element 0
628 >            ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
629 >
630 >        } catch (Exception e) {
631 >            throw new Error(e);
632          }
633 <        throw new InterruptedException();
633 >        if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
634 >            throw new Error("Unsupported array scale");
635      }
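
The offset arithmetic set up here rewards a worked example. In the
sketch below the base offset of 16 is purely illustrative (the real
value comes from arrayBaseOffset at class-load time); what matters is
that ABASE absorbs one slot of padding before element 0 and
(i << ASHIFT) spaces used slots 128 bytes apart:

    // Hypothetical demo of the arena offset arithmetic (the base of 16
    // is illustrative only; the real base comes from Unsafe).
    public class OffsetDemo {
        public static void main(String[] args) {
            final int ASHIFT = 7;               // 1 << 7 = 128 bytes
            int base = 16;                      // stand-in arrayBaseOffset
            int abase = base + (1 << ASHIFT);   // padding before slot 0
            for (int i = 0; i < 3; i++)
                System.out.println("slot " + i + " -> byte offset "
                                   + (((long) i << ASHIFT) + abase));
            // prints 144, 272, 400: no two used slots can share a
            // cache line of up to 128 bytes
        }
    }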
636 +
637   }
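
Finally, a small usage sketch of the public API this file implements
(class and variable names invented): two threads pair up and swap
strings, then a timed call with no partner times out. As the algorithm
comment notes, the TimeoutException may arrive slightly late while an
enabled arena shrinks back to zero.

    import java.util.concurrent.Exchanger;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class ExchangeDemo {
        public static void main(String[] args) throws Exception {
            final Exchanger<String> x = new Exchanger<String>();
            Thread worker = new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println("worker got: "
                                           + x.exchange("from-worker"));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            worker.start();                      // pair up and swap
            System.out.println("main got: " + x.exchange("from-main"));
            worker.join();

            try {                                // no partner: times out
                x.exchange("unmatched", 100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException expected) {
                System.out.println("timed out as expected");
            }
        }
    }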

Diff Legend

- Removed lines
+ Added lines
< Changed lines (revision 1.54)
> Changed lines (revision 1.55)