ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.65
Committed: Sun May 25 02:33:45 2014 UTC (10 years ago) by jsr166
Branch: MAIN
Changes since 1.64: +1 -1 lines
Log Message:
time to start using diamond <>

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.16 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 jsr166 1.48 * http://creativecommons.org/publicdomain/zero/1.0/
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 jsr166 1.54 import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.atomic.AtomicReference;
11 dl 1.38 import java.util.concurrent.locks.LockSupport;
12 tim 1.1
13     /**
14 dl 1.28 * A synchronization point at which threads can pair and swap elements
15 jsr166 1.39 * within pairs. Each thread presents some object on entry to the
16 dl 1.28 * {@link #exchange exchange} method, matches with a partner thread,
17 jsr166 1.39 * and receives its partner's object on return. An Exchanger may be
18     * viewed as a bidirectional form of a {@link SynchronousQueue}.
19     * Exchangers may be useful in applications such as genetic algorithms
20     * and pipeline designs.
21 tim 1.1 *
22     * <p><b>Sample Usage:</b>
23 jsr166 1.29 * Here are the highlights of a class that uses an {@code Exchanger}
24     * to swap buffers between threads so that the thread filling the
25     * buffer gets a freshly emptied one when it needs it, handing off the
26     * filled one to the thread emptying the buffer.
27 jsr166 1.50 * <pre> {@code
28 tim 1.1 * class FillAndEmpty {
29 jsr166 1.65 * Exchanger<DataBuffer> exchanger = new Exchanger<>();
30 dl 1.9 * DataBuffer initialEmptyBuffer = ... a made-up type
31     * DataBuffer initialFullBuffer = ...
32 tim 1.1 *
33     * class FillingLoop implements Runnable {
34     * public void run() {
35 dl 1.9 * DataBuffer currentBuffer = initialEmptyBuffer;
36 tim 1.1 * try {
37     * while (currentBuffer != null) {
38     * addToBuffer(currentBuffer);
39 dl 1.30 * if (currentBuffer.isFull())
40 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
41     * }
42 tim 1.7 * } catch (InterruptedException ex) { ... handle ... }
43 tim 1.1 * }
44     * }
45     *
46     * class EmptyingLoop implements Runnable {
47     * public void run() {
48 dl 1.9 * DataBuffer currentBuffer = initialFullBuffer;
49 tim 1.1 * try {
50     * while (currentBuffer != null) {
51     * takeFromBuffer(currentBuffer);
52 dl 1.30 * if (currentBuffer.isEmpty())
53 tim 1.1 * currentBuffer = exchanger.exchange(currentBuffer);
54     * }
55 tim 1.7 * } catch (InterruptedException ex) { ... handle ...}
56 tim 1.1 * }
57     * }
58     *
59     * void start() {
60     * new Thread(new FillingLoop()).start();
61     * new Thread(new EmptyingLoop()).start();
62     * }
63 jsr166 1.50 * }}</pre>
64 tim 1.1 *
65 jsr166 1.27 * <p>Memory consistency effects: For each pair of threads that
66     * successfully exchange objects via an {@code Exchanger}, actions
67     * prior to the {@code exchange()} in each thread
68     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
69     * those subsequent to a return from the corresponding {@code exchange()}
70     * in the other thread.
71 brian 1.22 *
72 tim 1.1 * @since 1.5
73 dl 1.16 * @author Doug Lea and Bill Scherer and Michael Scott
74 dl 1.11 * @param <V> The type of objects that may be exchanged
75 tim 1.1 */
76     public class Exchanger<V> {
77 dl 1.55
78 dl 1.16 /*
79 jsr166 1.57 * Overview: The core algorithm is, for an exchange "slot",
80 dl 1.55 * and a participant (caller) with an item:
81 dl 1.16 *
82 jsr166 1.61 * for (;;) {
83     * if (slot is empty) { // offer
84     * place item in a Node;
85     * if (can CAS slot from empty to node) {
86 jsr166 1.62 * wait for release;
87     * return matching item in node;
88 dl 1.55 * }
89 jsr166 1.61 * }
90     * else if (can CAS slot from node to empty) { // release
91     * get the item in node;
92     * set matching item in node;
93     * release waiting thread;
94     * }
95     * // else retry on CAS failure
96     * }
97 dl 1.55 *
98     * This is among the simplest forms of a "dual data structure" --
99     * see Scott and Scherer's DISC 04 paper and
100     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
101     *
102     * This works great in principle. But in practice, like many
103     * algorithms centered on atomic updates to a single location, it
104     * scales horribly when there are more than a few participants
105     * using the same Exchanger. So the implementation instead uses a
106     * form of elimination arena, that spreads out this contention by
107     * arranging that some threads typically use different slots,
108     * while still ensuring that eventually, any two parties will be
109     * able to exchange items. That is, we cannot completely partition
110     * across threads, but instead give threads arena indices that
111     * will on average grow under contention and shrink under lack of
112     * contention. We approach this by defining the Nodes that we need
113     * anyway as ThreadLocals, and include in them per-thread index
114     * and related bookkeeping state. (We can safely reuse per-thread
115     * nodes rather than creating them fresh each time because slots
116     * alternate between pointing to a node vs null, so cannot
117 jsr166 1.57 * encounter ABA problems. However, we do need some care in
118 dl 1.55 * resetting them between uses.)
119     *
120     * Implementing an effective arena requires allocating a bunch of
121     * space, so we only do so upon detecting contention (except on
122     * uniprocessors, where they wouldn't help, so aren't used).
123     * Otherwise, exchanges use the single-slot slotExchange method.
124     * On contention, not only must the slots be in different
125     * locations, but the locations must not encounter memory
126     * contention due to being on the same cache line (or more
127     * generally, the same coherence unit). Because, as of this
128     * writing, there is no way to determine cacheline size, we define
129     * a value that is enough for common platforms. Additionally,
130     * extra care elsewhere is taken to avoid other false/unintended
131 dl 1.64 * sharing and to enhance locality, including adding padding (via
132     * sun.misc.Contended) to Nodes, embedding "bound" as an Exchanger
133     * field, and reworking some park/unpark mechanics compared to
134     * LockSupport versions.
135 dl 1.55 *
136     * The arena starts out with only one used slot. We expand the
137     * effective arena size by tracking collisions; i.e., failed CASes
138     * while trying to exchange. By nature of the above algorithm, the
139     * only kinds of collision that reliably indicate contention are
140     * when two attempted releases collide -- one of two attempted
141     * offers can legitimately fail to CAS without indicating
142     * contention by more than one other thread. (Note: it is possible
143     * but not worthwhile to more precisely detect contention by
144     * reading slot values after CAS failures.) When a thread has
145     * collided at each slot within the current arena bound, it tries
146     * to expand the arena size by one. We track collisions within
147     * bounds by using a version (sequence) number on the "bound"
148     * field, and conservatively reset collision counts when a
149     * participant notices that bound has been updated (in either
150     * direction).
151     *
152     * The effective arena size is reduced (when there is more than
153     * one slot) by giving up on waiting after a while and trying to
154     * decrement the arena size on expiration. The value of "a while"
155     * is an empirical matter. We implement by piggybacking on the
156     * use of spin->yield->block that is essential for reasonable
157     * waiting performance anyway -- in a busy exchanger, offers are
158     * usually almost immediately released, in which case context
159     * switching on multiprocessors is extremely slow/wasteful. Arena
160     * waits just omit the blocking part, and instead cancel. The spin
161     * count is empirically chosen to be a value that avoids blocking
162     * 99% of the time under maximum sustained exchange rates on a
163     * range of test machines. Spins and yields entail some limited
164     * randomness (using a cheap xorshift) to avoid regular patterns
165     * that can induce unproductive grow/shrink cycles. (Using a
166     * pseudorandom value also helps regularize spin cycle duration by
167     * making branches unpredictable.) Also, during an offer, a
168     * waiter can "know" that it will be released when its slot has
169     * changed, but cannot yet proceed until match is set. In the
170     * mean time it cannot cancel the offer, so instead spins/yields.
171     * Note: It is possible to avoid this secondary check by changing
172     * the linearization point to be a CAS of the match field (as done
173     * in one case in the Scott & Scherer DISC paper), which also
174     * increases asynchrony a bit, at the expense of poorer collision
175     * detection and inability to always reuse per-thread nodes. So
176     * the current scheme is typically a better tradeoff.
177     *
178     * On collisions, indices traverse the arena cyclically in reverse
179     * order, restarting at the maximum index (which will tend to be
180     * sparsest) when bounds change. (On expirations, indices instead
181     * are halved until reaching 0.) It is possible (and has been
182     * tried) to use randomized, prime-value-stepped, or double-hash
183     * style traversal instead of simple cyclic traversal to reduce
184     * bunching. But empirically, whatever benefits these may have
185     * don't overcome their added overhead: We are managing operations
186     * that occur very quickly unless there is sustained contention,
187     * so simpler/faster control policies work better than more
188     * accurate but slower ones.
189     *
190     * Because we use expiration for arena size control, we cannot
191     * throw TimeoutExceptions in the timed version of the public
192     * exchange method until the arena size has shrunken to zero (or
193     * the arena isn't enabled). This may delay response to timeout
194     * but is still within spec.
195     *
196     * Essentially all of the implementation is in methods
197     * slotExchange and arenaExchange. These have similar overall
198     * structure, but differ in too many details to combine. The
199     * slotExchange method uses the single Exchanger field "slot"
200     * rather than arena array elements. However, it still needs
201     * minimal collision detection to trigger arena construction.
202     * (The messiest part is making sure interrupt status and
203     * InterruptedExceptions come out right during transitions when
204     * both methods may be called. This is done by using null return
205     * as a sentinel to recheck interrupt status.)
206     *
207 jsr166 1.57 * As is too common in this sort of code, methods are monolithic
208 dl 1.55 * because most of the logic relies on reads of fields that are
209     * maintained as local variables so can't be nicely factored
210     * (mainly, here, bulky spin->yield->block/cancel code), and
211     * heavily dependent on intrinsics (Unsafe) to use inlined
212 jsr166 1.57 * embedded CAS and related memory access operations (that tend
213 dl 1.55 * not to be as readily inlined by dynamic compilers when they are
214     * hidden behind other methods that would more nicely name and
215     * encapsulate the intended effects). This includes the use of
216     * putOrderedX to clear fields of the per-thread Nodes between
217     * uses. Note that field Node.item is not declared as volatile
218     * even though it is read by releasing threads, because they only
219 jsr166 1.57 * do so after CAS operations that must precede access, and all
220 dl 1.55 * uses by the owning thread are otherwise acceptably ordered by
221     * other operations. (Because the actual points of atomicity are
222     * slot CASes, it would also be legal for the write to Node.match
223     * in a release to be weaker than a full volatile write. However,
224     * this is not done because it could allow further postponement of
225     * the write, delaying progress.)
226     */
227    
228     /**
229     * The byte distance (as a shift value) between any two used slots
230     * in the arena. 1 << ASHIFT should be at least cacheline size.
231     */
232     private static final int ASHIFT = 7; // 1 << 7 == 128 bytes between used slots
233    
234     /**
235     * The maximum supported arena index. The maximum allocatable
236     * arena size is MMASK + 1. Must be a power of two minus one, less
237     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
238     * for the expected scaling limits of the main algorithms.
239 dl 1.16 */
240 jsr166 1.59 private static final int MMASK = 0xff; // also used as the mask that extracts the index bits from "bound"
241 dl 1.55
242     /**
243     * Unit for sequence/version bits of bound field. Each successful
244     * change to the bound also adds SEQ.
245     */
246 jsr166 1.59 private static final int SEQ = MMASK + 1; // 0x100: lowest bit above the index bits selected by MMASK
247 dl 1.2
248 dl 1.32 /** The number of CPUs, for sizing and spin control */
249 dl 1.37 private static final int NCPU = Runtime.getRuntime().availableProcessors(); // sampled once, at class initialization
250 dl 1.32
251 jsr166 1.17 /**
252 dl 1.55 * The maximum slot index of the arena: The number of slots that
253     * can in principle hold all threads without contention, or at
254     * most the maximum indexable value.
255 dl 1.37 */
256 dl 1.55 static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // MMASK when NCPU >= 510, otherwise NCPU / 2 (0 when NCPU == 1)
257 dl 1.37
258     /**
259 dl 1.55 * The bound for spins while waiting for a match. The actual
260     * number of iterations will on average be about twice this value
261     * due to randomization. Note: Spinning is disabled when NCPU==1.
262 dl 1.37 */
263 dl 1.55 private static final int SPINS = 1 << 10; // 1024; also OR'ed into the initial per-thread hash seed
264 dl 1.37
265     /**
266 dl 1.55 * Value representing null arguments/returns from public
267     * methods. Needed because the API originally didn't disallow null
268     * arguments, which it should have.
269 dl 1.16 */
270 dl 1.55 private static final Object NULL_ITEM = new Object(); // compared by identity; substituted for null in both exchange methods
271 dl 1.34
272     /**
273 dl 1.55 * Sentinel value returned by internal exchange methods upon
274     * timeout, to avoid need for separate timed versions of these
275     * methods.
276 dl 1.34 */
277 dl 1.55 private static final Object TIMED_OUT = new Object(); // compared by identity; mapped to TimeoutException by the timed exchange method
278 dl 1.34
279     /**
280 dl 1.55 * Nodes hold partially exchanged data, plus other per-thread
281 dl 1.64 * bookkeeping. Padded via @sun.misc.Contended to reduce memory
282     * contention.
283 dl 1.34 */
284 dl 1.64 @sun.misc.Contended static final class Node {
        // One Node per participating thread (obtained via Participant); it is
        // both the object CASed into a slot and the holder of per-thread state.
285 dl 1.55 int index; // Arena index this thread will try next
286     int bound; // Last recorded value of Exchanger.bound
287     int collides; // Number of CAS failures at current bound
288     int hash; // Pseudo-random (xorshift) state for spins; seeded from the thread id when it is 0
289     Object item; // This thread's current item; non-volatile, read by a releaser only after its slot CAS succeeds
290     volatile Object match; // Item provided by releasing thread; reset to null by the owner after it is consumed
291     volatile Thread parked; // Set to this thread when parked, else null; read by the releaser to decide whether to unpark
292     }
293    
294     /** The corresponding thread local class */
295     static final class Participant extends ThreadLocal<Node> {
        // Supplies the per-thread Node: ThreadLocal invokes initialValue() the
        // first time a given thread calls get(), so each thread gets its own Node.
296     public Node initialValue() { return new Node(); }
297     }
298 dl 1.32
299     /**
300 dl 1.55 * Per-thread state
301 dl 1.32 */
302 dl 1.55 private final Participant participant; // assigned once, in the constructor
303 dl 1.32
304     /**
305 dl 1.55 * Elimination array; null until enabled (within slotExchange).
306     * Element accesses use emulation of volatile gets and CAS.
307     */
308     private volatile Node[] arena; // a non-null value routes callers of the exchange methods to arenaExchange
309 dl 1.5
310 dl 1.34 /**
311 dl 1.55 * Slot used until contention detected.
312 dl 1.34 */
313 dl 1.55 private volatile Node slot; // updated only through CAS at field offset SLOT
314 dl 1.5
315 dl 1.16 /**
316 dl 1.55 * The index of the largest valid arena position, OR'ed with SEQ
317     * number in high bits, incremented on each update. The initial
318     * update from 0 to SEQ is used to ensure that the arena array is
319     * constructed only once.
320 dl 1.16 */
321 dl 1.55 private volatile int bound; // index = bound & MMASK; updated by CAS to b + SEQ + 1 (grow) or b + SEQ - 1 (shrink)
322 dl 1.2
323 dl 1.16 /**
324 dl 1.55 * Exchange function when arenas enabled. See above for explanation.
325 dl 1.30 *
326 jsr166 1.60 * @param item the (non-null) item to exchange
327 dl 1.30 * @param timed true if the wait is timed
328 jsr166 1.58 * @param ns if timed, the maximum wait time, else 0L
329 dl 1.55 * @return the other thread's item; or null if interrupted; or
330     * TIMED_OUT if timed and timed out
331     */
332     private final Object arenaExchange(Object item, boolean timed, long ns) {
        // Each pass of the outer loop examines arena slot i and does one of:
        //   1. release: the slot holds another thread's node and we CAS it out;
        //   2. offer:   the slot looked empty and i is within the bound, so we
        //               try to CAS our own node in and then wait for a match;
        //   3. collide: anything else; pick another index, possibly growing the arena.
333     Node[] a = arena; // read the volatile arena reference once
334     Node p = participant.get(); // this thread's reusable node
335     for (int i = p.index;;) { // access slot at i, starting from the index saved in this thread's node
336     int b, m, c; long j; // j is raw array offset; b = bound snapshot, m = its index bits, c = collision count
337     Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // slot i lives (i << ASHIFT) bytes past ABASE
338     if (q != null && U.compareAndSwapObject(a, j, q, null)) {
339     Object v = q.item; // release: we emptied the slot, so q's item is ours to take
340     q.match = item; // volatile write that hands our item to the waiter
341     Thread w = q.parked;
342     if (w != null) // waiter recorded itself as parked; wake it
343     U.unpark(w);
344     return v;
345 dl 1.37 }
        // The condition below always assigns b and m (the bound read comes before
        // the q == null test), so both are set when the final else branch runs.
346 dl 1.55 else if (i <= (m = (b = bound) & MMASK) && q == null) {
347     p.item = item; // offer: publish our item before the node becomes visible in the slot
348     if (U.compareAndSwapObject(a, j, null, p)) {
349 jsr166 1.56 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; // deadline is only consulted when m == 0
350 dl 1.55 Thread t = Thread.currentThread(); // wait
351     for (int h = p.hash, spins = SPINS;;) {
352     Object v = p.match;
353     if (v != null) { // matched: reset the node for reuse and return the partner's item
354     U.putOrderedObject(p, MATCH, null);
355     p.item = null; // clear for next use
356     p.hash = h; // save the advanced pseudo-random state
357     return v;
358     }
359     else if (spins > 0) {
360     h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
361     if (h == 0) // initialize hash
362     h = SPINS | (int)t.getId();
        // spins is decremented only on iterations where h < 0
363     else if (h < 0 && // approx 50% true
364     (--spins & ((SPINS >>> 1) - 1)) == 0)
365     Thread.yield(); // two yields per wait
366     }
367     else if (U.getObjectVolatile(a, j) != p)
368     spins = SPINS; // releaser hasn't set match yet: our node left the slot, so keep spinning
        // Block only if not interrupted, the bound index is 0, and (when timed)
        // time remains; ns is refreshed to the remaining nanoseconds here.
369     else if (!t.isInterrupted() && m == 0 &&
370     (!timed ||
371     (ns = end - System.nanoTime()) > 0L)) {
372     U.putObject(t, BLOCKER, this); // emulate LockSupport
373     p.parked = t; // minimize window
374     if (U.getObjectVolatile(a, j) == p) // recheck that we are still waiting before parking
375     U.park(false, ns); // relative wait; ns is 0L in the untimed case
376     p.parked = null;
377     U.putObject(t, BLOCKER, null);
378     }
        // Otherwise cancel: succeeds only if our node is still in the slot and
        // we CAS it out; if a releaser got there first, loop and re-read match.
379     else if (U.getObjectVolatile(a, j) == p &&
380     U.compareAndSwapObject(a, j, p, null)) {
381     if (m != 0) // try to shrink: one CAS bumps the version by SEQ and lowers the index by 1
382     U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
383     p.item = null;
384     p.hash = h;
385     i = p.index >>>= 1; // descend: halve the saved index and continue from there
386     if (Thread.interrupted()) // clears the interrupt status; the null return signals the interrupt
387     return null;
388     if (timed && m == 0 && ns <= 0L)
389     return TIMED_OUT;
390     break; // expired; restart
391     }
392     }
393     }
394     else
395     p.item = null; // clear offer: another thread changed the slot first
396 dl 1.2 }
397 dl 1.55 else { // collision: release CAS failed, or i is beyond the current bound
398     if (p.bound != b) { // stale; reset
399     p.bound = b;
400     p.collides = 0;
401     i = (i != m || m == 0) ? m : m - 1; // restart at the max index m, or at m - 1 if already there
402     }
        // Keep traversing unless we have collided at least m times, the arena
        // is not yet at FULL, and we win the CAS that grows the bound.
403     else if ((c = p.collides) < m || m == FULL ||
404     !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
405     p.collides = c + 1;
406     i = (i == 0) ? m : i - 1; // cyclically traverse
407     }
408     else
409     i = m + 1; // grow: we just raised the bound, so use the newly valid index
410     p.index = i; // remember the chosen index in this thread's node
411 dl 1.34 }
412 dl 1.37 }
413     }
414 dl 1.2
415 dl 1.37 /**
416 dl 1.55 * Exchange function used until arenas enabled. See above for explanation.
417     *
418     * @param item the item to exchange
419     * @param timed true if the wait is timed
420 jsr166 1.58 * @param ns if timed, the maximum wait time, else 0L
421 dl 1.55 * @return the other thread's item; or null if either the arena
422     * was enabled or the thread was interrupted before completion; or
423     * TIMED_OUT if timed and timed out
424     */
425     private final Object slotExchange(Object item, boolean timed, long ns) {
        // Single-slot exchange: either release the node found in "slot", or
        // install our own node there and wait (spin, then park) for a match.
426     Node p = participant.get(); // this thread's reusable node
427     Thread t = Thread.currentThread();
428     if (t.isInterrupted()) // preserve interrupt status so caller can recheck
429     return null;
430    
431     for (Node q;;) {
432     if ((q = slot) != null) {
433     if (U.compareAndSwapObject(this, SLOT, q, null)) { // release: we emptied the slot
434     Object v = q.item;
435     q.match = item; // volatile write that hands our item to the waiter
436     Thread w = q.parked;
437     if (w != null) // waiter recorded itself as parked; wake it
438     U.unpark(w);
439     return v;
440     }
441     // create arena on contention, but continue until slot null
        // (the bound CAS from 0 to SEQ lets exactly one thread allocate the array)
442     if (NCPU > 1 && bound == 0 &&
443     U.compareAndSwapInt(this, BOUND, 0, SEQ))
444     arena = new Node[(FULL + 2) << ASHIFT];
445     }
446     else if (arena != null)
447     return null; // caller must reroute to arenaExchange
448     else {
449     p.item = item; // publish our item before the node becomes visible in the slot
450     if (U.compareAndSwapObject(this, SLOT, null, p))
451     break; // offer installed; go wait for a releaser
452     p.item = null; // lost the race for the empty slot; retry
453     }
454 dl 1.16 }
455    
456 dl 1.55 // await release
457     int h = p.hash;
458 jsr166 1.56 long end = timed ? System.nanoTime() + ns : 0L; // deadline; unused when untimed
459 dl 1.55 int spins = (NCPU > 1) ? SPINS : 1; // spin budget of 1 on a single CPU
460     Object v;
461     while ((v = p.match) == null) {
462     if (spins > 0) {
463     h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift step
464     if (h == 0) // first use: seed from the thread id
465     h = SPINS | (int)t.getId();
466     else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // spins drops only when h < 0
467     Thread.yield();
468 jsr166 1.53 }
469 dl 1.55 else if (slot != p) // our node left the slot but match is not visible yet
470     spins = SPINS;
        // Block only if not interrupted, no arena has been created, and (when
        // timed) time remains; ns is refreshed to the remaining nanoseconds here.
471     else if (!t.isInterrupted() && arena == null &&
472     (!timed || (ns = end - System.nanoTime()) > 0L)) {
473     U.putObject(t, BLOCKER, this); // set Thread.parkBlocker, as LockSupport would
474     p.parked = t;
475     if (slot == p) // recheck that we are still waiting before parking
476     U.park(false, ns); // relative wait; ns is 0L in the untimed case
477     p.parked = null;
478     U.putObject(t, BLOCKER, null);
479 dl 1.37 }
        // Otherwise cancel by withdrawing our node; if this CAS fails, a
        // releaser took the node first and the loop re-reads match.
480 dl 1.55 else if (U.compareAndSwapObject(this, SLOT, p, null)) {
481     v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; // null covers both interrupt and arena-enabled
482     break;
483 dl 1.16 }
484     }
        // Reset the node for reuse on every exit path from the wait loop.
485 dl 1.55 U.putOrderedObject(p, MATCH, null);
486     p.item = null;
487     p.hash = h; // save the advanced pseudo-random state
488     return v;
489 dl 1.37 }
490    
491     /**
492     * Creates a new Exchanger.
493     */
494     public Exchanger() {
        // Only the per-thread node holder is created here; arena and slot
        // start out null and bound starts at 0 (field defaults).
495 dl 1.55 participant = new Participant();
496 tim 1.1 }
497    
498     /**
499     * Waits for another thread to arrive at this exchange point (unless
500 jsr166 1.44 * the current thread is {@linkplain Thread#interrupt interrupted}),
501 tim 1.1 * and then transfers the given object to it, receiving its object
502     * in return.
503 jsr166 1.17 *
504 tim 1.1 * <p>If another thread is already waiting at the exchange point then
505     * it is resumed for thread scheduling purposes and receives the object
506 jsr166 1.39 * passed in by the current thread. The current thread returns immediately,
507 tim 1.1 * receiving the object passed to the exchange by that other thread.
508 jsr166 1.17 *
509 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
510 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
511     * dormant until one of two things happens:
512     * <ul>
513     * <li>Some other thread enters the exchange; or
514 jsr166 1.45 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
515     * the current thread.
516 tim 1.1 * </ul>
517     * <p>If the current thread:
518     * <ul>
519 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
520 jsr166 1.44 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
521 jsr166 1.15 * for the exchange,
522 tim 1.1 * </ul>
523 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
524     * interrupted status is cleared.
525 tim 1.1 *
526     * @param x the object to exchange
527 dl 1.30 * @return the object provided by the other thread
528     * @throws InterruptedException if the current thread was
529     * interrupted while waiting
530 jsr166 1.15 */
531 jsr166 1.52 @SuppressWarnings("unchecked")
532 tim 1.1 public V exchange(V x) throws InterruptedException {
533 dl 1.55 Object v;
534 jsr166 1.56 Object item = (x == null) ? NULL_ITEM : x; // translate null args
        // Short-circuit chain, evaluated left to right:
        //  - no arena yet: try slotExchange; a non-null result is the answer;
        //  - arena present, or slotExchange returned null: Thread.interrupted()
        //    (which clears the status) tells an interrupt apart from "reroute";
        //  - not interrupted: use arenaExchange, whose null result means interrupted.
535 dl 1.55 if ((arena != null ||
536     (v = slotExchange(item, false, 0L)) == null) &&
537     ((Thread.interrupted() || // disambiguates null return
538     (v = arenaExchange(item, false, 0L)) == null)))
539     throw new InterruptedException();
540 jsr166 1.56 return (v == NULL_ITEM) ? null : (V)v; // translate the sentinel back to null
541 tim 1.1 }
542    
543     /**
544     * Waits for another thread to arrive at this exchange point (unless
545 jsr166 1.44 * the current thread is {@linkplain Thread#interrupt interrupted} or
546 jsr166 1.31 * the specified waiting time elapses), and then transfers the given
547     * object to it, receiving its object in return.
548 tim 1.1 *
549     * <p>If another thread is already waiting at the exchange point then
550     * it is resumed for thread scheduling purposes and receives the object
551 jsr166 1.39 * passed in by the current thread. The current thread returns immediately,
552 tim 1.1 * receiving the object passed to the exchange by that other thread.
553     *
554 jsr166 1.15 * <p>If no other thread is already waiting at the exchange then the
555 tim 1.1 * current thread is disabled for thread scheduling purposes and lies
556     * dormant until one of three things happens:
557     * <ul>
558     * <li>Some other thread enters the exchange; or
559 jsr166 1.44 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
560     * the current thread; or
561 tim 1.1 * <li>The specified waiting time elapses.
562     * </ul>
563     * <p>If the current thread:
564     * <ul>
565 jsr166 1.15 * <li>has its interrupted status set on entry to this method; or
566 jsr166 1.44 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
567 jsr166 1.15 * for the exchange,
568 tim 1.1 * </ul>
569 jsr166 1.15 * then {@link InterruptedException} is thrown and the current thread's
570     * interrupted status is cleared.
571 tim 1.1 *
572 dl 1.37 * <p>If the specified waiting time elapses then {@link
573     * TimeoutException} is thrown. If the time is less than or equal
574     * to zero, the method will not wait at all.
575 tim 1.1 *
576     * @param x the object to exchange
577     * @param timeout the maximum time to wait
578 jsr166 1.63 * @param unit the time unit of the {@code timeout} argument
579 dl 1.30 * @return the object provided by the other thread
580     * @throws InterruptedException if the current thread was
581     * interrupted while waiting
582     * @throws TimeoutException if the specified waiting time elapses
583     * before another thread enters the exchange
584 jsr166 1.15 */
585 jsr166 1.52 @SuppressWarnings("unchecked")
586 jsr166 1.15 public V exchange(V x, long timeout, TimeUnit unit)
587 tim 1.1 throws InterruptedException, TimeoutException {
588 dl 1.55 Object v;
589 jsr166 1.56 Object item = (x == null) ? NULL_ITEM : x; // translate null args to the sentinel
590 dl 1.55 long ns = unit.toNanos(timeout); // all internal waiting is done in nanoseconds
        // Same short-circuit routing as the untimed method: slotExchange first
        // unless an arena exists; a null result is either an interrupt (checked
        // and cleared by Thread.interrupted()) or a request to use arenaExchange.
591     if ((arena != null ||
592     (v = slotExchange(item, true, ns)) == null) &&
593     ((Thread.interrupted() ||
594     (v = arenaExchange(item, true, ns)) == null)))
595     throw new InterruptedException();
596     if (v == TIMED_OUT) // internal sentinel for an expired wait
597     throw new TimeoutException();
598 jsr166 1.56 return (v == NULL_ITEM) ? null : (V)v; // translate the sentinel back to null
599 dl 1.55 }
600    
601     // Unsafe mechanics: raw field offsets and array base used by the CAS, ordered-write and park calls above
602     private static final sun.misc.Unsafe U;
603     private static final long BOUND; // offset of Exchanger.bound
604     private static final long SLOT; // offset of Exchanger.slot
605     private static final long MATCH; // offset of Node.match
606     private static final long BLOCKER; // offset of Thread.parkBlocker
607     private static final int ABASE; // raw offset of arena slot 0 (array base plus one slot of front padding)
608     static {
609     int s; // index scale (bytes per element) of Node[]
610     try {
611     U = sun.misc.Unsafe.getUnsafe();
612     Class<?> ek = Exchanger.class;
613     Class<?> nk = Node.class;
614     Class<?> ak = Node[].class;
615     Class<?> tk = Thread.class;
616     BOUND = U.objectFieldOffset
617     (ek.getDeclaredField("bound"));
618     SLOT = U.objectFieldOffset
619     (ek.getDeclaredField("slot"));
620     MATCH = U.objectFieldOffset
621     (nk.getDeclaredField("match"));
622     BLOCKER = U.objectFieldOffset
623     (tk.getDeclaredField("parkBlocker"));
624     s = U.arrayIndexScale(ak);
625     // ABASE absorbs padding in front of element 0
626     ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
627    
628     } catch (Exception e) {
629     throw new Error(e); // a failed field lookup aborts class initialization, keeping the cause
630 dl 1.34 }
        // The scale must be a power of two no larger than one slot stride (1 << ASHIFT bytes).
631 dl 1.55 if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
632     throw new Error("Unsupported array scale");
633 dl 1.34 }
634 dl 1.55
635 tim 1.1 }