ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/ScheduledThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/jdk7/java/util/concurrent/ScheduledThreadPoolExecutor.java (file contents):
Revision 1.8 by jsr166, Sun Jan 18 20:17:32 2015 UTC vs.
Revision 1.9 by jsr166, Thu Nov 5 16:45:06 2015 UTC

# Line 6 | Line 6
6  
7   package java.util.concurrent;
8  
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9   import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 + import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 +
12 + import java.util.AbstractQueue;
13 + import java.util.Arrays;
14 + import java.util.Collection;
15 + import java.util.Iterator;
16 + import java.util.List;
17 + import java.util.NoSuchElementException;
18   import java.util.concurrent.atomic.AtomicLong;
19   import java.util.concurrent.locks.Condition;
20   import java.util.concurrent.locks.ReentrantLock;
14 import java.util.*;
21  
22   /**
23   * A {@link ThreadPoolExecutor} that can additionally schedule
# Line 36 | Line 42 | import java.util.*;
42   * removed from the work queue at time of cancellation.
43   *
44   * <p>Successive executions of a periodic task scheduled via
45 < * {@link #scheduleAtFixedRate} or
46 < * {@link #scheduleWithFixedDelay} do not overlap. While different
47 < * executions may be performed by different threads, the effects of
48 < * prior executions <a
49 < * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
45 > * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46 > * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47 > * do not overlap. While different executions may be performed by
48 > * different threads, the effects of prior executions
49 > * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50   * those of subsequent ones.
51   *
52   * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
# Line 70 | Line 76 | import java.util.*;
76   * {@link FutureTask}. However, this may be modified or replaced using
77   * subclasses of the form:
78   *
79 < *  <pre> {@code
79 > * <pre> {@code
80   * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81   *
82   *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
# Line 134 | Line 140 | public class ScheduledThreadPoolExecutor
140      /**
141       * True if ScheduledFutureTask.cancel should remove from queue.
142       */
143 <    private volatile boolean removeOnCancel = false;
143 >    volatile boolean removeOnCancel;
144  
145      /**
146       * Sequence number to break scheduling ties, and in turn to
# Line 142 | Line 148 | public class ScheduledThreadPoolExecutor
148       */
149      private static final AtomicLong sequencer = new AtomicLong();
150  
145    /**
146     * Returns current nanosecond time.
147     */
148    final long now() {
149        return System.nanoTime();
150    }
151
151      private class ScheduledFutureTask<V>
152              extends FutureTask<V> implements RunnableScheduledFuture<V> {
153  
# Line 156 | Line 155 | public class ScheduledThreadPoolExecutor
155          private final long sequenceNumber;
156  
157          /** The time the task is enabled to execute in nanoTime units */
158 <        private long time;
158 >        private volatile long time;
159  
160          /**
161           * Period in nanoseconds for repeating tasks.
# Line 177 | Line 176 | public class ScheduledThreadPoolExecutor
176          /**
177           * Creates a one-shot action with given nanoTime-based trigger time.
178           */
179 <        ScheduledFutureTask(Runnable r, V result, long triggerTime) {
179 >        ScheduledFutureTask(Runnable r, V result, long triggerTime,
180 >                            long sequenceNumber) {
181              super(r, result);
182              this.time = triggerTime;
183              this.period = 0;
184 <            this.sequenceNumber = sequencer.getAndIncrement();
184 >            this.sequenceNumber = sequenceNumber;
185          }
186  
187          /**
# Line 189 | Line 189 | public class ScheduledThreadPoolExecutor
189           * trigger time and period.
190           */
191          ScheduledFutureTask(Runnable r, V result, long triggerTime,
192 <                            long period) {
192 >                            long period, long sequenceNumber) {
193              super(r, result);
194              this.time = triggerTime;
195              this.period = period;
196 <            this.sequenceNumber = sequencer.getAndIncrement();
196 >            this.sequenceNumber = sequenceNumber;
197          }
198  
199          /**
200           * Creates a one-shot action with given nanoTime-based trigger time.
201           */
202 <        ScheduledFutureTask(Callable<V> callable, long triggerTime) {
202 >        ScheduledFutureTask(Callable<V> callable, long triggerTime,
203 >                            long sequenceNumber) {
204              super(callable);
205              this.time = triggerTime;
206              this.period = 0;
207 <            this.sequenceNumber = sequencer.getAndIncrement();
207 >            this.sequenceNumber = sequenceNumber;
208          }
209  
210          public long getDelay(TimeUnit unit) {
211 <            return unit.convert(time - now(), NANOSECONDS);
211 >            return unit.convert(time - System.nanoTime(), NANOSECONDS);
212          }
213  
214          public int compareTo(Delayed other) {
# Line 264 | Line 265 | public class ScheduledThreadPoolExecutor
265              if (!canRunInCurrentRunState(periodic))
266                  cancel(false);
267              else if (!periodic)
268 <                ScheduledFutureTask.super.run();
269 <            else if (ScheduledFutureTask.super.runAndReset()) {
268 >                super.run();
269 >            else if (super.runAndReset()) {
270                  setNextRunTime();
271                  reExecutePeriodic(outerTask);
272              }
# Line 491 | Line 492 | public class ScheduledThreadPoolExecutor
492       * Returns the nanoTime-based trigger time of a delayed action.
493       */
494      long triggerTime(long delay) {
495 <        return now() +
495 >        return System.nanoTime() +
496              ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
497      }
498  
# Line 523 | Line 524 | public class ScheduledThreadPoolExecutor
524              throw new NullPointerException();
525          RunnableScheduledFuture<Void> t = decorateTask(command,
526              new ScheduledFutureTask<Void>(command, null,
527 <                                          triggerTime(delay, unit)));
527 >                                          triggerTime(delay, unit),
528 >                                          sequencer.getAndIncrement()));
529          delayedExecute(t);
530          return t;
531      }
# Line 539 | Line 541 | public class ScheduledThreadPoolExecutor
541              throw new NullPointerException();
542          RunnableScheduledFuture<V> t = decorateTask(callable,
543              new ScheduledFutureTask<V>(callable,
544 <                                       triggerTime(delay, unit)));
544 >                                       triggerTime(delay, unit),
545 >                                       sequencer.getAndIncrement()));
546          delayedExecute(t);
547          return t;
548      }
# Line 555 | Line 558 | public class ScheduledThreadPoolExecutor
558                                                    TimeUnit unit) {
559          if (command == null || unit == null)
560              throw new NullPointerException();
561 <        if (period <= 0)
561 >        if (period <= 0L)
562              throw new IllegalArgumentException();
563          ScheduledFutureTask<Void> sft =
564              new ScheduledFutureTask<Void>(command,
565                                            null,
566                                            triggerTime(initialDelay, unit),
567 <                                          unit.toNanos(period));
567 >                                          unit.toNanos(period),
568 >                                          sequencer.getAndIncrement());
569          RunnableScheduledFuture<Void> t = decorateTask(command, sft);
570          sft.outerTask = t;
571          delayedExecute(t);
# Line 579 | Line 583 | public class ScheduledThreadPoolExecutor
583                                                       TimeUnit unit) {
584          if (command == null || unit == null)
585              throw new NullPointerException();
586 <        if (delay <= 0)
586 >        if (delay <= 0L)
587              throw new IllegalArgumentException();
588          ScheduledFutureTask<Void> sft =
589              new ScheduledFutureTask<Void>(command,
590                                            null,
591                                            triggerTime(initialDelay, unit),
592 <                                          unit.toNanos(-delay));
592 >                                          -unit.toNanos(delay),
593 >                                          sequencer.getAndIncrement());
594          RunnableScheduledFuture<Void> t = decorateTask(command, sft);
595          sft.outerTask = t;
596          delayedExecute(t);
# Line 758 | Line 763 | public class ScheduledThreadPoolExecutor
763      /**
764       * Attempts to stop all actively executing tasks, halts the
765       * processing of waiting tasks, and returns a list of the tasks
766 <     * that were awaiting execution.
766 >     * that were awaiting execution. These tasks are drained (removed)
767 >     * from the task queue upon return from this method.
768       *
769       * <p>This method does not wait for actively executing tasks to
770       * terminate.  Use {@link #awaitTermination awaitTermination} to
# Line 766 | Line 772 | public class ScheduledThreadPoolExecutor
772       *
773       * <p>There are no guarantees beyond best-effort attempts to stop
774       * processing actively executing tasks.  This implementation
775 <     * cancels tasks via {@link Thread#interrupt}, so any task that
775 >     * interrupts tasks via {@link Thread#interrupt}; any task that
776       * fails to respond to interrupts may never terminate.
777       *
778       * @return list of tasks that never commenced execution.
# Line 774 | Line 780 | public class ScheduledThreadPoolExecutor
780       *         For tasks submitted via one of the {@code schedule}
781       *         methods, the element will be identical to the returned
782       *         {@code ScheduledFuture}.  For tasks submitted using
783 <     *         {@link #execute}, the element will be a zero-delay {@code
784 <     *         ScheduledFuture}.
783 >     *         {@link #execute execute}, the element will be a
784 >     *         zero-delay {@code ScheduledFuture}.
785       * @throws SecurityException {@inheritDoc}
786       */
787      public List<Runnable> shutdownNow() {
# Line 783 | Line 789 | public class ScheduledThreadPoolExecutor
789      }
790  
791      /**
792 <     * Returns the task queue used by this executor.
793 <     * Each element of this list is a {@link ScheduledFuture}.
792 >     * Returns the task queue used by this executor.  Access to the
793 >     * task queue is intended primarily for debugging and monitoring.
794 >     * This queue may be in active use.  Retrieving the task queue
795 >     * does not prevent queued tasks from executing.
796 >     *
797 >     * <p>Each element of this queue is a {@link ScheduledFuture}.
798       * For tasks submitted via one of the {@code schedule} methods, the
799       * element will be identical to the returned {@code ScheduledFuture}.
800 <     * For tasks submitted using {@link #execute}, the element will be a
801 <     * zero-delay {@code ScheduledFuture}.
800 >     * For tasks submitted using {@link #execute execute}, the element
801 >     * will be a zero-delay {@code ScheduledFuture}.
802       *
803       * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
804       * tasks in the order in which they will execute.
# Line 1060 | Line 1070 | public class ScheduledThreadPoolExecutor
1070              lock.lock();
1071              try {
1072                  RunnableScheduledFuture<?> first = queue[0];
1073 <                if (first == null || first.getDelay(NANOSECONDS) > 0)
1074 <                    return null;
1075 <                else
1066 <                    return finishPoll(first);
1073 >                return (first == null || first.getDelay(NANOSECONDS) > 0)
1074 >                    ? null
1075 >                    : finishPoll(first);
1076              } finally {
1077                  lock.unlock();
1078              }
# Line 1079 | Line 1088 | public class ScheduledThreadPoolExecutor
1088                          available.await();
1089                      else {
1090                          long delay = first.getDelay(NANOSECONDS);
1091 <                        if (delay <= 0)
1091 >                        if (delay <= 0L)
1092                              return finishPoll(first);
1093                          first = null; // don't retain ref while waiting
1094                          if (leader != null)
# Line 1112 | Line 1121 | public class ScheduledThreadPoolExecutor
1121                  for (;;) {
1122                      RunnableScheduledFuture<?> first = queue[0];
1123                      if (first == null) {
1124 <                        if (nanos <= 0)
1124 >                        if (nanos <= 0L)
1125                              return null;
1126                          else
1127                              nanos = available.awaitNanos(nanos);
1128                      } else {
1129                          long delay = first.getDelay(NANOSECONDS);
1130 <                        if (delay <= 0)
1130 >                        if (delay <= 0L)
1131                              return finishPoll(first);
1132 <                        if (nanos <= 0)
1132 >                        if (nanos <= 0L)
1133                              return null;
1134                          first = null; // don't retain ref while waiting
1135                          if (nanos < delay || leader != null)
# Line 1252 | Line 1261 | public class ScheduledThreadPoolExecutor
1261           */
1262          private class Itr implements Iterator<Runnable> {
1263              final RunnableScheduledFuture<?>[] array;
1264 <            int cursor = 0;     // index of next element to return
1265 <            int lastRet = -1;   // index of last element, or -1 if no such
1264 >            int cursor;        // index of next element to return; initially 0
1265 >            int lastRet = -1;  // index of last element returned; -1 if no such
1266  
1267              Itr(RunnableScheduledFuture<?>[] array) {
1268                  this.array = array;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines