Compare commits

...

3 Commits

Author SHA1 Message Date
Soulter
cdc9748a31 fix: rf 2026-06-08 01:20:31 +08:00
Weilong Liao
43efef442c Update astrbot/core/tools/message_tools.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-06-08 01:19:54 +08:00
Soulter
6f6d081c5c fix: restrict local file paths in message tool 2026-06-08 01:15:02 +08:00
2 changed files with 149 additions and 11 deletions

View File

@@ -2,6 +2,7 @@ import json
import os
import shlex
import uuid
from pathlib import Path
from pydantic import Field
from pydantic.dataclasses import dataclass
@@ -14,9 +15,55 @@ from astrbot.core.astr_agent_context import AstrAgentContext
from astrbot.core.computer.computer_client import get_booter
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.platform.message_session import MessageSession
from astrbot.core.tools.computer_tools.util import check_admin_permission
from astrbot.core.tools.computer_tools.util import (
check_admin_permission,
is_local_runtime,
workspace_root,
)
from astrbot.core.tools.registry import builtin_tool
from astrbot.core.utils.astrbot_path import get_astrbot_temp_path
from astrbot.core.utils.astrbot_path import (
get_astrbot_system_tmp_path,
get_astrbot_temp_path,
)
def _file_send_allowed_roots(umo: str | None) -> tuple[Path, ...]:
roots = []
if umo:
roots.append(workspace_root(umo))
roots.extend(
[
Path(get_astrbot_temp_path()).resolve(strict=False),
Path(get_astrbot_system_tmp_path()).resolve(strict=False),
]
)
return tuple(roots)
def _is_path_within(path: Path, roots: tuple[Path, ...]) -> bool:
return any(path == root or path.is_relative_to(root) for root in roots)
def _is_restricted_local_env(context: ContextWrapper[AstrAgentContext]) -> bool:
if not is_local_runtime(context):
return False
cfg = context.context.context.get_config(
umo=context.context.event.unified_msg_origin
)
provider_settings = cfg.get("provider_settings", {})
require_admin = provider_settings.get("computer_use_require_admin", True)
return require_admin and context.context.event.role != "admin"
def _can_send_local_file(
context: ContextWrapper[AstrAgentContext],
local_path: Path,
) -> bool:
umo = context.context.event.unified_msg_origin
allowed_roots = _file_send_allowed_roots(umo)
if _is_path_within(local_path, allowed_roots):
return True
return is_local_runtime(context) and not _is_restricted_local_env(context)
@builtin_tool
@@ -85,23 +132,38 @@ class SendMessageToUserTool(FunctionTool[AstrAgentContext]):
*,
component_type: str = "file",
) -> tuple[str, bool]:
path = str(path)
# if the path is relative, check if the file exists in user's local workspace
path = str(path).strip()
if not path:
raise FileNotFoundError(f"{component_type} path is empty")
# Relative host paths are resolved only inside the user's workspace.
if not os.path.isabs(path):
unified_msg_origin = context.context.event.unified_msg_origin
if unified_msg_origin:
from astrbot.core.tools.computer_tools.util import workspace_root
try:
ws_path = workspace_root(unified_msg_origin)
ws_candidate = (ws_path / path).resolve()
ws_candidate = (ws_path / path).resolve(strict=False)
if ws_candidate.is_file() and ws_candidate.is_relative_to(ws_path):
return str(ws_candidate), False
except Exception:
pass
# check if the file exists in local environment (only allow absolute paths to prevent traversal)
elif os.path.isfile(path):
return path, False
else:
local_candidate = Path(path).expanduser().resolve(strict=False)
if local_candidate.is_file():
if _can_send_local_file(context, local_candidate):
return str(local_candidate), False
if is_local_runtime(context):
allowed = ", ".join(
str(root)
for root in _file_send_allowed_roots(
context.context.event.unified_msg_origin
)
)
raise PermissionError(
"Local file send is restricted for this user. "
f"Allowed directories: {allowed}. "
f"Blocked path: {local_candidate}."
)
try:
sb = await get_booter(
@@ -221,6 +283,8 @@ class SendMessageToUserTool(FunctionTool[AstrAgentContext]):
)
except FileNotFoundError as exc:
return f"error: {exc}"
except PermissionError as exc:
return f"error: {exc}"
except Exception as exc:
return f"error: failed to build messages[{idx}] component: {exc}"

View File

@@ -12,9 +12,15 @@ def _make_context(
current_session="feishu:GroupMessage:oc_xxx",
role="admin",
require_admin=True,
runtime="local",
):
"""Build a minimal ContextWrapper for SendMessageToUserTool."""
cfg = {"provider_settings": {"computer_use_require_admin": require_admin}}
cfg = {
"provider_settings": {
"computer_use_require_admin": require_admin,
"computer_use_runtime": runtime,
}
}
return SimpleNamespace(
context=SimpleNamespace(
event=SimpleNamespace(
@@ -161,3 +167,71 @@ async def test_send_message_missing_image_path_stops_before_send(tmp_path, monke
assert "error: failed to build messages[1] component: sandbox unavailable" in result
ctx.context.context.send_message.assert_not_called()
@pytest.mark.asyncio
async def test_non_admin_cannot_send_arbitrary_local_absolute_file(tmp_path):
"""Non-admin users cannot send host files outside the allowed local roots."""
tool = SendMessageToUserTool()
ctx = _make_context(role="member", require_admin=True)
secret_path = tmp_path / "secret.txt"
secret_path.write_text("secret", encoding="utf-8")
result = await tool.call(
ctx,
messages=[{"type": "file", "path": str(secret_path)}],
)
assert "error: Local file send is restricted for this user" in result
assert str(secret_path) in result
ctx.context.context.send_message.assert_not_called()
@pytest.mark.asyncio
async def test_non_admin_can_send_workspace_file(tmp_path, monkeypatch):
"""Non-admin users can send files inside their per-session workspace."""
tool = SendMessageToUserTool()
ctx = _make_context(
current_session="feishu:GroupMessage:oc_workspace",
role="member",
require_admin=True,
)
workspace_root = tmp_path / "workspaces"
workspace_file = workspace_root / "feishu_GroupMessage_oc_workspace" / "result.txt"
workspace_file.parent.mkdir(parents=True)
workspace_file.write_text("result", encoding="utf-8")
monkeypatch.setattr(
"astrbot.core.tools.computer_tools.util.get_astrbot_workspaces_path",
lambda: str(workspace_root),
)
result = await tool.call(
ctx,
messages=[{"type": "file", "path": "result.txt"}],
)
assert "Message sent to session" in result
ctx.context.context.send_message.assert_called_once()
@pytest.mark.asyncio
async def test_non_admin_can_send_temp_file(tmp_path, monkeypatch):
"""Non-admin users can send generated files under AstrBot temp."""
tool = SendMessageToUserTool()
ctx = _make_context(role="member", require_admin=True)
temp_root = tmp_path / "temp"
temp_root.mkdir()
output_path = temp_root / "output.txt"
output_path.write_text("output", encoding="utf-8")
monkeypatch.setattr(
"astrbot.core.tools.message_tools.get_astrbot_temp_path",
lambda: str(temp_root),
)
result = await tool.call(
ctx,
messages=[{"type": "file", "path": str(output_path)}],
)
assert "Message sent to session" in result
ctx.context.context.send_message.assert_called_once()