ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ScheduledExecutor.java (file contents):
Revision 1.3 by dl, Thu May 29 16:42:04 2003 UTC vs.
Revision 1.4 by dl, Tue Jun 3 16:44:36 2003 UTC

# Line 1 | Line 1
1   /*
2 < * @(#)ScheduledExecutor.java
2 > * Written by Doug Lea with assistance from members of JCP JSR-166
3 > * Expert Group and released to the public domain. Use, modify, and
4 > * redistribute this code in any way without acknowledgement.
5   */
6  
7   package java.util.concurrent;
8 + import java.util.concurrent.atomic.*;
9   import java.util.*;
10  
11   /**
# Line 21 | Line 24 | import java.util.*;
24   */
25   public class ScheduledExecutor extends ThreadPoolExecutor {
26  
27 <    private static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
28 <        private DelayQueue<Runnable> dq = new DelayQueue<Runnable>();
27 >    /**
28 >     * A delayed or periodic action.
29 >     */
30 >    public static class DelayedTask extends CancellableTask implements Delayed {
31 >        /**
32 >         * Sequence number to break ties, and in turn to guarantee
33 >         * FIFO order among tied entries.
34 >         */
35 >        private static final AtomicLong sequencer = new AtomicLong(0);
36 >
37 >        private final long sequenceNumber;
38 >        private final long time;
39 >        private final long period;
40 >
41 >        /**
42 >         * Creates a one-shot action with given nanosecond delay
43 >         */
44 >        DelayedTask(Runnable r, long ns) {
45 >            super(r);
46 >            this.time = ns;
47 >            this.period = 0;
48 >            this.sequenceNumber = sequencer.getAndIncrement();
49 >        }
50 >
51 >        /**
52 >         * Creates a periodic action with given nanosecond delay and period
53 >         */
54 >        DelayedTask(Runnable r, long ns,  long period) {
55 >            super(r);
56 >            if (period <= 0)
57 >                throw new IllegalArgumentException();
58 >            this.time = ns;
59 >            this.period = period;
60 >            this.sequenceNumber = sequencer.getAndIncrement();
61 >        }
62 >
63 >
64 >        public long getDelay(TimeUnit unit) {
65 >            return unit.convert(time - TimeUnit.nanoTime(), TimeUnit.NANOSECONDS);
66 >        }
67 >
68 >        public int compareTo(Object other) {
69 >            DelayedTask x = (DelayedTask)other;
70 >            long diff = time - x.time;
71 >            if (diff < 0)
72 >                return -1;
73 >            else if (diff > 0)
74 >                return 1;
75 >            else if (sequenceNumber < x.sequenceNumber)
76 >                return -1;
77 >            else
78 >                return 1;
79 >        }
80 >
81 >        /**
82 >         * Return true if this is a periodic (not a one-shot) action.
83 >         */
84 >        public boolean isPeriodic() {
85 >            return period > 0;
86 >        }
87 >
88 >        /**
89 >         * Returns the period, or zero if non-periodic
90 >         */
91 >        public long getPeriod(TimeUnit unit) {
92 >            return unit.convert(period, TimeUnit.NANOSECONDS);
93 >        }
94 >
95 >        /**
96 >         * Return a new DelayedTask that will trigger in the period
97 >         * subsequent to current task, or null if non-periodic
98 >         * or canceled.
99 >         */
100 >        public  DelayedTask nextTask() {
101 >            if (period <= 0 || isCancelled())
102 >                return null;
103 >            return new DelayedTask(getRunnable(), time+period, period);
104 >        }
105  
106 <        DelayQueue<Runnable> getDelayQueue() { return dq; }
106 >    }
107  
108 <        public Runnable take() throws InterruptedException {
109 <            return dq.take().get();
108 >    /**
109 >     * A delayed result-bearing action.
110 >     */
111 >    public static class DelayedFutureTask<V> extends DelayedTask implements Future<V> {
112 >        /**
113 >         * Creates a Future that may trigger after the given delay.
114 >         */
115 >        DelayedFutureTask(Callable<V> callable, long delay,  TimeUnit unit) {
116 >            // must set after super ctor call to use inner class
117 >            super(null, TimeUnit.nanoTime() + unit.toNanos(delay));
118 >            setRunnable(new InnerCancellableFuture<V>(callable));
119          }
120  
121 <        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
122 <            DelayEntry<Runnable> e = dq.poll(timeout, unit);
123 <            return (e == null) ? null : e.get();
121 >        /**
122 >         * Creates a one-shot action that may trigger after the given date.
123 >         */
124 >        DelayedFutureTask(Callable<V> callable, Date date) {
125 >            super(null,
126 >                  TimeUnit.MILLISECONDS.toNanos(date.getTime() -
127 >                                                System.currentTimeMillis()));
128 >            setRunnable(new InnerCancellableFuture<V>(callable));
129          }
130 <
131 <        public Runnable poll() {
132 <            DelayEntry<Runnable> e = dq.poll();
40 <            return (e == null) ? null : e.get();
130 >    
131 >        public V get() throws InterruptedException, ExecutionException {
132 >            return ((InnerCancellableFuture<V>)getRunnable()).get();
133          }
134  
135 <        public Runnable peek() {
136 <            DelayEntry<Runnable> e = dq.peek();
137 <            return (e == null) ? null : e.get();
135 >        public V get(long timeout, TimeUnit unit)
136 >            throws InterruptedException, ExecutionException, TimeoutException {
137 >            return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
138          }
139  
140 <        public int size() { return dq.size(); }
141 <        public int remainingCapacity() { return dq.remainingCapacity(); }
50 <        public void put(Runnable r) {
51 <            assert false;
52 <        }
53 <        public boolean offer(Runnable r) {
54 <            assert false;
55 <            return false;
56 <        }
57 <        public boolean offer(Runnable r, long time, TimeUnit unit) {
58 <            assert false;
59 <            return false;
60 <        }
61 <        public Iterator<Runnable> iterator() {
62 <            assert false;
63 <            return null; // for now
140 >        protected void set(V v) {
141 >            ((InnerCancellableFuture<V>)getRunnable()).set(v);
142          }
143  
144 <        public boolean remove(Object x) {
145 <            assert false;
68 <            return false; // for now
144 >        protected void setException(Throwable t) {
145 >            ((InnerCancellableFuture<V>)getRunnable()).setException(t);
146          }
147 +    }
148 +
149  
150 <        public Object[] toArray() {
151 <            assert false;
152 <            return null; // for now
150 >    /**
151 >     * An annoying wrapper class to convince generics compiler to
152 >     * use a DelayQueue<DelayedTask> as a BlockingQUeue<Runnable>
153 >     */
154 >    private static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
155 >        private final DelayQueue<DelayedTask> dq = new DelayQueue<DelayedTask>();
156 >        public Runnable poll() { return dq.poll(); }
157 >        public Runnable peek() { return dq.peek(); }
158 >        public Runnable take() throws InterruptedException { return dq.take(); }
159 >        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
160 >            return dq.poll(timeout, unit);
161          }
162 <        public <T> T[] toArray(T[] a) {
163 <            assert false;
164 <            return null; // for now
162 >        public boolean offer(Runnable x) { return dq.offer((DelayedTask)x); }
163 >        public void put(Runnable x) throws InterruptedException {
164 >            dq.put((DelayedTask)x);
165 >        }
166 >        public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
167 >            return dq.offer((DelayedTask)x, timeout, unit);
168 >        }
169 >        public int remainingCapacity() { return dq.remainingCapacity(); }
170 >        public boolean remove(Object x) { return dq.remove(x); }
171 >        public boolean contains(Object x) { return dq.contains(x); }
172 >        public int size() { return dq.size(); }
173 >        public boolean isEmpty() { return dq.isEmpty(); }
174 >        public Iterator<Runnable> iterator() {
175 >            return new Iterator<Runnable>() {
176 >                private Iterator<DelayedTask> it = dq.iterator();
177 >                public boolean hasNext() { return it.hasNext(); }
178 >                public Runnable next() { return it.next(); }
179 >                public void remove() {  it.remove(); }
180 >            };
181          }
182      }
183  
81
184      /**
185       * Creates a new ScheduledExecutor with the given initial parameters.
186       *
# Line 90 | Line 192 | public class ScheduledExecutor extends T
192                new DelayedWorkQueue());
193      }
194  
195 <    public void execute(DelayEntry<Runnable> r) {
196 <        if (isShutdown()) {
197 <            getRejectedExecutionHandler().rejectedExecution(r.get(), this);
198 <            return;
199 <        }
195 >    /**
196 >     * Creates a new ScheduledExecutor with the given initial parameters.
197 >     *
198 >     * @param corePoolSize the number of threads to keep in the pool,
199 >     * even if they are idle.
200 >     * @param threadFactory the factory to use when the executor
201 >     * creates a new thread.
202 >     */
203 >    public ScheduledExecutor(int corePoolSize,
204 >                             ThreadFactory threadFactory) {
205 >        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
206 >              new DelayedWorkQueue(), threadFactory);
207 >    }
208 >
209 >    /**
210 >     * Creates a new ScheduledExecutor with the given initial parameters.
211 >     *
212 >     * @param corePoolSize the number of threads to keep in the pool,
213 >     * even if they are idle.
214 >     * @param handler the handler to use when execution is blocked
215 >     * because the thread bounds and queue capacities are reached.
216 >     */
217 >    public ScheduledExecutor(int corePoolSize,
218 >                              RejectedExecutionHandler handler) {
219 >        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
220 >              new DelayedWorkQueue(), handler);
221 >    }
222  
223 <        addIfUnderCorePoolSize(null);
224 <        ((DelayedWorkQueue)getQueue()).getDelayQueue().put(r);
223 >    /**
224 >     * Creates a new ScheduledExecutor with the given initial parameters.
225 >     *
226 >     * @param corePoolSize the number of threads to keep in the pool,
227 >     * even if they are idle.
228 >     * @param threadFactory the factory to use when the executor
229 >     * creates a new thread.
230 >     * @param handler the handler to use when execution is blocked
231 >     * because the thread bounds and queue capacities are reached.
232 >     */
233 >    public ScheduledExecutor(int corePoolSize,
234 >                              ThreadFactory threadFactory,
235 >                              RejectedExecutionHandler handler) {
236 >        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
237 >              new DelayedWorkQueue(), threadFactory, handler);
238      }
239  
240 <    public void execute(Runnable r) {
241 <        execute(new DelayEntry(r, 0, TimeUnit.NANOSECONDS));
240 >    /**
241 >     * Creates and executes a one-shot action that may trigger after
242 >     * the given delay.
243 >     */
244 >
245 >    public DelayedTask schedule(Runnable r, long delay,  TimeUnit unit) {
246 >        DelayedTask t = new DelayedTask(r, TimeUnit.nanoTime() + unit.toNanos(delay));
247 >        super.execute(t);
248 >        return t;
249      }
250  
251 +    /**
252 +     * Creates and executes a one-shot action that may trigger after the given date.
253 +     */
254 +    public DelayedTask schedule(Runnable r, Date date) {
255 +        DelayedTask t = new DelayedTask
256 +            (r,
257 +             TimeUnit.MILLISECONDS.toNanos(date.getTime() -
258 +                                           System.currentTimeMillis()));
259 +        super.execute(t);
260 +        return t;
261 +    }
262 +    
263 +    /**
264 +     * Creates and executes a periodic action that may trigger first
265 +     * after the given initial delay, and subsequently with the given
266 +     * period.
267 +     */
268 +    public DelayedTask schedule (Runnable r, long initialDelay,  long period, TimeUnit unit) {
269 +        DelayedTask t = new DelayedTask
270 +            (r, TimeUnit.nanoTime() + unit.toNanos(initialDelay),
271 +             unit.toNanos(period));
272 +        super.execute(t);
273 +        return t;
274 +    }
275 +    
276 +    /**
277 +     * Creates a periodic action that may trigger first after the
278 +     * given date, and subsequently with the given period.
279 +     */
280 +    public DelayedTask schedule(Runnable r, Date firstDate, long period, TimeUnit unit) {
281 +        DelayedTask t = new DelayedTask
282 +            (r,
283 +             TimeUnit.MILLISECONDS.toNanos(firstDate.getTime() -
284 +                                           System.currentTimeMillis()),
285 +             unit.toNanos(period));
286 +        super.execute(t);
287 +        return t;
288 +    }
289 +
290 +
291 +    /**
292 +     * Creates and executes a Future that may trigger after the given delay.
293 +     */
294 +    public <V> DelayedFutureTask<V> schedule(Callable<V> callable, long delay,  TimeUnit unit) {
295 +        DelayedFutureTask<V> t = new DelayedFutureTask<V>
296 +            (callable, delay, unit);
297 +        super.execute(t);
298 +        return t;
299 +    }
300 +
301 +    /**
302 +     * Creates and executes a one-shot action that may trigger after
303 +     * the given date.
304 +     */
305 +    public <V> DelayedFutureTask<V> schedule(Callable<V> callable, Date date) {
306 +        DelayedFutureTask<V> t = new DelayedFutureTask<V>
307 +            (callable, date);
308 +        super.execute(t);
309 +        return t;
310 +    }
311 +
312 +    /**
313 +     * Execute command with zero required delay
314 +     */
315 +    public void execute(Runnable command) {
316 +        schedule(command, 0, TimeUnit.NANOSECONDS);
317 +    }
318 +
319 +    /**
320 +     * If executed task was periodic, cause the task for the next
321 +     * period to execute.
322 +     */
323 +    protected void afterExecute(Runnable r, Throwable t) {
324 +        if (isShutdown())
325 +            return;
326 +        super.afterExecute(r, t);
327 +        DelayedTask d = (DelayedTask)r;
328 +        DelayedTask next = d.nextTask();
329 +        if (next != null)
330 +            super.execute(next);
331 +    }
332   }
333 +
334 +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines