root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.71
Committed: Tue Feb 17 18:55:39 2015 UTC by jsr166
Branch: MAIN
Changes since 1.70: +1 -1 lines
Log Message:
standardize code sample idiom: * <pre> {@code

File Contents

# Content
1 /*
2 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3 * assistance from members of JCP JSR-166 Expert Group and released to
4 * the public domain, as explained at
5 * http://creativecommons.org/publicdomain/zero/1.0/
6 */
7
8 package java.util.concurrent;
9
10 /**
11 * A synchronization point at which threads can pair and swap elements
12 * within pairs. Each thread presents some object on entry to the
13 * {@link #exchange exchange} method, matches with a partner thread,
14 * and receives its partner's object on return. An Exchanger may be
15 * viewed as a bidirectional form of a {@link SynchronousQueue}.
16 * Exchangers may be useful in applications such as genetic algorithms
17 * and pipeline designs.
18 *
19 * <p><b>Sample Usage:</b>
20 * Here are the highlights of a class that uses an {@code Exchanger}
21 * to swap buffers between threads so that the thread filling the
22 * buffer gets a freshly emptied one when it needs it, handing off the
23 * filled one to the thread emptying the buffer.
24 * <pre> {@code
25 * class FillAndEmpty {
26 * Exchanger<DataBuffer> exchanger = new Exchanger<>();
27 * DataBuffer initialEmptyBuffer = ... a made-up type
28 * DataBuffer initialFullBuffer = ...
29 *
30 * class FillingLoop implements Runnable {
31 * public void run() {
32 * DataBuffer currentBuffer = initialEmptyBuffer;
33 * try {
34 * while (currentBuffer != null) {
35 * addToBuffer(currentBuffer);
36 * if (currentBuffer.isFull())
37 * currentBuffer = exchanger.exchange(currentBuffer);
38 * }
39 * } catch (InterruptedException ex) { ... handle ... }
40 * }
41 * }
42 *
43 * class EmptyingLoop implements Runnable {
44 * public void run() {
45 * DataBuffer currentBuffer = initialFullBuffer;
46 * try {
47 * while (currentBuffer != null) {
48 * takeFromBuffer(currentBuffer);
49 * if (currentBuffer.isEmpty())
50 * currentBuffer = exchanger.exchange(currentBuffer);
51 * }
52 * } catch (InterruptedException ex) { ... handle ... }
53 * }
54 * }
55 *
56 * void start() {
57 * new Thread(new FillingLoop()).start();
58 * new Thread(new EmptyingLoop()).start();
59 * }
60 * }}</pre>
61 *
62 * <p>Memory consistency effects: For each pair of threads that
63 * successfully exchange objects via an {@code Exchanger}, actions
64 * prior to the {@code exchange()} in each thread
65 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
66 * those subsequent to a return from the corresponding {@code exchange()}
67 * in the other thread.
68 *
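* <p>For example (an illustrative sketch only, reusing the made-up
* {@code DataBuffer} type from the sample above and a made-up
* {@code size} field), if thread A performs
* <pre> {@code
* buffer.size = 42;                              // write before the exchange
* DataBuffer theirs = exchanger.exchange(buffer);}</pre>
* and thread B performs
* <pre> {@code
* DataBuffer received = exchanger.exchange(myBuffer);
* int s = received.size;                         // sees 42}</pre>
* then, provided the two calls pair with each other, thread B is
* guaranteed to observe the value 42 written by thread A before its
* {@code exchange()}.
*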
69 * @since 1.5
70 * @author Doug Lea, Bill Scherer, and Michael Scott
71 * @param <V> The type of objects that may be exchanged
72 */
73 public class Exchanger<V> {
74
75 /*
76 * Overview: The core algorithm is, for an exchange "slot",
77 * and a participant (caller) with an item:
78 *
79 * for (;;) {
80 * if (slot is empty) { // offer
81 * place item in a Node;
82 * if (can CAS slot from empty to node) {
83 * wait for release;
84 * return matching item in node;
85 * }
86 * }
87 * else if (can CAS slot from node to empty) { // release
88 * get the item in node;
89 * set matching item in node;
90 * release waiting thread;
91 * }
92 * // else retry on CAS failure
93 * }
94 *
95 * This is among the simplest forms of a "dual data structure" --
96 * see Scott and Scherer's DISC 04 paper and
97 * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
98 *
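* In plain Java, the single-slot protocol above might look roughly
* like the following (an illustrative sketch only: the class and
* names are made up, items must be non-null, and it busy-waits
* where the real code spins, yields, and parks; it uses
* java.util.concurrent.atomic.AtomicReference rather than Unsafe):
*
*   class SimpleSlotExchanger<V> {
*     static final class Node<T> {
*       final T item;               // offering thread's item
*       volatile T match;           // set by the releasing thread
*       Node(T item) { this.item = item; }
*     }
*     final AtomicReference<Node<V>> slot = new AtomicReference<>();
*
*     V exchange(V item) {
*       for (;;) {
*         Node<V> q = slot.get();
*         if (q == null) {                          // offer
*           Node<V> p = new Node<>(item);
*           if (slot.compareAndSet(null, p)) {
*             while (p.match == null)               // wait for release
*               Thread.yield();
*             return p.match;
*           }
*         }
*         else if (slot.compareAndSet(q, null)) {   // release
*           V v = q.item;
*           q.match = item;
*           return v;
*         }
*       }
*     }
*   }
*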
99 * This works great in principle. But in practice, like many
100 * algorithms centered on atomic updates to a single location, it
101 * scales horribly when there are more than a few participants
102 * using the same Exchanger. So the implementation instead uses a
103 * form of elimination arena, that spreads out this contention by
104 * arranging that some threads typically use different slots,
105 * while still ensuring that eventually, any two parties will be
106 * able to exchange items. That is, we cannot completely partition
107 * across threads, but instead give threads arena indices that
108 * will on average grow under contention and shrink under lack of
109 * contention. We approach this by defining the Nodes that we need
110 * anyway as ThreadLocals, and including in them a per-thread index
111 * and related bookkeeping state. (We can safely reuse per-thread
112 * nodes rather than creating them fresh each time because slots
113 * alternate between pointing to a node vs null, so cannot
114 * encounter ABA problems. However, we do need some care in
115 * resetting them between uses.)
116 *
117 * Implementing an effective arena requires allocating a bunch of
118 * space, so we only do so upon detecting contention (except on
119 * uniprocessors, where they wouldn't help, so aren't used).
120 * Otherwise, exchanges use the single-slot slotExchange method.
121 * On contention, not only must the slots be in different
122 * locations, but the locations must not encounter memory
123 * contention due to being on the same cache line (or more
124 * generally, the same coherence unit). Because, as of this
125 * writing, there is no way to determine cacheline size, we define
126 * a value that is enough for common platforms. Additionally,
127 * extra care elsewhere is taken to avoid other false/unintended
128 * sharing and to enhance locality, including adding padding (via
129 * sun.misc.Contended) to Nodes, embedding "bound" as an Exchanger
130 * field, and reworking some park/unpark mechanics compared to
131 * LockSupport versions.
132 *
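* Concretely, arena element accesses below locate slot i at a raw
* byte offset of (with ASHIFT = 7, adjacent used slots are
* 1 << 7 = 128 bytes apart):
*
*   long j = (i << ASHIFT) + ABASE;
*   Node q = (Node)U.getObjectVolatile(a, j);
*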
133 * The arena starts out with only one used slot. We expand the
134 * effective arena size by tracking collisions; i.e., failed CASes
135 * while trying to exchange. By nature of the above algorithm, the
136 * only kinds of collision that reliably indicate contention are
137 * when two attempted releases collide -- one of two attempted
138 * offers can legitimately fail to CAS without indicating
139 * contention by more than one other thread. (Note: it is possible
140 * but not worthwhile to more precisely detect contention by
141 * reading slot values after CAS failures.) When a thread has
142 * collided at each slot within the current arena bound, it tries
143 * to expand the arena size by one. We track collisions within
144 * bounds by using a version (sequence) number on the "bound"
145 * field, and conservatively reset collision counts when a
146 * participant notices that bound has been updated (in either
147 * direction).
148 *
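* Concretely (restating the CASes in arenaExchange below, with
* SEQ == MMASK + 1 so the low byte is the index and the higher
* bits are the version):
*
*   int m = bound & MMASK;                            // current max arena index
*   U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1); // grow: index+1, version+1
*   U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // shrink: index-1, version+1
*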
149 * The effective arena size is reduced (when there is more than
150 * one slot) by giving up on waiting after a while and trying to
151 * decrement the arena size on expiration. The value of "a while"
152 * is an empirical matter. We implement this by piggybacking on the
153 * use of spin->yield->block that is essential for reasonable
154 * waiting performance anyway -- in a busy exchanger, offers are
155 * usually almost immediately released, in which case context
156 * switching on multiprocessors is extremely slow/wasteful. Arena
157 * waits just omit the blocking part, and instead cancel. The spin
158 * count is empirically chosen to be a value that avoids blocking
159 * 99% of the time under maximum sustained exchange rates on a
160 * range of test machines. Spins and yields entail some limited
161 * randomness (using a cheap xorshift) to avoid regular patterns
162 * that can induce unproductive grow/shrink cycles. (Using a
163 * pseudorandom sequence also helps regularize spin cycle duration by
164 * making branches unpredictable.) Also, during an offer, a
165 * waiter can "know" that it will be released when its slot has
166 * changed, but cannot yet proceed until match is set. In the
167 * meantime it cannot cancel the offer, so instead spins/yields.
168 * Note: It is possible to avoid this secondary check by changing
169 * the linearization point to be a CAS of the match field (as done
170 * in one case in the Scott & Scherer DISC paper), which also
171 * increases asynchrony a bit, at the expense of poorer collision
172 * detection and inability to always reuse per-thread nodes. So
173 * the current scheme is typically a better tradeoff.
174 *
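* The spin step used by both wait loops below is (h is the
* per-thread hash, lazily seeded from the thread id):
*
*   h ^= h << 1; h ^= h >>> 3; h ^= h << 10;      // xorshift
*   if (h == 0)                                   // initialize hash
*     h = SPINS | (int)t.getId();
*   else if (h < 0 &&                             // approx 50% true
*            (--spins & ((SPINS >>> 1) - 1)) == 0)
*     Thread.yield();                             // two yields per wait
*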
175 * On collisions, indices traverse the arena cyclically in reverse
176 * order, restarting at the maximum index (which will tend to be
177 * sparsest) when bounds change. (On expirations, indices instead
178 * are halved until reaching 0.) It is possible (and has been
179 * tried) to use randomized, prime-value-stepped, or double-hash
180 * style traversal instead of simple cyclic traversal to reduce
181 * bunching. But empirically, whatever benefits these may have
182 * don't overcome their added overhead: We are managing operations
183 * that occur very quickly unless there is sustained contention,
184 * so simpler/faster control policies work better than more
185 * accurate but slower ones.
186 *
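* The corresponding one-line index updates in arenaExchange below
* are:
*
*   i = (i == 0) ? m : i - 1;     // collision: cyclic reverse traversal
*   i = p.index >>>= 1;           // expiration: halve, descending toward 0
*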
187 * Because we use expiration for arena size control, we cannot
188 * throw TimeoutExceptions in the timed version of the public
189 * exchange method until the arena size has shrunken to zero (or
190 * the arena isn't enabled). This may delay response to timeout
191 * but is still within spec.
192 *
193 * Essentially all of the implementation is in methods
194 * slotExchange and arenaExchange. These have similar overall
195 * structure, but differ in too many details to combine. The
196 * slotExchange method uses the single Exchanger field "slot"
197 * rather than arena array elements. However, it still needs
198 * minimal collision detection to trigger arena construction.
199 * (The messiest part is making sure interrupt status and
200 * InterruptedExceptions come out right during transitions when
201 * both methods may be called. This is done by using null return
202 * as a sentinel to recheck interrupt status.)
203 *
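* For example, the untimed public exchange method below performs
* exactly this recheck:
*
*   if ((arena != null ||
*        (v = slotExchange(item, false, 0L)) == null) &&
*       ((Thread.interrupted() ||          // disambiguates null return
*        (v = arenaExchange(item, false, 0L)) == null)))
*     throw new InterruptedException();
*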
204 * As is too common in this sort of code, methods are monolithic
205 * because most of the logic relies on reads of fields that are
206 * maintained as local variables so can't be nicely factored
207 * (mainly, here, bulky spin->yield->block/cancel code), and
208 * heavily dependent on intrinsics (Unsafe) to use inlined
209 * embedded CAS and related memory access operations (that tend
210 * not to be as readily inlined by dynamic compilers when they are
211 * hidden behind other methods that would more nicely name and
212 * encapsulate the intended effects). This includes the use of
213 * putOrderedX to clear fields of the per-thread Nodes between
214 * uses. Note that field Node.item is not declared as volatile
215 * even though it is read by releasing threads, because they only
216 * do so after CAS operations that must precede access, and all
217 * uses by the owning thread are otherwise acceptably ordered by
218 * other operations. (Because the actual points of atomicity are
219 * slot CASes, it would also be legal for the write to Node.match
220 * in a release to be weaker than a full volatile write. However,
221 * this is not done because it could allow further postponement of
222 * the write, delaying progress.)
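*
* Concretely, the per-use reset in both exchange methods below is:
*
*   U.putOrderedObject(p, MATCH, null);   // lazily clear match
*   p.item = null;                        // clear for next use
*   p.hash = h;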
223 */
224
225 /**
226 * The byte distance (as a shift value) between any two used slots
227 * in the arena. 1 << ASHIFT should be at least cacheline size.
228 */
229 private static final int ASHIFT = 7;
230
231 /**
232 * The maximum supported arena index. The maximum allocatable
233 * arena size is MMASK + 1. Must be a power of two minus one, less
234 * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
235 * for the expected scaling limits of the main algorithms.
236 */
237 private static final int MMASK = 0xff;
238
239 /**
240 * Unit for sequence/version bits of bound field. Each successful
241 * change to the bound also adds SEQ.
242 */
243 private static final int SEQ = MMASK + 1;
244
245 /** The number of CPUs, for sizing and spin control */
246 private static final int NCPU = Runtime.getRuntime().availableProcessors();
247
248 /**
249 * The maximum slot index of the arena: The number of slots that
250 * can in principle hold all threads without contention, or at
251 * most the maximum indexable value.
252 */
253 static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
254
255 /**
256 * The bound for spins while waiting for a match. The actual
257 * number of iterations will on average be about twice this value
258 * due to randomization. Note: Spinning is disabled when NCPU==1.
259 */
260 private static final int SPINS = 1 << 10;
261
262 /**
263 * Value representing null arguments/returns from public
264 * methods. Needed because the API originally didn't disallow null
265 * arguments, which it should have.
266 */
267 private static final Object NULL_ITEM = new Object();
268
269 /**
270 * Sentinel value returned by internal exchange methods upon
271 * timeout, to avoid need for separate timed versions of these
272 * methods.
273 */
274 private static final Object TIMED_OUT = new Object();
275
276 /**
277 * Nodes hold partially exchanged data, plus other per-thread
278 * bookkeeping. Padded via @sun.misc.Contended to reduce memory
279 * contention.
280 */
281 @sun.misc.Contended static final class Node {
282 int index; // Arena index
283 int bound; // Last recorded value of Exchanger.bound
284 int collides; // Number of CAS failures at current bound
285 int hash; // Pseudo-random for spins
286 Object item; // This thread's current item
287 volatile Object match; // Item provided by releasing thread
288 volatile Thread parked; // Set to this thread when parked, else null
289 }
290
291 /** The corresponding thread local class */
292 static final class Participant extends ThreadLocal<Node> {
293 public Node initialValue() { return new Node(); }
294 }
295
296 /**
297 * Per-thread state
298 */
299 private final Participant participant;
300
301 /**
302 * Elimination array; null until enabled (within slotExchange).
303 * Element accesses use emulation of volatile gets and CAS.
304 */
305 private volatile Node[] arena;
306
307 /**
308 * Slot used until contention detected.
309 */
310 private volatile Node slot;
311
312 /**
313 * The index of the largest valid arena position, OR'ed with SEQ
314 * number in high bits, incremented on each update. The initial
315 * update from 0 to SEQ is used to ensure that the arena array is
316 * constructed only once.
317 */
318 private volatile int bound;
319
320 /**
321 * Exchange function when arenas enabled. See above for explanation.
322 *
323 * @param item the (non-null) item to exchange
324 * @param timed true if the wait is timed
325 * @param ns if timed, the maximum wait time, else 0L
326 * @return the other thread's item; or null if interrupted; or
327 * TIMED_OUT if timed and timed out
328 */
329 private final Object arenaExchange(Object item, boolean timed, long ns) {
330 Node[] a = arena;
331 Node p = participant.get();
332 for (int i = p.index;;) { // access slot at i
333 int b, m, c; long j; // j is raw array offset
334 Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
335 if (q != null && U.compareAndSwapObject(a, j, q, null)) {
336 Object v = q.item; // release
337 q.match = item;
338 Thread w = q.parked;
339 if (w != null)
340 U.unpark(w);
341 return v;
342 }
343 else if (i <= (m = (b = bound) & MMASK) && q == null) {
344 p.item = item; // offer
345 if (U.compareAndSwapObject(a, j, null, p)) {
346 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
347 Thread t = Thread.currentThread(); // wait
348 for (int h = p.hash, spins = SPINS;;) {
349 Object v = p.match;
350 if (v != null) {
351 U.putOrderedObject(p, MATCH, null);
352 p.item = null; // clear for next use
353 p.hash = h;
354 return v;
355 }
356 else if (spins > 0) {
357 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
358 if (h == 0) // initialize hash
359 h = SPINS | (int)t.getId();
360 else if (h < 0 && // approx 50% true
361 (--spins & ((SPINS >>> 1) - 1)) == 0)
362 Thread.yield(); // two yields per wait
363 }
364 else if (U.getObjectVolatile(a, j) != p)
365 spins = SPINS; // releaser hasn't set match yet
366 else if (!t.isInterrupted() && m == 0 &&
367 (!timed ||
368 (ns = end - System.nanoTime()) > 0L)) {
369 U.putObject(t, BLOCKER, this); // emulate LockSupport
370 p.parked = t; // minimize window
371 if (U.getObjectVolatile(a, j) == p)
372 U.park(false, ns);
373 p.parked = null;
374 U.putObject(t, BLOCKER, null);
375 }
376 else if (U.getObjectVolatile(a, j) == p &&
377 U.compareAndSwapObject(a, j, p, null)) {
378 if (m != 0) // try to shrink
379 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
380 p.item = null;
381 p.hash = h;
382 i = p.index >>>= 1; // descend
383 if (Thread.interrupted())
384 return null;
385 if (timed && m == 0 && ns <= 0L)
386 return TIMED_OUT;
387 break; // expired; restart
388 }
389 }
390 }
391 else
392 p.item = null; // clear offer
393 }
394 else {
395 if (p.bound != b) { // stale; reset
396 p.bound = b;
397 p.collides = 0;
398 i = (i != m || m == 0) ? m : m - 1;
399 }
400 else if ((c = p.collides) < m || m == FULL ||
401 !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
402 p.collides = c + 1;
403 i = (i == 0) ? m : i - 1; // cyclically traverse
404 }
405 else
406 i = m + 1; // grow
407 p.index = i;
408 }
409 }
410 }
411
412 /**
413 * Exchange function used until arenas enabled. See above for explanation.
414 *
415 * @param item the item to exchange
416 * @param timed true if the wait is timed
417 * @param ns if timed, the maximum wait time, else 0L
418 * @return the other thread's item; or null if either the arena
419 * was enabled or the thread was interrupted before completion; or
420 * TIMED_OUT if timed and timed out
421 */
422 private final Object slotExchange(Object item, boolean timed, long ns) {
423 Node p = participant.get();
424 Thread t = Thread.currentThread();
425 if (t.isInterrupted()) // preserve interrupt status so caller can recheck
426 return null;
427
428 for (Node q;;) {
429 if ((q = slot) != null) {
430 if (U.compareAndSwapObject(this, SLOT, q, null)) {
431 Object v = q.item;
432 q.match = item;
433 Thread w = q.parked;
434 if (w != null)
435 U.unpark(w);
436 return v;
437 }
438 // create arena on contention, but continue until slot null
439 if (NCPU > 1 && bound == 0 &&
440 U.compareAndSwapInt(this, BOUND, 0, SEQ))
441 arena = new Node[(FULL + 2) << ASHIFT];
442 }
443 else if (arena != null)
444 return null; // caller must reroute to arenaExchange
445 else {
446 p.item = item;
447 if (U.compareAndSwapObject(this, SLOT, null, p))
448 break;
449 p.item = null;
450 }
451 }
452
453 // await release
454 int h = p.hash;
455 long end = timed ? System.nanoTime() + ns : 0L;
456 int spins = (NCPU > 1) ? SPINS : 1;
457 Object v;
458 while ((v = p.match) == null) {
459 if (spins > 0) {
460 h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
461 if (h == 0)
462 h = SPINS | (int)t.getId();
463 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
464 Thread.yield();
465 }
466 else if (slot != p)
467 spins = SPINS;
468 else if (!t.isInterrupted() && arena == null &&
469 (!timed || (ns = end - System.nanoTime()) > 0L)) {
470 U.putObject(t, BLOCKER, this);
471 p.parked = t;
472 if (slot == p)
473 U.park(false, ns);
474 p.parked = null;
475 U.putObject(t, BLOCKER, null);
476 }
477 else if (U.compareAndSwapObject(this, SLOT, p, null)) {
478 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
479 break;
480 }
481 }
482 U.putOrderedObject(p, MATCH, null);
483 p.item = null;
484 p.hash = h;
485 return v;
486 }
487
488 /**
489 * Creates a new Exchanger.
490 */
491 public Exchanger() {
492 participant = new Participant();
493 }
494
495 /**
496 * Waits for another thread to arrive at this exchange point (unless
497 * the current thread is {@linkplain Thread#interrupt interrupted}),
498 * and then transfers the given object to it, receiving its object
499 * in return.
500 *
501 * <p>If another thread is already waiting at the exchange point then
502 * it is resumed for thread scheduling purposes and receives the object
503 * passed in by the current thread. The current thread returns immediately,
504 * receiving the object passed to the exchange by that other thread.
505 *
506 * <p>If no other thread is already waiting at the exchange then the
507 * current thread is disabled for thread scheduling purposes and lies
508 * dormant until one of two things happens:
509 * <ul>
510 * <li>Some other thread enters the exchange; or
511 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
512 * the current thread.
513 * </ul>
514 * <p>If the current thread:
515 * <ul>
516 * <li>has its interrupted status set on entry to this method; or
517 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
518 * for the exchange,
519 * </ul>
520 * then {@link InterruptedException} is thrown and the current thread's
521 * interrupted status is cleared.
522 *
523 * @param x the object to exchange
524 * @return the object provided by the other thread
525 * @throws InterruptedException if the current thread was
526 * interrupted while waiting
527 */
528 @SuppressWarnings("unchecked")
529 public V exchange(V x) throws InterruptedException {
530 Object v;
531 Object item = (x == null) ? NULL_ITEM : x; // translate null args
532 if ((arena != null ||
533 (v = slotExchange(item, false, 0L)) == null) &&
534 ((Thread.interrupted() || // disambiguates null return
535 (v = arenaExchange(item, false, 0L)) == null)))
536 throw new InterruptedException();
537 return (v == NULL_ITEM) ? null : (V)v;
538 }
539
540 /**
541 * Waits for another thread to arrive at this exchange point (unless
542 * the current thread is {@linkplain Thread#interrupt interrupted} or
543 * the specified waiting time elapses), and then transfers the given
544 * object to it, receiving its object in return.
545 *
546 * <p>If another thread is already waiting at the exchange point then
547 * it is resumed for thread scheduling purposes and receives the object
548 * passed in by the current thread. The current thread returns immediately,
549 * receiving the object passed to the exchange by that other thread.
550 *
551 * <p>If no other thread is already waiting at the exchange then the
552 * current thread is disabled for thread scheduling purposes and lies
553 * dormant until one of three things happens:
554 * <ul>
555 * <li>Some other thread enters the exchange; or
556 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
557 * the current thread; or
558 * <li>The specified waiting time elapses.
559 * </ul>
560 * <p>If the current thread:
561 * <ul>
562 * <li>has its interrupted status set on entry to this method; or
563 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
564 * for the exchange,
565 * </ul>
566 * then {@link InterruptedException} is thrown and the current thread's
567 * interrupted status is cleared.
568 *
569 * <p>If the specified waiting time elapses then {@link
570 * TimeoutException} is thrown. If the time is less than or equal
571 * to zero, the method will not wait at all.
572 *
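* <p>A typical timed call (an illustrative sketch only, reusing the
* made-up {@code DataBuffer} type from the class documentation) is:
* <pre> {@code
* try {
*   DataBuffer theirs = exchanger.exchange(mine, 1L, TimeUnit.SECONDS);
*   // ... use theirs ...
* } catch (TimeoutException ex) {
*   // no partner arrived within one second
* } catch (InterruptedException ex) {
*   Thread.currentThread().interrupt();   // preserve interrupt status
* }}</pre>
*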
573 * @param x the object to exchange
574 * @param timeout the maximum time to wait
575 * @param unit the time unit of the {@code timeout} argument
576 * @return the object provided by the other thread
577 * @throws InterruptedException if the current thread was
578 * interrupted while waiting
579 * @throws TimeoutException if the specified waiting time elapses
580 * before another thread enters the exchange
581 */
582 @SuppressWarnings("unchecked")
583 public V exchange(V x, long timeout, TimeUnit unit)
584 throws InterruptedException, TimeoutException {
585 Object v;
586 Object item = (x == null) ? NULL_ITEM : x;
587 long ns = unit.toNanos(timeout);
588 if ((arena != null ||
589 (v = slotExchange(item, true, ns)) == null) &&
590 ((Thread.interrupted() ||
591 (v = arenaExchange(item, true, ns)) == null)))
592 throw new InterruptedException();
593 if (v == TIMED_OUT)
594 throw new TimeoutException();
595 return (v == NULL_ITEM) ? null : (V)v;
596 }
597
598 // Unsafe mechanics
599 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
600 private static final long BOUND;
601 private static final long SLOT;
602 private static final long MATCH;
603 private static final long BLOCKER;
604 private static final int ABASE;
605 static {
606 try {
607 BOUND = U.objectFieldOffset
608 (Exchanger.class.getDeclaredField("bound"));
609 SLOT = U.objectFieldOffset
610 (Exchanger.class.getDeclaredField("slot"));
611
612 MATCH = U.objectFieldOffset
613 (Node.class.getDeclaredField("match"));
614
615 BLOCKER = U.objectFieldOffset
616 (Thread.class.getDeclaredField("parkBlocker"));
617
618 int scale = U.arrayIndexScale(Node[].class);
619 if ((scale & (scale - 1)) != 0 || scale > (1 << ASHIFT))
620 throw new Error("Unsupported array scale");
621 // ABASE absorbs padding in front of element 0
622 ABASE = U.arrayBaseOffset(Node[].class) + (1 << ASHIFT);
623 } catch (ReflectiveOperationException e) {
624 throw new Error(e);
625 }
626 }
627
628 }