ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.10
Committed: Sat Oct 11 15:37:31 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.9: +1 -1 lines
Log Message:
Redeclare some Conditions as ReentrantLock.ConditionObjects

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 tim 1.1
10     /**
11     * An <tt>Exchanger</tt> provides a synchronization point at which two threads
12     * can exchange objects. Each thread presents some object on entry to
13     * the {@link #exchange exchange} method, and receives the object presented by
14     * the other thread on return.
15     *
16     * <p><b>Sample Usage:</b>
17     * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
18     * swap buffers between threads so that the thread filling the
19     * buffer gets a freshly
20     * emptied one when it needs it, handing off the filled one to
21     * the thread emptying the buffer.
22     * <pre>
23     * class FillAndEmpty {
24 dl 1.9 * Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger&lt;DataBuffer&gt;();
25     * DataBuffer initialEmptyBuffer = ... a made-up type
26     * DataBuffer initialFullBuffer = ...
27 tim 1.1 *
28     * class FillingLoop implements Runnable {
29     * public void run() {
30 dl 1.9 * DataBuffer currentBuffer = initialEmptyBuffer;
31 tim 1.1 * try {
32     * while (currentBuffer != null) {
33     * addToBuffer(currentBuffer);
34     * if (currentBuffer.full())
35     * currentBuffer = exchanger.exchange(currentBuffer);
36     * }
37 tim 1.7 * } catch (InterruptedException ex) { ... handle ... }
38 tim 1.1 * }
39     * }
40     *
41     * class EmptyingLoop implements Runnable {
42     * public void run() {
43 dl 1.9 * DataBuffer currentBuffer = initialFullBuffer;
44 tim 1.1 * try {
45     * while (currentBuffer != null) {
46     * takeFromBuffer(currentBuffer);
47     * if (currentBuffer.empty())
48     * currentBuffer = exchanger.exchange(currentBuffer);
49     * }
50 tim 1.7 * } catch (InterruptedException ex) { ... handle ...}
51 tim 1.1 * }
52     * }
53     *
54     * void start() {
55     * new Thread(new FillingLoop()).start();
56     * new Thread(new EmptyingLoop()).start();
57     * }
58     * }
59     * </pre>
60     *
61     * @since 1.5
62 dl 1.3 * @author Doug Lea
63 tim 1.1 */
public class Exchanger<V> {
    /** Guards <tt>item</tt> and <tt>arrivalCount</tt>. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition signalled whenever an exchange makes progress: when the
     * second thread of a pair deposits its item, and when the first
     * thread of a pair resets the exchanger. Two kinds of waiter share
     * this one condition (threads waiting for their partner, and threads
     * waiting for a previous pair to finish), so it must always be
     * signalled with <tt>signalAll</tt>; every waiter re-checks its own
     * predicate in a loop.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed <tt>true</tt> if the wait is bounded by <tt>nanos</tt>
     * @param nanos the maximum time to wait, in nanoseconds; ignored
     * unless <tt>timed</tt> is <tt>true</tt>
     * @return the object offered by the partner thread
     * @throws InterruptedException if the current thread has its
     * interrupted status set on entry, or is interrupted while waiting
     * and no partner has yet completed the exchange
     * @throws TimeoutException if <tt>timed</tt> and the waiting time
     * elapses before a partner arrives
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Acquire interruptibly so that a thread whose interrupted status
        // is already set on entry fails before touching any shared state,
        // even when a partner is waiting and no further wait is needed.
        lock.lockInterruptibly();
        try {
            V other;

            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count.
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0)
                    nanos = taken.awaitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and wake the partner.
            if (count == 2) {
                other = item;
                item = x;
                // signalAll, not signal: threads waiting for a previous
                // pair may be queued on this condition ahead of our
                // partner, and a single signal could be consumed by one
                // of them, leaving the partner asleep forever.
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0)
                        nanos = taken.awaitNanos(nanos);
                    else
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                // Defer: we still have to reset the shared state below.
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;
            // Wake every thread that was held back while this pair was in
            // progress so that the next two of them can pair up; waking
            // only one could leave a would-be partner asleep.
            taken.signalAll();

            // If the other thread replaced item, then the exchange has
            // already happened from its point of view, so we must
            // complete it even if cancelled. Preserve the interrupt for
            // the caller instead of throwing.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // If no one is waiting for us, we can back out.
            if (interrupted != null)
                throw interrupted;
            else // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new Exchanger.
     **/
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     **/
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed exchange never takes a timeout branch.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     **/
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
245    
246