mirror of
https://github.com/AstrBotDevs/AstrBot
synced 2026-07-01 18:20:16 +08:00
Compare commits
3 Commits
codex/fix-
...
codex/hard
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cdc9748a31 | ||
|
|
43efef442c | ||
|
|
6f6d081c5c |
@@ -2,6 +2,7 @@ import json
|
||||
import os
|
||||
import shlex
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import Field
|
||||
from pydantic.dataclasses import dataclass
|
||||
@@ -14,9 +15,55 @@ from astrbot.core.astr_agent_context import AstrAgentContext
|
||||
from astrbot.core.computer.computer_client import get_booter
|
||||
from astrbot.core.message.message_event_result import MessageChain
|
||||
from astrbot.core.platform.message_session import MessageSession
|
||||
from astrbot.core.tools.computer_tools.util import check_admin_permission
|
||||
from astrbot.core.tools.computer_tools.util import (
|
||||
check_admin_permission,
|
||||
is_local_runtime,
|
||||
workspace_root,
|
||||
)
|
||||
from astrbot.core.tools.registry import builtin_tool
|
||||
from astrbot.core.utils.astrbot_path import get_astrbot_temp_path
|
||||
from astrbot.core.utils.astrbot_path import (
|
||||
get_astrbot_system_tmp_path,
|
||||
get_astrbot_temp_path,
|
||||
)
|
||||
|
||||
|
||||
def _file_send_allowed_roots(umo: str | None) -> tuple[Path, ...]:
|
||||
roots = []
|
||||
if umo:
|
||||
roots.append(workspace_root(umo))
|
||||
roots.extend(
|
||||
[
|
||||
Path(get_astrbot_temp_path()).resolve(strict=False),
|
||||
Path(get_astrbot_system_tmp_path()).resolve(strict=False),
|
||||
]
|
||||
)
|
||||
return tuple(roots)
|
||||
|
||||
|
||||
def _is_path_within(path: Path, roots: tuple[Path, ...]) -> bool:
|
||||
return any(path == root or path.is_relative_to(root) for root in roots)
|
||||
|
||||
|
||||
def _is_restricted_local_env(context: ContextWrapper[AstrAgentContext]) -> bool:
|
||||
if not is_local_runtime(context):
|
||||
return False
|
||||
cfg = context.context.context.get_config(
|
||||
umo=context.context.event.unified_msg_origin
|
||||
)
|
||||
provider_settings = cfg.get("provider_settings", {})
|
||||
require_admin = provider_settings.get("computer_use_require_admin", True)
|
||||
return require_admin and context.context.event.role != "admin"
|
||||
|
||||
|
||||
def _can_send_local_file(
|
||||
context: ContextWrapper[AstrAgentContext],
|
||||
local_path: Path,
|
||||
) -> bool:
|
||||
umo = context.context.event.unified_msg_origin
|
||||
allowed_roots = _file_send_allowed_roots(umo)
|
||||
if _is_path_within(local_path, allowed_roots):
|
||||
return True
|
||||
return is_local_runtime(context) and not _is_restricted_local_env(context)
|
||||
|
||||
|
||||
@builtin_tool
|
||||
@@ -85,23 +132,38 @@ class SendMessageToUserTool(FunctionTool[AstrAgentContext]):
|
||||
*,
|
||||
component_type: str = "file",
|
||||
) -> tuple[str, bool]:
|
||||
path = str(path)
|
||||
# if the path is relative, check if the file exists in user's local workspace
|
||||
path = str(path).strip()
|
||||
if not path:
|
||||
raise FileNotFoundError(f"{component_type} path is empty")
|
||||
|
||||
# Relative host paths are resolved only inside the user's workspace.
|
||||
if not os.path.isabs(path):
|
||||
unified_msg_origin = context.context.event.unified_msg_origin
|
||||
if unified_msg_origin:
|
||||
from astrbot.core.tools.computer_tools.util import workspace_root
|
||||
|
||||
try:
|
||||
ws_path = workspace_root(unified_msg_origin)
|
||||
ws_candidate = (ws_path / path).resolve()
|
||||
ws_candidate = (ws_path / path).resolve(strict=False)
|
||||
if ws_candidate.is_file() and ws_candidate.is_relative_to(ws_path):
|
||||
return str(ws_candidate), False
|
||||
except Exception:
|
||||
pass
|
||||
# check if the file exists in local environment (only allow absolute paths to prevent traversal)
|
||||
elif os.path.isfile(path):
|
||||
return path, False
|
||||
else:
|
||||
local_candidate = Path(path).expanduser().resolve(strict=False)
|
||||
if local_candidate.is_file():
|
||||
if _can_send_local_file(context, local_candidate):
|
||||
return str(local_candidate), False
|
||||
if is_local_runtime(context):
|
||||
allowed = ", ".join(
|
||||
str(root)
|
||||
for root in _file_send_allowed_roots(
|
||||
context.context.event.unified_msg_origin
|
||||
)
|
||||
)
|
||||
raise PermissionError(
|
||||
"Local file send is restricted for this user. "
|
||||
f"Allowed directories: {allowed}. "
|
||||
f"Blocked path: {local_candidate}."
|
||||
)
|
||||
|
||||
try:
|
||||
sb = await get_booter(
|
||||
@@ -221,6 +283,8 @@ class SendMessageToUserTool(FunctionTool[AstrAgentContext]):
|
||||
)
|
||||
except FileNotFoundError as exc:
|
||||
return f"error: {exc}"
|
||||
except PermissionError as exc:
|
||||
return f"error: {exc}"
|
||||
except Exception as exc:
|
||||
return f"error: failed to build messages[{idx}] component: {exc}"
|
||||
|
||||
|
||||
@@ -12,9 +12,15 @@ def _make_context(
|
||||
current_session="feishu:GroupMessage:oc_xxx",
|
||||
role="admin",
|
||||
require_admin=True,
|
||||
runtime="local",
|
||||
):
|
||||
"""Build a minimal ContextWrapper for SendMessageToUserTool."""
|
||||
cfg = {"provider_settings": {"computer_use_require_admin": require_admin}}
|
||||
cfg = {
|
||||
"provider_settings": {
|
||||
"computer_use_require_admin": require_admin,
|
||||
"computer_use_runtime": runtime,
|
||||
}
|
||||
}
|
||||
return SimpleNamespace(
|
||||
context=SimpleNamespace(
|
||||
event=SimpleNamespace(
|
||||
@@ -161,3 +167,71 @@ async def test_send_message_missing_image_path_stops_before_send(tmp_path, monke
|
||||
|
||||
assert "error: failed to build messages[1] component: sandbox unavailable" in result
|
||||
ctx.context.context.send_message.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_admin_cannot_send_arbitrary_local_absolute_file(tmp_path):
|
||||
"""Non-admin users cannot send host files outside the allowed local roots."""
|
||||
tool = SendMessageToUserTool()
|
||||
ctx = _make_context(role="member", require_admin=True)
|
||||
secret_path = tmp_path / "secret.txt"
|
||||
secret_path.write_text("secret", encoding="utf-8")
|
||||
|
||||
result = await tool.call(
|
||||
ctx,
|
||||
messages=[{"type": "file", "path": str(secret_path)}],
|
||||
)
|
||||
|
||||
assert "error: Local file send is restricted for this user" in result
|
||||
assert str(secret_path) in result
|
||||
ctx.context.context.send_message.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_admin_can_send_workspace_file(tmp_path, monkeypatch):
|
||||
"""Non-admin users can send files inside their per-session workspace."""
|
||||
tool = SendMessageToUserTool()
|
||||
ctx = _make_context(
|
||||
current_session="feishu:GroupMessage:oc_workspace",
|
||||
role="member",
|
||||
require_admin=True,
|
||||
)
|
||||
workspace_root = tmp_path / "workspaces"
|
||||
workspace_file = workspace_root / "feishu_GroupMessage_oc_workspace" / "result.txt"
|
||||
workspace_file.parent.mkdir(parents=True)
|
||||
workspace_file.write_text("result", encoding="utf-8")
|
||||
monkeypatch.setattr(
|
||||
"astrbot.core.tools.computer_tools.util.get_astrbot_workspaces_path",
|
||||
lambda: str(workspace_root),
|
||||
)
|
||||
|
||||
result = await tool.call(
|
||||
ctx,
|
||||
messages=[{"type": "file", "path": "result.txt"}],
|
||||
)
|
||||
|
||||
assert "Message sent to session" in result
|
||||
ctx.context.context.send_message.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_admin_can_send_temp_file(tmp_path, monkeypatch):
|
||||
"""Non-admin users can send generated files under AstrBot temp."""
|
||||
tool = SendMessageToUserTool()
|
||||
ctx = _make_context(role="member", require_admin=True)
|
||||
temp_root = tmp_path / "temp"
|
||||
temp_root.mkdir()
|
||||
output_path = temp_root / "output.txt"
|
||||
output_path.write_text("output", encoding="utf-8")
|
||||
monkeypatch.setattr(
|
||||
"astrbot.core.tools.message_tools.get_astrbot_temp_path",
|
||||
lambda: str(temp_root),
|
||||
)
|
||||
|
||||
result = await tool.call(
|
||||
ctx,
|
||||
messages=[{"type": "file", "path": str(output_path)}],
|
||||
)
|
||||
|
||||
assert "Message sent to session" in result
|
||||
ctx.context.context.send_message.assert_called_once()
|
||||
|
||||
Reference in New Issue
Block a user