ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FutureTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/FutureTask.java (file contents):
Revision 1.23 by dl, Sat Dec 27 19:26:26 2003 UTC vs.
Revision 1.24 by dl, Fri Jan 2 00:38:33 2004 UTC

# Line 7 | Line 7
7   package java.util.concurrent;
8   import java.util.concurrent.locks.*;
9  
10
10   /**
11   * A cancellable asynchronous computation.  This class provides a base
12   * implementation of {@link Future}, with methods to start and cancel
# Line 32 | Line 31 | import java.util.concurrent.locks.*;
31   * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
32   */
33   public class FutureTask<V> implements Future<V>, Runnable {
34 <    /**
35 <     * Special value for "runState" indicating task is cancelled
37 <     */
38 <    private static final Object CANCELLED = new Object();
39 <
40 <    /**
41 <     * Special value for "runState" indicating task is completed
42 <     */
43 <    private static final Object DONE = new Object();
44 <
45 <    /**
46 <     * Holds the run-state, taking on values:
47 <     *   null              = not yet started,
48 <     *   [some thread ref] = running,
49 <     *   DONE              = completed normally,
50 <     *   CANCELLED         = cancelled (may or may not have ever run).
51 <     */
52 <    private volatile Object runState;
53 <
54 <    /** The underlying callable */
55 <    private final Callable<V> callable;
56 <    /** The result to return from get() */
57 <    private V result;
58 <    /** The exception to throw from get() */
59 <    private Throwable exception;
60 <
61 <    private final ReentrantLock lock = new ReentrantLock();
62 <    private final Condition accessible = lock.newCondition();
34 >    /** Synchronization control for FutureTask */
35 >    private final Sync sync;
36  
37      /**
38       * Constructs a <tt>FutureTask</tt> that will upon running, execute the
# Line 71 | Line 44 | public class FutureTask<V> implements Fu
44      public FutureTask(Callable<V> callable) {
45          if (callable == null)
46              throw new NullPointerException();
47 <        this.callable = callable;
47 >        sync = new Sync(callable);
48      }
49  
50      /**
# Line 87 | Line 60 | public class FutureTask<V> implements Fu
60       * @throws NullPointerException if runnable is null
61       */
62      public FutureTask(Runnable runnable, V result) {
63 <        this.callable = Executors.callable(runnable, result);
63 >        sync = new Sync(Executors.callable(runnable, result));
64      }
65  
66      public boolean isCancelled() {
67 <        return runState == CANCELLED;
67 >        return sync.doIsCancelled();
68      }
69      
70      public boolean isDone() {
71 <        Object r = runState;
99 <        return r == DONE || r == CANCELLED;
71 >        return sync.doIsDone();
72      }
73  
74      public boolean cancel(boolean mayInterruptIfRunning) {
75 <        final ReentrantLock lock = this.lock;
104 <        Thread interruptThread = null;
105 <        lock.lock();
106 <        try {
107 <            Object r = runState;
108 <            if (r == DONE || r == CANCELLED)
109 <                return false;
110 <            if (mayInterruptIfRunning && r != null && r instanceof Thread)
111 <                interruptThread = (Thread)r;
112 <            accessible.signalAll();
113 <            runState = CANCELLED;
114 <        }
115 <        finally{
116 <            lock.unlock();
117 <        }
118 <        if (interruptThread != null)
119 <            interruptThread.interrupt();
120 <        done();
121 <        return true;
75 >        return sync.doCancel(mayInterruptIfRunning);
76      }
77      
78      /**
# Line 134 | Line 88 | public class FutureTask<V> implements Fu
88       * while waiting
89       */
90      public V get() throws InterruptedException, ExecutionException {
91 <        final ReentrantLock lock = this.lock;
138 <        lock.lock();
139 <        try {
140 <            for (;;) {
141 <                Object r = runState;
142 <                if (r == CANCELLED)
143 <                    throw new CancellationException();
144 <                if (r == DONE) {
145 <                    if (exception != null)
146 <                        throw new ExecutionException(exception);
147 <                    else
148 <                        return result;
149 <                }
150 <                accessible.await();
151 <            }
152 <        } finally {
153 <            lock.unlock();
154 <        }
91 >        return sync.doGet();
92      }
93  
94      /**
# Line 171 | Line 108 | public class FutureTask<V> implements Fu
108       */
109      public V get(long timeout, TimeUnit unit)
110          throws InterruptedException, ExecutionException, TimeoutException {
111 <        long nanos = unit.toNanos(timeout);
175 <        final ReentrantLock lock = this.lock;
176 <        lock.lock();
177 <        try {
178 <            for (;;) {
179 <                Object r = runState;
180 <                if (r == CANCELLED)
181 <                    throw new CancellationException();
182 <                if (r == DONE) {
183 <                    if (exception != null)
184 <                        throw new ExecutionException(exception);
185 <                    else
186 <                        return result;
187 <                }
188 <                if (nanos <= 0)
189 <                    throw new TimeoutException();
190 <                nanos = accessible.awaitNanos(nanos);
191 <            }
192 <        } finally {
193 <            lock.unlock();
194 <        }
111 >        return sync.doGet(unit.toNanos(timeout));
112      }
113  
114      /**
# Line 207 | Line 124 | public class FutureTask<V> implements Fu
124  
125      /**
126       * Sets the result of this Future to the given value unless
127 <     * this future has been cancelled
127 >     * this future has already been set or has been cancelled
128       * @param v the value
129       */
130      protected void set(V v) {
131 <        final ReentrantLock lock = this.lock;
215 <        lock.lock();
216 <        try {
217 <            if (runState == CANCELLED)
218 <                return;
219 <            result = v;
220 <            accessible.signalAll();
221 <            runState = DONE;
222 <        } finally {
223 <            lock.unlock();
224 <        }
225 <        done();
131 >        sync.doSet(v);
132      }
133  
134      /**
135       * Causes this future to report an <tt>ExecutionException</tt>
136       * with the given throwable as its cause, unless this Future has
137 <     * been cancelled.
137 >     * already been set or has been cancelled.
138       * @param t the cause of failure.
139       */
140      protected void setException(Throwable t) {
141 <        final ReentrantLock lock = this.lock;
236 <        lock.lock();
237 <        try {
238 <            if (runState == CANCELLED)
239 <                return;
240 <            exception = t;
241 <            accessible.signalAll();
242 <            runState = DONE;
243 <        } finally {
244 <            lock.unlock();
245 <        }
246 <        done();
141 >        sync.doSetException(t);
142      }
143      
144      /**
145 <     * Attempts to set the state of this task to Running, succeeding
146 <     * only if the state is currently NOT Done, Running, or Cancelled.
147 <     * @return true if successful
148 <     */
149 <   private boolean setRunning() {
255 <        final ReentrantLock lock = this.lock;
256 <        lock.lock();
257 <        try {
258 <            if (runState != null)
259 <                return false;
260 <            runState = Thread.currentThread();
261 <            return true;
262 <        }
263 <        finally {
264 <            lock.unlock();
265 <        }
145 >     * Sets this Future to the result of computation unless
146 >     * it has been cancelled.
147 >     */
148 >    public void run() {
149 >        sync.doRun();
150      }
151  
152      /**
153 <     * Resets the run state of this task to its initial state unless
154 <     * it has been cancelled. (Note that a cancelled task cannot be
155 <     * reset.)
156 <     * @return true if successful
153 >     * Executes the computation without setting result, and then
154 >     * resets this Future to initial state; failing to do so if the
155 >     * computation encounters an exception or is cancelled.  This is
156 >     * designed for use with tasks that intrinsically execute more
157 >     * than once.
158 >     * @return true if successfully run and reset
159       */
160 <    private boolean reset() {
161 <        final ReentrantLock lock = this.lock;
276 <        lock.lock();
277 <        try {
278 <            if (runState == CANCELLED)
279 <                return false;
280 <            runState = null;
281 <            return true;
282 <        }
283 <        finally {
284 <            lock.unlock();
285 <        }
160 >    protected boolean runAndReset() {
161 >        return sync.doRunAndReset();
162      }
163  
164      /**
165 <     * Sets this Future to the result of computation unless
166 <     * it has been cancelled.
165 >     * Synchronization control for FutureTask. Note that this must be
166 >     * a non-static inner class in order to invoke protected
167 >     * <tt>done</tt> method. For clarity, all inner class support
168 >     * methods are same as outer, prefixed with "do".
169 >     *
170 >     * Uses AQS sync state to represent run status
171       */
172 <    public void run() {
173 <        if (setRunning()) {
172 >    private final class Sync extends AbstractQueuedSynchronizer {
173 >        /** State value representing that task is running */
174 >        private static final int RUNNING   = 1;
175 >        /** State value representing that task ran */
176 >        private static final int RAN       = 2;
177 >        /** State value representing that task was cancelled */
178 >        private static final int CANCELLED = 4;
179 >
180 >        /** The underlying callable */
181 >        private final Callable<V> callable;
182 >        /** The result to return from get() */
183 >        private V result;
184 >        /** The exception to throw from get() */
185 >        private Throwable exception;
186 >
187 >        /**
188 >         * The thread running task. When nulled after set/cancel, this
189 >         * indicates that the results are accessible.  Must be
190 >         * volatile, to serve as write barrier on completion.
191 >         */
192 >        private volatile Thread runner;
193 >
194 >        Sync(Callable<V> callable) {
195 >            this.callable = callable;
196 >        }
197 >
198 >        private boolean ranOrCancelled(int state) {
199 >            return (state & (RAN | CANCELLED)) != 0;
200 >        }
201 >
202 >        /**
203 >         * Implements AQS base acquire to succeed if Done/cancelled
204 >         */
205 >        protected int tryAcquireSharedState(boolean b, int ignore) {
206 >            return doIsDone()? 1 : -1;
207 >        }
208 >
209 >        /**
210 >         * Implements AQS base release to always signal after setting
211 >         * final done status by nulling runner thread.
212 >         */
213 >        protected boolean releaseSharedState(int ignore) {
214 >            runner = null;
215 >            return true;
216 >        }
217 >
218 >        boolean doIsCancelled() {
219 >            return getState() == CANCELLED;
220 >        }
221 >        
222 >        boolean doIsDone() {
223 >            return ranOrCancelled(getState()) && runner == null;
224 >        }
225 >
226 >        V doGet() throws InterruptedException, ExecutionException {
227 >            acquireSharedInterruptibly(0);
228 >            if (getState() == CANCELLED)
229 >                throw new CancellationException();
230 >            if (exception != null)
231 >                throw new ExecutionException(exception);
232 >            return result;
233 >        }
234 >
235 >        V doGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
236 >            if (!acquireSharedTimed(0, nanosTimeout))
237 >                throw new TimeoutException();                
238 >            if (getState() == CANCELLED)
239 >                throw new CancellationException();
240 >            if (exception != null)
241 >                throw new ExecutionException(exception);
242 >            return result;
243 >        }
244 >
245 >        void doSet(V v) {
246 >            int s = getState();
247 >            if (ranOrCancelled(s) || !compareAndSetState(s, RAN))
248 >                return;
249 >            result = v;
250 >            releaseShared(0);
251 >            done();
252 >        }
253 >
254 >        void doSetException(Throwable t) {
255 >            int s = getState();
256 >            if (ranOrCancelled(s) || !compareAndSetState(s, RAN))
257 >                return;
258 >            exception = t;
259 >            result = null;
260 >            releaseShared(0);
261 >            done();
262 >        }
263 >
264 >        boolean doCancel(boolean mayInterruptIfRunning) {
265 >            int s = getState();
266 >            if (ranOrCancelled(s) || !compareAndSetState(s, CANCELLED))
267 >                return false;
268 >            if (mayInterruptIfRunning) {
269 >                Thread r = runner;
270 >                if (r != null)
271 >                    r.interrupt();
272 >            }
273 >            releaseShared(0);
274 >            done();
275 >            return true;
276 >        }
277 >
278 >        void doRun() {
279 >            if (!compareAndSetState(0, RUNNING))
280 >                return;
281              try {
282 <                set(callable.call());
282 >                runner = Thread.currentThread();
283 >                doSet(callable.call());
284              } catch(Throwable ex) {
285 <                setException(ex);
286 <            }
285 >                doSetException(ex);
286 >            }
287          }
300    }
288  
289 <    /**
290 <     * Executes the computation and then resets this Future to initial
291 <     * state; failing to do so if the computation encounters an
305 <     * exception or is cancelled.  This is designed for use with tasks
306 <     * that intrinsically execute more than once.
307 <     * @return true if successfully run and reset
308 <     */
309 <    protected boolean runAndReset() {
310 <        if (setRunning()) {
289 >        boolean doRunAndReset() {
290 >            if (!compareAndSetState(0, RUNNING))
291 >                return false;
292              try {
293 <                // don't bother to set result; it can't be accessed
294 <                callable.call();
295 <                return reset();
293 >                runner = Thread.currentThread();
294 >                callable.call(); // don't set result
295 >                runner = null;
296 >                return compareAndSetState(RUNNING, 0);
297              } catch(Throwable ex) {
298 <                setException(ex);
298 >                doSetException(ex);
299 >                return false;
300              }
301          }
319        return false;
302      }
321
303   }
323

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines