ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.74
Committed: Tue Apr 19 22:55:30 2016 UTC (8 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.73: +4 -4 lines
Log Message:
s~\bsun\.(misc\.Unsafe)\b~jdk.internal.$1~g;
s~\bputOrdered([A-Za-z]+)\b~put${1}Release~g

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.16 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 jsr166 1.48 * http://creativecommons.org/publicdomain/zero/1.0/
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 jsr166 1.66
10 tim 1.1 /**
11 dl 1.28 * A synchronization point at which threads can pair and swap elements
12 jsr166 1.39 * within pairs. Each thread presents some object on entry to the
13 dl 1.28 * {@link #exchange exchange} method, matches with a partner thread,
14 jsr166 1.39 * and receives its partner's object on return. An Exchanger may be
15     * viewed as a bidirectional form of a {@link SynchronousQueue}.
16     * Exchangers may be useful in applications such as genetic algorithms
17     * and pipeline designs.
18 tim 1.1 *
19     * <p><b>Sample Usage:</b>
20 jsr166 1.29 * Here are the highlights of a class that uses an {@code Exchanger}
21     * to swap buffers between threads so that the thread filling the
22     * buffer gets a freshly emptied one when it needs it, handing off the
23     * filled one to the thread emptying the buffer.
24 jsr166 1.71 * <pre> {@code
25 tim 1.1 * class FillAndEmpty {
26 jsr166 1.65 * Exchanger<DataBuffer> exchanger = new Exchanger<>();
27 dl 1.9 * DataBuffer initialEmptyBuffer = ... a made-up type
28     * DataBuffer initialFullBuffer = ...
29 tim 1.1 *
30     * class FillingLoop implements Runnable {
31     * public void run() {
32 dl 1.9 * DataBuffer currentBuffer = initialEmptyBuffer;
33 tim 1.1 * try {
34     * while (currentBuffer != null) {
35     * addToBuffer(currentBuffer);
36 dl 1.30 * if (currentBuffer.isFull())
37 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
38     * }
39 tim 1.7 * } catch (InterruptedException ex) { ... handle ... }
40 tim 1.1 * }
41     * }
42     *
43     * class EmptyingLoop implements Runnable {
44     * public void run() {
45 dl 1.9 * DataBuffer currentBuffer = initialFullBuffer;
46 tim 1.1 * try {
47     * while (currentBuffer != null) {
48     * takeFromBuffer(currentBuffer);
49 dl 1.30 * if (currentBuffer.isEmpty())
50 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
51     * }
52 tim 1.7 * } catch (InterruptedException ex) { ... handle ...}
53 tim 1.1 * }
54     * }
55     *
56     * void start() {
57     * new Thread(new FillingLoop()).start();
58     * new Thread(new EmptyingLoop()).start();
59     * }
60 jsr166 1.50 * }}</pre>
61 tim 1.1 *
62 jsr166 1.27 * <p>Memory consistency effects: For each pair of threads that
63     * successfully exchange objects via an {@code Exchanger}, actions
64     * prior to the {@code exchange()} in each thread
65     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
66     * those subsequent to a return from the corresponding {@code exchange()}
67     * in the other thread.
68 brian 1.22 *
69 tim 1.1 * @since 1.5
70 dl 1.16 * @author Doug Lea and Bill Scherer and Michael Scott
71 dl 1.11 * @param <V> The type of objects that may be exchanged
72 tim 1.1 */
73     public class Exchanger<V> {
74 dl 1.55
75 dl 1.16 /*
76 jsr166 1.57 * Overview: The core algorithm is, for an exchange "slot",
77 dl 1.55 * and a participant (caller) with an item:
78 dl 1.16 *
79 jsr166 1.61 * for (;;) {
80     * if (slot is empty) { // offer
81     * place item in a Node;
82     * if (can CAS slot from empty to node) {
83 jsr166 1.62 * wait for release;
84     * return matching item in node;
85 dl 1.55 * }
86 jsr166 1.61 * }
87     * else if (can CAS slot from node to empty) { // release
88     * get the item in node;
89     * set matching item in node;
90     * release waiting thread;
91     * }
92     * // else retry on CAS failure
93     * }
94 dl 1.55 *
95     * This is among the simplest forms of a "dual data structure" --
96     * see Scott and Scherer's DISC 04 paper and
97     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
98     *
99     * This works great in principle. But in practice, like many
100     * algorithms centered on atomic updates to a single location, it
101     * scales horribly when there are more than a few participants
102     * using the same Exchanger. So the implementation instead uses a
103     * form of elimination arena, that spreads out this contention by
104     * arranging that some threads typically use different slots,
105     * while still ensuring that eventually, any two parties will be
106     * able to exchange items. That is, we cannot completely partition
107     * across threads, but instead give threads arena indices that
108     * will on average grow under contention and shrink under lack of
109     * contention. We approach this by defining the Nodes that we need
110     * anyway as ThreadLocals, and include in them per-thread index
111     * and related bookkeeping state. (We can safely reuse per-thread
112     * nodes rather than creating them fresh each time because slots
113     * alternate between pointing to a node vs null, so cannot
114 jsr166 1.57 * encounter ABA problems. However, we do need some care in
115 dl 1.55 * resetting them between uses.)
116     *
117     * Implementing an effective arena requires allocating a bunch of
118     * space, so we only do so upon detecting contention (except on
119     * uniprocessors, where they wouldn't help, so aren't used).
120     * Otherwise, exchanges use the single-slot slotExchange method.
121     * On contention, not only must the slots be in different
122     * locations, but the locations must not encounter memory
123     * contention due to being on the same cache line (or more
124     * generally, the same coherence unit). Because, as of this
125     * writing, there is no way to determine cacheline size, we define
126     * a value that is enough for common platforms. Additionally,
127     * extra care elsewhere is taken to avoid other false/unintended
128 dl 1.64 * sharing and to enhance locality, including adding padding (via
129 jsr166 1.73 * @Contended) to Nodes, embedding "bound" as an Exchanger field,
130     * and reworking some park/unpark mechanics compared to
131 dl 1.64 * LockSupport versions.
132 dl 1.55 *
133     * The arena starts out with only one used slot. We expand the
134     * effective arena size by tracking collisions; i.e., failed CASes
135     * while trying to exchange. By nature of the above algorithm, the
136     * only kinds of collision that reliably indicate contention are
137     * when two attempted releases collide -- one of two attempted
138     * offers can legitimately fail to CAS without indicating
139     * contention by more than one other thread. (Note: it is possible
140     * but not worthwhile to more precisely detect contention by
141     * reading slot values after CAS failures.) When a thread has
142     * collided at each slot within the current arena bound, it tries
143     * to expand the arena size by one. We track collisions within
144     * bounds by using a version (sequence) number on the "bound"
145     * field, and conservatively reset collision counts when a
146     * participant notices that bound has been updated (in either
147     * direction).
148     *
149     * The effective arena size is reduced (when there is more than
150     * one slot) by giving up on waiting after a while and trying to
151     * decrement the arena size on expiration. The value of "a while"
152     * is an empirical matter. We implement by piggybacking on the
153     * use of spin->yield->block that is essential for reasonable
154     * waiting performance anyway -- in a busy exchanger, offers are
155     * usually almost immediately released, in which case context
156     * switching on multiprocessors is extremely slow/wasteful. Arena
157     * waits just omit the blocking part, and instead cancel. The spin
158     * count is empirically chosen to be a value that avoids blocking
159     * 99% of the time under maximum sustained exchange rates on a
160     * range of test machines. Spins and yields entail some limited
161     * randomness (using a cheap xorshift) to avoid regular patterns
162     * that can induce unproductive grow/shrink cycles. (Using a
163     * pseudorandom also helps regularize spin cycle duration by
164     * making branches unpredictable.) Also, during an offer, a
165     * waiter can "know" that it will be released when its slot has
166     * changed, but cannot yet proceed until match is set. In the
167     * mean time it cannot cancel the offer, so instead spins/yields.
168     * Note: It is possible to avoid this secondary check by changing
169     * the linearization point to be a CAS of the match field (as done
170     * in one case in the Scott & Scherer DISC paper), which also
171     * increases asynchrony a bit, at the expense of poorer collision
172     * detection and inability to always reuse per-thread nodes. So
173     * the current scheme is typically a better tradeoff.
174     *
175     * On collisions, indices traverse the arena cyclically in reverse
176     * order, restarting at the maximum index (which will tend to be
177     * sparsest) when bounds change. (On expirations, indices instead
178     * are halved until reaching 0.) It is possible (and has been
179     * tried) to use randomized, prime-value-stepped, or double-hash
180     * style traversal instead of simple cyclic traversal to reduce
181     * bunching. But empirically, whatever benefits these may have
182     * don't overcome their added overhead: We are managing operations
183     * that occur very quickly unless there is sustained contention,
184     * so simpler/faster control policies work better than more
185     * accurate but slower ones.
186     *
187     * Because we use expiration for arena size control, we cannot
188     * throw TimeoutExceptions in the timed version of the public
189     * exchange method until the arena size has shrunk to zero (or
190     * the arena isn't enabled). This may delay response to timeout
191     * but is still within spec.
192     *
193     * Essentially all of the implementation is in methods
194     * slotExchange and arenaExchange. These have similar overall
195     * structure, but differ in too many details to combine. The
196     * slotExchange method uses the single Exchanger field "slot"
197     * rather than arena array elements. However, it still needs
198     * minimal collision detection to trigger arena construction.
199     * (The messiest part is making sure interrupt status and
200     * InterruptedExceptions come out right during transitions when
201     * both methods may be called. This is done by using null return
202     * as a sentinel to recheck interrupt status.)
203     *
204 jsr166 1.57 * As is too common in this sort of code, methods are monolithic
205 dl 1.55 * because most of the logic relies on reads of fields that are
206     * maintained as local variables so can't be nicely factored --
207     * mainly, here, bulky spin->yield->block/cancel code -- and
208     * heavily dependent on intrinsics (Unsafe) to use inlined
209 jsr166 1.57 * embedded CAS and related memory access operations (that tend
210 dl 1.55 * not to be as readily inlined by dynamic compilers when they are
211     * hidden behind other methods that would more nicely name and
212     * encapsulate the intended effects). This includes the use of
213 jsr166 1.74 * putXRelease to clear fields of the per-thread Nodes between
214 dl 1.55 * uses. Note that field Node.item is not declared as volatile
215     * even though it is read by releasing threads, because they only
216 jsr166 1.57 * do so after CAS operations that must precede access, and all
217 dl 1.55 * uses by the owning thread are otherwise acceptably ordered by
218     * other operations. (Because the actual points of atomicity are
219     * slot CASes, it would also be legal for the write to Node.match
220     * in a release to be weaker than a full volatile write. However,
221     * this is not done because it could allow further postponement of
222     * the write, delaying progress.)
223     */
224    
225     /**
226     * The byte distance (as a shift value) between any two used slots
227     * in the arena. 1 << ASHIFT should be at least cacheline size.
228     */
229     private static final int ASHIFT = 7;
230    
231     /**
232     * The maximum supported arena index. The maximum allocatable
233     * arena size is MMASK + 1. Must be a power of two minus one, less
234     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
235     * for the expected scaling limits of the main algorithms.
236 dl 1.16 */
237 jsr166 1.59 private static final int MMASK = 0xff;
238 dl 1.55
239     /**
240     * Unit for sequence/version bits of bound field. Each successful
241     * change to the bound also adds SEQ.
242     */
243 jsr166 1.59 private static final int SEQ = MMASK + 1;
244 dl 1.2
245 dl 1.32 /** The number of CPUs, for sizing and spin control */
246 dl 1.37 private static final int NCPU = Runtime.getRuntime().availableProcessors();
247 dl 1.32
248 jsr166 1.17 /**
249 dl 1.55 * The maximum slot index of the arena: The number of slots that
250     * can in principle hold all threads without contention, or at
251     * most the maximum indexable value.
252 dl 1.37 */
253 dl 1.55 static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
254 dl 1.37
255     /**
256 dl 1.55 * The bound for spins while waiting for a match. The actual
257     * number of iterations will on average be about twice this value
258     * due to randomization. Note: Spinning is disabled when NCPU==1.
259 dl 1.37 */
260 dl 1.55 private static final int SPINS = 1 << 10;
261 dl 1.37
262     /**
263 dl 1.55 * Value representing null arguments/returns from public
264     * methods. Needed because the API originally didn't disallow null
265     * arguments, which it should have.
266 dl 1.16 */
267 dl 1.55 private static final Object NULL_ITEM = new Object();
268 dl 1.34
269     /**
270 dl 1.55 * Sentinel value returned by internal exchange methods upon
271     * timeout, to avoid need for separate timed versions of these
272     * methods.
273 dl 1.34 */
274 dl 1.55 private static final Object TIMED_OUT = new Object();
275 dl 1.34
276     /**
277 dl 1.55 * Nodes hold partially exchanged data, plus other per-thread
278 jsr166 1.73 * bookkeeping. Padded via @Contended to reduce memory contention.
279 dl 1.34 */
280 jsr166 1.73 @jdk.internal.vm.annotation.Contended static final class Node {
281 dl 1.55 int index; // Arena index
282     int bound; // Last recorded value of Exchanger.bound
283     int collides; // Number of CAS failures at current bound
284     int hash; // Pseudo-random for spins
285     Object item; // This thread's current item
286     volatile Object match; // Item provided by releasing thread
287     volatile Thread parked; // Set to this thread when parked, else null
288     }
289    
290     /** The corresponding thread local class */
291     static final class Participant extends ThreadLocal<Node> {
292     public Node initialValue() { return new Node(); }
293     }
294 dl 1.32
295     /**
296 jsr166 1.72 * Per-thread state.
297 dl 1.32 */
298 dl 1.55 private final Participant participant;
299 dl 1.32
300     /**
301 dl 1.55 * Elimination array; null until enabled (within slotExchange).
302     * Element accesses use emulation of volatile gets and CAS.
303     */
304     private volatile Node[] arena;
305 dl 1.5
306 dl 1.34 /**
307 dl 1.55 * Slot used until contention detected.
308 dl 1.34 */
309 dl 1.55 private volatile Node slot;
310 dl 1.5
311 dl 1.16 /**
312 dl 1.55 * The index of the largest valid arena position, OR'ed with SEQ
313     * number in high bits, incremented on each update. The initial
314     * update from 0 to SEQ is used to ensure that the arena array is
315     * constructed only once.
316 dl 1.16 */
317 dl 1.55 private volatile int bound;
318 dl 1.2
319 dl 1.16 /**
320 dl 1.55 * Exchange function when arenas enabled. See above for explanation.
321 dl 1.30 *
322 jsr166 1.60 * @param item the (non-null) item to exchange
323 dl 1.30 * @param timed true if the wait is timed
324 jsr166 1.58 * @param ns if timed, the maximum wait time, else 0L
325 dl 1.55 * @return the other thread's item; or null if interrupted; or
326     * TIMED_OUT if timed and timed out
327     */
328     private final Object arenaExchange(Object item, boolean timed, long ns) {
        // Arena-mode exchange. Each pass of the outer loop looks at the
        // arena slot for index i and then does one of three things:
        //   (1) release: claim a node found in the slot and swap items;
        //   (2) offer:   install this thread's node in an empty slot and
        //                wait for a releaser (or cancel the offer);
        //   (3) collide: record the failed attempt and pick another index.
329     Node[] a = arena;
330     Node p = participant.get();
331     for (int i = p.index;;) { // access slot at i
332     int b, m, c; long j; // j is raw array offset
        // Slot i is read at byte offset (i << ASHIFT) + ABASE of the array.
333     Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        // (1) Release: the slot held a node and this thread CASed it out.
        // Take the waiter's item, hand ours over through q.match, and
        // unpark the waiter if it has recorded itself in q.parked.
334     if (q != null && U.compareAndSwapObject(a, j, q, null)) {
335     Object v = q.item; // release
336     q.match = item;
337     Thread w = q.parked;
338     if (w != null)
339     U.unpark(w);
340     return v;
341 dl 1.37 }
        // (2) Offer: the slot was read as empty and i is within the current
        // bound. b is the whole bound word; m is its index part (the low
        // MMASK bits). Both are assigned here and reused by branch (3).
342 dl 1.55 else if (i <= (m = (b = bound) & MMASK) && q == null) {
343     p.item = item; // offer
344     if (U.compareAndSwapObject(a, j, null, p)) {
        // A deadline is only computed when the call is timed and m == 0;
        // in every other case end stays 0L.
345 jsr166 1.56 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
346 dl 1.55 Thread t = Thread.currentThread(); // wait
        // Wait loop: h is this thread's xorshift state, spins the
        // remaining spin budget.
347     for (int h = p.hash, spins = SPINS;;) {
348     Object v = p.match;
        // Matched: reset match and item so the node can be reused, save
        // the hash state back into the node, and return the partner's item.
349     if (v != null) {
350 jsr166 1.74 U.putObjectRelease(p, MATCH, null);
351 dl 1.55 p.item = null; // clear for next use
352     p.hash = h;
353     return v;
354     }
355     else if (spins > 0) {
356     h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
357     if (h == 0) // initialize hash
358     h = SPINS | (int)t.getId();
359     else if (h < 0 && // approx 50% true
360     (--spins & ((SPINS >>> 1) - 1)) == 0)
361     Thread.yield(); // two yields per wait
362     }
        // Spin budget used up, but the slot no longer holds p: some
        // releaser has claimed it, so restart spinning until match is set.
363     else if (U.getObjectVolatile(a, j) != p)
364     spins = SPINS; // releaser hasn't set match yet
        // p is still in the slot. Park only when m == 0, the thread is not
        // interrupted and, for timed calls, time remains; evaluating the
        // last test also refreshes ns with the remaining nanoseconds.
365     else if (!t.isInterrupted() && m == 0 &&
366     (!timed ||
367     (ns = end - System.nanoTime()) > 0L)) {
368     U.putObject(t, BLOCKER, this); // emulate LockSupport
369     p.parked = t; // minimize window
        // Re-read the slot after publishing parked; park only if p is
        // still there.
370     if (U.getObjectVolatile(a, j) == p)
371     U.park(false, ns);
372     p.parked = null;
373     U.putObject(t, BLOCKER, null);
374     }
        // Not allowed to park (interrupted, m != 0, or timed out): try to
        // withdraw the offer by CASing p back out of the slot. If that
        // fails no branch is taken and the wait loop simply iterates.
375     else if (U.getObjectVolatile(a, j) == p &&
376     U.compareAndSwapObject(a, j, p, null)) {
        // b + SEQ - 1 adds one sequence unit and lowers the index part
        // of bound by one.
377     if (m != 0) // try to shrink
378     U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
379     p.item = null;
380     p.hash = h;
        // Halve this thread's stored index and continue from there.
381     i = p.index >>>= 1; // descend
        // Thread.interrupted() clears the interrupt status; the null
        // return is the "interrupted" result documented for this method.
382     if (Thread.interrupted())
383     return null;
384     if (timed && m == 0 && ns <= 0L)
385     return TIMED_OUT;
        // Leaves the wait loop only; the outer loop retries at the new i.
386     break; // expired; restart
387     }
388     }
389     }
        // The CAS installing p failed: withdraw the item and retry.
390     else
391     p.item = null; // clear offer
392 dl 1.2 }
        // (3) Collision: reached when the release CAS failed, or when the
        // slot was empty but i lies beyond the current bound index m.
393 dl 1.55 else {
        // bound differs from the value this thread last recorded: record
        // it, zero the collision count, and restart at m (or at m - 1 when
        // already at m and m != 0).
394     if (p.bound != b) { // stale; reset
395     p.bound = b;
396     p.collides = 0;
397     i = (i != m || m == 0) ? m : m - 1;
398     }
        // Count the collision and step to the next lower index (wrapping
        // from 0 to m) unless collides has reached m, m is below FULL, and
        // the CAS moving bound to b + SEQ + 1 succeeds.
399     else if ((c = p.collides) < m || m == FULL ||
400     !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
401     p.collides = c + 1;
402     i = (i == 0) ? m : i - 1; // cyclically traverse
403     }
        // The bound was just raised by this thread: use the new top index.
404     else
405     i = m + 1; // grow
        // Remember the chosen index in the per-thread node.
406     p.index = i;
407 dl 1.34 }
408 dl 1.37 }
409     }
410 dl 1.2
411 dl 1.37 /**
412 dl 1.55 * Exchange function used until arenas enabled. See above for explanation.
413     *
414     * @param item the item to exchange
415     * @param timed true if the wait is timed
416 jsr166 1.58 * @param ns if timed, the maximum wait time, else 0L
417 dl 1.55 * @return the other thread's item; or null if either the arena
418     * was enabled or the thread was interrupted before completion; or
419     * TIMED_OUT if timed and timed out
420     */
421     private final Object slotExchange(Object item, boolean timed, long ns) {
        // Single-slot exchange. Phase 1 either releases a node already
        // waiting in the slot field or installs this thread's node there;
        // phase 2 waits for a releaser. A null return means either
        // "interrupted" or "an arena exists, use arenaExchange"; the
        // caller tells these apart by checking the interrupt status.
422     Node p = participant.get();
423     Thread t = Thread.currentThread();
424     if (t.isInterrupted()) // preserve interrupt status so caller can recheck
425     return null;
426    
        // Phase 1: release a waiting node or offer our own.
427     for (Node q;;) {
428     if ((q = slot) != null) {
        // A node is waiting: claim it by CASing slot to null, then take
        // its item, hand ours over through q.match, and unpark its owner
        // if the owner has recorded itself in q.parked.
429     if (U.compareAndSwapObject(this, SLOT, q, null)) {
430     Object v = q.item;
431     q.match = item;
432     Thread w = q.parked;
433     if (w != null)
434     U.unpark(w);
435     return v;
436     }
        // The release CAS failed. When NCPU > 1 and bound is still 0, the
        // thread that wins the CAS of bound from 0 to SEQ allocates the
        // arena ((FULL + 2) << ASHIFT elements); the loop then retries.
437     // create arena on contention, but continue until slot null
438     if (NCPU > 1 && bound == 0 &&
439     U.compareAndSwapInt(this, BOUND, 0, SEQ))
440     arena = new Node[(FULL + 2) << ASHIFT];
441     }
442     else if (arena != null)
443     return null; // caller must reroute to arenaExchange
        // Slot empty and no arena: offer this thread's node. On CAS
        // success leave the loop to wait; otherwise withdraw the item and
        // retry.
444     else {
445     p.item = item;
446     if (U.compareAndSwapObject(this, SLOT, null, p))
447     break;
448     p.item = null;
449     }
450 dl 1.16 }
451    
        // Phase 2: wait until a releaser stores into p.match. h is the
        // xorshift state; the spin budget is SPINS when NCPU > 1, else 1.
452 dl 1.55 // await release
453     int h = p.hash;
454 jsr166 1.56 long end = timed ? System.nanoTime() + ns : 0L;
455 dl 1.55 int spins = (NCPU > 1) ? SPINS : 1;
456     Object v;
457     while ((v = p.match) == null) {
458     if (spins > 0) {
459     h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
        // A zero state is (re)seeded from SPINS and the thread id.
460     if (h == 0)
461     h = SPINS | (int)t.getId();
462     else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
463     Thread.yield();
464 jsr166 1.53 }
        // Spin budget used up, but slot no longer holds p: a releaser has
        // claimed it, so spin again until match is set.
465 dl 1.55 else if (slot != p)
466     spins = SPINS;
        // p is still in the slot. Park only when not interrupted, no arena
        // has appeared and, for timed calls, time remains; evaluating the
        // last test also refreshes ns with the remaining nanoseconds.
467     else if (!t.isInterrupted() && arena == null &&
468     (!timed || (ns = end - System.nanoTime()) > 0L)) {
469     U.putObject(t, BLOCKER, this);
470     p.parked = t;
        // Re-read slot after publishing parked; park only if p is still there.
471     if (slot == p)
472     U.park(false, ns);
473     p.parked = null;
474     U.putObject(t, BLOCKER, null);
475 dl 1.37 }
        // Reached when interrupted, an arena now exists, or the timed wait
        // has run out. If p can still be CASed out of slot the offer is
        // cancelled: the result is TIMED_OUT only for an uninterrupted
        // timeout, otherwise null. If the CAS fails the loop continues.
476 dl 1.55 else if (U.compareAndSwapObject(this, SLOT, p, null)) {
477     v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
478     break;
479 dl 1.16 }
480     }
        // Reset the node for its next use and save the hash state.
481 jsr166 1.74 U.putObjectRelease(p, MATCH, null);
482 dl 1.55 p.item = null;
483     p.hash = h;
484     return v;
485 dl 1.37 }
486    
/**
 * Constructs a new {@code Exchanger}.
 */
public Exchanger() {
    this.participant = new Participant();
}
493    
494     /**
495     * Waits for another thread to arrive at this exchange point (unless
496 jsr166 1.44 * the current thread is {@linkplain Thread#interrupt interrupted}),
497 tim 1.1 * and then transfers the given object to it, receiving its object
498     * in return.
499 jsr166 1.17 *
500 tim 1.1 * <p>If another thread is already waiting at the exchange point then
501     * it is resumed for thread scheduling purposes and receives the object
502 jsr166 1.39 * passed in by the current thread. The current thread returns immediately,
503 tim 1.1 * receiving the object passed to the exchange by that other thread.
504 jsr166 1.17 *
505 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
506 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
507     * dormant until one of two things happens:
508     * <ul>
509     * <li>Some other thread enters the exchange; or
510 jsr166 1.45 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
511     * the current thread.
512 tim 1.1 * </ul>
513     * <p>If the current thread:
514     * <ul>
515 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
516 jsr166 1.44 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
517 jsr166 1.15 * for the exchange,
518 tim 1.1 * </ul>
519 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
520     * interrupted status is cleared.
521 tim 1.1 *
522     * @param x the object to exchange
523 dl 1.30 * @return the object provided by the other thread
524     * @throws InterruptedException if the current thread was
525     * interrupted while waiting
526 jsr166 1.15 */
527 jsr166 1.52 @SuppressWarnings("unchecked")
528 tim 1.1 public V exchange(V x) throws InterruptedException {
529 dl 1.55 Object v;
530 jsr166 1.56 Object item = (x == null) ? NULL_ITEM : x; // translate null args
531 dl 1.55 if ((arena != null ||
532     (v = slotExchange(item, false, 0L)) == null) &&
533     ((Thread.interrupted() || // disambiguates null return
534     (v = arenaExchange(item, false, 0L)) == null)))
535     throw new InterruptedException();
536 jsr166 1.56 return (v == NULL_ITEM) ? null : (V)v;
537 tim 1.1 }
538    
539     /**
540     * Waits for another thread to arrive at this exchange point (unless
541 jsr166 1.44 * the current thread is {@linkplain Thread#interrupt interrupted} or
542 jsr166 1.31 * the specified waiting time elapses), and then transfers the given
543     * object to it, receiving its object in return.
544 tim 1.1 *
545     * <p>If another thread is already waiting at the exchange point then
546     * it is resumed for thread scheduling purposes and receives the object
547 jsr166 1.39 * passed in by the current thread. The current thread returns immediately,
548 tim 1.1 * receiving the object passed to the exchange by that other thread.
549     *
550 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
551 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
552     * dormant until one of three things happens:
553     * <ul>
554     * <li>Some other thread enters the exchange; or
555 jsr166 1.44 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
556     * the current thread; or
557 tim 1.1 * <li>The specified waiting time elapses.
558     * </ul>
559     * <p>If the current thread:
560     * <ul>
561 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
562 jsr166 1.44 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
563 jsr166 1.15 * for the exchange,
564 tim 1.1 * </ul>
565 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
566     * interrupted status is cleared.
567 tim 1.1 *
568 dl 1.37 * <p>If the specified waiting time elapses then {@link
569     * TimeoutException} is thrown. If the time is less than or equal
570     * to zero, the method will not wait at all.
571 tim 1.1 *
572     * @param x the object to exchange
573     * @param timeout the maximum time to wait
574 jsr166 1.63 * @param unit the time unit of the {@code timeout} argument
575 dl 1.30 * @return the object provided by the other thread
576     * @throws InterruptedException if the current thread was
577     * interrupted while waiting
578     * @throws TimeoutException if the specified waiting time elapses
579     * before another thread enters the exchange
580 jsr166 1.15 */
581 jsr166 1.52 @SuppressWarnings("unchecked")
582 jsr166 1.15 public V exchange(V x, long timeout, TimeUnit unit)
583 tim 1.1 throws InterruptedException, TimeoutException {
584 dl 1.55 Object v;
585 jsr166 1.56 Object item = (x == null) ? NULL_ITEM : x;
586 dl 1.55 long ns = unit.toNanos(timeout);
587     if ((arena != null ||
588     (v = slotExchange(item, true, ns)) == null) &&
589     ((Thread.interrupted() ||
590     (v = arenaExchange(item, true, ns)) == null)))
591     throw new InterruptedException();
592     if (v == TIMED_OUT)
593     throw new TimeoutException();
594 jsr166 1.56 return (v == NULL_ITEM) ? null : (V)v;
595 dl 1.55 }
596    
// Unsafe handle and the raw offsets used for CAS and ordered accesses
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long BOUND;    // offset of Exchanger.bound
private static final long SLOT;     // offset of Exchanger.slot
private static final long MATCH;    // offset of Node.match
private static final long BLOCKER;  // offset of Thread.parkBlocker
private static final int ABASE;     // array base offset plus front padding
static {
    try {
        final Class<?> exchangerClass = Exchanger.class;
        BOUND = U.objectFieldOffset(exchangerClass.getDeclaredField("bound"));
        SLOT = U.objectFieldOffset(exchangerClass.getDeclaredField("slot"));

        MATCH = U.objectFieldOffset(Node.class.getDeclaredField("match"));

        BLOCKER = U.objectFieldOffset(Thread.class.getDeclaredField("parkBlocker"));

        // The element scale must be a power of two no larger than the
        // slot spacing (1 << ASHIFT) for the shift-based addressing.
        final int elementScale = U.arrayIndexScale(Node[].class);
        final boolean powerOfTwo = (elementScale & (elementScale - 1)) == 0;
        if (!powerOfTwo || elementScale > (1 << ASHIFT))
            throw new Error("Unsupported array scale");
        // One extra slot spacing is folded into ABASE as padding ahead
        // of element 0.
        ABASE = U.arrayBaseOffset(Node[].class) + (1 << ASHIFT);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
626 dl 1.55
627 tim 1.1 }