ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
Revision: 1.6
Committed: Fri Jul 11 13:12:06 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_2, JSR166_CR1
Changes since 1.5: +298 -46 lines
Log Message:
Fair semaphores actually fair now.

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 dl 1.1
10     /**
11 dl 1.2 * A semaphore guaranteeing that threads invoking any of the {@link
12     * #acquire() acquire} methods are allocated permits in the order in
13     * which their invocation of those methods was processed
14     * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
15     * applies to specific internal points of execution within these
16     * methods. So, it is possible for one thread to invoke
17     * <tt>acquire</tt> before another, but reach the ordering point after
18     * the other, and similarly upon return from the method.
19 dl 1.1 *
20 dl 1.2 * <p>Because permits are allocated in-order, this class also provides
21     * convenience methods to {@link #acquire(long) acquire} and {@link
22     * #release(long) release} multiple permits at a time.
23 dl 1.1 *
24     * @since 1.5
25     * @spec JSR-166
26 dl 1.6 * @revised $Date: 2003/07/09 23:23:17 $
27 dl 1.2 * @editor $Author: dl $
28 dl 1.3 * @author Doug Lea
29 dl 1.1 *
30     */
31     public class FairSemaphore extends Semaphore {
32    
33     /*
34 dl 1.6 * Basic algorithm is a variant of the one described
35     * in CPJ book 2nd edition, section 3.7.3
36 dl 1.1 */
37    
38     /**
39 dl 1.6 * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
40     */
41     private static class Node extends ReentrantLock {
42     /** The condition to wait on */
43     Condition done = newCondition();
44     /** True if interrupted before released */
45     boolean cancelled;
46     /** Number of permits remaining until can release */
47     long permitsNeeded;
48     /** Next node of queue */
49     Node next;
50    
51     Node(long n) { permitsNeeded = n; }
52     }
53    
54     /** Head of the wait queue */
55     private Node head;
56    
57     /** Pointer to last node of the wait queue */
58     private Node last;
59    
60     /** Add a new node to wait queue with given number of permits needed */
61     private Node enq(long n) {
62     Node p = new Node(n);
63     if (last == null)
64     last = head = p;
65     else
66     last = last.next = p;
67     return p;
68     }
69    
70     /** Remove first node from wait queue */
71     private void removeFirst() {
72     Node p = head;
73     assert p != null;
74     if (p != null && (head = p.next) == null)
75     last = null;
76     }
77    
78     /**
79     * Construct a <tt>FairSemaphore</tt> with the given number of
80 dl 1.1 * permits.
81     * @param permits the initial number of permits available
82     */
83     public FairSemaphore(long permits) {
84 dl 1.4 super(permits, new ReentrantLock(true));
85 dl 1.1 }
86    
87     /**
88 dl 1.6 * Acquires a permit from this semaphore, blocking until one is
89     * available, or the thread is {@link Thread#interrupt interrupted}.
90     *
91     * <p>Acquires a permit, if one is available and returns immediately,
92     * reducing the number of available permits by one.
93     * <p>If no permit is available then the current thread becomes
94     * disabled for thread scheduling purposes and lies dormant until
95     * one of two things happens:
96     * <ul>
97     * <li>Some other thread invokes the {@link #release} method for this
98     * semaphore and the current thread happens to be chosen as the
99     * thread to receive the permit; or
100     * <li>Some other thread {@link Thread#interrupt interrupts} the current
101     * thread.
102     * </ul>
103     *
104     * <p>If the current thread:
105     * <ul>
106     * <li>has its interrupted status set on entry to this method; or
107     * <li>is {@link Thread#interrupt interrupted} while waiting
108     * for a permit,
109     * </ul>
110     * then {@link InterruptedException} is thrown and the current thread's
111     * interrupted status is cleared.
112     *
113     * @throws InterruptedException if the current thread is interrupted
114     *
115     * @see Thread#interrupt
116     */
117     public void acquire() throws InterruptedException {
118     acquire(1L);
119     }
120    
121     /**
122     * Acquires a permit from this semaphore, blocking until one is
123     * available.
124     *
125     * <p>Acquires a permit, if one is available and returns immediately,
126     * reducing the number of available permits by one.
127     * <p>If no permit is available then the current thread becomes
128     * disabled for thread scheduling purposes and lies dormant until
129     * some other thread invokes the {@link #release} method for this
130     * semaphore and the current thread happens to be chosen as the
131     * thread to receive the permit.
132     *
133     * <p>If the current thread
134     * is {@link Thread#interrupt interrupted} while waiting
135     * for a permit then it will continue to wait, but the time at which
136     * the thread is assigned a permit may change compared to the time it
137     * would have received the permit had no interruption occurred. When the
138     * thread does return from this method its interrupt status will be set.
139     *
140     */
141     public void acquireUninterruptibly() {
142     acquireUninterruptibly(1L);
143     }
144    
145     /**
146     * Acquires a permit from this semaphore, only if one is available at the
147     * time of invocation.
148     * <p>Acquires a permit, if one is available and returns immediately,
149     * with the value <tt>true</tt>,
150     * reducing the number of available permits by one.
151     *
152     * <p>If no permit is available then this method will return
153     * immediately with the value <tt>false</tt>.
154     *
155     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
156     * otherwise.
157     */
158     public boolean tryAcquire() {
159     return tryAcquire(1L);
160     }
161    
162     /**
163     * Acquires a permit from this semaphore, if one becomes available
164     * within the given waiting time and the
165     * current thread has not been {@link Thread#interrupt interrupted}.
166     * <p>Acquires a permit, if one is available and returns immediately,
167     * with the value <tt>true</tt>,
168     * reducing the number of available permits by one.
169     * <p>If no permit is available then
170     * the current thread becomes disabled for thread scheduling
171     * purposes and lies dormant until one of three things happens:
172     * <ul>
173     * <li>Some other thread invokes the {@link #release} method for this
174     * semaphore and the current thread happens to be chosen as the
175     * thread to receive the permit; or
176     * <li>Some other thread {@link Thread#interrupt interrupts} the current
177     * thread; or
178     * <li>The specified waiting time elapses.
179     * </ul>
180     * <p>If a permit is acquired then the value <tt>true</tt> is returned.
181     * <p>If the current thread:
182     * <ul>
183     * <li>has its interrupted status set on entry to this method; or
184     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
185     * a permit,
186     * </ul>
187     * then {@link InterruptedException} is thrown and the current thread's
188     * interrupted status is cleared.
189     * <p>If the specified waiting time elapses then the value <tt>false</tt>
190     * is returned.
191     * The given waiting time is a best-effort lower bound. If the time is
192     * less than or equal to zero, the method will not wait at all.
193     *
194     * @param timeout the maximum time to wait for a permit
195     * @param unit the time unit of the <tt>timeout</tt> argument.
196     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
197     * if the waiting time elapsed before a permit was acquired.
198     *
199     * @throws InterruptedException if the current thread is interrupted
200     *
201     * @see Thread#interrupt
202     *
203     */
204     public boolean tryAcquire(long timeout, TimeUnit unit)
205     throws InterruptedException {
206     return tryAcquire(1L, timeout, unit);
207     }
208    
209     /**
210     * Releases a permit, returning it to the semaphore.
211     * <p>Releases a permit, increasing the number of available permits
212     * by one.
213     * If any threads are blocking trying to acquire a permit, then one
214     * is selected and given the permit that was just released.
215     * That thread is re-enabled for thread scheduling purposes.
216     * <p>There is no requirement that a thread that releases a permit must
217     * have acquired that permit by calling {@link #acquire}.
218     * Correct usage of a semaphore is established by programming convention
219     * in the application.
220     */
221     public void release() {
222     release(1L);
223     }
224    
225    
226     /**
227 dl 1.1 * Acquires the given number of permits from this semaphore,
228     * blocking until all are available,
229     * or the thread is {@link Thread#interrupt interrupted}.
230     *
231     * <p>Acquires the given number of permits, if they are available,
232     * and returns immediately,
233     * reducing the number of available permits by the given amount.
234     *
235     * <p>If insufficient permits are available then the current thread becomes
236     * disabled for thread scheduling purposes and lies dormant until
237     * one of two things happens:
238     * <ul>
239     * <li>Some other thread invokes one of the {@link #release() release}
240     * methods for this semaphore, the current thread is next to be assigned
241     * permits and the number of available permits satisfies this request; or
242     * <li>Some other thread {@link Thread#interrupt interrupts} the current
243     * thread.
244     * </ul>
245     *
246     * <p>If the current thread:
247     * <ul>
248     * <li>has its interrupted status set on entry to this method; or
249     * <li>is {@link Thread#interrupt interrupted} while waiting
250     * for a permit,
251     * </ul>
252     * then {@link InterruptedException} is thrown and the current thread's
253     * interrupted status is cleared.
254     * Any available permits are assigned to the next waiting thread as if
255     * they had been made available by a call to {@link #release()}.
256     *
257     * @param permits the number of permits to acquire
258     *
259     * @throws InterruptedException if the current thread is interrupted
260     * @throws IllegalArgumentException if permits less than zero.
261     *
262     * @see Thread#interrupt
263     */
264     public void acquire(long permits) throws InterruptedException {
265     if (permits < 0) throw new IllegalArgumentException();
266 dl 1.6 Node node = null;
267 dl 1.1 lock.lockInterruptibly();
268     try {
269 dl 1.6 long remaining = count - permits;
270     if (remaining >= 0) {
271     count = remaining;
272     return;
273     }
274     count = 0;
275     node = enq(-remaining);
276 dl 1.1 }
277     finally {
278     lock.unlock();
279     }
280 dl 1.6
281     node.lock();
282     try {
283     while (node.permitsNeeded > 0)
284     node.done.await();
285     }
286     catch(InterruptedException ie) {
287     if (node.permitsNeeded > 0) {
288     node.cancelled = true;
289     release(permits - node.permitsNeeded);
290     throw ie;
291     }
292     else { // ignore interrupt
293     Thread.currentThread().interrupt();
294     }
295     }
296     finally {
297     node.unlock();
298     }
299 dl 1.1 }
300    
301     /**
302     * Acquires the given number of permits from this semaphore,
303     * blocking until all are available.
304     *
305     * <p>Acquires the given number of permits, if they are available,
306     * and returns immediately,
307     * reducing the number of available permits by the given amount.
308     *
309     * <p>If insufficient permits are available then the current thread becomes
310     * disabled for thread scheduling purposes and lies dormant until
311     * some other thread invokes one of the {@link #release() release}
312     * methods for this semaphore, the current thread is next to be assigned
313     * permits and the number of available permits satisfies this request.
314     *
315     * <p>If the current thread
316     * is {@link Thread#interrupt interrupted} while waiting
317     * for permits then it will continue to wait and its position in the
318     * queue is not affected. When the
319     * thread does return from this method its interrupt status will be set.
320     *
321     * @param permits the number of permits to acquire
322     * @throws IllegalArgumentException if permits less than zero.
323     *
324     */
325     public void acquireUninterruptibly(long permits) {
326     if (permits < 0) throw new IllegalArgumentException();
327 dl 1.6 Node node = null;
328 dl 1.1 lock.lock();
329     try {
330 dl 1.6 long remaining = count - permits;
331     if (remaining >= 0) {
332     count = remaining;
333     return;
334     }
335     count = 0;
336     node = enq(-remaining);
337 dl 1.1 }
338     finally {
339     lock.unlock();
340     }
341 dl 1.6
342     node.lock();
343     try {
344     while (node.permitsNeeded > 0)
345     node.done.awaitUninterruptibly();
346     }
347     finally {
348     node.unlock();
349     }
350 dl 1.1 }
351    
352     /**
353     * Acquires the given number of permits from this semaphore, only if
354     * all are available at the time of invocation.
355     * <p>Acquires the given number of permits, if they are available, and
356     * returns immediately, with the value <tt>true</tt>,
357     * reducing the number of available permits by the given amount.
358     *
359     * <p>If insufficient permits are available then this method will return
360     * immediately with the value <tt>false</tt> and the number of available
361     * permits is unchanged.
362     *
363     * @param permits the number of permits to acquire
364     *
365     * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
366     * otherwise.
367     * @throws IllegalArgumentException if permits less than zero.
368     */
369     public boolean tryAcquire(long permits) {
370     if (permits < 0) throw new IllegalArgumentException();
371     lock.lock();
372     try {
373 dl 1.6 long remaining = count - permits;
374     if (remaining >= 0) {
375     count = remaining;
376 dl 1.1 return true;
377     }
378     return false;
379     }
380     finally {
381     lock.unlock();
382     }
383     }
384    
385     /**
386     * Acquires the given number of permits from this semaphore, if all
387     * become available within the given waiting time and the
388     * current thread has not been {@link Thread#interrupt interrupted}.
389     * <p>Acquires the given number of permits, if they are available and
390     * returns immediately, with the value <tt>true</tt>,
391     * reducing the number of available permits by the given amount.
392     * <p>If insufficient permits are available then
393     * the current thread becomes disabled for thread scheduling
394     * purposes and lies dormant until one of three things happens:
395     * <ul>
396     * <li>Some other thread invokes one of the {@link #release() release}
397     * methods for this semaphore, the current thread is next to be assigned
398     * permits and the number of available permits satisfies this request; or
399     * <li>Some other thread {@link Thread#interrupt interrupts} the current
400     * thread; or
401     * <li>The specified waiting time elapses.
402     * </ul>
403     * <p>If the permits are acquired then the value <tt>true</tt> is returned.
404     * <p>If the current thread:
405     * <ul>
406     * <li>has its interrupted status set on entry to this method; or
407     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
408     * the permits,
409     * </ul>
410     * then {@link InterruptedException} is thrown and the current thread's
411     * interrupted status is cleared.
412     * Any available permits are assigned to the next waiting thread as if
413     * they had been made available by a call to {@link #release()}.
414     *
415     * <p>If the specified waiting time elapses then the value <tt>false</tt>
416     * is returned.
417     * The given waiting time is a best-effort lower bound. If the time is
418     * less than or equal to zero, the method will not wait at all.
419     * Any available permits are assigned to the next waiting thread as if
420     * they had been made available by a call to {@link #release()}.
421     *
422     * @param permits the number of permits to acquire
423     * @param timeout the maximum time to wait for the permits
424 dl 1.6 * @param unit the time unit of the <tt>timeout</tt> argument.
425 dl 1.1 * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
426     * if the waiting time elapsed before all permits were acquired.
427     *
428     * @throws InterruptedException if the current thread is interrupted
429     * @throws IllegalArgumentException if permits less than zero.
430     *
431     * @see Thread#interrupt
432     *
433     */
434 dl 1.6 public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
435 dl 1.1 throws InterruptedException {
436     if (permits < 0) throw new IllegalArgumentException();
437 dl 1.6 Node node = null;
438     long nanos = unit.toNanos(timeout);
439 dl 1.1 lock.lockInterruptibly();
440     try {
441 dl 1.6 long remaining = count - permits;
442     if (remaining >= 0) {
443     count = remaining;
444     return true;
445     }
446     if (nanos <= 0)
447     return false;
448     count = 0;
449     node = enq(-remaining);
450     }
451     finally {
452     lock.unlock();
453     }
454    
455     node.lock();
456     try {
457     while (node.permitsNeeded > 0) {
458     nanos = node.done.awaitNanos(nanos);
459     if (nanos <= 0) {
460     release(permits - node.permitsNeeded);
461     return false;
462 dl 1.1 }
463     }
464 dl 1.6 return true;
465 dl 1.1 }
466 dl 1.6 catch(InterruptedException ie) {
467     if (node.permitsNeeded > 0) {
468     node.cancelled = true;
469     release(permits - node.permitsNeeded);
470     throw ie;
471     }
472     else { // ignore interrupt
473     Thread.currentThread().interrupt();
474     return true;
475     }
476 dl 1.1 }
477     finally {
478 dl 1.6 node.unlock();
479 dl 1.1 }
480     }
481    
482    
483     /**
484     * Releases the given number of permits, returning them to the semaphore.
485     * <p>Releases the given number of permits, increasing the number of
486     * available permits by that amount.
487     * If any threads are blocking trying to acquire permits, then the
488     * one that has been waiting the longest
489     * is selected and given the permits that were just released.
490     * If the number of available permits satisfies that thread's request
491     * then that thread is re-enabled for thread scheduling purposes; otherwise
492     * the thread continues to wait. If there are still permits available
493     * after the first thread's request has been satisfied, then those permits
494     * are assigned to the next waiting thread. If it is satisfied then it is
495     * re-enabled for thread scheduling purposes. This continues until there
496     * are insufficient permits to satisfy the next waiting thread, or there
497     * are no more waiting threads.
498     *
499     * <p>There is no requirement that a thread that releases a permit must
500     * have acquired that permit by calling {@link Semaphore#acquire acquire}.
501     * Correct usage of a semaphore is established by programming convention
502     * in the application.
503     *
504     * @param permits the number of permits to release
505     * @throws IllegalArgumentException if permits less than zero.
506     */
507     public void release(long permits) {
508     if (permits < 0) throw new IllegalArgumentException();
509 dl 1.6 if (permits == 0)
510     return;
511     // Even when using fair locks, releases should try to barge in.
512 dl 1.5 if (!lock.tryLock())
513     lock.lock();
514 dl 1.1 try {
515 dl 1.6 long p = permits;
516     while (p > 0) {
517     Node node = head;
518     if (node == null) {
519     count += p;
520     p = 0;
521     }
522     else {
523     node.lock();
524     try {
525     if (node.cancelled)
526     removeFirst();
527     else if (node.permitsNeeded > p) {
528     node.permitsNeeded -= p;
529     p = 0;
530     }
531     else {
532     p -= node.permitsNeeded;
533     node.permitsNeeded = 0;
534     node.done.signal();
535     removeFirst();
536     }
537     }
538     finally {
539     node.unlock();
540     }
541     }
542     }
543 dl 1.1 }
544     finally {
545     lock.unlock();
546     }
547     }
548     }