ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java (file contents):
Revision 1.12 by dl, Wed Jan 7 21:43:57 2004 UTC vs.
Revision 1.13 by dl, Sun Jan 11 23:19:55 2004 UTC

# Line 96 | Line 96 | public class ScheduledThreadPoolExecutor
96              this.sequenceNumber = sequencer.getAndIncrement();
97          }
98  
99
99          public long getDelay(TimeUnit unit) {
100              long d =  unit.convert(time - System.nanoTime(),
101                                     TimeUnit.NANOSECONDS);
# Line 136 | Line 135 | public class ScheduledThreadPoolExecutor
135          }
136  
137          /**
138 +         * RUn a periodic task
139 +         */
140 +        private void runPeriodic() {
141 +            boolean ok = ScheduledFutureTask.super.runAndReset();
142 +            boolean down = isShutdown();
143 +            // Reschedule if not cancelled and not shutdown or policy allows
144 +            if (ok && (!down ||
145 +                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
146 +                        !isTerminating()))) {
147 +                time = period + (rateBased ? time : System.nanoTime());
148 +                ScheduledThreadPoolExecutor.super.getQueue().add(this);
149 +            }
150 +            // This might have been the final executed delayed
151 +            // task.  Wake up threads to check.
152 +            else if (down)
153 +                interruptIdleWorkers();
154 +        }
155 +
156 +        /**
157           * Overrides FutureTask version so as to reset/requeue if periodic.
158           */
159          public void run() {
160 <            if (!isPeriodic())
160 >            if (isPeriodic())
161 >                runPeriodic();
162 >            else
163                  ScheduledFutureTask.super.run();
144            else {
145                boolean ok = (ScheduledFutureTask.super.runAndReset());
146                boolean down = isShutdown();
147                if (ok && (!down ||
148                    (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
149                     !isTerminating()))) {
150                    time = period + (rateBased ? time : System.nanoTime());
151                    ScheduledThreadPoolExecutor.super.getQueue().add(this);
152                }
153                // This might have been the final executed delayed
154                // task.  Wake up threads to check.
155                else if (down)
156                    interruptIdleWorkers();
157            }
164          }
165      }
166  
167      /**
168 <     * An annoying wrapper class to convince generics compiler to
169 <     * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
170 <     */
171 <    private static class DelayedWorkQueue
172 <            extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
173 <        
168 <        private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>();
169 <        public Runnable poll() { return dq.poll(); }
170 <        public Runnable peek() { return dq.peek(); }
171 <        public Runnable take() throws InterruptedException { return dq.take(); }
172 <        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
173 <            return dq.poll(timeout, unit);
174 <        }
175 <
176 <        public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); }
177 <        public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); }
178 <        public void put(Runnable x)  {
179 <            dq.put((ScheduledFutureTask)x);
180 <        }
181 <        public boolean offer(Runnable x, long timeout, TimeUnit unit) {
182 <            return dq.offer((ScheduledFutureTask)x, timeout, unit);
183 <        }
184 <
185 <        public Runnable remove() { return dq.remove(); }
186 <        public Runnable element() { return dq.element(); }
187 <        public void clear() { dq.clear(); }
188 <        public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
189 <        public int drainTo(Collection<? super Runnable> c, int maxElements) {
190 <            return dq.drainTo(c, maxElements);
168 >     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
169 >     */
170 >    private void delayedExecute(Runnable command) {
171 >        if (isShutdown()) {
172 >            reject(command);
173 >            return;
174          }
175 +        // Prestart a thread if necessary. We cannot prestart it
176 +        // running the task because the task (probably) shouldn't be
177 +        // run yet, so thread will just idle until delay elapses.
178 +        if (getPoolSize() < getCorePoolSize())
179 +            prestartCoreThread();
180 +            
181 +        super.getQueue().add(command);
182 +    }
183  
184 <        public int remainingCapacity() { return dq.remainingCapacity(); }
185 <        public boolean remove(Object x) { return dq.remove(x); }
186 <        public boolean contains(Object x) { return dq.contains(x); }
187 <        public int size() { return dq.size(); }
188 <        public boolean isEmpty() { return dq.isEmpty(); }
189 <        public Object[] toArray() { return dq.toArray(); }
190 <        public <T> T[] toArray(T[] array) { return dq.toArray(array); }
191 <        public Iterator<Runnable> iterator() {
192 <            return new Iterator<Runnable>() {
193 <                private Iterator<ScheduledFutureTask> it = dq.iterator();
194 <                public boolean hasNext() { return it.hasNext(); }
195 <                public Runnable next() { return it.next(); }
196 <                public void remove() {  it.remove(); }
197 <            };
184 >    /**
185 >     * Cancel and clear the queue of all tasks that should not be run
186 >     * due to shutdown policy.
187 >     */
188 >    private void cancelUnwantedTasks() {
189 >        boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
190 >        boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
191 >        if (!keepDelayed && !keepPeriodic)
192 >            super.getQueue().clear();
193 >        else if (keepDelayed || keepPeriodic) {
194 >            Object[] entries = super.getQueue().toArray();
195 >            for (int i = 0; i < entries.length; ++i) {
196 >                Object e = entries[i];
197 >                if (e instanceof ScheduledFutureTask) {
198 >                    ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e;
199 >                    if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
200 >                        t.cancel(false);
201 >                }
202 >            }
203 >            entries = null;
204 >            purge();
205          }
206      }
207  
208      /**
209 <     * Creates a new ScheduledThreadPoolExecutor with the given core pool size.
209 >     * Creates a new ScheduledThreadPoolExecutor with the given core
210 >     * pool size.
211       *
212       * @param corePoolSize the number of threads to keep in the pool,
213       * even if they are idle.
# Line 221 | Line 220 | public class ScheduledThreadPoolExecutor
220      }
221  
222      /**
223 <     * Creates a new ScheduledThreadPoolExecutor with the given initial parameters.
223 >     * Creates a new ScheduledThreadPoolExecutor with the given
224 >     * initial parameters.
225       *
226       * @param corePoolSize the number of threads to keep in the pool,
227       * even if they are idle.
# Line 236 | Line 236 | public class ScheduledThreadPoolExecutor
236      }
237  
238      /**
239 <     * Creates a new ScheduledThreadPoolExecutor with the given initial parameters.
239 >     * Creates a new ScheduledThreadPoolExecutor with the given
240 >     * initial parameters.
241       *
242       * @param corePoolSize the number of threads to keep in the pool,
243       * even if they are idle.
# Line 251 | Line 252 | public class ScheduledThreadPoolExecutor
252      }
253  
254      /**
255 <     * Creates a new ScheduledThreadPoolExecutor with the given initial parameters.
255 >     * Creates a new ScheduledThreadPoolExecutor with the given
256 >     * initial parameters.
257       *
258       * @param corePoolSize the number of threads to keep in the pool,
259       * even if they are idle.
# Line 268 | Line 270 | public class ScheduledThreadPoolExecutor
270                new DelayedWorkQueue(), threadFactory, handler);
271      }
272  
273 <    /**
274 <     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
275 <     */
274 <    private void delayedExecute(Runnable command) {
275 <        if (isShutdown()) {
276 <            reject(command);
277 <            return;
278 <        }
279 <        // Prestart a thread if necessary. We cannot prestart it
280 <        // running the task because the task (probably) shouldn't be
281 <        // run yet, so thread will just idle until delay elapses.
282 <        if (getPoolSize() < getCorePoolSize())
283 <            prestartCoreThread();
284 <            
285 <        super.getQueue().add(command);
286 <    }
287 <
288 <    public ScheduledFuture<?> schedule(Runnable command, long delay,  TimeUnit unit) {
273 >    public ScheduledFuture<?> schedule(Runnable command,
274 >                                       long delay,
275 >                                       TimeUnit unit) {
276          if (command == null || unit == null)
277              throw new NullPointerException();
278          long triggerTime = System.nanoTime() + unit.toNanos(delay);
279 <        ScheduledFutureTask<?> t = new ScheduledFutureTask<Boolean>(command, null, triggerTime);
279 >        ScheduledFutureTask<?> t =
280 >            new ScheduledFutureTask<Boolean>(command, null, triggerTime);
281          delayedExecute(t);
282          return t;
283      }
284  
285 <    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
285 >    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
286 >                                           long delay,
287 >                                           TimeUnit unit) {
288          if (callable == null || unit == null)
289              throw new NullPointerException();
290          long triggerTime = System.nanoTime() + unit.toNanos(delay);
291 <        ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable, triggerTime);
291 >        ScheduledFutureTask<V> t =
292 >            new ScheduledFutureTask<V>(callable, triggerTime);
293          delayedExecute(t);
294          return t;
295      }
296  
297 <    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit) {
297 >    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
298 >                                                  long initialDelay,  
299 >                                                  long period,
300 >                                                  TimeUnit unit) {
301          if (command == null || unit == null)
302              throw new NullPointerException();
303          if (period <= 0)
304              throw new IllegalArgumentException();
305          long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
306 <        ScheduledFutureTask<?> t = new ScheduledFutureTask<Object>
307 <            (command, null,
308 <             triggerTime,
309 <             unit.toNanos(period),
310 <             true);
306 >        ScheduledFutureTask<?> t =
307 >            new ScheduledFutureTask<Object>(command,
308 >                                            null,
309 >                                            triggerTime,
310 >                                            unit.toNanos(period),
311 >                                            true);
312          delayedExecute(t);
313          return t;
314      }
315      
316 <    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit) {
316 >    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
317 >                                                     long initialDelay,  
318 >                                                     long delay,
319 >                                                     TimeUnit unit) {
320          if (command == null || unit == null)
321              throw new NullPointerException();
322          if (delay <= 0)
323              throw new IllegalArgumentException();
324          long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
325 <        ScheduledFutureTask<?> t = new ScheduledFutureTask<Boolean>
326 <            (command,
327 <             null,
328 <             triggerTime,
329 <             unit.toNanos(delay),
330 <             false);
325 >        ScheduledFutureTask<?> t =
326 >            new ScheduledFutureTask<Boolean>(command,
327 >                                             null,
328 >                                             triggerTime,
329 >                                             unit.toNanos(delay),
330 >                                             false);
331          delayedExecute(t);
332          return t;
333      }
# Line 354 | Line 352 | public class ScheduledThreadPoolExecutor
352          schedule(command, 0, TimeUnit.NANOSECONDS);
353      }
354  
355 +    // Override AbstractExecutorService methods
356 +
357      public Future<?> submit(Runnable task) {
358          return schedule(task, 0, TimeUnit.NANOSECONDS);
359      }
360  
361      public <T> Future<T> submit(Runnable task, T result) {
362 <        return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS);
362 >        return schedule(Executors.callable(task, result),
363 >                        0, TimeUnit.NANOSECONDS);
364      }
365  
366      public <T> Future<T> submit(Callable<T> task) {
# Line 422 | Line 423 | public class ScheduledThreadPoolExecutor
423          return executeExistingDelayedTasksAfterShutdown;
424      }
425  
425    /**
426     * Cancel and clear the queue of all tasks that should not be run
427     * due to shutdown policy.
428     */
429    private void cancelUnwantedTasks() {
430        boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
431        boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
432        if (!keepDelayed && !keepPeriodic)
433            super.getQueue().clear();
434        else if (keepDelayed || keepPeriodic) {
435            Object[] entries = super.getQueue().toArray();
436            for (int i = 0; i < entries.length; ++i) {
437                Object e = entries[i];
438                if (e instanceof ScheduledFutureTask) {
439                    ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e;
440                    if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
441                        t.cancel(false);
442                }
443            }
444            entries = null;
445            purge();
446        }
447    }
426  
427      /**
428       * Initiates an orderly shutdown in which previously submitted
# Line 481 | Line 459 | public class ScheduledThreadPoolExecutor
459          return super.shutdownNow();
460      }
461  
484
462      /**
463       * Returns the task queue used by this executor.  Each element of
464       * this queue is a {@link ScheduledFuture}, including those
# Line 497 | Line 474 | public class ScheduledThreadPoolExecutor
474          return super.getQueue();
475      }
476  
477 +    /**
478 +     * An annoying wrapper class to convince generics compiler to
479 +     * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
480 +     */
481 +    private static class DelayedWorkQueue
482 +        extends AbstractCollection<Runnable>
483 +        implements BlockingQueue<Runnable> {
484 +        
485 +        private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>();
486 +        public Runnable poll() { return dq.poll(); }
487 +        public Runnable peek() { return dq.peek(); }
488 +        public Runnable take() throws InterruptedException { return dq.take(); }
489 +        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
490 +            return dq.poll(timeout, unit);
491 +        }
492 +
493 +        public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); }
494 +        public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); }
495 +        public void put(Runnable x)  {
496 +            dq.put((ScheduledFutureTask)x);
497 +        }
498 +        public boolean offer(Runnable x, long timeout, TimeUnit unit) {
499 +            return dq.offer((ScheduledFutureTask)x, timeout, unit);
500 +        }
501 +
502 +        public Runnable remove() { return dq.remove(); }
503 +        public Runnable element() { return dq.element(); }
504 +        public void clear() { dq.clear(); }
505 +        public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
506 +        public int drainTo(Collection<? super Runnable> c, int maxElements) {
507 +            return dq.drainTo(c, maxElements);
508 +        }
509 +
510 +        public int remainingCapacity() { return dq.remainingCapacity(); }
511 +        public boolean remove(Object x) { return dq.remove(x); }
512 +        public boolean contains(Object x) { return dq.contains(x); }
513 +        public int size() { return dq.size(); }
514 +        public boolean isEmpty() { return dq.isEmpty(); }
515 +        public Object[] toArray() { return dq.toArray(); }
516 +        public <T> T[] toArray(T[] array) { return dq.toArray(array); }
517 +        public Iterator<Runnable> iterator() {
518 +            return new Iterator<Runnable>() {
519 +                private Iterator<ScheduledFutureTask> it = dq.iterator();
520 +                public boolean hasNext() { return it.hasNext(); }
521 +                public Runnable next() { return it.next(); }
522 +                public void remove() {  it.remove(); }
523 +            };
524 +        }
525 +    }
526   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines