ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.45 by dl, Tue Aug 4 12:41:27 2009 UTC vs.
Revision 1.52 by dl, Sat Dec 5 11:39:03 2009 UTC

# Line 23 | Line 23 | import java.util.concurrent.atomic.Atomi
23   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
24   * A {@code ForkJoinPool} provides the entry point for submissions
25   * from non-{@code ForkJoinTask}s, as well as management and
26 < * monitoring operations.  
26 > * monitoring operations.
27   *
28   * <p>A {@code ForkJoinPool} differs from other kinds of {@link
29   * ExecutorService} mainly by virtue of employing
# Line 82 | Line 82 | import java.util.concurrent.atomic.Atomi
82   *
83   * <p><b>Implementation notes</b>: This implementation restricts the
84   * maximum number of running threads to 32767. Attempts to create
85 < * pools with greater than the maximum result in
85 > * pools with greater than the maximum number result in
86   * {@code IllegalArgumentException}.
87   *
88 + * <p>This implementation rejects submitted tasks (that is, by throwing
89 + * {@link RejectedExecutionException}) only when the pool is shut down.
90 + *
91   * @since 1.7
92   * @author Doug Lea
93   */
# Line 112 | Line 115 | public class ForkJoinPool extends Abstra
115           * Returns a new worker thread operating in the given pool.
116           *
117           * @param pool the pool this thread works in
118 <         * @throws NullPointerException if pool is null
118 >         * @throws NullPointerException if the pool is null
119           */
120          public ForkJoinWorkerThread newThread(ForkJoinPool pool);
121      }
# Line 384 | Line 387 | public class ForkJoinPool extends Abstra
387       *
388       * @param parallelism the parallelism level
389       * @throws IllegalArgumentException if parallelism less than or
390 <     * equal to zero
390 >     *         equal to zero, or greater than implementation limit
391       * @throws SecurityException if a security manager exists and
392       *         the caller is not permitted to modify threads
393       *         because it does not hold {@link
# Line 400 | Line 403 | public class ForkJoinPool extends Abstra
403       * thread factory.
404       *
405       * @param factory the factory for creating new threads
406 <     * @throws NullPointerException if factory is null
406 >     * @throws NullPointerException if the factory is null
407       * @throws SecurityException if a security manager exists and
408       *         the caller is not permitted to modify threads
409       *         because it does not hold {@link
# Line 417 | Line 420 | public class ForkJoinPool extends Abstra
420       * @param parallelism the parallelism level
421       * @param factory the factory for creating new threads
422       * @throws IllegalArgumentException if parallelism less than or
423 <     * equal to zero, or greater than implementation limit
424 <     * @throws NullPointerException if factory is null
423 >     *         equal to zero, or greater than implementation limit
424 >     * @throws NullPointerException if the factory is null
425       * @throws SecurityException if a security manager exists and
426       *         the caller is not permitted to modify threads
427       *         because it does not hold {@link
# Line 523 | Line 526 | public class ForkJoinPool extends Abstra
526                  ws = workers;
527                  if (ws == null) {
528                      int ps = parallelism;
529 +                    updateWorkerCount(ps);
530                      ws = ensureWorkerArrayCapacity(ps);
531                      for (int i = 0; i < ps; ++i) {
532                          ForkJoinWorkerThread w = createWorker(i);
533                          if (w != null) {
534                              ws[i] = w;
535                              w.start();
532                            updateWorkerCount(1);
536                          }
537 +                        else
538 +                            updateWorkerCount(-1);
539                      }
540                  }
541              } finally {
# Line 594 | Line 599 | public class ForkJoinPool extends Abstra
599       *
600       * @param task the task
601       * @return the task's result
602 <     * @throws NullPointerException if task is null
603 <     * @throws RejectedExecutionException if pool is shut down
602 >     * @throws NullPointerException if the task is null
603 >     * @throws RejectedExecutionException if the task cannot be
604 >     *         scheduled for execution
605       */
606      public <T> T invoke(ForkJoinTask<T> task) {
607          doSubmit(task);
# Line 606 | Line 612 | public class ForkJoinPool extends Abstra
612       * Arranges for (asynchronous) execution of the given task.
613       *
614       * @param task the task
615 <     * @throws NullPointerException if task is null
616 <     * @throws RejectedExecutionException if pool is shut down
615 >     * @throws NullPointerException if the task is null
616 >     * @throws RejectedExecutionException if the task cannot be
617 >     *         scheduled for execution
618       */
619      public void execute(ForkJoinTask<?> task) {
620          doSubmit(task);
# Line 615 | Line 622 | public class ForkJoinPool extends Abstra
622  
623      // AbstractExecutorService methods
624  
625 +    /**
626 +     * @throws NullPointerException if the task is null
627 +     * @throws RejectedExecutionException if the task cannot be
628 +     *         scheduled for execution
629 +     */
630      public void execute(Runnable task) {
631          ForkJoinTask<?> job;
632          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
# Line 624 | Line 636 | public class ForkJoinPool extends Abstra
636          doSubmit(job);
637      }
638  
639 +    /**
640 +     * @throws NullPointerException if the task is null
641 +     * @throws RejectedExecutionException if the task cannot be
642 +     *         scheduled for execution
643 +     */
644      public <T> ForkJoinTask<T> submit(Callable<T> task) {
645          ForkJoinTask<T> job = ForkJoinTask.adapt(task);
646          doSubmit(job);
647          return job;
648      }
649  
650 +    /**
651 +     * @throws NullPointerException if the task is null
652 +     * @throws RejectedExecutionException if the task cannot be
653 +     *         scheduled for execution
654 +     */
655      public <T> ForkJoinTask<T> submit(Runnable task, T result) {
656          ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
657          doSubmit(job);
658          return job;
659      }
660  
661 +    /**
662 +     * @throws NullPointerException if the task is null
663 +     * @throws RejectedExecutionException if the task cannot be
664 +     *         scheduled for execution
665 +     */
666      public ForkJoinTask<?> submit(Runnable task) {
667          ForkJoinTask<?> job;
668          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
# Line 651 | Line 678 | public class ForkJoinPool extends Abstra
678       *
679       * @param task the task to submit
680       * @return the task
681 +     * @throws NullPointerException if the task is null
682       * @throws RejectedExecutionException if the task cannot be
683       *         scheduled for execution
656     * @throws NullPointerException if the task is null
684       */
685      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
686          doSubmit(task);
# Line 661 | Line 688 | public class ForkJoinPool extends Abstra
688      }
689  
690  
691 +    /**
692 +     * @throws NullPointerException       {@inheritDoc}
693 +     * @throws RejectedExecutionException {@inheritDoc}
694 +     */
695      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
696          ArrayList<ForkJoinTask<T>> forkJoinTasks =
697              new ArrayList<ForkJoinTask<T>>(tasks.size());
# Line 770 | Line 801 | public class ForkJoinPool extends Abstra
801              if (isProcessingTasks()) {
802                  int p = this.parallelism;
803                  this.parallelism = parallelism;
804 <                if (parallelism > p)
805 <                    createAndStartAddedWorkers();
806 <                else
807 <                    trimSpares();
804 >                if (workers != null) {
805 >                    if (parallelism > p)
806 >                        createAndStartAddedWorkers();
807 >                    else
808 >                        trimSpares();
809 >                }
810              }
811          } finally {
812              lock.unlock();
# Line 1077 | Line 1110 | public class ForkJoinPool extends Abstra
1110      }
1111  
1112      private static String runStateToString(int rs) {
1113 <        switch(rs) {
1113 >        switch (rs) {
1114          case RUNNING: return "Running";
1115          case SHUTDOWN: return "Shutting down";
1116          case TERMINATING: return "Terminating";
# Line 1347 | Line 1380 | public class ForkJoinPool extends Abstra
1380          }
1381      }
1382  
1350
1383      /*
1384       * Nodes for event barrier to manage idle threads.  Queue nodes
1385       * are basic Treiber stack nodes, also used for spare stack.
# Line 1371 | Line 1403 | public class ForkJoinPool extends Abstra
1403       * handling: Method signalWork returns without advancing count if
1404       * the queue appears to be empty.  This would ordinarily result in
1405       * races causing some queued waiters not to be woken up. To avoid
1406 <     * this, the first worker enqueued in method sync (see
1407 <     * syncIsReleasable) rescans for tasks after being enqueued, and
1408 <     * helps signal if any are found. This works well because the
1409 <     * worker has nothing better to do, and so might as well help
1410 <     * alleviate the overhead and contention on the threads actually
1411 <     * doing work.  Also, since event counts increments on task
1412 <     * availability exist to maintain liveness (rather than to force
1413 <     * refreshes etc), it is OK for callers to exit early if
1382 <     * contending with another signaller.
1406 >     * this, the first worker enqueued in method sync rescans for
1407 >     * tasks after being enqueued, and helps signal if any are
1408 >     * found. This works well because the worker has nothing better to
1409 >     * do, and so might as well help alleviate the overhead and
1410 >     * contention on the threads actually doing work.  Also, since
1411 >     * event counts increments on task availability exist to maintain
1412 >     * liveness (rather than to force refreshes etc), it is OK for
1413 >     * callers to exit early if contending with another signaller.
1414       */
1415      static final class WaitQueueNode {
1416          WaitQueueNode next; // only written before enqueued
# Line 1392 | Line 1423 | public class ForkJoinPool extends Abstra
1423          }
1424  
1425          /**
1426 <         * Wakes up waiter, returning false if known to already
1426 >         * Wakes up waiter, also clearing thread field
1427           */
1428 <        boolean signal() {
1428 >        void signal() {
1429              ForkJoinWorkerThread t = thread;
1430 <            if (t == null)
1431 <                return false;
1432 <            thread = null;
1402 <            LockSupport.unpark(t);
1403 <            return true;
1404 <        }
1405 <
1406 <        /**
1407 <         * Awaits release on sync.
1408 <         */
1409 <        void awaitSyncRelease(ForkJoinPool p) {
1410 <            while (thread != null && !p.syncIsReleasable(this))
1411 <                LockSupport.park(this);
1412 <        }
1413 <
1414 <        /**
1415 <         * Awaits resumption as spare.
1416 <         */
1417 <        void awaitSpareRelease() {
1418 <            while (thread != null) {
1419 <                if (!Thread.interrupted())
1420 <                    LockSupport.park(this);
1430 >            if (t != null) {
1431 >                thread = null;
1432 >                LockSupport.unpark(t);
1433              }
1434          }
1435      }
# Line 1426 | Line 1438 | public class ForkJoinPool extends Abstra
1438       * Ensures that no thread is waiting for count to advance from the
1439       * current value of eventCount read on entry to this method, by
1440       * releasing waiting threads if necessary.
1429     *
1430     * @return the count
1441       */
1442 <    final long ensureSync() {
1442 >    final void ensureSync() {
1443          long c = eventCount;
1444          WaitQueueNode q;
1445          while ((q = syncStack) != null && q.count < c) {
# Line 1440 | Line 1450 | public class ForkJoinPool extends Abstra
1450                  break;
1451              }
1452          }
1443        return c;
1453      }
1454  
1455      /**
# Line 1455 | Line 1464 | public class ForkJoinPool extends Abstra
1464      /**
1465       * Signals threads waiting to poll a task. Because method sync
1466       * rechecks availability, it is OK to only proceed if queue
1467 <     * appears to be non-empty, and OK to skip under contention to
1468 <     * increment count (since some other thread succeeded).
1467 >     * appears to be non-empty, and OK if CAS to increment count
1468 >     * fails (since some other thread succeeded).
1469       */
1470      final void signalWork() {
1471 <        long c;
1472 <        WaitQueueNode q;
1473 <        if (syncStack != null &&
1474 <            casEventCount(c = eventCount, c+1) &&
1475 <            (((q = syncStack) != null && q.count <= c) &&
1476 <             (!casBarrierStack(q, q.next) || !q.signal())))
1477 <            ensureSync();
1471 >        if (syncStack != null) {
1472 >            long c = eventCount;
1473 >            casEventCount(c, c+1);
1474 >            WaitQueueNode q = syncStack;
1475 >            if (q != null && q.count <= c) {
1476 >                if (casBarrierStack(q, q.next))
1477 >                    q.signal();
1478 >                else
1479 >                    ensureSync(); // awaken all on contention
1480 >            }
1481 >        }
1482      }
1483  
1484      /**
1485 <     * Waits until event count advances from last value held by
1486 <     * caller, or if excess threads, caller is resumed as spare, or
1485 >     * Possibly blocks until event count advances from last value held
1486 >     * by caller, or if excess threads, caller is resumed as spare, or
1487       * caller or pool is terminating. Updates caller's event on exit.
1488       *
1489       * @param w the calling worker thread
# Line 1478 | Line 1491 | public class ForkJoinPool extends Abstra
1491      final void sync(ForkJoinWorkerThread w) {
1492          updateStealCount(w); // Transfer w's count while it is idle
1493  
1494 <        while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1494 >        if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1495              long prev = w.lastEventCount;
1496              WaitQueueNode node = null;
1497              WaitQueueNode h;
1498 <            while (eventCount == prev &&
1498 >            long c;
1499 >            while ((c = eventCount) == prev &&
1500                     ((h = syncStack) == null || h.count == prev)) {
1501                  if (node == null)
1502                      node = new WaitQueueNode(prev, w);
1503                  if (casBarrierStack(node.next = h, node)) {
1504 <                    node.awaitSyncRelease(this);
1504 >                    if (!Thread.interrupted() &&
1505 >                        node.thread != null &&
1506 >                        eventCount == prev &&
1507 >                        (h != null || // cover signalWork race
1508 >                         (!ForkJoinWorkerThread.hasQueuedTasks(workers) &&
1509 >                          eventCount == prev)))
1510 >                        LockSupport.park(this);
1511 >                    c = eventCount;
1512 >                    if (node.thread != null) { // help signal if not unparked
1513 >                        node.thread = null;
1514 >                        if (c == prev)
1515 >                            casEventCount(prev, prev + 1);
1516 >                    }
1517                      break;
1518                  }
1519              }
1520 <            long ec = ensureSync();
1521 <            if (ec != prev) {
1496 <                w.lastEventCount = ec;
1497 <                break;
1498 <            }
1499 <        }
1500 <    }
1501 <
1502 <    /**
1503 <     * Returns {@code true} if worker waiting on sync can proceed:
1504 <     *  - on signal (thread == null)
1505 <     *  - on event count advance (winning race to notify vs signaller)
1506 <     *  - on interrupt
1507 <     *  - if the first queued node, we find work available
1508 <     * If node was not signalled and event count not advanced on exit,
1509 <     * then we also help advance event count.
1510 <     *
1511 <     * @return {@code true} if node can be released
1512 <     */
1513 <    final boolean syncIsReleasable(WaitQueueNode node) {
1514 <        long prev = node.count;
1515 <        if (!Thread.interrupted() && node.thread != null &&
1516 <            (node.next != null ||
1517 <             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1518 <            eventCount == prev)
1519 <            return false;
1520 <        if (node.thread != null) {
1521 <            node.thread = null;
1522 <            long ec = eventCount;
1523 <            if (prev <= ec) // help signal
1524 <                casEventCount(ec, ec+1);
1520 >            w.lastEventCount = c;
1521 >            ensureSync();
1522          }
1526        return true;
1523      }
1524  
1525      /**
# Line 1531 | Line 1527 | public class ForkJoinPool extends Abstra
1527       * call to sync or this method, if so, updating caller's count.
1528       */
1529      final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1530 <        long lc = w.lastEventCount;
1531 <        long ec = ensureSync();
1532 <        if (ec == lc)
1533 <            return false;
1534 <        w.lastEventCount = ec;
1535 <        return true;
1530 >        long wc = w.lastEventCount;
1531 >        long c = eventCount;
1532 >        if (wc != c)
1533 >            w.lastEventCount = c;
1534 >        ensureSync();
1535 >        return wc != c || wc != eventCount;
1536      }
1537  
1538      //  Parallelism maintenance
# Line 1578 | Line 1574 | public class ForkJoinPool extends Abstra
1574          while (spareStack == null || !tryResumeSpare(dec)) {
1575              int counts = workerCounts;
1576              if (dec || (dec = casWorkerCounts(counts, --counts))) {
1581                // CAS cheat
1577                  if (!needSpare(counts, maintainParallelism))
1578                      break;
1579                  if (joinMe.status < 0)
# Line 1703 | Line 1698 | public class ForkJoinPool extends Abstra
1698       */
1699      private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1700          WaitQueueNode node = null;
1701 <        int s;
1702 <        while (parallelism < runningCountOf(s = workerCounts)) {
1701 >        for (;;) {
1702 >            int s = workerCounts;
1703 >            int rc = runningCountOf(s);
1704 >            int tc = totalCountOf(s);
1705 >            int ps = parallelism;
1706 >            // use tc as bound if rc transiently out of sync
1707 >            if (tc <= ps || rc <= ps)
1708 >                return false; // not a spare
1709              if (node == null)
1710                  node = new WaitQueueNode(0, w);
1711 <            if (casWorkerCounts(s, s-1)) { // representation-dependent
1712 <                // push onto stack
1712 <                do {} while (!casSpareStack(node.next = spareStack, node));
1713 <                // block until released by resumeSpare
1714 <                node.awaitSpareRelease();
1715 <                return true;
1716 <            }
1711 >            if (casWorkerCounts(s, workerCountsFor(tc, rc - 1)))
1712 >                break;
1713          }
1714 <        return false;
1714 >        // push onto stack
1715 >        do {} while (!casSpareStack(node.next = spareStack, node));
1716 >        // block until released by resumeSpare
1717 >        while (!Thread.interrupted() && node.thread != null)
1718 >            LockSupport.park(this);
1719 >        return true;
1720      }
1721  
1722      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines