ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.4
Committed: Tue Jul 8 00:46:33 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.3: +2 -1 lines
Log Message:
Locks in subpackage; fairness params added

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9
/**
 * An <tt>Exchanger</tt> provides a synchronization point at which two threads
 * can exchange objects. Each thread presents some object on entry to
 * the {@link #exchange exchange} method, and receives the object presented by
 * the other thread on return.
 *
 * <p>All mutable state of an <tt>Exchanger</tt> is guarded by a single
 * internal lock, so one instance may be shared by any number of threads;
 * arrivals are paired off two at a time.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
 *   Buffer initialEmptyBuffer = ... a made-up type
 *   Buffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       Buffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       }
 *       catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @fixme change example to use a bounded queue?
 *
 * @since 1.5
 * @spec JSR-166
 * @revised $Date: 2003/06/24 14:34:47 $
 * @editor $Author: dl $
 * @author Doug Lea
 * @param <V> the type of objects that may be exchanged
 */
public class Exchanger<V> {
    /** Guards {@link #item} and {@link #arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled whenever {@link #arrivalCount} changes in a way another
     * thread may be waiting for: when the second party deposits its item,
     * and when the first party resets the exchanger for the next pair.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged. */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange. It is 2 from the moment the second party
     * deposits its item until the first party has collected it.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed <tt>true</tt> if waiting is bounded by <tt>nanos</tt>
     * @param nanos the remaining wait budget in nanoseconds; ignored
     * unless <tt>timed</tt> is set
     * @return the object offered by the partner thread
     * @throws InterruptedException if the calling thread was interrupted
     * while waiting and no partner had completed the exchange
     * @throws TimeoutException if <tt>timed</tt> is set and the budget
     * ran out before a partner completed the exchange
     */
    private V doExchange(V x, boolean timed, long nanos)
            throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            // A count of two means the previous pair has not finished: the
            // first party still has to collect its item and reset the
            // count. Joining now would push the count past two and
            // overwrite the item it is about to read, so hold back until
            // the reset. Nothing has been modified yet, so an interrupt or
            // timeout here can simply propagate.
            while (arrivalCount == 2) {
                if (!timed) {
                    taken.await();
                } else if (nanos > 0) {
                    nanos = taken.awaitNanos(nanos);
                } else {
                    throw new TimeoutException();
                }
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and wake the partner.
            // signalAll rather than signal: threads held at the gate above
            // share this condition, and a single signal could be consumed
            // by one of them instead of the waiting partner.
            if (count == 2) {
                V partnerItem = item;
                item = x;
                taken.signalAll();
                return partnerItem;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed) {
                        taken.await();
                    } else if (nanos > 0) {
                        nanos = taken.awaitNanos(nanos);
                    } else {
                        break; // timed out
                    }
                }
            } catch (InterruptedException ie) {
                // Defer: a partner may already have completed the swap.
                interrupted = ie;
            }

            // Get and reset item and count after the wait. This must be
            // done even if the wait was aborted.
            V received = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;

            // The exchanger is free again; release any threads held at
            // the entry gate.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null) {
                    Thread.currentThread().interrupt();
                }
                return received;
            }

            // Otherwise, no one is waiting for us, so we can just back out.
            if (interrupted != null) {
                throw interrupted;
            }
            throw new TimeoutException(); // must be timeout
        } finally {
            lock.unlock();
        }
    }

    /**
     * Create a new Exchanger.
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread has to wait and:
     * <ul>
     * <li>has its interrupted status set when it starts waiting; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared, unless a partner had already completed
     * the exchange, in which case the partner's object is returned and the
     * interrupted status is left set.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed wait never takes a timeout path in doExchange.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread has to wait and:
     * <ul>
     * <li>has its interrupted status set when it starts waiting; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared, unless a partner had already completed
     * the exchange, in which case the partner's object is returned and the
     * interrupted status is left set.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * The given waiting time is a best-effort lower bound. If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
243
244