ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java (file contents):
Revision 1.39 by jsr166, Tue Jan 30 03:43:07 2007 UTC vs.
Revision 1.40 by dl, Sun Feb 18 23:16:35 2007 UTC

# Line 6 | Line 6
6  
7   package java.util.concurrent;
8   import java.util.concurrent.atomic.*;
9 + import java.util.concurrent.locks.*;
10   import java.util.*;
11  
12   /**
# Line 20 | Line 21 | import java.util.*;
21   * without any real-time guarantees about when, after they are
22   * enabled, they will commence. Tasks scheduled for exactly the same
23   * execution time are enabled in first-in-first-out (FIFO) order of
24 < * submission.
24 > * submission. Cancelled tasks are automatically removed from the
25 > * work queue.
26   *
27   * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
28   * of the inherited tuning methods are not useful for it. In
# Line 149 | Line 151 | public class ScheduledThreadPoolExecutor
151          private final long period;
152  
153          /**
154 +         * Index into delay queue, to support faster cancellation.
155 +         */
156 +        int heapIndex;
157 +
158 +        /**
159           * Creates a one-shot action with given nanoTime-based trigger time.
160           */
161          ScheduledFutureTask(Runnable r, V result, long ns) {
# Line 223 | Line 230 | public class ScheduledThreadPoolExecutor
230                  time = now() - p;
231          }
232  
233 +        public boolean cancel(boolean mayInterruptIfRunning) {
234 +            remove(this); //  unconditionally remove
235 +            return super.cancel(mayInterruptIfRunning);
236 +        }
237 +
238          /**
239           * Overrides FutureTask version so as to reset/requeue if periodic.
240           */
# Line 310 | Line 322 | public class ScheduledThreadPoolExecutor
322                  if (e instanceof RunnableScheduledFuture) {
323                      RunnableScheduledFuture<?> t =
324                          (RunnableScheduledFuture<?>)e;
325 <                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
326 <                        t.isCancelled()) { // also remove if already cancelled
315 <                        if (q.remove(t))
316 <                            t.cancel(false);
317 <                    }
325 >                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed))
326 >                        t.cancel(false);
327                  }
328              }
329          }
# Line 671 | Line 680 | public class ScheduledThreadPoolExecutor
680      }
681  
682      /**
683 <     * An annoying wrapper class to convince javac to use a
684 <     * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
683 >     * Specialized delay queue. To mesh with TPE declarations, this
684 >     * class must be declared as a BlockingQueue<Runnable> even though
685 >     * it can only hold RunnableScheduledFutures
686       */
687 <    private static class DelayedWorkQueue
678 <        extends AbstractCollection<Runnable>
687 >    static class DelayedWorkQueue extends AbstractQueue<Runnable>
688          implements BlockingQueue<Runnable> {
689  
690 <        private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
691 <        public Runnable poll() { return dq.poll(); }
692 <        public Runnable peek() { return dq.peek(); }
693 <        public Runnable take() throws InterruptedException { return dq.take(); }
694 <        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
695 <            return dq.poll(timeout, unit);
690 >        /*
691 >         * A DelayedWorkQueue is based on a heap-based data structure
692 >         * like those in DelayQueue and PriorityQueue, except that
693 >         * every ScheduledFutureTask also records its index into the
694 >         * heap array. This eliminates the need to find a task upon
695 >         * cancellation, greatly speeding up removal (down from O(n)
696 >         * to O(log n)), and reducing garbage retention that would
697 >         * otherwise occur by waiting for the element to rise to top
698 >         * before clearing. But because the queue may also hold
699 >         * RunnableScheduledFutures that are not ScheduledFutureTasks,
700 >         * we are not guaranteed to have such indices available, in
701 >         * which case we fall back to linear search. (We expect that
702 >         * most tasks will not be decorated, and that the faster cases
703 >         * will be much more common.)
704 >         *
705 >         * All heap operations must record index changes -- mainly
706 >         * within siftUp and siftDown. Upon removal, a task's
707 >         * heapIndex is set to -1. Note that ScheduledFutureTasks can
708 >         * appear at most once in the queue (this need not be true for
709 >         * other kinds of tasks or work queues), so are uniquely
710 >         * identified by heapIndex.
711 >         */
712 >
713 >        private static final int INITIAL_CAPACITY = 64;
714 >        private transient RunnableScheduledFuture[] queue =
715 >            new RunnableScheduledFuture[INITIAL_CAPACITY];
716 >        private transient final ReentrantLock lock = new ReentrantLock();
717 >        private transient final Condition available = lock.newCondition();
718 >        private int size = 0;
719 >
720 >
721 >        /**
722 >         * Set f's heapIndex if it is a ScheduledFutureTask
723 >         */
724 >        private void setIndex(Object f, int idx) {
725 >            if (f instanceof ScheduledFutureTask)
726 >                ((ScheduledFutureTask)f).heapIndex = idx;
727 >        }
728 >
729 >        /**
730 >         * Sift element added at bottom up to its heap-ordered spot
731 >         * Call only when holding lock.
732 >         */
733 >        private void siftUp(int k, RunnableScheduledFuture key) {
734 >            while (k > 0) {
735 >                int parent = (k - 1) >>> 1;
736 >                RunnableScheduledFuture e = queue[parent];
737 >                if (key.compareTo(e) >= 0)
738 >                    break;
739 >                queue[k] = e;
740 >                setIndex(e, k);
741 >                k = parent;
742 >            }
743 >            queue[k] = key;
744 >            setIndex(key, k);
745 >        }
746 >
747 >        /**
748 >         * Sift element added at top down to its heap-ordered spot
749 >         * Call only when holding lock.
750 >         */
751 >        private void siftDown(int k, RunnableScheduledFuture key) {
752 >            int half = size >>> 1;        
753 >            while (k < half) {
754 >                int child = (k << 1) + 1;
755 >                RunnableScheduledFuture c = queue[child];
756 >                int right = child + 1;
757 >                if (right < size && c.compareTo(queue[right]) > 0)
758 >                    c = queue[child = right];
759 >                if (key.compareTo(c) <= 0)
760 >                    break;
761 >                queue[k] = c;
762 >                setIndex(c, k);
763 >                k = child;
764 >            }
765 >            queue[k] = key;
766 >            setIndex(key, k);
767 >        }
768 >
769 >        /**
770 >         * Performs common bookkeeping for poll and take: Replaces
771 >         * first element with last; sifts it down, and signals any
772 >         * waiting consumers.  Call only when holding lock.
773 >         * @param f the task to remove and return
774 >         */
775 >        private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
776 >            int s = --size;
777 >            RunnableScheduledFuture x = queue[s];
778 >            queue[s] = null;
779 >            if (s != 0) {
780 >                siftDown(0, x);
781 >                available.signalAll();
782 >            }
783 >            setIndex(f, -1);
784 >            return f;
785 >        }
786 >
787 >        /**
788 >         * Resize the heap array.  Call only when holding lock.
789 >         */
790 >        private void grow() {
791 >            int oldCapacity = queue.length;
792 >            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
793 >            if (newCapacity < 0) // overflow
794 >                newCapacity = Integer.MAX_VALUE;
795 >            queue = Arrays.copyOf(queue, newCapacity);
796 >        }
797 >
798 >        /**
799 >         * Find index of given object, or -1 if absent
800 >         */
801 >        private int indexOf(Object x) {
802 >            if (x != null) {
803 >                for (int i = 0; i < size; i++)
804 >                    if (x.equals(queue[i]))
805 >                        return i;
806 >            }
807 >            return -1;
808 >        }
809 >
810 >        public boolean remove(Object x) {
811 >            boolean removed;
812 >            final ReentrantLock lock = this.lock;
813 >            lock.lock();
814 >            try {
815 >                int i;
816 >                if (x instanceof ScheduledFutureTask)
817 >                    i = ((ScheduledFutureTask)x).heapIndex;
818 >                else
819 >                    i = indexOf(x);
820 >                if (removed = (i >= 0 && i < size && queue[i] == x)) {
821 >                    setIndex(x, -1);
822 >                    int s = --size;
823 >                    RunnableScheduledFuture replacement = queue[s];
824 >                    queue[s] = null;
825 >                    if (s != i) {
826 >                        siftDown(i, replacement);
827 >                        if (queue[i] == replacement)
828 >                            siftUp(i, replacement);
829 >                    }
830 >                }
831 >            } finally {
832 >                lock.unlock();
833 >            }
834 >            return removed;
835 >        }
836 >
837 >        public int size() {
838 >            int s;
839 >            final ReentrantLock lock = this.lock;
840 >            lock.lock();
841 >            try {
842 >                s = size;
843 >            } finally {
844 >                lock.unlock();
845 >            }
846 >            return s;
847 >        }
848 >
849 >        public boolean isEmpty() {
850 >            return size() == 0;
851 >        }
852 >
853 >        public int remainingCapacity() {
854 >            return Integer.MAX_VALUE;
855 >        }
856 >
857 >        public RunnableScheduledFuture peek() {
858 >            final ReentrantLock lock = this.lock;
859 >            lock.lock();
860 >            try {
861 >                return queue[0];
862 >            } finally {
863 >                lock.unlock();
864 >            }
865          }
866  
689        public boolean add(Runnable x) {
690            return dq.add((RunnableScheduledFuture)x);
691        }
867          public boolean offer(Runnable x) {
868 <            return dq.offer((RunnableScheduledFuture)x);
868 >            if (x == null)
869 >                throw new NullPointerException();
870 >            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
871 >            final ReentrantLock lock = this.lock;
872 >            lock.lock();
873 >            try {
874 >                int i = size;
875 >                if (i >= queue.length)
876 >                    grow();
877 >                size = i + 1;
878 >                boolean notify;
879 >                if (i == 0) {
880 >                    notify = true;
881 >                    queue[0] = e;
882 >                    setIndex(e, 0);
883 >                }
884 >                else {
885 >                    notify = e.compareTo(queue[0]) < 0;
886 >                    siftUp(i, e);
887 >                }
888 >                if (notify)
889 >                    available.signalAll();
890 >            } finally {
891 >                lock.unlock();
892 >            }
893 >            return true;
894          }
895 <        public void put(Runnable x) {
896 <            dq.put((RunnableScheduledFuture)x);
895 >
896 >        public void put(Runnable e) {
897 >            offer(e);
898 >        }
899 >
900 >        public boolean add(Runnable e) {
901 >            return offer(e);
902 >        }
903 >
904 >        public boolean offer(Runnable e, long timeout, TimeUnit unit) {
905 >            return offer(e);
906 >        }
907 >        
908 >        public RunnableScheduledFuture poll() {
909 >            final ReentrantLock lock = this.lock;
910 >            lock.lock();
911 >            try {
912 >                RunnableScheduledFuture first = queue[0];
913 >                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
914 >                    return null;
915 >                else
916 >                    return finishPoll(first);
917 >            } finally {
918 >                lock.unlock();
919 >            }
920 >        }
921 >
922 >        public RunnableScheduledFuture take() throws InterruptedException {
923 >            final ReentrantLock lock = this.lock;
924 >            lock.lockInterruptibly();
925 >            try {
926 >                for (;;) {
927 >                    RunnableScheduledFuture first = queue[0];
928 >                    if (first == null)
929 >                        available.await();
930 >                    else {
931 >                        long delay =  first.getDelay(TimeUnit.NANOSECONDS);
932 >                        if (delay > 0)
933 >                            available.awaitNanos(delay);
934 >                        else
935 >                            return finishPoll(first);
936 >                    }
937 >                }
938 >            } finally {
939 >                lock.unlock();
940 >            }
941 >        }
942 >
943 >        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
944 >            throws InterruptedException {
945 >            long nanos = unit.toNanos(timeout);
946 >            final ReentrantLock lock = this.lock;
947 >            lock.lockInterruptibly();
948 >            try {
949 >                for (;;) {
950 >                    RunnableScheduledFuture first = queue[0];
951 >                    if (first == null) {
952 >                        if (nanos <= 0)
953 >                            return null;
954 >                        else
955 >                            nanos = available.awaitNanos(nanos);
956 >                    } else {
957 >                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
958 >                        if (delay > 0) {
959 >                            if (nanos <= 0)
960 >                                return null;
961 >                            if (delay > nanos)
962 >                                delay = nanos;
963 >                            long timeLeft = available.awaitNanos(delay);
964 >                            nanos -= delay - timeLeft;
965 >                        } else
966 >                            return finishPoll(first);
967 >                    }
968 >                }
969 >            } finally {
970 >                lock.unlock();
971 >            }
972 >        }
973 >
974 >        public void clear() {
975 >            final ReentrantLock lock = this.lock;
976 >            lock.lock();
977 >            try {
978 >                for (int i = 0; i < size; i++) {
979 >                    RunnableScheduledFuture t = queue[i];
980 >                    if (t != null) {
981 >                        queue[i] = null;
982 >                        setIndex(t, -1);
983 >                    }
984 >                }
985 >                size = 0;
986 >            } finally {
987 >                lock.unlock();
988 >            }
989          }
990 <        public boolean offer(Runnable x, long timeout, TimeUnit unit) {
991 <            return dq.offer((RunnableScheduledFuture)x, timeout, unit);
990 >
991 >        /**
992 >         * Return and remove first element only if it is expired.
993 >         * Used only by drainTo.  Call only when holding lock.
994 >         */
995 >        private RunnableScheduledFuture pollExpired() {
996 >            RunnableScheduledFuture first = queue[0];
997 >            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
998 >                return null;
999 >            setIndex(first, -1);
1000 >            int s = --size;
1001 >            RunnableScheduledFuture x = queue[s];
1002 >            queue[s] = null;
1003 >            if (s != 0)
1004 >                siftDown(0, x);
1005 >            return first;
1006 >        }
1007 >
1008 >        public int drainTo(Collection<? super Runnable> c) {
1009 >            if (c == null)
1010 >                throw new NullPointerException();
1011 >            if (c == this)
1012 >                throw new IllegalArgumentException();
1013 >            final ReentrantLock lock = this.lock;
1014 >            lock.lock();
1015 >            try {
1016 >                int n = 0;
1017 >                for (;;) {
1018 >                    RunnableScheduledFuture first = pollExpired();
1019 >                    if (first != null) {
1020 >                        c.add(first);
1021 >                        ++n;
1022 >                    }
1023 >                    else
1024 >                        break;
1025 >                }
1026 >                if (n > 0)
1027 >                    available.signalAll();
1028 >                return n;
1029 >            } finally {
1030 >                lock.unlock();
1031 >            }
1032          }
1033  
702        public Runnable remove() { return dq.remove(); }
703        public Runnable element() { return dq.element(); }
704        public void clear() { dq.clear(); }
705        public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
1034          public int drainTo(Collection<? super Runnable> c, int maxElements) {
1035 <            return dq.drainTo(c, maxElements);
1035 >            if (c == null)
1036 >                throw new NullPointerException();
1037 >            if (c == this)
1038 >                throw new IllegalArgumentException();
1039 >            if (maxElements <= 0)
1040 >                return 0;
1041 >            final ReentrantLock lock = this.lock;
1042 >            lock.lock();
1043 >            try {
1044 >                int n = 0;
1045 >                while (n < maxElements) {
1046 >                    RunnableScheduledFuture first = pollExpired();
1047 >                    if (first != null) {
1048 >                        c.add(first);
1049 >                        ++n;
1050 >                    }
1051 >                    else
1052 >                        break;
1053 >                }
1054 >                if (n > 0)
1055 >                    available.signalAll();
1056 >                return n;
1057 >            } finally {
1058 >                lock.unlock();
1059 >            }
1060 >        }
1061 >
1062 >        public Object[] toArray() {
1063 >            final ReentrantLock lock = this.lock;
1064 >            lock.lock();
1065 >            try {
1066 >                return Arrays.copyOf(queue, size);
1067 >            } finally {
1068 >                lock.unlock();
1069 >            }
1070 >        }
1071 >
1072 >        public <T> T[] toArray(T[] a) {
1073 >            final ReentrantLock lock = this.lock;
1074 >            lock.lock();
1075 >            try {
1076 >                if (a.length < size)
1077 >                    return (T[]) Arrays.copyOf(queue, size, a.getClass());
1078 >                System.arraycopy(queue, 0, a, 0, size);
1079 >                if (a.length > size)
1080 >                    a[size] = null;
1081 >                return a;
1082 >            } finally {
1083 >                lock.unlock();
1084 >            }
1085          }
1086  
710        public int remainingCapacity() { return dq.remainingCapacity(); }
711        public boolean remove(Object x) { return dq.remove(x); }
712        public boolean contains(Object x) { return dq.contains(x); }
713        public int size() { return dq.size(); }
714        public boolean isEmpty() { return dq.isEmpty(); }
715        public Object[] toArray() { return dq.toArray(); }
716        public <T> T[] toArray(T[] array) { return dq.toArray(array); }
1087          public Iterator<Runnable> iterator() {
1088 <            return new Iterator<Runnable>() {
1089 <                private Iterator<RunnableScheduledFuture> it = dq.iterator();
1090 <                public boolean hasNext() { return it.hasNext(); }
1091 <                public Runnable next() { return it.next(); }
1092 <                public void remove() { it.remove(); }
1093 <            };
1088 >            return new Itr(toArray());
1089 >        }
1090 >        
1091 >        /**
1092 >         * Snapshot iterator that works off copy of underlying q array.
1093 >         */
1094 >        private class Itr implements Iterator<Runnable> {
1095 >            final Object[] array; // Array of all elements
1096 >            int cursor;           // index of next element to return;
1097 >            int lastRet;          // index of last element, or -1 if no such
1098 >            
1099 >            Itr(Object[] array) {
1100 >                lastRet = -1;
1101 >                this.array = array;
1102 >            }
1103 >            
1104 >            public boolean hasNext() {
1105 >                return cursor < array.length;
1106 >            }
1107 >            
1108 >            public Runnable next() {
1109 >                if (cursor >= array.length)
1110 >                    throw new NoSuchElementException();
1111 >                lastRet = cursor;
1112 >                return (Runnable)array[cursor++];
1113 >            }
1114 >            
1115 >            public void remove() {
1116 >                if (lastRet < 0)
1117 >                    throw new IllegalStateException();
1118 >                DelayedWorkQueue.this.remove(array[lastRet]);
1119 >                lastRet = -1;
1120 >            }
1121          }
1122      }
1123   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines