ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CyclicBarrier.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/CyclicBarrier.java (file contents):
Revision 1.1 by tim, Wed May 14 21:30:46 2003 UTC vs.
Revision 1.2 by dl, Tue May 27 18:14:39 2003 UTC

# Line 1 | Line 1
1 + /*
2 + * Written by Doug Lea with assistance from members of JCP JSR-166
3 + * Expert Group and released to the public domain. Use, modify, and
4 + * redistribute this code in any way without acknowledgement.
5 + */
6 +
7   package java.util.concurrent;
8  
9   /**
# Line 70 | Line 76 | package java.util.concurrent;
76   *     // log the completion of this iteration
77   *   }</pre>
78   *
79 < * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model for failed
80 < * synchronization attempts: If a thread leaves a barrier point
81 < * prematurely because of interruption, all others will also
82 < * leave abnormally (via {@link BrokenBarrierException}), until the barrier is
83 < * {@link #reset}. This is usually the simplest and best
84 < * strategy for sharing knowledge about failures among cooperating
85 < * threads in the most common usage contexts of barriers.  
79 > * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
80 > * for failed synchronization attempts: If a thread leaves a barrier
81 > * point prematurely because of interruption or timeout, all others
82 > * will also leave abnormally (via {@link BrokenBarrierException}),
83 > * until the barrier is {@link #reset}. This is usually the simplest
84 > * and best strategy for sharing knowledge about failures among
85 > * cooperating threads in the most common usage contexts of barriers.
86   *
87   * <h3>Implementation Considerations</h3>
88   * <p>This implementation has the property that interruptions among newly
# Line 97 | Line 103 | package java.util.concurrent;
103   * @fixme Should we have a timeout version of await()?
104   */
105   public class CyclicBarrier {
106 +    private final ReentrantLock lock = new ReentrantLock();
107 +    private final Condition trip = lock.newCondition();
108 +    private final int parties;
109 +    private Runnable barrierCommand;
110 +
111 +    /**
112 +     * The generation number. Incremented mod Integer.MAX_VALUE every
113 +     * time barrier tripped. Starts at 1 to simplify handling of
114 +     * breakage indicator
115 +     */
116 +    private int generation = 1;
117 +
118 +    /**
119 +     * Breakage indicator: last generation of breakage, propagated
120 +     * across barrier generations until reset.
121 +     */
122 +    private int broken = 0;
123 +
124 +    /**
125 +     * Number of parties still waiting. Counts down from parties to 0
126 +     * on each cycle.
127 +     */
128 +    private int count;
129 +
130 +    /**
131 +     * Update state on barrier trip.
132 +     */  
133 +    private void nextGeneration() {
134 +        count = parties;
135 +        int g = generation;
136 +        // avoid generation == 0
137 +        if (++generation < 0) generation = 1;
138 +        // propagate breakage
139 +        if (broken == g) broken = generation;
140 +    }
141 +
142 +    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
143 +        lock.lock();
144 +        try {
145 +            int index = --count;
146 +            int g = generation;
147 +
148 +            if (broken == g)
149 +                throw new BrokenBarrierException();
150 +
151 +            if (Thread.interrupted()) {
152 +                broken = g;
153 +                trip.signalAll();
154 +                throw new InterruptedException();
155 +            }
156 +
157 +            if (index == 0) {  // tripped
158 +                nextGeneration();
159 +                trip.signalAll();
160 +                try {
161 +                    if (barrierCommand != null)
162 +                        barrierCommand.run();
163 +                    return 0;
164 +                }
165 +                catch (RuntimeException ex) {
166 +                    broken = generation; // next generation is broken
167 +                    throw ex;
168 +                }
169 +            }
170 +
171 +            while (generation == g) {
172 +                try {
173 +                    if (!timed)
174 +                        trip.await();
175 +                    else if (nanos > 0)
176 +                        nanos = trip.awaitNanos(nanos);
177 +                }
178 +                catch (InterruptedException ex) {
179 +                    // Only claim that broken if interrupted before reset
180 +                    if (generation == g) {
181 +                        broken = g;
182 +                        trip.signalAll();
183 +                        throw ex;
184 +                    }
185 +                    else {
186 +                        Thread.currentThread().interrupt(); // propagate
187 +                        break;
188 +                    }
189 +                }
190 +                
191 +                if (timed && nanos <= 0) {
192 +                    broken = g;
193 +                    trip.signalAll();
194 +                    throw new TimeoutException();
195 +                }
196 +
197 +                if (broken == generation)
198 +                    throw new BrokenBarrierException();
199 +                
200 +            }
201 +            return index;
202 +
203 +        }
204 +        finally {
205 +            lock.unlock();
206 +        }
207 +    }
208  
209      /**
210       * Create a new <tt>CyclicBarrier</tt> that will trip when the
# Line 111 | Line 219 | public class CyclicBarrier {
219       * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
220       */
221      public CyclicBarrier(int parties, Runnable barrierAction) {
222 +        if (parties <= 0) throw new IllegalArgumentException();
223 +        this.parties = parties;
224 +        this.count = parties;
225 +        this.barrierCommand = barrierAction;
226      }
227  
228      /**
# Line 125 | Line 237 | public class CyclicBarrier {
237       * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
238       */
239      public CyclicBarrier(int parties) {
240 +        this(parties, null);
241      }
242  
243      /**
# Line 132 | Line 245 | public class CyclicBarrier {
245       * @return the number of parties required to trip this barrier.
246       **/
247      public int getParties() {
248 <        return 0; // for now
248 >        return parties;
249      }
250  
251      /**
# Line 141 | Line 254 | public class CyclicBarrier {
254       *
255       * <p>If the current thread is not the last to arrive then it is
256       * disabled for thread scheduling purposes and lies dormant until
257 <     * one of four things happens:
257 >     * one of following things happens:
258       * <ul>
259       * <li>The last thread arrives; or
260       * <li>Some other thread {@link Thread#interrupt interrupts} the current
261       * thread; or
262       * <li>Some other thread  {@link Thread#interrupt interrupts} one of the
263       * other waiting threads; or
264 +     * <li>Some other thread  times out while waiting for barrier; or
265       * <li>Some other thread invokes {@link #reset} on this barrier.
266       * </ul>
267       * <p>If the current thread:
# Line 185 | Line 299 | public class CyclicBarrier {
299       * reset, or the barrier was broken when <tt>await</tt> was called.
300       */
301      public int await() throws InterruptedException, BrokenBarrierException {
302 <        return 0; // for now
302 >        try {
303 >            return dowait(false, 0);
304 >        }
305 >        catch (TimeoutException toe) {
306 >            throw new Error(toe); // cannot happen;
307 >        }
308 >    }
309 >
310 >    /**
311 >     * Wait until all {@link #getParties parties} have invoked <tt>await</tt>
312 >     * on this barrier.
313 >     *
314 >     * <p>If the current thread is not the last to arrive then it is
315 >     * disabled for thread scheduling purposes and lies dormant until
316 >     * one of the following things happens:
317 >     * <ul>
318 >     * <li>The last thread arrives; or
319 >     * <li>The speceified timeout elapses; or
320 >     * <li>Some other thread {@link Thread#interrupt interrupts} the current
321 >     * thread; or
322 >     * <li>Some other thread  {@link Thread#interrupt interrupts} one of the
323 >     * other waiting threads; or
324 >     * <li>Some other thread  times out while waiting for barrier; or
325 >     * <li>Some other thread invokes {@link #reset} on this barrier.
326 >     * </ul>
327 >     * <p>If the current thread:
328 >     * <ul>
329 >     * <li>has its interrupted status set on entry to this method; or
330 >     * <li>is {@link Thread#interrupt interrupted} while waiting
331 >     * </ul>
332 >     * then {@link InterruptedException} is thrown and the current thread's
333 >     * interrupted status is cleared.
334 >     *
335 >     * <p>If the barrier is {@link #reset} while any thread is waiting, or if
336 >     * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked
337 >     * then {@link BrokenBarrierException} is thrown.
338 >     *
339 >     * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
340 >     * then all other waiting threads will throw
341 >     * {@link BrokenBarrierException} and the barrier is placed in the broken
342 >     * state.
343 >     *
344 >     * <p>If the current thread is the last thread to arrive, and a
345 >     * non-null barrier action was supplied in the constructor, then the
346 >     * current thread runs the action before allowing the other threads to
347 >     * continue.
348 >     * If an exception occurs during the barrier action then that exception
349 >     * will be propagated in the current thread.
350 >     *
351 >     * @return the arrival index of the current thread, where index
352 >     *  <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
353 >     * zero indicates the last to arrive.
354 >     *
355 >     * @throws InterruptedException if the current thread was interrupted
356 >     * while waiting
357 >     * @throws TimeoutException if the specified timeout elapses.
358 >     * @throws BrokenBarrierException if <em>another</em> thread was
359 >     * interrupted while the current thread was waiting, or the barrier was
360 >     * reset, or the barrier was broken when <tt>await</tt> was called.
361 >     */
362 >    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
363 >        return dowait(true, unit.toNanos(timeout));
364      }
365  
366      /**
367       * Query if this barrier is in a broken state.
368       * @return <tt>true</tt> if one or more parties broke out of this
369 <     * barrier due to interruption since construction or the last reset;
370 <     * and <tt>false</tt> otherwise.
369 >     * barrier due to interruption or timeout since construction or
370 >     * the last reset; and <tt>false</tt> otherwise.
371       */
372      public boolean isBroken() {
373 <        return false; // for now
373 >        lock.lock();
374 >        try {
375 >            return broken >= generation;
376 >        }
377 >        finally {
378 >            lock.unlock();
379 >        }
380      }
381  
382      /**
# Line 204 | Line 385 | public class CyclicBarrier {
385       * {@link BrokenBarrierException}.
386       */
387      public void reset() {
388 <        // for now
388 >        lock.lock();
389 >        try {
390 >            int g = generation;
391 >            nextGeneration();
392 >            broken = g; // cause brokenness setting to stop at previous gen.
393 >            trip.signalAll();
394 >        }
395 >        finally {
396 >            lock.unlock();
397 >        }
398      }
399  
400      /**
# Line 214 | Line 404 | public class CyclicBarrier {
404       * @return the number of parties currently blocked in {@link #await}
405       **/
406      public int getNumberWaiting() {
407 <        return 0; // for now
407 >        lock.lock();
408 >        try {
409 >            return parties - count;
410 >        }
411 >        finally {
412 >            lock.unlock();
413 >        }
414      }
415  
416   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines