ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.3
Committed: Tue Jun 24 14:34:47 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.2: +10 -4 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
/**
 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
 * can exchange objects. Each thread presents some object on entry to
 * the {@link #exchange exchange} method, and receives the object presented by
 * the other thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
 *   Buffer initialEmptyBuffer = ... // Buffer is a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @fixme change example to use a bounded queue?
 *
 * @since 1.5
 * @spec JSR-166
 * @revised $Date: 2003/05/27 18:14:40 $
 * @editor $Author: dl $
 * @author Doug Lea
 */
public class Exchanger<V> {
    /** Guards {@link #item} and {@link #arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled whenever {@link #arrivalCount} changes in a way that may
     * let a waiting thread proceed: when the second thread of a pair has
     * deposited its item, and when the first thread of a pair has reset
     * the exchanger for the next pair.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange. While it is 2, the second thread has already
     * deposited its item but the first thread has not yet picked it up;
     * newly arriving threads must wait for the reset to 0.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed <tt>true</tt> if the wait is bounded by <tt>nanos</tt>
     * @param nanos the maximum time to wait, in nanoseconds; only
     * consulted when <tt>timed</tt> is <tt>true</tt>
     * @return the object offered by the partner thread
     * @throws InterruptedException if the calling thread had its
     * interrupted status set on entry, or was interrupted while waiting
     * before a partner completed the exchange
     * @throws TimeoutException if <tt>timed</tt> and the time elapsed
     * before a partner completed the exchange
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Honor the documented contract even on the non-blocking path
        // taken by the second thread of a pair.
        if (Thread.interrupted())
            throw new InterruptedException();

        lock.lock();
        try {
            // If the arrival count is already two, a previous pair is
            // still completing: its first thread has not yet collected
            // the item and reset the count. Joining now would overwrite
            // that item and push the count past 2, so wait for the reset.
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0)
                    nanos = taken.awaitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and wake the thread
            // that deposited it.
            if (count == 2) {
                V other = item;
                item = x;
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0)
                        nanos = taken.awaitNanos(nanos);
                    else
                        break; // timed out
                }
            }
            catch (InterruptedException ie) {
                // Remember the interrupt; we may still have to complete
                // the exchange if a partner has already arrived.
                interrupted = ie;
            }

            // Get and reset item and count after the wait. This must be
            // done even if the wait was aborted, so that the exchanger
            // is usable by the next pair.
            V other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;

            // Release any threads held back while the count was two.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // Otherwise, no one is waiting for us, so we can just back out
            if (interrupted != null)
                throw interrupted;
            else // must be timeout
                throw new TimeoutException();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Create a new Exchanger
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        }
        catch (TimeoutException cannotHappen) {
            // An untimed wait never takes the timeout path.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * The given waiting time is a best-effort lower bound. If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
242
243