ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.20
Committed: Mon Jun 13 18:41:34 2005 UTC (18 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.19: +2 -0 lines
Log Message:
Add serial ids

File Contents

# User Rev Content
1 dl 1.2 /*
2 dl 1.16 * Written by Doug Lea, Bill Scherer, and Michael Scott with
3     * assistance from members of JCP JSR-166 Expert Group and released to
4     * the public domain, as explained at
5 dl 1.14 * http://creativecommons.org/licenses/publicdomain
6 dl 1.2 */
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.4 import java.util.concurrent.locks.*;
10 dl 1.16 import java.util.concurrent.atomic.*;
11     import java.util.Random;
12 tim 1.1
13     /**
14 dl 1.12 * A synchronization point at which two threads can exchange objects.
15     * Each thread presents some object on entry to the {@link #exchange
16     * exchange} method, and receives the object presented by the other
17     * thread on return.
18 tim 1.1 *
19     * <p><b>Sample Usage:</b>
20     * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
21     * swap buffers between threads so that the thread filling the
22     * buffer gets a freshly
23     * emptied one when it needs it, handing off the filled one to
24     * the thread emptying the buffer.
25     * <pre>
26     * class FillAndEmpty {
27 dl 1.9 * Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger&lt;DataBuffer&gt;();
28     * DataBuffer initialEmptyBuffer = ... a made-up type
29     * DataBuffer initialFullBuffer = ...
30 tim 1.1 *
31     * class FillingLoop implements Runnable {
32     * public void run() {
33 dl 1.9 * DataBuffer currentBuffer = initialEmptyBuffer;
34 tim 1.1 * try {
35     * while (currentBuffer != null) {
36     * addToBuffer(currentBuffer);
37     * if (currentBuffer.full())
38     * currentBuffer = exchanger.exchange(currentBuffer);
39     * }
40 tim 1.7 * } catch (InterruptedException ex) { ... handle ... }
41 tim 1.1 * }
42     * }
43     *
44     * class EmptyingLoop implements Runnable {
45     * public void run() {
46 dl 1.9 * DataBuffer currentBuffer = initialFullBuffer;
47 tim 1.1 * try {
48     * while (currentBuffer != null) {
49     * takeFromBuffer(currentBuffer);
50     * if (currentBuffer.empty())
51     * currentBuffer = exchanger.exchange(currentBuffer);
52     * }
53 tim 1.7 * } catch (InterruptedException ex) { ... handle ...}
54 tim 1.1 * }
55     * }
56     *
57     * void start() {
58     * new Thread(new FillingLoop()).start();
59     * new Thread(new EmptyingLoop()).start();
60     * }
61     * }
62     * </pre>
63     *
64     * @since 1.5
65 dl 1.16 * @author Doug Lea and Bill Scherer and Michael Scott
66 dl 1.11 * @param <V> The type of objects that may be exchanged
67 tim 1.1 */
public class Exchanger<V> {
    /*
     * The underlying idea is to use a stack to hold nodes containing
     * pairs of items to be exchanged. Except that:
     *
     *  *  Only one element of the pair is known on creation by a
     *     first-arriving thread; the other is a "hole" waiting to be
     *     filled in. This is a degenerate form of the dual stacks
     *     described in "Nonblocking Concurrent Objects with Condition
     *     Synchronization", by W. N. Scherer III and M. L. Scott.
     *     18th Annual Conf. on Distributed Computing, Oct. 2004.
     *     It is "degenerate" in that both the items and the holes
     *     are shared in the same nodes.
     *
     *  *  There isn't really a stack here! There can't be -- if two
     *     nodes were both in the stack, they should cancel themselves
     *     out by combining. So that's what we do. The 0th element of
     *     the "arena" array serves only as the top of stack.  The
     *     remainder of the array is a form of the elimination backoff
     *     collision array described in "A Scalable Lock-free Stack
     *     Algorithm", by D. Hendler, N. Shavit, and L. Yerushalmi.
     *     16th ACM Symposium on Parallelism in Algorithms and
     *     Architectures, June 2004. Here, threads spin (using short
     *     timed waits with exponential backoff) looking for each
     *     other.  If they fail to find others waiting, they try the
     *     top spot again.  As shown in that paper, this always
     *     converges.
     *
     * The backoff elimination mechanics never come into play in
     * common usages where only two threads ever meet to exchange
     * items, but they prevent contention bottlenecks when an
     * exchanger is used by a large number of threads.
     *
     * A hole is "empty" exactly when it holds null, so a null item
     * can never be stored in a hole directly: the CAS from null to
     * null would succeed without changing anything and the waiter
     * would never observe the exchange. Public methods therefore
     * substitute NULL_ITEM for null on the way in and translate it
     * back to null on the way out.
     */

    /**
     * Size of collision space. Using a size of half the number of
     * CPUs provides enough space for threads to find each other but
     * not so much that it would always require one or more to time
     * out to become unstuck. Note that the arena array holds SIZE+1
     * elements, to include the top-of-stack slot.
     */
    private static final int SIZE =
        (Runtime.getRuntime().availableProcessors() + 1) / 2;

    /**
     * Base unit in nanoseconds for backoffs. Must be a power of two.
     * Should be small because backoffs exponentially increase from
     * base.
     */
    private static final long BACKOFF_BASE = 128L;

    /**
     * Sentinel item representing cancellation.  This value is placed
     * in holes on cancellation, and used as a return value from Node
     * methods to indicate failure to set or get hole.
     */
    static final Object FAIL = new Object();

    /**
     * Sentinel standing in for a user-supplied null item, so that a
     * filled hole is always distinguishable from an empty one.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * The collision arena. arena[0] is used as the top of the stack.
     * The remainder is used as the collision elimination space.
     * Generic arrays cannot be created directly, so the array is
     * created raw and cast once in the constructor.
     */
    private final AtomicReference<Node>[] arena;

    /** Generator for random backoffs and delays. */
    private final Random random = new Random();

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        // Safe: the array is private and only ever populated below
        // with AtomicReference<Node> instances.
        @SuppressWarnings({"unchecked", "rawtypes"})
        AtomicReference<Node>[] slots =
            (AtomicReference<Node>[]) new AtomicReference[SIZE + 1];
        for (int i = 0; i < slots.length; ++i)
            slots[i] = new AtomicReference<Node>();
        arena = slots;
    }

    /**
     * Replaces a null item by the NULL_ITEM sentinel.
     * @param x the caller's item, possibly null.
     * @return x, or NULL_ITEM if x is null.
     */
    private static Object mask(Object x) {
        return (x == null) ? NULL_ITEM : x;
    }

    /**
     * Reverses {@link #mask}: translates NULL_ITEM back to null and
     * casts any other value to V.
     * @param v a value returned by doExchange.
     * @return the partner's original item.
     */
    @SuppressWarnings("unchecked")
    private V unmask(Object v) {
        return (v == NULL_ITEM) ? null : (V) v;
    }

    /**
     * Main exchange function, handling the different policy variants.
     * Uses Object, not "V" as argument and return value to simplify
     * handling of internal sentinel values. Callers from public
     * methods mask null items before the call and unmask and cast
     * the result.
     * @param item the (non-null, already masked) item to exchange.
     * @param timed true if the wait is timed.
     * @param nanos if timed, the maximum wait time.
     * @return the other thread's (masked) item.
     * @throws InterruptedException if the current thread has its
     * interrupted status set on entry or is interrupted while waiting.
     * @throws TimeoutException if timed and the wait time elapses.
     */
    private Object doExchange(Object item, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Honor the documented contract: an interrupt pending on entry
        // fails the call even if a partner is already waiting.
        if (Thread.interrupted())
            throw new InterruptedException();

        Node me = new Node(item);
        long lastTime = timed ? System.nanoTime() : 0;
        int idx = 0;     // start out at slot representing top
        int backoff = 0; // increases on failure to occupy a slot

        for (;;) {
            AtomicReference<Node> slot = arena[idx];

            // If this slot is already occupied, there is a waiting item...
            Node you = slot.get();
            if (you != null) {
                Object v = you.fillHole(item);
                // Clear the slot whether or not the fill succeeded, so a
                // cancelled node does not block later arrivals.
                slot.compareAndSet(you, null);
                if (v != FAIL)       // ... unless it was cancelled
                    return v;
            }

            // Try to occupy this slot
            if (slot.compareAndSet(null, me)) {
                // If this is top slot, use regular wait, else backoff-wait
                Object v = (idx == 0)
                    ? me.waitForHole(timed, nanos)
                    : me.waitForHole(true, randomDelay(backoff));
                slot.compareAndSet(me, null);
                if (v != FAIL)
                    return v;
                if (Thread.interrupted())
                    throw new InterruptedException();
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        throw new TimeoutException();
                }

                me = new Node(item);     // Throw away nodes on failure
                if (backoff < SIZE - 1)  // Increase or stay saturated
                    ++backoff;
                idx = 0;                 // Restart at top
            }
            else // Retry with a random non-top slot <= backoff
                idx = 1 + random.nextInt(backoff + 1);
        }
    }

    /**
     * Returns a random delay less than (base times (2 raised to backoff)).
     * The mask is positive, so the result is non-negative even when
     * the random int is negative.
     */
    private long randomDelay(int backoff) {
        return ((BACKOFF_BASE << backoff) - 1) & random.nextInt();
    }

    /**
     * Nodes hold partially exchanged data. This class
     * opportunistically subclasses AtomicReference to represent the
     * hole. So get() returns hole, and compareAndSet CAS'es value
     * into hole. Note that this class cannot be parameterized as V
     * because the sentinel values are only of type Object.
     */
    static final class Node extends AtomicReference<Object> {
        private static final long serialVersionUID = -3221313401284163686L;

        /** The element offered by the Thread creating this node. */
        final Object item;
        /** The Thread creating this node. */
        final Thread waiter;

        /**
         * Creates node with given item and empty hole.
         * @param item the item.
         */
        Node(Object item) {
            this.item = item;
            waiter = Thread.currentThread();
        }

        /**
         * Tries to fill in hole. On success, wakes up the waiter.
         * @param val the value to place in hole; must be non-null,
         * since a null hole means "still empty".
         * @return on success, the item; on failure, FAIL.
         */
        Object fillHole(Object val) {
            if (compareAndSet(null, val)) {
                LockSupport.unpark(waiter);
                return item;
            }
            return FAIL;
        }

        /**
         * Waits for and gets the hole filled in by another thread.
         * Fails if timed out or interrupted before hole filled.
         * Does not clear the interrupted status; the caller does.
         * @param timed true if the wait is timed.
         * @param nanos if timed, the maximum wait time.
         * @return on success, the hole; on failure, FAIL.
         */
        Object waitForHole(boolean timed, long nanos) {
            long lastTime = timed ? System.nanoTime() : 0;
            Object h;
            while ((h = get()) == null) {
                // If interrupted or timed out, try to cancel by
                // CASing FAIL as hole value. If the CAS loses to a
                // concurrent fill, the loop re-reads the real value.
                if (Thread.currentThread().isInterrupted() ||
                    (timed && nanos <= 0))
                    compareAndSet(null, FAIL);
                else if (!timed)
                    LockSupport.park();
                else {
                    LockSupport.parkNanos(nanos);
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            return h;
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange (may be <tt>null</tt>)
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return unmask(doExchange(mask(x), false, 0));
        } catch (TimeoutException cannotHappen) {
            // An untimed wait never times out.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange (may be <tt>null</tt>)
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return unmask(doExchange(mask(x), true, unit.toNanos(timeout)));
    }
}