ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
Revision: 1.7
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 9 months ago) by tim
Branch: MAIN
Changes since 1.6: +16 -31 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 dl 1.1
10     /**
11 dl 1.2 * A semaphore guaranteeing that threads invoking any of the {@link
12     * #acquire() acquire} methods are allocated permits in the order in
13     * which their invocation of those methods was processed
14     * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
15     * applies to specific internal points of execution within these
16     * methods. So, it is possible for one thread to invoke
17     * <tt>acquire</tt> before another, but reach the ordering point after
18     * the other, and similarly upon return from the method.
19 dl 1.1 *
20 dl 1.2 * <p>Because permits are allocated in-order, this class also provides
21     * convenience methods to {@link #acquire(long) acquire} and {@link
22     * #release(long) release} multiple permits at a time.
23 dl 1.1 *
24     * @since 1.5
25     * @spec JSR-166
26 tim 1.7 * @revised $Date: 2003/07/11 13:12:06 $
27 dl 1.2 * @editor $Author: dl $
28 dl 1.3 * @author Doug Lea
29 dl 1.1 *
30     */
31     public class FairSemaphore extends Semaphore {
32    
33     /*
34 dl 1.6 * Basic algorithm is a variant of the one described
35     * in CPJ book 2nd edition, section 3.7.3
36 dl 1.1 */
37    
38     /**
39 dl 1.6 * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
40     */
41     private static class Node extends ReentrantLock {
42     /** The condition to wait on */
43     Condition done = newCondition();
44     /** True if interrupted before released */
45     boolean cancelled;
46     /** Number of permits remaining until can release */
47     long permitsNeeded;
48     /** Next node of queue */
49     Node next;
50    
51     Node(long n) { permitsNeeded = n; }
52     }
53    
54     /** Head of the wait queue */
55     private Node head;
56    
57     /** Pointer to last node of the wait queue */
58     private Node last;
59    
60     /** Add a new node to wait queue with given number of permits needed */
61     private Node enq(long n) {
62     Node p = new Node(n);
63     if (last == null)
64     last = head = p;
65     else
66     last = last.next = p;
67     return p;
68     }
69    
70     /** Remove first node from wait queue */
71     private void removeFirst() {
72     Node p = head;
73     assert p != null;
74     if (p != null && (head = p.next) == null)
75     last = null;
76     }
77    
78     /**
79     * Construct a <tt>FairSemaphore</tt> with the given number of
80 dl 1.1 * permits.
81     * @param permits the initial number of permits available
82     */
83     public FairSemaphore(long permits) {
        // Hand the superclass a ReentrantLock constructed with fairness = true.
84 dl 1.4 super(permits, new ReentrantLock(true));
85 dl 1.1 }
86    
87     /**
88 dl 1.6 * Acquires a permit from this semaphore, blocking until one is
89     * available, or the thread is {@link Thread#interrupt interrupted}.
90     *
91     * <p>Acquires a permit, if one is available and returns immediately,
92     * reducing the number of available permits by one.
93     * <p>If no permit is available then the current thread becomes
94     * disabled for thread scheduling purposes and lies dormant until
95     * one of two things happens:
96     * <ul>
97     * <li>Some other thread invokes the {@link #release} method for this
98     * semaphore and the current thread happens to be chosen as the
99     * thread to receive the permit; or
100     * <li>Some other thread {@link Thread#interrupt interrupts} the current
101     * thread.
102     * </ul>
103     *
104     * <p>If the current thread:
105     * <ul>
106     * <li>has its interrupted status set on entry to this method; or
107     * <li>is {@link Thread#interrupt interrupted} while waiting
108     * for a permit,
109     * </ul>
110     * then {@link InterruptedException} is thrown and the current thread's
111     * interrupted status is cleared.
112     *
113     * @throws InterruptedException if the current thread is interrupted
114     *
115     * @see Thread#interrupt
116     */
117     public void acquire() throws InterruptedException {
        // Single-permit form: delegate to the multi-permit overload with a count of one.
118     acquire(1L);
119     }
120    
121     /**
122     * Acquires a permit from this semaphore, blocking until one is
123     * available.
124     *
125     * <p>Acquires a permit, if one is available and returns immediately,
126     * reducing the number of available permits by one.
127     * <p>If no permit is available then the current thread becomes
128     * disabled for thread scheduling purposes and lies dormant until
129     * some other thread invokes the {@link #release} method for this
130     * semaphore and the current thread happens to be chosen as the
131     * thread to receive the permit.
132     *
133     * <p>If the current thread
134     * is {@link Thread#interrupt interrupted} while waiting
135     * for a permit then it will continue to wait, but the time at which
136     * the thread is assigned a permit may change compared to the time it
137     * would have received the permit had no interruption occurred. When the
138     * thread does return from this method its interrupt status will be set.
139     *
140     */
141     public void acquireUninterruptibly() {
        // Single-permit form: delegate to the multi-permit overload with a count of one.
142     acquireUninterruptibly(1L);
143     }
144    
145     /**
146     * Acquires a permit from this semaphore, only if one is available at the
147     * time of invocation.
148     * <p>Acquires a permit, if one is available and returns immediately,
149     * with the value <tt>true</tt>,
150     * reducing the number of available permits by one.
151     *
152     * <p>If no permit is available then this method will return
153     * immediately with the value <tt>false</tt>.
154     *
155     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
156     * otherwise.
157     */
158     public boolean tryAcquire() {
        // Single-permit form: delegate to the multi-permit overload with a count of one.
159     return tryAcquire(1L);
160     }
161    
162     /**
163     * Acquires a permit from this semaphore, if one becomes available
164     * within the given waiting time and the
165     * current thread has not been {@link Thread#interrupt interrupted}.
166     * <p>Acquires a permit, if one is available and returns immediately,
167     * with the value <tt>true</tt>,
168     * reducing the number of available permits by one.
169     * <p>If no permit is available then
170     * the current thread becomes disabled for thread scheduling
171     * purposes and lies dormant until one of three things happens:
172     * <ul>
173     * <li>Some other thread invokes the {@link #release} method for this
174     * semaphore and the current thread happens to be chosen as the
175     * thread to receive the permit; or
176     * <li>Some other thread {@link Thread#interrupt interrupts} the current
177     * thread; or
178     * <li>The specified waiting time elapses.
179     * </ul>
180     * <p>If a permit is acquired then the value <tt>true</tt> is returned.
181     * <p>If the current thread:
182     * <ul>
183     * <li>has its interrupted status set on entry to this method; or
184     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
185     * a permit,
186     * </ul>
187     * then {@link InterruptedException} is thrown and the current thread's
188     * interrupted status is cleared.
189     * <p>If the specified waiting time elapses then the value <tt>false</tt>
190     * is returned.
191     * The given waiting time is a best-effort lower bound. If the time is
192     * less than or equal to zero, the method will not wait at all.
193     *
194     * @param timeout the maximum time to wait for a permit
195     * @param unit the time unit of the <tt>timeout</tt> argument.
196     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
197     * if the waiting time elapsed before a permit was acquired.
198     *
199     * @throws InterruptedException if the current thread is interrupted
200     *
201     * @see Thread#interrupt
202     *
203     */
204     public boolean tryAcquire(long timeout, TimeUnit unit)
205     throws InterruptedException {
        // Single-permit form: delegate to the timed multi-permit overload with a count of one.
206     return tryAcquire(1L, timeout, unit);
207     }
208    
209     /**
210     * Releases a permit, returning it to the semaphore.
211     * <p>Releases a permit, increasing the number of available permits
212     * by one.
213     * If any threads are blocking trying to acquire a permit, then one
214     * is selected and given the permit that was just released.
215     * That thread is re-enabled for thread scheduling purposes.
216     * <p>There is no requirement that a thread that releases a permit must
217     * have acquired that permit by calling {@link #acquire}.
218     * Correct usage of a semaphore is established by programming convention
219     * in the application.
220     */
221     public void release() {
        // Single-permit form: delegate to the multi-permit overload with a count of one.
222     release(1L);
223     }
224    
225    
226     /**
227 dl 1.1 * Acquires the given number of permits from this semaphore,
228     * blocking until all are available,
229     * or the thread is {@link Thread#interrupt interrupted}.
230     *
231     * <p>Acquires the given number of permits, if they are available,
232     * and returns immediately,
233     * reducing the number of available permits by the given amount.
234     *
235     * <p>If insufficient permits are available then the current thread becomes
236     * disabled for thread scheduling purposes and lies dormant until
237     * one of two things happens:
238     * <ul>
239     * <li>Some other thread invokes one of the {@link #release() release}
240     * methods for this semaphore, the current thread is next to be assigned
241     * permits and the number of available permits satisfies this request; or
242     * <li>Some other thread {@link Thread#interrupt interrupts} the current
243     * thread.
244     * </ul>
245     *
246     * <p>If the current thread:
247     * <ul>
248     * <li>has its interrupted status set on entry to this method; or
249     * <li>is {@link Thread#interrupt interrupted} while waiting
250     * for a permit,
251     * </ul>
252     * then {@link InterruptedException} is thrown and the current thread's
253     * interrupted status is cleared.
254     * Any available permits are assigned to the next waiting thread as if
255     * they had been made available by a call to {@link #release()}.
256     *
257     * @param permits the number of permits to acquire
258     *
259     * @throws InterruptedException if the current thread is interrupted
260     * @throws IllegalArgumentException if permits is less than zero.
261     *
262     * @see Thread#interrupt
263     */
264     public void acquire(long permits) throws InterruptedException {
265     if (permits < 0) throw new IllegalArgumentException();
266 dl 1.6 Node node = null;
267 dl 1.1 lock.lockInterruptibly();
268     try {
269 dl 1.6 long remaining = count - permits;
270     if (remaining >= 0) {
271     count = remaining;
272     return;
273     }
274     count = 0;
275     node = enq(-remaining);
276 tim 1.7 } finally {
277 dl 1.1 lock.unlock();
278     }
279 dl 1.6
280     node.lock();
281     try {
282     while (node.permitsNeeded > 0)
283     node.done.await();
284 tim 1.7 } catch(InterruptedException ie) {
285 dl 1.6 if (node.permitsNeeded > 0) {
286     node.cancelled = true;
287     release(permits - node.permitsNeeded);
288     throw ie;
289 tim 1.7 } else { // ignore interrupt
290 dl 1.6 Thread.currentThread().interrupt();
291     }
292 tim 1.7 } finally {
293 dl 1.6 node.unlock();
294     }
295 dl 1.1 }
296    
297     /**
298     * Acquires the given number of permits from this semaphore,
299     * blocking until all are available.
300     *
301     * <p>Acquires the given number of permits, if they are available,
302     * and returns immediately,
303     * reducing the number of available permits by the given amount.
304     *
305     * <p>If insufficient permits are available then the current thread becomes
306     * disabled for thread scheduling purposes and lies dormant until
307     * some other thread invokes one of the {@link #release() release}
308     * methods for this semaphore, the current thread is next to be assigned
309     * permits and the number of available permits satisfies this request.
310     *
311     * <p>If the current thread
312     * is {@link Thread#interrupt interrupted} while waiting
313     * for permits then it will continue to wait and its position in the
314     * queue is not affected. When the
315     * thread does return from this method its interrupt status will be set.
316     *
317     * @param permits the number of permits to acquire
318     * @throws IllegalArgumentException if permits is less than zero.
319     *
320     */
321     public void acquireUninterruptibly(long permits) {
322     if (permits < 0) throw new IllegalArgumentException();
323 dl 1.6 Node node = null;
324 dl 1.1 lock.lock();
325     try {
326 dl 1.6 long remaining = count - permits;
327     if (remaining >= 0) {
328     count = remaining;
329     return;
330     }
331     count = 0;
332     node = enq(-remaining);
333 tim 1.7 } finally {
334 dl 1.1 lock.unlock();
335     }
336 dl 1.6
337     node.lock();
338     try {
339     while (node.permitsNeeded > 0)
340     node.done.awaitUninterruptibly();
341 tim 1.7 } finally {
342 dl 1.6 node.unlock();
343     }
344 dl 1.1 }
345    
346     /**
347     * Acquires the given number of permits from this semaphore, only if
348     * all are available at the time of invocation.
349     * <p>Acquires the given number of permits, if they are available, and
350     * returns immediately, with the value <tt>true</tt>,
351     * reducing the number of available permits by the given amount.
352     *
353     * <p>If insufficient permits are available then this method will return
354     * immediately with the value <tt>false</tt> and the number of available
355     * permits is unchanged.
356     *
357     * @param permits the number of permits to acquire
358     *
359     * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
360     * otherwise.
361     * @throws IllegalArgumentException if permits is less than zero.
362     */
363     public boolean tryAcquire(long permits) {
364     if (permits < 0) throw new IllegalArgumentException();
365     lock.lock();
366     try {
367 dl 1.6 long remaining = count - permits;
368     if (remaining >= 0) {
369     count = remaining;
370 dl 1.1 return true;
371     }
372     return false;
373 tim 1.7 } finally {
374 dl 1.1 lock.unlock();
375     }
376     }
377    
378     /**
379     * Acquires the given number of permits from this semaphore, if all
380     * become available within the given waiting time and the
381     * current thread has not been {@link Thread#interrupt interrupted}.
382     * <p>Acquires the given number of permits, if they are available and
383     * returns immediately, with the value <tt>true</tt>,
384     * reducing the number of available permits by the given amount.
385     * <p>If insufficient permits are available then
386     * the current thread becomes disabled for thread scheduling
387     * purposes and lies dormant until one of three things happens:
388     * <ul>
389     * <li>Some other thread invokes one of the {@link #release() release}
390     * methods for this semaphore, the current thread is next to be assigned
391     * permits and the number of available permits satisfies this request; or
392     * <li>Some other thread {@link Thread#interrupt interrupts} the current
393     * thread; or
394     * <li>The specified waiting time elapses.
395     * </ul>
396     * <p>If the permits are acquired then the value <tt>true</tt> is returned.
397     * <p>If the current thread:
398     * <ul>
399     * <li>has its interrupted status set on entry to this method; or
400     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
401     * the permits,
402     * </ul>
403     * then {@link InterruptedException} is thrown and the current thread's
404     * interrupted status is cleared.
405     * Any available permits are assigned to the next waiting thread as if
406     * they had been made available by a call to {@link #release()}.
407     *
408     * <p>If the specified waiting time elapses then the value <tt>false</tt>
409     * is returned.
410     * The given waiting time is a best-effort lower bound. If the time is
411     * less than or equal to zero, the method will not wait at all.
412     * Any available permits are assigned to the next waiting thread as if
413     * they had been made available by a call to {@link #release()}.
414     *
415     * @param permits the number of permits to acquire
416     * @param timeout the maximum time to wait for the permits
417 dl 1.6 * @param unit the time unit of the <tt>timeout</tt> argument.
418 dl 1.1 * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
419     * if the waiting time elapsed before all permits were acquired.
420     *
421     * @throws InterruptedException if the current thread is interrupted
422     * @throws IllegalArgumentException if permits is less than zero.
423     *
424     * @see Thread#interrupt
425     *
426     */
427 dl 1.6 public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
428 dl 1.1 throws InterruptedException {
429     if (permits < 0) throw new IllegalArgumentException();
430 dl 1.6 Node node = null;
431     long nanos = unit.toNanos(timeout);
432 dl 1.1 lock.lockInterruptibly();
433     try {
434 dl 1.6 long remaining = count - permits;
435     if (remaining >= 0) {
436     count = remaining;
437     return true;
438     }
439     if (nanos <= 0)
440     return false;
441     count = 0;
442     node = enq(-remaining);
443 tim 1.7 } finally {
444 dl 1.6 lock.unlock();
445     }
446    
447     node.lock();
448     try {
449     while (node.permitsNeeded > 0) {
450     nanos = node.done.awaitNanos(nanos);
451     if (nanos <= 0) {
452     release(permits - node.permitsNeeded);
453     return false;
454 dl 1.1 }
455     }
456 dl 1.6 return true;
457 tim 1.7 } catch(InterruptedException ie) {
458 dl 1.6 if (node.permitsNeeded > 0) {
459     node.cancelled = true;
460     release(permits - node.permitsNeeded);
461     throw ie;
462 tim 1.7 } else { // ignore interrupt
463 dl 1.6 Thread.currentThread().interrupt();
464     return true;
465     }
466 tim 1.7 } finally {
467 dl 1.6 node.unlock();
468 dl 1.1 }
469     }
470    
471    
472     /**
473     * Releases the given number of permits, returning them to the semaphore.
474     * <p>Releases the given number of permits, increasing the number of
475     * available permits by that amount.
476     * If any threads are blocking trying to acquire permits, then the
477     * one that has been waiting the longest
478     * is selected and given the permits that were just released.
479     * If the number of available permits satisfies that thread's request
480     * then that thread is re-enabled for thread scheduling purposes; otherwise
481     * the thread continues to wait. If there are still permits available
482     * after the first thread's request has been satisfied, then those permits
483     * are assigned to the next waiting thread. If it is satisfied then it is
484     * re-enabled for thread scheduling purposes. This continues until there
485     * are insufficient permits to satisfy the next waiting thread, or there
486     * are no more waiting threads.
487     *
488     * <p>There is no requirement that a thread that releases a permit must
489     * have acquired that permit by calling {@link Semaphore#acquire acquire}.
490     * Correct usage of a semaphore is established by programming convention
491     * in the application.
492     *
493     * @param permits the number of permits to release
494     * @throws IllegalArgumentException if permits is less than zero.
495     */
496     public void release(long permits) {
497     if (permits < 0) throw new IllegalArgumentException();
498 dl 1.6 if (permits == 0)
499     return;
500     // Even when using fair locks, releases should try to barge in.
501 dl 1.5 if (!lock.tryLock())
502     lock.lock();
503 dl 1.1 try {
504 dl 1.6 long p = permits;
505     while (p > 0) {
506     Node node = head;
507     if (node == null) {
508     count += p;
509     p = 0;
510 tim 1.7 } else {
511 dl 1.6 node.lock();
512     try {
513     if (node.cancelled)
514     removeFirst();
515     else if (node.permitsNeeded > p) {
516     node.permitsNeeded -= p;
517     p = 0;
518 tim 1.7 } else {
519 dl 1.6 p -= node.permitsNeeded;
520     node.permitsNeeded = 0;
521     node.done.signal();
522     removeFirst();
523     }
524 tim 1.7 } finally {
525 dl 1.6 node.unlock();
526     }
527     }
528     }
529 tim 1.7 } finally {
530 dl 1.1 lock.unlock();
531     }
532     }
533     }