ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java (file contents):
Revision 1.45 by jsr166, Tue Sep 18 01:04:35 2007 UTC vs.
Revision 1.46 by jsr166, Tue Sep 25 20:08:07 2007 UTC

# Line 17 | Line 17 | import java.util.*;
17   * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18   * this class extends) are required.
19   *
20 < * <p> Delayed tasks execute no sooner than they are enabled, but
20 > * <p>Delayed tasks execute no sooner than they are enabled, but
21   * without any real-time guarantees about when, after they are
22   * enabled, they will commence. Tasks scheduled for exactly the same
23   * execution time are enabled in first-in-first-out (FIFO) order of
24 < * submission. If {@link #setRemoveOnCancelPolicy} is set {@code true},
25 < * cancelled tasks are automatically removed from the work queue.
24 > * submission.
25 > *
26 > * <p>When a submitted task is cancelled before it is run, execution
27 > * is suppressed. By default, such a cancelled task is not
28 > * automatically removed from the work queue until its delay
29 > * elapses. While this enables further inspection and monitoring, it
30 > * may also cause unbounded retention of cancelled tasks. To avoid
31 > * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
32 > * causes tasks to be immediately removed from the work queue at
33 > * time of cancellation.
34   *
35   * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36   * of the inherited tuning methods are not useful for it. In
# Line 84 | Line 92 | public class ScheduledThreadPoolExecutor
92       *    ScheduledExecutorService methods) which are treated as
93       *    delayed tasks with a delay of zero.
94       *
95 <     * 2. Using a custom queue (DelayedWorkQueue) based on an
95 >     * 2. Using a custom queue (DelayedWorkQueue), a variant of
96       *    unbounded DelayQueue. The lack of capacity constraint and
97       *    the fact that corePoolSize and maximumPoolSize are
98       *    effectively identical simplifies some execution mechanics
99 <     *    (see delayedExecute) compared to ThreadPoolExecutor
92 <     *    version.
93 <     *
94 <     *    The DelayedWorkQueue class is defined below for the sake of
95 <     *    ensuring that all elements are instances of
96 <     *    RunnableScheduledFuture.  Since DelayQueue otherwise
97 <     *    requires type be Delayed, but not necessarily Runnable, and
98 <     *    the workQueue requires the opposite, we need to explicitly
99 <     *    define a class that requires both to ensure that users don't
100 <     *    add objects that aren't RunnableScheduledFutures via
101 <     *    getQueue().add() etc.
99 >     *    (see delayedExecute) compared to ThreadPoolExecutor.
100       *
101       * 3. Supporting optional run-after-shutdown parameters, which
102       *    leads to overrides of shutdown methods to remove and cancel
# Line 647 | Line 645 | public class ScheduledThreadPoolExecutor
645      }
646  
647      /**
648 <     * Sets the policy on whether cancellation of a task should remove
649 <     * it from the work queue.  This value is by default {@code false}.
648 >     * Sets the policy on whether cancelled tasks should be immediately
649 >     * removed from the work queue at time of cancellation.  This value is
650 >     * by default {@code false}.
651       *
652       * @param value if {@code true}, remove on cancellation, else don't
653       * @see #getRemoveOnCancelPolicy
# Line 659 | Line 658 | public class ScheduledThreadPoolExecutor
658      }
659  
660      /**
661 <     * Gets the policy on whether cancellation of a task should remove
662 <     * it from the work queue.  This value is by default {@code false}.
661 >     * Gets the policy on whether cancelled tasks should be immediately
662 >     * removed from the work queue at time of cancellation.  This value is
663 >     * by default {@code false}.
664       *
665 <     * @return {@code true} if cancelled tasks are removed from the queue
665 >     * @return {@code true} if cancelled tasks are immediately removed
666 >     *         from the queue
667       * @see #setRemoveOnCancelPolicy
668       * @since 1.7
669       */
# Line 753 | Line 754 | public class ScheduledThreadPoolExecutor
754           * identified by heapIndex.
755           */
756  
757 <        private static final int INITIAL_CAPACITY = 64;
758 <        private transient RunnableScheduledFuture[] queue =
757 >        private static final int INITIAL_CAPACITY = 16;
758 >        private RunnableScheduledFuture[] queue =
759              new RunnableScheduledFuture[INITIAL_CAPACITY];
760 <        private transient final ReentrantLock lock = new ReentrantLock();
760 <        private transient final Condition available = lock.newCondition();
760 >        private final ReentrantLock lock = new ReentrantLock();
761          private int size = 0;
762  
763 +        /**
764 +         * Thread designated to wait for the task at the head of the
765 +         * queue.  This variant of the Leader-Follower pattern
766 +         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
767 +         * minimize unnecessary timed waiting.  When a thread becomes
768 +         * the leader, it waits only for the next delay to elapse, but
769 +         * other threads await indefinitely.  The leader thread must
770 +         * signal some other thread before returning from take() or
771 +         * poll(...), unless some other thread becomes leader in the
772 +         * interim.  Whenever the head of the queue is replaced with a
773 +         * task with an earlier expiration time, the leader field is
774 +         * invalidated by being reset to null, and some waiting
775 +         * thread, but not necessarily the current leader, is
776 +         * signalled.  So waiting threads must be prepared to acquire
777 +         * and lose leadership while waiting.
778 +         */
779 +        private Thread leader = null;
780 +
781 +        /**
782 +         * Condition signalled when a newer task becomes available at the
783 +         * head of the queue or a new thread may need to become leader.
784 +         */
785 +        private final Condition available = lock.newCondition();
786  
787          /**
788           * Set f's heapIndex if it is a ScheduledFutureTask.
# Line 810 | Line 833 | public class ScheduledThreadPoolExecutor
833          }
834  
835          /**
813         * Performs common bookkeeping for poll and take: Replaces
814         * first element with last; sifts it down, and signals any
815         * waiting consumers.  Call only when holding lock.
816         * @param f the task to remove and return
817         */
818        private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
819            int s = --size;
820            RunnableScheduledFuture x = queue[s];
821            queue[s] = null;
822            if (s != 0) {
823                siftDown(0, x);
824                available.signalAll();
825            }
826            setIndex(f, -1);
827            return f;
828        }
829
830        /**
836           * Resize the heap array.  Call only when holding lock.
837           */
838          private void grow() {
# Line 936 | Line 941 | public class ScheduledThreadPoolExecutor
941                  } else {
942                      siftUp(i, e);
943                  }
944 <                if (queue[0] == e)
945 <                    available.signalAll();
944 >                if (queue[0] == e) {
945 >                    leader = null;
946 >                    available.signal();
947 >                }
948              } finally {
949                  lock.unlock();
950              }
# Line 956 | Line 963 | public class ScheduledThreadPoolExecutor
963              return offer(e);
964          }
965  
966 +        /**
967 +         * Performs common bookkeeping for poll and take: Replaces
968 +         * first element with last; sifts it down, and signals another
969 +         * waiting consumer.  Call only when holding lock.
970 +         * @param f the task to remove and return
971 +         */
972 +        private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
973 +            int s = --size;
974 +            RunnableScheduledFuture x = queue[s];
975 +            queue[s] = null;
976 +            if (s != 0)
977 +                siftDown(0, x);
978 +            setIndex(f, -1);
979 +            return f;
980 +        }
981 +
982          public RunnableScheduledFuture poll() {
983              final ReentrantLock lock = this.lock;
984              lock.lock();
# Line 979 | Line 1002 | public class ScheduledThreadPoolExecutor
1002                      if (first == null)
1003                          available.await();
1004                      else {
1005 <                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
1006 <                        if (delay > 0)
1007 <                            available.awaitNanos(delay);
1008 <                        else
1009 <                            return finishPoll(first);
1005 >                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
1006 >                        if (delay <= 0)
1007 >                            return finishPoll(first);
1008 >                        else if (leader != null)
1009 >                            available.await();
1010 >                        else {
1011 >                            Thread thisThread = Thread.currentThread();
1012 >                            leader = thisThread;
1013 >                            try {
1014 >                                available.awaitNanos(delay);
1015 >                            } finally {
1016 >                                if (leader == thisThread)
1017 >                                    leader = null;
1018 >                            }
1019 >                        }
1020                      }
1021                  }
1022              } finally {
1023 +                if (leader == null && queue[0] != null)
1024 +                    available.signal();
1025                  lock.unlock();
1026              }
1027          }
# Line 1006 | Line 1041 | public class ScheduledThreadPoolExecutor
1041                              nanos = available.awaitNanos(nanos);
1042                      } else {
1043                          long delay = first.getDelay(TimeUnit.NANOSECONDS);
1044 <                        if (delay > 0) {
1010 <                            if (nanos <= 0)
1011 <                                return null;
1012 <                            if (delay > nanos)
1013 <                                delay = nanos;
1014 <                            long timeLeft = available.awaitNanos(delay);
1015 <                            nanos -= delay - timeLeft;
1016 <                        } else
1044 >                        if (delay <= 0)
1045                              return finishPoll(first);
1046 <                    }
1047 <                }
1046 >                        if (nanos <= 0)
1047 >                            return null;
1048 >                        if (nanos < delay || leader != null)
1049 >                            nanos = available.awaitNanos(nanos);
1050 >                        else {
1051 >                            Thread thisThread = Thread.currentThread();
1052 >                            leader = thisThread;
1053 >                            try {
1054 >                                long timeLeft = available.awaitNanos(delay);
1055 >                                nanos -= delay - timeLeft;
1056 >                            } finally {
1057 >                                if (leader == thisThread)
1058 >                                    leader = null;
1059 >                            }
1060 >                        }
1061 >                    }
1062 >                }
1063              } finally {
1064 +                if (leader == null && queue[0] != null)
1065 +                    available.signal();
1066                  lock.unlock();
1067              }
1068          }
# Line 1047 | Line 1092 | public class ScheduledThreadPoolExecutor
1092              RunnableScheduledFuture first = queue[0];
1093              if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1094                  return null;
1095 <            setIndex(first, -1);
1051 <            int s = --size;
1052 <            RunnableScheduledFuture x = queue[s];
1053 <            queue[s] = null;
1054 <            if (s != 0)
1055 <                siftDown(0, x);
1056 <            return first;
1095 >            return finishPoll(first);
1096          }
1097  
1098          public int drainTo(Collection<? super Runnable> c) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines