ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CyclicBarrier.java
Revision: 1.2
Committed: Tue May 27 18:14:39 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRERELEASE_0_1
Changes since 1.1: +213 -17 lines
Log Message:
re-check-in initial implementations

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8    
9     /**
10     * A <tt>CyclicBarrier</tt> allows a set of threads to all wait for each
11     * other to reach a common barrier point. CyclicBarriers are useful in programs
12     * involving a fixed sized party of threads that must occasionally
13     * wait for each other. The barrier is <em>cyclic</em> because it can
14     * be re-used after the waiting threads are released.
15     *
16     * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
17     * that is run once per barrier point, after the last thread in the party
18     * arrives, but before any threads are released.
19     * This <em>barrier action</em> is useful
20     * for updating shared state before any of the parties continue.
21     *
22     * <p><b>Sample usage:</b> Here is a code sketch of
23     * using a barrier in a parallel decomposition design:
24     * <pre>
25     * class Solver {
26     * final int N;
27     * final float[][] data;
28     * final CyclicBarrier barrier;
29     *
30     * class Worker implements Runnable {
31     * int myRow;
32     * Worker(int row) { myRow = row; }
33     * public void run() {
34     * while (!done()) {
35     * processRow(myRow);
36     *
37     * try {
38     * barrier.await();
39     * }
40     * catch (InterruptedException ex) { return; }
41     * catch (BrokenBarrierException ex) { return; }
42     * }
43     * }
44     * }
45     *
46     * public Solver(float[][] matrix) {
47     * data = matrix;
48     * N = matrix.length;
49     * barrier = new CyclicBarrier(N,
50     * new Runnable() {
51     * public void run() {
52     * mergeRows(...);
53     * }
54     * });
55     * for (int i = 0; i < N; ++i)
56     * new Thread(new Worker(i)).start();
57     *
58     * waitUntilDone();
59     * }
60     * }
61     * </pre>
62     * Here, each worker thread processes a row of the matrix then waits at the
63     * barrier until all rows have been processed. When all rows are processed
64     * the supplied {@link Runnable} barrier action is executed and merges the
65     * rows. If the merger
66     * determines that a solution has been found then <tt>done()</tt> will return
67     * <tt>true</tt> and each worker will terminate.
68     *
69     * <p>If the barrier action does not rely on the parties being suspended when
70     * it is executed, then any of the threads in the party could execute that
71     * action when it is released. To facilitate this, each invocation of
72     * {@link #await} returns the arrival index of that thread at the barrier.
73     * You can then choose which thread should execute the barrier action, for
74     * example:
75     * <pre> if (barrier.await() == 0) {
76     * // log the completion of this iteration
77     * }</pre>
78     *
79 dl 1.2 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
80     * for failed synchronization attempts: If a thread leaves a barrier
81     * point prematurely because of interruption or timeout, all others
82     * will also leave abnormally (via {@link BrokenBarrierException}),
83     * until the barrier is {@link #reset}. This is usually the simplest
84     * and best strategy for sharing knowledge about failures among
85     * cooperating threads in the most common usage contexts of barriers.
86 tim 1.1 *
87     * <h3>Implementation Considerations</h3>
88     * <p>This implementation has the property that interruptions among newly
89     * arriving threads can cause as-yet-unresumed threads from a previous
90     * barrier cycle to return out as broken. This transmits breakage as
91     * early as possible, but with the possible byproduct that only some
92     * threads returning out of a barrier will realize that it is newly
93     * broken. (Others will not realize this until a future cycle.)
94     *
95     *
96     *
97     * @since 1.5
98     * @spec JSR-166
99 dl 1.2 * @revised $Date: 2003/01/28 06:56:53 $
100     * @editor $Author: dholmes $
101 tim 1.1 *
102     * @fixme Is the above property actually true in this implementation?
103     * @fixme Should we have a timeout version of await()?
104     */
public class CyclicBarrier {
    /** Lock guarding every mutable field of the barrier. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Condition on which non-final arrivals wait until the barrier trips, breaks or is reset. */
    private final Condition trip = lock.newCondition();

    /** Number of parties that must arrive to trip the barrier. */
    private final int parties;

    /** Optional command run by the last arriving thread; <tt>null</tt> if none. */
    private final Runnable barrierCommand;

    /**
     * The generation number. Incremented every time the barrier is
     * tripped or reset, wrapping back to 1 on overflow. Starts at 1 so
     * that the initial <tt>broken</tt> value of 0 never matches it.
     */
    private int generation = 1;

    /**
     * Breakage indicator: the generation that was most recently broken
     * or reset. The barrier is currently broken exactly when this equals
     * <tt>generation</tt>.
     */
    private int broken = 0;

    /**
     * Number of parties still awaited in the current generation. Counts
     * down from <tt>parties</tt> to 0 on each cycle.
     */
    private int count;

    /**
     * Starts a new generation: restores the party count, advances the
     * generation number (skipping non-positive values) and carries an
     * existing breakage over to the new generation. Caller must hold
     * the lock.
     */
    private void nextGeneration() {
        count = parties;
        int g = generation;
        // avoid generation == 0 (and negatives) on overflow
        if (++generation < 0) {
            generation = 1;
        }
        // propagate breakage
        if (broken == g) {
            broken = generation;
        }
    }

