ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.81 by jsr166, Mon Sep 20 20:42:36 2010 UTC vs.
Revision 1.82 by dl, Sun Oct 10 11:56:11 2010 UTC

# Line 1019 | Line 1019 | public class ForkJoinPool extends Abstra
1019          int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1020          while (joinMe.status >= 0) {
1021              int wc;
1022 +            if (runState >= TERMINATING) {
1023 +                joinMe.cancelIgnoringExceptions();
1024 +                break;
1025 +            }
1026              worker.helpJoinTask(joinMe);
1027              if (joinMe.status < 0)
1028                  break;
# Line 1296 | Line 1300 | public class ForkJoinPool extends Abstra
1300      // Execution methods
1301  
1302      /**
1303 <     * Common code for execute, invoke and submit
1303 >     * Submits task and creates, starts, or resumes some workers if necessary
1304       */
1305      private <T> void doSubmit(ForkJoinTask<T> task) {
1302        if (task == null)
1303            throw new NullPointerException();
1304        if (runState >= SHUTDOWN)
1305            throw new RejectedExecutionException();
1306          submissionQueue.offer(task);
1307          int c; // try to increment event count -- CAS failure OK
1308          UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1309 <        helpMaintainParallelism(); // create, start, or resume some workers
1309 >        helpMaintainParallelism();
1310      }
1311  
1312      /**
# Line 1319 | Line 1319 | public class ForkJoinPool extends Abstra
1319       *         scheduled for execution
1320       */
1321      public <T> T invoke(ForkJoinTask<T> task) {
1322 <        doSubmit(task);
1323 <        return task.join();
1322 >        if (task == null)
1323 >            throw new NullPointerException();
1324 >        if (runState >= SHUTDOWN)
1325 >            throw new RejectedExecutionException();
1326 >        Thread t = Thread.currentThread();
1327 >        if ((t instanceof ForkJoinWorkerThread) &&
1328 >            ((ForkJoinWorkerThread)t).pool == this)
1329 >            return task.invoke();  // bypass submit if in same pool
1330 >        else {
1331 >            doSubmit(task);
1332 >            return task.join();
1333 >        }
1334 >    }
1335 >
1336 >    /**
1337 >     * Unless terminating, forks task if within an ongoing FJ
1338 >     * computation in the current pool, else submits as external task.
1339 >     */
1340 >    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
1341 >        if (runState >= SHUTDOWN)
1342 >            throw new RejectedExecutionException();
1343 >        Thread t = Thread.currentThread();
1344 >        if ((t instanceof ForkJoinWorkerThread) &&
1345 >            ((ForkJoinWorkerThread)t).pool == this)
1346 >            task.fork();
1347 >        else
1348 >            doSubmit(task);
1349      }
1350  
1351      /**
# Line 1332 | Line 1357 | public class ForkJoinPool extends Abstra
1357       *         scheduled for execution
1358       */
1359      public void execute(ForkJoinTask<?> task) {
1360 <        doSubmit(task);
1360 >        if (task == null)
1361 >            throw new NullPointerException();
1362 >        forkOrSubmit(task);
1363      }
1364  
1365      // AbstractExecutorService methods
# Line 1343 | Line 1370 | public class ForkJoinPool extends Abstra
1370       *         scheduled for execution
1371       */
1372      public void execute(Runnable task) {
1373 +        if (task == null)
1374 +            throw new NullPointerException();
1375          ForkJoinTask<?> job;
1376          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1377              job = (ForkJoinTask<?>) task;
1378          else
1379              job = ForkJoinTask.adapt(task, null);
1380 <        doSubmit(job);
1380 >        forkOrSubmit(job);
1381      }
1382  
1383      /**
# Line 1361 | Line 1390 | public class ForkJoinPool extends Abstra
1390       *         scheduled for execution
1391       */
1392      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1393 <        doSubmit(task);
1393 >        if (task == null)
1394 >            throw new NullPointerException();
1395 >        forkOrSubmit(task);
1396          return task;
1397      }
1398  
# Line 1371 | Line 1402 | public class ForkJoinPool extends Abstra
1402       *         scheduled for execution
1403       */
1404      public <T> ForkJoinTask<T> submit(Callable<T> task) {
1405 +        if (task == null)
1406 +            throw new NullPointerException();
1407          ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1408 <        doSubmit(job);
1408 >        forkOrSubmit(job);
1409          return job;
1410      }
1411  
# Line 1382 | Line 1415 | public class ForkJoinPool extends Abstra
1415       *         scheduled for execution
1416       */
1417      public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1418 +        if (task == null)
1419 +            throw new NullPointerException();
1420          ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1421 <        doSubmit(job);
1421 >        forkOrSubmit(job);
1422          return job;
1423      }
1424  
# Line 1393 | Line 1428 | public class ForkJoinPool extends Abstra
1428       *         scheduled for execution
1429       */
1430      public ForkJoinTask<?> submit(Runnable task) {
1431 +        if (task == null)
1432 +            throw new NullPointerException();
1433          ForkJoinTask<?> job;
1434          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1435              job = (ForkJoinTask<?>) task;
1436          else
1437              job = ForkJoinTask.adapt(task, null);
1438 <        doSubmit(job);
1438 >        forkOrSubmit(job);
1439          return job;
1440      }
1441  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines