1 |
dl |
1.2 |
/*
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
7 |
|
|
|
8 |
tim |
1.1 |
package java.util.concurrent; |
9 |
dl |
1.75 |
import java.lang.invoke.MethodHandles; |
10 |
|
|
import java.lang.invoke.VarHandle; |
11 |
|
|
import java.util.concurrent.locks.LockSupport; |
12 |
jsr166 |
1.66 |
|
13 |
tim |
1.1 |
/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre> {@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }}</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
76 |
|
|
public class Exchanger<V> {

    /*
     * Overview: The core algorithm is, for an exchange "slot",
     * and a participant (caller) with an item:
     *
     * for (;;) {
     *   if (slot is empty) {                       // offer
     *     place item in a Node;
     *     if (can CAS slot from empty to node) {
     *       wait for release;
     *       return matching item in node;
     *     }
     *   }
     *   else if (can CAS slot from node to empty) { // release
     *     get the item in node;
     *     set matching item in node;
     *     release waiting thread;
     *   }
     *   // else retry on CAS failure
     * }
     *
     * This is among the simplest forms of a "dual data structure" --
     * see Scott and Scherer's DISC 04 paper and
     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
     *
     * This works great in principle. But in practice, like many
     * algorithms centered on atomic updates to a single location, it
     * scales horribly when there are more than a few participants
     * using the same Exchanger. So the implementation instead uses a
     * form of elimination arena, that spreads out this contention by
     * arranging that some threads typically use different slots,
     * while still ensuring that eventually, any two parties will be
     * able to exchange items. That is, we cannot completely partition
     * across threads, but instead give threads arena indices that
     * will on average grow under contention and shrink under lack of
     * contention. We approach this by defining the Nodes that we need
     * anyway as ThreadLocals, and include in them per-thread index
     * and related bookkeeping state. (We can safely reuse per-thread
     * nodes rather than creating them fresh each time because slots
     * alternate between pointing to a node vs null, so cannot
     * encounter ABA problems. However, we do need some care in
     * resetting them between uses.)
     *
     * Implementing an effective arena requires allocating a bunch of
     * space, so we only do so upon detecting contention (except on
     * uniprocessors, where they wouldn't help, so aren't used).
     * Otherwise, exchanges use the single-slot slotExchange method.
     * On contention, not only must the slots be in different
     * locations, but the locations must not encounter memory
     * contention due to being on the same cache line (or more
     * generally, the same coherence unit). Because, as of this
     * writing, there is no way to determine cacheline size, we define
     * a value that is enough for common platforms. Additionally,
     * extra care elsewhere is taken to avoid other false/unintended
     * sharing and to enhance locality, including adding padding (via
     * @Contended) to Nodes, embedding "bound" as an Exchanger field.
     *
     * The arena starts out with only one used slot. We expand the
     * effective arena size by tracking collisions; i.e., failed CASes
     * while trying to exchange. By nature of the above algorithm, the
     * only kinds of collision that reliably indicate contention are
     * when two attempted releases collide -- one of two attempted
     * offers can legitimately fail to CAS without indicating
     * contention by more than one other thread. (Note: it is possible
     * but not worthwhile to more precisely detect contention by
     * reading slot values after CAS failures.) When a thread has
     * collided at each slot within the current arena bound, it tries
     * to expand the arena size by one. We track collisions within
     * bounds by using a version (sequence) number on the "bound"
     * field, and conservatively reset collision counts when a
     * participant notices that bound has been updated (in either
     * direction).
     *
     * The effective arena size is reduced (when there is more than
     * one slot) by giving up on waiting after a while and trying to
     * decrement the arena size on expiration. The value of "a while"
     * is an empirical matter. We implement by piggybacking on the
     * use of spin->yield->block that is essential for reasonable
     * waiting performance anyway -- in a busy exchanger, offers are
     * usually almost immediately released, in which case context
     * switching on multiprocessors is extremely slow/wasteful. Arena
     * waits just omit the blocking part, and instead cancel. The spin
     * count is empirically chosen to be a value that avoids blocking
     * 99% of the time under maximum sustained exchange rates on a
     * range of test machines. Spins and yields entail some limited
     * randomness (using a cheap xorshift) to avoid regular patterns
     * that can induce unproductive grow/shrink cycles. (Using a
     * pseudorandom also helps regularize spin cycle duration by
     * making branches unpredictable.) Also, during an offer, a
     * waiter can "know" that it will be released when its slot has
     * changed, but cannot yet proceed until match is set. In the
     * mean time it cannot cancel the offer, so instead spins/yields.
     * Note: It is possible to avoid this secondary check by changing
     * the linearization point to be a CAS of the match field (as done
     * in one case in the Scott & Scherer DISC paper), which also
     * increases asynchrony a bit, at the expense of poorer collision
     * detection and inability to always reuse per-thread nodes. So
     * the current scheme is typically a better tradeoff.
     *
     * On collisions, indices traverse the arena cyclically in reverse
     * order, restarting at the maximum index (which will tend to be
     * sparsest) when bounds change. (On expirations, indices instead
     * are halved until reaching 0.) It is possible (and has been
     * tried) to use randomized, prime-value-stepped, or double-hash
     * style traversal instead of simple cyclic traversal to reduce
     * bunching. But empirically, whatever benefits these may have
     * don't overcome their added overhead: We are managing operations
     * that occur very quickly unless there is sustained contention,
     * so simpler/faster control policies work better than more
     * accurate but slower ones.
     *
     * Because we use expiration for arena size control, we cannot
     * throw TimeoutExceptions in the timed version of the public
     * exchange method until the arena size has shrunken to zero (or
     * the arena isn't enabled). This may delay response to timeout
     * but is still within spec.
     *
     * Essentially all of the implementation is in methods
     * slotExchange and arenaExchange. These have similar overall
     * structure, but differ in too many details to combine. The
     * slotExchange method uses the single Exchanger field "slot"
     * rather than arena array elements. However, it still needs
     * minimal collision detection to trigger arena construction.
     * (The messiest part is making sure interrupt status and
     * InterruptedExceptions come out right during transitions when
     * both methods may be called. This is done by using null return
     * as a sentinel to recheck interrupt status.)
     *
     * As is too common in this sort of code, methods are monolithic
     * because most of the logic relies on reads of fields that are
     * maintained as local variables so can't be nicely factored --
     * mainly, here, bulky spin->yield->block/cancel code), and
     * heavily dependent on intrinsics (VarHandles) to use inlined
     * embedded CAS and related memory access operations (that tend
     * not to be as readily inlined by dynamic compilers when they are
     * hidden behind other methods that would more nicely name and
     * encapsulate the intended effects). This includes the use of
     * putXRelease to clear fields of the per-thread Nodes between
     * uses. Note that field Node.item is not declared as volatile
     * even though it is read by releasing threads, because they only
     * do so after CAS operations that must precede access, and all
     * uses by the owning thread are otherwise acceptably ordered by
     * other operations. (Because the actual points of atomicity are
     * slot CASes, it would also be legal for the write to Node.match
     * in a release to be weaker than a full volatile write. However,
     * this is not done because it could allow further postponement of
     * the write, delaying progress.)
     */

    /**
     * The byte distance (as a shift value) between any two used slots
     * in the arena.  1 << ASHIFT should be at least cacheline size.
     */
    private static final int ASHIFT = 7;

    /**
     * The maximum supported arena index. The maximum allocatable
     * arena size is MMASK + 1. Must be a power of two minus one, less
     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
     * for the expected scaling limits of the main algorithms.
     */
    private static final int MMASK = 0xff;

    /**
     * Unit for sequence/version bits of bound field. Each successful
     * change to the bound also adds SEQ.
     */
    private static final int SEQ = MMASK + 1;

    /** The number of CPUs, for sizing and spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The maximum slot index of the arena: The number of slots that
     * can in principle hold all threads without contention, or at
     * most the maximum indexable value.
     */
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    /**
     * The bound for spins while waiting for a match. The actual
     * number of iterations will on average be about twice this value
     * due to randomization. Note: Spinning is disabled when NCPU==1.
     */
    private static final int SPINS = 1 << 10;

    /**
     * Value representing null arguments/returns from public
     * methods. Needed because the API originally didn't disallow null
     * arguments, which it should have.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Sentinel value returned by internal exchange methods upon
     * timeout, to avoid need for separate timed versions of these
     * methods.
     */
    private static final Object TIMED_OUT = new Object();

    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping. Padded via @Contended to reduce memory contention.
     */
    @jdk.internal.vm.annotation.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }

    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    /**
     * Per-thread state.
     */
    private final Participant participant;

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     */
    private volatile Node[] arena;

    /**
     * Slot used until contention detected.
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR'ed with SEQ
     * number in high bits, incremented on each update.  The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once.
     */
    private volatile int bound;

    /**
     * Exchange function when arenas enabled. See above for explanation.
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if interrupted; or
     * TIMED_OUT if timed and timed out
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        int alen = a.length;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c;
            // Slot i lives at the high end of its (1 << ASHIFT)-byte-spaced
            // region; out-of-range indices are clamped to the last element.
            int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);
            if (j < 0 || j >= alen)
                j = alen - 1;
            Node q = (Node)AA.get(a, j);
            if (q != null && AA.compareAndSet(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    LockSupport.unpark(w);
                return v;
            }
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                p.item = item;                         // offer
                if (AA.compareAndSet(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            MATCH.setRelease(p, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        else if (AA.getVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            p.parked = t;              // minimize window
                            if (AA.getVolatile(a, j) == p) {
                                if (ns == 0L)
                                    LockSupport.park(this);
                                else
                                    LockSupport.parkNanos(this, ns);
                            }
                            p.parked = null;
                        }
                        else if (AA.getVolatile(a, j) == p &&
                                 AA.compareAndSet(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                BOUND.compareAndSet(this, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !BOUND.compareAndSet(this, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }

    /**
     * Exchange function used until arenas enabled. See above for explanation.
     *
     * @param item the item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if either the arena
     * was enabled or the thread was interrupted before completion; or
     * TIMED_OUT if timed and timed out
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            if ((q = slot) != null) {
                if (SLOT.compareAndSet(this, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        LockSupport.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    BOUND.compareAndSet(this, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item;
                if (SLOT.compareAndSet(this, null, p))
                    break;
                p.item = null;
            }
        }

        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            else if (slot != p)
                spins = SPINS;
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                p.parked = t;
                if (slot == p) {
                    if (ns == 0L)
                        LockSupport.park(this);
                    else
                        LockSupport.parkNanos(this, ns);
                }
                p.parked = null;
            }
            else if (SLOT.compareAndSet(this, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        MATCH.setRelease(p, null);
        p.item = null;
        p.hash = h;
        return v;
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        participant = new Participant();
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        Object v;
        Node[] a;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if (((a = arena) != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link
     * TimeoutException} is thrown.  If the time is less than or equal
     * to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        long ns = unit.toNanos(timeout);
        if ((arena != null ||
             (v = slotExchange(item, true, ns)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    // VarHandle mechanics
    private static final VarHandle BOUND;
    private static final VarHandle SLOT;
    private static final VarHandle MATCH;
    private static final VarHandle AA;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            BOUND = l.findVarHandle(Exchanger.class, "bound", int.class);
            SLOT = l.findVarHandle(Exchanger.class, "slot", Node.class);
            MATCH = l.findVarHandle(Node.class, "match", Object.class);
            AA = MethodHandles.arrayElementVarHandle(Node[].class);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

}