    /**
     * Marks generation <tt>g</tt> as broken, discards the arrivals
     * counted so far and wakes every waiter. Caller must hold the lock.
     *
     * @param g the generation being broken
     */
    private void breakBarrier(int g) {
        broken = g;
        // Waiters are about to leave abnormally, so none remain "waiting".
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code shared by the timed and untimed <tt>await</tt>
     * methods.
     *
     * @param timed whether a timeout applies
     * @param nanos remaining time to wait in nanoseconds; only used
     * when <tt>timed</tt> is true
     * @return the arrival index of the current thread; 0 for the last
     * arrival
     * @throws InterruptedException if the current thread is interrupted
     * on entry or while waiting in a generation that is still current
     * @throws BrokenBarrierException if the barrier is broken on entry,
     * or is broken or reset while the current thread waits
     * @throws TimeoutException if the timeout elapses before the
     * barrier trips
     */
    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        lock.lock();
        try {
            int g = generation;

            // Validate before counting this thread as an arrival so that a
            // rejected call does not leak a phantom waiter into count.
            if (broken == g) {
                throw new BrokenBarrierException();
            }

            if (Thread.interrupted()) {
                breakBarrier(g);
                throw new InterruptedException();
            }

            int index = --count;

            if (index == 0) { // last arrival: trip the barrier
                nextGeneration();
                // Waiters cannot actually proceed until this thread releases
                // the lock, i.e. after the barrier action has finished.
                trip.signalAll();
                boolean actionCompleted = false;
                try {
                    if (barrierCommand != null) {
                        barrierCommand.run();
                    }
                    actionCompleted = true;
                    return 0;
                } finally {
                    // Any Throwable from the action (RuntimeException or
                    // Error) leaves the new generation broken.
                    if (!actionCompleted) {
                        broken = generation;
                    }
                }
            }

            for (;;) {
                try {
                    if (!timed) {
                        trip.await();
                    } else if (nanos > 0) {
                        nanos = trip.awaitNanos(nanos);
                    }
                } catch (InterruptedException ex) {
                    // Only claim breakage if our generation is still current.
                    if (generation == g) {
                        breakBarrier(g);
                        throw ex;
                    }
                    // The barrier already moved on: keep the interrupt
                    // pending and report the real outcome below.
                    Thread.currentThread().interrupt();
                }

                // broken == g: our own generation was broken, or ended by
                // reset() (which records the generation it ended).
                // broken == generation: the generation that replaced ours is
                // broken (failed barrier action or a newly arriving thread).
                // NOTE(review): two resets back to back before this thread
                // resumes overwrite the record of the first one - confirm
                // whether that case needs handling.
                if (broken == g || broken == generation) {
                    throw new BrokenBarrierException();
                }

                // Tripped normally. Checked before the timeout so that a trip
                // racing with expiry is not misreported as a timeout.
                if (generation != g) {
                    return index;
                }

                if (timed && nanos <= 0) {
                    breakBarrier(g);
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Create a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     * before the barrier is tripped.
     * @param barrierAction the command to execute when the barrier is
     * tripped, or <tt>null</tt> if there is no action.
     *
     * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) {
            throw new IllegalArgumentException();
        }
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Create a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it.
     *
     * <p>This is equivalent to <tt>CyclicBarrier(parties, null)</tt>.
     *
     * @param parties the number of threads that must invoke {@link #await}
     * before the barrier is tripped.
     *
     * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Return the number of parties required to trip this barrier.
     * @return the number of parties required to trip this barrier.
     **/
    public int getParties() {
        return parties;
    }

    /**
     * Wait until all {@link #getParties parties} have invoked <tt>await</tt>
     * on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>Some other thread {@link Thread#interrupt interrupts} one of the
     * other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
     * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked
     * then {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed
     * in the broken state.
     *
     * @return the arrival index of the current thread, where index
     * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
     * zero indicates the last to arrive.
     *
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     * interrupted or timed out while the current thread was waiting, or
     * the barrier was reset, or the barrier was broken when <tt>await</tt>
     * was called, or the barrier action failed.
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen: untimed waits never time out
        }
    }

    /**
     * Wait until all {@link #getParties parties} have invoked <tt>await</tt>
     * on this barrier, or the specified timeout elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>Some other thread {@link Thread#interrupt interrupts} one of the
     * other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified timeout elapses before the barrier trips then
     * {@link TimeoutException} is thrown and the barrier is placed in the
     * broken state. A timeout of zero or less does not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
     * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked
     * then {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed
     * in the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the <tt>timeout</tt> parameter
     * @return the arrival index of the current thread, where index
     * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
     * zero indicates the last to arrive.
     *
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified timeout elapses.
     * @throws BrokenBarrierException if <em>another</em> thread was
     * interrupted or timed out while the current thread was waiting, or
     * the barrier was reset, or the barrier was broken when <tt>await</tt>
     * was called, or the barrier action failed.
     */
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Query if this barrier is in a broken state.
     * @return <tt>true</tt> if one or more parties broke out of this
     * barrier due to interruption or timeout, or a barrier action
     * failed, since construction or the last reset; and <tt>false</tt>
     * otherwise.
     */
    public boolean isBroken() {
        lock.lock();
        try {
            // Same test that await applies on entry, so the two always agree.
            return broken == generation;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reset the barrier to its initial state. If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}.
     */
    public void reset() {
        lock.lock();
        try {
            int g = generation;
            nextGeneration();
            // Record the generation that was ended: its waiters see
            // broken == their generation and fail, while the new generation
            // starts out unbroken.
            broken = g;
            trip.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Return the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     **/
    public int getNumberWaiting() {
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

}
417    
418