ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CyclicBarrier.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/CyclicBarrier.java (file contents):
Revision 1.27 by dl, Tue Jul 13 15:03:05 2004 UTC vs.
Revision 1.28 by dl, Tue Apr 19 15:12:43 2005 UTC

# Line 17 | Line 17 | import java.util.concurrent.locks.*;
17   *
18   * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
19   * that is run once per barrier point, after the last thread in the party
20 < * arrives, but before any threads are released.
20 > * arrives, but before any threads are released.
21   * This <em>barrier action</em> is useful
22   * for updating shared-state before any of the parties continue.
23 < *
23 > *
24   * <p><b>Sample usage:</b> Here is an example of
25   *  using a barrier in a parallel decomposition design:
26   * <pre>
# Line 28 | Line 28 | import java.util.concurrent.locks.*;
28   *   final int N;
29   *   final float[][] data;
30   *   final CyclicBarrier barrier;
31 < *  
31 > *
32   *   class Worker implements Runnable {
33   *     int myRow;
34   *     Worker(int row) { myRow = row; }
# Line 37 | Line 37 | import java.util.concurrent.locks.*;
37   *         processRow(myRow);
38   *
39   *         try {
40 < *           barrier.await();
41 < *         } catch (InterruptedException ex) {
42 < *           return;
43 < *         } catch (BrokenBarrierException ex) {
44 < *           return;
40 > *           barrier.await();
41 > *         } catch (InterruptedException ex) {
42 > *           return;
43 > *         } catch (BrokenBarrierException ex) {
44 > *           return;
45   *         }
46   *       }
47   *     }
# Line 50 | Line 50 | import java.util.concurrent.locks.*;
50   *   public Solver(float[][] matrix) {
51   *     data = matrix;
52   *     N = matrix.length;
53 < *     barrier = new CyclicBarrier(N,
53 > *     barrier = new CyclicBarrier(N,
54   *                                 new Runnable() {
55 < *                                   public void run() {
56 < *                                     mergeRows(...);
55 > *                                   public void run() {
56 > *                                     mergeRows(...);
57   *                                   }
58   *                                 });
59 < *     for (int i = 0; i < N; ++i)
59 > *     for (int i = 0; i < N; ++i)
60   *       new Thread(new Worker(i)).start();
61   *
62   *     waitUntilDone();
63   *   }
64   * }
65   * </pre>
66 < * Here, each worker thread processes a row of the matrix then waits at the
66 > * Here, each worker thread processes a row of the matrix then waits at the
67   * barrier until all rows have been processed. When all rows are processed
68 < * the supplied {@link Runnable} barrier action is executed and merges the
68 > * the supplied {@link Runnable} barrier action is executed and merges the
69   * rows. If the merger
70   * determines that a solution has been found then <tt>done()</tt> will return
71   * <tt>true</tt> and each worker will terminate.
# Line 74 | Line 74 | import java.util.concurrent.locks.*;
74   * it is executed, then any of the threads in the party could execute that
75   * action when it is released. To facilitate this, each invocation of
76   * {@link #await} returns the arrival index of that thread at the barrier.
77 < * You can then choose which thread should execute the barrier action, for
77 > * You can then choose which thread should execute the barrier action, for
78   * example:
79   * <pre>  if (barrier.await() == 0) {
80   *     // log the completion of this iteration
81   *   }</pre>
82   *
83 < * <p>The <tt>CyclicBarrier</tt> uses a fast-fail all-or-none breakage
84 < * model for failed synchronization attempts: If a thread leaves a
85 < * barrier point prematurely because of interruption, failure, or
86 < * timeout, all other threads, even those that have not yet resumed
87 < * from a previous {@link #await}, will also leave abnormally via
88 < * {@link BrokenBarrierException} (or <tt>InterruptedException</tt> if
89 < * they too were interrupted at about the same time).
83 > * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
84 > * for failed synchronization attempts: If a thread leaves a barrier
85 > * point prematurely because of interruption, failure, or timeout, all
86 > * other threads waiting at that barrier point will also leave
87 > * abnormally via {@link BrokenBarrierException} (or
88 > * <tt>InterruptedException</tt> if they too were interrupted at about
89 > * the same time).
90   *
91   * @since 1.5
92   * @see CountDownLatch
# Line 94 | Line 94 | import java.util.concurrent.locks.*;
94   * @author Doug Lea
95   */
96   public class CyclicBarrier {
97 +    /**
98 +     * Each use of the barrier is represented as a generation instance.
99 +     * The generation changes whenever the barrier is tripped, or
100 +     * is reset. There can be many generations associated with threads
101 +     * using the barrier - due to the non-deterministic way the lock
102 +     * may be allocated to waiting threads - but only one of these
103 +     * can be active at a time (the one to which <tt>count</tt> applies)
104 +     * and all the rest are either broken or tripped.
105 +     * There need not be an active generation if there has been a break
106 +     * but no subsequent reset.
107 +     */
108 +    private static class Generation {
109 +        boolean broken = false;
110 +        boolean tripped = false;
111 +    }
112 +
113      /** The lock for guarding barrier entry */
114      private final ReentrantLock lock = new ReentrantLock();
115      /** Condition to wait on until tripped */
# Line 102 | Line 118 | public class CyclicBarrier {
118      private final int parties;
119      /* The command to run when tripped */
120      private final Runnable barrierCommand;
121 <
122 <    /**
107 <     * The generation number. Incremented upon barrier trip.
108 <     * Retracted upon reset.
109 <     */
110 <    private long generation;
111 <
112 <    /**
113 <     * Breakage indicator.
114 <     */
115 <    private boolean broken;
121 >    /** The current generation */
122 >    private Generation generation = new Generation();
123  
124      /**
125       * Number of parties still waiting. Counts down from parties to 0
126 <     * on each cycle.
126 >     * on each generation. This only has meaning for the current non-broken
127 >     * generation. It is reset to parties on each new generation.
128       */
129 <    private int count;
129 >    private int count;
130  
131      /**
132       * Updates state on barrier trip and wake up everyone.
133 <     */  
133 >     * Called only while holding lock.
134 >     */
135      private void nextGeneration() {
136 <        count = parties;
137 <        ++generation;
136 >        // signal completion of last generation
137 >        generation.tripped = true;
138          trip.signalAll();
139 +        // set up next generation
140 +        count = parties;
141 +        generation = new Generation();
142      }
143  
144      /**
145 <     * Sets barrier as broken and wake up everyone
145 >     * Sets current barrier generation as broken and wakes up everyone
146 >     * Called only while holding lock.
147       */
148      private void breakBarrier() {
149 <        broken = true;
149 >        generation.broken = true;
150          trip.signalAll();
151      }
152  
153      /**
154       * Main barrier code, covering the various policies.
155       */
156 <    private int dowait(boolean timed, long nanos)
157 <        throws InterruptedException, BrokenBarrierException, TimeoutException {
156 >    private int dowait(boolean timed, long nanos)
157 >        throws InterruptedException, BrokenBarrierException,
158 >               TimeoutException {
159          final ReentrantLock lock = this.lock;
160          lock.lock();
161          try {
162 <            int index = --count;
149 <            long g = generation;
162 >            Generation g = generation;
163  
164 <            if (broken)
164 >            if (g.broken)
165                  throw new BrokenBarrierException();
166  
167              if (Thread.interrupted()) {
# Line 156 | Line 169 | public class CyclicBarrier {
169                  throw new InterruptedException();
170              }
171  
172 +            int index = --count;
173              if (index == 0) {  // tripped
174                  nextGeneration();
175                  boolean ranAction = false;
176                  try {
177                      Runnable command = barrierCommand;
178 <                    if (command != null)
178 >                    if (command != null)
179                          command.run();
180                      ranAction = true;
181                      return 0;
182                  } finally {
183 <                    if (!ranAction)
184 <                        breakBarrier();
183 >                    if (!ranAction) {
184 >                        // Mark g (not the now-current generation) broken.
185 >                        g.broken = true;
186 >                    }
187                  }
188              }
189  
190 +            // loop until tripped, broken, interrupted, or timed out
191              for (;;) {
192                  try {
193 <                    if (!timed)
193 >                    if (!timed)
194                          trip.await();
195                      else if (nanos > 0L)
196                          nanos = trip.awaitNanos(nanos);
# Line 181 | Line 198 | public class CyclicBarrier {
198                      breakBarrier();
199                      throw ie;
200                  }
201 <                
202 <                if (broken ||
186 <                    g > generation) // true if a reset occurred while waiting
201 >
202 >                if (g.broken )
203                      throw new BrokenBarrierException();
204  
205 <                if (g < generation)
205 >                if (g.tripped)
206                      return index;
207  
208                  if (timed && nanos <= 0L) {
# Line 194 | Line 210 | public class CyclicBarrier {
210                      throw new TimeoutException();
211                  }
212              }
197
213          } finally {
214              lock.unlock();
215          }
216      }
217  
218 +
219      /**
220       * Creates a new <tt>CyclicBarrier</tt> that will trip when the
221       * given number of parties (threads) are waiting upon it, and which
# Line 215 | Line 231 | public class CyclicBarrier {
231       */
232      public CyclicBarrier(int parties, Runnable barrierAction) {
233          if (parties <= 0) throw new IllegalArgumentException();
234 <        this.parties = parties;
234 >        this.parties = parties;
235          this.count = parties;
236          this.barrierCommand = barrierAction;
237      }
# Line 266 | Line 282 | public class CyclicBarrier {
282       * then {@link InterruptedException} is thrown and the current thread's
283       * interrupted status is cleared.
284       *
285 <     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
285 >     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
286       * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
287       * or while any thread is waiting,
288       * then {@link BrokenBarrierException} is thrown.
289       *
290       * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
291 <     * then all other waiting threads will throw
291 >     * then all other waiting threads will throw
292       * {@link BrokenBarrierException} and the barrier is placed in the broken
293       * state.
294       *
295       * <p>If the current thread is the last thread to arrive, and a
296       * non-null barrier action was supplied in the constructor, then the
297 <     * current thread runs the action before allowing the other threads to
297 >     * current thread runs the action before allowing the other threads to
298       * continue.
299       * If an exception occurs during the barrier action then that exception
300       * will be propagated in the current thread and the barrier is placed in
301       * the broken state.
302       *
303       * @return the arrival index of the current thread, where index
304 <     *  <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
304 >     *  <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
305       * zero indicates the last to arrive.
306       *
307 <     * @throws InterruptedException if the current thread was interrupted
307 >     * @throws InterruptedException if the current thread was interrupted
308       * while waiting
309       * @throws BrokenBarrierException if <em>another</em> thread was
310 <     * interrupted while the current thread was waiting, or the barrier was
311 <     * reset, or the barrier was broken when <tt>await</tt> was called,
312 <     * or the barrier action (if present) failed due an exception.
310 >     * interrupted or timed out while the current thread was waiting,
311 >     * or the barrier was reset, or the barrier was broken when
312 >     * <tt>await</tt> was called, or the barrier action (if present)
313 >     * failed due an exception.
314       */
315      public int await() throws InterruptedException, BrokenBarrierException {
316          try {
# Line 332 | Line 349 | public class CyclicBarrier {
349       * is thrown. If the time is less than or equal to zero, the
350       * method will not wait at all.
351       *
352 <     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
352 >     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
353       * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
354       * or while any thread is waiting,
355       * then {@link BrokenBarrierException} is thrown.
356       *
357       * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
358 <     * then all other waiting threads will throw
358 >     * then all other waiting threads will throw
359       * {@link BrokenBarrierException} and the barrier is placed in the broken
360       * state.
361       *
362       * <p>If the current thread is the last thread to arrive, and a
363       * non-null barrier action was supplied in the constructor, then the
364 <     * current thread runs the action before allowing the other threads to
364 >     * current thread runs the action before allowing the other threads to
365       * continue.
366       * If an exception occurs during the barrier action then that exception
367       * will be propagated in the current thread and the barrier is placed in
# Line 353 | Line 370 | public class CyclicBarrier {
370       * @param timeout the time to wait for the barrier
371       * @param unit the time unit of the timeout parameter
372       * @return the arrival index of the current thread, where index
373 <     *  <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
373 >     *  <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
374       * zero indicates the last to arrive.
375       *
376 <     * @throws InterruptedException if the current thread was interrupted
376 >     * @throws InterruptedException if the current thread was interrupted
377       * while waiting
378       * @throws TimeoutException if the specified timeout elapses.
379       * @throws BrokenBarrierException if <em>another</em> thread was
380 <     * interrupted while the current thread was waiting, or the barrier was
381 <     * reset, or the barrier was broken when <tt>await</tt> was called,
382 <     * or the barrier action (if present) failed due an exception.
383 <     */
384 <    public int await(long timeout, TimeUnit unit)
385 <        throws InterruptedException,
386 <        BrokenBarrierException,
387 <        TimeoutException {
380 >     * interrupted or timed out while the current thread was waiting,
381 >     * or the barrier was reset, or the barrier was broken when
382 >     * <tt>await</tt> was called, or the barrier action (if present)
383 >     * failed due an exception.
384 >     */
385 >    public int await(long timeout, TimeUnit unit)
386 >        throws InterruptedException,
387 >               BrokenBarrierException,
388 >               TimeoutException {
389          return dowait(true, unit.toNanos(timeout));
390      }
391  
# Line 375 | Line 393 | public class CyclicBarrier {
393       * Queries if this barrier is in a broken state.
394       * @return <tt>true</tt> if one or more parties broke out of this
395       * barrier due to interruption or timeout since construction or
396 <     * the last reset, or a barrier action failed due to an exception;
396 >     * the last reset, or a barrier action failed due to an exception;
397       * and <tt>false</tt> otherwise.
398       */
399      public boolean isBroken() {
400          final ReentrantLock lock = this.lock;
401          lock.lock();
402          try {
403 <            return broken;
403 >            return generation.broken;
404          } finally {
405              lock.unlock();
406          }
# Line 401 | Line 419 | public class CyclicBarrier {
419          final ReentrantLock lock = this.lock;
420          lock.lock();
421          try {
422 <            /*
423 <             * Retract generation number enough to cover threads
406 <             * currently waiting on current and still resuming from
407 <             * previous generation, plus similarly accommodating spans
408 <             * after the reset.
409 <             */
410 <            generation -= 4;
411 <            broken = false;
412 <            trip.signalAll();
422 >            breakBarrier();   // break the current generation
423 >            nextGeneration(); // start a new generation
424          } finally {
425              lock.unlock();
426          }
# Line 430 | Line 441 | public class CyclicBarrier {
441              lock.unlock();
442          }
443      }
433
444   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines