--- jsr166/src/jsr166y/ForkJoinPool.java 2009/03/19 05:10:42 1.5 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/07/16 15:32:34 1.6 @@ -27,7 +27,9 @@ import java.lang.reflect.*; * (eventually blocking if none exist). This makes them efficient when * most tasks spawn other subtasks (as do most ForkJoinTasks), as well * as the mixed execution of some plain Runnable- or Callable- based - * activities along with ForkJoinTasks. Otherwise, other + * activities along with ForkJoinTasks. When setting + * setAsyncMode, a ForkJoinPools may also be appropriate for + * use with fine-grained tasks that are never joined. Otherwise, other * ExecutorService implementations are typically more appropriate * choices. * @@ -39,7 +41,7 @@ import java.lang.reflect.*; * nested ManagedBlocker interface enables extension of * the kinds of synchronization accommodated. The target parallelism * level may also be changed dynamically (setParallelism) - * and dynamically thread construction can be limited using methods + * and thread construction can be limited using methods * setMaximumPoolSize and/or * setMaintainsParallelism. * @@ -131,11 +133,11 @@ public class ForkJoinPool extends Abstra new AtomicInteger(); /** - * Array holding all worker threads in the pool. Array size must - * be a power of two. Updates and replacements are protected by - * workerLock, but it is always kept in a consistent enough state - * to be randomly accessed without locking by workers performing - * work-stealing. + * Array holding all worker threads in the pool. Initialized upon + * first use. Array size must be a power of two. Updates and + * replacements are protected by workerLock, but it is always kept + * in a consistent enough state to be randomly accessed without + * locking by workers performing work-stealing. */ volatile ForkJoinWorkerThread[] workers; @@ -204,6 +206,11 @@ public class ForkJoinPool extends Abstra private volatile int parallelism; /** + * True if use local fifo, not default lifo, for local polling + */ + private volatile boolean locallyFifo; + + /** * Holds number of total (i.e., created and not yet terminated) * and running (i.e., not blocked on joins or other managed sync) * threads, packed into one int to ensure consistent snapshot when @@ -391,7 +398,7 @@ public class ForkJoinPool extends Abstra this.termination = workerLock.newCondition(); this.stealCount = new AtomicLong(); this.submissionQueue = new LinkedTransferQueue>(); - createAndStartInitialWorkers(parallelism); + // worker array and workers are lazily constructed } /** @@ -405,6 +412,7 @@ public class ForkJoinPool extends Abstra if (w != null) { w.poolIndex = index; w.setDaemon(true); + w.setAsyncMode(locallyFifo); w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index); if (h != null) w.setUncaughtExceptionHandler(h); @@ -421,7 +429,8 @@ public class ForkJoinPool extends Abstra } /** - * Create or resize array if necessary to hold newLength + * Create or resize array if necessary to hold newLength. + * Call only under exlusion or lock * @return the array */ private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) { @@ -439,35 +448,42 @@ public class ForkJoinPool extends Abstra */ private void tryShrinkWorkerArray() { ForkJoinWorkerThread[] ws = workers; - int len = ws.length; - int last = len - 1; - while (last >= 0 && ws[last] == null) - --last; - int newLength = arraySizeFor(last+1); - if (newLength < len) - workers = Arrays.copyOf(ws, newLength); + if (ws != null) { + int len = ws.length; + int last = len - 1; + while (last >= 0 && ws[last] == null) + --last; + int newLength = arraySizeFor(last+1); + if (newLength < len) + workers = Arrays.copyOf(ws, newLength); + } } /** - * Initial worker array and worker creation and startup. (This - * must be done under lock to avoid interference by some of the - * newly started threads while creating others.) + * Initialize workers if necessary */ - private void createAndStartInitialWorkers(int ps) { - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps); - for (int i = 0; i < ps; ++i) { - ForkJoinWorkerThread w = createWorker(i); - if (w != null) { - ws[i] = w; - w.start(); - updateWorkerCount(1); + final void ensureWorkerInitialization() { + ForkJoinWorkerThread[] ws = workers; + if (ws == null) { + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ws = workers; + if (ws == null) { + int ps = parallelism; + ws = ensureWorkerArrayCapacity(ps); + for (int i = 0; i < ps; ++i) { + ForkJoinWorkerThread w = createWorker(i); + if (w != null) { + ws[i] = w; + w.start(); + updateWorkerCount(1); + } + } } + } finally { + lock.unlock(); } - } finally { - lock.unlock(); } } @@ -513,6 +529,8 @@ public class ForkJoinPool extends Abstra private void doSubmit(ForkJoinTask task) { if (isShutdown()) throw new RejectedExecutionException(); + if (workers == null) + ensureWorkerInitialization(); submissionQueue.offer(task); signalIdleWorkers(); } @@ -683,10 +701,12 @@ public class ForkJoinPool extends Abstra old = ueh; ueh = h; ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - w.setUncaughtExceptionHandler(h); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + w.setUncaughtExceptionHandler(h); + } } } finally { lock.unlock(); @@ -793,6 +813,42 @@ public class ForkJoinPool extends Abstra } /** + * Establishes local first-in-first-out scheduling mode for forked + * tasks that are never joined. This mode may be more appropriate + * than default locally stack-based mode in applications in which + * worker threads only process asynchronous tasks. This method is + * designed to be invoked only when pool is quiescent, and + * typically only before any tasks are submitted. The effects of + * invocations at ather times may be unpredictable. + * + * @param async if true, use locally FIFO scheduling + * @return the previous mode. + */ + public boolean setAsyncMode(boolean async) { + boolean oldMode = locallyFifo; + locallyFifo = async; + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.setAsyncMode(async); + } + } + return oldMode; + } + + /** + * Returns true if this pool uses local first-in-first-out + * scheduling mode for forked tasks that are never joined. + * + * @return true if this pool uses async mode. + */ + public boolean getAsyncMode() { + return locallyFifo; + } + + /** * Returns an estimate of the number of worker threads that are * not blocked waiting to join tasks or for other managed * synchronization. @@ -874,10 +930,12 @@ public class ForkJoinPool extends Abstra public long getQueuedTaskCount() { long count = 0; ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - count += t.getQueueSize(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + count += t.getQueueSize(); + } } return count; } @@ -912,6 +970,35 @@ public class ForkJoinPool extends Abstra } /** + * Removes all available unexecuted submitted and forked tasks + * from scheduling queues and adds them to the given collection, + * without altering their execution status. These may include + * artifically generated or wrapped tasks. This method id designed + * to be invoked only when the pool is known to be + * quiescent. Invocations at other times may not remove all + * tasks. A failure encountered while attempting to add elements + * to collection c may result in elements being in + * neither, either or both collections when the associated + * exception is thrown. The behavior of this operation is + * undefined if the specified collection is modified while the + * operation is in progress. + * @param c the collection to transfer elements into + * @return the number of elements transferred + */ + protected int drainTasksTo(Collection> c) { + int n = submissionQueue.drainTo(c); + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + n += w.drainTasksTo(c); + } + } + return n; + } + + /** * Returns a string identifying this pool, as well as its state, * including indications of run state, parallelism level, and * worker and task counts. @@ -972,8 +1059,10 @@ public class ForkJoinPool extends Abstra * waiting tasks. Tasks that are in the process of being * submitted or executed concurrently during the course of this * method may or may not be rejected. Unlike some other executors, - * this method cancels rather than collects non-executed tasks, - * so always returns an empty list. + * this method cancels rather than collects non-executed tasks + * upon termination, so always returns an empty list. However, you + * can use method drainTasksTo before invoking this + * method to transfer unexecuted tasks to another collection. * @return an empty list * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads @@ -1058,17 +1147,19 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - int idx = w.poolIndex; - if (idx >= 0 && idx < ws.length && ws[idx] == w) - ws[idx] = null; - if (totalCountOf(workerCounts) == 0) { - terminate(); // no-op if already terminating - transitionRunStateTo(TERMINATED); - termination.signalAll(); - } - else if (!isTerminating()) { - tryShrinkWorkerArray(); - tryResumeSpare(true); // allow replacement + if (ws != null) { + int idx = w.poolIndex; + if (idx >= 0 && idx < ws.length && ws[idx] == w) + ws[idx] = null; + if (totalCountOf(workerCounts) == 0) { + terminate(); // no-op if already terminating + transitionRunStateTo(TERMINATED); + termination.signalAll(); + } + else if (!isTerminating()) { + tryShrinkWorkerArray(); + tryResumeSpare(true); // allow replacement + } } } finally { lock.unlock(); @@ -1116,10 +1207,12 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.cancelTasks(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.cancelTasks(); + } } } finally { lock.unlock(); @@ -1135,10 +1228,12 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.shutdownNow(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.shutdownNow(); + } } } finally { lock.unlock(); @@ -1155,12 +1250,14 @@ public class ForkJoinPool extends Abstra lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null && !t.isTerminated()) { - try { - t.interrupt(); - } catch (SecurityException ignore) { + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null && !t.isTerminated()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } } } }