Compare commits

...

5 Commits

Author SHA1 Message Date
Soulter
264e3eedd8 fix: correct regex pattern for shell meta characters 2026-04-12 15:04:24 +08:00
Soulter
03cafea1ce chore: ruff format 2026-04-12 15:01:52 +08:00
Soulter
7c827e4ec8 Update astrbot/core/agent/mcp_client.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-04-12 15:01:29 +08:00
Soulter
70657907f2 Update astrbot/core/agent/mcp_client.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-04-12 15:01:04 +08:00
Soulter
dbafa07b52 perf: add validation for MCP stdio configuration 2026-04-12 14:55:17 +08:00
6 changed files with 217 additions and 5 deletions

View File

@@ -1,9 +1,11 @@
import asyncio
import logging
import os
import re
import sys
from contextlib import AsyncExitStack
from datetime import timedelta
from pathlib import Path, PureWindowsPath
from typing import Generic
from tenacity import (
@@ -21,6 +23,75 @@ from astrbot.core.utils.log_pipe import LogPipe
from .run_context import TContext
from .tool import FunctionTool
_DEFAULT_STDIO_COMMAND_ALLOWLIST = frozenset(
{
"python",
"python3",
"py",
"node",
"npx",
"npm",
"pnpm",
"yarn",
"bun",
"bunx",
"deno",
"uv",
"uvx",
}
)
_DENIED_STDIO_COMMANDS = frozenset(
{
"bash",
"sh",
"zsh",
"fish",
"cmd",
"cmd.exe",
"powershell",
"powershell.exe",
"pwsh",
"pwsh.exe",
"osascript",
"open",
"curl",
"wget",
"nc",
"netcat",
"telnet",
"ssh",
"scp",
"rm",
"mv",
"cp",
"dd",
"mkfs",
"sudo",
"su",
"chmod",
"chown",
"kill",
"killall",
"shutdown",
"reboot",
"poweroff",
"halt",
}
)
_SHELL_META_RE = re.compile(r"[\r\n\x00;&|<>`$]")
_PYTHON_INLINE_CODE_FLAGS = frozenset({"-c"})
_JS_INLINE_CODE_FLAGS = frozenset({"-e", "--eval", "-p", "--print"})
_DENIED_DOCKER_ARGS = frozenset(
{
"--privileged",
"--pid=host",
"--network=host",
"--net=host",
"--ipc=host",
}
)
_STDIO_ALLOWLIST_ENV = "ASTRBOT_MCP_STDIO_ALLOWED_COMMANDS"
try:
import anyio
import mcp
@@ -42,11 +113,129 @@ def _prepare_config(config: dict) -> dict:
"""Prepare configuration, handle nested format"""
if config.get("mcpServers"):
first_key = next(iter(config["mcpServers"]))
config = config["mcpServers"][first_key]
config = dict(config["mcpServers"][first_key])
else:
config = dict(config)
config.pop("active", None)
return config
def _normalize_stdio_command_name(command: str) -> str:
command = command.strip()
if "\\" in command:
command_name = PureWindowsPath(command).name
else:
command_name = Path(command).name
command_name = command_name.lower()
for suffix in (".exe", ".cmd", ".bat"):
if command_name.endswith(suffix):
return command_name[: -len(suffix)]
return command_name
def _get_stdio_command_allowlist() -> set[str]:
allowed = set(_DEFAULT_STDIO_COMMAND_ALLOWLIST)
configured = os.environ.get(_STDIO_ALLOWLIST_ENV, "")
if configured.strip():
allowed = {
_normalize_stdio_command_name(item)
for item in configured.split(",")
if item.strip()
}
return allowed
def _is_stdio_config(config: dict) -> bool:
cfg = _prepare_config(config.copy())
return "url" not in cfg
def _validate_stdio_args(command_name: str, args: object) -> None:
if args is None:
return
if not isinstance(args, list) or not all(isinstance(arg, str) for arg in args):
raise ValueError("MCP stdio args must be a list of strings.")
for arg in args:
if "\x00" in arg or "\r" in arg or "\n" in arg:
raise ValueError("MCP stdio args cannot contain control characters.")
if command_name.startswith("python") or command_name == "py":
if any(
arg == "-c"
or (arg.startswith("-") and not arg.startswith("--") and "c" in arg)
for arg in args
):
raise ValueError(
"MCP stdio Python servers must be launched from a module or file; inline code flags such as -c are not allowed."
)
elif command_name in {"node", "deno", "bun"} or command_name.startswith("node"):
if any(
arg in _JS_INLINE_CODE_FLAGS
or arg == "eval"
or (
arg.startswith("-")
and not arg.startswith("--")
and any(c in arg for c in "ep")
)
for arg in args
):
raise ValueError(
"MCP stdio JavaScript servers must be launched from a package or file; inline eval flags are not allowed."
)
elif command_name == "docker":
denied = []
for i, arg in enumerate(args):
if arg in _DENIED_DOCKER_ARGS:
denied.append(arg)
elif (
arg in {"--network", "--net", "--pid", "--ipc"}
and i + 1 < len(args)
and args[i + 1] == "host"
):
denied.append(f"{arg} {args[i + 1]}")
if denied:
raise ValueError(
f"MCP stdio Docker args are unsafe and not allowed: {', '.join(denied)}."
)
def validate_mcp_stdio_config(config: dict) -> None:
"""Validate stdio MCP config before any subprocess can be spawned."""
cfg = _prepare_config(config.copy())
if "url" in cfg:
return
command = cfg.get("command")
if not isinstance(command, str) or not command.strip():
raise ValueError("MCP stdio server requires a non-empty command.")
if _SHELL_META_RE.search(command):
raise ValueError("MCP stdio command contains unsafe shell metacharacters.")
command_name = _normalize_stdio_command_name(command)
if command_name in _DENIED_STDIO_COMMANDS:
raise ValueError(f"MCP stdio command `{command_name}` is not allowed.")
allowed = _get_stdio_command_allowlist()
if command_name not in allowed:
allowed_display = ", ".join(sorted(allowed))
raise ValueError(
f"MCP stdio command `{command_name}` is not allowed. "
f"Allowed commands: {allowed_display}. "
f"Set {_STDIO_ALLOWLIST_ENV} to override this list if you trust another launcher."
)
_validate_stdio_args(command_name, cfg.get("args"))
env = cfg.get("env")
if env is not None and not isinstance(env, dict):
raise ValueError("MCP stdio env must be an object.")
if isinstance(env, dict) and not all(
isinstance(key, str) and isinstance(value, str) for key, value in env.items()
):
raise ValueError("MCP stdio env keys and values must be strings.")
def _prepare_stdio_env(config: dict) -> dict:
"""Preserve Windows executable resolution for stdio subprocesses."""
if sys.platform != "win32":
@@ -243,6 +432,7 @@ class MCPClient:
)
else:
validate_mcp_stdio_config(cfg)
cfg = _prepare_stdio_env(cfg)
server_params = mcp.StdioServerParameters(
**cfg,

View File

@@ -3,7 +3,7 @@ import traceback
from quart import request
from astrbot.core import logger
from astrbot.core.agent.mcp_client import MCPTool
from astrbot.core.agent.mcp_client import MCPTool, validate_mcp_stdio_config
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from astrbot.core.star import star_map
from astrbot.core.tools.registry import get_builtin_tool_config_statuses
@@ -153,6 +153,11 @@ class ToolsRoute(Route):
.__dict__
)
try:
validate_mcp_stdio_config(server_config)
except ValueError as e:
return Response().error(f"{e!s}").__dict__
config = self.tool_mgr.load_mcp_config()
if name in config["mcpServers"]:
@@ -256,6 +261,11 @@ class ToolsRoute(Route):
if key != "active": # 除了active之外的所有字段都保留
server_config[key] = value
try:
validate_mcp_stdio_config(server_config)
except ValueError as e:
return Response().error(f"{e!s}").__dict__
# config["mcpServers"][name] = server_config
if is_rename:
config["mcpServers"].pop(old_name)
@@ -415,6 +425,11 @@ class ToolsRoute(Route):
.__dict__
)
try:
validate_mcp_stdio_config(config)
except ValueError as e:
return Response().error(f"{e!s}").__dict__
tools_name = await self.tool_mgr.test_mcp_server_connection(config)
return (
Response()

View File

@@ -110,6 +110,10 @@
<small style="color: grey">*{{ tm('dialogs.addServer.tips.timeoutConfig') }}</small>
<v-alert type="info" variant="tonal" density="compact" class="mt-3">
{{ tm('dialogs.addServer.tips.transportRecommendation') }}
</v-alert>
<div class="monaco-container" style="margin-top: 16px;">
<VueMonacoEditor v-model:value="serverConfigJson" theme="vs-dark" language="json" :options="{
minimap: {

View File

@@ -95,7 +95,8 @@
"sync": "Sync"
},
"tips": {
"timeoutConfig": "Please configure tool call timeout separately in the configuration page"
"timeoutConfig": "Please configure tool call timeout separately in the configuration page",
"transportRecommendation": "Prefer Streamable HTTP or SSE mode. Stdio mode starts a local process on the AstrBot host and should only be used for trusted MCP servers."
}
},
"serverDetail": {

View File

@@ -95,7 +95,8 @@
"sync": "Синхронизировать"
},
"tips": {
"timeoutConfig": "Тайм-аут вызова инструментов настраивается отдельно на странице конфигурации"
"timeoutConfig": "Тайм-аут вызова инструментов настраивается отдельно на странице конфигурации",
"transportRecommendation": "Рекомендуется сначала использовать режим Streamable HTTP или SSE. Режим stdio запускает локальный процесс на хосте AstrBot и подходит только для доверенных MCP-серверов."
}
},
"serverDetail": {

View File

@@ -95,7 +95,8 @@
"sync": "同步"
},
"tips": {
"timeoutConfig": "工具调用的超时时间请前往配置页面单独配置"
"timeoutConfig": "工具调用的超时时间请前往配置页面单独配置",
"transportRecommendation": "建议优先使用 Streamable HTTP 或 SSE 模式。stdio 模式会在 AstrBot 主机上启动本地进程,仅适合可信 MCP 服务器。"
}
},
"serverDetail": {