ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SynchronousQueue.java (file contents):
Revision 1.30 by dl, Mon Dec 29 19:05:22 2003 UTC vs.
Revision 1.31 by dl, Tue Dec 30 23:55:43 2003 UTC

# Line 62 | Line 62 | public class SynchronousQueue<E> extends
62        the entire put or take sequence.
63      */
64  
65    /**
66     * Special marker used in queue nodes to indicate that
67     * the thread waiting for a change in the node has timed out
68     * or been interrupted.
69     **/
70    private static final Object CANCELLED = new Object();
65  
66      /*
67       * Note that all fields are transient final, so there is
# Line 80 | Line 74 | public class SynchronousQueue<E> extends
74  
75      /**
76       * Nodes each maintain an item and handle waits and signals for
77 <     * getting and setting it. The class opportunistically extends
78 <     * ReentrantLock to save an extra object allocation per
79 <     * rendezvous.
77 >     * getting and setting it. The class extends
78 >     * AbstractQueuedSynchronizer to manage blocking, using AQS state
79 >     *  0 for waiting, 1 for ack, -1 for cancelled.
80       */
81 <    private static class Node extends ReentrantLock {
88 <        /** Condition to wait on for other party; lazily constructed */
89 <        Condition done;
81 >    private static final class Node extends AbstractQueuedSynchronizer {
82          /** The item being transferred */
83          Object item;
84          /** Next node in wait queue */
85          Node next;
94
86          Node(Object x) { item = x; }
87  
88 +        private static final int WAITING   =  0;
89 +        private static final int ACKED     =  1;
90 +        private static final int CANCELLED = -1;
91 +
92 +        /**
93 +         * Implements AQS base acquire to succeed if not in WAITING state
94 +         */
95 +        public int acquireExclusiveState(boolean b, int ignore) {
96 +            return get() == WAITING ? -1 : 0;
97 +        }
98 +
99 +        /**
100 +         * Implements AQS base release to always signal.
101 +         * Status is changed in ack or cancel methods before calling,
102 +         * which is needed to ensure we win cancel race.
103 +         */
104 +        public boolean releaseExclusiveState(int ignore) {
105 +            return true;
106 +        }
107 +
108 +        /**
109 +         * Try to acknowledge; fail if not waiting
110 +         */
111 +        private boolean ack() {
112 +            if (!compareAndSet(WAITING, ACKED))
113 +                return false;
114 +            releaseExclusive(0);
115 +            return true;
116 +        }
117 +
118 +        /**
119 +         * Try to cancel; fail if not waiting
120 +         */
121 +        private boolean cancel() {
122 +            if (!compareAndSet(WAITING, CANCELLED))
123 +                return false;
124 +            releaseExclusive(0);
125 +            return true;
126 +        }
127 +
128 +        /**
129 +         * Take item and null out fields (for sake of GC)
130 +         */
131 +        private Object extract() {
132 +            Object x = item;
133 +            item = null;
134 +            next = null;
135 +            return x;
136 +        }
137 +
138          /**
139           * Fill in the slot created by the taker and signal taker to
140           * continue.
141           */
142 <        boolean set(Object x) {
143 <            this.lock();
144 <            try {
104 <                if (item != CANCELLED) {
105 <                    item = x;
106 <                    if (done != null)
107 <                        done.signal();
108 <                    return true;
109 <                } else // taker has cancelled
110 <                    return false;
111 <            } finally {
112 <                this.unlock();
113 <            }
142 >        boolean setItem(Object x) {
143 >            item = x;
144 >            return ack();
145          }
146  
147          /**
148           * Remove item from slot created by putter and signal putter
149           * to continue.
150           */
151 <        Object get() {
152 <            this.lock();
151 >        Object getItem() {
152 >            if (!ack())
153 >                return null;
154 >            return extract();
155 >        }
156 >
157 >        /**
158 >         * Wait for a taker to take item placed by putter.
159 >         */
160 >        boolean waitForTake() throws InterruptedException {
161              try {
162 <                Object x = item;
163 <                if (x != CANCELLED) {
164 <                    item = null;
165 <                    next = null;
166 <                    if (done != null)
167 <                        done.signal();
168 <                    return x;
130 <                } else
131 <                    return null;
132 <            } finally {
133 <                this.unlock();
162 >                acquireExclusiveInterruptibly(0);
163 >                return true;
164 >            } catch (InterruptedException ie) {
165 >                if (cancel())
166 >                    throw ie;
167 >                Thread.currentThread().interrupt();
168 >                return true;
169              }
170          }
171  
172          /**
173           * Wait for a taker to take item placed by putter, or time out.
174           */
175 <        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
141 <            this.lock();
175 >        boolean waitForTake(long nanos) throws InterruptedException {
176              try {
177 <                for (;;) {
144 <                    if (item == null)
145 <                        return true;
146 <                    if (timed) {
147 <                        if (nanos <= 0) {
148 <                            item = CANCELLED;
149 <                            return false;
150 <                        }
151 <                    }
152 <                    if (done == null)
153 <                        done = this.newCondition();
154 <                    if (timed)
155 <                        nanos = done.awaitNanos(nanos);
156 <                    else
157 <                        done.await();
158 <                }
177 >                return acquireExclusiveTimed(0, nanos) || !cancel();
178              } catch (InterruptedException ie) {
179 <                // If taken, return normally but set interrupt status
161 <                if (item == null) {
162 <                    Thread.currentThread().interrupt();
163 <                    return true;
164 <                } else {
165 <                    item = CANCELLED;
166 <                    done.signal(); // propagate signal
179 >                if (cancel())
180                      throw ie;
181 <                }
182 <            } finally {
183 <                this.unlock();
181 >                Thread.currentThread().interrupt();
182 >                return true;
183 >            }
184 >        }
185 >
186 >        /**
187 >         * Wait for a putter to put item placed by taker.
188 >         */
189 >        Object waitForPut() throws InterruptedException {
190 >            try {
191 >                acquireExclusiveInterruptibly(0);
192 >                return extract();
193 >            } catch (InterruptedException ie) {
194 >                if (cancel())
195 >                    throw ie;
196 >                Thread.currentThread().interrupt();
197 >                return extract();
198              }
199          }
200  
201          /**
202           * Wait for a putter to put item placed by taker, or time out.
203           */
204 <        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
178 <            this.lock();
204 >        Object waitForPut(long nanos) throws InterruptedException {
205              try {
206 <                for (;;) {
207 <                    Object x = item;
208 <                    if (x != null) {
183 <                        item = null;
184 <                        next = null;
185 <                        return x;
186 <                    }
187 <                    if (timed) {
188 <                        if (nanos <= 0) {
189 <                            item = CANCELLED;
190 <                            return null;
191 <                        }
192 <                    }
193 <                    if (done == null)
194 <                        done = this.newCondition();
195 <                    if (timed)
196 <                        nanos = done.awaitNanos(nanos);
197 <                    else
198 <                        done.await();
199 <                }
206 >                if (acquireExclusiveTimed(0, nanos) || !cancel())
207 >                    return extract();
208 >                return null;
209              } catch (InterruptedException ie) {
210 <                Object y = item;
202 <                if (y != null) {
203 <                    item = null;
204 <                    next = null;
205 <                    Thread.currentThread().interrupt();
206 <                    return y;
207 <                } else {
208 <                    item = CANCELLED;
209 <                    done.signal(); // propagate signal
210 >                if (cancel())
211                      throw ie;
212 <                }
213 <            } finally {
213 <                this.unlock();
212 >                Thread.currentThread().interrupt();
213 >                return extract();
214              }
215          }
216      }
# Line 257 | Line 257 | public class SynchronousQueue<E> extends
257                  qlock.unlock();
258              }
259  
260 <            if (mustWait)
261 <                return node.waitForTake(timed, nanos);
260 >            if (mustWait)
261 >                return timed? node.waitForTake(nanos) : node.waitForTake();
262  
263 <            else if (node.set(x))
263 >            else if (node.setItem(x))
264                  return true;
265  
266              // else taker cancelled, so retry
# Line 285 | Line 285 | public class SynchronousQueue<E> extends
285                  qlock.unlock();
286              }
287  
288 <            if (mustWait)
289 <                return (E)node.waitForPut(timed, nanos);
290 <
288 >            if (mustWait) {
289 >                Object x = timed? node.waitForPut(nanos) : node.waitForPut();
290 >                return (E)x;
291 >            }
292              else {
293 <                E x = (E)node.get();
293 >                Object x = node.getItem();
294                  if (x != null)
295 <                    return x;
295 >                    return (E)x;
296                  // else cancelled, so retry
297              }
298          }
# Line 383 | Line 384 | public class SynchronousQueue<E> extends
384              if (node == null)
385                  return false;
386  
387 <            else if (node.set(o))
387 >            else if (node.setItem(o))
388                  return true;
389              // else retry
390          }
# Line 410 | Line 411 | public class SynchronousQueue<E> extends
411                  return null;
412  
413              else {
414 <                Object x = node.get();
414 >                Object x = node.getItem();
415                  if (x != null)
416                      return (E)x;
417                  // else retry

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines