ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.15
Committed: Tue Apr 26 01:17:18 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.14: +28 -30 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.14 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 tim 1.1
/**
 * A synchronization point at which two threads can exchange objects.
 * Each thread presents some object on entry to the {@link #exchange
 * exchange} method, and receives the object presented by the other
 * thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly
 * emptied one when it needs it, handing off the filled one to
 * the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger&lt;DataBuffer&gt;();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {
    /** Lock guarding <tt>item</tt> and <tt>arrivalCount</tt>. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Single condition shared by two kinds of waiters: a first arriver
     * waiting for its partner, and later arrivals waiting for the
     * previous pair to finish. Because both kinds wait here, every
     * state change must use <tt>signalAll</tt>; each waiter re-checks
     * its own predicate in a loop.
     */
    private final Condition taken = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed <tt>true</tt> if the wait is bounded by <tt>nanos</tt>
     * @param nanos the maximum time to wait, in nanoseconds; only
     * consulted when <tt>timed</tt> is <tt>true</tt>
     * @return the object offered by the partner thread
     * @throws InterruptedException if the caller's interrupted status is
     * set on entry, or it is interrupted while waiting and no partner
     * has completed the exchange
     * @throws TimeoutException if <tt>timed</tt> and the time elapses
     * before a partner arrives
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Honor the documented contract that a thread whose interrupted
        // status is set on entry gets InterruptedException. Without this
        // check a second arriver (which never waits) or a zero-timeout
        // caller would not notice the pending interrupt.
        if (Thread.interrupted())
            throw new InterruptedException();

        lock.lock();
        try {
            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count.
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0)
                    nanos = taken.awaitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and wake the
            // other thread.
            if (count == 2) {
                V other = item;
                item = x;
                // signalAll, not signal: threads blocked in the loop
                // above share this condition and may be queued ahead of
                // our partner; a single signal could be consumed by one
                // of them and the partner would never wake up.
                taken.signalAll();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0)
                        nanos = taken.awaitNanos(nanos);
                    else
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            V other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;
            // Release every thread waiting for this pair to finish; the
            // ones that cannot proceed yet simply wait again.
            taken.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // If no one is waiting for us, we can back out
            if (interrupted != null)
                throw interrupted;
            else // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) {
            // An untimed exchange never takes the timeout path.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}