ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.3
Committed: Tue Jun 24 14:34:47 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.2: +10 -4 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8    
/**
 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
 * can exchange objects.  Each thread presents some object on entry to
 * the {@link #exchange exchange} method, and receives the object presented by
 * the other thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
 *   Buffer initialEmptyBuffer = ... a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @fixme change example to use a bounded queue?
 *
 * @since 1.5
 * @spec JSR-166
 * @revised $Date: 2003/05/27 18:14:40 $
 * @editor $Author: dl $
 * @author Doug Lea
 */
public class Exchanger<V> {
    /** Guards {@link #item} and {@link #arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled whenever the exchange state changes: when a second
     * thread swaps its item in, and when the first thread resets the
     * state after an exchange or a cancelled wait.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object to hand to the other thread
     * @param timed <tt>true</tt> if the wait is bounded by <tt>nanos</tt>
     * @param nanos the remaining wait budget in nanoseconds; only
     * consulted when <tt>timed</tt> is <tt>true</tt>
     * @return the object presented by the other thread
     * @throws InterruptedException if the current thread is interrupted
     * on entry or while waiting, and no partner has already completed
     * its half of the exchange
     * @throws TimeoutException if <tt>timed</tt> and the wait budget is
     * exhausted before a partner arrives
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Acquire interruptibly so that a thread whose interrupt status
        // is already set on entry fails with InterruptedException, as
        // the public exchange methods document. This must stay outside
        // the try block: if it throws, the lock is not held.
        lock.lockInterruptibly();
        try {
            // If the arrival count is already at two, a previous pair
            // has been matched but its first thread has not yet woken
            // up to collect its item and reset the count. Joining now
            // would overwrite that item and push the count to three,
            // so wait for the reset first.
            while (arrivalCount == 2) {
                if (!timed) {
                    taken.await();
                } else if (nanos > 0) {
                    nanos = taken.awaitNanos(nanos);
                } else {
                    throw new TimeoutException();
                }
            }

            // If item is already waiting, replace it and wake the
            // thread that is waiting for it.
            if (++arrivalCount == 2) {
                V other = item;
                item = x;
                // signalAll rather than signal: the condition is shared
                // with threads waiting for a reset, and each waiter
                // re-checks its own predicate in a loop.
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed) {
                        taken.await();
                    } else if (nanos > 0) {
                        nanos = taken.awaitNanos(nanos);
                    } else {
                        break; // timed out
                    }
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (This is needed even if the wait was aborted.)
            V other = item;
            item = null;
            int count = arrivalCount;
            arrivalCount = 0;

            // Release any threads that were held back until this
            // exchange point became free again.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null) {
                    Thread.currentThread().interrupt();
                }
                return other;
            }

            // Otherwise, no one is waiting for us, so we can just back out
            if (interrupted != null) {
                throw interrupted;
            }
            throw new TimeoutException(); // must be timeout
        } finally {
            lock.unlock();
        }
    }

    /**
     * Create a new Exchanger
     **/
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     **/
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed exchange never takes the timeout path.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * The given waiting time is a best-effort lower bound. If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     **/
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
242    
243