ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Exchanger.java
Revision: 1.4
Committed: Tue Jul 8 00:46:33 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.3: +2 -1 lines
Log Message:
Locks in subpackage; fairness params added

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 tim 1.1
10     /**
11     * An <tt>Exchanger</tt> provides a synchronization point at which two threads
12     * can exchange objects. Each thread presents some object on entry to
13     * the {@link #exchange exchange} method, and receives the object presented by
14     * the other thread on return.
15     *
16     * <p><b>Sample Usage:</b>
17     * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
18     * swap buffers between threads so that the thread filling the
19     * buffer gets a freshly
20     * emptied one when it needs it, handing off the filled one to
21     * the thread emptying the buffer.
22     * <pre>
23     * class FillAndEmpty {
24     * Exchanger&lt;Buffer&gt; exchanger = new Exchanger&lt;Buffer&gt;();
25     * Buffer initialEmptyBuffer = ... a made-up type
26     * Buffer initialFullBuffer = ...
27     *
28     * class FillingLoop implements Runnable {
29     * public void run() {
30     * Buffer currentBuffer = initialEmptyBuffer;
31     * try {
32     * while (currentBuffer != null) {
33     * addToBuffer(currentBuffer);
34     * if (currentBuffer.full())
35     * currentBuffer = exchanger.exchange(currentBuffer);
36     * }
37     * }
38     * catch (InterruptedException ex) { ... handle ... }
39     * }
40     * }
41     *
42     * class EmptyingLoop implements Runnable {
43     * public void run() {
44     * Buffer currentBuffer = initialFullBuffer;
45     * try {
46     * while (currentBuffer != null) {
47     * takeFromBuffer(currentBuffer);
48     * if (currentBuffer.empty())
49     * currentBuffer = exchanger.exchange(currentBuffer);
50     * }
51     * }
52     * catch (InterruptedException ex) { ... handle ...}
53     * }
54     * }
55     *
56     * void start() {
57     * new Thread(new FillingLoop()).start();
58     * new Thread(new EmptyingLoop()).start();
59     * }
60     * }
61     * </pre>
62     *
63     * @fixme change example to use a bounded queue?
64     *
65     * @since 1.5
66     * @spec JSR-166
67 dl 1.4 * @revised $Date: 2003/06/24 14:34:47 $
68 dl 1.3 * @editor $Author: dl $
69     * @author Doug Lea
70 tim 1.1 */
public class Exchanger<V> {
    /** Guards {@link #item} and {@link #arrivalCount}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled by the second arriver once it has swapped its object into
     * {@link #item}. At most one thread (the first arriver) ever waits here.
     */
    private final Condition taken = lock.newCondition();

    /**
     * Signalled when {@link #arrivalCount} drops back to 0. Threads that
     * arrive while a completed pair has not yet been cleaned up wait here,
     * so they cannot disturb the in-flight exchange.
     */
    private final Condition reset = lock.newCondition();

    /** Holder for the item being exchanged */
    private V item;

    /**
     * Arrival count transitions from 0 to 1 to 2 then back to 0
     * during an exchange. It never exceeds 2: later arrivals wait on
     * {@link #reset} until the current pair has finished.
     */
    private int arrivalCount;

    /**
     * Main exchange function, handling the different policy variants.
     *
     * @param x the object offered by the calling thread
     * @param timed <tt>true</tt> if the wait is bounded by <tt>nanos</tt>
     * @param nanos remaining wait budget in nanoseconds (ignored unless
     * <tt>timed</tt>); values less than or equal to zero mean "do not wait"
     * @return the object offered by the partner thread
     * @throws InterruptedException if the caller's interrupted status is set
     * on entry, or it is interrupted while waiting and no partner has
     * completed the exchange
     * @throws TimeoutException if <tt>timed</tt> and the budget runs out
     * before a partner arrives
     */
    private V doExchange(V x, boolean timed, long nanos)
        throws InterruptedException, TimeoutException {
        // Honour the documented contract: an interrupt that is pending on
        // entry is reported (and cleared) before anything else happens,
        // even if a partner is already waiting or the timeout is zero.
        if (Thread.interrupted())
            throw new InterruptedException();

        lock.lock();
        try {
            // A count of 2 means a pair has exchanged but the first arriver
            // has not yet woken up to collect its item and reset the state.
            // Touching item/arrivalCount now would corrupt that exchange,
            // so wait until the slot is free again.
            while (arrivalCount == 2) {
                if (!timed)
                    reset.await();
                else if (nanos > 0)
                    nanos = reset.awaitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            V other;
            int count = ++arrivalCount;

            // If item is already waiting, replace it and signal other thread
            if (count == 2) {
                other = item;
                item = x;
                taken.signal();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.
            item = x;
            InterruptedException interrupted = null;
            try {
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0)
                        nanos = taken.awaitNanos(nanos);
                    else
                        break; // timed out
                }
            }
            catch (InterruptedException ie) {
                // Defer: a partner may already have completed the swap.
                interrupted = ie;
            }

            // Get and reset item and count after the wait. This must happen
            // even if the wait was aborted, so the exchanger stays usable.
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0;

            // Release every thread held back at the entry gate; each one
            // re-checks the count under the lock before proceeding.
            reset.signalAll();

            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // Otherwise, no one is waiting for us, so we can just back out
            if (interrupted != null)
                throw interrupted;
            else // must be timeout
                throw new TimeoutException();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Create a new Exchanger
     **/
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     **/
    public V exchange(V x) throws InterruptedException {
        try {
            return doExchange(x, false, 0);
        }
        catch (TimeoutException cannotHappen) {
            // An untimed wait never takes the timeout path.
            throw new Error(cannotHappen);
        }
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown.
     * The given waiting time is a best-effort lower bound. If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     **/
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        return doExchange(x, true, unit.toNanos(timeout));
    }

}
243    
244