Compare commits

...

5 Commits

Author SHA1 Message Date
Soulter
af70151ff8 chore: bump version to 4.25.5 2026-06-08 01:23:06 +08:00
Weilong Liao
66ec415e56 fix: restrict local file paths in message tools (#8660)
* fix: restrict local file paths in message tool

* Update astrbot/core/tools/message_tools.py

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix: rf

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-06-08 01:20:47 +08:00
Weilong Liao
8f5178d265 fix: restore star context typing (#8659) 2026-06-08 00:24:45 +08:00
Soulter
05c137eb29 fix: qq official webhook mode can not restart normally 2026-06-07 18:10:45 +08:00
Copilot
1a04998787 perf: handle Anthropic usage=None on content-filtered responses (#8647)
* Initial plan

* fix: handle missing anthropic usage on filtered responses

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
2026-06-07 15:29:22 +08:00
10 changed files with 442 additions and 22 deletions

View File

@@ -5,7 +5,7 @@ import os
from astrbot.core.computer.booters.cua_defaults import CUA_DEFAULT_CONFIG
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
VERSION = "4.25.4"
VERSION = "4.25.5"
DB_PATH = os.path.join(get_astrbot_data_path(), "data_v4.db")
PERSONAL_WECHAT_CONFIG_METADATA = {
"weixin_oc_base_url": {

View File

@@ -1,10 +1,13 @@
import asyncio
import json
import logging
import time
from binascii import Error as BinasciiError
from typing import cast
import quart
from botpy import BotAPI, BotHttp, BotWebSocket, Client, ConnectionSession, Token
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ed25519
from astrbot.api import logger
@@ -13,6 +16,57 @@ from astrbot.api import logger
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
_SIGNATURE_HEADER = "X-Signature-Ed25519"
_SIGNATURE_TIMESTAMP_HEADER = "X-Signature-Timestamp"
_ED25519_SEED_SIZE = 32
_ED25519_SIGNATURE_SIZE = 64
def _build_ed25519_seed(secret: str) -> bytes:
if not secret:
raise ValueError("QQ official bot secret is empty.")
seed = secret.encode("utf-8")
while len(seed) < _ED25519_SEED_SIZE:
seed *= 2
return seed[:_ED25519_SEED_SIZE]
def _sign_qq_webhook_payload(secret: str, timestamp: str, payload: bytes) -> str:
seed = _build_ed25519_seed(secret)
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(seed)
return private_key.sign(timestamp.encode("utf-8") + payload).hex()
def _verify_qq_webhook_signature(
secret: str,
timestamp: str | None,
signature: str | None,
body: bytes,
) -> bool:
if not timestamp or not signature:
return False
try:
signature_buffer = bytes.fromhex(signature)
except (BinasciiError, ValueError):
return False
if (
len(signature_buffer) != _ED25519_SIGNATURE_SIZE
or signature_buffer[63] & 224 != 0
):
return False
try:
seed = _build_ed25519_seed(secret)
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(seed)
public_key = private_key.public_key()
public_key.verify(signature_buffer, timestamp.encode("utf-8") + body)
except (InvalidSignature, ValueError):
return False
return True
class QQOfficialWebhook:
def __init__(
@@ -27,7 +81,12 @@ class QQOfficialWebhook:
if isinstance(self.port, str):
self.port = int(self.port)
self.http: BotHttp = BotHttp(timeout=300, is_sandbox=self.is_sandbox)
self.http: BotHttp = BotHttp(
timeout=300,
is_sandbox=self.is_sandbox,
app_id=self.appid,
secret=self.secret,
)
self.api: BotAPI = BotAPI(http=self.http)
self.token = Token(self.appid, self.secret)
@@ -40,6 +99,7 @@ class QQOfficialWebhook:
self.client = botpy_client
self.event_queue = event_queue
self.shutdown_event = asyncio.Event()
self._connection: ConnectionSession | None = None
# Cache for extra fields extracted from raw webhook payloads, keyed by message id
self._extra_data_cache: dict[str, dict] = {}
@@ -55,6 +115,13 @@ class QQOfficialWebhook:
# 直接注入到 botpy 的 Client移花接木
self.client.api = self.api
self.client.http = self.http
self._setup_connection()
def _setup_connection(self) -> None:
if self._connection is not None:
return
self.client.api = self.api
self.client.http = self.http
async def bot_connect() -> None:
pass
@@ -105,7 +172,24 @@ class QQOfficialWebhook:
Returns:
响应数据
"""
msg: dict = await request.json
body = await request.get_data()
if not _verify_qq_webhook_signature(
self.secret,
request.headers.get(_SIGNATURE_TIMESTAMP_HEADER),
request.headers.get(_SIGNATURE_HEADER),
body,
):
logger.warning("qq_official_webhook signature verification failed.")
return {"error": "Invalid signature"}, 401
try:
msg = json.loads(body.decode("utf-8"))
except json.JSONDecodeError:
logger.warning("qq_official_webhook callback body is not valid JSON.")
return {"error": "Invalid JSON"}, 400
if not isinstance(msg, dict):
return {"error": "Invalid JSON"}, 400
logger.debug(f"收到 qq_official_webhook 回调: {msg}")
event = msg.get("t")
@@ -136,6 +220,13 @@ class QQOfficialWebhook:
if event and opcode == BotWebSocket.WS_DISPATCH_EVENT:
event = msg["t"].lower()
if self._connection is None:
logger.warning(
"qq_official_webhook botpy connection is not initialized; "
"creating parser connection lazily.",
)
self._setup_connection()
# Extract extra fields from raw payload before botpy parses and discards them
if data:
msg_id = data.get("id")

View File

@@ -302,12 +302,14 @@ class ProviderAnthropic(Provider):
return system_prompt, new_messages
def _extract_usage(self, usage: Usage) -> TokenUsage:
def _extract_usage(self, usage: Usage | None) -> TokenUsage:
if usage is None:
return TokenUsage()
# https://docs.claude.com/en/docs/build-with-claude/prompt-caching#tracking-cache-performance
return TokenUsage(
input_other=usage.input_tokens or 0,
input_cached=usage.cache_read_input_tokens or 0,
output=usage.output_tokens,
output=usage.output_tokens or 0,
)
def _update_usage(self, token_usage: TokenUsage, usage: MessageDeltaUsage) -> None:

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import logging
from typing import Any, Protocol
from typing import TYPE_CHECKING, Any
from astrbot.core import html_renderer
from astrbot.core.utils.command_parser import CommandParserMixin
@@ -9,6 +9,9 @@ from astrbot.core.utils.plugin_kv_store import PluginKVStoreMixin
from .star import StarMetadata, star_map, star_registry
if TYPE_CHECKING:
from .context import Context
logger = logging.getLogger("astrbot")
@@ -17,11 +20,9 @@ class Star(CommandParserMixin, PluginKVStoreMixin):
author: str
name: str
context: Context
class _ContextLike(Protocol):
def get_config(self, umo: str | None = None) -> Any: ...
def __init__(self, context: _ContextLike, config: dict | None = None) -> None:
def __init__(self, context: Context, config: dict | None = None) -> None:
self.context = context
def _get_context_config(self) -> Any:

View File

@@ -2,6 +2,7 @@ import json
import os
import shlex
import uuid
from pathlib import Path
from pydantic import Field
from pydantic.dataclasses import dataclass
@@ -14,9 +15,55 @@ from astrbot.core.astr_agent_context import AstrAgentContext
from astrbot.core.computer.computer_client import get_booter
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.platform.message_session import MessageSession
from astrbot.core.tools.computer_tools.util import check_admin_permission
from astrbot.core.tools.computer_tools.util import (
check_admin_permission,
is_local_runtime,
workspace_root,
)
from astrbot.core.tools.registry import builtin_tool
from astrbot.core.utils.astrbot_path import get_astrbot_temp_path
from astrbot.core.utils.astrbot_path import (
get_astrbot_system_tmp_path,
get_astrbot_temp_path,
)
def _file_send_allowed_roots(umo: str | None) -> tuple[Path, ...]:
roots = []
if umo:
roots.append(workspace_root(umo))
roots.extend(
[
Path(get_astrbot_temp_path()).resolve(strict=False),
Path(get_astrbot_system_tmp_path()).resolve(strict=False),
]
)
return tuple(roots)
def _is_path_within(path: Path, roots: tuple[Path, ...]) -> bool:
return any(path == root or path.is_relative_to(root) for root in roots)
def _is_restricted_local_env(context: ContextWrapper[AstrAgentContext]) -> bool:
if not is_local_runtime(context):
return False
cfg = context.context.context.get_config(
umo=context.context.event.unified_msg_origin
)
provider_settings = cfg.get("provider_settings", {})
require_admin = provider_settings.get("computer_use_require_admin", True)
return require_admin and context.context.event.role != "admin"
def _can_send_local_file(
context: ContextWrapper[AstrAgentContext],
local_path: Path,
) -> bool:
umo = context.context.event.unified_msg_origin
allowed_roots = _file_send_allowed_roots(umo)
if _is_path_within(local_path, allowed_roots):
return True
return is_local_runtime(context) and not _is_restricted_local_env(context)
@builtin_tool
@@ -85,23 +132,38 @@ class SendMessageToUserTool(FunctionTool[AstrAgentContext]):
*,
component_type: str = "file",
) -> tuple[str, bool]:
path = str(path)
# if the path is relative, check if the file exists in user's local workspace
path = str(path).strip()
if not path:
raise FileNotFoundError(f"{component_type} path is empty")
# Relative host paths are resolved only inside the user's workspace.
if not os.path.isabs(path):
unified_msg_origin = context.context.event.unified_msg_origin
if unified_msg_origin:
from astrbot.core.tools.computer_tools.util import workspace_root
try:
ws_path = workspace_root(unified_msg_origin)
ws_candidate = (ws_path / path).resolve()
ws_candidate = (ws_path / path).resolve(strict=False)
if ws_candidate.is_file() and ws_candidate.is_relative_to(ws_path):
return str(ws_candidate), False
except Exception:
pass
# check if the file exists in local environment (only allow absolute paths to prevent traversal)
elif os.path.isfile(path):
return path, False
else:
local_candidate = Path(path).expanduser().resolve(strict=False)
if local_candidate.is_file():
if _can_send_local_file(context, local_candidate):
return str(local_candidate), False
if is_local_runtime(context):
allowed = ", ".join(
str(root)
for root in _file_send_allowed_roots(
context.context.event.unified_msg_origin
)
)
raise PermissionError(
"Local file send is restricted for this user. "
f"Allowed directories: {allowed}. "
f"Blocked path: {local_candidate}."
)
try:
sb = await get_booter(
@@ -221,6 +283,8 @@ class SendMessageToUserTool(FunctionTool[AstrAgentContext]):
)
except FileNotFoundError as exc:
return f"error: {exc}"
except PermissionError as exc:
return f"error: {exc}"
except Exception as exc:
return f"error: failed to build messages[{idx}] component: {exc}"

30
changelogs/v4.25.5.md Normal file
View File

@@ -0,0 +1,30 @@
- [更新日志(简体中文)](#chinese)
- [Changelog(English)](#english)
<a id="chinese"></a>
## What's Changed
### 修复
- 收紧消息工具对本地文件路径的处理边界,减少非预期路径被用于消息附件的情况。([#8660](https://github.com/AstrBotDevs/AstrBot/pull/8660))
- 修复 Star Context 类型定义,恢复相关 SDK 类型提示与运行兼容性。([#8659](https://github.com/AstrBotDevs/AstrBot/pull/8659))
- 修复 QQ 官方 Webhook 模式无法正常重启的问题。
### 优化
- 改进 Anthropic 在内容过滤响应中缺失 `usage` 字段时的处理,避免相关请求结果解析异常。([#8647](https://github.com/AstrBotDevs/AstrBot/pull/8647))
<a id="english"></a>
## What's Changed (EN)
### Bug Fixes
- Tightened local file path handling in the message tool to avoid unintended attachment path usage. ([#8660](https://github.com/AstrBotDevs/AstrBot/pull/8660))
- Fixed Star Context typing to restore related SDK type hints and runtime compatibility. ([#8659](https://github.com/AstrBotDevs/AstrBot/pull/8659))
- Fixed QQ Official Webhook mode not restarting correctly.
### Improvements
- Improved Anthropic response parsing when content-filtered responses omit the `usage` field. ([#8647](https://github.com/AstrBotDevs/AstrBot/pull/8647))

View File

@@ -1,6 +1,6 @@
[project]
name = "AstrBot"
version = "4.25.4"
version = "4.25.5"
description = "Easy-to-use multi-platform LLM chatbot and development framework"
readme = "README.md"
license = { text = "AGPL-3.0-or-later" }

View File

@@ -483,6 +483,40 @@ def _setup_provider_with_mock_client(monkeypatch) -> anthropic_source.ProviderAn
return provider
@pytest.mark.asyncio
async def test_query_handles_none_usage_when_content_filtered(monkeypatch):
provider = _setup_provider_with_mock_client(monkeypatch)
content_filter_message = (
"The request was rejected because it was considered high risk"
)
class _FakeMessageBlock:
def __init__(self, text: str):
self.type = "text"
self.text = text
class _FakeMessage:
def __init__(self):
self.id = "msg_content_filter"
self.content = [_FakeMessageBlock(content_filter_message)]
self.stop_reason = "content_filter"
self.usage = None
async def fake_create(**kwargs):
return _FakeMessage()
monkeypatch.setattr(anthropic_source, "Message", _FakeMessage)
provider.client.messages.create = fake_create
llm_response = await provider.text_chat(prompt="test")
assert llm_response.completion_text == content_filter_message
assert llm_response.usage is not None
assert llm_response.usage.input_other == 0
assert llm_response.usage.input_cached == 0
assert llm_response.usage.output == 0
@pytest.mark.asyncio
async def test_tool_choice_auto_converts_to_dict(monkeypatch):
"""tool_choice='auto' 应转换为 {'type': 'auto'}"""

View File

@@ -0,0 +1,124 @@
import asyncio
import json
import pytest
from astrbot.core.platform.sources.qqofficial_webhook.qo_webhook_server import (
_SIGNATURE_HEADER,
_SIGNATURE_TIMESTAMP_HEADER,
QQOfficialWebhook,
_sign_qq_webhook_payload,
_verify_qq_webhook_signature,
)
class FakeRequest:
def __init__(self, body: bytes, headers: dict[str, str] | None = None) -> None:
self._body = body
self.headers = headers or {}
async def get_data(self) -> bytes:
return self._body
class FakeBotpyClient:
api = None
http = None
def ws_dispatch(self, *_args, **_kwargs) -> None:
return None
def test_qq_webhook_signature_verification_accepts_valid_signature():
secret = "test-secret"
timestamp = "1710000000"
body = b'{"op":12,"d":0}'
signature = _sign_qq_webhook_payload(secret, timestamp, body)
assert _verify_qq_webhook_signature(secret, timestamp, signature, body)
def test_qq_webhook_signature_verification_rejects_tampered_body():
secret = "test-secret"
timestamp = "1710000000"
body = b'{"op":12,"d":0}'
signature = _sign_qq_webhook_payload(secret, timestamp, body)
assert not _verify_qq_webhook_signature(
secret,
timestamp,
signature,
b'{"op":12,"d":1}',
)
@pytest.mark.asyncio
async def test_qq_webhook_callback_rejects_missing_signature():
webhook = object.__new__(QQOfficialWebhook)
webhook.secret = "test-secret"
result = await webhook.handle_callback(FakeRequest(b'{"op":12,"d":0}'))
assert result == ({"error": "Invalid signature"}, 401)
@pytest.mark.asyncio
async def test_qq_webhook_callback_accepts_signed_validation():
secret = "test-secret"
event_ts = "1710000000"
plain_token = "plain-token"
body = json.dumps(
{"op": 13, "d": {"event_ts": event_ts, "plain_token": plain_token}},
separators=(",", ":"),
).encode("utf-8")
signature = _sign_qq_webhook_payload(secret, event_ts, body)
webhook = object.__new__(QQOfficialWebhook)
webhook.secret = secret
result = await webhook.handle_callback(
FakeRequest(
body,
{
_SIGNATURE_TIMESTAMP_HEADER: event_ts,
_SIGNATURE_HEADER: signature,
},
)
)
assert result == {
"plain_token": plain_token,
"signature": _sign_qq_webhook_payload(secret, event_ts, plain_token.encode()),
}
@pytest.mark.asyncio
async def test_qq_webhook_callback_lazily_creates_botpy_connection():
secret = "test-secret"
timestamp = "1710000000"
body = json.dumps(
{"op": 0, "t": "UNKNOWN_EVENT", "id": "event-id", "d": {"id": "message-id"}},
separators=(",", ":"),
).encode("utf-8")
signature = _sign_qq_webhook_payload(secret, timestamp, body)
webhook = QQOfficialWebhook(
{"appid": "123", "secret": secret},
asyncio.Queue(),
FakeBotpyClient(),
)
result = await webhook.handle_callback(
FakeRequest(
body,
{
_SIGNATURE_TIMESTAMP_HEADER: timestamp,
_SIGNATURE_HEADER: signature,
},
)
)
assert result == {"opcode": 12}
assert webhook._connection is not None
assert webhook.http._token is not None
assert webhook.http._token.app_id == "123"
assert webhook.client.api is webhook.api
assert webhook.client.http is webhook.http

View File

@@ -12,9 +12,15 @@ def _make_context(
current_session="feishu:GroupMessage:oc_xxx",
role="admin",
require_admin=True,
runtime="local",
):
"""Build a minimal ContextWrapper for SendMessageToUserTool."""
cfg = {"provider_settings": {"computer_use_require_admin": require_admin}}
cfg = {
"provider_settings": {
"computer_use_require_admin": require_admin,
"computer_use_runtime": runtime,
}
}
return SimpleNamespace(
context=SimpleNamespace(
event=SimpleNamespace(
@@ -161,3 +167,71 @@ async def test_send_message_missing_image_path_stops_before_send(tmp_path, monke
assert "error: failed to build messages[1] component: sandbox unavailable" in result
ctx.context.context.send_message.assert_not_called()
@pytest.mark.asyncio
async def test_non_admin_cannot_send_arbitrary_local_absolute_file(tmp_path):
"""Non-admin users cannot send host files outside the allowed local roots."""
tool = SendMessageToUserTool()
ctx = _make_context(role="member", require_admin=True)
secret_path = tmp_path / "secret.txt"
secret_path.write_text("secret", encoding="utf-8")
result = await tool.call(
ctx,
messages=[{"type": "file", "path": str(secret_path)}],
)
assert "error: Local file send is restricted for this user" in result
assert str(secret_path) in result
ctx.context.context.send_message.assert_not_called()
@pytest.mark.asyncio
async def test_non_admin_can_send_workspace_file(tmp_path, monkeypatch):
"""Non-admin users can send files inside their per-session workspace."""
tool = SendMessageToUserTool()
ctx = _make_context(
current_session="feishu:GroupMessage:oc_workspace",
role="member",
require_admin=True,
)
workspace_root = tmp_path / "workspaces"
workspace_file = workspace_root / "feishu_GroupMessage_oc_workspace" / "result.txt"
workspace_file.parent.mkdir(parents=True)
workspace_file.write_text("result", encoding="utf-8")
monkeypatch.setattr(
"astrbot.core.tools.computer_tools.util.get_astrbot_workspaces_path",
lambda: str(workspace_root),
)
result = await tool.call(
ctx,
messages=[{"type": "file", "path": "result.txt"}],
)
assert "Message sent to session" in result
ctx.context.context.send_message.assert_called_once()
@pytest.mark.asyncio
async def test_non_admin_can_send_temp_file(tmp_path, monkeypatch):
"""Non-admin users can send generated files under AstrBot temp."""
tool = SendMessageToUserTool()
ctx = _make_context(role="member", require_admin=True)
temp_root = tmp_path / "temp"
temp_root.mkdir()
output_path = temp_root / "output.txt"
output_path.write_text("output", encoding="utf-8")
monkeypatch.setattr(
"astrbot.core.tools.message_tools.get_astrbot_temp_path",
lambda: str(temp_root),
)
result = await tool.call(
ctx,
messages=[{"type": "file", "path": str(output_path)}],
)
assert "Message sent to session" in result
ctx.context.context.send_message.assert_called_once()