ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.43 by jsr166, Tue Aug 4 00:55:13 2009 UTC vs.
Revision 1.51 by dl, Fri Dec 4 15:46:38 2009 UTC

# Line 23 | Line 23 | import java.util.concurrent.atomic.Atomi
23   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
24   * A {@code ForkJoinPool} provides the entry point for submissions
25   * from non-{@code ForkJoinTask}s, as well as management and
26 < * monitoring operations.  
26 > * monitoring operations.
27   *
28   * <p>A {@code ForkJoinPool} differs from other kinds of {@link
29   * ExecutorService} mainly by virtue of employing
# Line 45 | Line 45 | import java.util.concurrent.atomic.Atomi
45   * #setMaintainsParallelism}, the pool attempts to maintain this
46   * number of active (or available) threads by dynamically adding,
47   * suspending, or resuming internal worker threads, even if some tasks
48 < * are waiting to join others. However, no such adjustments are
49 < * performed in the face of blocked IO or other unmanaged
48 > * are stalled waiting to join others. However, no such adjustments
49 > * are performed in the face of blocked IO or other unmanaged
50   * synchronization. The nested {@link ManagedBlocker} interface
51   * enables extension of the kinds of synchronization accommodated.
52   * The target parallelism level may also be changed dynamically
# Line 82 | Line 82 | import java.util.concurrent.atomic.Atomi
82   *
83   * <p><b>Implementation notes</b>: This implementation restricts the
84   * maximum number of running threads to 32767. Attempts to create
85 < * pools with greater than the maximum result in
85 > * pools with greater than the maximum number result in
86   * {@code IllegalArgumentException}.
87   *
88 + * <p>This implementation rejects submitted tasks (that is, by throwing
89 + * {@link RejectedExecutionException}) only when the pool is shut down.
90 + *
91   * @since 1.7
92   * @author Doug Lea
93   */
# Line 112 | Line 115 | public class ForkJoinPool extends Abstra
115           * Returns a new worker thread operating in the given pool.
116           *
117           * @param pool the pool this thread works in
118 <         * @throws NullPointerException if pool is null
118 >         * @throws NullPointerException if the pool is null
119           */
120          public ForkJoinWorkerThread newThread(ForkJoinPool pool);
121      }
# Line 384 | Line 387 | public class ForkJoinPool extends Abstra
387       *
388       * @param parallelism the parallelism level
389       * @throws IllegalArgumentException if parallelism less than or
390 <     * equal to zero
390 >     *         equal to zero, or greater than implementation limit
391       * @throws SecurityException if a security manager exists and
392       *         the caller is not permitted to modify threads
393       *         because it does not hold {@link
# Line 400 | Line 403 | public class ForkJoinPool extends Abstra
403       * thread factory.
404       *
405       * @param factory the factory for creating new threads
406 <     * @throws NullPointerException if factory is null
406 >     * @throws NullPointerException if the factory is null
407       * @throws SecurityException if a security manager exists and
408       *         the caller is not permitted to modify threads
409       *         because it does not hold {@link
# Line 417 | Line 420 | public class ForkJoinPool extends Abstra
420       * @param parallelism the parallelism level
421       * @param factory the factory for creating new threads
422       * @throws IllegalArgumentException if parallelism less than or
423 <     * equal to zero, or greater than implementation limit
424 <     * @throws NullPointerException if factory is null
423 >     *         equal to zero, or greater than implementation limit
424 >     * @throws NullPointerException if the factory is null
425       * @throws SecurityException if a security manager exists and
426       *         the caller is not permitted to modify threads
427       *         because it does not hold {@link
# Line 523 | Line 526 | public class ForkJoinPool extends Abstra
526                  ws = workers;
527                  if (ws == null) {
528                      int ps = parallelism;
529 +                    updateWorkerCount(ps);
530                      ws = ensureWorkerArrayCapacity(ps);
531                      for (int i = 0; i < ps; ++i) {
532                          ForkJoinWorkerThread w = createWorker(i);
533                          if (w != null) {
534                              ws[i] = w;
535                              w.start();
532                            updateWorkerCount(1);
536                          }
537 +                        else
538 +                            updateWorkerCount(-1);
539                      }
540                  }
541              } finally {
# Line 594 | Line 599 | public class ForkJoinPool extends Abstra
599       *
600       * @param task the task
601       * @return the task's result
602 <     * @throws NullPointerException if task is null
603 <     * @throws RejectedExecutionException if pool is shut down
602 >     * @throws NullPointerException if the task is null
603 >     * @throws RejectedExecutionException if the task cannot be
604 >     *         scheduled for execution
605       */
606      public <T> T invoke(ForkJoinTask<T> task) {
607          doSubmit(task);
# Line 606 | Line 612 | public class ForkJoinPool extends Abstra
612       * Arranges for (asynchronous) execution of the given task.
613       *
614       * @param task the task
615 <     * @throws NullPointerException if task is null
616 <     * @throws RejectedExecutionException if pool is shut down
615 >     * @throws NullPointerException if the task is null
616 >     * @throws RejectedExecutionException if the task cannot be
617 >     *         scheduled for execution
618       */
619      public void execute(ForkJoinTask<?> task) {
620          doSubmit(task);
# Line 615 | Line 622 | public class ForkJoinPool extends Abstra
622  
623      // AbstractExecutorService methods
624  
625 +    /**
626 +     * @throws NullPointerException if the task is null
627 +     * @throws RejectedExecutionException if the task cannot be
628 +     *         scheduled for execution
629 +     */
630      public void execute(Runnable task) {
631          ForkJoinTask<?> job;
632          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
# Line 624 | Line 636 | public class ForkJoinPool extends Abstra
636          doSubmit(job);
637      }
638  
639 +    /**
640 +     * @throws NullPointerException if the task is null
641 +     * @throws RejectedExecutionException if the task cannot be
642 +     *         scheduled for execution
643 +     */
644      public <T> ForkJoinTask<T> submit(Callable<T> task) {
645          ForkJoinTask<T> job = ForkJoinTask.adapt(task);
646          doSubmit(job);
647          return job;
648      }
649  
650 +    /**
651 +     * @throws NullPointerException if the task is null
652 +     * @throws RejectedExecutionException if the task cannot be
653 +     *         scheduled for execution
654 +     */
655      public <T> ForkJoinTask<T> submit(Runnable task, T result) {
656          ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
657          doSubmit(job);
658          return job;
659      }
660  
661 +    /**
662 +     * @throws NullPointerException if the task is null
663 +     * @throws RejectedExecutionException if the task cannot be
664 +     *         scheduled for execution
665 +     */
666      public ForkJoinTask<?> submit(Runnable task) {
667          ForkJoinTask<?> job;
668          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
# Line 651 | Line 678 | public class ForkJoinPool extends Abstra
678       *
679       * @param task the task to submit
680       * @return the task
681 +     * @throws NullPointerException if the task is null
682       * @throws RejectedExecutionException if the task cannot be
683       *         scheduled for execution
656     * @throws NullPointerException if the task is null
684       */
685      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
686          doSubmit(task);
# Line 661 | Line 688 | public class ForkJoinPool extends Abstra
688      }
689  
690  
691 +    /**
692 +     * @throws NullPointerException       {@inheritDoc}
693 +     * @throws RejectedExecutionException {@inheritDoc}
694 +     */
695      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
696          ArrayList<ForkJoinTask<T>> forkJoinTasks =
697              new ArrayList<ForkJoinTask<T>>(tasks.size());
# Line 770 | Line 801 | public class ForkJoinPool extends Abstra
801              if (isProcessingTasks()) {
802                  int p = this.parallelism;
803                  this.parallelism = parallelism;
804 <                if (parallelism > p)
805 <                    createAndStartAddedWorkers();
806 <                else
807 <                    trimSpares();
804 >                if (workers != null) {
805 >                    if (parallelism > p)
806 >                        createAndStartAddedWorkers();
807 >                    else
808 >                        trimSpares();
809 >                }
810              }
811          } finally {
812              lock.unlock();
# Line 804 | Line 837 | public class ForkJoinPool extends Abstra
837  
838      /**
839       * Returns the maximum number of threads allowed to exist in the
840 <     * pool.  Unless set using {@link #setMaximumPoolSize}, the
840 >     * pool. Unless set using {@link #setMaximumPoolSize}, the
841       * maximum is an implementation-defined value designed only to
842       * prevent runaway growth.
843       *
# Line 816 | Line 849 | public class ForkJoinPool extends Abstra
849  
850      /**
851       * Sets the maximum number of threads allowed to exist in the
852 <     * pool.  Setting this value has no effect on current pool
853 <     * size. It controls construction of new threads.
852 >     * pool. The given value should normally be greater than or equal
853 >     * to the {@link #getParallelism parallelism} level. Setting this
854 >     * value has no effect on current pool size. It controls
855 >     * construction of new threads.
856       *
857       * @throws IllegalArgumentException if negative or greater than
858       * internal implementation limit
# Line 1075 | Line 1110 | public class ForkJoinPool extends Abstra
1110      }
1111  
1112      private static String runStateToString(int rs) {
1113 <        switch(rs) {
1113 >        switch (rs) {
1114          case RUNNING: return "Running";
1115          case SHUTDOWN: return "Shutting down";
1116          case TERMINATING: return "Terminating";
# Line 1345 | Line 1380 | public class ForkJoinPool extends Abstra
1380          }
1381      }
1382  
1348
1383      /*
1384       * Nodes for event barrier to manage idle threads.  Queue nodes
1385       * are basic Treiber stack nodes, also used for spare stack.
# Line 1369 | Line 1403 | public class ForkJoinPool extends Abstra
1403       * handling: Method signalWork returns without advancing count if
1404       * the queue appears to be empty.  This would ordinarily result in
1405       * races causing some queued waiters not to be woken up. To avoid
1406 <     * this, the first worker enqueued in method sync (see
1407 <     * syncIsReleasable) rescans for tasks after being enqueued, and
1408 <     * helps signal if any are found. This works well because the
1409 <     * worker has nothing better to do, and so might as well help
1410 <     * alleviate the overhead and contention on the threads actually
1411 <     * doing work.  Also, since event counts increments on task
1412 <     * availability exist to maintain liveness (rather than to force
1413 <     * refreshes etc), it is OK for callers to exit early if
1380 <     * contending with another signaller.
1406 >     * this, the first worker enqueued in method sync rescans for
1407 >     * tasks after being enqueued, and helps signal if any are
1408 >     * found. This works well because the worker has nothing better to
1409 >     * do, and so might as well help alleviate the overhead and
1410 >     * contention on the threads actually doing work.  Also, since
1411 >     * event counts increments on task availability exist to maintain
1412 >     * liveness (rather than to force refreshes etc), it is OK for
1413 >     * callers to exit early if contending with another signaller.
1414       */
1415      static final class WaitQueueNode {
1416          WaitQueueNode next; // only written before enqueued
# Line 1390 | Line 1423 | public class ForkJoinPool extends Abstra
1423          }
1424  
1425          /**
1426 <         * Wakes up waiter, returning false if known to already
1426 >         * Wakes up waiter, returning false if known to already be awake
1427           */
1428          boolean signal() {
1429              ForkJoinWorkerThread t = thread;
# Line 1400 | Line 1433 | public class ForkJoinPool extends Abstra
1433              LockSupport.unpark(t);
1434              return true;
1435          }
1403
1404        /**
1405         * Awaits release on sync.
1406         */
1407        void awaitSyncRelease(ForkJoinPool p) {
1408            while (thread != null && !p.syncIsReleasable(this))
1409                LockSupport.park(this);
1410        }
1411
1412        /**
1413         * Awaits resumption as spare.
1414         */
1415        void awaitSpareRelease() {
1416            while (thread != null) {
1417                if (!Thread.interrupted())
1418                    LockSupport.park(this);
1419            }
1420        }
1436      }
1437  
1438      /**
1439       * Ensures that no thread is waiting for count to advance from the
1440       * current value of eventCount read on entry to this method, by
1441       * releasing waiting threads if necessary.
1427     *
1428     * @return the count
1442       */
1443 <    final long ensureSync() {
1443 >    final void ensureSync() {
1444          long c = eventCount;
1445          WaitQueueNode q;
1446          while ((q = syncStack) != null && q.count < c) {
# Line 1438 | Line 1451 | public class ForkJoinPool extends Abstra
1451                  break;
1452              }
1453          }
1441        return c;
1454      }
1455  
1456      /**
# Line 1453 | Line 1465 | public class ForkJoinPool extends Abstra
1465      /**
1466       * Signals threads waiting to poll a task. Because method sync
1467       * rechecks availability, it is OK to only proceed if queue
1468 <     * appears to be non-empty, and OK to skip under contention to
1469 <     * increment count (since some other thread succeeded).
1468 >     * appears to be non-empty, and OK if CAS to increment count
1469 >     * fails (since some other thread succeeded).
1470       */
1471      final void signalWork() {
1472 <        long c;
1473 <        WaitQueueNode q;
1474 <        if (syncStack != null &&
1475 <            casEventCount(c = eventCount, c+1) &&
1476 <            (((q = syncStack) != null && q.count <= c) &&
1477 <             (!casBarrierStack(q, q.next) || !q.signal())))
1478 <            ensureSync();
1472 >        if (syncStack != null) {
1473 >            long c;
1474 >            casEventCount(c = eventCount, c+1);
1475 >            WaitQueueNode q = syncStack;
1476 >            if (q != null && q.count <= c &&
1477 >                (!casBarrierStack(q, q.next) || !q.signal()))
1478 >                ensureSync();
1479 >        }
1480      }
1481  
1482      /**
# Line 1476 | Line 1489 | public class ForkJoinPool extends Abstra
1489      final void sync(ForkJoinWorkerThread w) {
1490          updateStealCount(w); // Transfer w's count while it is idle
1491  
1492 <        while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1492 >        if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1493              long prev = w.lastEventCount;
1494              WaitQueueNode node = null;
1495              WaitQueueNode h;
1496 <            while (eventCount == prev &&
1496 >            long ec;
1497 >            while ((ec = eventCount) == prev &&
1498                     ((h = syncStack) == null || h.count == prev)) {
1499                  if (node == null)
1500                      node = new WaitQueueNode(prev, w);
1501                  if (casBarrierStack(node.next = h, node)) {
1502 <                    node.awaitSyncRelease(this);
1502 >                    if (!Thread.interrupted() &&
1503 >                        node.thread != null &&
1504 >                        eventCount == prev &&
1505 >                        (h != null || // cover signalWork race
1506 >                         (!ForkJoinWorkerThread.hasQueuedTasks(workers) &&
1507 >                          eventCount == prev)))
1508 >                        LockSupport.park(this);
1509 >                    ec = eventCount;
1510 >                    if (node.thread != null) {
1511 >                        node.thread = null;
1512 >                        if (ec == prev)
1513 >                            casEventCount(prev, prev + 1); // help signal
1514 >                    }
1515                      break;
1516                  }
1517              }
1518 <            long ec = ensureSync();
1519 <            if (ec != prev) {
1494 <                w.lastEventCount = ec;
1495 <                break;
1496 <            }
1497 <        }
1498 <    }
1499 <
1500 <    /**
1501 <     * Returns {@code true} if worker waiting on sync can proceed:
1502 <     *  - on signal (thread == null)
1503 <     *  - on event count advance (winning race to notify vs signaller)
1504 <     *  - on interrupt
1505 <     *  - if the first queued node, we find work available
1506 <     * If node was not signalled and event count not advanced on exit,
1507 <     * then we also help advance event count.
1508 <     *
1509 <     * @return {@code true} if node can be released
1510 <     */
1511 <    final boolean syncIsReleasable(WaitQueueNode node) {
1512 <        long prev = node.count;
1513 <        if (!Thread.interrupted() && node.thread != null &&
1514 <            (node.next != null ||
1515 <             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1516 <            eventCount == prev)
1517 <            return false;
1518 <        if (node.thread != null) {
1519 <            node.thread = null;
1520 <            long ec = eventCount;
1521 <            if (prev <= ec) // help signal
1522 <                casEventCount(ec, ec+1);
1518 >            w.lastEventCount = ec;
1519 >            ensureSync();
1520          }
1524        return true;
1521      }
1522  
1523      /**
# Line 1530 | Line 1526 | public class ForkJoinPool extends Abstra
1526       */
1527      final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1528          long lc = w.lastEventCount;
1529 <        long ec = ensureSync();
1530 <        if (ec == lc)
1531 <            return false;
1532 <        w.lastEventCount = ec;
1533 <        return true;
1529 >        long ec = eventCount;
1530 >        if (lc != ec)
1531 >            w.lastEventCount = ec;
1532 >        ensureSync();
1533 >        return lc != ec || lc != eventCount;
1534      }
1535  
1536      //  Parallelism maintenance
# Line 1576 | Line 1572 | public class ForkJoinPool extends Abstra
1572          while (spareStack == null || !tryResumeSpare(dec)) {
1573              int counts = workerCounts;
1574              if (dec || (dec = casWorkerCounts(counts, --counts))) {
1579                // CAS cheat
1575                  if (!needSpare(counts, maintainParallelism))
1576                      break;
1577                  if (joinMe.status < 0)
# Line 1701 | Line 1696 | public class ForkJoinPool extends Abstra
1696       */
1697      private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1698          WaitQueueNode node = null;
1699 <        int s;
1700 <        while (parallelism < runningCountOf(s = workerCounts)) {
1699 >        for (;;) {
1700 >            int p = parallelism;
1701 >            int s = workerCounts;
1702 >            int r = runningCountOf(s);
1703 >            int t = totalCountOf(s);
1704 >            // use t as bound if r transiently out of sync
1705 >            if (t <= p || r <= p)
1706 >                return false; // not a spare
1707              if (node == null)
1708                  node = new WaitQueueNode(0, w);
1709 <            if (casWorkerCounts(s, s-1)) { // representation-dependent
1710 <                // push onto stack
1710 <                do {} while (!casSpareStack(node.next = spareStack, node));
1711 <                // block until released by resumeSpare
1712 <                node.awaitSpareRelease();
1713 <                return true;
1714 <            }
1709 >            if (casWorkerCounts(s, workerCountsFor(t, r - 1)))
1710 >                break;
1711          }
1712 <        return false;
1712 >        // push onto stack
1713 >        do {} while (!casSpareStack(node.next = spareStack, node));
1714 >        // block until released by resumeSpare
1715 >        while (!Thread.interrupted() && node.thread != null)
1716 >            LockSupport.park(this);
1717 >        return true;
1718      }
1719  
1720      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines