34 |
|
* try { |
35 |
|
* while (currentBuffer != null) { |
36 |
|
* addToBuffer(currentBuffer); |
37 |
< |
* if (currentBuffer.full()) |
37 |
> |
* if (currentBuffer.isFull()) |
38 |
|
* currentBuffer = exchanger.exchange(currentBuffer); |
39 |
|
* } |
40 |
|
* } catch (InterruptedException ex) { ... handle ... } |
47 |
|
* try { |
48 |
|
* while (currentBuffer != null) { |
49 |
|
* takeFromBuffer(currentBuffer); |
50 |
< |
* if (currentBuffer.empty()) |
50 |
> |
* if (currentBuffer.isEmpty()) |
51 |
|
* currentBuffer = exchanger.exchange(currentBuffer); |
52 |
|
* } |
53 |
|
* } catch (InterruptedException ex) { ... handle ...} |
104 |
|
* common usages where only two threads ever meet to exchange |
105 |
|
* items, but they prevent contention bottlenecks when an |
106 |
|
* exchanger is used by a large number of threads. |
107 |
+ |
* |
108 |
+ |
* For more details, see the paper "A Scalable Elimination-based |
109 |
+ |
* Exchange Channel" by William Scherer, Doug Lea, and Michael |
110 |
+ |
* Scott in Proceedings of SCOOL05 workshop. Available at: |
111 |
+ |
* http://hdl.handle.net/1802/2104 |
112 |
|
*/ |
113 |
|
|
114 |
|
/** |
141 |
|
* Each slot holds an AtomicReference<Node>, but this cannot be |
142 |
|
* expressed for arrays, so elements are casted on each use. |
143 |
|
*/ |
144 |
< |
private final AtomicReference[] arena; |
144 |
> |
private final AtomicReference<Node>[] arena; |
145 |
|
|
146 |
|
/** Generator for random backoffs and delays. */ |
147 |
|
private final Random random = new Random(); |
150 |
|
* Creates a new Exchanger. |
151 |
|
*/ |
152 |
|
public Exchanger() { |
153 |
< |
arena = new AtomicReference[SIZE + 1]; |
153 |
> |
arena = (AtomicReference<Node>[]) new AtomicReference[SIZE + 1]; |
154 |
|
for (int i = 0; i < arena.length; ++i) |
155 |
< |
arena[i] = new AtomicReference(); |
155 |
> |
arena[i] = new AtomicReference<Node>(); |
156 |
|
} |
157 |
|
|
158 |
|
/** |
160 |
|
* Uses Object, not "V" as argument and return value to simplify |
161 |
|
* handling of internal sentinel values. Callers from public |
162 |
|
* methods cast accordingly. |
163 |
< |
* @param item the item to exchange. |
164 |
< |
* @param timed true if the wait is timed. |
165 |
< |
* @param nanos if timed, the maximum wait time. |
166 |
< |
* @return the other thread's item. |
163 |
> |
* |
164 |
> |
* @param item the item to exchange |
165 |
> |
* @param timed true if the wait is timed |
166 |
> |
* @param nanos if timed, the maximum wait time |
167 |
> |
* @return the other thread's item |
168 |
|
*/ |
169 |
|
private Object doExchange(Object item, boolean timed, long nanos) |
170 |
|
throws InterruptedException, TimeoutException { |
171 |
|
Node me = new Node(item); |
172 |
< |
long lastTime = (timed)? System.nanoTime() : 0; |
172 |
> |
long lastTime = timed ? System.nanoTime() : 0; |
173 |
|
int idx = 0; // start out at slot representing top |
174 |
|
int backoff = 0; // increases on failure to occupy a slot |
175 |
|
|
176 |
|
for (;;) { |
177 |
< |
AtomicReference<Node> slot = (AtomicReference<Node>)arena[idx]; |
177 |
> |
AtomicReference<Node> slot = arena[idx]; |
178 |
|
|
179 |
|
// If this slot is already occupied, there is a waiting item... |
180 |
|
Node you = slot.get(); |
263 |
|
/** |
264 |
|
* Waits for and gets the hole filled in by another thread. |
265 |
|
* Fails if timed out or interrupted before hole filled. |
266 |
< |
* @param timed true if the wait is timed. |
267 |
< |
* @param nanos if timed, the maximum wait time. |
268 |
< |
* @return on success, the hole; on failure, FAIL. |
266 |
> |
* |
267 |
> |
* @param timed true if the wait is timed |
268 |
> |
* @param nanos if timed, the maximum wait time |
269 |
> |
* @return on success, the hole; on failure, FAIL |
270 |
|
*/ |
271 |
|
Object waitForHole(boolean timed, long nanos) { |
272 |
< |
long lastTime = (timed)? System.nanoTime() : 0; |
272 |
> |
long lastTime = timed ? System.nanoTime() : 0; |
273 |
|
Object h; |
274 |
|
while ((h = get()) == null) { |
275 |
|
// If interrupted or timed out, try to cancel by |
276 |
|
// CASing FAIL as hole value. |
277 |
|
if (Thread.currentThread().isInterrupted() || |
278 |
< |
(timed && nanos <= 0)) |
279 |
< |
compareAndSet(null, FAIL); |
278 |
> |
(timed && nanos <= 0)) { |
279 |
> |
if (compareAndSet(null, FAIL)) |
280 |
> |
return FAIL; |
281 |
> |
} |
282 |
|
else if (!timed) |
283 |
|
LockSupport.park(); |
284 |
|
else { |
321 |
|
* interrupted status is cleared. |
322 |
|
* |
323 |
|
* @param x the object to exchange |
324 |
< |
* @return the object provided by the other thread. |
325 |
< |
* @throws InterruptedException if current thread was interrupted |
326 |
< |
* while waiting |
324 |
> |
* @return the object provided by the other thread |
325 |
> |
* @throws InterruptedException if the current thread was |
326 |
> |
* interrupted while waiting |
327 |
|
*/ |
328 |
|
public V exchange(V x) throws InterruptedException { |
329 |
|
try { |
370 |
|
* |
371 |
|
* @param x the object to exchange |
372 |
|
* @param timeout the maximum time to wait |
373 |
< |
* @param unit the time unit of the <tt>timeout</tt> argument. |
374 |
< |
* @return the object provided by the other thread. |
375 |
< |
* @throws InterruptedException if current thread was interrupted |
376 |
< |
* while waiting |
377 |
< |
* @throws TimeoutException if the specified waiting time elapses before |
378 |
< |
* another thread enters the exchange. |
373 |
> |
* @param unit the time unit of the <tt>timeout</tt> argument |
374 |
> |
* @return the object provided by the other thread |
375 |
> |
* @throws InterruptedException if the current thread was |
376 |
> |
* interrupted while waiting |
377 |
> |
* @throws TimeoutException if the specified waiting time elapses |
378 |
> |
* before another thread enters the exchange |
379 |
|
*/ |
380 |
|
public V exchange(V x, long timeout, TimeUnit unit) |
381 |
|
throws InterruptedException, TimeoutException { |