t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
/**
* Executes {@code command} with zero required delay.
* This has effect equivalent to
* {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
* Note that inspections of the queue and of the list returned by
* {@code shutdownNow} will access the zero-delayed
* {@link ScheduledFuture}, not the {@code command} itself.
*
* <p>A consequence of the use of {@code ScheduledFuture} objects is
* that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
* called with a null second {@code Throwable} argument, even if the
* {@code command} terminated abruptly. Instead, the {@code Throwable}
* thrown by such a task can be obtained via {@link Future#get}.
*
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution because the
* executor has been shut down
* @throws NullPointerException {@inheritDoc}
*/
public void execute(Runnable command) {
    // Delegate to schedule() with a delay of zero; the future it returns
    // is deliberately discarded because execute() has no return value.
    schedule(command, 0, TimeUnit.NANOSECONDS);
}
// Override AbstractExecutorService methods
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future> submit(Runnable task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
    // Adapt the Runnable to a Callable that yields `result` on completion,
    // then schedule that callable with zero delay.
    return schedule(Executors.callable(task, result),
                    0, TimeUnit.NANOSECONDS);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
    // Submission is a zero-delay schedule of the callable.
    return schedule(task, 0, TimeUnit.NANOSECONDS);
}
/**
* Sets the policy on whether to continue executing existing
* periodic tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow} or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code false}.
*
* @param value if {@code true}, continue after shutdown, else don't.
* @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
*/
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
    continueExistingPeriodicTasksAfterShutdown = value;
    // When the policy is being switched off and the executor is already
    // shut down, invoke onShutdown() again. isShutdown() is only consulted
    // when the policy is being disabled.
    final boolean disabling = !value;
    if (disabling && isShutdown()) {
        onShutdown();
    }
}
/**
* Gets the policy on whether to continue executing existing
* periodic tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow} or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code false}.
*
* @return {@code true} if will continue after shutdown
* @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
*/
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
    // Plain read of the policy field; no other state is consulted.
    return continueExistingPeriodicTasksAfterShutdown;
}
/**
* Sets the policy on whether to execute existing delayed
* tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow}, or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code true}.
*
* @param value if {@code true}, execute after shutdown, else don't.
* @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
*/
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
    executeExistingDelayedTasksAfterShutdown = value;
    // When the policy is being switched off and the executor is already
    // shut down, invoke onShutdown() again. isShutdown() is only consulted
    // when the policy is being disabled.
    final boolean disabling = !value;
    if (disabling && isShutdown()) {
        onShutdown();
    }
}
/**
* Gets the policy on whether to execute existing delayed
* tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow}, or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code true}.
*
* @return {@code true} if will execute after shutdown
* @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
*/
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
    // Plain read of the policy field; no other state is consulted.
    return executeExistingDelayedTasksAfterShutdown;
}
/**
* Sets the policy on whether cancellation of a task should remove
* it from the work queue. This value is by default {@code false}.
*
* @param value if {@code true}, remove on cancellation, else don't
* @see #getRemoveOnCancelPolicy
* @since 1.7
*/
public void setRemoveOnCancelPolicy(boolean value) {
    // Only records the flag; nothing else is done by this setter.
    removeOnCancel = value;
}
/**
* Gets the policy on whether cancellation of a task should remove
* it from the work queue. This value is by default {@code false}.
*
* @return {@code true} if cancelled tasks are removed from the queue
* @see #setRemoveOnCancelPolicy
* @since 1.7
*/
public boolean getRemoveOnCancelPolicy() {
    // Plain read of the policy field; no other state is consulted.
    return removeOnCancel;
}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted. If the
* {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
* been set {@code false}, existing delayed tasks whose delays
* have not yet elapsed are cancelled. And unless the
* {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
* been set {@code true}, future executions of existing periodic
* tasks will be cancelled.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
    // Pure delegation: this override adds no logic of its own.
    super.shutdown();
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution.
* Each element of this list is a {@link ScheduledFuture},
* including those tasks submitted using {@code execute},
* which are for scheduling purposes used as the basis of a
* zero-delay {@code ScheduledFuture}.
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
    // Pure delegation: this override adds no logic of its own. The element
    // type is spelled out so callers do not receive a raw List.
    return super.shutdownNow();
}
/**
* Returns the task queue used by this executor. Each element of
* this queue is a {@link ScheduledFuture}, including those
* tasks submitted using {@code execute} which are for scheduling
* purposes used as the basis of a zero-delay
* {@code ScheduledFuture}. Iteration over this queue is
* not guaranteed to traverse tasks in the order in
* which they will execute.
*
* @return the task queue
*/
public BlockingQueue<Runnable> getQueue() {
    // Pure delegation: this override adds no logic of its own. The element
    // type is spelled out so callers do not receive a raw BlockingQueue.
    return super.getQueue();
}
/**
* Specialized delay queue. To mesh with ThreadPoolExecutor (TPE)
* declarations, this class must be declared as a
* {@code BlockingQueue<Runnable>} even though it can only hold
* {@code RunnableScheduledFuture}s.
*/
static class DelayedWorkQueue extends AbstractQueue
implements BlockingQueue {
/*
* A DelayedWorkQueue is based on a heap-based data structure
* like those in DelayQueue and PriorityQueue, except that
* every ScheduledFutureTask also records its index into the
* heap array. This eliminates the need to find a task upon
* cancellation, greatly speeding up removal (down from O(n)
* to O(log n)), and reducing garbage retention that would
* otherwise occur by waiting for the element to rise to top
* before clearing. But because the queue may also hold
* RunnableScheduledFutures that are not ScheduledFutureTasks,
* we are not guaranteed to have such indices available, in
* which case we fall back to linear search. (We expect that
* most tasks will not be decorated, and that the faster cases
* will be much more common.)
*
* All heap operations must record index changes -- mainly
* within siftUp and siftDown. Upon removal, a task's
* heapIndex is set to -1. Note that ScheduledFutureTasks can
* appear at most once in the queue (this need not be true for
* other kinds of tasks or work queues), so are uniquely
* identified by heapIndex.
*/
private static final int INITIAL_CAPACITY = 64;
private transient RunnableScheduledFuture[] queue =
new RunnableScheduledFuture[INITIAL_CAPACITY];
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private int size = 0;
/**
* Set f's heapIndex if it is a ScheduledFutureTask.
*/
private void setIndex(RunnableScheduledFuture f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
/**
* Sift element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
*/
private void siftUp(int k, RunnableScheduledFuture key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
/**
* Sift element added at top down to its heap-ordered spot.
* Call only when holding lock.
*/
private void siftDown(int k, RunnableScheduledFuture key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
/**
* Performs common bookkeeping for poll and take: Replaces
* first element with last; sifts it down, and signals any
* waiting consumers. Call only when holding lock.
* @param f the task to remove and return
*/
private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
int s = --size;
RunnableScheduledFuture x = queue[s];
queue[s] = null;
if (s != 0) {
siftDown(0, x);
available.signalAll();
}
setIndex(f, -1);
return f;
}
/**
* Resize the heap array. Call only when holding lock.
*/
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
/**
* Find index of given object, or -1 if absent
*/
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) {
int i = ((ScheduledFutureTask) x).heapIndex;
// Sanity check; x could conceivably be a
// ScheduledFutureTask from some other pool.
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
public boolean contains(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return indexOf(x) != -1;
} finally {
lock.unlock();
}
}
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
public boolean isEmpty() {
return size() == 0;
}
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
public RunnableScheduledFuture peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e)
available.signalAll();
} finally {
lock.unlock();
}
return true;
}
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
public RunnableScheduledFuture poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture first = queue[0];
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0)
available.awaitNanos(delay);
else
return finishPoll(first);
}
}
} finally {
lock.unlock();
}
}
public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
if (nanos <= 0)
return null;
if (delay > nanos)
delay = nanos;
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} else
return finishPoll(first);
}
}
} finally {
lock.unlock();
}
}
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (int i = 0; i < size; i++) {
RunnableScheduledFuture t = queue[i];
if (t != null) {
queue[i] = null;
setIndex(t, -1);
}
}
size = 0;
} finally {
lock.unlock();
}
}
/**
* Return and remove first element only if it is expired.
* Used only by drainTo. Call only when holding lock.
*/
private RunnableScheduledFuture pollExpired() {
RunnableScheduledFuture first = queue[0];
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
setIndex(first, -1);
int s = --size;
RunnableScheduledFuture x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return first;
}
public int drainTo(Collection super Runnable> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture first;
int n = 0;
while ((first = pollExpired()) != null) {
c.add(first);
++n;
}
return n;
} finally {
lock.unlock();
}
}
public int drainTo(Collection super Runnable> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture first;
int n = 0;
while (n < maxElements && (first = pollExpired()) != null) {
c.add(first);
++n;
}
return n;
} finally {
lock.unlock();
}
}
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return Arrays.copyOf(queue, size, Object[].class);
} finally {
lock.unlock();
}
}
@SuppressWarnings("unchecked")
public T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (a.length < size)
return (T[]) Arrays.copyOf(queue, size, a.getClass());
System.arraycopy(queue, 0, a, 0, size);
if (a.length > size)
a[size] = null;
return a;
} finally {
lock.unlock();
}
}
public Iterator iterator() {
return new Itr(Arrays.copyOf(queue, size));
}
/**
* Snapshot iterator that works off copy of underlying q array.
*/
private class Itr implements Iterator {
final RunnableScheduledFuture[] array;
int cursor = 0; // index of next element to return
int lastRet = -1; // index of last element, or -1 if no such
Itr(RunnableScheduledFuture[] array) {
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
public Runnable next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
DelayedWorkQueue.this.remove(array[lastRet]);
lastRet = -1;
}
}
}
}