ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
Revision: 1.10
Committed: Tue Aug 26 00:09:18 2003 UTC (20 years, 9 months ago) by dholmes
Branch: MAIN
Changes since 1.9: +5 -5 lines
Log Message:
In response to Eamonn's comment that "best effort lower bound" is not
defined for waiting times, all such references have been deleted. The
preceding text makes it clear that the time must elapse before the
method will return, and trying to say anything about the maximum waiting
time is pointless. We can still say something to this effect in the package
docs if we want.

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.locks.*;
9 dl 1.1
10     /**
11 dl 1.2 * A semaphore guaranteeing that threads invoking any of the {@link
12     * #acquire() acquire} methods are allocated permits in the order in
13     * which their invocation of those methods was processed
14     * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
15     * applies to specific internal points of execution within these
16     * methods. So, it is possible for one thread to invoke
17     * <tt>acquire</tt> before another, but reach the ordering point after
18     * the other, and similarly upon return from the method.
19 dl 1.1 *
20 dl 1.2 * <p>Because permits are allocated in-order, this class also provides
21     * convenience methods to {@link #acquire(long) acquire} and {@link
22     * #release(long) release} multiple permits at a time.
23 dl 1.1 *
24     * @since 1.5
25     * @spec JSR-166
26 dholmes 1.10 * @revised $Date: 2003/08/25 22:35:45 $
27     * @editor $Author: dholmes $
28 dl 1.3 * @author Doug Lea
29 dl 1.1 *
30     */
31     public class FairSemaphore extends Semaphore {
32 dl 1.8 private static final long serialVersionUID = -3222578661600680210L;
33 dl 1.1
34     /*
35 dl 1.6 * Basic algorithm is a variant of the one described
36     * in CPJ book 2nd edition, section 3.7.3
37 dl 1.1 */
38    
39     /**
40 dl 1.6 * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
41     */
42     private static class Node extends ReentrantLock {
43     /** The condition to wait on */
44     Condition done = newCondition();
45     /** True if interrupted before released */
46     boolean cancelled;
47     /** Number of permits remaining until can release */
48     long permitsNeeded;
49     /** Next node of queue */
50     Node next;
51    
52     Node(long n) { permitsNeeded = n; }
53     }
54    
55     /** Head of the wait queue */
56 dl 1.8 private transient Node head;
57 dl 1.6
58     /** Pointer to last node of the wait queue */
59 dl 1.8 private transient Node last;
60 dl 1.6
61     /** Add a new node to wait queue with given number of permits needed */
62     private Node enq(long n) {
63     Node p = new Node(n);
64     if (last == null)
65     last = head = p;
66     else
67     last = last.next = p;
68     return p;
69     }
70    
71     /** Remove first node from wait queue */
72     private void removeFirst() {
73     Node p = head;
74     assert p != null;
75     if (p != null && (head = p.next) == null)
76     last = null;
77     }
78    
79     /**
80     * Construct a <tt>FairSemaphore</tt> with the given number of
81 dl 1.1 * permits.
82     * @param permits the initial number of permits available
83     */
84     public FairSemaphore(long permits) {
85 dl 1.4 super(permits, new ReentrantLock(true));
86 dl 1.1 }
87    
88     /**
89 dl 1.6 * Acquires a permit from this semaphore, blocking until one is
90     * available, or the thread is {@link Thread#interrupt interrupted}.
91     *
92     * <p>Acquires a permit, if one is available and returns immediately,
93     * reducing the number of available permits by one.
94     * <p>If no permit is available then the current thread becomes
95     * disabled for thread scheduling purposes and lies dormant until
96     * one of two things happens:
97     * <ul>
98     * <li>Some other thread invokes the {@link #release} method for this
99 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit; or
100 dl 1.6 * <li>Some other thread {@link Thread#interrupt interrupts} the current
101     * thread.
102     * </ul>
103     *
104     * <p>If the current thread:
105     * <ul>
106     * <li>has its interrupted status set on entry to this method; or
107     * <li>is {@link Thread#interrupt interrupted} while waiting
108     * for a permit,
109     * </ul>
110     * then {@link InterruptedException} is thrown and the current thread's
111     * interrupted status is cleared.
112     *
113     * @throws InterruptedException if the current thread is interrupted
114     *
115     * @see Thread#interrupt
116     */
117     public void acquire() throws InterruptedException {
118     acquire(1L);
119     }
120    
121     /**
122     * Acquires a permit from this semaphore, blocking until one is
123     * available.
124     *
125     * <p>Acquires a permit, if one is available and returns immediately,
126     * reducing the number of available permits by one.
127     * <p>If no permit is available then the current thread becomes
128     * disabled for thread scheduling purposes and lies dormant until
129     * some other thread invokes the {@link #release} method for this
130 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit.
131 dl 1.6 *
132     * <p>If the current thread
133     * is {@link Thread#interrupt interrupted} while waiting
134     * for a permit then it will continue to wait, but the time at which
135     * the thread is assigned a permit may change compared to the time it
136     * would have received the permit had no interruption occurred. When the
137     * thread does return from this method its interrupt status will be set.
138     *
139     */
140     public void acquireUninterruptibly() {
141     acquireUninterruptibly(1L);
142     }
143    
144     /**
145     * Acquires a permit from this semaphore, only if one is available at the
146     * time of invocation.
147     * <p>Acquires a permit, if one is available and returns immediately,
148     * with the value <tt>true</tt>,
149     * reducing the number of available permits by one.
150     *
151     * <p>If no permit is available then this method will return
152     * immediately with the value <tt>false</tt>.
153     *
154     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
155     * otherwise.
156     */
157     public boolean tryAcquire() {
158     return tryAcquire(1L);
159     }
160    
161     /**
162     * Acquires a permit from this semaphore, if one becomes available
163     * within the given waiting time and the
164     * current thread has not been {@link Thread#interrupt interrupted}.
165     * <p>Acquires a permit, if one is available and returns immediately,
166     * with the value <tt>true</tt>,
167     * reducing the number of available permits by one.
168     * <p>If no permit is available then
169     * the current thread becomes disabled for thread scheduling
170     * purposes and lies dormant until one of three things happens:
171     * <ul>
172     * <li>Some other thread invokes the {@link #release} method for this
173 dholmes 1.9 * semaphore and the current thread is next to be assigned a permit; or
174 dl 1.6 * <li>Some other thread {@link Thread#interrupt interrupts} the current
175     * thread; or
176     * <li>The specified waiting time elapses.
177     * </ul>
178     * <p>If a permit is acquired then the value <tt>true</tt> is returned.
179     * <p>If the current thread:
180     * <ul>
181     * <li>has its interrupted status set on entry to this method; or
182     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
183     * a permit,
184     * </ul>
185     * then {@link InterruptedException} is thrown and the current thread's
186     * interrupted status is cleared.
187     * <p>If the specified waiting time elapses then the value <tt>false</tt>
188     * is returned.
189 dholmes 1.10 * If the time is less than or equal to zero, the method will not wait
190     * at all.
191 dl 1.6 *
192     * @param timeout the maximum time to wait for a permit
193     * @param unit the time unit of the <tt>timeout</tt> argument.
194     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
195     * if the waiting time elapsed before a permit was acquired.
196     *
197     * @throws InterruptedException if the current thread is interrupted
198     *
199     * @see Thread#interrupt
200     *
201     */
202     public boolean tryAcquire(long timeout, TimeUnit unit)
203     throws InterruptedException {
204     return tryAcquire(1L, timeout, unit);
205     }
206    
207     /**
208     * Releases a permit, returning it to the semaphore.
209     * <p>Releases a permit, increasing the number of available permits
210     * by one.
211     * If any threads are blocking trying to acquire a permit, then one
212     * is selected and given the permit that was just released.
213     * That thread is re-enabled for thread scheduling purposes.
214     * <p>There is no requirement that a thread that releases a permit must
215     * have acquired that permit by calling {@link #acquire}.
216     * Correct usage of a semaphore is established by programming convention
217     * in the application.
218     */
219     public void release() {
220     release(1L);
221     }
222    
223    
224     /**
225 dl 1.1 * Acquires the given number of permits from this semaphore,
226     * blocking until all are available,
227     * or the thread is {@link Thread#interrupt interrupted}.
228     *
229     * <p>Acquires the given number of permits, if they are available,
230     * and returns immediately,
231     * reducing the number of available permits by the given amount.
232     *
233     * <p>If insufficient permits are available then the current thread becomes
234     * disabled for thread scheduling purposes and lies dormant until
235     * one of two things happens:
236     * <ul>
237     * <li>Some other thread invokes one of the {@link #release() release}
238     * methods for this semaphore, the current thread is next to be assigned
239     * permits and the number of available permits satisfies this request; or
240     * <li>Some other thread {@link Thread#interrupt interrupts} the current
241     * thread.
242     * </ul>
243     *
244     * <p>If the current thread:
245     * <ul>
246     * <li>has its interrupted status set on entry to this method; or
247     * <li>is {@link Thread#interrupt interrupted} while waiting
248     * for a permit,
249     * </ul>
250     * then {@link InterruptedException} is thrown and the current thread's
251     * interrupted status is cleared.
252 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
253     * assigned to the next waiting thread(s), as if
254 dl 1.1 * they had been made available by a call to {@link #release()}.
255     *
256     * @param permits the number of permits to acquire
257     *
258     * @throws InterruptedException if the current thread is interrupted
259     * @throws IllegalArgumentException if permits less than zero.
260     *
261     * @see Thread#interrupt
262     */
263     public void acquire(long permits) throws InterruptedException {
264     if (permits < 0) throw new IllegalArgumentException();
265 dl 1.6 Node node = null;
266 dl 1.1 lock.lockInterruptibly();
267     try {
268 dl 1.6 long remaining = count - permits;
269     if (remaining >= 0) {
270     count = remaining;
271     return;
272     }
273     count = 0;
274     node = enq(-remaining);
275 tim 1.7 } finally {
276 dl 1.1 lock.unlock();
277     }
278 dl 1.6
279     node.lock();
280     try {
281     while (node.permitsNeeded > 0)
282     node.done.await();
283 tim 1.7 } catch(InterruptedException ie) {
284 dl 1.6 if (node.permitsNeeded > 0) {
285     node.cancelled = true;
286     release(permits - node.permitsNeeded);
287     throw ie;
288 tim 1.7 } else { // ignore interrupt
289 dl 1.6 Thread.currentThread().interrupt();
290     }
291 tim 1.7 } finally {
292 dl 1.6 node.unlock();
293     }
294 dl 1.1 }
295    
296     /**
297     * Acquires the given number of permits from this semaphore,
298     * blocking until all are available.
299     *
300     * <p>Acquires the given number of permits, if they are available,
301     * and returns immediately,
302     * reducing the number of available permits by the given amount.
303     *
304     * <p>If insufficient permits are available then the current thread becomes
305     * disabled for thread scheduling purposes and lies dormant until
306     * some other thread invokes one of the {@link #release() release}
307     * methods for this semaphore, the current thread is next to be assigned
308     * permits and the number of available permits satisfies this request.
309     *
310     * <p>If the current thread
311     * is {@link Thread#interrupt interrupted} while waiting
312     * for permits then it will continue to wait and its position in the
313     * queue is not affected. When the
314     * thread does return from this method its interrupt status will be set.
315     *
316     * @param permits the number of permits to acquire
317     * @throws IllegalArgumentException if permits less than zero.
318     *
319     */
320     public void acquireUninterruptibly(long permits) {
321     if (permits < 0) throw new IllegalArgumentException();
322 dl 1.6 Node node = null;
323 dl 1.1 lock.lock();
324     try {
325 dl 1.6 long remaining = count - permits;
326     if (remaining >= 0) {
327     count = remaining;
328     return;
329     }
330     count = 0;
331     node = enq(-remaining);
332 tim 1.7 } finally {
333 dl 1.1 lock.unlock();
334     }
335 dl 1.6
336     node.lock();
337     try {
338     while (node.permitsNeeded > 0)
339     node.done.awaitUninterruptibly();
340 tim 1.7 } finally {
341 dl 1.6 node.unlock();
342     }
343 dl 1.1 }
344    
345     /**
346     * Acquires the given number of permits from this semaphore, only if
347     * all are available at the time of invocation.
348     * <p>Acquires the given number of permits, if they are available, and
349     * returns immediately, with the value <tt>true</tt>,
350     * reducing the number of available permits by the given amount.
351     *
352     * <p>If insufficient permits are available then this method will return
353     * immediately with the value <tt>false</tt> and the number of available
354     * permits is unchanged.
355     *
356     * @param permits the number of permits to acquire
357     *
358     * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
359     * otherwise.
360     * @throws IllegalArgumentException if permits less than zero.
361     */
362     public boolean tryAcquire(long permits) {
363     if (permits < 0) throw new IllegalArgumentException();
364     lock.lock();
365     try {
366 dl 1.6 long remaining = count - permits;
367     if (remaining >= 0) {
368     count = remaining;
369 dl 1.1 return true;
370     }
371     return false;
372 tim 1.7 } finally {
373 dl 1.1 lock.unlock();
374     }
375     }
376    
377     /**
378     * Acquires the given number of permits from this semaphore, if all
379     * become available within the given waiting time and the
380     * current thread has not been {@link Thread#interrupt interrupted}.
381     * <p>Acquires the given number of permits, if they are available and
382     * returns immediately, with the value <tt>true</tt>,
383     * reducing the number of available permits by the given amount.
384     * <p>If insufficient permits are available then
385     * the current thread becomes disabled for thread scheduling
386     * purposes and lies dormant until one of three things happens:
387     * <ul>
388     * <li>Some other thread invokes one of the {@link #release() release}
389     * methods for this semaphore, the current thread is next to be assigned
390     * permits and the number of available permits satisfies this request; or
391     * <li>Some other thread {@link Thread#interrupt interrupts} the current
392     * thread; or
393     * <li>The specified waiting time elapses.
394     * </ul>
395     * <p>If the permits are acquired then the value <tt>true</tt> is returned.
396     * <p>If the current thread:
397     * <ul>
398     * <li>has its interrupted status set on entry to this method; or
399     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
400     * the permits,
401     * </ul>
402     * then {@link InterruptedException} is thrown and the current thread's
403     * interrupted status is cleared.
404 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
405     * assigned to the next waiting thread(s), as if
406 dl 1.1 * they had been made available by a call to {@link #release()}.
407     *
408     * <p>If the specified waiting time elapses then the value <tt>false</tt>
409     * is returned.
410 dholmes 1.10 * If the time is
411 dl 1.1 * less than or equal to zero, the method will not wait at all.
412 dholmes 1.9 * Any permits that were to be assigned to this thread, are instead
413     * assigned to the next waiting thread(s), as if
414 dl 1.1 * they had been made available by a call to {@link #release()}.
415     *
416     * @param permits the number of permits to acquire
417     * @param timeout the maximum time to wait for the permits
418 dl 1.6 * @param unit the time unit of the <tt>timeout</tt> argument.
419 dl 1.1 * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
420     * if the waiting time elapsed before all permits were acquired.
421     *
422     * @throws InterruptedException if the current thread is interrupted
423     * @throws IllegalArgumentException if permits less than zero.
424     *
425     * @see Thread#interrupt
426     *
427     */
428 dl 1.6 public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
429 dl 1.1 throws InterruptedException {
430     if (permits < 0) throw new IllegalArgumentException();
431 dl 1.6 Node node = null;
432     long nanos = unit.toNanos(timeout);
433 dl 1.1 lock.lockInterruptibly();
434     try {
435 dl 1.6 long remaining = count - permits;
436     if (remaining >= 0) {
437     count = remaining;
438     return true;
439     }
440     if (nanos <= 0)
441     return false;
442     count = 0;
443     node = enq(-remaining);
444 tim 1.7 } finally {
445 dl 1.6 lock.unlock();
446     }
447    
448     node.lock();
449     try {
450     while (node.permitsNeeded > 0) {
451     nanos = node.done.awaitNanos(nanos);
452     if (nanos <= 0) {
453     release(permits - node.permitsNeeded);
454     return false;
455 dl 1.1 }
456     }
457 dl 1.6 return true;
458 tim 1.7 } catch(InterruptedException ie) {
459 dl 1.6 if (node.permitsNeeded > 0) {
460     node.cancelled = true;
461     release(permits - node.permitsNeeded);
462     throw ie;
463 tim 1.7 } else { // ignore interrupt
464 dl 1.6 Thread.currentThread().interrupt();
465     return true;
466     }
467 tim 1.7 } finally {
468 dl 1.6 node.unlock();
469 dl 1.1 }
470     }
471    
472    
473     /**
474     * Releases the given number of permits, returning them to the semaphore.
475     * <p>Releases the given number of permits, increasing the number of
476     * available permits by that amount.
477     * If any threads are blocking trying to acquire permits, then the
478     * one that has been waiting the longest
479     * is selected and given the permits that were just released.
480     * If the number of available permits satisfies that thread's request
481     * then that thread is re-enabled for thread scheduling purposes; otherwise
482     * the thread continues to wait. If there are still permits available
483     * after the first thread's request has been satisfied, then those permits
484     * are assigned to the next waiting thread. If it is satisfied then it is
485     * re-enabled for thread scheduling purposes. This continues until there
486     * are insufficient permits to satisfy the next waiting thread, or there
487     * are no more waiting threads.
488     *
489     * <p>There is no requirement that a thread that releases a permit must
490     * have acquired that permit by calling {@link Semaphore#acquire acquire}.
491     * Correct usage of a semaphore is established by programming convention
492     * in the application.
493     *
494     * @param permits the number of permits to release
495     * @throws IllegalArgumentException if permits less than zero.
496     */
497     public void release(long permits) {
498     if (permits < 0) throw new IllegalArgumentException();
499 dl 1.6 if (permits == 0)
500     return;
501     // Even when using fair locks, releases should try to barge in.
502 dl 1.5 if (!lock.tryLock())
503     lock.lock();
504 dl 1.1 try {
505 dl 1.6 long p = permits;
506     while (p > 0) {
507     Node node = head;
508     if (node == null) {
509     count += p;
510     p = 0;
511 tim 1.7 } else {
512 dl 1.6 node.lock();
513     try {
514     if (node.cancelled)
515     removeFirst();
516     else if (node.permitsNeeded > p) {
517     node.permitsNeeded -= p;
518     p = 0;
519 tim 1.7 } else {
520 dl 1.6 p -= node.permitsNeeded;
521     node.permitsNeeded = 0;
522     node.done.signal();
523     removeFirst();
524     }
525 tim 1.7 } finally {
526 dl 1.6 node.unlock();
527     }
528     }
529     }
530 tim 1.7 } finally {
531 dl 1.1 lock.unlock();
532     }
533     }
534     }