ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FutureTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/FutureTask.java (file contents):
Revision 1.61 by jsr166, Thu Jun 9 22:11:39 2011 UTC vs.
Revision 1.62 by dl, Fri Jun 17 14:43:54 2011 UTC

# Line 5 | Line 5
5   */
6  
7   package java.util.concurrent;
8 < import java.util.concurrent.locks.*;
8 > import java.util.concurrent.locks.LockSupport;
9  
10   /**
11   * A cancellable asynchronous computation.  This class provides a base
# Line 31 | Line 31 | import java.util.concurrent.locks.*;
31   * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
32   */
33   public class FutureTask<V> implements RunnableFuture<V> {
34 <    /** Synchronization control for FutureTask */
35 <    private final Sync sync;
34 >    /*
35 >     * Revision notes: This differs from previous versions of this
36 >     * class that relied on AbstractQueuedSynchronizer, mainly to
37 >     * avoid surprising users about retaining interrupt status during
38 >     * cancellation races. Sync control in the current design relies
39 >     * on a "state" field updated via CAS to track completion, along
40 >     * with a simple Treiber stack to hold waiting threads.
41 >     *
42 >     * Style note: As usual, we bypass overhead of using
43 >     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
44 >     */
45 >
46 >    /**
47 >     * The run state of this task, initially 0.  The run state
48 >     * transitions to NORMAL, EXCEPTIONAL, or CANCELLED (only) in
49 >     * method setCompletion. During setCompletion, state may take on
50 >     * transient values of COMPLETING (while outcome is being set) or
51 >     * INTERRUPTING (while interrupting the runner). State values
52 >     * are ordered and set to powers of two to simplify checks.
53 >     */
54 >    private volatile int state;
55 >    private static final int COMPLETING   = 0x01;
56 >    private static final int INTERRUPTING = 0x02;
57 >    private static final int NORMAL       = 0x04;
58 >    private static final int EXCEPTIONAL  = 0x08;
59 >    private static final int CANCELLED    = 0x10;
60 >
61 >    /** The result to return or exception to throw from get() */
62 >    private Object outcome; // non-volatile, protected by state reads/writes
63 >    /** The thread running the callable; CASed during run() */
64 >    private volatile Thread runner;
65 >    /** The underlying callable */
66 >    private final Callable<V> callable;
67 >    /** Treiber stack of waiting threads */
68 >    private volatile WaitNode waiters;
69 >
70 >    /**
71 >     * Sets completion status, unless already completed.  If
72 >     * necessary, we first set state to COMPLETING or INTERRUPTING to
73 >     * establish precedence. This intentionally stalls (just via
74 >     * yields) in (uncommon) cases of concurrent calls during
75 >     * cancellation until state is set, to avoid surprising users
76 >     * during cancellation races.
77 >     *
78 >     * @param x the outcome
79 >     * @param mode the completion state value
80 >     * @return true if this call caused transtion from 0 to completed
81 >     */
82 >    private boolean setCompletion(Object x, int mode) {
83 >        Thread r = runner;
84 >        if (r == Thread.currentThread()) // null out runner on completion
85 >            UNSAFE.putObject(this, runnerOffset, r = null); // nonvolatile OK
86 >        int next = ((mode == INTERRUPTING) ? // set up transient states
87 >                    (r != null) ? INTERRUPTING : CANCELLED :
88 >                    (x != null) ? COMPLETING : mode);
89 >        for (int s;;) {
90 >            if ((s = state) == 0) {
91 >                if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, next)) {
92 >                    if (next == INTERRUPTING) {
93 >                        Thread t = runner; // recheck
94 >                        if (t != null)
95 >                            t.interrupt();
96 >                        state = CANCELLED;
97 >                    }
98 >                    else if (next == COMPLETING) {
99 >                        outcome = x;
100 >                        state = mode;
101 >                    }
102 >                    if (waiters != null)
103 >                        releaseAll();
104 >                    done();
105 >                    return true;
106 >                }
107 >            }
108 >            else if (s == INTERRUPTING)
109 >                Thread.yield(); // wait out cancellation
110 >            else
111 >                return false;
112 >        }
113 >    }
114 >
115 >    /**
116 >     * Returns result or throws exception for completed task
117 >     * @param s completed state value
118 >     */
119 >    private V report(int s) throws ExecutionException {
120 >        Object x = outcome;
121 >        if (s == NORMAL)
122 >            return (V)x;
123 >        if ((s & (CANCELLED | INTERRUPTING)) != 0)
124 >            throw new CancellationException();
125 >        throw new ExecutionException((Throwable)x);
126 >    }
127  
128      /**
129       * Creates a <tt>FutureTask</tt> that will, upon running, execute the
# Line 44 | Line 135 | public class FutureTask<V> implements Ru
135      public FutureTask(Callable<V> callable) {
136          if (callable == null)
137              throw new NullPointerException();
138 <        sync = new Sync(callable);
138 >        this.callable = callable;
139      }
140  
141      /**
# Line 60 | Line 151 | public class FutureTask<V> implements Ru
151       * @throws NullPointerException if runnable is null
152       */
153      public FutureTask(Runnable runnable, V result) {
154 <        sync = new Sync(Executors.callable(runnable, result));
154 >        this.callable = Executors.callable(runnable, result);
155      }
156  
157      public boolean isCancelled() {
158 <        return sync.innerIsCancelled();
158 >        return (state & (CANCELLED | INTERRUPTING)) != 0;
159      }
160  
161      public boolean isDone() {
162 <        return sync.innerIsDone();
162 >        return state != 0;
163      }
164  
165      public boolean cancel(boolean mayInterruptIfRunning) {
166 <        return sync.innerCancel(mayInterruptIfRunning);
166 >        return state == 0 &&
167 >            setCompletion(null, mayInterruptIfRunning ?
168 >                          INTERRUPTING : CANCELLED);
169      }
170  
171      /**
172       * @throws CancellationException {@inheritDoc}
173       */
174      public V get() throws InterruptedException, ExecutionException {
175 <        return sync.innerGet();
175 >        int s;
176 >        return report((s = state) > COMPLETING ? s : awaitDone(false, 0L));
177      }
178  
179      /**
# Line 87 | Line 181 | public class FutureTask<V> implements Ru
181       */
182      public V get(long timeout, TimeUnit unit)
183          throws InterruptedException, ExecutionException, TimeoutException {
184 <        return sync.innerGet(unit.toNanos(timeout));
184 >        int s;
185 >        long nanos = unit.toNanos(timeout);
186 >        if ((s = state) <= COMPLETING &&
187 >            (s = awaitDone(true, nanos)) <= COMPLETING)
188 >            throw new TimeoutException();
189 >        return report(s);
190      }
191  
192      /**
# Line 109 | Line 208 | public class FutureTask<V> implements Ru
208       * @param v the value
209       */
210      protected void set(V v) {
211 <        sync.innerSet(v);
211 >        setCompletion(v, NORMAL);
212      }
213  
214      /**
# Line 121 | Line 220 | public class FutureTask<V> implements Ru
220       * @param t the cause of failure
221       */
222      protected void setException(Throwable t) {
223 <        sync.innerSetException(t);
223 >        setCompletion(t, EXCEPTIONAL);
224      }
225  
226      public void run() {
227 <        sync.innerRun();
227 >        Thread r = Thread.currentThread();
228 >        if (state == 0 &&
229 >            UNSAFE.compareAndSwapObject(this, runnerOffset, null, r)) {
230 >            V result;
231 >            try {
232 >                result = callable.call();
233 >            } catch (Throwable ex) {
234 >                setException(ex);
235 >                return;
236 >            }
237 >            set(result);
238 >        }
239      }
240  
241      /**
# Line 137 | Line 247 | public class FutureTask<V> implements Ru
247       * @return true if successfully run and reset
248       */
249      protected boolean runAndReset() {
250 <        return sync.innerRunAndReset();
250 >        Thread r = Thread.currentThread();
251 >        if (state != 0 ||
252 >            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, r))
253 >            return false;
254 >        try {
255 >            callable.call(); // don't set result
256 >        } catch (Throwable ex) {
257 >            setException(ex);
258 >            return false;
259 >        }
260 >        runner = null;
261 >        for (;;) {
262 >            int s = state;
263 >            if (s == 0)
264 >                return true;
265 >            if (s != INTERRUPTING)
266 >                return false;
267 >            Thread.yield(); // wait out racing cancellation
268 >        }
269      }
270  
271      /**
272 <     * Synchronization control for FutureTask. Note that this must be
273 <     * a non-static inner class in order to invoke the protected
274 <     * <tt>done</tt> method. For clarity, all inner class support
147 <     * methods are same as outer, prefixed with "inner".
148 <     *
149 <     * Uses AQS sync state to represent run status.
272 >     * Simple linked list nodes to record waiting threads in a Treiber
273 >     * stack. See other classes such as Phaser and SynchronousQueue
274 >     * for more detailed explanation.
275       */
276 <    private final class Sync extends AbstractQueuedSynchronizer {
277 <        private static final long serialVersionUID = -7828117401763700385L;
278 <
279 <        /** State value representing that task is ready to run */
155 <        private static final int READY     = 0;
156 <        /** State value representing that task is running */
157 <        private static final int RUNNING   = 1;
158 <        /** State value representing that task ran */
159 <        private static final int RAN       = 2;
160 <        /** State value representing that task was cancelled */
161 <        private static final int CANCELLED = 4;
162 <
163 <        /** The underlying callable */
164 <        private final Callable<V> callable;
165 <        /** The result to return from get() */
166 <        private V result;
167 <        /** The exception to throw from get() */
168 <        private Throwable exception;
169 <
170 <        /**
171 <         * The thread running task. When nulled after set/cancel, this
172 <         * indicates that the results are accessible.  Must be
173 <         * volatile, to ensure visibility upon completion.
174 <         */
175 <        private volatile Thread runner;
176 <
177 <        Sync(Callable<V> callable) {
178 <            this.callable = callable;
179 <        }
180 <
181 <        private boolean ranOrCancelled(int state) {
182 <            return (state & (RAN | CANCELLED)) != 0;
183 <        }
184 <
185 <        /**
186 <         * Implements AQS base acquire to succeed if ran or cancelled
187 <         */
188 <        protected int tryAcquireShared(int ignore) {
189 <            return innerIsDone() ? 1 : -1;
190 <        }
191 <
192 <        /**
193 <         * Implements AQS base release to always signal after setting
194 <         * final done status by nulling runner thread.
195 <         */
196 <        protected boolean tryReleaseShared(int ignore) {
197 <            runner = null;
198 <            return true;
199 <        }
200 <
201 <        boolean innerIsCancelled() {
202 <            return getState() == CANCELLED;
203 <        }
204 <
205 <        boolean innerIsDone() {
206 <            return ranOrCancelled(getState()) && runner == null;
207 <        }
208 <
209 <        V innerGet() throws InterruptedException, ExecutionException {
210 <            acquireSharedInterruptibly(0);
211 <            if (getState() == CANCELLED)
212 <                throw new CancellationException();
213 <            if (exception != null)
214 <                throw new ExecutionException(exception);
215 <            return result;
216 <        }
217 <
218 <        V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
219 <            if (!tryAcquireSharedNanos(0, nanosTimeout))
220 <                throw new TimeoutException();
221 <            if (getState() == CANCELLED)
222 <                throw new CancellationException();
223 <            if (exception != null)
224 <                throw new ExecutionException(exception);
225 <            return result;
226 <        }
276 >    static final class WaitNode {
277 >        volatile Thread thread;
278 >        WaitNode next;
279 >    }
280  
281 <        void innerSet(V v) {
282 <            for (;;) {
283 <                int s = getState();
284 <                if (s == RAN)
285 <                    return;
286 <                if (s == CANCELLED) {
287 <                    // aggressively release to set runner to null,
288 <                    // in case we are racing with a cancel request
289 <                    // that will try to interrupt runner
290 <                    releaseShared(0);
291 <                    return;
292 <                }
293 <                if (compareAndSetState(s, RAN)) {
294 <                    result = v;
295 <                    releaseShared(0);
296 <                    done();
297 <                    return;
281 >    /**
282 >     * Removes and signals all waiting threads
283 >     */
284 >    private void releaseAll() {
285 >        WaitNode q;
286 >        while ((q = waiters) != null) {
287 >            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
288 >                for (;;) {
289 >                    Thread t = q.thread;
290 >                    if (t != null) {
291 >                        q.thread = null;
292 >                        LockSupport.unpark(t);
293 >                    }
294 >                    WaitNode next = q.next;
295 >                    if (next == null)
296 >                        return;
297 >                    q.next = null; // unlink to help gc
298 >                    q = next;
299                  }
300              }
301          }
302 +    }
303  
304 <        void innerSetException(Throwable t) {
305 <            for (;;) {
306 <                int s = getState();
307 <                if (s == RAN)
308 <                    return;
309 <                if (s == CANCELLED) {
310 <                    // aggressively release to set runner to null,
311 <                    // in case we are racing with a cancel request
312 <                    // that will try to interrupt runner
313 <                    releaseShared(0);
314 <                    return;
315 <                }
316 <                if (compareAndSetState(s, RAN)) {
317 <                    exception = t;
318 <                    releaseShared(0);
264 <                    done();
265 <                    return;
266 <                }
304 >    /**
305 >     * Awaits completion or aborts on interrupt of timeout
306 >     * @param timed true if use timed waits
307 >     * @param nanos time to wait if timed
308 >     * @return state upon completion
309 >     */
310 >    private int awaitDone(boolean timed, long nanos)
311 >        throws InterruptedException {
312 >        long last = timed? System.nanoTime() : 0L;
313 >        WaitNode q = null;
314 >        boolean queued = false;
315 >        for (int s;;) {
316 >            if (Thread.interrupted()) {
317 >                removeWaiter(q);
318 >                throw new InterruptedException();
319              }
320 <        }
321 <
322 <        boolean innerCancel(boolean mayInterruptIfRunning) {
323 <            for (;;) {
272 <                int s = getState();
273 <                if (ranOrCancelled(s))
274 <                    return false;
275 <                if (compareAndSetState(s, CANCELLED))
276 <                    break;
320 >            else if ((s = state) > COMPLETING) {
321 >                if (q != null)
322 >                    q.thread = null;
323 >                return s;
324              }
325 <            if (mayInterruptIfRunning) {
326 <                Thread r = runner;
327 <                if (r != null)
328 <                    r.interrupt();
325 >            else if (q == null)
326 >                q = new WaitNode();
327 >            else if (!queued)
328 >                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
329 >                                                     q.next = waiters, q);
330 >            else if (q.thread == null)
331 >                q.thread = Thread.currentThread();
332 >            else if (timed) {
333 >                long now = System.nanoTime();
334 >                if ((nanos -= (now - last)) <= 0L) {
335 >                    removeWaiter(q);
336 >                    return state;
337 >                }
338 >                last = now;
339 >                LockSupport.parkNanos(this, nanos);
340              }
341 <            releaseShared(0);
342 <            done();
285 <            return true;
341 >            else
342 >                LockSupport.park(this);
343          }
344 +    }
345  
346 <        void innerRun() {
347 <            if (!compareAndSetState(READY, RUNNING))
348 <                return;
349 <
350 <            runner = Thread.currentThread();
351 <            if (getState() == RUNNING) { // recheck after setting thread
352 <                V result;
353 <                try {
354 <                    result = callable.call();
355 <                } catch (Throwable ex) {
356 <                    setException(ex);
357 <                    return;
346 >    /**
347 >     * Try to unlink a timed-out or interrupted wait node to avoid
348 >     * accumulating garbage. Internal nodes are simply unspliced
349 >     * without CAS since it is harmless if they are traversed anyway
350 >     * by releasers or concurrent calls to removeWaiter.
351 >     */
352 >    private void removeWaiter(WaitNode node) {
353 >        if (node != null) {
354 >            node.thread = null;
355 >            WaitNode pred = null;
356 >            WaitNode q = waiters;
357 >            while (q != null) {
358 >                WaitNode next = node.next;
359 >                if (q != node) {
360 >                    pred = q;
361 >                    q = next;
362 >                }
363 >                else if (pred != null) {
364 >                    pred.next = next;
365 >                    break;
366 >                }
367 >                else if (UNSAFE.compareAndSwapObject(this, waitersOffset,
368 >                                                     q, next))
369 >                    break;
370 >                else { // restart on CAS failure
371 >                    pred = null;
372 >                    q = waiters;
373                  }
301                set(result);
302            } else {
303                releaseShared(0); // cancel
374              }
375          }
376 +    }
377  
378 <        boolean innerRunAndReset() {
379 <            if (!compareAndSetState(READY, RUNNING))
380 <                return false;
381 <            try {
382 <                runner = Thread.currentThread();
383 <                if (getState() == RUNNING)
384 <                    callable.call(); // don't set result
385 <                runner = null;
386 <                return compareAndSetState(RUNNING, READY);
387 <            } catch (Throwable ex) {
388 <                setException(ex);
389 <                return false;
390 <            }
378 >    // Unsafe mechanics
379 >    private static final sun.misc.Unsafe UNSAFE;
380 >    private static final long stateOffset;
381 >    private static final long runnerOffset;
382 >    private static final long waitersOffset;
383 >    static {
384 >        try {
385 >            UNSAFE = sun.misc.Unsafe.getUnsafe();
386 >            Class<?> k = FutureTask.class;
387 >            stateOffset = UNSAFE.objectFieldOffset
388 >                (k.getDeclaredField("state"));
389 >            runnerOffset = UNSAFE.objectFieldOffset
390 >                (k.getDeclaredField("runner"));
391 >            waitersOffset = UNSAFE.objectFieldOffset
392 >                (k.getDeclaredField("waiters"));
393 >        } catch (Exception e) {
394 >            throw new Error(e);
395          }
396      }
397 +
398   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines