ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Semaphore.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Semaphore.java (file contents):
Revision 1.20 by dl, Tue Dec 23 19:38:09 2003 UTC vs.
Revision 1.21 by dl, Sat Dec 27 14:15:22 2003 UTC

# Line 118 | Line 118 | import java.util.concurrent.atomic.*;
118   *
119   */
120  
121 < public class Semaphore implements java.io.Serializable {
122 <    /*
123 <     * The underlying algorithm here is a simplified adaptation of
124 <     * that used for ReentrantLock. See the internal documentation of
125 <     * lock package classes for detailed explanation.
126 <     */
127 <
121 > public class Semaphore extends AbstractQueuedSynchronizer implements java.io.Serializable {
122      private static final long serialVersionUID = -3222578661600680210L;
129
130    /** Node status value to indicate thread has cancelled */
131    private static final int CANCELLED =  1;
132    /** Node status value to indicate successor needs unparking */
133    private static final int SIGNAL    = -1;
134    /** Node class for waiting threads */
135    private static class Node {
136        volatile int status;
137        volatile Node prev;
138        volatile Node next;
139        Thread thread;
140        Node(Thread t) { thread = t; }
141    }
142
143    /** Number of available permits held in a separate AtomicInteger */
144    private final AtomicInteger perms;
145    /**  Head of the wait queue, lazily initialized.  */
146    private transient volatile Node head;
147    /**  Tail of the wait queue, lazily initialized.  */
148    private transient volatile Node tail;
149    /** true if barging disabled */
123      private final boolean fair;
124  
125 <    // Atomic update support
153 <
154 <    private static final
155 <        AtomicReferenceFieldUpdater<Semaphore, Node> tailUpdater =
156 <        AtomicReferenceFieldUpdater.<Semaphore, Node>newUpdater
157 <        (Semaphore.class, Node.class, "tail");
158 <    private static final
159 <        AtomicReferenceFieldUpdater<Semaphore, Node> headUpdater =
160 <        AtomicReferenceFieldUpdater.<Semaphore, Node>newUpdater
161 <        (Semaphore.class,  Node.class, "head");
162 <    private static final
163 <        AtomicIntegerFieldUpdater<Node> statusUpdater =
164 <        AtomicIntegerFieldUpdater.<Node>newUpdater
165 <        (Node.class, "status");
166 <
167 <
168 <    /**
169 <     * Insert node into queue, initializing head and tail if necessary.
170 <     * @param node the node to insert
171 <     */
172 <    private void enq(Node node) {
173 <        Node t = tail;
174 <        if (t == null) {         // Must initialize first
175 <            Node h = new Node(null);
176 <            while ((t = tail) == null) {    
177 <                if (headUpdater.compareAndSet(this, null, h))
178 <                    tail = h;
179 <            }
180 <        }
125 >    // Implement abstract methods
126  
127 <        for (;;) {
128 <            node.prev = t;      // Prev field must be valid before/upon CAS
129 <            if (tailUpdater.compareAndSet(this, t, node)) {
130 <                t.next = node;  // Next field assignment lags CAS
131 <                return;
187 <            }
188 <            t = tail;
189 <        }
190 <    }
191 <
192 <    /**
193 <     * Unblock the successor of node
194 <     * @param node the node
195 <     */
196 <    private void unparkSuccessor(Node node) {
197 <        statusUpdater.compareAndSet(node, SIGNAL, 0);
198 <        Node s = node.next;
199 <        if (s == null || s.status == CANCELLED) {
200 <            s = tail;
201 <            if (s != null && s != node) {
202 <                Node p = s.prev;
203 <                while (p != null && p != node) {
204 <                    if (p.status != CANCELLED)
205 <                        s = p;
206 <                    p = p.prev;
207 <                }
208 <            }
209 <        }
210 <        if (s != null && s != node)
211 <            LockSupport.unpark(s.thread);
212 <    }
213 <
214 <
215 <    /**
216 <     * Internal version of tryAcquire returning number of remaining
217 <     * permits, which is nonnegative only if the acquire succeeded.
218 <     * @param permits requested number of permits
219 <     * @return remaining number of permits
220 <     */
221 <    private int doTryAcquire(int permits) {
222 <        final AtomicInteger perms = this.perms;
127 >    protected int acquireSharedState(boolean isQueued, int acquires,
128 >                                     Thread current) {
129 >        final AtomicInteger perms = getState();
130 >        if (!isQueued && fair && hasWaiters())
131 >            return -1;
132          for (;;) {
133              int available = perms.get();
134 <            int remaining = available - permits;
134 >            int remaining = available - acquires;
135              if (remaining < 0 ||
136                  perms.compareAndSet(available, remaining))
137                  return remaining;
138          }
139      }
140 <
141 <    /**
142 <     * Main code for untimed acquires.
234 <     * @param permits number of permits requested
235 <     * @param interrupts interrupt control: -1 for abort on interrupt,
236 <     * 0 for continue on interrupt
237 <     * @return true if lock acquired (can be false only if interruptible)
238 <     */
239 <    private boolean doAcquire(int permits, int interrupts) {
240 <        // Fast path bypasses queue
241 <        if ((!fair || head == tail) && doTryAcquire(permits) >= 0)
242 <            return true;
243 <        Thread current = Thread.currentThread();
244 <        Node node = new Node(current);
245 <        // Retry fast path before enqueuing
246 <        if (!fair && doTryAcquire(permits) >= 0)
247 <            return true;
248 <        enq(node);
249 <
140 >    
141 >    protected boolean releaseSharedState(int releases) {
142 >        final AtomicInteger perms = getState();
143          for (;;) {
144 <            Node p = node.prev;
145 <            if (p == head) {
146 <                int remaining = doTryAcquire(permits);
254 <                if (remaining >= 0) {
255 <                    p.next = null;
256 <                    node.thread = null;
257 <                    node.prev = null;
258 <                    head = node;
259 <                    // if still some permits left, wake up successor
260 <                    if (remaining > 0 && node.status < 0)
261 <                        unparkSuccessor(node);
262 <                    if (interrupts > 0) // Re-interrupt on normal exit
263 <                        current.interrupt();
264 <                    return true;
265 <                }
266 <            }
267 <            int status = p.status;
268 <            if (status == 0)
269 <                statusUpdater.compareAndSet(p, 0, SIGNAL);
270 <            else if (status == CANCELLED)
271 <                node.prev = p.prev;
272 <            else {
273 <                assert (status == SIGNAL);
274 <                LockSupport.park();
275 <                if (Thread.interrupted()) {
276 <                    if (interrupts < 0)  {  
277 <                        node.thread = null;      
278 <                        node.status = CANCELLED;
279 <                        unparkSuccessor(node);
280 <                        return false;
281 <                    }
282 <                    interrupts = 1; // set to re-interrupt on exit
283 <                }
284 <            }
144 >            int p = perms.get();
145 >            if (perms.compareAndSet(p, p + releases))
146 >                return true;
147          }
148      }
149  
150 <    /**
151 <     * Main code for timed acquires. Same as doAcquire but with
152 <     * interspersed time checks.
153 <     * @param permits number of permits requested
292 <     * @param nanos timeout in nanosecs
293 <     * @return true if lock acquired
294 <     */
295 <    private boolean doTimedAcquire(int permits, long nanos) throws InterruptedException {
296 <        if ((!fair || head == tail) && doTryAcquire(permits) >= 0)
297 <            return true;
298 <        Thread current = Thread.currentThread();
299 <        long lastTime = System.nanoTime();
300 <        Node node = new Node(current);
301 <        // Retry fast path before enqueuing
302 <        if (!fair && doTryAcquire(permits) >= 0)
303 <            return true;
304 <        enq(node);
150 >    protected int acquireExclusiveState(boolean isQueued, int acquires,
151 >                                        Thread current) {
152 >        throw new UnsupportedOperationException();
153 >    }
154  
155 <        for (;;) {
156 <            Node p = node.prev;
308 <            if (p == head) {
309 <                int remaining =  doTryAcquire(permits);
310 <                if (remaining >= 0) {
311 <                    p.next = null;
312 <                    node.thread = null;
313 <                    node.prev = null;
314 <                    head = node;
315 <                    if (remaining > 0 && node.status < 0)
316 <                        unparkSuccessor(node);
317 <                    return true;
318 <                }
319 <            }
320 <            if (nanos <= 0L) {    
321 <                node.thread = null;      
322 <                node.status = CANCELLED;
323 <                unparkSuccessor(node);
324 <                return false;
325 <            }
326 <
327 <            int status = p.status;
328 <            if (status == 0)
329 <                statusUpdater.compareAndSet(p, 0, SIGNAL);
330 <            else if (status == CANCELLED)
331 <                node.prev = p.prev;
332 <            else {                      
333 <                LockSupport.parkNanos(nanos);
334 <                if (Thread.interrupted()) {
335 <                    node.thread = null;      
336 <                    node.status = CANCELLED;
337 <                    unparkSuccessor(node);
338 <                    throw new InterruptedException();
339 <                }
340 <                long now = System.nanoTime();
341 <                nanos -= now - lastTime;
342 <                lastTime = now;
343 <            }
344 <        }
155 >    protected boolean releaseExclusiveState(int releases) {
156 >        throw new UnsupportedOperationException();
157      }
158  
159 <    /**
160 <     * Internal version of release
349 <     */
350 <    private void doRelease(int permits) {
351 <        final AtomicInteger perms = this.perms;
352 <        for (;;) {
353 <            int p = perms.get();
354 <            if (perms.compareAndSet(p, p + permits)) {
355 <                Node h = head;
356 <                if (h != null  && h.status < 0)
357 <                    unparkSuccessor(h);
358 <                return;
359 <            }
360 <        }
159 >    protected void checkConditionAccess(Thread thread, boolean waiting) {
160 >        throw new UnsupportedOperationException();
161      }
162  
163      /**
# Line 371 | Line 171 | public class Semaphore implements java.i
171       */
172      public Semaphore(int permits, boolean fair) {
173          this.fair = fair;
174 <        perms = new AtomicInteger(permits);
174 >        getState().set(permits);
175      }
176  
177      /**
# Line 404 | Line 204 | public class Semaphore implements java.i
204       * @see Thread#interrupt
205       */
206      public void acquire() throws InterruptedException {
207 <        if (Thread.interrupted() || !doAcquire(1, -1))
408 <            throw new InterruptedException();
207 >        acquireSharedInterruptibly(1);
208      }
209  
210      /**
# Line 428 | Line 227 | public class Semaphore implements java.i
227       *
228       */
229      public void acquireUninterruptibly() {
230 <        doAcquire(1, 0);
230 >        acquireSharedUninterruptibly(1);
231      }
232  
233      /**
# Line 445 | Line 244 | public class Semaphore implements java.i
244       * otherwise.
245       */
246      public boolean tryAcquire() {
247 <        return doTryAcquire(1) >= 0;
247 >        return acquireSharedState(false, 1, null) >= 0;
248      }
249  
250      /**
# Line 491 | Line 290 | public class Semaphore implements java.i
290       */
291      public boolean tryAcquire(long timeout, TimeUnit unit)
292          throws InterruptedException {
293 <        if (unit == null)
495 <            throw new NullPointerException();
496 <        if (Thread.interrupted())
497 <            throw new InterruptedException();
498 <        return doTimedAcquire(1, unit.toNanos(timeout));
293 >        return acquireSharedTimed(1, unit.toNanos(timeout));
294      }
295  
296      /**
# Line 511 | Line 306 | public class Semaphore implements java.i
306       * in the application.
307       */
308      public void release() {
309 <        doRelease(1);
309 >        releaseShared(1);
310      }
311        
312      /**
# Line 555 | Line 350 | public class Semaphore implements java.i
350       */
351      public void acquire(int permits) throws InterruptedException {
352          if (permits < 0) throw new IllegalArgumentException();
353 <        if (Thread.interrupted() || !doAcquire(permits, -1))
559 <            throw new InterruptedException();
353 >        acquireSharedInterruptibly(permits);
354      }
355  
356      /**
# Line 585 | Line 379 | public class Semaphore implements java.i
379       */
380      public void acquireUninterruptibly(int permits) {
381          if (permits < 0) throw new IllegalArgumentException();
382 <        doAcquire(permits, 0);
382 >        acquireSharedUninterruptibly(permits);
383      }
384  
385      /**
# Line 607 | Line 401 | public class Semaphore implements java.i
401       */
402      public boolean tryAcquire(int permits) {
403          if (permits < 0) throw new IllegalArgumentException();
404 <        return doTryAcquire(permits) >= 0;
404 >        return acquireSharedState(false, permits, null) >= 0;
405      }
406  
407      /**
# Line 664 | Line 458 | public class Semaphore implements java.i
458      public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
459          throws InterruptedException {
460          if (permits < 0) throw new IllegalArgumentException();
461 <        if (unit == null)
668 <            throw new NullPointerException();
669 <        if (Thread.interrupted())
670 <            throw new InterruptedException();
671 <        return doTimedAcquire(permits, unit.toNanos(timeout));
461 >        return acquireSharedTimed(permits, unit.toNanos(timeout));
462      }
463  
464  
# Line 698 | Line 488 | public class Semaphore implements java.i
488       */
489      public void release(int permits) {
490          if (permits < 0) throw new IllegalArgumentException();
491 <        doRelease(permits);
491 >        releaseShared(permits);
492      }
493  
494      /**
# Line 707 | Line 497 | public class Semaphore implements java.i
497       * @return the number of permits available in this semaphore.
498       */
499      public int availablePermits() {
500 <        return perms.get();
500 >        return getState().get();
501      }
502  
503      /**
# Line 722 | Line 512 | public class Semaphore implements java.i
512       */
513      protected void reducePermits(int reduction) {
514          if (reduction < 0) throw new IllegalArgumentException();
515 <        perms.getAndAdd(-reduction);
515 >        getState().getAndAdd(-reduction);
516      }
517  
518      /**
# Line 733 | Line 523 | public class Semaphore implements java.i
523          return fair;
524      }
525  
736    /**
737     * Returns an estimate of the number of threads waiting to acquire
738     * a permit. The value is only an estimate because the number of
739     * threads may change dynamically while this method traverses
740     * internal data structures.  This method is designed for use in
741     * monitoring of the system state, not for synchronization
742     * control.
743     * @return the estimated number of threads waiting for a permit
744     */
745    public int getQueueLength() {
746        int n = 0;
747        for (Node p = tail; p != null && p != head; p = p.prev)
748            ++n;
749        return n;
750    }
751
752    /**
753     * Returns a collection containing threads that may be waiting to
754     * acquire a permit.  Because the actual set of threads may
755     * change dynamically while constructing this result, the returned
756     * collection is only a best-effort estimate.  The elements of the
757     * returned collection are in no particular order.  This method is
758     * designed to facilitate construction of subclasses that provide
759     * more extensive monitoring facilities.
760     * @return the collection of threads
761     */
762    protected Collection<Thread> getQueuedThreads() {
763        ArrayList<Thread> list = new ArrayList<Thread>();
764        for (Node p = tail; p != null; p = p.prev) {
765            Thread t = p.thread;
766            if (t != null)
767                list.add(t);
768        }
769        return list;
770    }
771
526   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines