ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.82
Committed: Sun Mar 11 18:00:06 2018 UTC (6 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.81: +1 -1 lines
Log Message:
prefer throwing ExceptionInInitializerError from <clinit> to throwing Error

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.16 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 jsr166 1.48 * http://creativecommons.org/publicdomain/zero/1.0/
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 jsr166 1.76
10 dl 1.75 import java.lang.invoke.MethodHandles;
11     import java.lang.invoke.VarHandle;
12     import java.util.concurrent.locks.LockSupport;
13 jsr166 1.66
14 tim 1.1 /**
15 dl 1.28 * A synchronization point at which threads can pair and swap elements
16 jsr166 1.39 * within pairs. Each thread presents some object on entry to the
17 dl 1.28 * {@link #exchange exchange} method, matches with a partner thread,
18 jsr166 1.39 * and receives its partner's object on return. An Exchanger may be
19     * viewed as a bidirectional form of a {@link SynchronousQueue}.
20     * Exchangers may be useful in applications such as genetic algorithms
21     * and pipeline designs.
22 tim 1.1 *
23     * <p><b>Sample Usage:</b>
24 jsr166 1.29 * Here are the highlights of a class that uses an {@code Exchanger}
25     * to swap buffers between threads so that the thread filling the
26     * buffer gets a freshly emptied one when it needs it, handing off the
27     * filled one to the thread emptying the buffer.
28 jsr166 1.71 * <pre> {@code
29 tim 1.1 * class FillAndEmpty {
30 jsr166 1.65 * Exchanger<DataBuffer> exchanger = new Exchanger<>();
31 dl 1.9 * DataBuffer initialEmptyBuffer = ... a made-up type
32     * DataBuffer initialFullBuffer = ...
33 tim 1.1 *
34     * class FillingLoop implements Runnable {
35     * public void run() {
36 dl 1.9 * DataBuffer currentBuffer = initialEmptyBuffer;
37 tim 1.1 * try {
38     * while (currentBuffer != null) {
39     * addToBuffer(currentBuffer);
40 dl 1.30 * if (currentBuffer.isFull())
41 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
42     * }
43 tim 1.7 * } catch (InterruptedException ex) { ... handle ... }
44 tim 1.1 * }
45     * }
46     *
47     * class EmptyingLoop implements Runnable {
48     * public void run() {
49 dl 1.9 * DataBuffer currentBuffer = initialFullBuffer;
50 tim 1.1 * try {
51     * while (currentBuffer != null) {
52     * takeFromBuffer(currentBuffer);
53 dl 1.30 * if (currentBuffer.isEmpty())
54 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
55     * }
56 tim 1.7 * } catch (InterruptedException ex) { ... handle ...}
57 tim 1.1 * }
58     * }
59     *
60     * void start() {
61     * new Thread(new FillingLoop()).start();
62     * new Thread(new EmptyingLoop()).start();
63     * }
64 jsr166 1.50 * }}</pre>
65 tim 1.1 *
66 jsr166 1.27 * <p>Memory consistency effects: For each pair of threads that
67     * successfully exchange objects via an {@code Exchanger}, actions
68     * prior to the {@code exchange()} in each thread
69     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
70     * those subsequent to a return from the corresponding {@code exchange()}
71     * in the other thread.
72 brian 1.22 *
73 tim 1.1 * @since 1.5
74 dl 1.16 * @author Doug Lea and Bill Scherer and Michael Scott
75 dl 1.11 * @param <V> The type of objects that may be exchanged
76 tim 1.1 */
77     public class Exchanger<V> {
78 dl 1.55
79 dl 1.16 /*
80 jsr166 1.57 * Overview: The core algorithm is, for an exchange "slot",
81 dl 1.55 * and a participant (caller) with an item:
82 dl 1.16 *
83 jsr166 1.61 * for (;;) {
84     * if (slot is empty) { // offer
85     * place item in a Node;
86     * if (can CAS slot from empty to node) {
87 jsr166 1.62 * wait for release;
88     * return matching item in node;
89 dl 1.55 * }
90 jsr166 1.61 * }
91     * else if (can CAS slot from node to empty) { // release
92     * get the item in node;
93     * set matching item in node;
94     * release waiting thread;
95     * }
96     * // else retry on CAS failure
97     * }
98 dl 1.55 *
99     * This is among the simplest forms of a "dual data structure" --
100     * see Scott and Scherer's DISC 04 paper and
101     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
102     *
103     * This works great in principle. But in practice, like many
104     * algorithms centered on atomic updates to a single location, it
105     * scales horribly when there are more than a few participants
106     * using the same Exchanger. So the implementation instead uses a
107     * form of elimination arena, that spreads out this contention by
108     * arranging that some threads typically use different slots,
109     * while still ensuring that eventually, any two parties will be
110     * able to exchange items. That is, we cannot completely partition
111     * across threads, but instead give threads arena indices that
112     * will on average grow under contention and shrink under lack of
113     * contention. We approach this by defining the Nodes that we need
114     * anyway as ThreadLocals, and include in them per-thread index
115     * and related bookkeeping state. (We can safely reuse per-thread
116     * nodes rather than creating them fresh each time because slots
117     * alternate between pointing to a node vs null, so cannot
118 jsr166 1.57 * encounter ABA problems. However, we do need some care in
119 dl 1.55 * resetting them between uses.)
120     *
121     * Implementing an effective arena requires allocating a bunch of
122     * space, so we only do so upon detecting contention (except on
123     * uniprocessors, where they wouldn't help, so aren't used).
124     * Otherwise, exchanges use the single-slot slotExchange method.
125     * On contention, not only must the slots be in different
126     * locations, but the locations must not encounter memory
127     * contention due to being on the same cache line (or more
128     * generally, the same coherence unit). Because, as of this
129     * writing, there is no way to determine cacheline size, we define
130     * a value that is enough for common platforms. Additionally,
131     * extra care elsewhere is taken to avoid other false/unintended
132 dl 1.64 * sharing and to enhance locality, including adding padding (via
133 dl 1.75 * @Contended) to Nodes, and embedding "bound" as an Exchanger field.
134 dl 1.55 *
135     * The arena starts out with only one used slot. We expand the
136     * effective arena size by tracking collisions; i.e., failed CASes
137     * while trying to exchange. By nature of the above algorithm, the
138     * only kinds of collision that reliably indicate contention are
139     * when two attempted releases collide -- one of two attempted
140     * offers can legitimately fail to CAS without indicating
141     * contention by more than one other thread. (Note: it is possible
142     * but not worthwhile to more precisely detect contention by
143     * reading slot values after CAS failures.) When a thread has
144     * collided at each slot within the current arena bound, it tries
145     * to expand the arena size by one. We track collisions within
146     * bounds by using a version (sequence) number on the "bound"
147     * field, and conservatively reset collision counts when a
148     * participant notices that bound has been updated (in either
149     * direction).
150     *
151     * The effective arena size is reduced (when there is more than
152     * one slot) by giving up on waiting after a while and trying to
153     * decrement the arena size on expiration. The value of "a while"
154     * is an empirical matter. We implement by piggybacking on the
155     * use of spin->yield->block that is essential for reasonable
156     * waiting performance anyway -- in a busy exchanger, offers are
157     * usually almost immediately released, in which case context
158     * switching on multiprocessors is extremely slow/wasteful. Arena
159     * waits just omit the blocking part, and instead cancel. The spin
160     * count is empirically chosen to be a value that avoids blocking
161     * 99% of the time under maximum sustained exchange rates on a
162     * range of test machines. Spins and yields entail some limited
163     * randomness (using a cheap xorshift) to avoid regular patterns
164     * that can induce unproductive grow/shrink cycles. (Using a
165     * pseudorandom value also helps regularize spin cycle duration by
166     * making branches unpredictable.) Also, during an offer, a
167     * waiter can "know" that it will be released when its slot has
168     * changed, but cannot yet proceed until match is set. In the
169     * mean time it cannot cancel the offer, so instead spins/yields.
170     * Note: It is possible to avoid this secondary check by changing
171     * the linearization point to be a CAS of the match field (as done
172     * in one case in the Scott & Scherer DISC paper), which also
173     * increases asynchrony a bit, at the expense of poorer collision
174     * detection and inability to always reuse per-thread nodes. So
175     * the current scheme is typically a better tradeoff.
176     *
177     * On collisions, indices traverse the arena cyclically in reverse
178     * order, restarting at the maximum index (which will tend to be
179     * sparsest) when bounds change. (On expirations, indices instead
180     * are halved until reaching 0.) It is possible (and has been
181     * tried) to use randomized, prime-value-stepped, or double-hash
182     * style traversal instead of simple cyclic traversal to reduce
183     * bunching. But empirically, whatever benefits these may have
184     * don't overcome their added overhead: We are managing operations
185     * that occur very quickly unless there is sustained contention,
186     * so simpler/faster control policies work better than more
187     * accurate but slower ones.
188     *
189     * Because we use expiration for arena size control, we cannot
190     * throw TimeoutExceptions in the timed version of the public
191     * exchange method until the arena size has shrunken to zero (or
192     * the arena isn't enabled). This may delay response to timeout
193     * but is still within spec.
194     *
195     * Essentially all of the implementation is in methods
196     * slotExchange and arenaExchange. These have similar overall
197     * structure, but differ in too many details to combine. The
198     * slotExchange method uses the single Exchanger field "slot"
199     * rather than arena array elements. However, it still needs
200     * minimal collision detection to trigger arena construction.
201     * (The messiest part is making sure interrupt status and
202     * InterruptedExceptions come out right during transitions when
203     * both methods may be called. This is done by using null return
204     * as a sentinel to recheck interrupt status.)
205     *
206 jsr166 1.57 * As is too common in this sort of code, methods are monolithic
207 dl 1.55 * because most of the logic relies on reads of fields that are
208     * maintained as local variables so can't be nicely factored --
209 jsr166 1.81 * mainly, here, bulky spin->yield->block/cancel code. Note that
210     * field Node.item is not declared as volatile even though it is
211     * read by releasing threads, because they only do so after CAS
212     * operations that must precede access, and all uses by the owning
213     * thread are otherwise acceptably ordered by other operations.
214     * (Because the actual points of atomicity are slot CASes, it
215     * would also be legal for the write to Node.match in a release to
216     * be weaker than a full volatile write. However, this is not done
217     * because it could allow further postponement of the write,
218     * delaying progress.)
219 dl 1.55 */
220    
221     /**
222 dl 1.78 * The index distance (as a shift value) between any two used slots
223     * in the arena, spacing them out to avoid false sharing.
224 dl 1.55 */
225 dl 1.78 private static final int ASHIFT = 5;
226 dl 1.55
227     /**
228     * The maximum supported arena index. The maximum allocatable
229     * arena size is MMASK + 1. Must be a power of two minus one, less
230     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
231     * for the expected scaling limits of the main algorithms.
232 dl 1.16 */
233 jsr166 1.59 private static final int MMASK = 0xff;
234 dl 1.55
235     /**
236     * Unit for sequence/version bits of bound field. Each successful
237     * change to the bound also adds SEQ.
238     */
239 jsr166 1.59 private static final int SEQ = MMASK + 1;
240 dl 1.2
241 dl 1.32 /** The number of CPUs, for sizing and spin control */
242 dl 1.37 private static final int NCPU = Runtime.getRuntime().availableProcessors();
243 dl 1.32
244 jsr166 1.17 /**
245 dl 1.55 * The maximum slot index of the arena: The number of slots that
246     * can in principle hold all threads without contention, or at
247     * most the maximum indexable value.
248 dl 1.37 */
249 dl 1.55 static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
250 dl 1.37
251     /**
252 dl 1.55 * The bound for spins while waiting for a match. The actual
253     * number of iterations will on average be about twice this value
254     * due to randomization. Note: Spinning is disabled when NCPU==1.
255 dl 1.37 */
256 dl 1.55 private static final int SPINS = 1 << 10;
257 dl 1.37
258     /**
259 dl 1.55 * Value representing null arguments/returns from public
260     * methods. Needed because the API originally didn't disallow null
261     * arguments, which it should have.
262 dl 1.16 */
263 dl 1.55 private static final Object NULL_ITEM = new Object();
264 dl 1.34
265     /**
266 dl 1.55 * Sentinel value returned by internal exchange methods upon
267     * timeout, to avoid need for separate timed versions of these
268     * methods.
269 dl 1.34 */
270 dl 1.55 private static final Object TIMED_OUT = new Object();
271 dl 1.34
272     /**
273 dl 1.55 * Nodes hold partially exchanged data, plus other per-thread
274 jsr166 1.73 * bookkeeping. Padded via @Contended to reduce memory contention.
275 dl 1.34 */
276 jsr166 1.73 @jdk.internal.vm.annotation.Contended static final class Node {
277 dl 1.55 int index; // Arena index
278     int bound; // Last recorded value of Exchanger.bound
279     int collides; // Number of CAS failures at current bound
280     int hash; // Pseudo-random for spins
281     Object item; // This thread's current item
282     volatile Object match; // Item provided by releasing thread
283     volatile Thread parked; // Set to this thread when parked, else null
284     }
285    
286     /** The corresponding thread local class */
287     static final class Participant extends ThreadLocal<Node> {
288     public Node initialValue() { return new Node(); }
289     }
290 dl 1.32
291     /**
292 jsr166 1.72 * Per-thread state.
293 dl 1.32 */
294 dl 1.55 private final Participant participant;
295 dl 1.32
296     /**
297 dl 1.55 * Elimination array; null until enabled (within slotExchange).
298     * Element accesses use emulation of volatile gets and CAS.
299     */
300     private volatile Node[] arena;
301 dl 1.5
302 dl 1.34 /**
303 dl 1.55 * Slot used until contention detected.
304 dl 1.34 */
305 dl 1.55 private volatile Node slot;
306 dl 1.5
307 dl 1.16 /**
308 dl 1.55 * The index of the largest valid arena position, OR'ed with SEQ
309     * number in high bits, incremented on each update. The initial
310     * update from 0 to SEQ is used to ensure that the arena array is
311     * constructed only once.
312 dl 1.16 */
313 dl 1.55 private volatile int bound;
314 dl 1.2
315 dl 1.16 /**
316 dl 1.55 * Exchange function when arenas enabled. See above for explanation.
317 dl 1.30 *
318 jsr166 1.60 * @param item the (non-null) item to exchange
319 dl 1.30 * @param timed true if the wait is timed
320 jsr166 1.58 * @param ns if timed, the maximum wait time, else 0L
321 dl 1.55 * @return the other thread's item; or null if interrupted; or
322     * TIMED_OUT if timed and timed out
323     */
324     private final Object arenaExchange(Object item, boolean timed, long ns) {
325     Node[] a = arena;
326 dl 1.75 int alen = a.length;
327 dl 1.55 Node p = participant.get();
328     for (int i = p.index;;) { // access slot at i
329 dl 1.75 int b, m, c;
330     int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);
331     if (j < 0 || j >= alen)
332     j = alen - 1;
333 dl 1.80 Node q = (Node)AA.getAcquire(a, j);
334 dl 1.75 if (q != null && AA.compareAndSet(a, j, q, null)) {
335 dl 1.55 Object v = q.item; // release
336     q.match = item;
337     Thread w = q.parked;
338     if (w != null)
339 dl 1.75 LockSupport.unpark(w);
340 dl 1.55 return v;
341 dl 1.37 }
342 dl 1.55 else if (i <= (m = (b = bound) & MMASK) && q == null) {
343     p.item = item; // offer
344 dl 1.75 if (AA.compareAndSet(a, j, null, p)) {
345 jsr166 1.56 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
346 dl 1.55 Thread t = Thread.currentThread(); // wait
347     for (int h = p.hash, spins = SPINS;;) {
348     Object v = p.match;
349     if (v != null) {
350 dl 1.75 MATCH.setRelease(p, null);
351 dl 1.55 p.item = null; // clear for next use
352     p.hash = h;
353     return v;
354     }
355     else if (spins > 0) {
356     h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
357     if (h == 0) // initialize hash
358     h = SPINS | (int)t.getId();
359     else if (h < 0 && // approx 50% true
360     (--spins & ((SPINS >>> 1) - 1)) == 0)
361     Thread.yield(); // two yields per wait
362     }
363 dl 1.80 else if (AA.getAcquire(a, j) != p)
364 dl 1.55 spins = SPINS; // releaser hasn't set match yet
365     else if (!t.isInterrupted() && m == 0 &&
366     (!timed ||
367     (ns = end - System.nanoTime()) > 0L)) {
368     p.parked = t; // minimize window
369 dl 1.80 if (AA.getAcquire(a, j) == p) {
370 dl 1.75 if (ns == 0L)
371     LockSupport.park(this);
372     else
373     LockSupport.parkNanos(this, ns);
374     }
375 dl 1.55 p.parked = null;
376     }
377 dl 1.80 else if (AA.getAcquire(a, j) == p &&
378 dl 1.75 AA.compareAndSet(a, j, p, null)) {
379 dl 1.55 if (m != 0) // try to shrink
380 dl 1.75 BOUND.compareAndSet(this, b, b + SEQ - 1);
381 dl 1.55 p.item = null;
382     p.hash = h;
383     i = p.index >>>= 1; // descend
384     if (Thread.interrupted())
385     return null;
386     if (timed && m == 0 && ns <= 0L)
387     return TIMED_OUT;
388     break; // expired; restart
389     }
390     }
391     }
392     else
393     p.item = null; // clear offer
394 dl 1.2 }
395 dl 1.55 else {
396     if (p.bound != b) { // stale; reset
397     p.bound = b;
398     p.collides = 0;
399     i = (i != m || m == 0) ? m : m - 1;
400     }
401     else if ((c = p.collides) < m || m == FULL ||
402 dl 1.75 !BOUND.compareAndSet(this, b, b + SEQ + 1)) {
403 dl 1.55 p.collides = c + 1;
404     i = (i == 0) ? m : i - 1; // cyclically traverse
405     }
406     else
407     i = m + 1; // grow
408     p.index = i;
409 dl 1.34 }
410 dl 1.37 }
411     }
412 dl 1.2
413 dl 1.37 /**
414 dl 1.55 * Exchange function used until arenas enabled. See above for explanation.
415     *
416     * @param item the item to exchange
417     * @param timed true if the wait is timed
418 jsr166 1.58 * @param ns if timed, the maximum wait time, else 0L
419 dl 1.55 * @return the other thread's item; or null if either the arena
420     * was enabled or the thread was interrupted before completion; or
421     * TIMED_OUT if timed and timed out
422     */
423     private final Object slotExchange(Object item, boolean timed, long ns) {
424     Node p = participant.get();
425     Thread t = Thread.currentThread();
426     if (t.isInterrupted()) // preserve interrupt status so caller can recheck
427     return null;
428    
429     for (Node q;;) {
430     if ((q = slot) != null) {
431 dl 1.75 if (SLOT.compareAndSet(this, q, null)) {
432 dl 1.55 Object v = q.item;
433     q.match = item;
434     Thread w = q.parked;
435     if (w != null)
436 dl 1.75 LockSupport.unpark(w);
437 dl 1.55 return v;
438     }
439     // create arena on contention, but continue until slot null
440     if (NCPU > 1 && bound == 0 &&
441 dl 1.75 BOUND.compareAndSet(this, 0, SEQ))
442 dl 1.55 arena = new Node[(FULL + 2) << ASHIFT];
443     }
444     else if (arena != null)
445     return null; // caller must reroute to arenaExchange
446     else {
447     p.item = item;
448 dl 1.75 if (SLOT.compareAndSet(this, null, p))
449 dl 1.55 break;
450     p.item = null;
451     }
452 dl 1.16 }
453    
454 dl 1.55 // await release
455     int h = p.hash;
456 jsr166 1.56 long end = timed ? System.nanoTime() + ns : 0L;
457 dl 1.55 int spins = (NCPU > 1) ? SPINS : 1;
458     Object v;
459     while ((v = p.match) == null) {
460     if (spins > 0) {
461     h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
462     if (h == 0)
463     h = SPINS | (int)t.getId();
464     else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
465     Thread.yield();
466 jsr166 1.53 }
467 dl 1.55 else if (slot != p)
468     spins = SPINS;
469     else if (!t.isInterrupted() && arena == null &&
470     (!timed || (ns = end - System.nanoTime()) > 0L)) {
471     p.parked = t;
472 dl 1.75 if (slot == p) {
473     if (ns == 0L)
474     LockSupport.park(this);
475     else
476     LockSupport.parkNanos(this, ns);
477     }
478 dl 1.55 p.parked = null;
479 dl 1.37 }
480 dl 1.75 else if (SLOT.compareAndSet(this, p, null)) {
481 dl 1.55 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
482     break;
483 dl 1.16 }
484     }
485 dl 1.75 MATCH.setRelease(p, null);
486 dl 1.55 p.item = null;
487     p.hash = h;
488     return v;
489 dl 1.37 }
490    
/**
 * Creates a new Exchanger.
 */
public Exchanger() {
    this.participant = new Participant();
}
497    
498     /**
499     * Waits for another thread to arrive at this exchange point (unless
500 jsr166 1.44 * the current thread is {@linkplain Thread#interrupt interrupted}),
501 tim 1.1 * and then transfers the given object to it, receiving its object
502     * in return.
503 jsr166 1.17 *
504 tim 1.1 * <p>If another thread is already waiting at the exchange point then
505     * it is resumed for thread scheduling purposes and receives the object
506 jsr166 1.39 * passed in by the current thread. The current thread returns immediately,
507 tim 1.1 * receiving the object passed to the exchange by that other thread.
508 jsr166 1.17 *
509 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
510 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
511     * dormant until one of two things happens:
512     * <ul>
513     * <li>Some other thread enters the exchange; or
514 jsr166 1.45 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
515     * the current thread.
516 tim 1.1 * </ul>
517     * <p>If the current thread:
518     * <ul>
519 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
520 jsr166 1.44 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
521 jsr166 1.15 * for the exchange,
522 tim 1.1 * </ul>
523 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
524     * interrupted status is cleared.
525 tim 1.1 *
526     * @param x the object to exchange
527 dl 1.30 * @return the object provided by the other thread
528     * @throws InterruptedException if the current thread was
529     * interrupted while waiting
530 jsr166 1.15 */
531 jsr166 1.52 @SuppressWarnings("unchecked")
532 tim 1.1 public V exchange(V x) throws InterruptedException {
533 dl 1.55 Object v;
534 dl 1.75 Node[] a;
535 jsr166 1.56 Object item = (x == null) ? NULL_ITEM : x; // translate null args
536 dl 1.75 if (((a = arena) != null ||
537 dl 1.55 (v = slotExchange(item, false, 0L)) == null) &&
538     ((Thread.interrupted() || // disambiguates null return
539     (v = arenaExchange(item, false, 0L)) == null)))
540     throw new InterruptedException();
541 jsr166 1.56 return (v == NULL_ITEM) ? null : (V)v;
542 tim 1.1 }
543    
544     /**
545     * Waits for another thread to arrive at this exchange point (unless
546 jsr166 1.44 * the current thread is {@linkplain Thread#interrupt interrupted} or
547 jsr166 1.31 * the specified waiting time elapses), and then transfers the given
548     * object to it, receiving its object in return.
549 tim 1.1 *
550     * <p>If another thread is already waiting at the exchange point then
551     * it is resumed for thread scheduling purposes and receives the object
552 jsr166 1.39 * passed in by the current thread. The current thread returns immediately,
553 tim 1.1 * receiving the object passed to the exchange by that other thread.
554     *
555 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
556 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
557     * dormant until one of three things happens:
558     * <ul>
559     * <li>Some other thread enters the exchange; or
560 jsr166 1.44 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
561     * the current thread; or
562 tim 1.1 * <li>The specified waiting time elapses.
563     * </ul>
564     * <p>If the current thread:
565     * <ul>
566 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
567 jsr166 1.44 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
568 jsr166 1.15 * for the exchange,
569 tim 1.1 * </ul>
570 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
571     * interrupted status is cleared.
572 tim 1.1 *
573 dl 1.37 * <p>If the specified waiting time elapses then {@link
574     * TimeoutException} is thrown. If the time is less than or equal
575     * to zero, the method will not wait at all.
576 tim 1.1 *
577     * @param x the object to exchange
578     * @param timeout the maximum time to wait
579 jsr166 1.63 * @param unit the time unit of the {@code timeout} argument
580 dl 1.30 * @return the object provided by the other thread
581     * @throws InterruptedException if the current thread was
582     * interrupted while waiting
583     * @throws TimeoutException if the specified waiting time elapses
584     * before another thread enters the exchange
585 jsr166 1.15 */
586 jsr166 1.52 @SuppressWarnings("unchecked")
587 jsr166 1.15 public V exchange(V x, long timeout, TimeUnit unit)
588 tim 1.1 throws InterruptedException, TimeoutException {
589 dl 1.55 Object v;
590 jsr166 1.56 Object item = (x == null) ? NULL_ITEM : x;
591 dl 1.55 long ns = unit.toNanos(timeout);
592     if ((arena != null ||
593     (v = slotExchange(item, true, ns)) == null) &&
594     ((Thread.interrupted() ||
595     (v = arenaExchange(item, true, ns)) == null)))
596     throw new InterruptedException();
597     if (v == TIMED_OUT)
598     throw new TimeoutException();
599 jsr166 1.56 return (v == NULL_ITEM) ? null : (V)v;
600 dl 1.55 }
601    
// VarHandle mechanics: handles for the fields and array elements that
// are updated with CAS or accessed with acquire/release modes.
private static final VarHandle BOUND;   // Exchanger.bound
private static final VarHandle SLOT;    // Exchanger.slot
private static final VarHandle MATCH;   // Node.match
private static final VarHandle AA;      // elements of a Node[]
static {
    try {
        MethodHandles.Lookup lookup = MethodHandles.lookup();
        BOUND = lookup.findVarHandle(Exchanger.class, "bound", int.class);
        SLOT = lookup.findVarHandle(Exchanger.class, "slot", Node.class);
        MATCH = lookup.findVarHandle(Node.class, "match", Object.class);
        AA = MethodHandles.arrayElementVarHandle(Node[].class);
    } catch (ReflectiveOperationException e) {
        // A failed lookup makes the class unusable; surface it as an
        // initializer error with the cause attached.
        throw new ExceptionInInitializerError(e);
    }
}
618 dl 1.55
619 tim 1.1 }