ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.5
Committed: Wed Jul 9 23:23:17 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_2
Changes since 1.4: +18 -6 lines
Log Message:
Misc performance tunings

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9
/**
 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
 * can exchange objects. Each thread presents some object on entry to
 * the {@link #exchange exchange} method, and receives the object presented by
 * the other thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
 *   Buffer initialEmptyBuffer = ... a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @fixme change example to use a bounded queue?
 *
 * @since 1.5
 * @spec JSR-166
 * @revised $Date: 2003/07/08 00:46:33 $
 * @editor $Author: dl $
 * @author Doug Lea
 */
public class Exchanger<V> {
    /** Guards <tt>item</tt> and <tt>arrivalCount</tt>. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled whenever <tt>arrivalCount</tt> changes to 2 (partner has
     * arrived) or back to 0 (exchange finished). Both kinds of waiter
     * share this one condition, so wake-ups must use <tt>signalAll</tt>.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * <p>The first thread of a pair deposits its item and waits for the
     * count to reach 2; the second thread swaps its item in, wakes the
     * waiters and returns at once; the first thread then collects the
     * swapped-in item and resets the count to 0. Threads arriving while
     * the count is still 2 wait for that reset before taking part.
     *
     * @param x the object to hand over
     * @param timed true if <tt>nanos</tt> bounds the wait, false to wait
     * indefinitely
     * @param nanos remaining wait budget in nanoseconds; only consulted
     * when <tt>timed</tt> is true
     * @return the object handed over by the partner thread
     * @throws InterruptedException if the interrupt status is set on entry,
     * or the thread is interrupted while waiting and no partner has
     * committed to the exchange
     * @throws TimeoutException if the wait budget runs out before a
     * partner arrives
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Interruptible acquisition: a thread whose interrupt status is
        // already set fails here, even when it would otherwise be the
        // second arriver and never reach a wait.
        lock.lockInterruptibly();
        try {
            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count.
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0)
                    nanos = taken.awaitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and wake the other
            // thread. signalAll rather than signal: late arrivers may be
            // parked on the same condition, and a single signal could be
            // consumed by one of them, leaving our partner asleep.
            if (count == 2) {
                V other = item;
                item = x;
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0)
                        nanos = taken.awaitNanos(nanos);
                    else
                        break; // timed out
                }
            }
            catch (InterruptedException ie) {
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            V other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;
            // Release every thread waiting for the count to leave 2.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled; keep the interrupt visible
            // to the caller by restoring the status.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // If no one is waiting for us, we can back out.
            if (interrupted != null)
                throw interrupted;
            else // must be timeout
                throw new TimeoutException();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Create a new Exchanger
     **/
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     **/
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        }
        catch (TimeoutException cannotHappen) {
            // The untimed variant never takes a timeout path.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * The given waiting time is a best-effort lower bound. If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     **/
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
255
256