diff --git a/docs/commands/mcp-server.md b/docs/commands/mcp-server.md new file mode 100644 index 00000000..972c027d --- /dev/null +++ b/docs/commands/mcp-server.md @@ -0,0 +1,3 @@ +::: mkdocs-click + :module: rsconnect.main + :command: mcp_server diff --git a/mkdocs.yml b/mkdocs.yml index 96dd18fd..541da548 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -50,6 +50,7 @@ nav: - system: commands/system.md - version: commands/version.md - write-manifest: commands/write-manifest.md + - mcp-server: commands/mcp-server.md theme: diff --git a/pyproject.toml b/pyproject.toml index 7768298e..b163ffcd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "semver>=2.0.0,<4.0.0", "pyjwt>=2.4.0", "click>=8.0.0", - "toml>=0.10; python_version < '3.11'" + "toml>=0.10; python_version < '3.11'", ] dynamic = ["version"] @@ -37,8 +37,10 @@ test = [ "setuptools_scm[toml]>=3.4", "twine", "types-Flask", + "fastmcp==2.12.4; python_version >= '3.10'", ] snowflake = ["snowflake-cli"] +mcp = ["fastmcp==2.12.4; python_version >= '3.10'"] docs = [ "mkdocs-material", "mkdocs-click", diff --git a/rsconnect/main.py b/rsconnect/main.py index 357d2b9a..9f96b798 100644 --- a/rsconnect/main.py +++ b/rsconnect/main.py @@ -8,7 +8,17 @@ import traceback from functools import wraps from os.path import abspath, dirname, exists, isdir, join -from typing import Callable, ItemsView, Literal, Optional, Sequence, TypeVar, cast +from typing import ( + Any, + Callable, + Dict, + ItemsView, + Literal, + Optional, + Sequence, + TypeVar, + cast, +) import click @@ -392,6 +402,123 @@ def version(): click.echo(VERSION) +@cli.command( + short_help="Start the Model Context Protocol (MCP) server.", + help=( + "Start a Model Context Protocol (MCP) server to expose rsconnect-python capabilities to AI applications " + "through a standardized protocol interface." + "\n\n" + "The MCP server exposes a single tool:\n\n" + "`get_command_info`:\n\n" + " - Provides detailed parameter schemas for any rsconnect command. " + "This provides context for an LLM to understand how to construct valid rsconnect " + "commands dynamically without hard-coded knowledge of the CLI." + "\n\n" + "System Requirements:\n\n" + " - Python>=3.10\n" + " - fastmcp" + "\n\n" + "The server runs in stdio mode, communicating via standard input/output streams." + "\n\n" + "Usage with popular LLM clients:\n\n" + " - [codex](https://developers.openai.com/codex/mcp/#configuration---cli)\n" + " - [claude code](https://docs.claude.com/en/docs/claude-code/mcp#option-3%3A-add-a-local-stdio-server)\n" + " - [VS Code](https://code.visualstudio.com/docs/copilot/customization/mcp-servers#_add-an-mcp-server)\n\n" + "The command `uvx --from rsconnect-python rsconnect mcp-server` is a simple option for use in each of " + "the above options." + ), +) +def mcp_server(): + try: + from fastmcp import FastMCP + from fastmcp.exceptions import ToolError + except ImportError: + raise RSConnectException( + "The fastmcp package is required for MCP server functionality. " + "Install it with: pip install rsconnect-python[mcp]" + ) + + mcp = FastMCP("Connect MCP") + + # Discover all commands at startup + from .mcp_deploy_context import discover_all_commands + + all_commands_info = discover_all_commands(cli) + + def get_command_info( + command_path: str, + ) -> Dict[str, Any]: + try: + # split the command path into parts + parts = command_path.strip().split() + if not parts: + available_commands = list(all_commands_info["commands"].keys()) + return {"error": "Command path cannot be empty", "available_commands": available_commands} + + current_info = all_commands_info + current_path = [] + + for _, part in enumerate(parts): + # error if we find unexpected additional subcommands + if "commands" not in current_info: + return { + "error": f"'{' '.join(current_path)}' is not a command group. Unexpected part: '{part}'", + "type": "command", + "command_path": f"rsconnect {' '.join(current_path)}", + } + + # try to return useful messaging for invalid subcommands + if part not in current_info["commands"]: + available = list(current_info["commands"].keys()) + path_str = " ".join(current_path) if current_path else "top level" + return {"error": f"Command '{part}' not found in {path_str}", "available_commands": available} + + current_info = current_info["commands"][part] + current_path.append(part) + + # still return something useful if additional subcommands are needed + if "commands" in current_info: + return { + "type": "command_group", + "name": current_info.get("name", parts[-1]), + "description": current_info.get("description"), + "available_subcommands": list(current_info["commands"].keys()), + "message": f"The '{' '.join(parts)}' command requires a subcommand.", + } + else: + return { + "type": "command", + "command_path": f"rsconnect {' '.join(parts)}", + "name": current_info.get("name", parts[-1]), + "description": current_info.get("description"), + "parameters": current_info.get("parameters", []), + "shell": "bash", + } + except Exception as e: + raise ToolError(f"Failed to retrieve command info: {str(e)}") + + # dynamically build docstring with top level commands + # note: excluding mcp-server here + available_commands = sorted(cmd for cmd in all_commands_info["commands"].keys() if cmd != "mcp-server") + commands_list = "\n ".join(f"- {cmd}" for cmd in available_commands) + + get_command_info.__doc__ = f"""Get the parameter schema for any rsconnect command. + + Returns information about the parameters needed to construct an rsconnect command + that can be executed in a bash shell. Supports nested command groups of arbitrary depth. + + Available top-level commands: + {commands_list} + + :param command_path: space-separated command path (e.g., 'version', 'deploy notebook', 'content build add') + :return: dictionary with command parameter schema and execution metadata + """ + + mcp.tool(get_command_info) + + mcp.run() + + def _test_server_and_api(server: str, api_key: str, insecure: bool, ca_cert: str | None): """ Test the specified server information to make sure it works. If so, a @@ -433,7 +560,7 @@ def _test_spcs_creds(server: SPCSConnectServer): @cli.command( short_help="Create an initial admin user to bootstrap a Connect instance.", - help="Creates an initial admin user to bootstrap a Connect instance. Returns the provisionend API key.", + help="Creates an initial admin user to bootstrap a Connect instance. Returns the provisioned API key.", no_args_is_help=True, ) @click.option( diff --git a/rsconnect/mcp_deploy_context.py b/rsconnect/mcp_deploy_context.py new file mode 100644 index 00000000..b47639bd --- /dev/null +++ b/rsconnect/mcp_deploy_context.py @@ -0,0 +1,115 @@ +""" +Programmatically discover all parameters for rsconnect commands. +This helps MCP tools understand how to use the cli. +""" + +import json +from typing import Any, Dict + +import click + + +def extract_parameter_info(param: click.Parameter) -> Dict[str, Any]: + """Extract detailed information from a Click parameter.""" + info: Dict[str, Any] = {} + + if isinstance(param, click.Option) and param.opts: + # Use the longest option name (usually the full form without dashes) + mcp_arg_name = max(param.opts, key=len).lstrip("-").replace("-", "_") + info["name"] = mcp_arg_name + info["cli_flags"] = param.opts + info["param_type"] = "option" + else: + info["name"] = param.name + if isinstance(param, click.Argument): + info["param_type"] = "argument" + + # extract help text for added context + help_text = getattr(param, "help", None) + if help_text: + info["description"] = help_text + + if isinstance(param, click.Option): + # Boolean flags + if param.is_flag: + info["type"] = "boolean" + info["default"] = param.default or False + + # choices + elif param.type and hasattr(param.type, "choices"): + info["type"] = "string" + info["choices"] = list(param.type.choices) + + # multiple + elif param.multiple: + info["type"] = "array" + info["items"] = {"type": "string"} + + # files + elif isinstance(param.type, click.Path): + info["type"] = "string" + info["format"] = "path" + if param.type.exists: + info["path_must_exist"] = True + if param.type.file_okay and not param.type.dir_okay: + info["path_type"] = "file" + elif param.type.dir_okay and not param.type.file_okay: + info["path_type"] = "directory" + + # default + else: + info["type"] = "string" + + # defaults (important to avoid noise in returned command) + if param.default is not None and not param.is_flag: + if isinstance(param.default, tuple): + info["default"] = list(param.default) + elif isinstance(param.default, (str, int, float, bool, list, dict)): + info["default"] = param.default + + # required params + info["required"] = param.required + + return info + + +def discover_single_command(cmd: click.Command) -> Dict[str, Any]: + """Discover a single command and its parameters.""" + cmd_info = {"name": cmd.name, "description": cmd.help, "parameters": []} + + for param in cmd.params: + if param.name in ["verbose", "v"]: + continue + + param_info = extract_parameter_info(param) + cmd_info["parameters"].append(param_info) + + return cmd_info + + +def discover_command_group(group: click.Group) -> Dict[str, Any]: + """Discover all commands in a command group and their parameters.""" + result = {"name": group.name, "description": group.help, "commands": {}} + + for cmd_name, cmd in group.commands.items(): + if isinstance(cmd, click.Group): + # recursively discover nested command groups + result["commands"][cmd_name] = discover_command_group(cmd) + else: + result["commands"][cmd_name] = discover_single_command(cmd) + + return result + + +def discover_all_commands(cli: click.Group) -> Dict[str, Any]: + """Discover all commands in the CLI and their parameters.""" + return discover_command_group(cli) + + +if __name__ == "__main__": + from rsconnect.main import cli + + # Discover all commands in the CLI + # use this for testing/debugging + all_commands = discover_all_commands(cli) + print(json.dumps(all_commands, indent=2)) diff --git a/tests/test_mcp_deploy_context.py b/tests/test_mcp_deploy_context.py new file mode 100644 index 00000000..39a98ce3 --- /dev/null +++ b/tests/test_mcp_deploy_context.py @@ -0,0 +1,225 @@ +"""Tests for MCP deploy context.""" + +import pytest + +# Skip entire module if fastmcp is not available (requires Python 3.10+) +pytest.importorskip("fastmcp", reason="fastmcp library not installed (requires Python 3.10+)") + +from unittest import TestCase # noqa + +from rsconnect.main import cli # noqa +from rsconnect.mcp_deploy_context import discover_all_commands # noqa + + +class TestDiscoverAllCommands(TestCase): + def test_discover_rsconnect_cli(self): + result = discover_all_commands(cli) + + self.assertIn("commands", result) + self.assertIsNotNone(result["description"]) + + def test_top_level_commands(self): + result = discover_all_commands(cli) + + expected = [ + "version", + "mcp-server", + "add", + "list", + "remove", + "details", + "info", + "deploy", + "write-manifest", + "content", + "system", + "bootstrap", + ] + for cmd in expected: + self.assertIn(cmd, result["commands"]) + + def test_deploy_is_command_group(self): + result = discover_all_commands(cli) + self.assertIn("commands", result["commands"]["deploy"]) + + def test_deploy_subcommands(self): + result = discover_all_commands(cli) + + deploy = result["commands"]["deploy"] + expected = [ + "notebook", + "voila", + "manifest", + "quarto", + "tensorflow", + "html", + "api", + "flask", + "fastapi", + "dash", + "streamlit", + "bokeh", + "shiny", + "gradio", + ] + for subcmd in expected: + self.assertIn(subcmd, deploy["commands"]) + + def test_content_is_command_group(self): + result = discover_all_commands(cli) + self.assertIn("commands", result["commands"]["content"]) + + def test_content_subcommands(self): + result = discover_all_commands(cli) + + content = result["commands"]["content"] + expected = ["search", "describe", "download-bundle", "build"] + for subcmd in expected: + self.assertIn(subcmd, content["commands"]) + + def test_content_build_nested_group(self): + result = discover_all_commands(cli) + + build = result["commands"]["content"]["commands"]["build"] + self.assertIn("commands", build) + + expected = ["add", "rm", "ls", "history", "logs", "run"] + for subcmd in expected: + self.assertIn(subcmd, build["commands"]) + + def test_system_caches_nested_group(self): + result = discover_all_commands(cli) + + caches = result["commands"]["system"]["commands"]["caches"] + self.assertIn("commands", caches) + + expected = ["list", "delete"] + for subcmd in expected: + self.assertIn(subcmd, caches["commands"]) + + def test_write_manifest_is_command_group(self): + result = discover_all_commands(cli) + self.assertIn("commands", result["commands"]["write-manifest"]) + + def test_version_is_simple_command(self): + result = discover_all_commands(cli) + + version = result["commands"]["version"] + self.assertNotIn("commands", version) + self.assertIn("parameters", version) + + def test_mcp_server_command_exists(self): + result = discover_all_commands(cli) + self.assertIn("mcp-server", result["commands"]) + self.assertIn("parameters", result["commands"]["mcp-server"]) + + def test_deploy_notebook_has_parameters(self): + result = discover_all_commands(cli) + + notebook = result["commands"]["deploy"]["commands"]["notebook"] + param_names = [p["name"] for p in notebook["parameters"]] + + self.assertIn("file", param_names) + self.assertIn("name", param_names) + self.assertIn("server", param_names) + self.assertIn("api_key", param_names) + + def test_add_command_has_parameters(self): + result = discover_all_commands(cli) + + add = result["commands"]["add"] + param_names = [p["name"] for p in add["parameters"]] + + self.assertIn("name", param_names) + self.assertIn("server", param_names) + self.assertIn("api_key", param_names) + self.assertIn("insecure", param_names) + + def test_parameter_has_required_fields(self): + result = discover_all_commands(cli) + + for param in result["commands"]["add"]["parameters"]: + self.assertIn("name", param) + self.assertIn("param_type", param) + self.assertIn("required", param) + + if param["param_type"] == "option": + self.assertIn("cli_flags", param) + self.assertGreater(len(param["cli_flags"]), 0) + + def test_boolean_flags_identified(self): + result = discover_all_commands(cli) + + add = result["commands"]["add"] + insecure = next((p for p in add["parameters"] if p["name"] == "insecure"), None) + + self.assertIsNotNone(insecure) + self.assertEqual(insecure["type"], "boolean") + + def test_parameters_have_descriptions(self): + result = discover_all_commands(cli) + + add = result["commands"]["add"] + server = next((p for p in add["parameters"] if p["name"] == "server"), None) + + self.assertIsNotNone(server) + self.assertIn("description", server) + self.assertGreater(len(server["description"]), 0) + + def test_verbose_parameters_excluded(self): + result = discover_all_commands(cli) + + param_names = [p["name"] for p in result["commands"]["add"]["parameters"]] + self.assertNotIn("verbose", param_names) + self.assertNotIn("v", param_names) + + def test_all_commands_have_valid_structure(self): + def validate_command(cmd_info, path=""): + self.assertIn("name", cmd_info) + + if "commands" in cmd_info: + self.assertIsInstance(cmd_info["commands"], dict) + for subcmd_name, subcmd_info in cmd_info["commands"].items(): + validate_command(subcmd_info, f"{path}/{subcmd_name}") + else: + self.assertIn("parameters", cmd_info) + self.assertIsInstance(cmd_info["parameters"], list) + + for param in cmd_info["parameters"]: + self.assertIn("name", param) + self.assertIn("param_type", param) + self.assertIn("required", param) + + result = discover_all_commands(cli) + validate_command(result, "cli") + + def test_multiple_value_parameters(self): + result = discover_all_commands(cli) + + quarto = result["commands"]["deploy"]["commands"]["quarto"] + exclude = next((p for p in quarto["parameters"] if p["name"] == "exclude"), None) + + self.assertIsNotNone(exclude) + self.assertEqual(exclude["type"], "array") + + def test_required_parameters_marked(self): + result = discover_all_commands(cli) + + describe = result["commands"]["content"]["commands"]["describe"] + guid = next((p for p in describe["parameters"] if p["name"] == "guid"), None) + + self.assertIsNotNone(guid) + self.assertTrue(guid["required"]) + + def test_cli_flags_format(self): + result = discover_all_commands(cli) + + add = result["commands"]["add"] + name = next((p for p in add["parameters"] if p["name"] == "name"), None) + + self.assertIsNotNone(name) + self.assertIn("cli_flags", name) + self.assertGreater(len(name["cli_flags"]), 0) + + for flag in name["cli_flags"]: + self.assertTrue(flag.startswith("-"))